Commit d6f192c5 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Handling timeout errors from service keep alive as a Failure callback event instead of a close one

parent 75bf293c
...@@ -65,7 +65,7 @@ public class DDPClient { ...@@ -65,7 +65,7 @@ public class DDPClient {
return impl.getOnFailureCallback(); return impl.getOnFailureCallback();
} }
public void close() { public void close(int code, String reason) {
impl.close(1000, "closed by DDPClient#close()"); impl.close(code, reason);
} }
} }
package chat.rocket.android_ddp; package chat.rocket.android_ddp;
import android.os.SystemClock;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.text.TextUtils; import android.text.TextUtils;
...@@ -8,6 +9,7 @@ import org.json.JSONArray; ...@@ -8,6 +9,7 @@ import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
...@@ -19,8 +21,11 @@ import chat.rocket.android_ddp.rx.RxWebSocketCallback; ...@@ -19,8 +21,11 @@ import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Response;
public class DDPClientImpl { public class DDPClientImpl {
public static final int CLOSED_NORMALLY = 1000;
public static final int CLOSED_NOT_ALIVE = 1001;
private final DDPClient client; private final DDPClient client;
private RxWebSocket websocket; private RxWebSocket websocket;
private Flowable<RxWebSocketCallback.Base> flowable; private Flowable<RxWebSocketCallback.Base> flowable;
...@@ -135,6 +140,7 @@ public class DDPClientImpl { ...@@ -135,6 +140,7 @@ public class DDPClientImpl {
disposables.clear(); disposables.clear();
} }
} }
disposables.clear();
} }
}, },
err -> task.setError(new DDPClientCallback.Ping.Timeout(client)) err -> task.setError(new DDPClientCallback.Ping.Timeout(client))
...@@ -286,6 +292,7 @@ public class DDPClientImpl { ...@@ -286,6 +292,7 @@ public class DDPClientImpl {
response -> { response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("ping".equals(msg)) { if ("ping".equals(msg)) {
SystemClock.sleep(8000);
if (response.isNull("id")) { if (response.isNull("id")) {
sendMessage("pong", null); sendMessage("pong", null);
} else { } else {
......
package chat.rocket.android_ddp.rx; package chat.rocket.android_ddp.rx;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.flowables.ConnectableFlowable;
import java.io.IOException; import java.io.IOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.DDPClientImpl;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.flowables.ConnectableFlowable;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
...@@ -57,8 +57,17 @@ public class RxWebSocket { ...@@ -57,8 +57,17 @@ public class RxWebSocket {
@Override @Override
public void onClosed(WebSocket webSocket, int code, String reason) { public void onClosed(WebSocket webSocket, int code, String reason) {
emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason)); switch (code) {
emitter.onComplete(); case DDPClientImpl.CLOSED_NORMALLY:
emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason));
emitter.onComplete();
break;
case DDPClientImpl.CLOSED_NOT_ALIVE:
emitter.onNext(new RxWebSocketCallback.Failure(webSocket, new Exception(reason), null));
break;
default:
RCLog.e("Websocket closed abnormally");
}
} }
}), }),
BackpressureStrategy.BUFFER BackpressureStrategy.BUFFER
......
...@@ -45,8 +45,8 @@ public class DDPClientWrapper { ...@@ -45,8 +45,8 @@ public class DDPClientWrapper {
/** /**
* close connection. * close connection.
*/ */
public void close() { public void close(int code, String reason) {
ddpClient.close(); ddpClient.close(code, reason);
} }
/** /**
......
...@@ -32,6 +32,7 @@ import chat.rocket.android.service.observer.NewMessageObserver; ...@@ -32,6 +32,7 @@ import chat.rocket.android.service.observer.NewMessageObserver;
import chat.rocket.android.service.observer.PushSettingsObserver; import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver; import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android.service.observer.TokenLoginObserver; import chat.rocket.android.service.observer.TokenLoginObserver;
import chat.rocket.android_ddp.DDPClientImpl;
import chat.rocket.core.models.ServerInfo; import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.RealmHelper; import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
...@@ -207,7 +208,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -207,7 +208,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (task.isFaulted()) { if (task.isFaulted()) {
RCLog.e(task.getError()); RCLog.e(task.getError());
emitter.onSuccess(false); emitter.onSuccess(false);
ddpClient.close(); ddpClient.close(DDPClientImpl.CLOSED_NOT_ALIVE, "Ping timeout");
} else { } else {
keepAliveTimer.update(); keepAliveTimer.update();
emitter.onSuccess(true); emitter.onSuccess(true);
...@@ -379,7 +380,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -379,7 +380,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
} }
listenersRegistered = false; listenersRegistered = false;
if (ddpClient != null) { if (ddpClient != null) {
ddpClient.close(); ddpClient.close(DDPClientImpl.CLOSED_NORMALLY, "Closed by client");
ddpClient = null; ddpClient = null;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment