1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/**
* $RCSfile: ServerSocketReader.java,v $
* $Revision: 3174 $
* $Date: 2005-12-08 17:41:00 -0300 (Thu, 08 Dec 2005) $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.interceptor.PacketRejectedException;
import org.jivesoftware.wildfire.session.IncomingServerSession;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.*;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* A SocketReader specialized for server connections. This reader will be used when the open
* stream contains a jabber:server namespace. Server-to-server communication requires two
* TCP connections between the servers where one is used for sending packets whilst the other
* connection is used for receiving packets. The connection used for receiving packets will use
* a ServerSocketReader since the other connection will not receive packets.<p>
*
* The received packets will be routed using another thread to ensure that many received packets
* could be routed at the same time. To avoid creating new threads every time a packet is received
* each <tt>ServerSocketReader</tt> instance uses a {@link ThreadPoolExecutor}. By default the
* maximum number of threads that the executor may have is 50. However, this value may be modified
* by changing the property <b>xmpp.server.processing.max.threads</b>.
*
* @author Gaston Dombiak
*/
public class ServerSocketReader extends SocketReader {
/**
* Pool of threads that are available for processing the requests.
*/
private ThreadPoolExecutor threadPool;
public ServerSocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
Socket socket, SocketConnection connection, boolean useBlockingMode) {
super(router, routingTable, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2);
int maxThreads = JiveGlobals.getIntProperty("xmpp.server.processing.max.threads", 50);
int queueSize = JiveGlobals.getIntProperty("xmpp.server.processing.queue", 50);
threadPool =
new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* Processes the packet in another thread if the packet has not been rejected.
*
* @param packet the received packet.
*/
protected void processIQ(final IQ packet) throws UnauthorizedException {
try {
packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
public void run() {
try {
ServerSocketReader.super.processIQ(packet);
}
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
}
catch (PacketRejectedException e) {
Log.debug("IQ rejected: " + packet.toXML(), e);
}
}
/**
* Processes the packet in another thread if the packet has not been rejected.
*
* @param packet the received packet.
*/
protected void processPresence(final Presence packet) throws UnauthorizedException {
try {
packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
public void run() {
try {
ServerSocketReader.super.processPresence(packet);
}
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
}
catch (PacketRejectedException e) {
Log.debug("Presence rejected: " + packet.toXML(), e);
}
}
/**
* Processes the packet in another thread if the packet has not been rejected.
*
* @param packet the received packet.
*/
protected void processMessage(final Message packet) throws UnauthorizedException {
try {
packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
public void run() {
try {
ServerSocketReader.super.processMessage(packet);
}
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
}
catch (PacketRejectedException e) {
Log.debug("Message rejected: " + packet.toXML(), e);
}
}
/**
* Remote servers may send subsequent db:result packets so we need to process them in order
* to validate new domains.
*
* @param doc the unknown DOM element that was received
* @return true if the packet is a db:result packet otherwise false.
*/
protected boolean processUnknowPacket(Element doc) {
// Handle subsequent db:result packets
if ("db".equals(doc.getNamespacePrefix()) && "result".equals(doc.getName())) {
if (!((IncomingServerSession) session).validateSubsequentDomain(doc)) {
open = false;
}
return true;
}
else if ("db".equals(doc.getNamespacePrefix()) && "verify".equals(doc.getName())) {
// The Receiving Server is reusing an existing connection for sending the
// Authoritative Server a request for verification of a key
((IncomingServerSession) session).verifyReceivedKey(doc);
return true;
}
return false;
}
/**
* Make sure that the received packet has a TO and FROM values defined and that it was sent
* from a previously validated domain. If the packet does not matches any of the above
* conditions then a PacketRejectedException will be thrown.
*
* @param packet the received packet.
* @throws PacketRejectedException if the packet does not include a TO or FROM or if the packet
* was sent from a domain that was not previously validated.
*/
private void packetReceived(Packet packet) throws PacketRejectedException {
if (packet.getTo() == null || packet.getFrom() == null) {
Log.debug("Closing IncomingServerSession due to packet with no TO or FROM: " +
packet.toXML());
// Send a stream error saying that the packet includes no TO or FROM
StreamError error = new StreamError(StreamError.Condition.improper_addressing);
connection.deliverRawText(error.toXML());
// Close the underlying connection
connection.close();
open = false;
throw new PacketRejectedException("Packet with no TO or FROM attributes");
}
else if (!((IncomingServerSession) session).isValidDomain(packet.getFrom().getDomain())) {
Log.debug("Closing IncomingServerSession due to packet with invalid domain: " +
packet.toXML());
// Send a stream error saying that the packet includes an invalid FROM
StreamError error = new StreamError(StreamError.Condition.invalid_from);
connection.deliverRawText(error.toXML());
// Close the underlying connection
connection.close();
open = false;
throw new PacketRejectedException("Packet with no TO or FROM attributes");
}
}
protected void shutdown() {
super.shutdown();
// Shutdown the pool of threads that are processing packets sent by
// the remote server
threadPool.shutdown();
}
boolean createSession(String namespace) throws UnauthorizedException, XmlPullParserException,
IOException {
if ("jabber:server".equals(namespace)) {
// The connected client is a server so create an IncomingServerSession
session = IncomingServerSession.createSession(serverName, reader, connection);
return true;
}
return false;
}
String getNamespace() {
return "jabber:server";
}
String getName() {
return "Server SR - " + hashCode();
}
boolean validateHost() {
return true;
}
}