Commit bd6f84e5 authored by Yusuke Iwaki's avatar Yusuke Iwaki

set connect timeout.

parent 2545749d
......@@ -36,6 +36,12 @@ public class DDPClientCallback {
this.version = version;
}
}
public static class Timeout extends BaseException {
public Timeout(DDPClient client) {
super(client);
}
}
}
public static class Ping extends Base {
......
......@@ -67,6 +67,7 @@ public class DDPClientImpl {
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.timeout(7, TimeUnit.SECONDS)
.subscribe(response -> {
String msg = extractMsg(response);
if ("connected".equals(msg) && !response.isNull("session")) {
......@@ -83,6 +84,7 @@ public class DDPClientImpl {
subscriptions.unsubscribe();
}
}, err -> {
task.setError(new DDPClientCallback.Connect.Timeout(client));
}));
addErrorCallback(subscriptions, task);
......@@ -336,7 +338,7 @@ public class DDPClientImpl {
private void addErrorCallback(CompositeSubscription subscriptions, TaskCompletionSource<?> task) {
subscriptions.add(observable.subscribe(base -> {
}, err -> {
task.setError(new Exception(err));
task.trySetError(new Exception(err));
subscriptions.unsubscribe();
}));
}
......
......@@ -205,7 +205,12 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
}).continueWithTask(task -> {
if (task.isFaulted()) {
ServerConfig.logConnectionError(serverConfigId, task.getError());
Exception error = task.getError();
if (error instanceof DDPClientCallback.Connect.Timeout) {
ServerConfig.logConnectionError(serverConfigId, new Exception("Connection Timeout"));
} else {
ServerConfig.logConnectionError(serverConfigId, task.getError());
}
}
return task;
});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment