Commit cb594811 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Queue tasks for rooms that were not found while still joining the cluster. JM-1387

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/openfire_3_5_2@10502 b35dd754-fafc-0310-a699-88a17e54d16e
parent 51d3d455
......@@ -51,7 +51,12 @@ public class AddMember extends MUCRoomTask {
}
public void run() {
getRoom().memberAdded(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().memberAdded(AddMember.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
......@@ -55,7 +55,12 @@ public class BroascastMessageRequest extends MUCRoomTask {
}
public void run() {
getRoom().broadcast(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().broadcast(BroascastMessageRequest.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
......@@ -50,7 +50,12 @@ public class BroascastPresenceRequest extends MUCRoomTask {
}
public void run() {
getRoom().broadcast(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().broadcast(BroascastPresenceRequest.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
......@@ -62,7 +62,12 @@ public class ChangeNickname extends MUCRoomTask {
}
public void run() {
getRoom().nicknameChanged(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().nicknameChanged(ChangeNickname.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
......@@ -44,7 +44,12 @@ public class DestroyRoomRequest extends MUCRoomTask {
}
public void run() {
getRoom().destroyRoom(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().destroyRoom(DestroyRoomRequest.this);
}
});
}
public String getAlternateJID() {
......
......@@ -13,7 +13,9 @@
package org.jivesoftware.openfire.muc.cluster;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.muc.spi.LocalMUCRoom;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
......@@ -30,19 +32,52 @@ import java.io.ObjectOutput;
*/
public abstract class MUCRoomTask implements ClusterTask {
private boolean originator;
private LocalMUCRoom room;
private String roomName;
protected MUCRoomTask() {
}
protected MUCRoomTask(LocalMUCRoom room) {
this.room = room;
this.roomName = room.getName();
}
public LocalMUCRoom getRoom() {
LocalMUCRoom room = (LocalMUCRoom) XMPPServer.getInstance().getMultiUserChatServer().getChatRoom(roomName);
if (room == null) {
throw new IllegalArgumentException("Room not found: " + roomName);
}
return room;
}
/**
* Executes the requested task considering that this JVM may still be joining the cluster.
* This means that events regarding rooms that were not loaded yet will be stored for later
* processing. Once the JVM is done joining the cluster queued tasks will be processed.
*
* @param runnable the task to execute.
*/
protected void execute(Runnable runnable) {
// Check if we are joining a cluster
boolean clusterStarting = ClusterManager.isClusteringStarting();
try {
// Check that the room exists
getRoom();
// Room was found so now execute the task
runnable.run();
}
catch (IllegalArgumentException e) {
// Room not found so check if we are still joining the cluster
if (clusterStarting) {
// Queue task in case the cluster
QueuedTasksManager.getInstance().addTask(this);
}
else {
// Task failed since room was not found
Log.error(e);
}
}
}
public boolean isOriginator() {
return originator;
}
......@@ -53,15 +88,11 @@ public abstract class MUCRoomTask implements ClusterTask {
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeBoolean(out, originator);
ExternalizableUtil.getInstance().writeSafeUTF(out, room.getName());
ExternalizableUtil.getInstance().writeSafeUTF(out, roomName);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
originator = ExternalizableUtil.getInstance().readBoolean(in);
String roomName = ExternalizableUtil.getInstance().readSafeUTF(in);
room = (LocalMUCRoom) XMPPServer.getInstance().getMultiUserChatServer().getChatRoom(roomName);
if (room == null) {
throw new IllegalArgumentException("Room not found: " + roomName);
}
roomName = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}
......@@ -115,7 +115,12 @@ public class OccupantAddedEvent extends MUCRoomTask {
}
public void run() {
getRoom().occupantAdded(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().occupantAdded(OccupantAddedEvent.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
......@@ -56,7 +56,12 @@ public class OccupantLeftEvent extends MUCRoomTask {
}
public void run() {
getRoom().leaveRoom(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().leaveRoom(OccupantLeftEvent.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2008-2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution, or a commercial license
* agreement with Jive.
*/
package org.jivesoftware.openfire.muc.cluster;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.util.TaskEngine;
import java.util.Queue;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Queue tasks while this JVM is joining the cluster and the requested room was still not loaded.
*
* @author Gaston Dombiak
*/
public class QueuedTasksManager {
private static QueuedTasksManager instance = new QueuedTasksManager();
private Queue<MUCRoomTask> taskQueue = new ConcurrentLinkedQueue<MUCRoomTask>();
public static QueuedTasksManager getInstance() {
return instance;
}
/**
* Hide the constructor so no one can create other instances
*/
private QueuedTasksManager() {
// Register a periodic task that will execute queued tasks
TaskEngine.getInstance().scheduleAtFixedRate(new TimerTask() {
public void run() {
if (!ClusterManager.isClusteringStarting()) {
MUCRoomTask mucRoomTask;
while ((mucRoomTask = taskQueue.poll()) != null) {
mucRoomTask.run();
}
}
}
}, 1000, 30000);
}
/**
* Queues a task. The queued task will be executed once this JVM completed joining the cluster.
* Moreover, if joining the cluster failed then the queue will also be consumed.
*
* @param task the task to queue.
*/
public void addTask(MUCRoomTask task) {
taskQueue.add(task);
}
}
......@@ -41,7 +41,12 @@ public class RoomUpdatedEvent extends MUCRoomTask {
}
public void run() {
getRoom().updateConfiguration(room);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().updateConfiguration(room);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
......@@ -69,7 +69,12 @@ public class UpdateOccupant extends MUCRoomTask {
}
public void run() {
getRoom().occupantUpdated(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().occupantUpdated(UpdateOccupant.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
......@@ -55,7 +55,12 @@ public class UpdatePresence extends MUCRoomTask {
}
public void run() {
getRoom().presenceUpdated(this);
// Execute the operation considering that we may still be joining the cluster
execute(new Runnable() {
public void run() {
getRoom().presenceUpdated(UpdatePresence.this);
}
});
}
public void writeExternal(ObjectOutput out) throws IOException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment