Commit 3ff26da3 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Minor refactor and add version 1 to supported server versions to connect

parent bf4ec2bf
...@@ -28,7 +28,7 @@ public class DDPClientImpl { ...@@ -28,7 +28,7 @@ public class DDPClientImpl {
private CompositeDisposable disposables; private CompositeDisposable disposables;
private String currentSession; private String currentSession;
public DDPClientImpl(DDPClient self, OkHttpClient client) { /* package */ DDPClientImpl(DDPClient self, OkHttpClient client) {
websocket = new RxWebSocket(client); websocket = new RxWebSocket(client);
this.client = self; this.client = self;
} }
...@@ -60,13 +60,13 @@ public class DDPClientImpl { ...@@ -60,13 +60,13 @@ public class DDPClientImpl {
disposables.add( disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open)
.subscribe( .subscribe(
callback -> callback ->
sendMessage("connect", sendMessage("connect",
json -> (TextUtils.isEmpty(session) ? json : json.put("session", DDPClientImpl.this.currentSession)) json -> (TextUtils.isEmpty(session) ? json : json.put("session", DDPClientImpl.this.currentSession))
.put( .put(
"version", "pre2") "version", "1")
.put("support", new JSONArray().put("pre2").put("pre1")), .put("support", new JSONArray().put("1").put("pre2").put("pre1")),
task), task),
RCLog::e RCLog::e
) )
...@@ -107,7 +107,7 @@ public class DDPClientImpl { ...@@ -107,7 +107,7 @@ public class DDPClientImpl {
} }
} }
public Maybe<DDPClientCallback.Base> ping(@Nullable final String id) { /* package */ Maybe<DDPClientCallback.Base> ping(@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ? final boolean requested = (TextUtils.isEmpty(id)) ?
sendMessage("ping", null) : sendMessage("ping", null) :
...@@ -145,7 +145,7 @@ public class DDPClientImpl { ...@@ -145,7 +145,7 @@ public class DDPClientImpl {
} }
} }
public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task, /* package */void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
@Nullable final String id) { @Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ? final boolean requested = (TextUtils.isEmpty(id)) ?
...@@ -185,7 +185,7 @@ public class DDPClientImpl { ...@@ -185,7 +185,7 @@ public class DDPClientImpl {
} }
} }
public void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name, /* package */ void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name,
JSONArray params, String id) { JSONArray params, String id) {
final boolean requested = final boolean requested =
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params)); sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));
...@@ -230,7 +230,7 @@ public class DDPClientImpl { ...@@ -230,7 +230,7 @@ public class DDPClientImpl {
} }
} }
public void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task, /* package */ void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task,
@Nullable final String id) { @Nullable final String id) {
final boolean requested = sendMessage("unsub", json -> json.put("id", id)); final boolean requested = sendMessage("unsub", json -> json.put("id", id));
...@@ -264,7 +264,7 @@ public class DDPClientImpl { ...@@ -264,7 +264,7 @@ public class DDPClientImpl {
} }
} }
public void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method, /* package */ void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method,
JSONArray params, String id, long timeoutMs) { JSONArray params, String id, long timeoutMs) {
final boolean requested = final boolean requested =
sendMessage("method", sendMessage("method",
...@@ -336,7 +336,7 @@ public class DDPClientImpl { ...@@ -336,7 +336,7 @@ public class DDPClientImpl {
); );
} }
public Flowable<DDPSubscription.Event> getDDPSubscription() { /* package */ Flowable<DDPSubscription.Event> getDDPSubscription() {
String[] targetMsgs = {"added", "changed", "removed", "addedBefore", "movedBefore"}; String[] targetMsgs = {"added", "changed", "removed", "addedBefore", "movedBefore"};
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
...@@ -379,13 +379,13 @@ public class DDPClientImpl { ...@@ -379,13 +379,13 @@ public class DDPClientImpl {
}); });
} }
public void unsubscribeBaseListeners() { /* package */ void unsubscribeBaseListeners() {
if (disposables.size() > 0 || !disposables.isDisposed()) { if (disposables.size() > 0 || !disposables.isDisposed()) {
disposables.clear(); disposables.clear();
} }
} }
public Task<RxWebSocketCallback.Close> getOnCloseCallback() { /* package */ Task<RxWebSocketCallback.Close> getOnCloseCallback() {
TaskCompletionSource<RxWebSocketCallback.Close> task = new TaskCompletionSource<>(); TaskCompletionSource<RxWebSocketCallback.Close> task = new TaskCompletionSource<>();
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment