Commit 8e235f66 authored by Yusuke Iwaki's avatar Yusuke Iwaki

add error handling for "already closed"

parent a0d5c1cb
...@@ -15,7 +15,8 @@ public class DDPClientCallback { ...@@ -15,7 +15,8 @@ public class DDPClientCallback {
public static abstract class BaseException extends Exception { public static abstract class BaseException extends Exception {
public DDPClient client; public DDPClient client;
public BaseException(DDPClient client) { public BaseException(Class<? extends BaseException> clazz, DDPClient client) {
super(clazz.getName());
this.client = client; this.client = client;
} }
} }
...@@ -32,14 +33,14 @@ public class DDPClientCallback { ...@@ -32,14 +33,14 @@ public class DDPClientCallback {
public String version; public String version;
public Failed(DDPClient client, String version) { public Failed(DDPClient client, String version) {
super(client); super(Failed.class, client);
this.version = version; this.version = version;
} }
} }
public static class Timeout extends BaseException { public static class Timeout extends BaseException {
public Timeout(DDPClient client) { public Timeout(DDPClient client) {
super(client); super(Timeout.class, client);
} }
} }
} }
...@@ -54,7 +55,7 @@ public class DDPClientCallback { ...@@ -54,7 +55,7 @@ public class DDPClientCallback {
public static class Timeout extends BaseException { public static class Timeout extends BaseException {
public Timeout(DDPClient client) { public Timeout(DDPClient client) {
super(client); super(Timeout.class, client);
} }
} }
} }
...@@ -74,7 +75,7 @@ public class DDPClientCallback { ...@@ -74,7 +75,7 @@ public class DDPClientCallback {
public JSONObject error; public JSONObject error;
public Error(DDPClient client, String id, JSONObject error) { public Error(DDPClient client, String id, JSONObject error) {
super(client); super(Error.class, client);
this.id = id; this.id = id;
this.error = error; this.error = error;
} }
...@@ -82,8 +83,14 @@ public class DDPClientCallback { ...@@ -82,8 +83,14 @@ public class DDPClientCallback {
public static class Timeout extends BaseException { public static class Timeout extends BaseException {
public Timeout(DDPClient client) { public Timeout(DDPClient client) {
super(client); super(Timeout.class, client);
}
} }
} }
public static class Closed extends BaseException {
public Closed(DDPClient client) {
super(Closed.class, client);
}
} }
} }
...@@ -59,7 +59,8 @@ public class DDPClientImpl { ...@@ -59,7 +59,8 @@ public class DDPClientImpl {
.subscribe(callback -> { .subscribe(callback -> {
sendMessage("connect", sendMessage("connect",
json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)).put( json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)).put(
"version", "pre2").put("support", new JSONArray().put("pre2").put("pre1"))); "version", "pre2").put("support", new JSONArray().put("pre2").put("pre1")),
task);
}, err -> { }, err -> {
})); }));
...@@ -71,20 +72,20 @@ public class DDPClientImpl { ...@@ -71,20 +72,20 @@ public class DDPClientImpl {
.subscribe(response -> { .subscribe(response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("connected".equals(msg) && !response.isNull("session")) { if ("connected".equals(msg) && !response.isNull("session")) {
task.setResult( task.trySetResult(
new DDPClientCallback.Connect(client, response.optString("session"))); new DDPClientCallback.Connect(client, response.optString("session")));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
} else if ("error".equals(msg) && "Already connected".equals( } else if ("error".equals(msg) && "Already connected".equals(
response.optString("reason"))) { response.optString("reason"))) {
task.setResult(new DDPClientCallback.Connect(client, null)); task.trySetResult(new DDPClientCallback.Connect(client, null));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
} else if ("failed".equals(msg)) { } else if ("failed".equals(msg)) {
task.setError( task.trySetError(
new DDPClientCallback.Connect.Failed(client, response.optString("version"))); new DDPClientCallback.Connect.Failed(client, response.optString("version")));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
} }
}, err -> { }, err -> {
task.setError(new DDPClientCallback.Connect.Timeout(client)); task.trySetError(new DDPClientCallback.Connect.Timeout(client));
})); }));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
...@@ -101,6 +102,12 @@ public class DDPClientImpl { ...@@ -101,6 +102,12 @@ public class DDPClientImpl {
public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task, public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
@Nullable final String id) { @Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ?
sendMessage("ping", null) :
sendMessage("ping", json -> json.put("id", id));
if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add( subscriptions.add(
...@@ -127,16 +134,15 @@ public class DDPClientImpl { ...@@ -127,16 +134,15 @@ public class DDPClientImpl {
})); }));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
if (TextUtils.isEmpty(id)) {
sendMessage("ping", null);
} else {
sendMessage("ping", json -> json.put("id", id));
} }
} }
public void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name, public void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name,
JSONArray params, String id) { JSONArray params, String id) {
final boolean requested =
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));
if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add( subscriptions.add(
...@@ -168,12 +174,15 @@ public class DDPClientImpl { ...@@ -168,12 +174,15 @@ public class DDPClientImpl {
})); }));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
}
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));
} }
public void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task, public void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task,
@Nullable final String id) { @Nullable final String id) {
final boolean requested = sendMessage("unsub", json -> json.put("id", id));
if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add( subscriptions.add(
...@@ -193,12 +202,16 @@ public class DDPClientImpl { ...@@ -193,12 +202,16 @@ public class DDPClientImpl {
})); }));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
}
sendMessage("unsub", json -> json.put("id", id));
} }
public void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method, public void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method,
JSONArray params, String id, long timeoutMs) { JSONArray params, String id, long timeoutMs) {
final boolean requested =
sendMessage("method",
json -> json.put("method", method).put("params", params).put("id", id));
if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add( subscriptions.add(
...@@ -228,8 +241,7 @@ public class DDPClientImpl { ...@@ -228,8 +241,7 @@ public class DDPClientImpl {
})); }));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
}
sendMessage("method", json -> json.put("method", method).put("params", params).put("id", id));
} }
private void subscribeBaseListeners() { private void subscribeBaseListeners() {
...@@ -325,14 +337,22 @@ public class DDPClientImpl { ...@@ -325,14 +337,22 @@ public class DDPClientImpl {
}); });
} }
private void sendMessage(String msg, @Nullable JSONBuilder json) { private boolean sendMessage(String msg, @Nullable JSONBuilder json) {
try { try {
JSONObject origJson = new JSONObject().put("msg", msg); JSONObject origJson = new JSONObject().put("msg", msg);
String msg2 = (json == null ? origJson : json.create(origJson)).toString(); String msg2 = (json == null ? origJson : json.create(origJson)).toString();
websocket.sendText(msg2); return websocket.sendText(msg2);
} catch (Exception e) { } catch (Exception e) {
RCLog.e(e); RCLog.e(e);
} }
return true; // ignore exception here.
}
private void sendMessage(String msg, @Nullable JSONBuilder json,
TaskCompletionSource<?> taskForSetError) {
if (!sendMessage(msg, json)) {
taskForSetError.trySetError(new DDPClientCallback.Closed(client));
}
} }
private void addErrorCallback(CompositeSubscription subscriptions, TaskCompletionSource<?> task) { private void addErrorCallback(CompositeSubscription subscriptions, TaskCompletionSource<?> task) {
......
...@@ -63,15 +63,15 @@ public class RxWebSocket { ...@@ -63,15 +63,15 @@ public class RxWebSocket {
}).publish(); }).publish();
} }
public void sendText(String message) throws IOException { public boolean sendText(String message) throws IOException {
webSocket.send(message); return webSocket.send(message);
} }
public boolean isConnected() { public boolean isConnected() {
return isConnected; return isConnected;
} }
public void close(int code, String reason) throws IOException { public boolean close(int code, String reason) throws IOException {
webSocket.close(code, reason); return webSocket.close(code, reason);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment