/**
 * $RCSfile$
 * $Revision$
 * $Date$
 *
 * Copyright 2003-2007 Jive Software.
 *
 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jivesoftware.smack;

import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smackx.packet.Ping;


import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Writes packets to an XMPP server. Packets are sent using a dedicated thread. Packet
 * interceptors can be registered to dynamically modify packets before they're actually
 * sent. Packet listeners can be registered to listen for all outgoing packets.
 *
 * @author Matt Tucker
 * @see Connection#addPacketInterceptor
 * @see Connection#addPacketSendingListener
 */
class PacketWriter {

    private Thread writerThread;
    private Thread keepAliveThread;
    private Writer writer;
    private XMPPConnection connection;
    private final BlockingQueue<Packet> queue;
    private boolean done;

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    /**
     * Lock guarding the keep-alive state fields ({@link #keepAliveIsEnabled},
     * {@link #keepAliveNextRequest}, {@link #keepAliveNextResponse} and
     * {@link #sendNextComplete}) so that they are never observed in an
     * inconsistent combination. It is only held for short sections.
     */
    private final Object keepAliveAccessLock;

    /**
     * Lock held while a packet or ping is being written and while the
     * keep-alive state is being switched. This lock can be held for a long
     * period because it spans the actual write. When both locks are needed it
     * must be taken before {@link #keepAliveAccessLock}.
     */
    private final Object keepAliveWriteLock;

    /**
     * Whether keep-alive requests are currently being sent to the server and
     * timeouts are being tracked.
     */
    private boolean keepAliveIsEnabled;

    /**
     * Timestamp (milliseconds, as returned by
     * {@link System#currentTimeMillis()}) at which the next ping request
     * becomes due.
     */
    private long keepAliveNextRequest;

    /**
     * Timestamp by which a response from the server must be received, or the
     * connection is considered broken. <code>null</code> while no ping is
     * awaiting a response.
     */
    private Long keepAliveNextResponse;

    /**
     * Timestamp by which the write currently in progress must be completed, or
     * the connection is considered broken. <code>null</code> while no tracked
     * write is in progress.
     */
    private Long sendNextComplete;

    /**
     * The number of milliseconds between keep-alive requests sent to the
     * server, read from {@link SmackConfiguration#getKeepAliveInterval()}.
     * According to the original note the default value is 30000 ms. A negative
     * value (such as -1) means no keep-alive requests will be sent to the
     * server.
     */
    private final int keepAliveRequestInterval;

    /**
     * The number of milliseconds to wait for an answer from the server (or
     * for a write to complete) before the connection is considered broken.
     */
    private final int keepAliveResponseInterval;

    /**
     * Packet for keep alive.
     */
    private static final String PING;

    static {
        Ping ping = new Ping();
        ping.setPacketID("ping");
        PING = ping.toXML();
    }
Alexander Ivanov's avatar
Alexander Ivanov committed
106 107 108 109 110 111 112 113 114

    /**
     * Builds a packet writer bound to the given connection. Keep-alive starts out
     * disabled with no pending deadlines; the request and response intervals are read
     * from {@link SmackConfiguration}. The writer thread is prepared by {@link #init()}
     * but not started.
     *
     * @param connection the connection.
     */
    protected PacketWriter(XMPPConnection connection) {
        this.connection = connection;
        this.queue = new LinkedBlockingQueue<Packet>();

        // Locks protecting the keep-alive state below.
        this.keepAliveWriteLock = new Object();
        this.keepAliveAccessLock = new Object();

        // Timing configuration is fixed for the lifetime of this writer.
        this.keepAliveRequestInterval = SmackConfiguration.getKeepAliveInterval();
        this.keepAliveResponseInterval = SmackConfiguration.getKeepAliveResponse();

        // Keep-alive is off and nothing is pending until it is explicitly resumed.
        this.keepAliveIsEnabled = false;
        this.keepAliveNextResponse = null;
        this.sendNextComplete = null;
        this.keepAliveNextRequest = System.currentTimeMillis();

        init();
    }

    /**
     * Initializes the writer in order to be used. It is called at the first connection and also
     * is invoked if the connection is disconnected by an error.
     */
Alexander Ivanov's avatar
Alexander Ivanov committed
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
    protected void init() {
        this.writer = connection.writer;
        done = false;

        writerThread = new Thread() {
            public void run() {
                writePackets(this);
            }
        };
        writerThread.setName("Smack Packet Writer (" + connection.connectionCounterValue + ")");
        writerThread.setDaemon(true);
    }

    /**
     * Sends the specified packet to the server.
     *
     * @param packet the packet to send.
     */
    public void sendPacket(Packet packet) {
        if (!done) {
            // Invoke interceptors for the new packet that is about to be sent. Interceptors
            // may modify the content of the packet.
            connection.firePacketInterceptors(packet);

            try {
                queue.put(packet);
156
            } catch (InterruptedException ie) {
Alexander Ivanov's avatar
Alexander Ivanov committed
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
                ie.printStackTrace();
                return;
            }
            synchronized (queue) {
                queue.notifyAll();
            }

            // Process packet writer listeners. Note that we're using the sending
            // thread so it's expected that listeners are fast.
            connection.firePacketSendingListeners(packet);
        }
    }

    /**
     * Starts the packet writer thread, which opens the stream to the server and then
     * keeps writing queued packets until {@link #shutdown} is called or an error
     * occurs.
     */
    public void startup() {
        final Thread worker = writerThread;
        worker.start();
    }

    /**
     * Starts the keep alive process. A white space (aka heartbeat) is going to be
     * sent to the server every 30 seconds (by default) since the last stanza was sent
     * to the server.
     */
    void startKeepAliveProcess() {
185 186 187 188 189 190 191 192 193 194 195 196 197 198
        // Schedule a keep-alive task to run if the feature is enabled. will
        // write
        // out a space character each time it runs to keep the TCP/IP connection
        // open.
        if (keepAliveRequestInterval < 0)
            return;
        resumeKeepAliveProcess();
        KeepAliveTask task = new KeepAliveTask();
        keepAliveThread = new Thread(task);
        task.setThread(keepAliveThread);
        keepAliveThread.setDaemon(true);
        keepAliveThread.setName("Smack Keep Alive ("
                + connection.connectionCounterValue + ")");
        keepAliveThread.start();
Alexander Ivanov's avatar
Alexander Ivanov committed
199 200
    }

201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
    /**
     * Suspends the keep-alive process by clearing the enabled flag. Per the
     * original note this is meant for the period while SASL negotiation or
     * compression is being waited for. While suspended, no pings are sent and
     * {@link #isAlive()} always reports the connection as alive.
     */
    void stopKeepAliveProcess() {
        // Taking the write lock first waits for any write in progress to finish
        // and follows the documented lock order (write lock, then access lock).
        synchronized (keepAliveWriteLock) {
            synchronized (keepAliveAccessLock) {
                keepAliveIsEnabled = false;
            }
        }
    }

    void resumeKeepAliveProcess() {
        if (keepAliveRequestInterval < 0)
            return;
        synchronized (keepAliveWriteLock) {
            synchronized (keepAliveAccessLock) {
                keepAliveIsEnabled = true;
                responseReceived();
            }
        }
    }
Alexander Ivanov's avatar
Alexander Ivanov committed
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254

    /**
     * Replaces the writer that outgoing packets, pings and stream elements are
     * written to.
     *
     * NOTE(review): presumably called when the underlying stream changes
     * (e.g. after TLS or compression is negotiated) - confirm against callers.
     *
     * @param writer the writer to use from now on.
     */
    void setWriter(Writer writer) {
        this.writer = writer;
    }

    /**
     * Shuts down the packet writer. Once this method has been called,
     * {@link #sendPacket(Packet)} ignores new packets and the writer thread
     * leaves its main loop, writes out whatever is still queued and closes
     * the stream.
     */
    public void shutdown() {
        // Set the flag before notifying so that a woken writer thread sees it.
        done = true;
        // Wake the writer thread if it is blocked waiting for a packet.
        synchronized (queue) {
            queue.notifyAll();
        }
    }

    /**
     * Cleans up all resources used by the packet writer by clearing the
     * packet interceptors and packet sending listeners registered on the
     * connection.
     */
    void cleanup() {
        connection.interceptors.clear();
        connection.sendListeners.clear();
    }

    /**
     * Returns the next available packet from the queue for writing.
     *
     * @return the next packet for writing.
     */
    private Packet nextPacket() {
        Packet packet = null;
        // Wait until there's a packet or we're done.
255
        while (!done && (packet = queue.poll()) == null) {
Alexander Ivanov's avatar
Alexander Ivanov committed
256 257
            try {
                synchronized (queue) {
258 259 260
                    queue.wait();
                }
            } catch (InterruptedException ie) {
Alexander Ivanov's avatar
Alexander Ivanov committed
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
                // Do nothing
            }
        }
        return packet;
    }

    private void writePackets(Thread thisThread) {
        try {
            // Open the stream.
            openStream();
            // Write out packets from the queue.
            while (!done && (writerThread == thisThread)) {
                Packet packet = nextPacket();
                if (packet != null) {
                    synchronized (writer) {
                        synchronized (keepAliveWriteLock) {
277 278 279 280 281 282 283 284 285 286 287 288
                            synchronized (keepAliveAccessLock) {
                                if (keepAliveIsEnabled)
                                    sendNextComplete = System.currentTimeMillis()
                                            + keepAliveResponseInterval;
                            }
                            writer.write(packet.toXML());
                            writer.flush();
                            synchronized (keepAliveAccessLock) {
                                if (keepAliveIsEnabled)
                                    sendNextComplete = null;
                            }
                        }
Alexander Ivanov's avatar
Alexander Ivanov committed
289 290 291 292 293 294 295 296
                    }
                }
            }
            // Flush out the rest of the queue. If the queue is extremely large, it's possible
            // we won't have time to entirely flush it before the socket is forced closed
            // by the shutdown process.
            try {
                synchronized (writer) {
297 298
                    while (!queue.isEmpty()) {
                        Packet packet = queue.remove();
Alexander Ivanov's avatar
Alexander Ivanov committed
299 300 301 302
                        writer.write(packet.toXML());
                    }
                    writer.flush();
                }
303
            } catch (Exception e) {
Alexander Ivanov's avatar
Alexander Ivanov committed
304 305 306 307 308 309 310 311 312 313
                e.printStackTrace();
            }

            // Delete the queue contents (hopefully nothing is left).
            queue.clear();

            // Close the stream.
            try {
                writer.write("</stream:stream>");
                writer.flush();
314
            } catch (Exception e) {
Alexander Ivanov's avatar
Alexander Ivanov committed
315
                // Do nothing
316
            } finally {
Alexander Ivanov's avatar
Alexander Ivanov committed
317 318
                try {
                    writer.close();
319
                } catch (Exception e) {
Alexander Ivanov's avatar
Alexander Ivanov committed
320 321 322
                    // Do nothing
                }
            }
323
        } catch (IOException ioe) {
Alexander Ivanov's avatar
Alexander Ivanov committed
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
            if (!done) {
                done = true;
                connection.packetReader.notifyConnectionError(ioe);
            }
        }
    }

    /**
     * Sends to the server a new stream element. This operation may be requested several times
     * so we need to encapsulate the logic in one place. This message will be sent while doing
     * TLS, SASL and resource binding.
     *
     * @throws IOException If an error occurs while sending the stanza to the server.
     */
    void openStream() throws IOException {
        StringBuilder stream = new StringBuilder();
        stream.append("<stream:stream");
        stream.append(" to=\"").append(connection.getServiceName()).append("\"");
        stream.append(" xmlns=\"jabber:client\"");
        stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
        stream.append(" version=\"1.0\">");
        writer.write(stream.toString());
        writer.flush();
    }

349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
    /**
     * Returns whether the connection with the server is considered alive. While
     * keep-alive is disabled the connection is always reported as alive.
     *
     * @return <code>false</code> if the deadline for a ping response or for completing
     *         a write has passed; <code>true</code> otherwise.
     */
    boolean isAlive() {
        synchronized (keepAliveAccessLock) {
            if (keepAliveIsEnabled) {
                final long now = System.currentTimeMillis();
                // A ping was sent and the server did not answer in time.
                if (keepAliveNextResponse != null && now >= keepAliveNextResponse) {
                    System.out.println("No response!");
                    return false;
                }
                // A write was started and did not complete in time.
                if (sendNextComplete != null && now >= sendNextComplete) {
                    System.out.println("Not sent!");
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Records that some data was received from the server: clears the pending response
     * deadline and schedules the next ping one request interval from now. Has no effect
     * while keep-alive is disabled.
     */
    void responseReceived() {
        synchronized (keepAliveWriteLock) {
            synchronized (keepAliveAccessLock) {
                if (keepAliveIsEnabled) {
                    keepAliveNextResponse = null;
                    keepAliveNextRequest = System.currentTimeMillis()
                            + keepAliveRequestInterval;
                }
            }
        }
    }

    /**
     * A task that keeps connections to the server alive by sending a ping on an
     * interval.
     */
    private class KeepAliveTask implements Runnable {

        private Thread thread;

        protected void setThread(Thread thread) {
            this.thread = thread;
        }

        private void ping() {
            synchronized (writer) {
                synchronized (keepAliveWriteLock) {
                    synchronized (keepAliveAccessLock) {
                        // Don`t ping until response will be received.
                        if (!keepAliveIsEnabled
                                || keepAliveNextResponse != null)
                            return;
                        long current = System.currentTimeMillis();
                        if (keepAliveNextRequest > current)
                            return;
                        sendNextComplete = current + keepAliveResponseInterval;
                    }
                    try {
                        writer.write(PING);
                        writer.flush();
                    } catch (IOException ioe) {
                    }
                    synchronized (keepAliveAccessLock) {
                        sendNextComplete = null;
                        keepAliveNextResponse = System.currentTimeMillis()
                                + keepAliveResponseInterval;
                    }
                }
            }
        }

        public void run() {
            while (!done && keepAliveThread == thread) {
                ping();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Do nothing
                }
            }
        }
    }
Alexander Ivanov's avatar
Alexander Ivanov committed
437
}