ClearspaceMUCTranscriptManager.java 13.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/**
 * Copyright (C) 2004-2009 Jive Software. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 18
package org.jivesoftware.openfire.clearspace;

19 20 21 22 23 24 25 26
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimerTask;

import org.dom4j.Element;
27
import org.jivesoftware.openfire.XMPPServer;
28 29 30 31 32
import org.jivesoftware.openfire.muc.MUCEventDispatcher;
import org.jivesoftware.openfire.muc.MUCEventListener;
import org.jivesoftware.openfire.muc.MUCRoom;
import org.jivesoftware.openfire.muc.MultiUserChatManager;
import org.jivesoftware.openfire.muc.MultiUserChatService;
33
import org.jivesoftware.openfire.user.UserNotFoundException;
34
import org.jivesoftware.util.JiveConstants;
35
import org.jivesoftware.util.JiveGlobals;
36 37 38 39
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;

/**
 * Stores MUC events that are intended to be recorded as a transcript for a group chat room in Clearspace.
 * A task will periodically flush the queue of MUC events, sending them to the Clearspace component via XMPP
 * for parsing and storing.
 *
 * Clearspace is expected to handle the packets containing MUC events by parsing them as they come in, accumulating
 * them into a daily group chat transcript for the room it is associated with.
 *
 * The task will flush each queue of MUC events assoicated with a room based on either the size of the queue, or time.
 * If the size of the queue exceeds a limit we have set, or a certain period of time has elapsed,
 * the queue will be sent to Clearspace -- whichever happens first. (When we say size of the queue, we really mean
 * the effective size as it will appear in a transcript-update packet).
 *
 * Example of a transcript-update packet:
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
 *     <iq type='set' to='clearspace.example.org' from='clearspace-conference.example.org'>
 *         <transcript-update xmlns='http://jivesoftware.com/clearspace'>
 *             <presence from='user1@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933781000</timestamp>
 *             </presence>
 *             <message from='user1@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933783000</timestamp>
 *                 <body>user2, I won the lottery!</body>
 *             </message>
 *             <message from='user2@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933785000</timestamp>
 *                 <body>WHAT?!</body>
 *             </message>
 *             <message from='user1@example.org'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933787000</timestamp>
 *                 <body>April Fools!</body>
 *             </message>
 *             <presence from='user3@example.org' type='unavailable'>
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>1207933789000</timestamp>
 *             </presence>
 *             <message from="user2@example.org">
 *                 <roomjid>14-1234@clearspace-conference.example.org</roomjid>
 *                 <timestamp>120793379100</timestamp>
 *                 <body>Wow, that was lame.</body>
 *             </message>
 *
 *               ...
 *         </transcript-update>
 *     </iq>
91 92 93 94 95
 *
 * @author Armando Jagucki
 */
public class ClearspaceMUCTranscriptManager implements MUCEventListener {

96 97
	private static final Logger Log = LoggerFactory.getLogger(ClearspaceMUCTranscriptManager.class);

98 99 100 101 102 103 104 105
    /**
     * Group chat events that are pending to be sent to Clearspace.
     */
    private final List<ClearspaceMUCTranscriptEvent> roomEvents;

    private final TaskEngine taskEngine;
    private TimerTask  transcriptUpdateTask;

106
    private final int MAX_QUEUE_SIZE = 64;
107
    private final long  FLUSH_PERIOD =
108
            JiveGlobals.getLongProperty("clearspace.transcript.flush.period", JiveConstants.MINUTE * 1);
109 110 111

    private String csMucDomain;
    private String csComponentAddress;
112 113 114 115

    public ClearspaceMUCTranscriptManager(TaskEngine taskEngine) {
        this.taskEngine = taskEngine;
        roomEvents = new ArrayList<ClearspaceMUCTranscriptEvent>();
116 117 118 119

        String xmppDomain = XMPPServer.getInstance().getServerInfo().getXMPPDomain();
        csMucDomain = ClearspaceManager.MUC_SUBDOMAIN + "." + xmppDomain;
        csComponentAddress = ClearspaceManager.CLEARSPACE_COMPONENT + "." + xmppDomain;
120 121 122 123 124 125 126
    }

    public void start() {
        MUCEventDispatcher.addListener(this);

        // Schedule a task for this new transcript event queue.
        transcriptUpdateTask = new TimerTask() {
127 128
            @Override
			public void run() {
129 130 131 132
                if (roomEvents.isEmpty()) {
                    return;
                }

133 134 135 136
                // Store JIDs of rooms that had presence changes, to track occupant counts
                // TODO: Refactor out into a different class
                Set<String> presenceRoomJids = new HashSet<String>();

137 138 139 140 141 142 143
                // Create the transcript-update packet
                IQ packet = new IQ();
                packet.setTo(csComponentAddress);
                packet.setFrom(csMucDomain);
                packet.setType(IQ.Type.set);
                Element transcriptElement = packet.setChildElement("transcript-update", "http://jivesoftware.com/clearspace");

144 145
                for (ClearspaceMUCTranscriptEvent event : roomEvents) {
                    // Add event to the packet
146 147 148
                    Element mucEventElement = null;

                    switch (event.type) {
149
                        case messageReceived:
150
                            mucEventElement = transcriptElement.addElement("message");
151
                            mucEventElement.addElement("body").setText(event.content);
152 153 154
                            break;
                        case occupantJoined:
                            mucEventElement = transcriptElement.addElement("presence");
155
                            presenceRoomJids.add(event.roomJID.toBareJID());
156 157 158 159
                            break;
                        case occupantLeft:
                            mucEventElement = transcriptElement.addElement("presence");
                            mucEventElement.addAttribute("type", "unavailable");
160
                            presenceRoomJids.add(event.roomJID.toBareJID());
161
                            break;
162 163 164 165
                        case roomSubjectChanged:
                            mucEventElement = transcriptElement.addElement("subject-change");
                            mucEventElement.addElement("subject").setText(event.content);
                            break;
166 167 168 169
                    }

                    // Now add those event fields that are common to all elements in the transcript-update packet.
                    if (mucEventElement != null) {
170 171 172 173 174 175
                        if (event.user != null) {
                            mucEventElement.addAttribute("from", event.user.toBareJID());
                        }
                        if (event.roomJID != null) {
                            mucEventElement.addElement("roomjid").setText(event.roomJID.toBareJID());
                        }
176 177
                        mucEventElement.addElement("timestamp").setText(Long.toString(event.timestamp));
                    }
178 179
                }

180 181 182 183 184 185 186
                // Add occupant count updates to packet
                // TODO: Refactor out into a different class
                MultiUserChatManager mucManager = XMPPServer.getInstance().getMultiUserChatManager();
                for (String roomJid : presenceRoomJids) {
                    JID jid = new JID(roomJid);
                    MultiUserChatService mucService = mucManager.getMultiUserChatService(jid);
                    MUCRoom room = mucService.getChatRoom(jid.getNode());
187 188
                    // Not count room owners as occupants
                    int totalOccupants = room.getOccupantsCount();
189
                    for (JID owner : room.getOwners()) {
190 191 192 193 194 195 196 197
                        try {
                            if (!room.getOccupantsByBareJID(owner).isEmpty()) {
                                totalOccupants--;
                            }
                        } catch (UserNotFoundException e) {
                            // Ignore
                        }
                    }
198 199 200

                    Element occUpdateElement = transcriptElement.addElement("occupant-count-update");
                    occUpdateElement.addElement("roomjid").setText(roomJid);
201
                    occUpdateElement.addElement("count").setText(Integer.toString(totalOccupants));
202 203
                }

204 205 206 207 208
                // Send the transcript-update packet to Clearspace.
                IQ result = ClearspaceManager.getInstance().query(packet, 15000);
                if (result == null) {
                    // No answer was received from Clearspace.
                    Log.warn("Did not get a reply from sending a transcript-update packet to Clearspace.");
209

210 211 212
                    // Return early so that the room-events queue is not cleared.
                    return;
                }
213 214
                else if (result.getType() == IQ.Type.error) {
                    // Clearspace was not able to process the transcript-update
215
                    Log.warn("Clearspace received a transcript-update packet but was not able to process it." + result.toXML());
216

217 218 219 220 221
                    // Return early so that the room-events queue is not cleared.
                    return;
                }

                // We can clear the queue now, as Clearspace has processed the transcript-update packet.
222 223 224 225 226 227 228 229 230 231 232 233
                roomEvents.clear();
            }
        };

        taskEngine.schedule(transcriptUpdateTask, FLUSH_PERIOD, FLUSH_PERIOD);
    }

    public void stop() {
        MUCEventDispatcher.removeListener(this);
    }

    public void roomCreated(JID roomJID) {
234
        // Do nothing
235 236 237
    }

    public void roomDestroyed(JID roomJID) {
238
        // Do nothing
239 240 241
    }

    public void occupantJoined(JID roomJID, JID user, String nickname) {
242
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
243 244
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.occupantJoined(roomJID, user, new Date().getTime()));
        }
245 246 247
    }

    public void occupantLeft(JID roomJID, JID user) {
248
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
249 250
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.occupantLeft(roomJID, user, new Date().getTime()));
        }
251 252 253
    }

    public void nicknameChanged(JID roomJID, JID user, String oldNickname, String newNickname) {
254
        // Do nothing
255 256 257
    }

    public void messageReceived(JID roomJID, JID user, String nickname, Message message) {
258
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
259 260 261
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.messageReceived(roomJID, user, message.getBody(),
                    new Date().getTime()));
        }
262 263
    }

264 265 266
    public void privateMessageRecieved(JID fromJID, JID toJID, Message message) {
    }
    
267
    public void roomSubjectChanged(JID roomJID, JID user, String newSubject) {
268
        if (isClearspaceRoom(roomJID) && !isRoomOwner(roomJID, user)) {
269 270 271 272 273 274
            addGroupChatEvent(ClearspaceMUCTranscriptEvent.roomSubjectChanged(roomJID, user, newSubject,
                    new Date().getTime()));
        }
    }

    private boolean isRoomOwner(JID roomJID, JID user) {
275 276 277
        if (user == null || roomJID == null) {
            return false;
        }
278 279 280 281
        MultiUserChatService chatService =
                XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(roomJID);
        MUCRoom room = chatService.getChatRoom(roomJID.getNode());
        return room != null && room.getOwners().contains(user.toBareJID());
282 283
    }

284 285 286 287
    private boolean isClearspaceRoom(JID roomJID) {
        return roomJID.getDomain().equals(csMucDomain);
    }

288 289 290 291 292 293 294 295 296
    /**
     * Queues the group chat event to be later sent to Clearspace.
     *
     * @param event MUC transcript event.
     */
    private void addGroupChatEvent(ClearspaceMUCTranscriptEvent event) {
        roomEvents.add(event);

        // Check if we have exceeded the allowed size before a flush should occur.
297
        if (roomEvents.size() > MAX_QUEUE_SIZE) {
298
            // Flush the queue immediately and reschedule the task.
299
            forceQueueFlush();
300 301 302 303
        }
    }

    /**
304 305
     * Forces the transcript-event queue to be sent to Clearspace by running the transcript-update
     * task immediately.
306
     *
307
     * The transcript-update task is then rescheduled.
308
     */
309 310 311 312
    private void forceQueueFlush() {
        transcriptUpdateTask.cancel();
        transcriptUpdateTask.run();
        taskEngine.schedule(transcriptUpdateTask, FLUSH_PERIOD, FLUSH_PERIOD);
313 314
    }
}