Commit abe34222 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Fix reconnection loop issue when add new server

parent 65d5c3a7
...@@ -22,440 +22,467 @@ import io.reactivex.disposables.CompositeDisposable; ...@@ -22,440 +22,467 @@ import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
public class DDPClientImpl { public class DDPClientImpl {
private final DDPClient client; private final DDPClient client;
private RxWebSocket websocket; private RxWebSocket websocket;
private Flowable<RxWebSocketCallback.Base> flowable; private Flowable<RxWebSocketCallback.Base> flowable;
private CompositeDisposable disposables; private CompositeDisposable disposables;
private String currentSession; private String currentSession;
/* package */ DDPClientImpl(DDPClient self, OkHttpClient client) { /* package */ DDPClientImpl(DDPClient self, OkHttpClient client) {
websocket = new RxWebSocket(client); websocket = new RxWebSocket(client);
this.client = self; this.client = self;
}
private static JSONObject toJson(String s) {
if (TextUtils.isEmpty(s)) {
return null;
} }
try {
return new JSONObject(s);
} catch (JSONException e) {
return null;
}
}
private static String extractMsg(JSONObject response) { private static JSONObject toJson(String s) {
if (response == null || response.isNull("msg")) { if (TextUtils.isEmpty(s)) {
return null; return null;
} else { }
return response.optString("msg"); try {
return new JSONObject(s);
} catch (JSONException e) {
return null;
}
} }
}
/* package */ void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url, private static String extractMsg(JSONObject response) {
String session) { if (response == null || response.isNull("msg")) {
try { return null;
flowable = websocket.connect(url).autoConnect(2); } else {
CompositeDisposable disposables = new CompositeDisposable(); return response.optString("msg");
}
}
disposables.add( /* package */ void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open) String session) {
.subscribe( try {
callback -> flowable = websocket.connect(url).autoConnect(2);
sendMessage("connect", CompositeDisposable disposables = new CompositeDisposable();
json -> (TextUtils.isEmpty(session) ? json : json.put("session", DDPClientImpl.this.currentSession))
.put( disposables.add(
"version", "1") flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open)
.put("support", new JSONArray().put("1").put("pre2").put("pre1")), .subscribe(
task), callback ->
RCLog::e sendMessage("connect",
) json -> (TextUtils.isEmpty(session) ? json : json.put("session", DDPClientImpl.this.currentSession))
); .put(
"version", "1")
disposables.add( .put("support", new JSONArray().put("1").put("pre2").put("pre1")),
flowable.filter( task),
callback -> callback instanceof RxWebSocketCallback.Message) RCLog::e
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) )
.map(DDPClientImpl::toJson) );
.timeout(7, TimeUnit.SECONDS)
.subscribe(response -> { disposables.add(
String msg = extractMsg(response); flowable.filter(
if ("connected".equals(msg) && !response.isNull("session")) { callback -> callback instanceof RxWebSocketCallback.Message)
currentSession = response.optString("session"); .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
task.trySetResult( .map(DDPClientImpl::toJson)
new DDPClientCallback.Connect(client, response.optString("session"))); .timeout(7, TimeUnit.SECONDS)
disposables.clear(); .subscribe(response -> {
} else if ("error".equals(msg) && "Already connected".equals( String msg = extractMsg(response);
response.optString("reason"))) { if ("connected".equals(msg) && !response.isNull("session")) {
task.trySetResult(new DDPClientCallback.Connect(client, null)); currentSession = response.optString("session");
disposables.clear(); task.trySetResult(
} else if ("failed".equals(msg)) { new DDPClientCallback.Connect(client, response.optString("session")));
task.trySetError( disposables.clear();
new DDPClientCallback.Connect.Failed(client, response.optString("version"))); } else if ("error".equals(msg) && "Already connected".equals(
disposables.clear(); response.optString("reason"))) {
} task.trySetResult(new DDPClientCallback.Connect(client, null));
}, disposables.clear();
err -> task.trySetError(new DDPClientCallback.Connect.Timeout(client)) } else if ("failed".equals(msg)) {
) task.trySetError(
); new DDPClientCallback.Connect.Failed(client, response.optString("version")));
disposables.clear();
}
},
err -> {
if (err instanceof TimeoutException) {
task.trySetError(new Exception("Your connection seems off…"));
} else {
task.trySetError(new Exception("Ooops. Something's up!"));
}
}
)
);
addErrorCallback(disposables, task); addErrorCallback(disposables, task);
subscribeBaseListeners(); subscribeBaseListeners();
} catch (Exception e) { } catch (Exception e) {
RCLog.e(e); RCLog.e(e);
}
} }
}
/* package */ Maybe<DDPClientCallback.Base> ping(@Nullable final String id) {
/* package */ Maybe<DDPClientCallback.Base> ping(@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ?
final boolean requested = (TextUtils.isEmpty(id)) ? sendMessage("ping", null) :
sendMessage("ping", null) : sendMessage("ping", json -> json.put("id", id));
sendMessage("ping", json -> json.put("id", id));
if (requested) {
if (requested) { return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) .timeout(8, TimeUnit.SECONDS)
.timeout(8, TimeUnit.SECONDS) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(DDPClientImpl::toJson)
.map(DDPClientImpl::toJson) .filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
.filter(response -> "pong".equalsIgnoreCase(extractMsg(response))) .doOnError(error -> {
.doOnError(error -> { RCLog.e(error, "Heartbeat ping[%s] xxx failed xxx", id);
RCLog.e(error, "Heartbeat ping[%s] xxx failed xxx", id); })
}) .map(response -> {
.map(response -> { String msg = extractMsg(response);
String msg = extractMsg(response); if ("pong".equals(msg)) {
if ("pong".equals(msg)) { RCLog.d("pong[%s] <", id);
RCLog.d("pong[%s] <", id); if (response.isNull("id")) {
if (response.isNull("id")) { return new DDPClientCallback.Ping(client, null);
return new DDPClientCallback.Ping(client, null); } else {
} else { String _id = response.optString("id");
String _id = response.optString("id"); if (id.equals(_id)) {
if (id.equals(_id)) { return new DDPClientCallback.Ping(client, _id);
return new DDPClientCallback.Ping(client, _id); } else {
} else { return new DDPClientCallback.Ping.UnMatched(client, _id);
return new DDPClientCallback.Ping.UnMatched(client, _id); }
} }
} }
} // if we receive anything other than a pong throw an exception
// if we receive anything other than a pong throw an exception throw new DDPClientCallback.RPC.Error(client, id, response);
throw new DDPClientCallback.RPC.Error(client, id, response); }).firstElement();
}).firstElement(); } else {
} else { return Maybe.error(new DDPClientCallback.Closed(client));
return Maybe.error(new DDPClientCallback.Closed(client)); }
} }
}
/* package */void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
/* package */void ping(final TaskCompletionSource<DDPClientCallback.Ping> task, @Nullable final String id) {
@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ?
final boolean requested = (TextUtils.isEmpty(id)) ? sendMessage("ping", null) :
sendMessage("ping", null) : sendMessage("ping", json -> json.put("id", id));
sendMessage("ping", json -> json.put("id", id));
if (requested) {
if (requested) { CompositeDisposable disposables = new CompositeDisposable();
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
disposables.add( flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) .timeout(8, TimeUnit.SECONDS)
.timeout(8, TimeUnit.SECONDS) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(DDPClientImpl::toJson)
.map(DDPClientImpl::toJson) .subscribe(
.subscribe( response -> {
response -> { String msg = extractMsg(response);
String msg = extractMsg(response); if ("pong".equals(msg)) {
if ("pong".equals(msg)) { if (response.isNull("id")) {
if (response.isNull("id")) { task.setResult(new DDPClientCallback.Ping(client, null));
task.setResult(new DDPClientCallback.Ping(client, null)); } else {
} else { String _id = response.optString("id");
String _id = response.optString("id"); if (id.equals(_id)) {
if (id.equals(_id)) { task.setResult(new DDPClientCallback.Ping(client, id));
task.setResult(new DDPClientCallback.Ping(client, id)); }
}
disposables.clear();
}
},
err -> {
if (err instanceof TimeoutException) {
task.trySetError(new Exception("Your connection seems off…"));
} else {
task.trySetError(new Exception("Ooops. Something's up!"));
}
} }
} )
disposables.clear(); );
}
}, addErrorCallback(disposables, task);
err -> task.trySetError(new DDPClientCallback.Ping.Timeout(client)) } else {
) task.trySetError(new DDPClientCallback.Closed(client));
); }
addErrorCallback(disposables, task);
} else {
task.trySetError(new DDPClientCallback.Closed(client));
} }
}
/* package */
/* package */ void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name,
JSONArray params, String id) { void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name,
final boolean requested = JSONArray params, String id) {
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params)); final boolean requested =
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));
if (requested) {
CompositeDisposable disposables = new CompositeDisposable(); if (requested) {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) disposables.add(
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(DDPClientImpl::toJson) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.subscribe( .map(DDPClientImpl::toJson)
response -> { .subscribe(
String msg = extractMsg(response); response -> {
if ("ready".equals(msg) && !response.isNull("subs")) { String msg = extractMsg(response);
JSONArray ids = response.optJSONArray("subs"); if ("ready".equals(msg) && !response.isNull("subs")) {
for (int i = 0; i < ids.length(); i++) { JSONArray ids = response.optJSONArray("subs");
String _id = ids.optString(i); for (int i = 0; i < ids.length(); i++) {
if (id.equals(_id)) { String _id = ids.optString(i);
task.setResult(new DDPSubscription.Ready(client, id)); if (id.equals(_id)) {
disposables.clear(); task.setResult(new DDPSubscription.Ready(client, id));
break; disposables.clear();
} break;
} }
} else if ("nosub".equals(msg) && !response.isNull("id") && !response.isNull( }
"error")) { } else if ("nosub".equals(msg) && !response.isNull("id") && !response.isNull(
String _id = response.optString("id"); "error")) {
if (id.equals(_id)) { String _id = response.optString("id");
task.trySetError(new DDPSubscription.NoSub.Error(client, id, if (id.equals(_id)) {
response.optJSONObject("error"))); task.trySetError(new DDPSubscription.NoSub.Error(client, id,
disposables.clear(); response.optJSONObject("error")));
} disposables.clear();
} }
}, }
RCLog::e },
) err -> {
); if (err instanceof TimeoutException) {
task.trySetError(new Exception("Your connection seems off…"));
addErrorCallback(disposables, task); } else {
} else { task.trySetError(new Exception("Ooops. Something's up!"));
task.trySetError(new DDPClientCallback.Closed(client)); }
}
)
);
addErrorCallback(disposables, task);
} else {
task.trySetError(new DDPClientCallback.Closed(client));
}
} }
}
/* package */ void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task, /* package */ void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task,
@Nullable final String id) { @Nullable final String id) {
final boolean requested = sendMessage("unsub", json -> json.put("id", id));
if (requested) {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.subscribe(
response -> {
String msg = extractMsg(response);
if ("nosub".equals(msg) && response.isNull("error") && !response.isNull("id")) {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPSubscription.NoSub(client, id));
disposables.clear();
}
}
},
err -> {
if (err instanceof TimeoutException) {
task.trySetError(new Exception("Your connection seems off…"));
} else {
task.trySetError(new Exception("Ooops. Something's up!"));
}
}
)
);
final boolean requested = sendMessage("unsub", json -> json.put("id", id)); addErrorCallback(disposables, task);
} else {
task.trySetError(new DDPClientCallback.Closed(client));
}
}
if (requested) { /* package */ void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method,
CompositeDisposable disposables = new CompositeDisposable(); JSONArray params, String id, long timeoutMs) {
final boolean requested =
sendMessage("method",
json -> json.put("method", method).put("params", params).put("id", id));
if (requested) {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.timeout(timeoutMs, TimeUnit.MILLISECONDS)
.subscribe(
response -> {
String msg = extractMsg(response);
if ("result".equals(msg)) {
String _id = response.optString("id");
if (id.equals(_id)) {
if (!response.isNull("error")) {
task.trySetError(new DDPClientCallback.RPC.Error(client, id,
response.optJSONObject("error")));
} else {
String result = response.optString("result");
task.setResult(new DDPClientCallback.RPC(client, id, result));
}
disposables.clear();
}
}
},
err -> {
if (err instanceof TimeoutException) {
task.trySetError(new Exception("Your connection seems off…"));
} else {
task.trySetError(new Exception("Ooops. Something's up!"));
}
}
)
);
disposables.add( addErrorCallback(disposables, task);
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) } else {
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) task.trySetError(new DDPClientCallback.Closed(client));
.map(DDPClientImpl::toJson) }
.subscribe( }
response -> {
String msg = extractMsg(response); private void subscribeBaseListeners() {
if ("nosub".equals(msg) && response.isNull("error") && !response.isNull("id")) { if (disposables != null &&
String _id = response.optString("id"); disposables.size() > 0 && !disposables.isDisposed()) {
if (id.equals(_id)) { return;
task.setResult(new DDPSubscription.NoSub(client, id)); }
disposables.clear();
} disposables = new CompositeDisposable();
} disposables.add(
}, flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
err -> { .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
} .map(DDPClientImpl::toJson)
) .subscribe(
); response -> {
String msg = extractMsg(response);
addErrorCallback(disposables, task); if ("ping".equals(msg)) {
} else { if (response.isNull("id")) {
task.trySetError(new DDPClientCallback.Closed(client)); sendMessage("pong", null);
} else {
sendMessage("pong", json -> json.put("id", response.getString("id")));
}
}
},
RCLog::e
)
);
} }
}
/* package */ Flowable<DDPSubscription.Event> getDDPSubscription() {
/* package */ void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method, String[] targetMsgs = {"added", "changed", "removed", "addedBefore", "movedBefore"};
JSONArray params, String id, long timeoutMs) { return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
final boolean requested = .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
sendMessage("method", .map(DDPClientImpl::toJson)
json -> json.put("method", method).put("params", params).put("id", id)); .filter(response -> {
if (requested) {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.timeout(timeoutMs, TimeUnit.MILLISECONDS)
.subscribe(
response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("result".equals(msg)) { for (String m : targetMsgs) {
String _id = response.optString("id"); if (m.equals(msg)) {
if (id.equals(_id)) { return true;
if (!response.isNull("error")) {
task.trySetError(new DDPClientCallback.RPC.Error(client, id,
response.optJSONObject("error")));
} else {
String result = response.optString("result");
task.setResult(new DDPClientCallback.RPC(client, id, result));
} }
disposables.clear();
}
} }
}, return false;
err -> { })
if (err instanceof TimeoutException) { .map(response -> {
task.trySetError(new DDPClientCallback.RPC.Timeout(client)); String msg = extractMsg(response);
if ("added".equals(msg)) {
return new DDPSubscription.Added(client, response.optString("collection"),
response.optString("id"),
response.isNull("fields") ? null : response.optJSONObject("fields"));
} else if ("addedBefore".equals(msg)) {
return new DDPSubscription.Added.Before(client, response.optString("collection"),
response.optString("id"),
response.isNull("fields") ? null : response.optJSONObject("fields"),
response.isNull("before") ? null : response.optString("before"));
} else if ("changed".equals(msg)) {
return new DDPSubscription.Changed(client, response.optString("collection"),
response.optString("id"),
response.isNull("fields") ? null : response.optJSONObject("fields"),
response.isNull("cleared") ? new JSONArray() : response.optJSONArray("before"));
} else if ("removed".equals(msg)) {
return new DDPSubscription.Removed(client, response.optString("collection"),
response.optString("id"));
} else if ("movedBefore".equals(msg)) {
return new DDPSubscription.MovedBefore(client, response.optString("collection"),
response.optString("id"),
response.isNull("before") ? null : response.optString("before"));
} }
}
)
);
addErrorCallback(disposables, task); return null;
} else { });
task.trySetError(new DDPClientCallback.Closed(client));
} }
}
private void subscribeBaseListeners() { /* package */ void unsubscribeBaseListeners() {
if (disposables != null && if (disposables.size() > 0 || !disposables.isDisposed()) {
disposables.size() > 0 && !disposables.isDisposed()) { disposables.clear();
return; }
} }
disposables = new CompositeDisposable(); /* package */ Task<RxWebSocketCallback.Close> getOnCloseCallback() {
disposables.add( TaskCompletionSource<RxWebSocketCallback.Close> task = new TaskCompletionSource<>();
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close)
.map(DDPClientImpl::toJson) .cast(RxWebSocketCallback.Close.class)
.subscribe( .subscribe(
response -> { task::setResult,
String msg = extractMsg(response); err -> setTaskError(task, err)
if ("ping".equals(msg)) { );
if (response.isNull("id")) {
sendMessage("pong", null); return task.getTask().onSuccessTask(_task -> {
} else { unsubscribeBaseListeners();
sendMessage("pong", json -> json.put("id", response.getString("id"))); return _task;
}
}
},
RCLog::e
)
);
}
/* package */ Flowable<DDPSubscription.Event> getDDPSubscription() {
String[] targetMsgs = {"added", "changed", "removed", "addedBefore", "movedBefore"};
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.filter(response -> {
String msg = extractMsg(response);
for (String m : targetMsgs) {
if (m.equals(msg)) {
return true;
}
}
return false;
})
.map(response -> {
String msg = extractMsg(response);
if ("added".equals(msg)) {
return new DDPSubscription.Added(client, response.optString("collection"),
response.optString("id"),
response.isNull("fields") ? null : response.optJSONObject("fields"));
} else if ("addedBefore".equals(msg)) {
return new DDPSubscription.Added.Before(client, response.optString("collection"),
response.optString("id"),
response.isNull("fields") ? null : response.optJSONObject("fields"),
response.isNull("before") ? null : response.optString("before"));
} else if ("changed".equals(msg)) {
return new DDPSubscription.Changed(client, response.optString("collection"),
response.optString("id"),
response.isNull("fields") ? null : response.optJSONObject("fields"),
response.isNull("cleared") ? new JSONArray() : response.optJSONArray("before"));
} else if ("removed".equals(msg)) {
return new DDPSubscription.Removed(client, response.optString("collection"),
response.optString("id"));
} else if ("movedBefore".equals(msg)) {
return new DDPSubscription.MovedBefore(client, response.optString("collection"),
response.optString("id"),
response.isNull("before") ? null : response.optString("before"));
}
return null;
}); });
} }
/* package */ void unsubscribeBaseListeners() { private boolean sendMessage(String msg, @Nullable JSONBuilder json) {
if (disposables.size() > 0 || !disposables.isDisposed()) { try {
disposables.clear(); JSONObject origJson = new JSONObject().put("msg", msg);
String msg2 = (json == null ? origJson : json.create(origJson)).toString();
return websocket.sendText(msg2);
} catch (Exception e) {
RCLog.e(e);
return false;
}
} }
}
/* package */ Task<RxWebSocketCallback.Close> getOnCloseCallback() { private void sendMessage(String msg, @Nullable JSONBuilder json,
TaskCompletionSource<RxWebSocketCallback.Close> task = new TaskCompletionSource<>(); TaskCompletionSource<?> taskForSetError) {
if (!sendMessage(msg, json) && taskForSetError != null) {
taskForSetError.trySetError(new DDPClientCallback.Closed(client));
}
}
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close) private void addErrorCallback(CompositeDisposable disposables, TaskCompletionSource<?> task) {
.cast(RxWebSocketCallback.Close.class) disposables.add(
.subscribe( flowable.subscribe(
task::setResult, base -> {
err -> setTaskError(task, err) if (base instanceof RxWebSocketCallback.Close) {
task.trySetError(new Exception(((RxWebSocketCallback.Close) base).reason));
}
},
err -> {
setTaskError(task, new Exception(err));
disposables.clear();
}
)
); );
return task.getTask().onSuccessTask(_task -> {
unsubscribeBaseListeners();
return _task;
});
}
private boolean sendMessage(String msg, @Nullable JSONBuilder json) {
try {
JSONObject origJson = new JSONObject().put("msg", msg);
String msg2 = (json == null ? origJson : json.create(origJson)).toString();
return websocket.sendText(msg2);
} catch (Exception e) {
RCLog.e(e);
return false;
} }
}
private void sendMessage(String msg, @Nullable JSONBuilder json, public void close(int code, String reason) {
TaskCompletionSource<?> taskForSetError) { try {
if (!sendMessage(msg, json) && taskForSetError != null) { websocket.close(code, reason);
taskForSetError.trySetError(new DDPClientCallback.Closed(client)); } catch (Exception e) {
} RCLog.e(e);
} }
private void addErrorCallback(CompositeDisposable disposables, TaskCompletionSource<?> task) {
disposables.add(
flowable.subscribe(
base -> {
if (base instanceof RxWebSocketCallback.Close) {
task.trySetError(new Exception(((RxWebSocketCallback.Close) base).reason));
}
},
err -> {
setTaskError(task, new Exception(err));
disposables.clear();
}
)
);
}
public void close(int code, String reason) {
try {
websocket.close(code, reason);
} catch (Exception e) {
RCLog.e(e);
} }
}
private void setTaskError(TaskCompletionSource task, Throwable throwable) { private void setTaskError(TaskCompletionSource task, Throwable throwable) {
if (task.getTask().isCompleted()) { if (task.getTask().isCompleted()) {
return; return;
}
if (throwable instanceof Exception) {
task.setError((Exception) throwable);
} else {
task.setError(new Exception(throwable));
}
} }
if (throwable instanceof Exception) {
task.setError((Exception) throwable);
} else {
task.setError(new Exception(throwable));
}
}
private interface JSONBuilder { private interface JSONBuilder {
@NonNull @NonNull
JSONObject create(JSONObject root) throws JSONException; JSONObject create(JSONObject root) throws JSONException;
} }
} }
...@@ -11,6 +11,10 @@ object ConnectionStatusManager { ...@@ -11,6 +11,10 @@ object ConnectionStatusManager {
} }
private const val DEBUG = false private const val DEBUG = false
private val DEFAULT_CALLBACK = object : TransitionCallback {
override fun onTransitioned(success: Boolean) {
}
}
private val stateMachine: EnumStateMachine<State> private val stateMachine: EnumStateMachine<State>
init { init {
...@@ -28,23 +32,46 @@ object ConnectionStatusManager { ...@@ -28,23 +32,46 @@ object ConnectionStatusManager {
fun currentState() = stateMachine.currentState() fun currentState() = stateMachine.currentState()
@Synchronized @Synchronized
fun setOnline(callback: TransitionCallback) { fun setOnline(callback: TransitionCallback = DEFAULT_CALLBACK) {
KeepAliveJob.cancelPendingJobRequests() KeepAliveJob.cancelPendingJobRequests()
tryTransitionTo(State.ONLINE, callback) tryTransitionTo(State.ONLINE, callback)
} }
@Synchronized @Synchronized
fun setConnecting(callback: TransitionCallback) { fun setOnline() {
KeepAliveJob.cancelPendingJobRequests()
tryTransitionTo(State.ONLINE, DEFAULT_CALLBACK)
}
@Synchronized
fun setConnecting(callback: TransitionCallback = DEFAULT_CALLBACK) {
KeepAliveJob.cancelPendingJobRequests() KeepAliveJob.cancelPendingJobRequests()
tryTransitionTo(State.CONNECTING, callback) tryTransitionTo(State.CONNECTING, callback)
} }
@Synchronized @Synchronized
fun setConnectionError(callback: TransitionCallback) { fun setConnecting() {
KeepAliveJob.cancelPendingJobRequests()
tryTransitionTo(State.CONNECTING, DEFAULT_CALLBACK)
}
@Synchronized
fun setConnectionError(callback: TransitionCallback = DEFAULT_CALLBACK) {
KeepAliveJob.schedule() KeepAliveJob.schedule()
tryTransitionTo(State.OFFLINE, callback) tryTransitionTo(State.OFFLINE, callback)
} }
@Synchronized
fun setConnectionError() {
KeepAliveJob.schedule()
tryTransitionTo(State.OFFLINE, DEFAULT_CALLBACK)
}
@Synchronized
fun setOffline() {
stateMachine.reset()
}
private fun tryTransitionTo(newState: State, callback: TransitionCallback) { private fun tryTransitionTo(newState: State, callback: TransitionCallback) {
try { try {
stateMachine.transition(newState) stateMachine.transition(newState)
......
...@@ -72,7 +72,7 @@ object RocketChatCache { ...@@ -72,7 +72,7 @@ object RocketChatCache {
return getString(KEY_SELECTED_SERVER_HOSTNAME, null) return getString(KEY_SELECTED_SERVER_HOSTNAME, null)
} }
fun setSelectedRoomId(roomId: String) { fun setSelectedRoomId(roomId: String?) {
try { try {
val jsonObject = getSelectedRoomIdJsonObject() val jsonObject = getSelectedRoomIdJsonObject()
jsonObject.put(getSelectedServerHostname(), roomId) jsonObject.put(getSelectedServerHostname(), roomId)
...@@ -81,7 +81,6 @@ object RocketChatCache { ...@@ -81,7 +81,6 @@ object RocketChatCache {
RCLog.e(e) RCLog.e(e)
Logger.report(e) Logger.report(e)
} }
} }
@Throws(JSONException::class) @Throws(JSONException::class)
...@@ -234,10 +233,11 @@ object RocketChatCache { ...@@ -234,10 +233,11 @@ object RocketChatCache {
fun clearSelectedHostnameReferences() { fun clearSelectedHostnameReferences() {
val hostname = getSelectedServerHostname() val hostname = getSelectedServerHostname()
if (hostname != null) { if (hostname != null) {
setString(KEY_OPENED_ROOMS, null)
removeSiteName(hostname) removeSiteName(hostname)
removeHostname(hostname) removeHostname(hostname)
removeSiteUrl(hostname) removeSiteUrl(hostname)
setSelectedServerHostname(null) setSelectedServerHostname(getFirstLoggedHostnameIfAny())
} }
} }
...@@ -308,7 +308,7 @@ object RocketChatCache { ...@@ -308,7 +308,7 @@ object RocketChatCache {
fun setSessionToken(sessionToken: String) { fun setSessionToken(sessionToken: String) {
val selectedServerHostname = getSelectedServerHostname() ?: val selectedServerHostname = getSelectedServerHostname() ?:
throw IllegalStateException("Trying to set sessionToken to null hostname") throw IllegalStateException("Trying to set sessionToken to null hostname")
val sessions = getSessionToken() val sessions = getString(KEY_SESSION_TOKEN, null)
try { try {
val jsonObject = if (sessions == null) JSONObject() else JSONObject(sessions) val jsonObject = if (sessions == null) JSONObject() else JSONObject(sessions)
jsonObject.put(selectedServerHostname, sessionToken) jsonObject.put(selectedServerHostname, sessionToken)
......
...@@ -118,7 +118,7 @@ public class MethodCallHelper { ...@@ -118,7 +118,7 @@ public class MethodCallHelper {
} else if (exception instanceof DDPClientCallback.RPC.Timeout) { } else if (exception instanceof DDPClientCallback.RPC.Timeout) {
return Task.forError(new MethodCall.Timeout()); return Task.forError(new MethodCall.Timeout());
} else if (exception instanceof DDPClientCallback.Closed) { } else if (exception instanceof DDPClientCallback.Closed) {
return Task.forError(new Exception("Oops, your connection seems off...")); return Task.forError(new Exception(exception.getMessage()));
} else { } else {
return Task.forError(exception); return Task.forError(exception);
} }
......
...@@ -17,6 +17,7 @@ import chat.rocket.core.models.PublicSetting ...@@ -17,6 +17,7 @@ import chat.rocket.core.models.PublicSetting
import chat.rocket.core.repositories.LoginServiceConfigurationRepository import chat.rocket.core.repositories.LoginServiceConfigurationRepository
import chat.rocket.core.repositories.PublicSettingRepository import chat.rocket.core.repositories.PublicSettingRepository
import com.hadisatrio.optional.Optional import com.hadisatrio.optional.Optional
import io.reactivex.Completable
import io.reactivex.android.schedulers.AndroidSchedulers import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.rxkotlin.subscribeBy import io.reactivex.rxkotlin.subscribeBy
...@@ -74,25 +75,34 @@ class LoginPresenter(private val loginServiceConfigurationRepository: LoginServi ...@@ -74,25 +75,34 @@ class LoginPresenter(private val loginServiceConfigurationRepository: LoginServi
} }
private fun doLogin(username: String, password: String, optional: Optional<PublicSetting>) { private fun doLogin(username: String, password: String, optional: Optional<PublicSetting>) {
call(username, password, optional) addSubscription(
.continueWith(object : Continuation<Void, Any?> { Completable.create {
override fun then(task: Task<Void>?): Any? { call(username, password, optional)
if (task != null && task.isFaulted()) { .continueWith(object : Continuation<Void, Any?> {
view.hideLoader() override fun then(task: Task<Void>?): Any? {
if (task != null && task.isFaulted()) {
view.hideLoader()
val error = task.getError() val error = task.getError()
error?.let { error?.let {
if (error is TwoStepAuthException) { if (error is TwoStepAuthException) {
view.showTwoStepAuth() view.showTwoStepAuth()
} else { } else {
view.showError(error.message) view.showError(error.message)
}
}
return Completable.complete()
}
return null
} }
} }, Task.UI_THREAD_EXECUTOR)
}.subscribeBy(
onError = {
view.showError(it.message)
} }
return null )
} )
}, Task.UI_THREAD_EXECUTOR)
} }
private fun call(username: String, password: String, optional: Optional<PublicSetting>): Task<Void> { private fun call(username: String, password: String, optional: Optional<PublicSetting>): Task<Void> {
......
...@@ -14,6 +14,7 @@ import java.util.Map; ...@@ -14,6 +14,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import chat.rocket.android.ConnectionStatusManager;
import chat.rocket.android.RocketChatCache; import chat.rocket.android.RocketChatCache;
import chat.rocket.android.helper.RxHelper; import chat.rocket.android.helper.RxHelper;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
...@@ -260,6 +261,7 @@ import io.reactivex.subjects.BehaviorSubject; ...@@ -260,6 +261,7 @@ import io.reactivex.subjects.BehaviorSubject;
private Single<Boolean> connectToServer(String hostname) { private Single<Boolean> connectToServer(String hostname) {
return Single.defer(() -> { return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) { if (!serverConnectivityList.containsKey(hostname)) {
ConnectionStatusManager.INSTANCE.setConnectionError();
return Single.error(new IllegalArgumentException("hostname not found")); return Single.error(new IllegalArgumentException("hostname not found"));
} }
...@@ -270,8 +272,10 @@ import io.reactivex.subjects.BehaviorSubject; ...@@ -270,8 +272,10 @@ import io.reactivex.subjects.BehaviorSubject;
} }
if (serviceInterface != null) { if (serviceInterface != null) {
ConnectionStatusManager.INSTANCE.setConnecting();
return serviceInterface.ensureConnectionToServer(hostname); return serviceInterface.ensureConnectionToServer(hostname);
} else { } else {
ConnectionStatusManager.INSTANCE.setConnectionError();
return Single.error(new ThreadLooperNotPreparedException("not prepared")); return Single.error(new ThreadLooperNotPreparedException("not prepared"));
} }
}); });
......
...@@ -38,7 +38,8 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -38,7 +38,8 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
/** /**
* ensure RocketChatService alive. * ensure RocketChatService alive.
*/ */
/*package*/static void keepAlive(Context context) { /*package*/
static void keepAlive(Context context) {
context.startService(new Intent(context, RocketChatService.class)); context.startService(new Intent(context, RocketChatService.class));
} }
...@@ -99,31 +100,29 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -99,31 +100,29 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
return Single.defer(() -> { return Single.defer(() -> {
webSocketThreadLock.acquire(); webSocketThreadLock.acquire();
int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname); int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname);
boolean isDisconnected = connectivityState < ServerConnectivity.STATE_CONNECTED ; boolean isDisconnected = connectivityState < ServerConnectivity.STATE_CONNECTED;
if (currentWebSocketThread != null && existsThreadForHostname(hostname) && !isDisconnected) { if (currentWebSocketThread != null && existsThreadForHostname(hostname) && !isDisconnected) {
webSocketThreadLock.release(); webSocketThreadLock.release();
return Single.just(currentWebSocketThread); return Single.just(currentWebSocketThread);
} }
if (currentWebSocketThread != null) { if (currentWebSocketThread != null) {
if (isDisconnected) { boolean hasFailed = existsThreadForHostname(hostname);
return currentWebSocketThread.terminate(true) return currentWebSocketThread.terminate(hasFailed)
.doAfterTerminate(() -> currentWebSocketThread = null) .doAfterTerminate(() -> currentWebSocketThread = null)
.flatMap(terminated -> .flatMap(terminated ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname) RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> { .doOnSuccess(thread -> {
currentWebSocketThread = thread; currentWebSocketThread = thread;
webSocketThreadLock.release(); webSocketThreadLock.release();
}) })
.doOnError(throwable -> { .doOnError(throwable -> {
currentWebSocketThread = null; currentWebSocketThread = null;
RCLog.e(throwable); RCLog.e(throwable);
Logger.INSTANCE.report(throwable); Logger.INSTANCE.report(throwable);
webSocketThreadLock.release(); webSocketThreadLock.release();
}) })
); );
}
return Single.just(currentWebSocketThread);
} }
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname) return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
......
...@@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit; ...@@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import bolts.Task; import bolts.Task;
import chat.rocket.android.ConnectionStatusManager;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError; import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.helper.RxHelper; import chat.rocket.android.helper.RxHelper;
...@@ -123,26 +124,26 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -123,26 +124,26 @@ public class RocketChatWebSocketThread extends HandlerThread {
} }
/** /**
* terminate WebSocket thread. * Terminate WebSocket thread. If {@code hasFailed} is {@code true} it means that a connection was
* in progress but failed and got offline. If it's {@code false} means that the user explicitly
* disconnected from server either by logging out or by means of changing servers.
* *
* @param isDisconnected {@code true} If we're trying to terminate a disconnected a websocket * @param hasFailed {@code true} if the termination is due to a network error, otherwise
* thread which means it has failed. * return false
*/ */
@DebugLog @DebugLog
/* package */ Single<Boolean> terminate(boolean isDisconnected) { /* package */ Single<Boolean> terminate(boolean hasFailed) {
if (isAlive()) { if (isAlive()) {
return Single.create(emitter -> { return Single.create(emitter -> new Handler(getLooper()).post(() -> {
new Handler(getLooper()).post(() -> { RCLog.d("thread %s: terminated()", Thread.currentThread().getId());
RCLog.d("thread %s: terminated()", Thread.currentThread().getId()); int reason = (hasFailed) ?
int reason = (isDisconnected) ? DDPClient.REASON_NETWORK_ERROR : DDPClient.REASON_CLOSED_BY_USER;
DDPClient.REASON_NETWORK_ERROR : DDPClient.REASON_CLOSED_BY_USER; unregisterListenersAndClose(reason);
unregisterListenersAndClose(reason); connectivityManager.notifyConnectionLost(hostname, reason);
connectivityManager.notifyConnectionLost(hostname, RocketChatWebSocketThread.super.quit();
isDisconnected ? DDPClient.REASON_NETWORK_ERROR : DDPClient.REASON_CLOSED_BY_USER); ConnectionStatusManager.INSTANCE.setOffline();
RocketChatWebSocketThread.super.quit(); emitter.onSuccess(true);
emitter.onSuccess(true); }));
});
});
} else { } else {
connectivityManager.notifyConnectionLost(hostname, connectivityManager.notifyConnectionLost(hostname,
DDPClient.REASON_NETWORK_ERROR); DDPClient.REASON_NETWORK_ERROR);
...@@ -166,7 +167,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -166,7 +167,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog @DebugLog
/* package */ Single<Boolean> keepAlive() { /* package */ Single<Boolean> keepAlive() {
return checkIfConnectionAlive() return checkIfConnectionAlive()
.flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff()); .flatMap(alive -> connectWithExponentialBackoff());
} }
@DebugLog @DebugLog
...@@ -227,7 +228,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -227,7 +228,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
.onSuccessTask(task -> { .onSuccessTask(task -> {
final String newSession = task.getResult().session; final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession); connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> { task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
RxWebSocketCallback.Close result = _task.getResult(); RxWebSocketCallback.Close result = _task.getResult();
if (result.code == DDPClient.REASON_NETWORK_ERROR) { if (result.code == DDPClient.REASON_NETWORK_ERROR) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment