Commit 75bf293c authored by Leonardo Aramaki's avatar Leonardo Aramaki

Upon failure, do unregister of current listeners and register them

all again with the new dppclient just reconnected
parent a604ae0d
......@@ -24,13 +24,10 @@ public class DDPClientImpl {
private final DDPClient client;
private RxWebSocket websocket;
private Flowable<RxWebSocketCallback.Base> flowable;
private CompositeDisposable subscriptions;
private OkHttpClient okHttpClient;
private CompositeDisposable disposables;
private String currentSession;
private String url;
public DDPClientImpl(DDPClient self, OkHttpClient client) {
okHttpClient = client;
websocket = new RxWebSocket(client);
this.client = self;
}
......@@ -57,28 +54,24 @@ public class DDPClientImpl {
public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
String session) {
try {
this.url = url;
flowable = websocket.connect(url).autoConnect();
CompositeDisposable subscriptions = new CompositeDisposable();
CompositeDisposable disposables = new CompositeDisposable();
subscriptions.add(
disposables.add(
flowable.retry().filter(callback -> callback instanceof RxWebSocketCallback.Open)
.subscribe(
callback -> {
callback ->
sendMessage("connect",
json -> (TextUtils.isEmpty(session) ? json : json.put("session", DDPClientImpl.this.currentSession))
.put(
"version", "pre2")
.put("support", new JSONArray().put("pre2").put("pre1")),
task);
},
err -> {
System.err.println("Something bad happened!");
}
task),
RCLog::e
)
);
subscriptions.add(
disposables.add(
flowable.filter(
callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
......@@ -90,22 +83,22 @@ public class DDPClientImpl {
currentSession = response.optString("session");
task.trySetResult(
new DDPClientCallback.Connect(client, response.optString("session")));
subscriptions.dispose();
disposables.clear();
} else if ("error".equals(msg) && "Already connected".equals(
response.optString("reason"))) {
task.trySetResult(new DDPClientCallback.Connect(client, null));
subscriptions.dispose();
disposables.clear();
} else if ("failed".equals(msg)) {
task.trySetError(
new DDPClientCallback.Connect.Failed(client, response.optString("version")));
subscriptions.dispose();
disposables.clear();
}
},
err -> task.trySetError(new DDPClientCallback.Connect.Timeout(client))
)
);
addErrorCallback(subscriptions, task);
addErrorCallback(disposables, task);
subscribeBaseListeners();
} catch (Exception e) {
......@@ -121,9 +114,9 @@ public class DDPClientImpl {
sendMessage("ping", json -> json.put("id", id));
if (requested) {
CompositeDisposable subscriptions = new CompositeDisposable();
CompositeDisposable disposables = new CompositeDisposable();
subscriptions.add(
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
......@@ -134,12 +127,12 @@ public class DDPClientImpl {
if ("pong".equals(msg)) {
if (response.isNull("id")) {
task.setResult(new DDPClientCallback.Ping(client, null));
subscriptions.dispose();
disposables.clear();
} else {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPClientCallback.Ping(client, id));
subscriptions.dispose();
disposables.clear();
}
}
}
......@@ -148,7 +141,7 @@ public class DDPClientImpl {
)
);
addErrorCallback(subscriptions, task);
addErrorCallback(disposables, task);
} else {
task.trySetError(new DDPClientCallback.Closed(client));
}
......@@ -160,9 +153,9 @@ public class DDPClientImpl {
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));
if (requested) {
CompositeDisposable subscriptions = new CompositeDisposable();
CompositeDisposable disposables = new CompositeDisposable();
subscriptions.add(
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
......@@ -175,7 +168,7 @@ public class DDPClientImpl {
String _id = ids.optString(i);
if (id.equals(_id)) {
task.setResult(new DDPSubscription.Ready(client, id));
subscriptions.dispose();
disposables.clear();
break;
}
}
......@@ -185,16 +178,15 @@ public class DDPClientImpl {
if (id.equals(_id)) {
task.setError(new DDPSubscription.NoSub.Error(client, id,
response.optJSONObject("error")));
subscriptions.dispose();
disposables.clear();
}
}
},
err -> {
}
RCLog::e
)
);
addErrorCallback(subscriptions, task);
addErrorCallback(disposables, task);
} else {
task.trySetError(new DDPClientCallback.Closed(client));
}
......@@ -206,9 +198,9 @@ public class DDPClientImpl {
final boolean requested = sendMessage("unsub", json -> json.put("id", id));
if (requested) {
CompositeDisposable subscriptions = new CompositeDisposable();
CompositeDisposable disposables = new CompositeDisposable();
subscriptions.add(
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
......@@ -219,7 +211,7 @@ public class DDPClientImpl {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPSubscription.NoSub(client, id));
subscriptions.dispose();
disposables.clear();
}
}
},
......@@ -228,7 +220,7 @@ public class DDPClientImpl {
)
);
addErrorCallback(subscriptions, task);
addErrorCallback(disposables, task);
} else {
task.trySetError(new DDPClientCallback.Closed(client));
}
......@@ -241,9 +233,9 @@ public class DDPClientImpl {
json -> json.put("method", method).put("params", params).put("id", id));
if (requested) {
CompositeDisposable subscriptions = new CompositeDisposable();
CompositeDisposable disposables = new CompositeDisposable();
subscriptions.add(
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
......@@ -261,7 +253,7 @@ public class DDPClientImpl {
String result = response.optString("result");
task.setResult(new DDPClientCallback.RPC(client, id, result));
}
subscriptions.dispose();
disposables.clear();
}
}
},
......@@ -273,20 +265,20 @@ public class DDPClientImpl {
)
);
addErrorCallback(subscriptions, task);
addErrorCallback(disposables, task);
} else {
task.trySetError(new DDPClientCallback.Closed(client));
}
}
private void subscribeBaseListeners() {
if (subscriptions != null &&
subscriptions.size() > 0 && !subscriptions.isDisposed()) {
if (disposables != null &&
disposables.size() > 0 && !disposables.isDisposed()) {
return;
}
subscriptions = new CompositeDisposable();
subscriptions.add(
disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
......@@ -301,8 +293,7 @@ public class DDPClientImpl {
}
}
},
err -> {
}
RCLog::e
)
);
}
......@@ -351,8 +342,8 @@ public class DDPClientImpl {
}
public void unsubscribeBaseListeners() {
if (subscriptions.size() > 0 || !subscriptions.isDisposed()) {
subscriptions.dispose();
if (disposables.size() > 0 || !disposables.isDisposed()) {
disposables.clear();
}
}
......@@ -363,13 +354,7 @@ public class DDPClientImpl {
.cast(RxWebSocketCallback.Close.class)
.subscribe(
task::setResult,
err -> {
if (err instanceof Exception) {
task.setError((Exception) err);
} else {
task.setError(new Exception(err));
}
}
err -> setTaskError(task, err)
);
return task.getTask().onSuccessTask(_task -> {
......@@ -385,13 +370,7 @@ public class DDPClientImpl {
.cast(RxWebSocketCallback.Failure.class)
.subscribe(
task::setResult,
err -> {
if (err instanceof Exception) {
task.setError((Exception) err);
} else {
task.setError(new Exception(err));
}
}
err -> setTaskError(task, err)
);
return task.getTask().onSuccessTask(_task -> {
......@@ -414,19 +393,20 @@ public class DDPClientImpl {
private void sendMessage(String msg, @Nullable JSONBuilder json,
TaskCompletionSource<?> taskForSetError) {
if (!sendMessage(msg, json)) {
if (taskForSetError != null)
if (taskForSetError != null) {
taskForSetError.trySetError(new DDPClientCallback.Closed(client));
}
}
}
private void addErrorCallback(CompositeDisposable subscriptions, TaskCompletionSource<?> task) {
subscriptions.add(
private void addErrorCallback(CompositeDisposable disposables, TaskCompletionSource<?> task) {
disposables.add(
flowable.subscribe(
base -> {
},
err -> {
task.trySetError(new Exception(err));
subscriptions.dispose();
disposables.clear();
}
)
);
......@@ -440,6 +420,14 @@ public class DDPClientImpl {
}
}
private void setTaskError(TaskCompletionSource<? extends RxWebSocketCallback.Base> task, Throwable throwable) {
if (throwable instanceof Exception) {
task.setError((Exception) throwable);
} else {
task.setError(new Exception(throwable));
}
}
private interface JSONBuilder {
@NonNull
JSONObject create(JSONObject root) throws JSONException;
......
......@@ -162,6 +162,7 @@ public class MainPresenter extends BasePresenter<MainContract.View>
addSubscription(subscription);
// Update to RxJava 2 (issue: https://github.com/RocketChat/Rocket.Chat.Android/issues/355)
addSubscription(
RxJavaInterop.toV2Observable(connectivityManagerApi.getServerConnectivityAsObservable())
.subscribeOn(Schedulers.io())
......@@ -171,8 +172,7 @@ public class MainPresenter extends BasePresenter<MainContract.View>
view.showConnecting();
}
},
err -> {
})
Logger::report)
);
}
......
......@@ -38,6 +38,7 @@ import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.internal.RealmSession;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import hugo.weaving.DebugLog;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import rx.Single;
......@@ -254,28 +255,14 @@ public class RocketChatWebSocketThread extends HandlerThread {
task.getResult().client.getOnFailureCallback().onSuccess(_task -> {
ddpClient = null;
CompositeDisposable subscriptions = new CompositeDisposable();
CompositeDisposable disposables = new CompositeDisposable();
connectivityManager.notifyConnecting(hostname);
subscriptions.add(
disposables.add(
RxJavaInterop.toV2Single(connect().retry())
.observeOn(AndroidSchedulers.from(getLooper()))
.subscribe(
rocketChatWebSocketThread -> {
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId != null) {
StreamRoomMessage streamRoomObserver = new StreamRoomMessage(
appContext,
hostname,
realmHelper,
ddpClientRef,
roomId
);
streamRoomObserver.register();
listeners.add(streamRoomObserver);
}
},
err -> {
subscriptions.dispose();
}
rocketChatWebSocketThread -> forceRegisteringListeners(),
err -> logErrorAndDispose(err, disposables)
)
);
return null;
......@@ -308,6 +295,22 @@ public class RocketChatWebSocketThread extends HandlerThread {
}));
}
private void logErrorAndDispose(Throwable throwable, CompositeDisposable disposables) {
RCLog.e(throwable);
disposables.clear();
}
private void forceRegisteringListeners() {
Iterator<Registrable> iterator = listeners.iterator();
while (iterator.hasNext()) {
Registrable registrable = iterator.next();
registrable.unregister();
iterator.remove();
}
listenersRegistered = false;
registerListeners();
}
@DebugLog
private Single<Boolean> connect() {
return connectDDPClient()
......
......@@ -118,10 +118,7 @@ public class MethodCallObserver extends AbstractModelObserver<MethodCall> {
} else if (exception instanceof DDPClientCallback.RPC.Timeout) {
// temp "fix"- we need to rewrite the connection layer a bit
errMessage = "{\"message\": \"Connection Timeout\"}";
} /*else if (exception instanceof RxWebSocketCallback.Failure) {
// temp "fix"- we need to rewrite the connection layer a bit
errMessage = "{\"message\": \"Connection Failure\"}";
}*/ else {
} else {
errMessage = exception.getMessage();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment