Commit b6bee50a authored by Yusuke Iwaki's avatar Yusuke Iwaki

implement MethodCall (RPC) timeout

parent e5ed1f44
......@@ -68,7 +68,7 @@ dependencies {
compile 'com.facebook.stetho:stetho-okhttp3:1.4.1'
compile 'com.uphyca:stetho_realm:2.0.0'
compile 'chat.rocket:android-ddp:0.0.6'
compile 'chat.rocket:android-ddp:0.0.7'
compile 'com.jakewharton.timber:timber:4.3.1'
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
......
......@@ -7,6 +7,7 @@ import chat.rocket.android.model.MethodCall;
import chat.rocket.android.model.ddp.RoomSubscription;
import chat.rocket.android.model.ServerConfig;
import chat.rocket.android.ws.RocketChatWebSocketAPI;
import chat.rocket.android_ddp.DDPClientCallback;
import java.util.UUID;
import jp.co.crowdworks.realm_java_helpers_bolts.RealmHelperBolts;
import org.json.JSONArray;
......@@ -20,6 +21,7 @@ public class MethodCallHelper {
private final String serverConfigId;
private final RocketChatWebSocketAPI api;
private static final long TIMEOUT_MS = 4000;
public MethodCallHelper(String serverConfigId) {
this.serverConfigId = serverConfigId;
......@@ -31,12 +33,12 @@ public class MethodCallHelper {
this.api = api;
}
private Task<JSONObject> executeMethodCall(String methodName, String param) {
private Task<JSONObject> executeMethodCall(String methodName, String param, long timeout) {
if (api != null) {
return api.rpc(UUID.randomUUID().toString(), methodName, param)
return api.rpc(UUID.randomUUID().toString(), methodName, param, timeout)
.onSuccessTask(task -> Task.forResult(task.getResult().result));
} else {
return MethodCall.execute(serverConfigId, methodName, param);
return MethodCall.execute(serverConfigId, methodName, param, timeout);
}
}
......@@ -47,6 +49,8 @@ public class MethodCallHelper {
if (exception instanceof MethodCall.Error) {
String errMessage = new JSONObject(exception.getMessage()).getString("message");
return Task.forError(new Exception(errMessage));
} else if (exception instanceof DDPClientCallback.RPC.Timeout) {
return Task.forError(new MethodCall.Timeout());
} else {
return Task.forError(exception);
}
......@@ -57,16 +61,16 @@ public class MethodCallHelper {
}
private interface ParamBuilder {
void buildParam(JSONArray param) throws JSONException;
void buildParam(JSONArray params) throws JSONException;
}
private <T> Task<T> call(String methodName,
private <T> Task<T> call(String methodName, long timeout,
Continuation<JSONObject, Task<T>> onSuccess) {
return injectErrorHandler(executeMethodCall(methodName, null))
return injectErrorHandler(executeMethodCall(methodName, null, timeout))
.onSuccessTask(onSuccess);
}
private <T> Task<T> call(String methodName, ParamBuilder paramBuilder,
private <T> Task<T> call(String methodName, long timeout, ParamBuilder paramBuilder,
Continuation<JSONObject, Task<T>> onSuccess) {
JSONArray params = new JSONArray();
......@@ -76,7 +80,7 @@ public class MethodCallHelper {
return Task.forError(exception);
}
return injectErrorHandler(executeMethodCall(methodName, params.toString()))
return injectErrorHandler(executeMethodCall(methodName, params.toString(), timeout))
.onSuccessTask(onSuccess);
}
......@@ -85,7 +89,7 @@ public class MethodCallHelper {
*/
public Task<Void> registerUser(final String name, final String email,
final String password, final String confirmPassword) {
return call("registerUser", params -> params.put(new JSONObject()
return call("registerUser", TIMEOUT_MS, params -> params.put(new JSONObject()
.put("name", name)
.put("email", email)
.put("pass", password)
......@@ -105,7 +109,7 @@ public class MethodCallHelper {
* Login with username/email and password.
*/
public Task<Void> loginWithEmail(final String usernameOrEmail, final String password) {
return call("login", params -> {
return call("login", TIMEOUT_MS, params -> {
JSONObject param = new JSONObject();
if (Patterns.EMAIL_ADDRESS.matcher(usernameOrEmail).matches()) {
param.put("user", new JSONObject().put("email", usernameOrEmail));
......@@ -124,7 +128,7 @@ public class MethodCallHelper {
*/
public Task<Void> loginWithGitHub(final String credentialToken,
final String credentialSecret) {
return call("login", params -> params.put(new JSONObject()
return call("login", TIMEOUT_MS, params -> params.put(new JSONObject()
.put("oauth", new JSONObject()
.put("credentialToken", credentialToken)
.put("credentialSecret", credentialSecret))
......@@ -135,7 +139,7 @@ public class MethodCallHelper {
* Login with token.
*/
public Task<Void> loginWithToken(final String token) {
return call("login", param -> param.put(new JSONObject().put("resume", token)),
return call("login", TIMEOUT_MS, params -> params.put(new JSONObject().put("resume", token)),
task -> Task.forResult(task.getResult().getString("token"))).onSuccessTask(this::saveToken);
}
......@@ -143,7 +147,7 @@ public class MethodCallHelper {
* Logout.
*/
public Task<Void> logout() {
return call("logout", task -> Task.forResult(null));
return call("logout", TIMEOUT_MS, task -> Task.forResult(null));
}
/**
......@@ -180,7 +184,7 @@ public class MethodCallHelper {
}
private Task<Long> getObjectRecursive(String objName, Customizer customizer, long timestamp) {
return call(objName + "/get",
return call(objName + "/get", TIMEOUT_MS,
params -> params.put(new JSONObject().put("$date", timestamp)),
task -> {
JSONObject result = task.getResult();
......
......@@ -22,6 +22,7 @@ public class MethodCall extends RealmObject {
private String name;
private String paramsJson;
private String resultJson;
private long timeout;
public String getMethodCallId() {
return methodCallId;
......@@ -71,6 +72,14 @@ public class MethodCall extends RealmObject {
this.resultJson = resultJson;
}
public long getTimeout() {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public static class Error extends Exception {
public Error(String message) {
super(message);
......@@ -81,10 +90,17 @@ public class MethodCall extends RealmObject {
}
}
public static class Timeout extends Exception {
public Timeout() {
super("MethodCall.Timeout");
}
}
/**
* insert a new record to request a method call.
*/
public static Task<JSONObject> execute(String serverConfigId, String name, String paramsJson) {
public static Task<JSONObject> execute(String serverConfigId, String name, String paramsJson,
long timeout) {
final String newId = UUID.randomUUID().toString();
TaskCompletionSource<JSONObject> task = new TaskCompletionSource<>();
RealmHelperBolts.executeTransaction(realm -> {
......@@ -92,6 +108,7 @@ public class MethodCall extends RealmObject {
.put("methodCallId", newId)
.put("serverConfigId", serverConfigId)
.put("syncstate", SyncState.NOT_SYNCED)
.put("timeout", timeout)
.put("name", name));
call.setParamsJson(paramsJson);
return null;
......
......@@ -105,23 +105,21 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
@Override public boolean quit() {
scheduleUnregisterListeners();
if (isAlive()) {
scheduleUnregisterListenersAndQuit();
return true;
} else {
return super.quit();
}
@Override public boolean quitSafely() {
scheduleUnregisterListeners();
return super.quitSafely();
}
private void scheduleUnregisterListeners() {
if (isAlive()) {
private void scheduleUnregisterListenersAndQuit() {
new Handler(getLooper()).post(() -> {
Timber.d("thread %s: quit()", Thread.currentThread().getId());
unregisterListeners();
RocketChatWebSocketThread.super.quit();
});
}
}
private void prepareWebSocket(ServerConfig config) {
if (webSocketAPI == null || !webSocketAPI.isConnected()) {
......@@ -217,7 +215,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
}
//@DebugLog
@DebugLog
private void unregisterListeners() {
if (!socketExists || !listenersRegistered) {
return;
......
......@@ -61,13 +61,14 @@ public class MethodCallObserver extends AbstractModelObserver<MethodCall> {
final String methodCallId = call.getMethodCallId();
final String methodName = call.getName();
final String params = call.getParamsJson();
final long timeout = call.getTimeout();
RealmHelperBolts.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(MethodCall.class, new JSONObject()
.put("methodCallId", methodCallId)
.put("syncstate", SyncState.SYNCING))
).onSuccessTask(task ->
webSocketAPI.rpc(methodCallId, methodName, params).onSuccessTask(_task ->
RealmHelperBolts.executeTransaction(realm -> {
webSocketAPI.rpc(methodCallId, methodName, params, timeout)
.onSuccessTask(_task -> RealmHelperBolts.executeTransaction(realm -> {
JSONObject result = _task.getResult().result;
return realm.createOrUpdateObjectFromJson(MethodCall.class, new JSONObject()
.put("methodCallId", methodCallId)
......
package chat.rocket.android.service.observer;
import android.content.Context;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.helper.MethodCallHelper;
import chat.rocket.android.model.ServerConfig;
import chat.rocket.android.ws.RocketChatWebSocketAPI;
......@@ -30,6 +29,11 @@ public class TokenLoginObserver extends AbstractModelObserver<ServerConfig> {
ServerConfig config = list.get(0);
new MethodCallHelper(serverConfigId, webSocketAPI).loginWithToken(config.getToken())
.continueWith(new LogcatIfError());
.continueWith(task -> {
if (task.isFaulted()) {
ServerConfig.logConnectionError(serverConfigId, task.getError());
}
return null;
});
}
}
......@@ -80,13 +80,14 @@ public class RocketChatWebSocketAPI {
/**
* Execute raw RPC.
*/
public Task<DDPClientCallback.RPC> rpc(String methodCallId, String methodName, String params) {
public Task<DDPClientCallback.RPC> rpc(String methodCallId, String methodName, String params,
long timeoutMs) {
if (TextUtils.isEmpty(params)) {
return ddpClient.rpc(methodName, null, methodCallId);
return ddpClient.rpc(methodName, null, methodCallId, timeoutMs);
}
try {
return ddpClient.rpc(methodName, new JSONArray(params), methodCallId);
return ddpClient.rpc(methodName, new JSONArray(params), methodCallId, timeoutMs);
} catch (JSONException exception) {
return Task.forError(exception);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment