Commit 54632d59 authored by Tiago Cunha's avatar Tiago Cunha

Adding extra on error handler

parent dd29cdd7
...@@ -54,38 +54,47 @@ public class DDPClientImpl { ...@@ -54,38 +54,47 @@ public class DDPClientImpl {
flowable = websocket.connect(url).autoConnect(); flowable = websocket.connect(url).autoConnect();
CompositeDisposable subscriptions = new CompositeDisposable(); CompositeDisposable subscriptions = new CompositeDisposable();
subscriptions.add(flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open) subscriptions.add(
.subscribe(callback -> { flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open)
sendMessage("connect", .subscribe(
json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)).put( callback -> {
"version", "pre2").put("support", new JSONArray().put("pre2").put("pre1")), sendMessage("connect",
task); json -> (TextUtils.isEmpty(session) ? json : json.put("session", session))
}, err -> { .put(
})); "version", "pre2")
.put("support", new JSONArray().put("pre2").put("pre1")),
task);
},
err -> {
}
)
);
subscriptions.add( subscriptions.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(
callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.timeout(7, TimeUnit.SECONDS) .timeout(7, TimeUnit.SECONDS)
.subscribe(response -> { .subscribe(response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("connected".equals(msg) && !response.isNull("session")) { if ("connected".equals(msg) && !response.isNull("session")) {
task.trySetResult( task.trySetResult(
new DDPClientCallback.Connect(client, response.optString("session"))); new DDPClientCallback.Connect(client, response.optString("session")));
subscriptions.dispose(); subscriptions.dispose();
} else if ("error".equals(msg) && "Already connected".equals( } else if ("error".equals(msg) && "Already connected".equals(
response.optString("reason"))) { response.optString("reason"))) {
task.trySetResult(new DDPClientCallback.Connect(client, null)); task.trySetResult(new DDPClientCallback.Connect(client, null));
subscriptions.dispose(); subscriptions.dispose();
} else if ("failed".equals(msg)) { } else if ("failed".equals(msg)) {
task.trySetError( task.trySetError(
new DDPClientCallback.Connect.Failed(client, response.optString("version"))); new DDPClientCallback.Connect.Failed(client, response.optString("version")));
subscriptions.dispose(); subscriptions.dispose();
} }
}, err -> { },
task.trySetError(new DDPClientCallback.Connect.Timeout(client)); err -> task.trySetError(new DDPClientCallback.Connect.Timeout(client))
})); )
);
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
...@@ -110,23 +119,25 @@ public class DDPClientImpl { ...@@ -110,23 +119,25 @@ public class DDPClientImpl {
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.timeout(4, TimeUnit.SECONDS) .timeout(4, TimeUnit.SECONDS)
.subscribe(response -> { .subscribe(
String msg = extractMsg(response); response -> {
if ("pong".equals(msg)) { String msg = extractMsg(response);
if (response.isNull("id")) { if ("pong".equals(msg)) {
task.setResult(new DDPClientCallback.Ping(client, null)); if (response.isNull("id")) {
subscriptions.dispose(); task.setResult(new DDPClientCallback.Ping(client, null));
} else { subscriptions.dispose();
String _id = response.optString("id"); } else {
if (id.equals(_id)) { String _id = response.optString("id");
task.setResult(new DDPClientCallback.Ping(client, id)); if (id.equals(_id)) {
subscriptions.dispose(); task.setResult(new DDPClientCallback.Ping(client, id));
subscriptions.dispose();
}
}
} }
} },
} err -> task.setError(new DDPClientCallback.Ping.Timeout(client))
}, err -> { )
task.setError(new DDPClientCallback.Ping.Timeout(client)); );
}));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
} else { } else {
...@@ -146,29 +157,33 @@ public class DDPClientImpl { ...@@ -146,29 +157,33 @@ public class DDPClientImpl {
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe(response -> { .subscribe(
String msg = extractMsg(response); response -> {
if ("ready".equals(msg) && !response.isNull("subs")) { String msg = extractMsg(response);
JSONArray ids = response.optJSONArray("subs"); if ("ready".equals(msg) && !response.isNull("subs")) {
for (int i = 0; i < ids.length(); i++) { JSONArray ids = response.optJSONArray("subs");
String _id = ids.optString(i); for (int i = 0; i < ids.length(); i++) {
if (id.equals(_id)) { String _id = ids.optString(i);
task.setResult(new DDPSubscription.Ready(client, id)); if (id.equals(_id)) {
subscriptions.dispose(); task.setResult(new DDPSubscription.Ready(client, id));
break; subscriptions.dispose();
break;
}
}
} else if ("nosub".equals(msg) && !response.isNull("id") && !response.isNull(
"error")) {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setError(new DDPSubscription.NoSub.Error(client, id,
response.optJSONObject("error")));
subscriptions.dispose();
}
} }
},
err -> {
} }
} else if ("nosub".equals(msg) && !response.isNull("id") && !response.isNull( )
"error")) { );
String _id = response.optString("id");
if (id.equals(_id)) {
task.setError(new DDPSubscription.NoSub.Error(client, id,
response.optJSONObject("error")));
subscriptions.dispose();
}
}
}, err -> {
}));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
} else { } else {
...@@ -188,17 +203,21 @@ public class DDPClientImpl { ...@@ -188,17 +203,21 @@ public class DDPClientImpl {
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe(response -> { .subscribe(
String msg = extractMsg(response); response -> {
if ("nosub".equals(msg) && response.isNull("error") && !response.isNull("id")) { String msg = extractMsg(response);
String _id = response.optString("id"); if ("nosub".equals(msg) && response.isNull("error") && !response.isNull("id")) {
if (id.equals(_id)) { String _id = response.optString("id");
task.setResult(new DDPSubscription.NoSub(client, id)); if (id.equals(_id)) {
subscriptions.dispose(); task.setResult(new DDPSubscription.NoSub(client, id));
subscriptions.dispose();
}
}
},
err -> {
} }
} )
}, err -> { );
}));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
} else { } else {
...@@ -220,26 +239,30 @@ public class DDPClientImpl { ...@@ -220,26 +239,30 @@ public class DDPClientImpl {
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.timeout(timeoutMs, TimeUnit.MILLISECONDS) .timeout(timeoutMs, TimeUnit.MILLISECONDS)
.subscribe(response -> { .subscribe(
String msg = extractMsg(response); response -> {
if ("result".equals(msg)) { String msg = extractMsg(response);
String _id = response.optString("id"); if ("result".equals(msg)) {
if (id.equals(_id)) { String _id = response.optString("id");
if (!response.isNull("error")) { if (id.equals(_id)) {
task.setError(new DDPClientCallback.RPC.Error(client, id, if (!response.isNull("error")) {
response.optJSONObject("error"))); task.setError(new DDPClientCallback.RPC.Error(client, id,
} else { response.optJSONObject("error")));
String result = response.optString("result"); } else {
task.setResult(new DDPClientCallback.RPC(client, id, result)); String result = response.optString("result");
task.setResult(new DDPClientCallback.RPC(client, id, result));
}
subscriptions.dispose();
}
}
},
err -> {
if (err instanceof TimeoutException) {
task.setError(new DDPClientCallback.RPC.Timeout(client));
} }
subscriptions.dispose();
} }
} )
}, err -> { );
if (err instanceof TimeoutException) {
task.setError(new DDPClientCallback.RPC.Timeout(client));
}
}));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
} else { } else {
...@@ -258,17 +281,21 @@ public class DDPClientImpl { ...@@ -258,17 +281,21 @@ public class DDPClientImpl {
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe(response -> { .subscribe(
String msg = extractMsg(response); response -> {
if ("ping".equals(msg)) { String msg = extractMsg(response);
if (response.isNull("id")) { if ("ping".equals(msg)) {
sendMessage("pong", null); if (response.isNull("id")) {
} else { sendMessage("pong", null);
sendMessage("pong", json -> json.put("id", response.getString("id"))); } else {
sendMessage("pong", json -> json.put("id", response.getString("id")));
}
}
},
err -> {
} }
} )
}, err -> { );
}));
} }
public Flowable<DDPSubscription.Event> getDDPSubscription() { public Flowable<DDPSubscription.Event> getDDPSubscription() {
...@@ -325,13 +352,16 @@ public class DDPClientImpl { ...@@ -325,13 +352,16 @@ public class DDPClientImpl {
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close)
.cast(RxWebSocketCallback.Close.class) .cast(RxWebSocketCallback.Close.class)
.subscribe(task::setResult, err -> { .subscribe(
if (err instanceof Exception) { task::setResult,
task.setError((Exception) err); err -> {
} else { if (err instanceof Exception) {
task.setError(new Exception(err)); task.setError((Exception) err);
} } else {
}); task.setError(new Exception(err));
}
}
);
return task.getTask().onSuccessTask(_task -> { return task.getTask().onSuccessTask(_task -> {
unsubscribeBaseListeners(); unsubscribeBaseListeners();
...@@ -358,11 +388,16 @@ public class DDPClientImpl { ...@@ -358,11 +388,16 @@ public class DDPClientImpl {
} }
private void addErrorCallback(CompositeDisposable subscriptions, TaskCompletionSource<?> task) { private void addErrorCallback(CompositeDisposable subscriptions, TaskCompletionSource<?> task) {
subscriptions.add(flowable.subscribe(base -> { subscriptions.add(
}, err -> { flowable.subscribe(
task.trySetError(new Exception(err)); base -> {
subscriptions.dispose(); },
})); err -> {
task.trySetError(new Exception(err));
subscriptions.dispose();
}
)
);
} }
public void close(int code, String reason) { public void close(int code, String reason) {
......
...@@ -99,23 +99,27 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable { ...@@ -99,23 +99,27 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
.filter(event -> event instanceof DDPSubscription.DocEvent) .filter(event -> event instanceof DDPSubscription.DocEvent)
.cast(DDPSubscription.DocEvent.class) .cast(DDPSubscription.DocEvent.class)
.filter(event -> isTarget(event.collection)) .filter(event -> isTarget(event.collection))
.subscribe(docEvent -> { .subscribe(
try { docEvent -> {
if (docEvent instanceof DDPSubscription.Added.Before) { try {
onDocumentAdded((DDPSubscription.Added) docEvent); //ignore Before if (docEvent instanceof DDPSubscription.Added.Before) {
} else if (docEvent instanceof DDPSubscription.Added) { onDocumentAdded((DDPSubscription.Added) docEvent); //ignore Before
onDocumentAdded((DDPSubscription.Added) docEvent); } else if (docEvent instanceof DDPSubscription.Added) {
} else if (docEvent instanceof DDPSubscription.Removed) { onDocumentAdded((DDPSubscription.Added) docEvent);
onDocumentRemoved((DDPSubscription.Removed) docEvent); } else if (docEvent instanceof DDPSubscription.Removed) {
} else if (docEvent instanceof DDPSubscription.Changed) { onDocumentRemoved((DDPSubscription.Removed) docEvent);
onDocumentChanged((DDPSubscription.Changed) docEvent); } else if (docEvent instanceof DDPSubscription.Changed) {
} else if (docEvent instanceof DDPSubscription.MovedBefore) { onDocumentChanged((DDPSubscription.Changed) docEvent);
//ignore movedBefore } else if (docEvent instanceof DDPSubscription.MovedBefore) {
//ignore movedBefore
}
} catch (Exception exception) {
RCLog.w(exception, "failed to handle subscription callback");
}
},
throwable -> {
} }
} catch (Exception exception) { );
RCLog.w(exception, "failed to handle subscription callback");
}
});
} }
protected void onDocumentAdded(DDPSubscription.Added docEvent) { protected void onDocumentAdded(DDPSubscription.Added docEvent) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment