Commit 9707162b authored by Leonardo Aramaki's avatar Leonardo Aramaki

Fix reconnection issues

parent 87461e2a
......@@ -50,30 +50,30 @@ public class DDPClient {
impl = new DDPClientImpl(this, client);
}
public Task<DDPClientCallback.Connect> connect(String url, String session) {
private Task<DDPClientCallback.Connect> connect(String url, String session) {
hostname.set(url);
TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>();
impl.connect(task, url, session);
return task.getTask();
}
public Task<DDPClientCallback.Ping> ping(@Nullable String id) {
private Task<DDPClientCallback.Ping> ping(@Nullable String id) {
TaskCompletionSource<DDPClientCallback.Ping> task = new TaskCompletionSource<>();
impl.ping(task, id);
return task.getTask();
}
public Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
private Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id);
}
public Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) {
private Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) {
TaskCompletionSource<DDPSubscription.Ready> task = new TaskCompletionSource<>();
impl.sub(task, name, params, id);
return task.getTask();
}
public Task<DDPSubscription.NoSub> unsub(String id) {
private Task<DDPSubscription.NoSub> unsub(String id) {
TaskCompletionSource<DDPSubscription.NoSub> task = new TaskCompletionSource<>();
impl.unsub(task, id);
return task.getTask();
......
......@@ -52,7 +52,7 @@ public class DDPClientImpl {
}
}
public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
/* package */ void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
String session) {
try {
flowable = websocket.connect(url).autoConnect(2);
......
......@@ -80,7 +80,11 @@ import io.reactivex.subjects.BehaviorSubject;
}
connectToServerIfNeeded(hostname, true/* force connect */)
.subscribeOn(Schedulers.io())
.subscribe(_val -> {
.subscribe(connected -> {
System.out.println(connected);
if (!connected) {
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
}
}, error -> {
RCLog.e(error);
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
......@@ -95,7 +99,7 @@ import io.reactivex.subjects.BehaviorSubject;
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
}
connectToServerIfNeeded(hostname, false)
.subscribe(_val -> {
.subscribe(connected -> {
}, RCLog::e);
}
......@@ -191,7 +195,8 @@ import io.reactivex.subjects.BehaviorSubject;
// notifyConnecting(hostname);
}
return connectToServer(hostname);
return connectToServer(hostname)
.onErrorResumeNext(Single.just(false));
});
}
......
......@@ -76,26 +76,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
private final CompositeDisposable reconnectDisposable = new CompositeDisposable();
private boolean listenersRegistered;
private static class KeepAliveTimer {
private long lastTime;
private final long thresholdMs;
public KeepAliveTimer(long thresholdMs) {
this.thresholdMs = thresholdMs;
lastTime = System.currentTimeMillis();
}
public boolean shouldCheckPrecisely() {
return lastTime + thresholdMs < System.currentTimeMillis();
}
public void update() {
lastTime = System.currentTimeMillis();
}
}
private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(20000);
private RocketChatWebSocketThread(Context appContext, String hostname) {
super("RC_thread_" + hostname);
this.appContext = appContext;
......@@ -108,7 +88,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
* build new Thread.
*/
@DebugLog
public static Single<RocketChatWebSocketThread> getStarted(Context appContext, String hostname) {
/* package */ static Single<RocketChatWebSocketThread> getStarted(Context appContext, String hostname) {
return Single.<RocketChatWebSocketThread>create(objectSingleEmitter -> {
new RocketChatWebSocketThread(appContext, hostname) {
@Override
......@@ -148,7 +128,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
* terminate WebSocket thread.
*/
@DebugLog
public Single<Boolean> terminate() {
/* package */ Single<Boolean> terminate() {
if (isAlive()) {
return Single.create(emitter -> {
new Handler(getLooper()).post(() -> {
......@@ -181,7 +161,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
* synchronize the state of the thread with ServerConfig.
*/
@DebugLog
public Single<Boolean> keepAlive() {
/* package */ Single<Boolean> keepAlive() {
return checkIfConnectionAlive()
.flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff());
}
......@@ -192,11 +172,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
return Single.just(false);
}
if (!keepAliveTimer.shouldCheckPrecisely()) {
return Single.just(true);
}
keepAliveTimer.update();
return Single.create(emitter -> {
new Thread() {
@Override
......@@ -207,9 +182,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
RCLog.e(error);
connectivityManager.notifyConnectionLost(
hostname, DDPClient.REASON_CLOSED_BY_USER);
emitter.onError(error);
emitter.onSuccess(false);
} else {
keepAliveTimer.update();
emitter.onSuccess(true);
}
return null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment