Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
O
Openfire
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
Openfire
Commits
81111e99
Commit
81111e99
authored
Nov 28, 2015
by
Guus der Kinderen
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Allow for more types of connection acceptors than just NIO/MINA based ones.
parent
c4773fd9
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
340 additions
and
296 deletions
+340
-296
ConnectionAcceptor.java
...ava/org/jivesoftware/openfire/spi/ConnectionAcceptor.java
+27
-288
ConnectionListener.java
...ava/org/jivesoftware/openfire/spi/ConnectionListener.java
+7
-8
MINAConnectionAcceptor.java
...org/jivesoftware/openfire/spi/MINAConnectionAcceptor.java
+306
-0
No files found.
src/java/org/jivesoftware/openfire/spi/ConnectionAcceptor.java
View file @
81111e99
package
org
.
jivesoftware
.
openfire
.
spi
;
import
org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder
;
import
org.apache.mina.core.service.IoService
;
import
org.apache.mina.core.service.IoServiceListener
;
import
org.apache.mina.core.session.IdleStatus
;
import
org.apache.mina.core.session.IoSession
;
import
org.apache.mina.filter.codec.ProtocolCodecFilter
;
import
org.apache.mina.filter.executor.ExecutorFilter
;
import
org.apache.mina.filter.ssl.SslFilter
;
import
org.apache.mina.integration.jmx.IoServiceMBean
;
import
org.apache.mina.integration.jmx.IoSessionMBean
;
import
org.apache.mina.transport.socket.SocketSessionConfig
;
import
org.apache.mina.transport.socket.nio.NioSocketAcceptor
;
import
org.jivesoftware.openfire.Connection
;
import
org.jivesoftware.openfire.JMXManager
;
import
org.jivesoftware.openfire.net.StalledSessionsFilter
;
import
org.jivesoftware.openfire.nio.*
;
import
org.jivesoftware.util.JiveGlobals
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.management.JMException
;
import
javax.management.MBeanServer
;
import
javax.management.MalformedObjectNameException
;
import
javax.management.ObjectName
;
import
java.lang.management.ManagementFactory
;
import
java.net.InetSocketAddress
;
import
java.util.concurrent.ThreadFactory
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicInteger
;
/**
*
This class is responsible for accepting new (socket) connection
s.
*
ConnectionAcceptors are responsible for accepting new (typically socket) connections from peer
s.
*
* The configuration (but not the state) of an instance
of this class is
immutable. When configuration changes are
* The configuration (but not the state) of an instance
is intended to be
immutable. When configuration changes are
* needed, an instance needs to be replaced by a new instance.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/
class
ConnectionAcceptor
public
abstract
class
ConnectionAcceptor
{
private
final
Logger
Log
;
private
final
String
name
;
private
final
ConnectionHandler
connectionHandler
;
// Configuration
private
final
ConnectionConfiguration
configuration
;
private
final
EncryptionArtifactFactory
encryptionArtifactFactory
;
private
NioSocketAcceptor
socketAcceptor
;
protected
final
ConnectionConfiguration
configuration
;
/**
* Instantiates, but not starts, a new instance.
* Constructs a new instance which will accept new connections based on the provided configuration.
*
* The provided configuration is expected to be immutable. ConnectionAcceptor instances are not expected to handle
* changes in configuration. When such changes are to be applied, an instance is expected to be replaced.
*
* Newly instantiated ConnectionAcceptors will not accept any connections before {@link #start()} is invoked.
*
* @param configuration The configuration for connections to be accepted (cannot be null).
*/
public
ConnectionAcceptor
(
ConnectionConfiguration
configuration
)
{
if
(
configuration
==
null
)
{
throw
new
IllegalArgumentException
(
"Argument 'configuation' cannot be null"
);
}
this
.
configuration
=
configuration
;
this
.
encryptionArtifactFactory
=
new
EncryptionArtifactFactory
(
configuration
);
this
.
name
=
configuration
.
getType
().
toString
().
toLowerCase
()
+
(
configuration
.
getTlsPolicy
()
==
Connection
.
TLSPolicy
.
legacyMode
?
"_ssl"
:
""
);
Log
=
LoggerFactory
.
getLogger
(
ConnectionAcceptor
.
class
.
getName
()
+
"["
+
name
+
"]"
);
// FIXME implement missing switch branches.
switch
(
configuration
.
getType
()
)
if
(
configuration
==
null
)
{
case
SOCKET_S2S:
connectionHandler
=
new
ServerConnectionHandler
(
configuration
);
break
;
case
SOCKET_C2S:
connectionHandler
=
new
ClientConnectionHandler
(
configuration
);
break
;
// case BOSH_C2S:
// break;
// case ADMIN:
// break;
// case WEBADMIN:
// break;
case
COMPONENT:
connectionHandler
=
new
ComponentConnectionHandler
(
configuration
);
break
;
case
CONNECTION_MANAGER:
connectionHandler
=
new
MultiplexerConnectionHandler
(
configuration
);
break
;
default
:
throw
new
IllegalStateException
(
"Cannot determine 'connection handler' for connection type : "
+
configuration
.
getType
()
);
throw
new
IllegalArgumentException
(
"Argument 'configuration' cannot be null"
);
}
this
.
configuration
=
configuration
;
}
/**
* Starts this connection by binding the socket acceptor. When the acceptor is already started, a warning will be
* logged and the method invocation is otherwise ignored.
* Makes the instance start accepting connections.
*
* An invocation of this method on an instance that is already started should have no effect (to the extend that the
* instance should continue to accept connections without interruption or configuration changes).
*/
public
synchronized
void
start
()
{
if
(
socketAcceptor
!=
null
)
{
Log
.
warn
(
"Unable to start acceptor (it is already started!)"
);
return
;
}
try
{
// Configure the thread pool that is to be used.
final
int
initialSize
=
(
configuration
.
getMaxThreadPoolSize
()
/
4
)
+
1
;
final
ExecutorFilter
executorFilter
=
new
ExecutorFilter
(
initialSize
,
configuration
.
getMaxThreadPoolSize
(),
60
,
TimeUnit
.
SECONDS
);
final
ThreadPoolExecutor
eventExecutor
=
(
ThreadPoolExecutor
)
executorFilter
.
getExecutor
();
final
ThreadFactory
threadFactory
=
new
DelegatingThreadFactory
(
name
+
"-thread-"
,
eventExecutor
.
getThreadFactory
()
);
eventExecutor
.
setThreadFactory
(
threadFactory
);
// Construct a new socket acceptor, and configure it.
socketAcceptor
=
buildSocketAcceptor
();
if
(
JMXManager
.
isEnabled
()
)
{
configureJMX
(
socketAcceptor
,
name
);
}
final
DefaultIoFilterChainBuilder
filterChain
=
socketAcceptor
.
getFilterChain
();
filterChain
.
addFirst
(
ConnectionManagerImpl
.
EXECUTOR_FILTER_NAME
,
executorFilter
);
// Add the XMPP codec filter
filterChain
.
addAfter
(
ConnectionManagerImpl
.
EXECUTOR_FILTER_NAME
,
ConnectionManagerImpl
.
XMPP_CODEC_FILTER_NAME
,
new
ProtocolCodecFilter
(
new
XMPPCodecFactory
()
)
);
// Kill sessions whose outgoing queues keep growing and fail to send traffic
filterChain
.
addAfter
(
ConnectionManagerImpl
.
XMPP_CODEC_FILTER_NAME
,
ConnectionManagerImpl
.
CAPACITY_FILTER_NAME
,
new
StalledSessionsFilter
()
);
// Ports can be configured to start connections in SSL (as opposed to upgrade a non-encrypted socket to an encrypted one, typically using StartTLS)
if
(
configuration
.
getTlsPolicy
()
==
Connection
.
TLSPolicy
.
legacyMode
)
{
final
SslFilter
sslFilter
=
encryptionArtifactFactory
.
createServerModeSslFilter
();
filterChain
.
addAfter
(
ConnectionManagerImpl
.
EXECUTOR_FILTER_NAME
,
ConnectionManagerImpl
.
TLS_FILTER_NAME
,
sslFilter
);
}
// Throttle sessions who send data too fast
if
(
configuration
.
getMaxBufferSize
()
>
0
)
{
socketAcceptor
.
getSessionConfig
().
setMaxReadBufferSize
(
configuration
.
getMaxBufferSize
()
);
Log
.
debug
(
"Throttling read buffer for connections to max={} bytes"
,
configuration
.
getMaxBufferSize
()
);
}
// Start accepting connections
socketAcceptor
.
setHandler
(
connectionHandler
);
socketAcceptor
.
bind
(
new
InetSocketAddress
(
configuration
.
getBindAddress
(),
configuration
.
getPort
()
)
);
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"Error starting "
+
configuration
.
getPort
()
+
": "
+
e
.
getMessage
()
);
Log
.
error
(
"Error starting: "
+
configuration
.
getPort
(),
e
);
}
}
abstract
void
start
();
/**
* Stops this connection by unbinding the socket acceptor. Does nothing when the instance is not started.
* Halts connection acceptation and gracefully releases resources.
*
* An invocation of this method on an instance that was not accepting connections should have no effect.
*
* Instances of this class do not support configuration changes (see class documentation). As a result, there is no
* requirement that an instance that is stopped after it was running can successfully be restarted.
*/
public
synchronized
void
stop
()
{
if
(
socketAcceptor
!=
null
)
{
socketAcceptor
.
unbind
();
socketAcceptor
=
null
;
}
}
abstract
void
stop
();
/**
* Determines if this instance is currently in a state where it is actively serving connections.
*
* @return false when this instance is started and is currently being used to serve connections (otherwise true)
*/
public
boolean
isIdle
()
{
return
this
.
socketAcceptor
!=
null
&&
this
.
socketAcceptor
.
getManagedSessionCount
()
==
0
;
}
public
synchronized
int
getPort
()
{
return
configuration
.
getPort
();
}
// TODO see if we can avoid exposing MINA internals.
public
synchronized
NioSocketAcceptor
getSocketAcceptor
()
{
return
socketAcceptor
;
}
private
static
NioSocketAcceptor
buildSocketAcceptor
()
{
// Create SocketAcceptor with correct number of processors
final
int
processorCount
=
JiveGlobals
.
getIntProperty
(
"xmpp.processor.count"
,
Runtime
.
getRuntime
().
availableProcessors
()
);
final
NioSocketAcceptor
socketAcceptor
=
new
NioSocketAcceptor
(
processorCount
);
// Set that it will be possible to bind a socket if there is a connection in the timeout state.
socketAcceptor
.
setReuseAddress
(
true
);
// Set the listen backlog (queue) length. Default is 50.
socketAcceptor
.
setBacklog
(
JiveGlobals
.
getIntProperty
(
"xmpp.socket.backlog"
,
50
)
);
// Set default (low level) settings for new socket connections
final
SocketSessionConfig
socketSessionConfig
=
socketAcceptor
.
getSessionConfig
();
//socketSessionConfig.setKeepAlive();
final
int
receiveBuffer
=
JiveGlobals
.
getIntProperty
(
"xmpp.socket.buffer.receive"
,
-
1
);
if
(
receiveBuffer
>
0
)
{
socketSessionConfig
.
setReceiveBufferSize
(
receiveBuffer
);
}
final
int
sendBuffer
=
JiveGlobals
.
getIntProperty
(
"xmpp.socket.buffer.send"
,
-
1
);
if
(
sendBuffer
>
0
)
{
socketSessionConfig
.
setSendBufferSize
(
sendBuffer
);
}
final
int
linger
=
JiveGlobals
.
getIntProperty
(
"xmpp.socket.linger"
,
-
1
);
if
(
linger
>
0
)
{
socketSessionConfig
.
setSoLinger
(
linger
);
}
socketSessionConfig
.
setTcpNoDelay
(
JiveGlobals
.
getBooleanProperty
(
"xmpp.socket.tcp-nodelay"
,
socketSessionConfig
.
isTcpNoDelay
()
)
);
return
socketAcceptor
;
}
private
void
configureJMX
(
NioSocketAcceptor
acceptor
,
String
suffix
)
{
final
String
prefix
=
IoServiceMBean
.
class
.
getPackage
().
getName
();
// monitor the IoService
try
{
final
IoServiceMBean
mbean
=
new
IoServiceMBean
(
acceptor
);
final
MBeanServer
mbs
=
ManagementFactory
.
getPlatformMBeanServer
();
final
ObjectName
name
=
new
ObjectName
(
prefix
+
":type=SocketAcceptor,name="
+
suffix
);
mbs
.
registerMBean
(
mbean
,
name
);
// mbean.startCollectingStats(JiveGlobals.getIntProperty("xmpp.socket.jmx.interval", 60000));
}
catch
(
JMException
ex
)
{
Log
.
warn
(
"Failed to register MINA acceptor mbean (JMX): "
+
ex
);
}
// optionally register IoSession mbeans (one per session)
if
(
JiveGlobals
.
getBooleanProperty
(
"xmpp.socket.jmx.sessions"
,
false
)
)
{
acceptor
.
addListener
(
new
IoServiceListener
()
{
private
ObjectName
getObjectNameForSession
(
IoSession
session
)
throws
MalformedObjectNameException
{
return
new
ObjectName
(
prefix
+
":type=IoSession,name="
+
session
.
getRemoteAddress
().
toString
().
replace
(
':'
,
'/'
)
);
}
public
void
sessionCreated
(
IoSession
session
)
{
try
{
ManagementFactory
.
getPlatformMBeanServer
().
registerMBean
(
new
IoSessionMBean
(
session
),
getObjectNameForSession
(
session
)
);
}
catch
(
JMException
ex
)
{
Log
.
warn
(
"Failed to register MINA session mbean (JMX): "
+
ex
);
}
}
public
void
sessionDestroyed
(
IoSession
session
)
{
try
{
ManagementFactory
.
getPlatformMBeanServer
().
unregisterMBean
(
getObjectNameForSession
(
session
)
);
}
catch
(
JMException
ex
)
{
Log
.
warn
(
"Failed to unregister MINA session mbean (JMX): "
+
ex
);
}
}
public
void
serviceActivated
(
IoService
service
)
throws
Exception
{}
public
void
serviceDeactivated
(
IoService
service
)
throws
Exception
{}
public
void
serviceIdle
(
IoService
service
,
IdleStatus
idleStatus
)
throws
Exception
{}
}
);
}
}
// TODO this is a utility class that can be pulled out. There are several similar implementations throughout the codebase.
private
static
class
DelegatingThreadFactory
implements
ThreadFactory
{
private
final
AtomicInteger
threadId
;
private
final
ThreadFactory
originalThreadFactory
;
private
String
threadNamePrefix
;
public
DelegatingThreadFactory
(
String
threadNamePrefix
,
ThreadFactory
originalThreadFactory
)
{
this
.
originalThreadFactory
=
originalThreadFactory
;
threadId
=
new
AtomicInteger
(
0
);
this
.
threadNamePrefix
=
threadNamePrefix
;
}
public
Thread
newThread
(
Runnable
runnable
)
{
Thread
t
=
originalThreadFactory
.
newThread
(
runnable
);
t
.
setName
(
threadNamePrefix
+
threadId
.
incrementAndGet
());
t
.
setDaemon
(
true
);
return
t
;
}
}
abstract
boolean
isIdle
();
}
src/java/org/jivesoftware/openfire/spi/ConnectionListener.java
View file @
81111e99
...
...
@@ -19,8 +19,7 @@ import java.util.Set;
* As a server, Openfire accepts connection requests from other network entities. The exact functionality is subject to
* configuration details (eg: TCP port on which connections are accepted, TLS policy that is applied, etc). An instance
* of this class is used to manage this configuration for one type of connection (on one TCP port), and is responsible
* for managing the lifecycle of the entity that implements the acceptance of new socket connections (as implemented by
* {@link ConnectionAcceptor}.
* for managing the lifecycle of the entity that implements the acceptance of new connections.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/
...
...
@@ -178,7 +177,7 @@ public class ConnectionListener
case
SOCKET_S2S:
case
BOSH_C2S:
case
WEBADMIN:
Log
.
debug
(
"Not starting a (
NIO
-based) connection acceptor, as connections of type "
+
getType
()
+
" depend on another IO technology."
);
Log
.
debug
(
"Not starting a (
MINA
-based) connection acceptor, as connections of type "
+
getType
()
+
" depend on another IO technology."
);
return
;
default
:
...
...
@@ -206,7 +205,7 @@ public class ConnectionListener
}
Log
.
debug
(
"Starting..."
);
connectionAcceptor
=
new
ConnectionAcceptor
(
generateConnectionConfiguration
()
);
connectionAcceptor
=
new
MINA
ConnectionAcceptor
(
generateConnectionConfiguration
()
);
connectionAcceptor
.
start
();
Log
.
info
(
"Started."
);
}
...
...
@@ -313,19 +312,19 @@ public class ConnectionListener
}
/**
* Returns the acceptor that is managed by the instance.
* Returns the
MINA-specific socket
acceptor that is managed by the instance.
*
* @return A socket acceptor, or null when this listener is disabled.
* @return A socket acceptor, or null when this listener is disabled
or not based on a MINA implementation
.
*/
// TODO see if we can avoid exposing MINA internals.
public
NioSocketAcceptor
getSocketAcceptor
()
{
if
(
connectionAcceptor
==
null
)
if
(
connectionAcceptor
==
null
||
!(
connectionAcceptor
instanceof
MINAConnectionAcceptor
)
)
{
return
null
;
}
return
connectionAcceptor
.
getSocketAcceptor
();
return
((
MINAConnectionAcceptor
)
connectionAcceptor
)
.
getSocketAcceptor
();
}
/**
...
...
src/java/org/jivesoftware/openfire/spi/MINAConnectionAcceptor.java
0 → 100644
View file @
81111e99
package
org
.
jivesoftware
.
openfire
.
spi
;
import
org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder
;
import
org.apache.mina.core.service.IoService
;
import
org.apache.mina.core.service.IoServiceListener
;
import
org.apache.mina.core.session.IdleStatus
;
import
org.apache.mina.core.session.IoSession
;
import
org.apache.mina.filter.codec.ProtocolCodecFilter
;
import
org.apache.mina.filter.executor.ExecutorFilter
;
import
org.apache.mina.filter.ssl.SslFilter
;
import
org.apache.mina.integration.jmx.IoServiceMBean
;
import
org.apache.mina.integration.jmx.IoSessionMBean
;
import
org.apache.mina.transport.socket.SocketSessionConfig
;
import
org.apache.mina.transport.socket.nio.NioSocketAcceptor
;
import
org.jivesoftware.openfire.Connection
;
import
org.jivesoftware.openfire.JMXManager
;
import
org.jivesoftware.openfire.net.StalledSessionsFilter
;
import
org.jivesoftware.openfire.nio.*
;
import
org.jivesoftware.util.JiveGlobals
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.management.JMException
;
import
javax.management.MBeanServer
;
import
javax.management.MalformedObjectNameException
;
import
javax.management.ObjectName
;
import
java.lang.management.ManagementFactory
;
import
java.net.InetSocketAddress
;
import
java.util.concurrent.ThreadFactory
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicInteger
;
/**
* This class is responsible for accepting new (socket) connections, using Java NIO implementation provided by the
* Apache MINA framework.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/
class
MINAConnectionAcceptor
extends
ConnectionAcceptor
{
private
final
Logger
Log
;
private
final
String
name
;
private
final
ConnectionHandler
connectionHandler
;
private
final
EncryptionArtifactFactory
encryptionArtifactFactory
;
private
NioSocketAcceptor
socketAcceptor
;
/**
* Instantiates, but not starts, a new instance.
*/
public
MINAConnectionAcceptor
(
ConnectionConfiguration
configuration
)
{
super
(
configuration
);
this
.
name
=
configuration
.
getType
().
toString
().
toLowerCase
()
+
(
configuration
.
getTlsPolicy
()
==
Connection
.
TLSPolicy
.
legacyMode
?
"_ssl"
:
""
);
Log
=
LoggerFactory
.
getLogger
(
MINAConnectionAcceptor
.
class
.
getName
()
+
"["
+
name
+
"]"
);
switch
(
configuration
.
getType
()
)
{
case
SOCKET_S2S:
connectionHandler
=
new
ServerConnectionHandler
(
configuration
);
break
;
case
SOCKET_C2S:
connectionHandler
=
new
ClientConnectionHandler
(
configuration
);
break
;
case
COMPONENT:
connectionHandler
=
new
ComponentConnectionHandler
(
configuration
);
break
;
case
CONNECTION_MANAGER:
connectionHandler
=
new
MultiplexerConnectionHandler
(
configuration
);
break
;
default
:
throw
new
IllegalStateException
(
"This implementation does not support the connection type as defined in the provided configuration: "
+
configuration
.
getType
()
);
}
this
.
encryptionArtifactFactory
=
new
EncryptionArtifactFactory
(
configuration
);
}
/**
* Starts this acceptor by binding the socket acceptor. When the acceptor is already started, a warning will be
* logged and the method invocation is otherwise ignored.
*/
@Override
public
synchronized
void
start
()
{
if
(
socketAcceptor
!=
null
)
{
Log
.
warn
(
"Unable to start acceptor (it is already started!)"
);
return
;
}
try
{
// Configure the thread pool that is to be used.
final
int
initialSize
=
(
configuration
.
getMaxThreadPoolSize
()
/
4
)
+
1
;
final
ExecutorFilter
executorFilter
=
new
ExecutorFilter
(
initialSize
,
configuration
.
getMaxThreadPoolSize
(),
60
,
TimeUnit
.
SECONDS
);
final
ThreadPoolExecutor
eventExecutor
=
(
ThreadPoolExecutor
)
executorFilter
.
getExecutor
();
final
ThreadFactory
threadFactory
=
new
DelegatingThreadFactory
(
name
+
"-thread-"
,
eventExecutor
.
getThreadFactory
()
);
eventExecutor
.
setThreadFactory
(
threadFactory
);
// Construct a new socket acceptor, and configure it.
socketAcceptor
=
buildSocketAcceptor
();
if
(
JMXManager
.
isEnabled
()
)
{
configureJMX
(
socketAcceptor
,
name
);
}
final
DefaultIoFilterChainBuilder
filterChain
=
socketAcceptor
.
getFilterChain
();
filterChain
.
addFirst
(
ConnectionManagerImpl
.
EXECUTOR_FILTER_NAME
,
executorFilter
);
// Add the XMPP codec filter
filterChain
.
addAfter
(
ConnectionManagerImpl
.
EXECUTOR_FILTER_NAME
,
ConnectionManagerImpl
.
XMPP_CODEC_FILTER_NAME
,
new
ProtocolCodecFilter
(
new
XMPPCodecFactory
()
)
);
// Kill sessions whose outgoing queues keep growing and fail to send traffic
filterChain
.
addAfter
(
ConnectionManagerImpl
.
XMPP_CODEC_FILTER_NAME
,
ConnectionManagerImpl
.
CAPACITY_FILTER_NAME
,
new
StalledSessionsFilter
()
);
// Ports can be configured to start connections in SSL (as opposed to upgrade a non-encrypted socket to an encrypted one, typically using StartTLS)
if
(
configuration
.
getTlsPolicy
()
==
Connection
.
TLSPolicy
.
legacyMode
)
{
final
SslFilter
sslFilter
=
encryptionArtifactFactory
.
createServerModeSslFilter
();
filterChain
.
addAfter
(
ConnectionManagerImpl
.
EXECUTOR_FILTER_NAME
,
ConnectionManagerImpl
.
TLS_FILTER_NAME
,
sslFilter
);
}
// Throttle sessions who send data too fast
if
(
configuration
.
getMaxBufferSize
()
>
0
)
{
socketAcceptor
.
getSessionConfig
().
setMaxReadBufferSize
(
configuration
.
getMaxBufferSize
()
);
Log
.
debug
(
"Throttling read buffer for connections to max={} bytes"
,
configuration
.
getMaxBufferSize
()
);
}
// Start accepting connections
socketAcceptor
.
setHandler
(
connectionHandler
);
socketAcceptor
.
bind
(
new
InetSocketAddress
(
configuration
.
getBindAddress
(),
configuration
.
getPort
()
)
);
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"Error starting "
+
configuration
.
getPort
()
+
": "
+
e
.
getMessage
()
);
Log
.
error
(
"Error starting: "
+
configuration
.
getPort
(),
e
);
}
}
/**
* Stops this acceptor by unbinding the socket acceptor. Does nothing when the instance is not started.
*/
@Override
public
synchronized
void
stop
()
{
if
(
socketAcceptor
!=
null
)
{
socketAcceptor
.
unbind
();
socketAcceptor
=
null
;
}
}
/**
* Determines if this instance is currently in a state where it is actively serving connections.
*
* @return false when this instance is started and is currently being used to serve connections (otherwise true)
*/
@Override
public
boolean
isIdle
()
{
return
this
.
socketAcceptor
!=
null
&&
this
.
socketAcceptor
.
getManagedSessionCount
()
==
0
;
}
public
synchronized
int
getPort
()
{
return
configuration
.
getPort
();
}
// TODO see if we can avoid exposing MINA internals.
public
synchronized
NioSocketAcceptor
getSocketAcceptor
()
{
return
socketAcceptor
;
}
private
static
NioSocketAcceptor
buildSocketAcceptor
()
{
// Create SocketAcceptor with correct number of processors
final
int
processorCount
=
JiveGlobals
.
getIntProperty
(
"xmpp.processor.count"
,
Runtime
.
getRuntime
().
availableProcessors
()
);
final
NioSocketAcceptor
socketAcceptor
=
new
NioSocketAcceptor
(
processorCount
);
// Set that it will be possible to bind a socket if there is a connection in the timeout state.
socketAcceptor
.
setReuseAddress
(
true
);
// Set the listen backlog (queue) length. Default is 50.
socketAcceptor
.
setBacklog
(
JiveGlobals
.
getIntProperty
(
"xmpp.socket.backlog"
,
50
)
);
// Set default (low level) settings for new socket connections
final
SocketSessionConfig
socketSessionConfig
=
socketAcceptor
.
getSessionConfig
();
//socketSessionConfig.setKeepAlive();
final
int
receiveBuffer
=
JiveGlobals
.
getIntProperty
(
"xmpp.socket.buffer.receive"
,
-
1
);
if
(
receiveBuffer
>
0
)
{
socketSessionConfig
.
setReceiveBufferSize
(
receiveBuffer
);
}
final
int
sendBuffer
=
JiveGlobals
.
getIntProperty
(
"xmpp.socket.buffer.send"
,
-
1
);
if
(
sendBuffer
>
0
)
{
socketSessionConfig
.
setSendBufferSize
(
sendBuffer
);
}
final
int
linger
=
JiveGlobals
.
getIntProperty
(
"xmpp.socket.linger"
,
-
1
);
if
(
linger
>
0
)
{
socketSessionConfig
.
setSoLinger
(
linger
);
}
socketSessionConfig
.
setTcpNoDelay
(
JiveGlobals
.
getBooleanProperty
(
"xmpp.socket.tcp-nodelay"
,
socketSessionConfig
.
isTcpNoDelay
()
)
);
return
socketAcceptor
;
}
private
void
configureJMX
(
NioSocketAcceptor
acceptor
,
String
suffix
)
{
final
String
prefix
=
IoServiceMBean
.
class
.
getPackage
().
getName
();
// monitor the IoService
try
{
final
IoServiceMBean
mbean
=
new
IoServiceMBean
(
acceptor
);
final
MBeanServer
mbs
=
ManagementFactory
.
getPlatformMBeanServer
();
final
ObjectName
name
=
new
ObjectName
(
prefix
+
":type=SocketAcceptor,name="
+
suffix
);
mbs
.
registerMBean
(
mbean
,
name
);
// mbean.startCollectingStats(JiveGlobals.getIntProperty("xmpp.socket.jmx.interval", 60000));
}
catch
(
JMException
ex
)
{
Log
.
warn
(
"Failed to register MINA acceptor mbean (JMX): "
+
ex
);
}
// optionally register IoSession mbeans (one per session)
if
(
JiveGlobals
.
getBooleanProperty
(
"xmpp.socket.jmx.sessions"
,
false
)
)
{
acceptor
.
addListener
(
new
IoServiceListener
()
{
private
ObjectName
getObjectNameForSession
(
IoSession
session
)
throws
MalformedObjectNameException
{
return
new
ObjectName
(
prefix
+
":type=IoSession,name="
+
session
.
getRemoteAddress
().
toString
().
replace
(
':'
,
'/'
)
);
}
public
void
sessionCreated
(
IoSession
session
)
{
try
{
ManagementFactory
.
getPlatformMBeanServer
().
registerMBean
(
new
IoSessionMBean
(
session
),
getObjectNameForSession
(
session
)
);
}
catch
(
JMException
ex
)
{
Log
.
warn
(
"Failed to register MINA session mbean (JMX): "
+
ex
);
}
}
public
void
sessionDestroyed
(
IoSession
session
)
{
try
{
ManagementFactory
.
getPlatformMBeanServer
().
unregisterMBean
(
getObjectNameForSession
(
session
)
);
}
catch
(
JMException
ex
)
{
Log
.
warn
(
"Failed to unregister MINA session mbean (JMX): "
+
ex
);
}
}
public
void
serviceActivated
(
IoService
service
)
throws
Exception
{}
public
void
serviceDeactivated
(
IoService
service
)
throws
Exception
{}
public
void
serviceIdle
(
IoService
service
,
IdleStatus
idleStatus
)
throws
Exception
{}
}
);
}
}
// TODO this is a utility class that can be pulled out. There are several similar implementations throughout the codebase.
private
static
class
DelegatingThreadFactory
implements
ThreadFactory
{
private
final
AtomicInteger
threadId
;
private
final
ThreadFactory
originalThreadFactory
;
private
String
threadNamePrefix
;
public
DelegatingThreadFactory
(
String
threadNamePrefix
,
ThreadFactory
originalThreadFactory
)
{
this
.
originalThreadFactory
=
originalThreadFactory
;
threadId
=
new
AtomicInteger
(
0
);
this
.
threadNamePrefix
=
threadNamePrefix
;
}
public
Thread
newThread
(
Runnable
runnable
)
{
Thread
t
=
originalThreadFactory
.
newThread
(
runnable
);
t
.
setName
(
threadNamePrefix
+
threadId
.
incrementAndGet
());
t
.
setDaemon
(
true
);
return
t
;
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment