Commit 9aa8f4b9 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Implementation of the ping call using only RxJava without bolts

parent debaa308
......@@ -7,6 +7,7 @@ import org.json.JSONArray;
import bolts.Task;
import bolts.TaskCompletionSource;
import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Single;
import okhttp3.OkHttpClient;
public class DDPClient {
......@@ -34,6 +35,10 @@ public class DDPClient {
return task.getTask();
}
public Flowable<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id);
}
public Task<DDPClientCallback.RPC> rpc(String method, JSONArray params, String id,
long timeoutMs) {
TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>();
......
package chat.rocket.android_ddp;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import org.json.JSONObject;
......@@ -53,6 +54,15 @@ public class DDPClientCallback {
this.id = id;
}
public static class UnMatched extends Base {
@NonNull public String id;
public UnMatched(DDPClient client, @NonNull String id) {
super(client);
this.id = id;
}
}
public static class Timeout extends BaseException {
public Timeout(DDPClient client) {
super(Timeout.class, client);
......
......@@ -17,6 +17,7 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.rx.RxWebSocket;
import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient;
......@@ -106,40 +107,76 @@ public class DDPClientImpl {
}
}
public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
@Nullable final String id) {
public Flowable<DDPClientCallback.Base> ping(@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ?
sendMessage("ping", null) :
sendMessage("ping", json -> json.put("id", id));
if (requested) {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
// .timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.subscribe(
response -> {
String msg = extractMsg(response);
if ("pong".equals(msg)) {
if (response.isNull("id")) {
task.setResult(new DDPClientCallback.Ping(client, null));
disposables.clear();
} else {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPClientCallback.Ping(client, id));
disposables.clear();
}
}
disposables.clear();
.filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
.doOnError(error -> {
RCLog.e(error, "Heartbeat ping[%s] xxx failed xxx", id);
})
.map(response -> {
String msg = extractMsg(response);
if ("pong".equals(msg)) {
RCLog.d("pong[%s] <", id);
if (response.isNull("id")) {
return new DDPClientCallback.Ping(client, null);
} else {
String _id = response.optString("id");
if (id.equals(_id)) {
return new DDPClientCallback.Ping(client, _id);
} else {
return new DDPClientCallback.Ping.UnMatched(client, id);
}
},
err -> task.setError(new DDPClientCallback.Ping.Timeout(client))
)
}
}
// if we receive anything other than a pong throw an exception
throw new DDPClientCallback.RPC.Error(client, id, response);
});
} else {
return Flowable.error(new DDPClientCallback.Closed(client));
}
}
public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ?
sendMessage("ping", null) :
sendMessage("ping", json -> json.put("id", id));
if (requested) {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.subscribe(
response -> {
String msg = extractMsg(response);
if ("pong".equals(msg)) {
if (response.isNull("id")) {
task.setResult(new DDPClientCallback.Ping(client, null));
} else {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPClientCallback.Ping(client, id));
}
}
disposables.clear();
}
},
err -> task.setError(new DDPClientCallback.Ping.Timeout(client))
)
);
addErrorCallback(disposables, task);
......@@ -368,12 +405,11 @@ public class DDPClientImpl {
try {
JSONObject origJson = new JSONObject().put("msg", msg);
String msg2 = (json == null ? origJson : json.create(origJson)).toString();
websocket.sendText(msg2);
return websocket.sendText(msg2);
} catch (Exception e) {
RCLog.e(e);
return false;
}
return true; // ignore exception here.
}
private void sendMessage(String msg, @Nullable JSONBuilder json,
......
......@@ -6,6 +6,7 @@ import org.json.JSONArray;
import org.json.JSONException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import bolts.Task;
import chat.rocket.android.helper.OkHttpHelper;
......@@ -15,6 +16,7 @@ import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.DDPSubscription;
import io.reactivex.Flowable;
import io.reactivex.Single;
/**
* DDP client wrapper.
......@@ -124,4 +126,13 @@ public class DDPClientWrapper {
}
});
}
/**
* check WebSocket connectivity with ping.
*/
public Flowable<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ddpClient.doPing(pingId);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment