PubSubPersistenceManager.java 78 KB
Newer Older
Matt Tucker's avatar
Matt Tucker committed
1 2 3 4 5
/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
6
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
Matt Tucker's avatar
Matt Tucker committed
7
 *
8 9 10 11 12 13 14 15 16 17 18
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
Matt Tucker's avatar
Matt Tucker committed
19 20
 */

21
package org.jivesoftware.openfire.pubsub;
Matt Tucker's avatar
Matt Tucker committed
22 23 24 25 26

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
27 28
import java.util.ArrayList;
import java.util.Collection;
29
import java.util.Collections;
30 31
import java.util.Date;
import java.util.HashMap;
32
import java.util.Iterator;
33
import java.util.List;
34
import java.util.Map;
35
import java.util.Random;
36
import java.util.StringTokenizer;
37
import java.util.TimerTask;
38
import java.util.concurrent.locks.Lock;
Matt Tucker's avatar
Matt Tucker committed
39

40
import org.jivesoftware.database.DbConnectionManager;
41
import org.jivesoftware.database.DbConnectionManager.DatabaseType;
42
import org.jivesoftware.openfire.cluster.ClusterManager;
43
import org.jivesoftware.openfire.pubsub.cluster.FlushTask;
44 45
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
46 47 48
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LinkedList;
import org.jivesoftware.util.LinkedListNode;
49
import org.jivesoftware.util.StringUtils;
50
import org.jivesoftware.util.TaskEngine;
51
import org.jivesoftware.util.cache.Cache;
52
import org.jivesoftware.util.cache.CacheFactory;
53 54 55 56
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;

Matt Tucker's avatar
Matt Tucker committed
57 58 59 60 61 62 63
/**
 * A manager responsible for ensuring node persistence.
 *
 * @author Matt Tucker
 */
public class PubSubPersistenceManager {

64
    private static final Logger log = LoggerFactory.getLogger(PubSubPersistenceManager.class);
65

66 67 68
    private static final String PERSISTENT_NODES = "SELECT serviceID, nodeID, maxItems " +
    		"FROM ofPubsubNode WHERE leaf=1 AND persistItems=1 AND maxItems > 0";
    
69
    private static final String PURGE_FOR_SIZE =
70 71 72 73 74
    		"DELETE ofPubsubItem FROM ofPubsubItem LEFT JOIN " +
			"(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? " +
			"ORDER BY creationDate DESC LIMIT ?) AS noDelete " +
			"ON ofPubsubItem.id = noDelete.id WHERE noDelete.id IS NULL AND " +
			"ofPubsubItem.serviceID = ? AND nodeID = ?";
75

76 77
	private static final String PURGE_FOR_SIZE_HSQLDB = "DELETE FROM ofPubsubItem WHERE serviceID=? AND nodeID=? AND id NOT IN "
			+ "(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC LIMIT ?)";
78

79
	private static final String LOAD_NODES =
Matt Tucker's avatar
Matt Tucker committed
80 81 82
            "SELECT nodeID, leaf, creationDate, modificationDate, parent, deliverPayloads, " +
            "maxPayloadSize, persistItems, maxItems, notifyConfigChanges, notifyDelete, " +
            "notifyRetract, presenceBased, sendItemSubscribe, publisherModel, " +
83 84
            "subscriptionEnabled, configSubscription, accessModel, payloadType, " +
            "bodyXSLT, dataformXSLT, creator, description, language, name, " +
85
            "replyPolicy, associationPolicy, maxLeafNodes FROM ofPubsubNode " +
86 87
 "WHERE serviceID=?";

88
	private static final String LOAD_NODE = LOAD_NODES + " AND nodeID=?";
89

Matt Tucker's avatar
Matt Tucker committed
90
    private static final String UPDATE_NODE =
91
            "UPDATE ofPubsubNode SET modificationDate=?, parent=?, deliverPayloads=?, " +
Matt Tucker's avatar
Matt Tucker committed
92 93 94
            "maxPayloadSize=?, persistItems=?, maxItems=?, " +
            "notifyConfigChanges=?, notifyDelete=?, notifyRetract=?, presenceBased=?, " +
            "sendItemSubscribe=?, publisherModel=?, subscriptionEnabled=?, configSubscription=?, " +
95 96
            "accessModel=?, payloadType=?, bodyXSLT=?, dataformXSLT=?, description=?, " +
            "language=?, name=?, replyPolicy=?, associationPolicy=?, maxLeafNodes=? " +
Matt Tucker's avatar
Matt Tucker committed
97 98
            "WHERE serviceID=? AND nodeID=?";
    private static final String ADD_NODE =
99
            "INSERT INTO ofPubsubNode (serviceID, nodeID, leaf, creationDate, modificationDate, " +
Matt Tucker's avatar
Matt Tucker committed
100 101 102
            "parent, deliverPayloads, maxPayloadSize, persistItems, maxItems, " +
            "notifyConfigChanges, notifyDelete, notifyRetract, presenceBased, " +
            "sendItemSubscribe, publisherModel, subscriptionEnabled, configSubscription, " +
103 104 105
            "accessModel, payloadType, bodyXSLT, dataformXSLT, creator, description, " +
            "language, name, replyPolicy, associationPolicy, maxLeafNodes) " +
            "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
Matt Tucker's avatar
Matt Tucker committed
106
    private static final String DELETE_NODE =
107
            "DELETE FROM ofPubsubNode WHERE serviceID=? AND nodeID=?";
Matt Tucker's avatar
Matt Tucker committed
108

109
    private static final String LOAD_NODES_JIDS =
110
            "SELECT nodeID, jid, associationType FROM ofPubsubNodeJIDs WHERE serviceID=?";
111 112
	private static final String LOAD_NODE_JIDS = "SELECT nodeID, jid, associationType FROM ofPubsubNodeJIDs WHERE serviceID=? AND nodeID=?";
	private static final String ADD_NODE_JIDS =
113
            "INSERT INTO ofPubsubNodeJIDs (serviceID, nodeID, jid, associationType) " +
114 115
            "VALUES (?,?,?,?)";
    private static final String DELETE_NODE_JIDS =
116
            "DELETE FROM ofPubsubNodeJIDs WHERE serviceID=? AND nodeID=?";
117 118

    private static final String LOAD_NODES_GROUPS =
119
            "SELECT nodeID, rosterGroup FROM ofPubsubNodeGroups WHERE serviceID=?";
120
	private static final String LOAD_NODE_GROUPS = "SELECT nodeID, rosterGroup FROM ofPubsubNodeGroups WHERE serviceID=? AND nodeID=?";
121
    private static final String ADD_NODE_GROUPS =
122
            "INSERT INTO ofPubsubNodeGroups (serviceID, nodeID, rosterGroup) " +
123 124
            "VALUES (?,?,?)";
    private static final String DELETE_NODE_GROUPS =
125
            "DELETE FROM ofPubsubNodeGroups WHERE serviceID=? AND nodeID=?";
126

Matt Tucker's avatar
Matt Tucker committed
127
    private static final String LOAD_AFFILIATIONS =
128
            "SELECT nodeID,jid,affiliation FROM ofPubsubAffiliation WHERE serviceID=? " +
Matt Tucker's avatar
Matt Tucker committed
129
            "ORDER BY nodeID";
130
	private static final String LOAD_NODE_AFFILIATIONS = "SELECT nodeID,jid,affiliation FROM ofPubsubAffiliation WHERE serviceID=? AND nodeID=?";
Matt Tucker's avatar
Matt Tucker committed
131
    private static final String ADD_AFFILIATION =
132
            "INSERT INTO ofPubsubAffiliation (serviceID,nodeID,jid,affiliation) VALUES (?,?,?,?)";
Matt Tucker's avatar
Matt Tucker committed
133
    private static final String UPDATE_AFFILIATION =
134
            "UPDATE ofPubsubAffiliation SET affiliation=? WHERE serviceID=? AND nodeID=? AND jid=?";
Matt Tucker's avatar
Matt Tucker committed
135
    private static final String DELETE_AFFILIATION =
136
            "DELETE FROM ofPubsubAffiliation WHERE serviceID=? AND nodeID=? AND jid=?";
Matt Tucker's avatar
Matt Tucker committed
137
    private static final String DELETE_AFFILIATIONS =
138
            "DELETE FROM ofPubsubAffiliation WHERE serviceID=? AND nodeID=?";
Matt Tucker's avatar
Matt Tucker committed
139

140 141 142 143 144 145 146
	private static final String LOAD_SUBSCRIPTIONS_BASE = "SELECT nodeID, id, jid, owner, state, deliver, digest, digest_frequency, "
			+ "expire, includeBody, showValues, subscriptionType, subscriptionDepth, "
			+ "keyword FROM ofPubsubSubscription WHERE serviceID=? ";
	private static final String LOAD_NODE_SUBSCRIPTION = LOAD_SUBSCRIPTIONS_BASE + "AND nodeID=? AND id=?";
	private static final String LOAD_NODE_SUBSCRIPTIONS = LOAD_SUBSCRIPTIONS_BASE + "AND nodeID=?";
	private static final String LOAD_SUBSCRIPTIONS = LOAD_SUBSCRIPTIONS_BASE + "ORDER BY nodeID";

Matt Tucker's avatar
Matt Tucker committed
147
    private static final String ADD_SUBSCRIPTION =
148
            "INSERT INTO ofPubsubSubscription (serviceID, nodeID, id, jid, owner, state, " +
Matt Tucker's avatar
Matt Tucker committed
149
            "deliver, digest, digest_frequency, expire, includeBody, showValues, " +
Matt Tucker's avatar
Matt Tucker committed
150
            "subscriptionType, subscriptionDepth, keyword) " +
Matt Tucker's avatar
Matt Tucker committed
151
            "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
Matt Tucker's avatar
Matt Tucker committed
152
    private static final String UPDATE_SUBSCRIPTION =
153
            "UPDATE ofPubsubSubscription SET owner=?, state=?, deliver=?, digest=?, " +
Matt Tucker's avatar
Matt Tucker committed
154 155 156
            "digest_frequency=?, expire=?, includeBody=?, showValues=?, subscriptionType=?, " +
            "subscriptionDepth=?, keyword=? WHERE serviceID=? AND nodeID=? AND id=?";
    private static final String DELETE_SUBSCRIPTION =
157
            "DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=? AND id=?";
Matt Tucker's avatar
Matt Tucker committed
158
    private static final String DELETE_SUBSCRIPTIONS =
159
            "DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=?";
Matt Tucker's avatar
Matt Tucker committed
160
    private static final String LOAD_ITEMS =
161
            "SELECT id,jid,creationDate,payload FROM ofPubsubItem " +
162
            "WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC";
163 164 165 166
    private static final String LOAD_ITEM =
            "SELECT jid,creationDate,payload FROM ofPubsubItem " +
            "WHERE serviceID=? AND nodeID=? AND id=?";
    private static final String LOAD_LAST_ITEM =
167 168
            "SELECT id,jid,creationDate,payload FROM ofPubsubItem " +
            "WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC";
Matt Tucker's avatar
Matt Tucker committed
169
    private static final String ADD_ITEM =
170
            "INSERT INTO ofPubsubItem (serviceID,nodeID,id,jid,creationDate,payload) " +
Matt Tucker's avatar
Matt Tucker committed
171 172
            "VALUES (?,?,?,?,?,?)";
    private static final String DELETE_ITEM =
173
            "DELETE FROM ofPubsubItem WHERE serviceID=? AND nodeID=? AND id=?";
Matt Tucker's avatar
Matt Tucker committed
174
    private static final String DELETE_ITEMS =
175
            "DELETE FROM ofPubsubItem WHERE serviceID=? AND nodeID=?";
Matt Tucker's avatar
Matt Tucker committed
176 177 178 179 180 181

    private static final String LOAD_DEFAULT_CONF =
            "SELECT deliverPayloads, maxPayloadSize, persistItems, maxItems, " +
            "notifyConfigChanges, notifyDelete, notifyRetract, presenceBased, " +
            "sendItemSubscribe, publisherModel, subscriptionEnabled, accessModel, language, " +
            "replyPolicy, associationPolicy, maxLeafNodes " +
182
            "FROM ofPubsubDefaultConf WHERE serviceID=? AND leaf=?";
Matt Tucker's avatar
Matt Tucker committed
183
    private static final String UPDATE_DEFAULT_CONF =
184
            "UPDATE ofPubsubDefaultConf SET deliverPayloads=?, maxPayloadSize=?, persistItems=?, " +
Matt Tucker's avatar
Matt Tucker committed
185 186 187 188 189
            "maxItems=?, notifyConfigChanges=?, notifyDelete=?, notifyRetract=?, " +
            "presenceBased=?, sendItemSubscribe=?, publisherModel=?, subscriptionEnabled=?, " +
            "accessModel=?, language=? replyPolicy=?, associationPolicy=?, maxLeafNodes=? " +
            "WHERE serviceID=? AND leaf=?";
    private static final String ADD_DEFAULT_CONF =
190
            "INSERT INTO ofPubsubDefaultConf (serviceID, leaf, deliverPayloads, maxPayloadSize, " +
Matt Tucker's avatar
Matt Tucker committed
191 192 193 194 195
            "persistItems, maxItems, notifyConfigChanges, notifyDelete, notifyRetract, " +
            "presenceBased, sendItemSubscribe, publisherModel, subscriptionEnabled, " +
            "accessModel, language, replyPolicy, associationPolicy, maxLeafNodes) " +
            "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

196 197 198 199 200 201 202 203 204 205 206
    /**
     * Pseudo-random number generator is used to offset timing for scheduled tasks
     * within a cluster (so they don't run at the same time on all members).
     */
    private static Random prng = new Random();
    
    /**
     * Flush timer delay is configurable, but not less than 20 seconds (default: 2 mins)
     */
    private static long flushTimerDelay = Math.max(20000, 
    		JiveGlobals.getIntProperty("xmpp.pubsub.flush.timer", 120)*1000);
207

208 209 210 211 212
    /**
     * Purge timer delay is configurable, but not less than 60 seconds (default: 5 mins)
     */
    private static long purgeTimerDelay = Math.max(60000, 
    		JiveGlobals.getIntProperty("xmpp.pubsub.purge.timer", 300)*1000);
213

214 215 216 217
    /**
     * Maximum number of published items allowed in the write cache
     * before being flushed to the database.
     */
218
	private static final int MAX_ITEMS_FLUSH = JiveGlobals.getIntProperty("xmpp.pubsub.flush.max", 1000);
219

220 221 222
    /**
     * Maximum number of rows that will be fetched from the published items table.
     */
223
    private static final int MAX_ROWS_FETCH = JiveGlobals.getIntProperty("xmpp.pubsub.fetch.max", 2000);
224

225 226 227 228 229
    /**
     * Number of retry attempts we will make trying to write an item to the DB
     */
	private static final int MAX_ITEM_RETRY = JiveGlobals.getIntProperty("xmpp.pubsub.item.retry", 1);
    
230
    /**
231
     * Queue that holds the (wrapped) items that need to be added to the database.
232
     */
233
    private static LinkedList<RetryWrapper> itemsToAdd = new LinkedList<RetryWrapper>();
234 235 236 237

    /**
     * Queue that holds the items that need to be deleted from the database.
     */
238
    private static LinkedList<PublishedItem> itemsToDelete = new LinkedList<PublishedItem>();
239 240

    /**
241 242 243
     * Keeps reference to published items that haven't been persisted yet so they 
     * can be removed before being deleted. Note these items are wrapped via the 
     * RetryWrapper to allow multiple persistence attempts when needed.
244
     */
245
    private static final HashMap<String, LinkedListNode<RetryWrapper>> itemsPending = new HashMap<String, LinkedListNode<RetryWrapper>>();
246
    
Matt Tucker's avatar
Matt Tucker committed
247
    /**
248
     * Cache name for recently accessed published items.
Matt Tucker's avatar
Matt Tucker committed
249
     */
250
    private static final String ITEM_CACHE = "Published Items";
Matt Tucker's avatar
Matt Tucker committed
251

252 253 254 255 256
    /**
     * Cache for recently accessed published items.
     */
    private static final Cache<String, PublishedItem> itemCache = CacheFactory.createCache(ITEM_CACHE);
    
Matt Tucker's avatar
Matt Tucker committed
257
    // Schedules the periodic maintenance tasks (flushing pending items and purging
    // old items). Each task starts after a random offset within its own period so
    // that cluster members do not all run it at the same moment.
    static {
        try {
            // The flush task is only scheduled when a positive flush limit is configured.
            if (MAX_ITEMS_FLUSH > 0) {
                TaskEngine.getInstance().schedule(new TimerTask() {
                    @Override
                    public void run() { flushPendingItems(false); } // this member only
                    // Take the remainder first: Math.abs(Long.MIN_VALUE) is still negative,
                    // whereas |x % period| always lies in [0, period).
                }, Math.abs(prng.nextLong() % flushTimerDelay), flushTimerDelay);
            }

            // increase the timer delay when running in cluster mode
            // because other members are also running the purge task
            if (ClusterManager.isClusteringEnabled()) {
                purgeTimerDelay = purgeTimerDelay * 2;
            }
            TaskEngine.getInstance().schedule(new TimerTask() {
                @Override
                public void run() { purgeItems(); }
            }, Math.abs(prng.nextLong() % purgeTimerDelay), purgeTimerDelay);

        } catch (Exception ex) {
            // A failure here must not prevent the class from loading; it is only logged.
            log.error("Failed to initialize pubsub maintentence tasks", ex);
        }
    }

    /**
     * Creates and stores the node configuration in the database.
     *
     * @param node The newly created node.
     */
285
    public static void createNode(Node node) {
Matt Tucker's avatar
Matt Tucker committed
286 287
        Connection con = null;
        PreparedStatement pstmt = null;
288
        boolean abortTransaction = false;
Matt Tucker's avatar
Matt Tucker committed
289
        try {
290
            con = DbConnectionManager.getTransactionConnection();
Matt Tucker's avatar
Matt Tucker committed
291
            pstmt = con.prepareStatement(ADD_NODE);
292
            pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
293
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
294 295 296
            pstmt.setInt(3, (node.isCollectionNode() ? 0 : 1));
            pstmt.setString(4, StringUtils.dateToMillis(node.getCreationDate()));
            pstmt.setString(5, StringUtils.dateToMillis(node.getModificationDate()));
Matt Tucker's avatar
Matt Tucker committed
297
            pstmt.setString(6, node.getParent() != null ? encodeNodeID(node.getParent().getNodeID()) : null);
Matt Tucker's avatar
Matt Tucker committed
298
            pstmt.setInt(7, (node.isPayloadDelivered() ? 1 : 0));
Matt Tucker's avatar
Matt Tucker committed
299 300 301 302 303 304 305 306 307 308
            if (!node.isCollectionNode()) {
                pstmt.setInt(8, ((LeafNode) node).getMaxPayloadSize());
                pstmt.setInt(9, (((LeafNode) node).isPersistPublishedItems() ? 1 : 0));
                pstmt.setInt(10, ((LeafNode) node).getMaxPublishedItems());
            }
            else {
                pstmt.setInt(8, 0);
                pstmt.setInt(9, 0);
                pstmt.setInt(10, 0);
            }
Matt Tucker's avatar
Matt Tucker committed
309 310 311
            pstmt.setInt(11, (node.isNotifiedOfConfigChanges() ? 1 : 0));
            pstmt.setInt(12, (node.isNotifiedOfDelete() ? 1 : 0));
            pstmt.setInt(13, (node.isNotifiedOfRetract() ? 1 : 0));
Matt Tucker's avatar
Matt Tucker committed
312 313 314 315 316 317
            pstmt.setInt(14, (node.isPresenceBasedDelivery() ? 1 : 0));
            pstmt.setInt(15, (node.isSendItemSubscribe() ? 1 : 0));
            pstmt.setString(16, node.getPublisherModel().getName());
            pstmt.setInt(17, (node.isSubscriptionEnabled() ? 1 : 0));
            pstmt.setInt(18, (node.isSubscriptionConfigurationRequired() ? 1 : 0));
            pstmt.setString(19, node.getAccessModel().getName());
318 319 320 321 322 323 324
            pstmt.setString(20, node.getPayloadType());
            pstmt.setString(21, node.getBodyXSLT());
            pstmt.setString(22, node.getDataformXSLT());
            pstmt.setString(23, node.getCreator().toString());
            pstmt.setString(24, node.getDescription());
            pstmt.setString(25, node.getLanguage());
            pstmt.setString(26, node.getName());
Matt Tucker's avatar
Matt Tucker committed
325
            if (node.getReplyPolicy() != null) {
326
                pstmt.setString(27, node.getReplyPolicy().name());
Matt Tucker's avatar
Matt Tucker committed
327 328
            }
            else {
329
                pstmt.setString(27, null);
Matt Tucker's avatar
Matt Tucker committed
330 331
            }
            if (node.isCollectionNode()) {
332 333
                pstmt.setString(28, ((CollectionNode)node).getAssociationPolicy().name());
                pstmt.setInt(29, ((CollectionNode)node).getMaxLeafNodes());
Matt Tucker's avatar
Matt Tucker committed
334 335
            }
            else {
336 337
                pstmt.setString(28, null);
                pstmt.setInt(29, 0);
Matt Tucker's avatar
Matt Tucker committed
338 339
            }
            pstmt.executeUpdate();
340 341

            // Save associated JIDs and roster groups
342
            saveAssociatedElements(con, node);
Matt Tucker's avatar
Matt Tucker committed
343 344
        }
        catch (SQLException sqle) {
345
            log.error(sqle.getMessage(), sqle);
346
            abortTransaction = true;
Matt Tucker's avatar
Matt Tucker committed
347 348
        }
        finally {
349
            DbConnectionManager.closeStatement(pstmt);
350
            DbConnectionManager.closeTransactionConnection(con, abortTransaction);
Matt Tucker's avatar
Matt Tucker committed
351 352 353 354 355 356 357 358
        }
    }

    /**
     * Updates the node configuration in the database.
     *
     * @param node The updated node.
     */
359
    public static void updateNode(Node node) {
Matt Tucker's avatar
Matt Tucker committed
360 361
        Connection con = null;
        PreparedStatement pstmt = null;
362
        boolean abortTransaction = false;
Matt Tucker's avatar
Matt Tucker committed
363
        try {
364
            con = DbConnectionManager.getTransactionConnection();
Matt Tucker's avatar
Matt Tucker committed
365 366
            pstmt = con.prepareStatement(UPDATE_NODE);
            pstmt.setString(1, StringUtils.dateToMillis(node.getModificationDate()));
Matt Tucker's avatar
Matt Tucker committed
367
            pstmt.setString(2, node.getParent() != null ? encodeNodeID(node.getParent().getNodeID()) : null);
Matt Tucker's avatar
Matt Tucker committed
368
            pstmt.setInt(3, (node.isPayloadDelivered() ? 1 : 0));
Matt Tucker's avatar
Matt Tucker committed
369 370 371 372 373 374 375 376 377 378
            if (!node.isCollectionNode()) {
                pstmt.setInt(4, ((LeafNode) node).getMaxPayloadSize());
                pstmt.setInt(5, (((LeafNode) node).isPersistPublishedItems() ? 1 : 0));
                pstmt.setInt(6, ((LeafNode) node).getMaxPublishedItems());
            }
            else {
                pstmt.setInt(4, 0);
                pstmt.setInt(5, 0);
                pstmt.setInt(6, 0);
            }
Matt Tucker's avatar
Matt Tucker committed
379 380 381
            pstmt.setInt(7, (node.isNotifiedOfConfigChanges() ? 1 : 0));
            pstmt.setInt(8, (node.isNotifiedOfDelete() ? 1 : 0));
            pstmt.setInt(9, (node.isNotifiedOfRetract() ? 1 : 0));
Matt Tucker's avatar
Matt Tucker committed
382 383 384 385 386
            pstmt.setInt(10, (node.isPresenceBasedDelivery() ? 1 : 0));
            pstmt.setInt(11, (node.isSendItemSubscribe() ? 1 : 0));
            pstmt.setString(12, node.getPublisherModel().getName());
            pstmt.setInt(13, (node.isSubscriptionEnabled() ? 1 : 0));
            pstmt.setInt(14, (node.isSubscriptionConfigurationRequired() ? 1 : 0));
387 388 389 390 391 392 393
            pstmt.setString(15, node.getAccessModel().getName());
            pstmt.setString(16, node.getPayloadType());
            pstmt.setString(17, node.getBodyXSLT());
            pstmt.setString(18, node.getDataformXSLT());
            pstmt.setString(19, node.getDescription());
            pstmt.setString(20, node.getLanguage());
            pstmt.setString(21, node.getName());
Matt Tucker's avatar
Matt Tucker committed
394
            if (node.getReplyPolicy() != null) {
395
                pstmt.setString(22, node.getReplyPolicy().name());
Matt Tucker's avatar
Matt Tucker committed
396 397
            }
            else {
398
                pstmt.setString(22, null);
Matt Tucker's avatar
Matt Tucker committed
399 400
            }
            if (node.isCollectionNode()) {
401 402
                pstmt.setString(23, ((CollectionNode) node).getAssociationPolicy().name());
                pstmt.setInt(24, ((CollectionNode) node).getMaxLeafNodes());
Matt Tucker's avatar
Matt Tucker committed
403 404
            }
            else {
405 406
                pstmt.setString(23, null);
                pstmt.setInt(24, 0);
Matt Tucker's avatar
Matt Tucker committed
407
            }
408
            pstmt.setString(25, node.getService().getServiceID());
409
            pstmt.setString(26, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
410
            pstmt.executeUpdate();
411
            DbConnectionManager.fastcloseStmt(pstmt);
412 413 414

            // Remove existing JIDs associated with the the node
            pstmt = con.prepareStatement(DELETE_NODE_JIDS);
415
            pstmt.setString(1, node.getService().getServiceID());
416 417
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
            pstmt.executeUpdate();
418
            DbConnectionManager.fastcloseStmt(pstmt);
419 420 421

            // Remove roster groups associated with the the node being deleted
            pstmt = con.prepareStatement(DELETE_NODE_GROUPS);
422
            pstmt.setString(1, node.getService().getServiceID());
423 424 425 426
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
            pstmt.executeUpdate();

            // Save associated JIDs and roster groups
427
            saveAssociatedElements(con, node);
Matt Tucker's avatar
Matt Tucker committed
428 429
        }
        catch (SQLException sqle) {
430
            log.error(sqle.getMessage(), sqle);
431
            abortTransaction = true;
Matt Tucker's avatar
Matt Tucker committed
432 433
        }
        finally {
434
            DbConnectionManager.closeStatement(pstmt);
435 436 437 438
            DbConnectionManager.closeTransactionConnection(con, abortTransaction);
        }
    }

439
    private static void saveAssociatedElements(Connection con, Node node) throws SQLException {
440 441 442 443
        // Add new JIDs associated with the the node
        PreparedStatement pstmt = con.prepareStatement(ADD_NODE_JIDS);
        try {
            for (JID jid : node.getContacts()) {
444
                pstmt.setString(1, node.getService().getServiceID());
445 446 447 448 449 450
                pstmt.setString(2, encodeNodeID(node.getNodeID()));
                pstmt.setString(3, jid.toString());
                pstmt.setString(4, "contacts");
                pstmt.executeUpdate();
            }
            for (JID jid : node.getReplyRooms()) {
451
                pstmt.setString(1, node.getService().getServiceID());
452 453 454 455 456 457
                pstmt.setString(2, encodeNodeID(node.getNodeID()));
                pstmt.setString(3, jid.toString());
                pstmt.setString(4, "replyRooms");
                pstmt.executeUpdate();
            }
            for (JID jid : node.getReplyTo()) {
458
                pstmt.setString(1, node.getService().getServiceID());
459 460 461 462 463 464 465
                pstmt.setString(2, encodeNodeID(node.getNodeID()));
                pstmt.setString(3, jid.toString());
                pstmt.setString(4, "replyTo");
                pstmt.executeUpdate();
            }
            if (node.isCollectionNode()) {
                for (JID jid : ((CollectionNode) node).getAssociationTrusted()) {
466
                    pstmt.setString(1, node.getService().getServiceID());
467 468 469 470 471 472
                    pstmt.setString(2, encodeNodeID(node.getNodeID()));
                    pstmt.setString(3, jid.toString());
                    pstmt.setString(4, "associationTrusted");
                    pstmt.executeUpdate();
                }
            }
473
            DbConnectionManager.fastcloseStmt(pstmt);
474 475 476
            // Add new roster groups associated with the the node
            pstmt = con.prepareStatement(ADD_NODE_GROUPS);
            for (String groupName : node.getRosterGroupsAllowed()) {
477
                pstmt.setString(1, node.getService().getServiceID());
478 479 480 481 482 483
                pstmt.setString(2, encodeNodeID(node.getNodeID()));
                pstmt.setString(3, groupName);
                pstmt.executeUpdate();
            }
        }
        finally {
484
            DbConnectionManager.closeStatement(pstmt);
Matt Tucker's avatar
Matt Tucker committed
485 486 487 488 489 490 491 492 493
        }
    }

    /**
     * Removes the specified node from the DB.
     *
     * @param node The node that is being deleted.
     * @return true If the operation was successful.
     */
494
    public static boolean removeNode(Node node) {
Matt Tucker's avatar
Matt Tucker committed
495 496 497 498 499 500 501
        Connection con = null;
        PreparedStatement pstmt = null;
        boolean abortTransaction = false;
        try {
            con = DbConnectionManager.getTransactionConnection();
            // Remove the affiliate from the table of node affiliates
            pstmt = con.prepareStatement(DELETE_NODE);
502
            pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
503
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
504
            pstmt.executeUpdate();
505
            DbConnectionManager.fastcloseStmt(pstmt);
Matt Tucker's avatar
Matt Tucker committed
506

507 508
            // Remove JIDs associated with the the node being deleted
            pstmt = con.prepareStatement(DELETE_NODE_JIDS);
509
            pstmt.setString(1, node.getService().getServiceID());
510 511
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
            pstmt.executeUpdate();
512
            DbConnectionManager.fastcloseStmt(pstmt);
513 514 515

            // Remove roster groups associated with the the node being deleted
            pstmt = con.prepareStatement(DELETE_NODE_GROUPS);
516
            pstmt.setString(1, node.getService().getServiceID());
517 518
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
            pstmt.executeUpdate();
519
            DbConnectionManager.fastcloseStmt(pstmt);
520

Matt Tucker's avatar
Matt Tucker committed
521
            // Remove published items of the node being deleted
522 523 524 525
			if (node instanceof LeafNode)
			{
				purgeNode((LeafNode) node, con);
			}
Matt Tucker's avatar
Matt Tucker committed
526 527 528

            // Remove all affiliates from the table of node affiliates
            pstmt = con.prepareStatement(DELETE_AFFILIATIONS);
529
            pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
530
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
531
            pstmt.executeUpdate();
532
            DbConnectionManager.fastcloseStmt(pstmt);
Matt Tucker's avatar
Matt Tucker committed
533 534 535

            // Remove users that were subscribed to the node
            pstmt = con.prepareStatement(DELETE_SUBSCRIPTIONS);
536
            pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
537
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
538 539 540
            pstmt.executeUpdate();
        }
        catch (SQLException sqle) {
541
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
542 543 544
            abortTransaction = true;
        }
        finally {
545
            DbConnectionManager.closeStatement(pstmt);
Matt Tucker's avatar
Matt Tucker committed
546 547 548 549 550 551 552 553 554 555 556 557 558
            DbConnectionManager.closeTransactionConnection(con, abortTransaction);
        }
        return !abortTransaction;
    }

    /**
     * Loads all nodes from the database and adds them to the PubSub service.
     *
     * @param service the pubsub service that is hosting the nodes.
     */
    public static void loadNodes(PubSubService service) {
        Connection con = null;
        PreparedStatement pstmt = null;
559
        ResultSet rs = null;
Matt Tucker's avatar
Matt Tucker committed
560 561 562
        Map<String, Node> nodes = new HashMap<String, Node>();
        try {
            con = DbConnectionManager.getConnection();
563
            // Get all non-leaf nodes (to ensure parent nodes are loaded before their children)
564
			pstmt = con.prepareStatement(LOAD_NODES);
Matt Tucker's avatar
Matt Tucker committed
565
            pstmt.setString(1, service.getServiceID());
566
            rs = pstmt.executeQuery();
567 568 569
            
            Map<String, String> parentMappings = new HashMap<String, String>();
            
570 571
            // Rebuild loaded non-leaf nodes
            while(rs.next()) {
572
                loadNode(service, nodes, parentMappings, rs);
573
            }
574
            DbConnectionManager.fastcloseStmt(rs, pstmt);
575

576 577 578 579 580 581 582 583 584 585 586 587 588 589 590
            if (nodes.size() == 0) {
            	log.info("No nodes found in pubsub");
            	return;
            }
            
            for (Map.Entry<String, String> entry : parentMappings.entrySet()) {
            	Node child = nodes.get(entry.getKey());
            	CollectionNode parent = (CollectionNode) nodes.get(entry.getValue());
            	
            	if (parent == null) {
            		log.error("Could not find parent node " + entry.getValue() + " for node " + entry.getKey());
            	}
            	else {
            		child.changeParent(parent);
            	}
Matt Tucker's avatar
Matt Tucker committed
591
            }
592 593 594 595 596 597 598 599
            // Get JIDs associated with all nodes
            pstmt = con.prepareStatement(LOAD_NODES_JIDS);
            pstmt.setString(1, service.getServiceID());
            rs = pstmt.executeQuery();
            // Add to each node the associated JIDs
            while(rs.next()) {
                loadAssociatedJIDs(nodes, rs);
            }
600
            DbConnectionManager.fastcloseStmt(rs, pstmt);
601 602 603 604 605 606 607 608 609

            // Get roster groups associateds with all nodes
            pstmt = con.prepareStatement(LOAD_NODES_GROUPS);
            pstmt.setString(1, service.getServiceID());
            rs = pstmt.executeQuery();
            // Add to each node the associated Groups
            while(rs.next()) {
                loadAssociatedGroups(nodes, rs);
            }
610
            DbConnectionManager.fastcloseStmt(rs, pstmt);
611

Matt Tucker's avatar
Matt Tucker committed
612 613 614 615 616 617 618 619
            // Get affiliations of all nodes
            pstmt = con.prepareStatement(LOAD_AFFILIATIONS);
            pstmt.setString(1, service.getServiceID());
            rs = pstmt.executeQuery();
            // Add to each node the correspondiding affiliates
            while(rs.next()) {
                loadAffiliations(nodes, rs);
            }
620
            DbConnectionManager.fastcloseStmt(rs, pstmt);
Matt Tucker's avatar
Matt Tucker committed
621 622 623 624 625 626 627 628 629

            // Get subscriptions to all nodes
            pstmt = con.prepareStatement(LOAD_SUBSCRIPTIONS);
            pstmt.setString(1, service.getServiceID());
            rs = pstmt.executeQuery();
            // Add to each node the correspondiding subscriptions
            while(rs.next()) {
                loadSubscriptions(service, nodes, rs);
            }
630
            DbConnectionManager.fastcloseStmt(rs, pstmt);
Matt Tucker's avatar
Matt Tucker committed
631 632
        }
        catch (SQLException sqle) {
633
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
634 635
        }
        finally {
636
            DbConnectionManager.closeConnection(rs, pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
637 638 639 640 641 642 643 644 645 646 647 648
        }

        for (Node node : nodes.values()) {
            // Set now that the node is persistent in the database. Note: We need to
            // set this now since otherwise the node's affiliations will be saved to the database
            // "again" while adding them to the node!
            node.setSavedToDB(true);
            // Add the node to the service
            service.addNode(node);
        }
    }

649 650
	/**
	 * Loads all nodes from the database and adds them to the PubSub service.
651
     *
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669
	 * @param service
	 *            the pubsub service that is hosting the nodes.
	 */
	public static void loadNode(PubSubService service, String nodeId)
	{
		Connection con = null;
		PreparedStatement pstmt = null;
		ResultSet rs = null;
		Map<String, Node> nodes = new HashMap<String, Node>();
		try
		{
			con = DbConnectionManager.getConnection();
			// Get all non-leaf nodes (to ensure parent nodes are loaded before
			// their children)
			pstmt = con.prepareStatement(LOAD_NODE);
			pstmt.setString(1, service.getServiceID());
			pstmt.setString(2, nodeId);
			rs = pstmt.executeQuery();
670 671
			Map<String, String> parentMapping = new HashMap<String, String>();
			
672 673 674
			// Rebuild loaded non-leaf nodes
			if (rs.next())
			{
675
				loadNode(service, nodes, parentMapping, rs);
676 677
			}
			DbConnectionManager.fastcloseStmt(rs, pstmt);
678 679 680 681 682 683 684 685 686 687 688 689 690
			String parentId = parentMapping.get(nodeId);
			
			if (parentId != null) {
				CollectionNode parent = (CollectionNode) service.getNode(parentId);
				
				if (parent == null) {
            		log.error("Could not find parent node " + parentId + " for node " + nodeId);
				}
				else {
					nodes.get(nodeId).changeParent(parent);
				}
			}
				
691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760
			// Get JIDs associated with all nodes
			pstmt = con.prepareStatement(LOAD_NODE_JIDS);
			pstmt.setString(1, service.getServiceID());
			pstmt.setString(2, nodeId);
			rs = pstmt.executeQuery();
			// Add to each node the associated JIDs
			while (rs.next())
			{
				loadAssociatedJIDs(nodes, rs);
			}
			DbConnectionManager.fastcloseStmt(rs, pstmt);

			// Get roster groups associated with all nodes
			pstmt = con.prepareStatement(LOAD_NODE_GROUPS);
			pstmt.setString(1, service.getServiceID());
			pstmt.setString(2, nodeId);
			rs = pstmt.executeQuery();
			// Add to each node the associated Groups
			while (rs.next())
			{
				loadAssociatedGroups(nodes, rs);
			}
			DbConnectionManager.fastcloseStmt(rs, pstmt);

			// Get affiliations of all nodes
			pstmt = con.prepareStatement(LOAD_NODE_AFFILIATIONS);
			pstmt.setString(1, service.getServiceID());
			pstmt.setString(2, nodeId);
			rs = pstmt.executeQuery();
			// Add to each node the corresponding affiliates
			while (rs.next())
			{
				loadAffiliations(nodes, rs);
			}
			DbConnectionManager.fastcloseStmt(rs, pstmt);

			// Get subscriptions to all nodes
			pstmt = con.prepareStatement(LOAD_NODE_SUBSCRIPTIONS);
			pstmt.setString(1, service.getServiceID());
			pstmt.setString(2, nodeId);
			rs = pstmt.executeQuery();
			// Add to each node the corresponding subscriptions
			while (rs.next())
			{
				loadSubscriptions(service, nodes, rs);
			}
			DbConnectionManager.fastcloseStmt(rs, pstmt);
		}
		catch (SQLException sqle)
		{
			log.error(sqle.getMessage(), sqle);
		}
		finally
		{
			DbConnectionManager.closeConnection(rs, pstmt, con);
		}

		for (Node node : nodes.values())
		{
			// Set now that the node is persistent in the database. Note: We
			// need to
			// set this now since otherwise the node's affiliations will be
			// saved to the database
			// "again" while adding them to the node!
			node.setSavedToDB(true);
			// Add the node to the service
			service.addNode(node);
		}
	}

761
    private static void loadNode(PubSubService service, Map<String, Node> loadedNodes, Map<String, String> parentMappings, ResultSet rs) {
Matt Tucker's avatar
Matt Tucker committed
762
        Node node;
Matt Tucker's avatar
Matt Tucker committed
763
        try {
Matt Tucker's avatar
Matt Tucker committed
764
            String nodeID = decodeNodeID(rs.getString(1));
Matt Tucker's avatar
Matt Tucker committed
765
            boolean leaf = rs.getInt(2) == 1;
Matt Tucker's avatar
Matt Tucker committed
766
            String parent = decodeNodeID(rs.getString(5));
767
            JID creator = new JID(rs.getString(22));
768
            
Matt Tucker's avatar
Matt Tucker committed
769
            if (parent != null) {
770
            	parentMappings.put(nodeID, parent);
Matt Tucker's avatar
Matt Tucker committed
771 772 773 774
            }

            if (leaf) {
                // Retrieving a leaf node
775
                node = new LeafNode(service, null, nodeID, creator);
Matt Tucker's avatar
Matt Tucker committed
776 777 778
            }
            else {
                // Retrieving a collection node
779
                node = new CollectionNode(service, null, nodeID, creator);
Matt Tucker's avatar
Matt Tucker committed
780 781 782
            }
            node.setCreationDate(new Date(Long.parseLong(rs.getString(3).trim())));
            node.setModificationDate(new Date(Long.parseLong(rs.getString(4).trim())));
Matt Tucker's avatar
Matt Tucker committed
783
            node.setPayloadDelivered(rs.getInt(6) == 1);
Matt Tucker's avatar
Matt Tucker committed
784 785 786 787
            if (leaf) {
                ((LeafNode) node).setMaxPayloadSize(rs.getInt(7));
                ((LeafNode) node).setPersistPublishedItems(rs.getInt(8) == 1);
                ((LeafNode) node).setMaxPublishedItems(rs.getInt(9));
Matt Tucker's avatar
Matt Tucker committed
788
                ((LeafNode) node).setSendItemSubscribe(rs.getInt(14) == 1);
Matt Tucker's avatar
Matt Tucker committed
789
            }
Matt Tucker's avatar
Matt Tucker committed
790 791 792
            node.setNotifiedOfConfigChanges(rs.getInt(10) == 1);
            node.setNotifiedOfDelete(rs.getInt(11) == 1);
            node.setNotifiedOfRetract(rs.getInt(12) == 1);
Matt Tucker's avatar
Matt Tucker committed
793 794 795 796
            node.setPresenceBasedDelivery(rs.getInt(13) == 1);
            node.setPublisherModel(PublisherModel.valueOf(rs.getString(15)));
            node.setSubscriptionEnabled(rs.getInt(16) == 1);
            node.setSubscriptionConfigurationRequired(rs.getInt(17) == 1);
797 798 799 800 801 802 803 804 805
            node.setAccessModel(AccessModel.valueOf(rs.getString(18)));
            node.setPayloadType(rs.getString(19));
            node.setBodyXSLT(rs.getString(20));
            node.setDataformXSLT(rs.getString(21));
            node.setDescription(rs.getString(23));
            node.setLanguage(rs.getString(24));
            node.setName(rs.getString(25));
            if (rs.getString(26) != null) {
                node.setReplyPolicy(Node.ItemReplyPolicy.valueOf(rs.getString(26)));
Matt Tucker's avatar
Matt Tucker committed
806 807 808
            }
            if (!leaf) {
                ((CollectionNode) node).setAssociationPolicy(
809 810
                        CollectionNode.LeafNodeAssociationPolicy.valueOf(rs.getString(27)));
                ((CollectionNode) node).setMaxLeafNodes(rs.getInt(28));
Matt Tucker's avatar
Matt Tucker committed
811 812 813 814 815 816
            }

            // Add the load to the list of loaded nodes
            loadedNodes.put(node.getNodeID(), node);
        }
        catch (SQLException sqle) {
817
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
818 819 820
        }
    }

821 822 823 824 825
    private static void loadAssociatedJIDs(Map<String, Node> nodes, ResultSet rs) {
        try {
            String nodeID = decodeNodeID(rs.getString(1));
            Node node = nodes.get(nodeID);
            if (node == null) {
826
                log.warn("JID associated to a non-existent node: " + nodeID);
827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844
                return;
            }
            JID jid = new JID(rs.getString(2));
            String associationType = rs.getString(3);
            if ("contacts".equals(associationType)) {
                node.addContact(jid);
            }
            else if ("replyRooms".equals(associationType)) {
                node.addReplyRoom(jid);
            }
            else if ("replyTo".equals(associationType)) {
                node.addReplyTo(jid);
            }
            else if ("associationTrusted".equals(associationType)) {
                ((CollectionNode) node).addAssociationTrusted(jid);
            }
        }
        catch (Exception ex) {
845
            log.error(ex.getMessage(), ex);
846 847 848 849 850 851 852 853
        }
    }

    private static void loadAssociatedGroups(Map<String, Node> nodes, ResultSet rs) {
        try {
            String nodeID = decodeNodeID(rs.getString(1));
            Node node = nodes.get(nodeID);
            if (node == null) {
854
                log.warn("Roster Group associated to a non-existent node: " + nodeID);
855 856 857 858 859
                return;
            }
            node.addAllowedRosterGroup(rs.getString(2));
        }
        catch (SQLException ex) {
860
            log.error(ex.getMessage(), ex);
861 862 863
        }
    }

Matt Tucker's avatar
Matt Tucker committed
864 865
    private static void loadAffiliations(Map<String, Node> nodes, ResultSet rs) {
        try {
Matt Tucker's avatar
Matt Tucker committed
866
            String nodeID = decodeNodeID(rs.getString(1));
Matt Tucker's avatar
Matt Tucker committed
867 868
            Node node = nodes.get(nodeID);
            if (node == null) {
869
                log.warn("Affiliations found for a non-existent node: " + nodeID);
Matt Tucker's avatar
Matt Tucker committed
870 871 872 873 874 875 876
                return;
            }
            NodeAffiliate affiliate = new NodeAffiliate(node, new JID(rs.getString(2)));
            affiliate.setAffiliation(NodeAffiliate.Affiliation.valueOf(rs.getString(3)));
            node.addAffiliate(affiliate);
        }
        catch (SQLException sqle) {
877
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
878 879 880
        }
    }

881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915
	/**
	 * Loads a single subscription of the given node from the database and adds
	 * it to the node.
	 *
	 * @param service
	 *            the pubsub service that is hosting the node.
	 * @param node
	 *            the node the subscription belongs to.
	 * @param subId
	 *            the ID of the subscription to load.
	 */
	public static void loadSubscription(PubSubService service, Node node, String subId)
	{
		Connection con = null;
		PreparedStatement pstmt = null;
		ResultSet rs = null;
		// loadSubscriptions() resolves the node through a map keyed by node ID
		Map<String, Node> nodes = new HashMap<String, Node>();
		nodes.put(node.getNodeID(), node);

		try
		{
			con = DbConnectionManager.getConnection();

			// Get the requested subscription of the node
			pstmt = con.prepareStatement(LOAD_NODE_SUBSCRIPTION);
			pstmt.setString(1, service.getServiceID());
			// Node IDs are written to the database through encodeNodeID(), so the
			// lookup has to bind the encoded form as well.
			pstmt.setString(2, encodeNodeID(node.getNodeID()));
			pstmt.setString(3, subId);
			rs = pstmt.executeQuery();

			// Add to the node the corresponding subscription
			if (rs.next())
			{
				loadSubscriptions(service, nodes, rs);
			}
		}
		catch (SQLException sqle)
		{
			log.error(sqle.getMessage(), sqle);
		}
		finally
		{
			DbConnectionManager.closeConnection(rs, pstmt, con);
		}
	}

916
    private static void loadSubscriptions(PubSubService service, Map<String, Node> nodes, ResultSet rs) {
Matt Tucker's avatar
Matt Tucker committed
917
        try {
Matt Tucker's avatar
Matt Tucker committed
918
            String nodeID = decodeNodeID(rs.getString(1));
Matt Tucker's avatar
Matt Tucker committed
919 920
            Node node = nodes.get(nodeID);
            if (node == null) {
921
                log.warn("Subscription found for a non-existent node: " + nodeID);
Matt Tucker's avatar
Matt Tucker committed
922 923 924 925 926
                return;
            }
            String subID = rs.getString(2);
            JID subscriber = new JID(rs.getString(3));
            JID owner = new JID(rs.getString(4));
927
            if (node.getAffiliate(owner) == null) {
928
                log.warn("Subscription found for a non-existent affiliate: " + owner +
929 930 931
                        " in node: " + nodeID);
                return;
            }
Matt Tucker's avatar
Matt Tucker committed
932
            NodeSubscription.State state = NodeSubscription.State.valueOf(rs.getString(5));
933
			NodeSubscription subscription = new NodeSubscription(node, owner, subscriber, state, subID);
Matt Tucker's avatar
Matt Tucker committed
934 935 936 937 938 939 940 941 942 943 944
            subscription.setShouldDeliverNotifications(rs.getInt(6) == 1);
            subscription.setUsingDigest(rs.getInt(7) == 1);
            subscription.setDigestFrequency(rs.getInt(8));
            if (rs.getString(9) != null) {
                subscription.setExpire(new Date(Long.parseLong(rs.getString(9).trim())));
            }
            subscription.setIncludingBody(rs.getInt(10) == 1);
            subscription.setPresenceStates(decodeWithComma(rs.getString(11)));
            subscription.setType(NodeSubscription.Type.valueOf(rs.getString(12)));
            subscription.setDepth(rs.getInt(13));
            subscription.setKeyword(rs.getString(14));
Matt Tucker's avatar
Matt Tucker committed
945 946 947 948 949
            // Indicate the subscription that is has already been saved to the database
            subscription.setSavedToDB(true);
            node.addSubscription(subscription);
        }
        catch (SQLException sqle) {
950
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
951 952 953
        }
    }

Matt Tucker's avatar
Matt Tucker committed
954 955 956 957 958 959 960
    /**
     * Update the DB with the new affiliation of the user in the node.
     *
     * @param node      The node where the affiliation of the user was updated.
     * @param affiliate The new affiliation of the user in the node.
     * @param create    True if this is a new affiliate.
     */
961
    public static void saveAffiliation(Node node, NodeAffiliate affiliate, boolean create) {
Matt Tucker's avatar
Matt Tucker committed
962 963 964 965 966 967 968
        Connection con = null;
        PreparedStatement pstmt = null;
        try {
            con = DbConnectionManager.getConnection();
            if (create) {
                // Add the user to the generic affiliations table
                pstmt = con.prepareStatement(ADD_AFFILIATION);
969
                pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
970
                pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
971 972 973 974 975
                pstmt.setString(3, affiliate.getJID().toString());
                pstmt.setString(4, affiliate.getAffiliation().name());
                pstmt.executeUpdate();
            }
            else {
976 977 978
                // Update the affiliate's data in the backend store
                pstmt = con.prepareStatement(UPDATE_AFFILIATION);
                pstmt.setString(1, affiliate.getAffiliation().name());
979
                pstmt.setString(2, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
980
                pstmt.setString(3, encodeNodeID(node.getNodeID()));
981 982
                pstmt.setString(4, affiliate.getJID().toString());
                pstmt.executeUpdate();
Matt Tucker's avatar
Matt Tucker committed
983 984 985
            }
        }
        catch (SQLException sqle) {
986
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
987 988
        }
        finally {
989
            DbConnectionManager.closeConnection(pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
990 991 992 993 994 995 996 997 998
        }
    }

    /**
     * Removes the affiliation and subsription state of the user from the DB.
     *
     * @param node      The node where the affiliation of the user was updated.
     * @param affiliate The existing affiliation and subsription state of the user in the node.
     */
999
    public static void removeAffiliation(Node node, NodeAffiliate affiliate) {
Matt Tucker's avatar
Matt Tucker committed
1000 1001 1002 1003 1004 1005
        Connection con = null;
        PreparedStatement pstmt = null;
        try {
            con = DbConnectionManager.getConnection();
            // Remove the affiliate from the table of node affiliates
            pstmt = con.prepareStatement(DELETE_AFFILIATION);
1006
            pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
1007
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
1008 1009 1010 1011
            pstmt.setString(3, affiliate.getJID().toString());
            pstmt.executeUpdate();
        }
        catch (SQLException sqle) {
1012
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
1013 1014
        }
        finally {
1015
            DbConnectionManager.closeConnection(pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
1016 1017 1018 1019 1020 1021 1022 1023 1024 1025
        }
    }

    /**
     * Updates the DB with the new subsription of the user to the node.
     *
     * @param node      The node where the user has subscribed to.
     * @param subscription The new subscription of the user to the node.
     * @param create    True if this is a new affiliate.
     */
1026
    public static void saveSubscription(Node node, NodeSubscription subscription, boolean create) {
Matt Tucker's avatar
Matt Tucker committed
1027 1028 1029 1030 1031 1032 1033
        Connection con = null;
        PreparedStatement pstmt = null;
        try {
            con = DbConnectionManager.getConnection();
            if (create) {
                // Add the subscription of the user to the database
                pstmt = con.prepareStatement(ADD_SUBSCRIPTION);
1034
                pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
1035
                pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
1036 1037 1038 1039
                pstmt.setString(3, subscription.getID());
                pstmt.setString(4, subscription.getJID().toString());
                pstmt.setString(5, subscription.getOwner().toString());
                pstmt.setString(6, subscription.getState().name());
Matt Tucker's avatar
Matt Tucker committed
1040 1041 1042
                pstmt.setInt(7, (subscription.shouldDeliverNotifications() ? 1 : 0));
                pstmt.setInt(8, (subscription.isUsingDigest() ? 1 : 0));
                pstmt.setInt(9, subscription.getDigestFrequency());
Matt Tucker's avatar
Matt Tucker committed
1043 1044
                Date expireDate = subscription.getExpire();
                if (expireDate == null) {
Matt Tucker's avatar
Matt Tucker committed
1045
                    pstmt.setString(10, null);
Matt Tucker's avatar
Matt Tucker committed
1046 1047
                }
                else {
Matt Tucker's avatar
Matt Tucker committed
1048
                    pstmt.setString(10, StringUtils.dateToMillis(expireDate));
Matt Tucker's avatar
Matt Tucker committed
1049
                }
Matt Tucker's avatar
Matt Tucker committed
1050 1051 1052 1053 1054
                pstmt.setInt(11, (subscription.isIncludingBody() ? 1 : 0));
                pstmt.setString(12, encodeWithComma(subscription.getPresenceStates()));
                pstmt.setString(13, subscription.getType().name());
                pstmt.setInt(14, subscription.getDepth());
                pstmt.setString(15, subscription.getKeyword());
Matt Tucker's avatar
Matt Tucker committed
1055 1056 1057 1058 1059 1060 1061 1062
                pstmt.executeUpdate();
                // Indicate the subscription that is has been saved to the database
                subscription.setSavedToDB(true);
            }
            else {
                if (NodeSubscription.State.none == subscription.getState()) {
                    // Remove the subscription of the user from the table
                    pstmt = con.prepareStatement(DELETE_SUBSCRIPTION);
1063
                    pstmt.setString(1, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
1064
                    pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
1065 1066 1067 1068 1069 1070 1071 1072
                    pstmt.setString(2, subscription.getID());
                    pstmt.executeUpdate();
                }
                else {
                    // Update the subscription of the user in the backend store
                    pstmt = con.prepareStatement(UPDATE_SUBSCRIPTION);
                    pstmt.setString(1, subscription.getOwner().toString());
                    pstmt.setString(2, subscription.getState().name());
Matt Tucker's avatar
Matt Tucker committed
1073 1074 1075
                    pstmt.setInt(3, (subscription.shouldDeliverNotifications() ? 1 : 0));
                    pstmt.setInt(4, (subscription.isUsingDigest() ? 1 : 0));
                    pstmt.setInt(5, subscription.getDigestFrequency());
Matt Tucker's avatar
Matt Tucker committed
1076 1077
                    Date expireDate = subscription.getExpire();
                    if (expireDate == null) {
Matt Tucker's avatar
Matt Tucker committed
1078
                        pstmt.setString(6, null);
Matt Tucker's avatar
Matt Tucker committed
1079 1080
                    }
                    else {
Matt Tucker's avatar
Matt Tucker committed
1081
                        pstmt.setString(6, StringUtils.dateToMillis(expireDate));
Matt Tucker's avatar
Matt Tucker committed
1082
                    }
Matt Tucker's avatar
Matt Tucker committed
1083 1084 1085 1086 1087
                    pstmt.setInt(7, (subscription.isIncludingBody() ? 1 : 0));
                    pstmt.setString(8, encodeWithComma(subscription.getPresenceStates()));
                    pstmt.setString(9, subscription.getType().name());
                    pstmt.setInt(10, subscription.getDepth());
                    pstmt.setString(11, subscription.getKeyword());
1088
                    pstmt.setString(12, node.getService().getServiceID());
Matt Tucker's avatar
Matt Tucker committed
1089
                    pstmt.setString(13, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
1090
                    pstmt.setString(14, subscription.getID());
Matt Tucker's avatar
Matt Tucker committed
1091 1092 1093 1094 1095
                    pstmt.executeUpdate();
                }
            }
        }
        catch (SQLException sqle) {
1096
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
1097 1098
        }
        finally {
1099
            DbConnectionManager.closeConnection(pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
1100 1101 1102 1103 1104 1105 1106 1107
        }
    }

    /**
     * Removes the subscription of the user from the DB.
     *
     * @param subscription The existing subsription of the user to the node.
     */
1108
    public static void removeSubscription(NodeSubscription subscription) {
Matt Tucker's avatar
Matt Tucker committed
1109 1110 1111 1112 1113 1114
        Connection con = null;
        PreparedStatement pstmt = null;
        try {
            con = DbConnectionManager.getConnection();
            // Remove the affiliate from the table of node affiliates
            pstmt = con.prepareStatement(DELETE_SUBSCRIPTION);
1115 1116
            pstmt.setString(1, subscription.getNode().getService().getServiceID());
            pstmt.setString(2, encodeNodeID(subscription.getNode().getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
1117 1118 1119 1120
            pstmt.setString(3, subscription.getID());
            pstmt.executeUpdate();
        }
        catch (SQLException sqle) {
1121
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
1122 1123
        }
        finally {
1124
            DbConnectionManager.closeConnection(pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
1125 1126 1127 1128
        }
    }

    /**
1129 1130 1131 1132 1133 1134 1135 1136
     * Creates and stores the published item in the database. Note that the
     * item will be cached temporarily before being flushed asynchronously 
     * to the database. The write cache can be tuned using the following
     * two properties:
     * <pre>
     *   "xmpp.pubsub.flush.max" - maximum items in the cache (-1 to disable cache)
     *   "xmpp.pubsub.flush.timer" - number of seconds between cache flushes
     * </pre>
1137
     * @param item The published item to save.
Matt Tucker's avatar
Matt Tucker committed
1138
     */
1139
    public static void savePublishedItem(PublishedItem item) {
1140
    	savePublishedItem(new RetryWrapper(item));
1141 1142 1143 1144
    }

    /**
     * Creates and stores the published item in the database. 
1145
     * @param wrapper The published item, wrapped for retry
1146
     */
1147 1148 1149
    private static void savePublishedItem(RetryWrapper wrapper) {
    	boolean firstPass = (wrapper.getRetryCount() == 0);
    	PublishedItem item = wrapper.get();
1150 1151 1152 1153
		String itemKey = item.getItemKey();
		itemCache.put(itemKey, item);
		log.debug("Added new (inbound) item to cache");
        synchronized (itemsPending) {
1154
    		LinkedListNode<RetryWrapper> itemToReplace = itemsPending.remove(itemKey);
1155
    		if (itemToReplace != null) {
1156
    			itemToReplace.remove(); // remove duplicate from itemsToAdd linked list
1157
    		}
1158 1159 1160
    		LinkedListNode<RetryWrapper> listNode = firstPass ? 
    							itemsToAdd.addLast(wrapper) : 
    							itemsToAdd.addFirst(wrapper);
1161 1162
    		itemsPending.put(itemKey, listNode);
        }
1163 1164
        // skip the flush step if this is a retry attempt
		if (firstPass && itemsPending.size() > MAX_ITEMS_FLUSH) {
1165 1166 1167
			TaskEngine.getInstance().submit(new Runnable() {
				public void run() { flushPendingItems(false); }
			});
1168
		}
Matt Tucker's avatar
Matt Tucker committed
1169
    }
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182
    
    /**
     * Internal holder that pairs a PublishedItem with the number of times
     * persisting it has been retried. The counter is used by the persistence
     * exception handling logic.
     */
    private static class RetryWrapper {

        /** The wrapped item. */
        private PublishedItem item;

        /** Number of retries so far; zero for a first attempt. */
        private volatile transient int retryCount = 0;

        public RetryWrapper(PublishedItem item) {
            this.item = item;
        }

        /** @return the wrapped item. */
        public PublishedItem get() {
            return item;
        }

        /** @return the number of retries so far. */
        public int getRetryCount() {
            return retryCount;
        }

        /** Increments the retry counter and returns the new value. */
        public int nextRetry() {
            return ++retryCount;
        }
    }
Matt Tucker's avatar
Matt Tucker committed
1183

Matt Tucker's avatar
Matt Tucker committed
1184
    /**
1185
     * Flush the cache(s) of items to be persisted (itemsToAdd) and deleted (itemsToDelete).
Matt Tucker's avatar
Matt Tucker committed
1186
     */
1187
	public static void flushPendingItems()
1188
    {
1189
        flushPendingItems(ClusterManager.isClusteringEnabled());
1190 1191 1192
    }

    /**
1193
     * Flush the cache(s) of items to be persisted (itemsToAdd) and deleted (itemsToDelete).
1194 1195
     * @param sendToCluster If true, delegate to cluster members, otherwise local only
     */
1196
    public static void flushPendingItems(boolean sendToCluster)
1197
    {
1198 1199
		// forward to other cluster members and wait for response
		if (sendToCluster) {
1200 1201 1202
            CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
        }

1203
		if (itemsToAdd.getFirst() == null && itemsToDelete.getFirst() == null) {
1204
        	return;	 // nothing to do for this cluster member
1205
        }
1206
        
1207 1208
		Connection con = null;
		boolean rollback = false;
1209 1210
    	LinkedList<RetryWrapper> addList = null;
    	LinkedList<PublishedItem> delList = null;
1211

1212
    	// Swap pending items so we can parse and save the contents from this point in time
1213
    	// while not blocking new entries from being cached.
1214
    	synchronized(itemsPending) 
1215 1216 1217
    	{
    		addList = itemsToAdd;
    		delList = itemsToDelete;
1218

1219 1220
    		itemsToAdd = new LinkedList<RetryWrapper>();
    		itemsToDelete = new LinkedList<PublishedItem>();
1221
    		
1222 1223
    		// Ensure pending items are available via the item read cache;
    		// this allows the item(s) to be fetched by other request threads
1224 1225 1226 1227
    		// while being written to the DB from this thread
    		int copied = 0;
    		for (String key : itemsPending.keySet()) {
    			if (!itemCache.containsKey(key)) {
1228
    				itemCache.put(key, (((RetryWrapper)itemsPending.get(key).object)).get());
1229 1230 1231 1232 1233 1234 1235
    				copied++;
    			}
    		}
    		if (log.isDebugEnabled() && copied > 0) {
    			log.debug("Added " + copied + " pending items to published item cache");
    		}
    		itemsPending.clear();
1236
    	}
1237

1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251
    	// Note that we now make multiple attempts to write cached items to the DB:
    	//   1) insert all pending items in a single batch
    	//   2) if the batch insert fails, retry by inserting each item separately
    	//   3) if a given item cannot be written, return it to the pending write cache
    	// By default step 3 will be tried once per item, but this can be configured
    	// (or disabled) using the "xmpp.pubsub.item.retry" property. In the event of
    	// a transaction rollback, items that could not be written to the database
    	// will be returned to the pending item write cache.
    	try {
			con = DbConnectionManager.getTransactionConnection();
			writePendingItems(con, addList, delList);
		} catch (SQLException se) {
			log.error("Failed to flush pending items; initiating rollback", se);
			// return new items to the write cache
1252
	        LinkedListNode<RetryWrapper> node = addList.getLast();
1253
	        while (node != null) {
1254
	            savePublishedItem(node.object);
1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270
	            node.remove();
	            node = addList.getLast();
	        }
			rollback = true;
		} finally {
			DbConnectionManager.closeTransactionConnection(con, rollback);
		}
	}

    /**
     * Loop through the lists of added and deleted items and write to the database
     * @param con
     * @param addList
     * @param delList
     * @throws SQLException
     */
1271
	private static void writePendingItems(Connection con, LinkedList<RetryWrapper> addList, LinkedList<PublishedItem> delList) throws SQLException
1272
	{
1273 1274
        LinkedListNode<RetryWrapper> addItem = addList.getFirst();
        LinkedListNode<PublishedItem> delItem = delList.getFirst();
1275 1276 1277 1278 1279 1280 1281
        
        // is there anything to do?
        if ((addItem == null) && (delItem == null)) { return; }
        
    	if (log.isDebugEnabled()) {
    		log.debug("Flush " + itemsPending.size() + " pending items to database");
    	}
1282

1283 1284
        // ensure there are no duplicates by deleting before adding
        if (addItem != null) {
1285
        	LinkedListNode<RetryWrapper> addHead = addItem.previous;
1286
        	while (addItem != addHead) {
1287
        		delList.addLast(addItem.object.get());
1288 1289 1290
        		addItem = addItem.next;
        	}
        }
1291

1292
        // delete first (to remove possible duplicates), then add new items
1293
        delItem = delList.getFirst();
1294
        if (delItem != null) {
1295
            PreparedStatement pstmt = null;
1296
			try {
1297
                LinkedListNode<PublishedItem> delHead = delItem.previous;
1298
				pstmt = con.prepareStatement(DELETE_ITEM);
1299

1300 1301
                while (delItem != delHead)
                {
1302
                	PublishedItem item = delItem.object;
1303 1304 1305 1306 1307 1308 1309 1310
                    pstmt.setString(1, item.getNode().getService().getServiceID());
                    pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
                    pstmt.setString(3, item.getID());
                    pstmt.addBatch();

                    delItem = delItem.next;
                }
				pstmt.executeBatch();
1311 1312 1313 1314
			} catch (SQLException ex) {
				log.error("Failed to delete published item(s) from DB", ex);
				// do not re-throw here; continue with insert operation if possible
			} finally {
1315
				DbConnectionManager.closeStatement(pstmt);
1316 1317 1318
	        }
        }
		
1319 1320 1321 1322 1323 1324
        try { 
            // first try to add the pending items as a batch
        	writePendingItems(con, addList.getFirst(), true);
        } catch (SQLException ex) {
        	// retry each item individually rather than rolling back
        	writePendingItems(con, addList.getFirst(), false);       	
Matt Tucker's avatar
Matt Tucker committed
1325 1326
        }
    }
1327 1328 1329 1330 1331 1332 1333 1334
	
	/**
	 * Execute JDBC calls (optionally via batch) to persist the given published items
	 * @param con
	 * @param addItem
	 * @param batch
	 * @throws SQLException
	 */
1335
	private static void writePendingItems(Connection con, LinkedListNode<RetryWrapper> addItem, boolean batch)  throws SQLException 
1336 1337
	{	
		if (addItem == null) { return; }
1338
        LinkedListNode<RetryWrapper> addHead = addItem.previous;
1339
        PreparedStatement pstmt = null;
1340
        RetryWrapper wrappedItem = null;
1341 1342 1343 1344 1345
        PublishedItem item = null;       
    	try {
			pstmt = con.prepareStatement(ADD_ITEM);
            while (addItem != addHead)
            {
1346 1347
            	wrappedItem = addItem.object;
            	item = wrappedItem.get();
1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359
                pstmt.setString(1, item.getNode().getService().getServiceID());
                pstmt.setString(2, encodeNodeID(item.getNodeID()));
                pstmt.setString(3, item.getID());
                pstmt.setString(4, item.getPublisher().toString());
                pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
                pstmt.setString(6, item.getPayloadXML());
                if (batch) { pstmt.addBatch(); }
                else { 
                	try { pstmt.execute(); }
                	catch (SQLException se) {
        	    		// individual item could not be persisted; retry (up to MAX_ITEM_RETRY attempts)
        	    		String itemKey = item.getItemKey();
1360
        	    		if (wrappedItem.nextRetry() < MAX_ITEM_RETRY) {
1361
        	        		log.warn("Failed to persist published item (will retry): " + itemKey);
1362
        	                savePublishedItem(wrappedItem);
1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379
        	    		} else {
        	    			// all hope is lost ... item will be dropped
        	    			log.error("Published item could not be written to database: " + itemKey + "\n" + item.getPayloadXML(), se);
        	    		}
                	}
                }
                addItem = addItem.next;
            }
            if (batch) { pstmt.executeBatch(); }			
    	} catch (SQLException se) {
			log.error("Failed to persist published items as batch; will retry individually", se);
			// caught by caller; should not cause a transaction rollback
			throw se;
    	} finally {
    		DbConnectionManager.closeStatement(pstmt);
    	}
	}
Matt Tucker's avatar
Matt Tucker committed
1380

Matt Tucker's avatar
Matt Tucker committed
1381 1382 1383 1384 1385
    /**
     * Removes the specified published item from the DB.
     *
     * @param item The published item to delete.
     */
1386
    public static void removePublishedItem(PublishedItem item) {
1387 1388 1389
    	String itemKey = item.getItemKey();
        itemCache.remove(itemKey);
        synchronized (itemsPending)
1390 1391
    	{
    		itemsToDelete.addLast(item);
1392
			LinkedListNode<RetryWrapper> itemToAdd = itemsPending.remove(itemKey);
1393 1394
			if (itemToAdd != null)
				itemToAdd.remove();  // drop from itemsToAdd linked list
1395
		}
Matt Tucker's avatar
Matt Tucker committed
1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
    }

    /**
     * Loads from the database the default node configuration for the specified node type
     * and pubsub service.
     *
     * @param service the default node configuration used by this pubsub service.
     * @param isLeafType true if loading default configuration for leaf nodes.
     * @return the loaded default node configuration for the specified node type and service
     *         or <tt>null</tt> if none was found.
     */
    public static DefaultNodeConfiguration loadDefaultConfiguration(PubSubService service,
            boolean isLeafType) {
        Connection con = null;
        PreparedStatement pstmt = null;
1411
        ResultSet rs = null;
Matt Tucker's avatar
Matt Tucker committed
1412 1413 1414 1415 1416 1417 1418
        DefaultNodeConfiguration config = null;
        try {
            con = DbConnectionManager.getConnection();
            // Get default node configuration for the specified service
            pstmt = con.prepareStatement(LOAD_DEFAULT_CONF);
            pstmt.setString(1, service.getServiceID());
            pstmt.setInt(2, (isLeafType ? 1 : 0));
1419
            rs = pstmt.executeQuery();
Matt Tucker's avatar
Matt Tucker committed
1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444
            if (rs.next()) {
                config = new DefaultNodeConfiguration(isLeafType);
                // Rebuild loaded default node configuration
                config.setDeliverPayloads(rs.getInt(1) == 1);
                config.setMaxPayloadSize(rs.getInt(2));
                config.setPersistPublishedItems(rs.getInt(3) == 1);
                config.setMaxPublishedItems(rs.getInt(4));
                config.setNotifyConfigChanges(rs.getInt(5) == 1);
                config.setNotifyDelete(rs.getInt(6) == 1);
                config.setNotifyRetract(rs.getInt(7) == 1);
                config.setPresenceBasedDelivery(rs.getInt(8) == 1);
                config.setSendItemSubscribe(rs.getInt(9) == 1);
                config.setPublisherModel(PublisherModel.valueOf(rs.getString(10)));
                config.setSubscriptionEnabled(rs.getInt(11) == 1);
                config.setAccessModel(AccessModel.valueOf(rs.getString(12)));
                config.setLanguage(rs.getString(13));
                if (rs.getString(14) != null) {
                    config.setReplyPolicy(Node.ItemReplyPolicy.valueOf(rs.getString(14)));
                }
                config.setAssociationPolicy(
                        CollectionNode.LeafNodeAssociationPolicy.valueOf(rs.getString(15)));
                config.setMaxLeafNodes(rs.getInt(16));
            }
        }
        catch (Exception sqle) {
1445
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
1446 1447
        }
        finally {
1448
            DbConnectionManager.closeConnection(rs, pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491
        }
        return config;
    }

    /**
     * Creates a new default node configuration for the specified service.
     *
     * @param service the default node configuration used by this pubsub service.
     * @param config the default node configuration to create in the database.
     */
    public static void createDefaultConfiguration(PubSubService service,
            DefaultNodeConfiguration config) {
        Connection con = null;
        PreparedStatement pstmt = null;
        try {
            con = DbConnectionManager.getConnection();
            pstmt = con.prepareStatement(ADD_DEFAULT_CONF);
            pstmt.setString(1, service.getServiceID());
            pstmt.setInt(2, (config.isLeaf() ? 1 : 0));
            pstmt.setInt(3, (config.isDeliverPayloads() ? 1 : 0));
            pstmt.setInt(4, config.getMaxPayloadSize());
            pstmt.setInt(5, (config.isPersistPublishedItems() ? 1 : 0));
            pstmt.setInt(6, config.getMaxPublishedItems());
            pstmt.setInt(7, (config.isNotifyConfigChanges() ? 1 : 0));
            pstmt.setInt(8, (config.isNotifyDelete() ? 1 : 0));
            pstmt.setInt(9, (config.isNotifyRetract() ? 1 : 0));
            pstmt.setInt(10, (config.isPresenceBasedDelivery() ? 1 : 0));
            pstmt.setInt(11, (config.isSendItemSubscribe() ? 1 : 0));
            pstmt.setString(12, config.getPublisherModel().getName());
            pstmt.setInt(13, (config.isSubscriptionEnabled() ? 1 : 0));
            pstmt.setString(14, config.getAccessModel().getName());
            pstmt.setString(15, config.getLanguage());
            if (config.getReplyPolicy() != null) {
                pstmt.setString(16, config.getReplyPolicy().name());
            }
            else {
                pstmt.setString(16, null);
            }
            pstmt.setString(17, config.getAssociationPolicy().name());
            pstmt.setInt(18, config.getMaxLeafNodes());
            pstmt.executeUpdate();
        }
        catch (SQLException sqle) {
1492
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
1493 1494
        }
        finally {
1495
            DbConnectionManager.closeConnection(pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537
        }
    }

    /**
     * Updates the default node configuration for the specified service.
     *
     * @param service the default node configuration used by this pubsub service.
     * @param config the default node configuration to update in the database.
     */
    public static void updateDefaultConfiguration(PubSubService service,
            DefaultNodeConfiguration config) {
        Connection con = null;
        PreparedStatement pstmt = null;
        try {
            con = DbConnectionManager.getConnection();
            pstmt = con.prepareStatement(UPDATE_DEFAULT_CONF);
            pstmt.setInt(1, (config.isDeliverPayloads() ? 1 : 0));
            pstmt.setInt(2, config.getMaxPayloadSize());
            pstmt.setInt(3, (config.isPersistPublishedItems() ? 1 : 0));
            pstmt.setInt(4, config.getMaxPublishedItems());
            pstmt.setInt(5, (config.isNotifyConfigChanges() ? 1 : 0));
            pstmt.setInt(6, (config.isNotifyDelete() ? 1 : 0));
            pstmt.setInt(7, (config.isNotifyRetract() ? 1 : 0));
            pstmt.setInt(8, (config.isPresenceBasedDelivery() ? 1 : 0));
            pstmt.setInt(9, (config.isSendItemSubscribe() ? 1 : 0));
            pstmt.setString(10, config.getPublisherModel().getName());
            pstmt.setInt(11, (config.isSubscriptionEnabled() ? 1 : 0));
            pstmt.setString(12, config.getAccessModel().getName());
            pstmt.setString(13, config.getLanguage());
            if (config.getReplyPolicy() != null) {
                pstmt.setString(14, config.getReplyPolicy().name());
            }
            else {
                pstmt.setString(14, null);
            }
            pstmt.setString(15, config.getAssociationPolicy().name());
            pstmt.setInt(16, config.getMaxLeafNodes());
            pstmt.setString(17, service.getServiceID());
            pstmt.setInt(18, (config.isLeaf() ? 1 : 0));
            pstmt.executeUpdate();
        }
        catch (SQLException sqle) {
1538
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
1539 1540
        }
        finally {
1541
            DbConnectionManager.closeConnection(pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
1542 1543 1544
        }
    }

1545 1546 1547 1548 1549 1550 1551 1552 1553

    /**
     * Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}.
     *
     * @param node the leaf node to load its published items.
     * @return the published items loaded for the node.
     */
    public static List<PublishedItem> getPublishedItems(LeafNode node) {
        int limit = node.getMaxPublishedItems();
        return getPublishedItems(node, limit);
    }
1554

1555 1556 1557 1558 1559 1560
    /**
     * Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}.
     *
     * @param node the leaf node to load its published items.
     */
    public static List<PublishedItem> getPublishedItems(LeafNode node, int maxRows) {
1561 1562 1563 1564 1565 1566 1567 1568 1569
        Lock itemLock = CacheFactory.getLock(ITEM_CACHE, itemCache);
        try {
	    	// NOTE: force other requests to wait for DB I/O to complete
        	itemLock.lock();
	    	flushPendingItems();
        } finally {
        	itemLock.unlock();
        }
    	Connection con = null;
1570 1571 1572 1573
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        int max = MAX_ROWS_FETCH;
        int maxPublished = node.getMaxPublishedItems();
1574

1575 1576 1577 1578 1579
        // Limit the max rows until a solution is in place with Result Set Management
        if (maxRows != -1)
        	max = maxPublished == -1 ? Math.min(maxRows, MAX_ROWS_FETCH) :  Math.min(maxRows, maxPublished);
        else if (maxPublished != -1)
        	max = Math.min(MAX_ROWS_FETCH, maxPublished);
1580

1581
        // We don't know how many items are in the db, so we will start with an allocation of 500
1582 1583
		java.util.LinkedList<PublishedItem> results = new java.util.LinkedList<PublishedItem>();
		boolean descending = JiveGlobals.getBooleanProperty("xmpp.pubsub.order.descending", false);
1584

1585 1586
		try
		{
Matt Tucker's avatar
Matt Tucker committed
1587
            con = DbConnectionManager.getConnection();
1588 1589 1590 1591 1592 1593 1594
            // Get published items of the specified node
            pstmt = con.prepareStatement(LOAD_ITEMS);
            pstmt.setMaxRows(max);
            pstmt.setString(1, node.getService().getServiceID());
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
            rs = pstmt.executeQuery();
            int counter = 0;
1595

1596 1597 1598 1599 1600 1601 1602 1603 1604
            // Rebuild loaded published items
            while(rs.next() && (counter < max)) {
                String itemID = rs.getString(1);
                JID publisher = new JID(rs.getString(2));
                Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
                // Create the item
                PublishedItem item = new PublishedItem(node, publisher, itemID, creationDate);
                // Add the extra fields to the published item
                if (rs.getString(4) != null) {
1605
                	item.setPayloadXML(rs.getString(4));
1606 1607
                }
                // Add the published item to the node
1608 1609 1610 1611
				if (descending)
					results.add(item);
				else
					results.addFirst(item);
1612 1613
                counter++;
            }
Matt Tucker's avatar
Matt Tucker committed
1614
        }
1615 1616
        catch (Exception sqle) {
            log.error(sqle.getMessage(), sqle);
Matt Tucker's avatar
Matt Tucker committed
1617 1618
        }
        finally {
1619
            DbConnectionManager.closeConnection(rs, pstmt, con);
Matt Tucker's avatar
Matt Tucker committed
1620
        }
1621

1622
        if (results.size() == 0)
1623
			return Collections.emptyList();
1624

1625
        return results;
Matt Tucker's avatar
Matt Tucker committed
1626 1627
    }

1628 1629 1630 1631 1632 1633
    /**
     * Fetches the last published item for the specified node.
     *
     * @param node the leaf node to load its last published items.
     */
    public static PublishedItem getLastPublishedItem(LeafNode node) {
1634 1635 1636 1637 1638 1639 1640 1641
        Lock itemLock = CacheFactory.getLock(ITEM_CACHE, itemCache);
        try {
        	// NOTE: force other requests to wait for DB I/O to complete
        	itemLock.lock();
	    	flushPendingItems();
        } finally {
        	itemLock.unlock();
        }
1642
        Connection con = null;
Matt Tucker's avatar
Matt Tucker committed
1643
        PreparedStatement pstmt = null;
1644 1645
        ResultSet rs = null;
        PublishedItem item = null;
1646

Matt Tucker's avatar
Matt Tucker committed
1647
        try {
1648 1649 1650
            con = DbConnectionManager.getConnection();
            // Get published items of the specified node
            pstmt = con.prepareStatement(LOAD_LAST_ITEM);
1651 1652
            pstmt.setFetchSize(1);
            pstmt.setMaxRows(1);
1653 1654
            pstmt.setString(1, node.getService().getServiceID());
            pstmt.setString(2, encodeNodeID(node.getNodeID()));
Matt Tucker's avatar
Matt Tucker committed
1655
            rs = pstmt.executeQuery();
1656 1657 1658 1659 1660 1661 1662 1663 1664
            // Rebuild loaded published items
            if (rs.next()) {
                String itemID = rs.getString(1);
                JID publisher = new JID(rs.getString(2));
                Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
                // Create the item
                item = new PublishedItem(node, publisher, itemID, creationDate);
                // Add the extra fields to the published item
                if (rs.getString(4) != null) {
1665
                	item.setPayloadXML(rs.getString(4));
Matt Tucker's avatar
Matt Tucker committed
1666 1667
                }
            }
1668 1669 1670 1671 1672 1673 1674 1675 1676
        }
        catch (Exception sqle) {
            log.error(sqle.getMessage(), sqle);
        }
        finally {
            DbConnectionManager.closeConnection(rs, pstmt, con);
        }
        return item;
    }
Matt Tucker's avatar
Matt Tucker committed
1677

1678
    public static PublishedItem getPublishedItem(LeafNode node, String itemID) {
1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727
    	String itemKey = PublishedItem.getItemKey(node, itemID);

        // try to fetch from cache first without locking
        PublishedItem result = itemCache.get(itemKey);
    	if (result == null) {
            Lock itemLock = CacheFactory.getLock(ITEM_CACHE, itemCache);
            try {
    	    	// Acquire lock, then re-check cache before reading from DB;
            	// allows clustered item cache to be primed by first request
            	itemLock.lock();
            	result = itemCache.get(itemKey);
            	if (result == null) {
	            	flushPendingItems(); 
	
	        		// fetch item from DB
	                Connection con = null;
	                PreparedStatement pstmt = null;
	                ResultSet rs = null;
	                try {
	                    con = DbConnectionManager.getConnection();
	                    pstmt = con.prepareStatement(LOAD_ITEM);
	                    pstmt.setString(1, node.getService().getServiceID());
	                    pstmt.setString(2, node.getNodeID());
	                    pstmt.setString(3, itemID);
	                    rs = pstmt.executeQuery();
	
	                    // Add to each node the corresponding subscriptions
	                    if (rs.next()) {
	                        JID publisher = new JID(rs.getString(1));
	                        Date creationDate = new Date(Long.parseLong(rs.getString(2).trim()));
	                        // Create the item
	                        result = new PublishedItem(node, publisher, itemID, creationDate);
	                        // Add the extra fields to the published item
	                        if (rs.getString(3) != null) {
	                        	result.setPayloadXML(rs.getString(3));
	                        }
	                        itemCache.put(itemKey, result);
	                		log.debug("Loaded item into cache from DB");
	                    }
	                } catch (Exception exc) {
	                    log.error(exc.getMessage(), exc);
	                } finally {
	                    DbConnectionManager.closeConnection(pstmt, con);
	                }
            	} else {
            		log.debug("Found cached item on second attempt (after acquiring lock)");
            	}
            } finally {
            	itemLock.unlock();
Matt Tucker's avatar
Matt Tucker committed
1728
            }
1729 1730 1731
    	} else {
    		log.debug("Found cached item on first attempt (no lock)");
    	}
1732 1733
        return result;
	}
Matt Tucker's avatar
Matt Tucker committed
1734

1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749
	public static void purgeNode(LeafNode leafNode)
	{
		Connection con = null;
		boolean rollback = false;

		try
		{
			con = DbConnectionManager.getTransactionConnection();

			purgeNode(leafNode, con);

			// Delete all the entries from the itemsToAdd list and pending map
			// that match this node.
			synchronized (itemsPending)
			{
1750
				Iterator<Map.Entry<String, LinkedListNode<RetryWrapper>>> pendingIt = itemsPending.entrySet().iterator();
1751 1752 1753

				while (pendingIt.hasNext())
				{
1754
					LinkedListNode<RetryWrapper> itemNode = pendingIt.next().getValue();
Matt Tucker's avatar
Matt Tucker committed
1755

1756
					if (itemNode.object.get().getNodeID().equals(leafNode.getNodeID()))
1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776
					{
						itemNode.remove();
						pendingIt.remove();
					}
				}
			}
		}
		catch (SQLException exc)
		{
			log.error(exc.getMessage(), exc);
			rollback = true;
		}
		finally
		{
			DbConnectionManager.closeTransactionConnection(con, rollback);
		}
	}

	private static void purgeNode(LeafNode leafNode, Connection con) throws SQLException
	{
1777
		flushPendingItems();
1778 1779
        // Remove published items of the node being deleted
        PreparedStatement pstmt = null;
1780

1781 1782
		try
		{
1783 1784 1785 1786
            pstmt = con.prepareStatement(DELETE_ITEMS);
            pstmt.setString(1, leafNode.getService().getServiceID());
            pstmt.setString(2, encodeNodeID(leafNode.getNodeID()));
            pstmt.executeUpdate();
1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801
		}
		finally
		{
			DbConnectionManager.closeStatement(pstmt);
		}

		// drop cached items for purged node
		synchronized (itemCache)
		{
			for (PublishedItem item : itemCache.values())
			{
				if (leafNode.getNodeID().equals(item.getNodeID()))
				{
					itemCache.remove(item.getItemKey());
				}
1802
            }
1803
		}
1804
	}
Matt Tucker's avatar
Matt Tucker committed
1805

1806
	private static String encodeWithComma(Collection<String> strings) {
Matt Tucker's avatar
Matt Tucker committed
1807 1808 1809 1810 1811 1812 1813
        StringBuilder sb = new StringBuilder(90);
        for (String group : strings) {
            sb.append(group).append(",");
        }
        if (!strings.isEmpty()) {
            sb.setLength(sb.length()-1);
        }
1814 1815 1816 1817
        else {
            // Add a blank so an empty string is never replaced with NULL (oracle...arggg!!!)
            sb.append(" ");
        }
Matt Tucker's avatar
Matt Tucker committed
1818 1819 1820 1821 1822
        return sb.toString();
    }

    private static Collection<String> decodeWithComma(String strings) {
        Collection<String> decodedStrings = new ArrayList<String>();
1823
        StringTokenizer tokenizer = new StringTokenizer(strings.trim(), ",");
Matt Tucker's avatar
Matt Tucker committed
1824 1825 1826 1827 1828
        while (tokenizer.hasMoreTokens()) {
            decodedStrings.add(tokenizer.nextToken());
        }
        return decodedStrings;
    }
Matt Tucker's avatar
Matt Tucker committed
1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846

    /**
     * Encodes a node ID for storage. On Oracle an empty node ID is replaced by a
     * single space; every other value is returned unchanged.
     *
     * @param nodeID the node ID to encode.
     * @return the value to store in the database.
     */
    private static String encodeNodeID(String nodeID) {
        boolean onOracle =
                DbConnectionManager.getDatabaseType() == DbConnectionManager.DatabaseType.oracle;
        // Oracle stores empty strings as null so return a string with a space
        return (onOracle && "".equals(nodeID)) ? " " : nodeID;
    }

    /**
     * Decodes a node ID read from storage. On Oracle a single space is converted
     * back to the empty string; every other value is returned unchanged.
     *
     * @param nodeID the node ID as stored in the database.
     * @return the decoded node ID.
     */
    private static String decodeNodeID(String nodeID) {
        boolean onOracle =
                DbConnectionManager.getDatabaseType() == DbConnectionManager.DatabaseType.oracle;
        // Oracle stores empty strings as null so convert them back to empty strings
        return (onOracle && " ".equals(nodeID)) ? "" : nodeID;
    }
1847 1848 1849 1850 1851 1852 1853

    /**
     * Purges all items from the database that exceed the defined item count on
     * all nodes.
     */
    private static void purgeItems()
    {
1854
		boolean abortTransaction = false;
1855 1856
        Connection con = null;
        PreparedStatement pstmt = null;
1857
		PreparedStatement nodeConfig = null;
1858 1859 1860 1861 1862
        ResultSet rs = null;

        try
        {
            con = DbConnectionManager.getTransactionConnection();
1863
			nodeConfig = con.prepareStatement(PERSISTENT_NODES);
1864 1865 1866 1867 1868 1869 1870 1871 1872
            rs = nodeConfig.executeQuery();
			PreparedStatement purgeNode = con
					.prepareStatement(getPurgeStatement(DbConnectionManager.getDatabaseType()));

            while (rs.next())
            {
            	String svcId = rs.getString(1);
            	String nodeId = rs.getString(2);
            	int maxItems = rs.getInt(3);
1873

1874
				setPurgeParams(DbConnectionManager.getDatabaseType(), purgeNode, svcId, nodeId, maxItems);
1875

1876
				purgeNode.addBatch();
1877
            }
1878
			purgeNode.executeBatch();
1879 1880 1881 1882
		}
		catch (Exception sqle)
		{
		    log.error(sqle.getMessage(), sqle);
1883
			abortTransaction = true;
1884 1885 1886
		}
		finally
		{
1887 1888 1889
			DbConnectionManager.closeResultSet(rs);
			DbConnectionManager.closeStatement(rs, nodeConfig);
			DbConnectionManager.closeTransactionConnection(pstmt, con, abortTransaction);
1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929
		}
    }

	/**
	 * Binds the parameters of the purge statement. The placeholder order depends
	 * on the database type: for hsqldb the item limit is the last parameter, for
	 * every other type it sits between the two service/node pairs.
	 *
	 * @param dbType the database type the statement was built for.
	 * @param purgeStmt the purge statement to populate.
	 * @param serviceId the service ID to bind.
	 * @param nodeId the node ID to bind.
	 * @param maxItems the item limit to bind.
	 * @throws SQLException if a parameter cannot be set.
	 */
	private static void setPurgeParams(DatabaseType dbType, PreparedStatement purgeStmt, String serviceId,
			String nodeId, int maxItems) throws SQLException
	{
		final boolean limitLast;
		switch (dbType)
		{
		case hsqldb:
			limitLast = true;
			break;

		default:
			limitLast = false;
			break;
		}

		int idx = 0;
		purgeStmt.setString(++idx, serviceId);
		purgeStmt.setString(++idx, nodeId);
		if (!limitLast)
		{
			purgeStmt.setInt(++idx, maxItems);
		}
		purgeStmt.setString(++idx, serviceId);
		purgeStmt.setString(++idx, nodeId);
		if (limitLast)
		{
			purgeStmt.setInt(++idx, maxItems);
		}
	}

	/**
	 * Selects the purge SQL for the given database type: hsqldb has its own
	 * statement, every other type uses the generic one.
	 *
	 * @param type the database type.
	 * @return the purge statement SQL.
	 */
	private static String getPurgeStatement(DatabaseType type)
	{
		final String sql;
		switch (type)
		{
		case hsqldb:
			sql = PURGE_FOR_SIZE_HSQLDB;
			break;

		default:
			sql = PURGE_FOR_SIZE;
			break;
		}
		return sql;
	}

    public static void shutdown()
    {
1930
    	log.info("Flushing write cache to database");
1931 1932 1933 1934 1935 1936
		flushPendingItems(false); // local member only
		
		// node cleanup (skip when running as a cluster)
		if (!ClusterManager.isClusteringEnabled()) {
			purgeItems();
		}
1937
    }
Matt Tucker's avatar
Matt Tucker committed
1938
}