ClearspaceMUCTranscriptManager.java 13.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/**
 * Copyright (C) 2004-2009 Jive Software. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 18
package org.jivesoftware.openfire.clearspace;

19 20 21 22 23 24 25 26
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimerTask;

import org.dom4j.Element;
27
import org.jivesoftware.openfire.XMPPServer;
28 29 30 31 32
import org.jivesoftware.openfire.muc.MUCEventDispatcher;
import org.jivesoftware.openfire.muc.MUCEventListener;
import org.jivesoftware.openfire.muc.MUCRoom;
import org.jivesoftware.openfire.muc.MultiUserChatManager;
import org.jivesoftware.openfire.muc.MultiUserChatService;
33
import org.jivesoftware.openfire.user.UserNotFoundException;
34
import org.jivesoftware.util.JiveConstants;
35
import org.jivesoftware.util.JiveGlobals;
36 37 38 39
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;

/**
 * Stores MUC events that are intended to be recorded as a transcript for a group chat room in Clearspace.
 * A task will periodically flush the queue of MUC events, sending them to the Clearspace component via XMPP
 * for parsing and storing.
 *
 * Clearspace is expected to handle the packets containing MUC events by parsing them as they come in, accumulating
 * them into a daily group chat transcript for the room it is associated with.
 *
 * The task will flush each queue of MUC events associated with a room based on either the size of the queue, or time.
 * If the size of the queue exceeds a limit we have set, or a certain period of time has elapsed,
 * the queue will be sent to Clearspace -- whichever happens first. (When we say size of the queue, we really mean
 * the effective size as it will appear in a transcript-update packet).
 *
 * Example of a transcript-update packet:
57 58
 * <pre>
 * {@code
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
 *     <iq type='set' to='clearspace.example.org' from='clearspace-conference.example.org'>
 *         <transcript-update xmlns='http://jivesoftware.com/clearspace'>
 *             <presence from='user1@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933781000</timestamp>
 *             </presence>
 *             <message from='user1@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933783000</timestamp>
 *                 <body>user2, I won the lottery!</body>
 *             </message>
 *             <message from='user2@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933785000</timestamp>
 *                 <body>WHAT?!</body>
 *             </message>
 *             <message from='user1@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933787000</timestamp>
 *                 <body>April Fools!</body>
 *             </message>
 *             <presence from='user3@example.org' type='unavailable'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933789000</timestamp>
 *             </presence>
 *             <message from="user2@example.org">
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933791000</timestamp>
 *                 <body>Wow, that was lame.</body>
 *             </message>
 *
 *               ...
 *         </transcript-update>
 *     </iq>
93 94
 * }
 * </pre>
95 96 97 98
 * @author Armando Jagucki
 */
public class ClearspaceMUCTranscriptManager implements MUCEventListener {

99 100
	private static final Logger Log = LoggerFactory.getLogger(ClearspaceMUCTranscriptManager.class);

101 102 103 104 105 106 107 108
    /**
     * Group chat events that are pending to be sent to Clearspace.
     */
    private final List<ClearspaceMUCTranscriptEvent> roomEvents;

    private final TaskEngine taskEngine;
    private TimerTask  transcriptUpdateTask;

109
    private final int MAX_QUEUE_SIZE = 64;
110
    private final long  FLUSH_PERIOD =
111
            JiveGlobals.getLongProperty("clearspace.transcript.flush.period", JiveConstants.MINUTE * 1);
112 113 114

    private String csMucDomain;
    private String csComponentAddress;
115 116 117 118

    public ClearspaceMUCTranscriptManager(TaskEngine taskEngine) {
        this.taskEngine = taskEngine;
        roomEvents = new ArrayList<ClearspaceMUCTranscriptEvent>();
119 120 121 122

        String xmppDomain = XMPPServer.getInstance().getServerInfo().getXMPPDomain();
        csMucDomain = ClearspaceManager.MUC_SUBDOMAIN + "." + xmppDomain;
        csComponentAddress = ClearspaceManager.CLEARSPACE_COMPONENT + "." + xmppDomain;
123 124 125 126 127 128 129
    }

    public void start() {
        MUCEventDispatcher.addListener(this);

        // Schedule a task for this new transcript event queue.
        transcriptUpdateTask = new TimerTask() {
130 131
            @Override
			public void run() {
132 133 134 135
                if (roomEvents.isEmpty()) {
                    return;
                }

136 137 138 139
                // Store JIDs of rooms that had presence changes, to track occupant counts
                // TODO: Refactor out into a different class
                Set<String> presenceRoomJids = new HashSet<String>();

140 141 142 143 144 145 146
                // Create the transcript-update packet
                IQ packet = new IQ();
                packet.setTo(csComponentAddress);
                packet.setFrom(csMucDomain);
                packet.setType(IQ.Type.set);
                Element transcriptElement = packet.setChildElement("transcript-update", "http://jivesoftware.com/clearspace");

147 148
                for (ClearspaceMUCTranscriptEvent event : roomEvents) {
                    // Add event to the packet
149 150 151
                    Element mucEventElement = null;

                    switch (event.type) {
152
                        case messageReceived:
153
                            mucEventElement = transcriptElement.addElement("message");
154
                            mucEventElement.addElement("body").setText(event.content);
155 156 157
                            break;
                        case occupantJoined:
                            mucEventElement = transcriptElement.addElement("presence");
158
                            presenceRoomJids.add(event.roomJID.toBareJID());
159 160 161 162
                            break;
                        case occupantLeft:
                            mucEventElement = transcriptElement.addElement("presence");
                            mucEventElement.addAttribute("type", "unavailable");
163
                            presenceRoomJids.add(event.roomJID.toBareJID());
164
                            break;
165 166 167 168
                        case roomSubjectChanged:
                            mucEventElement = transcriptElement.addElement("subject-change");
                            mucEventElement.addElement("subject").setText(event.content);
                            break;
169 170 171 172
                    }

                    // Now add those event fields that are common to all elements in the transcript-update packet.
                    if (mucEventElement != null) {
173 174 175 176 177 178
                        if (event.user != null) {
                            mucEventElement.addAttribute("from", event.user.toBareJID());
                        }
                        if (event.roomJID != null) {
                            mucEventElement.addElement("roomjid").setText(event.roomJID.toBareJID());
                        }
179 180
                        mucEventElement.addElement("timestamp").setText(Long.toString(event.timestamp));
                    }
181 182
                }

183 184 185 186 187 188 189
                // Add occupant count updates to packet
                // TODO: Refactor out into a different class
                MultiUserChatManager mucManager = XMPPServer.getInstance().getMultiUserChatManager();
                for (String roomJid : presenceRoomJids) {
                    JID jid = new JID(roomJid);
                    MultiUserChatService mucService = mucManager.getMultiUserChatService(jid);
                    MUCRoom room = mucService.getChatRoom(jid.getNode());
190 191
                    // Not count room owners as occupants
                    int totalOccupants = room.getOccupantsCount();
192
                    for (JID owner : room.getOwners()) {
193 194 195 196 197 198 199 200
                        try {
                            if (!room.getOccupantsByBareJID(owner).isEmpty()) {
                                totalOccupants--;
                            }
                        } catch (UserNotFoundException e) {
                            // Ignore
                        }
                    }
201 202 203

                    Element occUpdateElement = transcriptElement.addElement("occupant-count-update");
                    occUpdateElement.addElement("roomjid").setText(roomJid);
204
                    occUpdateElement.addElement("count").setText(Integer.toString(totalOccupants));
205 206
                }

207 208 209 210 211
                // Send the transcript-update packet to Clearspace.
                IQ result = ClearspaceManager.getInstance().query(packet, 15000);
                if (result == null) {
                    // No answer was received from Clearspace.
                    Log.warn("Did not get a reply from sending a transcript-update packet to Clearspace.");
212

213 214 215
                    // Return early so that the room-events queue is not cleared.
                    return;
                }
216 217
                else if (result.getType() == IQ.Type.error) {
                    // Clearspace was not able to process the transcript-update
218
                    Log.warn("Clearspace received a transcript-update packet but was not able to process it." + result.toXML());
219

220 221 222 223 224
                    // Return early so that the room-events queue is not cleared.
                    return;
                }

                // We can clear the queue now, as Clearspace has processed the transcript-update packet.
225 226 227 228 229 230 231 232 233 234 235 236
                roomEvents.clear();
            }
        };

        taskEngine.schedule(transcriptUpdateTask, FLUSH_PERIOD, FLUSH_PERIOD);
    }

    public void stop() {
        MUCEventDispatcher.removeListener(this);
    }

    public void roomCreated(JID roomJID) {
237
        // Do nothing
238 239 240
    }

    public void roomDestroyed(JID roomJID) {
241
        // Do nothing
242 243 244
    }

    public void occupantJoined(JID roomJID, JID user, String nickname) {
245
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
246 247
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.occupantJoined(roomJID, user, new Date().getTime()));
        }
248 249 250
    }

    public void occupantLeft(JID roomJID, JID user) {
251
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
252 253
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.occupantLeft(roomJID, user, new Date().getTime()));
        }
254 255 256
    }

    public void nicknameChanged(JID roomJID, JID user, String oldNickname, String newNickname) {
257
        // Do nothing
258 259 260
    }

    public void messageReceived(JID roomJID, JID user, String nickname, Message message) {
261
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
262 263 264
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.messageReceived(roomJID, user, message.getBody(),
                    new Date().getTime()));
        }
265 266
    }

267 268 269
    public void privateMessageRecieved(JID fromJID, JID toJID, Message message) {
    }
    
270
    public void roomSubjectChanged(JID roomJID, JID user, String newSubject) {
271
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
272 273 274 275 276 277
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.roomSubjectChanged(roomJID, user, newSubject,
                    new Date().getTime()));
        }
    }

    private boolean isRoomOwner(JID roomJID, JID user) {
278 279 280
        if (user == null || roomJID == null) {
            return false;
        }
281 282 283
        MultiUserChatService chatService =
                XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(roomJID);
        MUCRoom room = chatService.getChatRoom(roomJID.getNode());
284
        return room != null && room.getOwners().contains(user);
285 286
    }

287 288 289 290
    private boolean isClearspaceRoom(JID roomJID) {
        return roomJID.getDomain().equals(csMucDomain);
    }

291 292 293 294 295 296 297 298 299
    /**
     * Queues the group chat event to be later sent to Clearspace.
     *
     * @param event MUC transcript event.
     */
    private void addGroupChatEvent(ClearspaceMUCTranscriptEvent event) {
        roomEvents.add(event);

        // Check if we have exceeded the allowed size before a flush should occur.
300
        if (roomEvents.size() > MAX_QUEUE_SIZE) {
301
            // Flush the queue immediately and reschedule the task.
302
            forceQueueFlush();
303 304 305 306
        }
    }

    /**
307 308
     * Forces the transcript-event queue to be sent to Clearspace by running the transcript-update
     * task immediately.
309
     *
310
     * The transcript-update task is then rescheduled.
311
     */
312 313 314 315
    private void forceQueueFlush() {
        transcriptUpdateTask.cancel();
        transcriptUpdateTask.run();
        taskEngine.schedule(transcriptUpdateTask, FLUSH_PERIOD, FLUSH_PERIOD);
316 317
    }
}