Commit 9300692a authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

Implemented blocking NIO for file transfer proxy. JM-782

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@4535 b35dd754-fafc-0310-a699-88a17e54d16e
parent 25653287
...@@ -11,8 +11,8 @@ ...@@ -11,8 +11,8 @@
package org.jivesoftware.wildfire.filetransfer; package org.jivesoftware.wildfire.filetransfer;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.io.InputStream; import java.nio.channels.ReadableByteChannel;
import java.io.OutputStream; import java.nio.channels.WritableByteChannel;
/** /**
* An interface to track the progress of a file transfer through the server. This interface is used * An interface to track the progress of a file transfer through the server. This interface is used
...@@ -58,11 +58,11 @@ public interface FileTransferProgress { ...@@ -58,11 +58,11 @@ public interface FileTransferProgress {
*/ */
public void setTransferFuture(Future<?> future); public void setTransferFuture(Future<?> future);
public void setInputStream(InputStream initiatorInputStream); public void setInputChannel(ReadableByteChannel inputChannel);
public InputStream getInputStream(); public ReadableByteChannel getInputChannel();
public void setOutputStream(OutputStream targetOutputStream); public void setOutputChannel(WritableByteChannel outputChannel);
public OutputStream getOutputStream(); public WritableByteChannel getOutputChannel();
} }
...@@ -14,8 +14,9 @@ import org.jivesoftware.util.CacheSizes; ...@@ -14,8 +14,9 @@ import org.jivesoftware.util.CacheSizes;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.nio.channels.ReadableByteChannel;
import java.io.OutputStream; import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
/** /**
* Tracks the different connections related to a file transfer. There are two connections, the * Tracks the different connections related to a file transfer. There are two connections, the
...@@ -25,9 +26,9 @@ public class DefaultProxyTransfer implements ProxyTransfer { ...@@ -25,9 +26,9 @@ public class DefaultProxyTransfer implements ProxyTransfer {
private String initiator; private String initiator;
private InputStream inputStream; private ReadableByteChannel inputStream;
private OutputStream outputStream; private WritableByteChannel outputStream;
private String target; private String target;
...@@ -52,20 +53,20 @@ public class DefaultProxyTransfer implements ProxyTransfer { ...@@ -52,20 +53,20 @@ public class DefaultProxyTransfer implements ProxyTransfer {
this.initiator = initiator; this.initiator = initiator;
} }
public InputStream getInputStream() { public ReadableByteChannel getInputChannel() {
return inputStream; return inputStream;
} }
public void setInputStream(InputStream initiatorInputStream) { public void setInputChannel(ReadableByteChannel inputChannel) {
this.inputStream = initiatorInputStream; this.inputStream = inputChannel;
} }
public OutputStream getOutputStream() { public WritableByteChannel getOutputChannel() {
return outputStream; return outputStream;
} }
public void setOutputStream(OutputStream outputStream) { public void setOutputChannel(WritableByteChannel outputChannel) {
this.outputStream = outputStream; this.outputStream = outputChannel;
} }
public String getTarget() { public String getTarget() {
...@@ -112,25 +113,27 @@ public class DefaultProxyTransfer implements ProxyTransfer { ...@@ -112,25 +113,27 @@ public class DefaultProxyTransfer implements ProxyTransfer {
if(!isActivatable()) { if(!isActivatable()) {
throw new IOException("Transfer missing party"); throw new IOException("Transfer missing party");
} }
InputStream in = getInputStream(); ReadableByteChannel in = getInputChannel();
OutputStream out = new ProxyOutputStream(getOutputStream()); WritableByteChannel out = new ProxyOutputChannel(getOutputChannel());
final byte[] b = new byte[BUFFER_SIZE]; ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
int count = 0; int count = 0;
amountWritten = 0; amountWritten = 0;
do { do {
// write to the output stream // write to the output channel
out.write(b, 0, count); out.write(buffer);
amountWritten += count; amountWritten += count;
// read more bytes from the input stream // read more bytes from the input channel
count = in.read(b); buffer.clear();
count = in.read(buffer);
buffer.flip();
} while (count >= 0); } while (count >= 0);
getInputStream().close(); in.close();
getOutputStream().close(); out.close();
} }
public int getCachedSize() { public int getCachedSize() {
......
...@@ -20,13 +20,15 @@ import org.jivesoftware.wildfire.stats.i18nStatistic; ...@@ -20,13 +20,15 @@ import org.jivesoftware.wildfire.stats.i18nStatistic;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import java.io.*; import java.io.*;
import java.net.ServerSocket; import java.net.*;
import java.net.Socket;
import java.net.InetAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
/** /**
* Manages the connections to the proxy server. The connections go through two stages before * Manages the connections to the proxy server. The connections go through two stages before
...@@ -48,14 +50,14 @@ public class ProxyConnectionManager { ...@@ -48,14 +50,14 @@ public class ProxyConnectionManager {
private Future<?> socketProcess; private Future<?> socketProcess;
private ServerSocket serverSocket;
private int proxyPort; private int proxyPort;
private FileTransferManager transferManager; private FileTransferManager transferManager;
private String className; private String className;
private ServerSocketChannel serverChannel;
public ProxyConnectionManager(FileTransferManager manager) { public ProxyConnectionManager(FileTransferManager manager) {
String cacheName = "File Transfer"; String cacheName = "File Transfer";
connectionMap = new Cache<String, ProxyTransfer>(cacheName, -1, 1000 * 60 * 10); connectionMap = new Cache<String, ProxyTransfer>(cacheName, -1, 1000 * 60 * 10);
...@@ -80,20 +82,23 @@ public class ProxyConnectionManager { ...@@ -80,20 +82,23 @@ public class ProxyConnectionManager {
reset(); reset();
socketProcess = executor.submit(new Runnable() { socketProcess = executor.submit(new Runnable() {
public void run() { public void run() {
ServerSocket serverSocket;
try { try {
serverSocket = new ServerSocket(port, -1, bindInterface); serverChannel = ServerSocketChannel.open();
serverSocket = serverChannel.socket();
serverSocket.bind(new InetSocketAddress(bindInterface, port));
} }
catch (IOException e) { catch (IOException e) {
Log.error("Error creating server socket", e); Log.error("Error binding server socket", e);
return; return;
} }
while (serverSocket.isBound()) { while (serverSocket.isBound()) {
final Socket socket; final SocketChannel channel;
try { try {
socket = serverSocket.accept(); channel = serverChannel.accept();
} }
catch (IOException e) { catch (IOException e) {
if (!serverSocket.isClosed()) { if (serverChannel.isOpen()) {
Log.error("Error accepting proxy connection", e); Log.error("Error accepting proxy connection", e);
continue; continue;
} }
...@@ -104,13 +109,13 @@ public class ProxyConnectionManager { ...@@ -104,13 +109,13 @@ public class ProxyConnectionManager {
executor.submit(new Runnable() { executor.submit(new Runnable() {
public void run() { public void run() {
try { try {
processConnection(socket); processConnection(channel);
} }
catch (IOException ie) { catch (IOException ie) {
Log.error("Error processing file transfer proxy connection", Log.error("Error processing file transfer proxy connection",
ie); ie);
try { try {
socket.close(); channel.close();
} }
catch (IOException e) { catch (IOException e) {
/* Do Nothing */ /* Do Nothing */
...@@ -128,21 +133,21 @@ public class ProxyConnectionManager { ...@@ -128,21 +133,21 @@ public class ProxyConnectionManager {
return proxyPort; return proxyPort;
} }
private void processConnection(Socket connection) throws IOException { private void processConnection(SocketChannel connection) throws IOException {
OutputStream out = new DataOutputStream(connection.getOutputStream()); ByteBuffer buffer = ByteBuffer.allocate(512);
InputStream in = new DataInputStream(connection.getInputStream()); connection.read(buffer);
buffer.flip();
// first byte is version should be 5 // first byte is version should be 5
int b = in.read(); int b = buffer.get();
if (b != 5) { if (b != 5) {
throw new IOException("Only SOCKS5 supported"); throw new IOException("Only SOCKS5 supported");
} }
// second byte number of authentication methods supported // second byte number of authentication methods supported
b = in.read(); b = buffer.get();
int[] auth = new int[b]; int[] auth = new int[b];
for (int i = 0; i < b; i++) { for (int i = 0; i < b; i++) {
auth[i] = in.read(); auth[i] = buffer.get();
} }
int authMethod = -1; int authMethod = -1;
...@@ -163,9 +168,11 @@ public class ProxyConnectionManager { ...@@ -163,9 +168,11 @@ public class ProxyConnectionManager {
byte[] cmd = new byte[2]; byte[] cmd = new byte[2];
cmd[0] = (byte) 0x05; cmd[0] = (byte) 0x05;
cmd[1] = (byte) 0x00; cmd[1] = (byte) 0x00;
out.write(cmd); buffer.clear();
buffer.put(cmd).flip();
connection.write(buffer);
String responseDigest = processIncomingSocks5Message(in); String responseDigest = processIncomingSocks5Message(connection, buffer);
try { try {
synchronized (connectionLock) { synchronized (connectionLock) {
ProxyTransfer transfer = connectionMap.get(responseDigest); ProxyTransfer transfer = connectionMap.get(responseDigest);
...@@ -175,21 +182,26 @@ public class ProxyConnectionManager { ...@@ -175,21 +182,26 @@ public class ProxyConnectionManager {
connectionMap.put(responseDigest, transfer); connectionMap.put(responseDigest, transfer);
} }
else { else {
transfer.setInputStream(connection.getInputStream()); transfer.setInputChannel(connection);
} }
} }
cmd = createOutgoingSocks5Message(0, responseDigest); cmd = createOutgoingSocks5Message(0, responseDigest);
out.write(cmd); buffer.clear();
buffer.put(cmd).flip();
connection.write(buffer);
} }
catch (UnauthorizedException eu) { catch (UnauthorizedException eu) {
cmd = createOutgoingSocks5Message(2, responseDigest); cmd = createOutgoingSocks5Message(2, responseDigest);
out.write(cmd); buffer.clear();
buffer.put(cmd).flip();
connection.write(buffer);
throw new IOException("Illegal proxy transfer"); throw new IOException("Illegal proxy transfer");
} }
} }
private ProxyTransfer createProxyTransfer(String transferDigest, Socket targetSocket) private ProxyTransfer createProxyTransfer(String transferDigest,
throws IOException { WritableByteChannel targetSocket)
{
ProxyTransfer provider; ProxyTransfer provider;
try { try {
Class c = ClassUtils.forName(className); Class c = ClassUtils.forName(className);
...@@ -201,32 +213,26 @@ public class ProxyConnectionManager { ...@@ -201,32 +213,26 @@ public class ProxyConnectionManager {
} }
provider.setTransferDigest(transferDigest); provider.setTransferDigest(transferDigest);
provider.setOutputStream(targetSocket.getOutputStream()); provider.setOutputChannel(targetSocket);
return provider; return provider;
} }
@SuppressWarnings({"ResultOfMethodCallIgnored"}) private static String processIncomingSocks5Message(SocketChannel in, ByteBuffer buffer)
private static String processIncomingSocks5Message(InputStream in)
throws IOException { throws IOException {
buffer.clear();
// read the version and command // read the version and command
byte[] cmd = new byte[5]; int read = in.read(buffer);
int read = in.read(cmd, 0, 5);
if (read != 5) { if (read < 5) {
throw new IOException("Error reading Socks5 version and command"); throw new IOException("Error reading Socks5 version and command");
} }
buffer.position(5);
// read the digest // read the digest
byte[] addr = new byte[cmd[4]]; byte[] addr = new byte[buffer.get(4)];
read = in.read(addr, 0, addr.length); buffer.get(addr, 0, addr.length);
if (read != addr.length) {
throw new IOException("Error reading provided address");
}
String digest = new String(addr);
in.read();
in.read();
return digest; return new String(addr);
} }
private static byte[] createOutgoingSocks5Message(int cmd, String digest) { private static byte[] createOutgoingSocks5Message(int cmd, String digest) {
...@@ -338,9 +344,9 @@ public class ProxyConnectionManager { ...@@ -338,9 +344,9 @@ public class ProxyConnectionManager {
socketProcess.cancel(true); socketProcess.cancel(true);
socketProcess = null; socketProcess = null;
} }
if (serverSocket != null) { if (serverChannel != null) {
try { try {
serverSocket.close(); serverChannel.close();
} }
catch (IOException e) { catch (IOException e) {
Log.warn("Error closing proxy listening socket", e); Log.warn("Error closing proxy listening socket", e);
...@@ -354,7 +360,7 @@ public class ProxyConnectionManager { ...@@ -354,7 +360,7 @@ public class ProxyConnectionManager {
} }
public double sample() { public double sample() {
return (ProxyOutputStream.amountTransfered.getAndSet(0) / 1000); return (ProxyOutputChannel.amountTransfered.getAndSet(0) / 1000);
} }
} }
} }
\ No newline at end of file
...@@ -8,23 +8,33 @@ ...@@ -8,23 +8,33 @@
*/ */
package org.jivesoftware.wildfire.filetransfer.proxy; package org.jivesoftware.wildfire.filetransfer.proxy;
import java.io.OutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.DataOutputStream;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
/** /**
* An output stream which tracks the amount of bytes transfered by proxy sockets. * An output stream which tracks the amount of bytes transfered by proxy sockets.
*/ */
public class ProxyOutputStream extends DataOutputStream { public class ProxyOutputChannel implements WritableByteChannel {
static AtomicLong amountTransfered = new AtomicLong(0); static AtomicLong amountTransfered = new AtomicLong(0);
private WritableByteChannel channel;
public ProxyOutputStream(OutputStream out) { public ProxyOutputChannel(WritableByteChannel channel) {
super(out); this.channel = channel;
} }
public synchronized void write(byte b[], int off, int len) throws IOException { public int write(ByteBuffer src) throws IOException {
super.write(b, off, len); int bytesWritten = channel.write(src);
amountTransfered.addAndGet(len); amountTransfered.addAndGet(bytesWritten);
return bytesWritten;
}
public boolean isOpen() {
return channel.isOpen();
}
public void close() throws IOException {
channel.close();
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment