Commit 4401316d authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

Reverted JM-782

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@5418 b35dd754-fafc-0310-a699-88a17e54d16e
parent 2d8ee095
...@@ -11,8 +11,8 @@ ...@@ -11,8 +11,8 @@
package org.jivesoftware.wildfire.filetransfer; package org.jivesoftware.wildfire.filetransfer;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.nio.channels.ReadableByteChannel; import java.io.InputStream;
import java.nio.channels.WritableByteChannel; import java.io.OutputStream;
/** /**
* An interface to track the progress of a file transfer through the server. This interface is used * An interface to track the progress of a file transfer through the server. This interface is used
...@@ -58,11 +58,11 @@ public interface FileTransferProgress { ...@@ -58,11 +58,11 @@ public interface FileTransferProgress {
*/ */
public void setTransferFuture(Future<?> future); public void setTransferFuture(Future<?> future);
public void setInputChannel(ReadableByteChannel inputChannel); public void setInputStream(InputStream initiatorInputStream);
public ReadableByteChannel getInputChannel(); public InputStream getInputStream();
public void setOutputChannel(WritableByteChannel outputChannel); public void setOutputStream(OutputStream targetOutputStream);
public WritableByteChannel getOutputChannel(); public OutputStream getOutputStream();
} }
...@@ -14,9 +14,8 @@ import org.jivesoftware.util.CacheSizes; ...@@ -14,9 +14,8 @@ import org.jivesoftware.util.CacheSizes;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ReadableByteChannel; import java.io.InputStream;
import java.nio.channels.WritableByteChannel; import java.io.OutputStream;
import java.nio.ByteBuffer;
/** /**
* Tracks the different connections related to a file transfer. There are two connections, the * Tracks the different connections related to a file transfer. There are two connections, the
...@@ -26,9 +25,9 @@ public class DefaultProxyTransfer implements ProxyTransfer { ...@@ -26,9 +25,9 @@ public class DefaultProxyTransfer implements ProxyTransfer {
private String initiator; private String initiator;
private ReadableByteChannel inputStream; private InputStream inputStream;
private WritableByteChannel outputStream; private OutputStream outputStream;
private String target; private String target;
...@@ -53,20 +52,20 @@ public class DefaultProxyTransfer implements ProxyTransfer { ...@@ -53,20 +52,20 @@ public class DefaultProxyTransfer implements ProxyTransfer {
this.initiator = initiator; this.initiator = initiator;
} }
public ReadableByteChannel getInputChannel() { public InputStream getInputStream() {
return inputStream; return inputStream;
} }
public void setInputChannel(ReadableByteChannel inputChannel) { public void setInputStream(InputStream initiatorInputStream) {
this.inputStream = inputChannel; this.inputStream = initiatorInputStream;
} }
public WritableByteChannel getOutputChannel() { public OutputStream getOutputStream() {
return outputStream; return outputStream;
} }
public void setOutputChannel(WritableByteChannel outputChannel) { public void setOutputStream(OutputStream outputStream) {
this.outputStream = outputChannel; this.outputStream = outputStream;
} }
public String getTarget() { public String getTarget() {
...@@ -113,27 +112,25 @@ public class DefaultProxyTransfer implements ProxyTransfer { ...@@ -113,27 +112,25 @@ public class DefaultProxyTransfer implements ProxyTransfer {
if(!isActivatable()) { if(!isActivatable()) {
throw new IOException("Transfer missing party"); throw new IOException("Transfer missing party");
} }
ReadableByteChannel in = getInputChannel(); InputStream in = getInputStream();
WritableByteChannel out = new ProxyOutputChannel(getOutputChannel()); OutputStream out = new ProxyOutputStream(getOutputStream());
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); final byte[] b = new byte[BUFFER_SIZE];
int count = 0; int count = 0;
amountWritten = 0; amountWritten = 0;
do { do {
// write to the output channel // write to the output stream
out.write(buffer); out.write(b, 0, count);
amountWritten += count; amountWritten += count;
// read more bytes from the input channel // read more bytes from the input stream
buffer.clear(); count = in.read(b);
count = in.read(buffer);
buffer.flip();
} while (count >= 0); } while (count >= 0);
in.close(); getInputStream().close();
out.close(); getOutputStream().close();
} }
public int getCachedSize() { public int getCachedSize() {
......
...@@ -20,15 +20,13 @@ import org.jivesoftware.wildfire.stats.i18nStatistic; ...@@ -20,15 +20,13 @@ import org.jivesoftware.wildfire.stats.i18nStatistic;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import java.io.*; import java.io.*;
import java.net.*; import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
/** /**
* Manages the connections to the proxy server. The connections go through two stages before * Manages the connections to the proxy server. The connections go through two stages before
...@@ -50,14 +48,14 @@ public class ProxyConnectionManager { ...@@ -50,14 +48,14 @@ public class ProxyConnectionManager {
private Future<?> socketProcess; private Future<?> socketProcess;
private ServerSocket serverSocket;
private int proxyPort; private int proxyPort;
private FileTransferManager transferManager; private FileTransferManager transferManager;
private String className; private String className;
private ServerSocketChannel serverChannel;
public ProxyConnectionManager(FileTransferManager manager) { public ProxyConnectionManager(FileTransferManager manager) {
String cacheName = "File Transfer"; String cacheName = "File Transfer";
connectionMap = new Cache<String, ProxyTransfer>(cacheName, -1, 1000 * 60 * 10); connectionMap = new Cache<String, ProxyTransfer>(cacheName, -1, 1000 * 60 * 10);
...@@ -82,23 +80,20 @@ public class ProxyConnectionManager { ...@@ -82,23 +80,20 @@ public class ProxyConnectionManager {
reset(); reset();
socketProcess = executor.submit(new Runnable() { socketProcess = executor.submit(new Runnable() {
public void run() { public void run() {
ServerSocket serverSocket;
try { try {
serverChannel = ServerSocketChannel.open(); serverSocket = new ServerSocket(port, -1, bindInterface);
serverSocket = serverChannel.socket();
serverSocket.bind(new InetSocketAddress(bindInterface, port));
} }
catch (IOException e) { catch (IOException e) {
Log.error("Error binding server socket", e); Log.error("Error creating server socket", e);
return; return;
} }
while (serverSocket.isBound()) { while (serverSocket.isBound()) {
final SocketChannel channel; final Socket socket;
try { try {
channel = serverChannel.accept(); socket = serverSocket.accept();
} }
catch (IOException e) { catch (IOException e) {
if (serverChannel.isOpen()) { if (!serverSocket.isClosed()) {
Log.error("Error accepting proxy connection", e); Log.error("Error accepting proxy connection", e);
continue; continue;
} }
...@@ -109,13 +104,13 @@ public class ProxyConnectionManager { ...@@ -109,13 +104,13 @@ public class ProxyConnectionManager {
executor.submit(new Runnable() { executor.submit(new Runnable() {
public void run() { public void run() {
try { try {
processConnection(channel); processConnection(socket);
} }
catch (IOException ie) { catch (IOException ie) {
Log.error("Error processing file transfer proxy connection", Log.error("Error processing file transfer proxy connection",
ie); ie);
try { try {
channel.close(); socket.close();
} }
catch (IOException e) { catch (IOException e) {
/* Do Nothing */ /* Do Nothing */
...@@ -133,21 +128,21 @@ public class ProxyConnectionManager { ...@@ -133,21 +128,21 @@ public class ProxyConnectionManager {
return proxyPort; return proxyPort;
} }
private void processConnection(SocketChannel connection) throws IOException { private void processConnection(Socket connection) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(512); OutputStream out = new DataOutputStream(connection.getOutputStream());
connection.read(buffer); InputStream in = new DataInputStream(connection.getInputStream());
buffer.flip();
// first byte is version should be 5 // first byte is version should be 5
int b = buffer.get(); int b = in.read();
if (b != 5) { if (b != 5) {
throw new IOException("Only SOCKS5 supported"); throw new IOException("Only SOCKS5 supported");
} }
// second byte number of authentication methods supported // second byte number of authentication methods supported
b = buffer.get(); b = in.read();
int[] auth = new int[b]; int[] auth = new int[b];
for (int i = 0; i < b; i++) { for (int i = 0; i < b; i++) {
auth[i] = buffer.get(); auth[i] = in.read();
} }
int authMethod = -1; int authMethod = -1;
...@@ -168,11 +163,9 @@ public class ProxyConnectionManager { ...@@ -168,11 +163,9 @@ public class ProxyConnectionManager {
byte[] cmd = new byte[2]; byte[] cmd = new byte[2];
cmd[0] = (byte) 0x05; cmd[0] = (byte) 0x05;
cmd[1] = (byte) 0x00; cmd[1] = (byte) 0x00;
buffer.clear(); out.write(cmd);
buffer.put(cmd).flip();
connection.write(buffer);
String responseDigest = processIncomingSocks5Message(connection, buffer); String responseDigest = processIncomingSocks5Message(in);
try { try {
synchronized (connectionLock) { synchronized (connectionLock) {
ProxyTransfer transfer = connectionMap.get(responseDigest); ProxyTransfer transfer = connectionMap.get(responseDigest);
...@@ -182,26 +175,21 @@ public class ProxyConnectionManager { ...@@ -182,26 +175,21 @@ public class ProxyConnectionManager {
connectionMap.put(responseDigest, transfer); connectionMap.put(responseDigest, transfer);
} }
else { else {
transfer.setInputChannel(connection); transfer.setInputStream(connection.getInputStream());
} }
} }
cmd = createOutgoingSocks5Message(0, responseDigest); cmd = createOutgoingSocks5Message(0, responseDigest);
buffer.clear(); out.write(cmd);
buffer.put(cmd).flip();
connection.write(buffer);
} }
catch (UnauthorizedException eu) { catch (UnauthorizedException eu) {
cmd = createOutgoingSocks5Message(2, responseDigest); cmd = createOutgoingSocks5Message(2, responseDigest);
buffer.clear(); out.write(cmd);
buffer.put(cmd).flip();
connection.write(buffer);
throw new IOException("Illegal proxy transfer"); throw new IOException("Illegal proxy transfer");
} }
} }
private ProxyTransfer createProxyTransfer(String transferDigest, private ProxyTransfer createProxyTransfer(String transferDigest, Socket targetSocket)
WritableByteChannel targetSocket) throws IOException {
{
ProxyTransfer provider; ProxyTransfer provider;
try { try {
Class c = ClassUtils.forName(className); Class c = ClassUtils.forName(className);
...@@ -213,26 +201,32 @@ public class ProxyConnectionManager { ...@@ -213,26 +201,32 @@ public class ProxyConnectionManager {
} }
provider.setTransferDigest(transferDigest); provider.setTransferDigest(transferDigest);
provider.setOutputChannel(targetSocket); provider.setOutputStream(targetSocket.getOutputStream());
return provider; return provider;
} }
private static String processIncomingSocks5Message(SocketChannel in, ByteBuffer buffer) @SuppressWarnings({"ResultOfMethodCallIgnored"})
private static String processIncomingSocks5Message(InputStream in)
throws IOException { throws IOException {
buffer.clear();
// read the version and command // read the version and command
int read = in.read(buffer); byte[] cmd = new byte[5];
int read = in.read(cmd, 0, 5);
if (read < 5) { if (read != 5) {
throw new IOException("Error reading Socks5 version and command"); throw new IOException("Error reading Socks5 version and command");
} }
buffer.position(5);
// read the digest // read the digest
byte[] addr = new byte[buffer.get(4)]; byte[] addr = new byte[cmd[4]];
buffer.get(addr, 0, addr.length); read = in.read(addr, 0, addr.length);
if (read != addr.length) {
throw new IOException("Error reading provided address");
}
String digest = new String(addr);
in.read();
in.read();
return new String(addr); return digest;
} }
private static byte[] createOutgoingSocks5Message(int cmd, String digest) { private static byte[] createOutgoingSocks5Message(int cmd, String digest) {
...@@ -344,9 +338,9 @@ public class ProxyConnectionManager { ...@@ -344,9 +338,9 @@ public class ProxyConnectionManager {
socketProcess.cancel(true); socketProcess.cancel(true);
socketProcess = null; socketProcess = null;
} }
if (serverChannel != null) { if (serverSocket != null) {
try { try {
serverChannel.close(); serverSocket.close();
} }
catch (IOException e) { catch (IOException e) {
Log.warn("Error closing proxy listening socket", e); Log.warn("Error closing proxy listening socket", e);
...@@ -360,7 +354,7 @@ public class ProxyConnectionManager { ...@@ -360,7 +354,7 @@ public class ProxyConnectionManager {
} }
public double sample() { public double sample() {
return (ProxyOutputChannel.amountTransfered.getAndSet(0) / 1000); return (ProxyOutputStream.amountTransfered.getAndSet(0) / 1000);
} }
} }
} }
\ No newline at end of file
...@@ -8,33 +8,23 @@ ...@@ -8,33 +8,23 @@
*/ */
package org.jivesoftware.wildfire.filetransfer.proxy; package org.jivesoftware.wildfire.filetransfer.proxy;
import java.io.OutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.DataOutputStream;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
/** /**
* An output stream which tracks the amount of bytes transfered by proxy sockets. * An output stream which tracks the amount of bytes transfered by proxy sockets.
*/ */
public class ProxyOutputChannel implements WritableByteChannel { public class ProxyOutputStream extends DataOutputStream {
static AtomicLong amountTransfered = new AtomicLong(0); static AtomicLong amountTransfered = new AtomicLong(0);
private WritableByteChannel channel;
public ProxyOutputChannel(WritableByteChannel channel) { public ProxyOutputStream(OutputStream out) {
this.channel = channel; super(out);
} }
public int write(ByteBuffer src) throws IOException { public synchronized void write(byte b[], int off, int len) throws IOException {
int bytesWritten = channel.write(src); super.write(b, off, len);
amountTransfered.addAndGet(bytesWritten); amountTransfered.addAndGet(len);
return bytesWritten;
}
public boolean isOpen() {
return channel.isOpen();
}
public void close() throws IOException {
channel.close();
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment