Commit db0b3707 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Added a ping rpc call that does not use bolts

parent 823f21ea
package chat.rocket.android_ddp; package chat.rocket.android_ddp;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import io.reactivex.Flowable;
import org.json.JSONArray; import org.json.JSONArray;
import bolts.Task; import bolts.Task;
import bolts.TaskCompletionSource; import bolts.TaskCompletionSource;
import chat.rocket.android_ddp.rx.RxWebSocketCallback; import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Single; import io.reactivex.Flowable;
import io.reactivex.Maybe;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
public class DDPClient { public class DDPClient {
...@@ -35,7 +36,7 @@ public class DDPClient { ...@@ -35,7 +36,7 @@ public class DDPClient {
return task.getTask(); return task.getTask();
} }
public Flowable<DDPClientCallback.Base> doPing(@Nullable String id) { public Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id); return impl.ping(id);
} }
......
...@@ -17,7 +17,7 @@ import chat.rocket.android.log.RCLog; ...@@ -17,7 +17,7 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.rx.RxWebSocket; import chat.rocket.android_ddp.rx.RxWebSocket;
import chat.rocket.android_ddp.rx.RxWebSocketCallback; import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.Single; import io.reactivex.Maybe;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
...@@ -107,7 +107,7 @@ public class DDPClientImpl { ...@@ -107,7 +107,7 @@ public class DDPClientImpl {
} }
} }
public Flowable<DDPClientCallback.Base> ping(@Nullable final String id) { public Maybe<DDPClientCallback.Base> ping(@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ? final boolean requested = (TextUtils.isEmpty(id)) ?
sendMessage("ping", null) : sendMessage("ping", null) :
...@@ -115,7 +115,6 @@ public class DDPClientImpl { ...@@ -115,7 +115,6 @@ public class DDPClientImpl {
if (requested) { if (requested) {
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
// .timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.filter(response -> "pong".equalsIgnoreCase(extractMsg(response))) .filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
...@@ -133,15 +132,15 @@ public class DDPClientImpl { ...@@ -133,15 +132,15 @@ public class DDPClientImpl {
if (id.equals(_id)) { if (id.equals(_id)) {
return new DDPClientCallback.Ping(client, _id); return new DDPClientCallback.Ping(client, _id);
} else { } else {
return new DDPClientCallback.Ping.UnMatched(client, id); return new DDPClientCallback.Ping.UnMatched(client, _id);
} }
} }
} }
// if we receive anything other than a pong throw an exception // if we receive anything other than a pong throw an exception
throw new DDPClientCallback.RPC.Error(client, id, response); throw new DDPClientCallback.RPC.Error(client, id, response);
}); }).firstElement();
} else { } else {
return Flowable.error(new DDPClientCallback.Closed(client)); return Maybe.error(new DDPClientCallback.Closed(client));
} }
} }
......
...@@ -6,7 +6,6 @@ import org.json.JSONArray; ...@@ -6,7 +6,6 @@ import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeoutException;
import bolts.Task; import bolts.Task;
import chat.rocket.android.helper.OkHttpHelper; import chat.rocket.android.helper.OkHttpHelper;
...@@ -16,7 +15,7 @@ import chat.rocket.android_ddp.DDPClient; ...@@ -16,7 +15,7 @@ import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback; import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.DDPSubscription; import chat.rocket.android_ddp.DDPSubscription;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.Single; import io.reactivex.Maybe;
/** /**
* DDP client wrapper. * DDP client wrapper.
...@@ -130,7 +129,7 @@ public class DDPClientWrapper { ...@@ -130,7 +129,7 @@ public class DDPClientWrapper {
/** /**
* check WebSocket connectivity with ping. * check WebSocket connectivity with ping.
*/ */
public Flowable<DDPClientCallback.Base> doPing() { public Maybe<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString(); final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId); RCLog.d("ping[%s] >", pingId);
return ddpClient.doPing(pingId); return ddpClient.doPing(pingId);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment