Commit 739efd88 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Keep always a single thread for any number of signed-in servers

parent c6132148
......@@ -20,165 +20,164 @@ import io.reactivex.annotations.Nullable;
import okhttp3.OkHttpClient;
public class DDPClient {
// reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js
private static volatile DDPClient singleton;
private static volatile OkHttpClient client;
private final DDPClientImpl impl;
private final AtomicReference<String> hostname = new AtomicReference<>();
public static void initialize(OkHttpClient okHttpClient) {
client = okHttpClient;
}
public static DDPClient get() {
DDPClient result = singleton;
if (result == null) {
synchronized (DDPClient.class) {
result = singleton;
// reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js
public static final int REASON_CLOSED_BY_USER = 1000;
public static final int REASON_NETWORK_ERROR = 1001;
private static volatile DDPClient singleton;
private static volatile OkHttpClient client;
private final DDPClientImpl impl;
private final AtomicReference<String> hostname = new AtomicReference<>();
public static void initialize(OkHttpClient okHttpClient) {
client = okHttpClient;
}
public static DDPClient get() {
DDPClient result = singleton;
if (result == null) {
singleton = result = new DDPClient(client);
synchronized (DDPClient.class) {
result = singleton;
if (result == null) {
singleton = result = new DDPClient(client);
}
}
}
}
}
return result;
}
private DDPClient(OkHttpClient client) {
impl = new DDPClientImpl(this, client);
}
public Task<DDPClientCallback.Connect> connect(String url, String session) {
String oldHostname = hostname.getAndSet(url);
if (oldHostname != null && !oldHostname.equalsIgnoreCase(url)) {
close();
}
TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>();
impl.connect(task, url, session);
return task.getTask();
}
public Task<DDPClientCallback.Ping> ping(@Nullable String id) {
TaskCompletionSource<DDPClientCallback.Ping> task = new TaskCompletionSource<>();
impl.ping(task, id);
return task.getTask();
}
public Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id);
}
public Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) {
TaskCompletionSource<DDPSubscription.Ready> task = new TaskCompletionSource<>();
impl.sub(task, name, params, id);
return task.getTask();
}
public Task<DDPSubscription.NoSub> unsub(String id) {
TaskCompletionSource<DDPSubscription.NoSub> task = new TaskCompletionSource<>();
impl.unsub(task, id);
return task.getTask();
}
public Task<RxWebSocketCallback.Close> getOnCloseCallback() {
return impl.getOnCloseCallback();
}
public void close() {
impl.close(1000, "closed by DDPClient#close()");
}
/**
* check WebSocket connectivity with ping.
*/
public Task<Void> ping() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ping(pingId)
.continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d(task.getError(), "ping[%s] xxx failed xxx", pingId);
return Task.forError(task.getError());
} else {
RCLog.d("pong[%s] <", pingId);
return Task.forResult(null);
}
return result;
}
private DDPClient(OkHttpClient client) {
impl = new DDPClientImpl(this, client);
}
public Task<DDPClientCallback.Connect> connect(String url, String session) {
hostname.set(url);
TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>();
impl.connect(task, url, session);
return task.getTask();
}
public Task<DDPClientCallback.Ping> ping(@Nullable String id) {
TaskCompletionSource<DDPClientCallback.Ping> task = new TaskCompletionSource<>();
impl.ping(task, id);
return task.getTask();
}
public Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id);
}
public Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) {
TaskCompletionSource<DDPSubscription.Ready> task = new TaskCompletionSource<>();
impl.sub(task, name, params, id);
return task.getTask();
}
public Task<DDPSubscription.NoSub> unsub(String id) {
TaskCompletionSource<DDPSubscription.NoSub> task = new TaskCompletionSource<>();
impl.unsub(task, id);
return task.getTask();
}
public Task<RxWebSocketCallback.Close> getOnCloseCallback() {
return impl.getOnCloseCallback();
}
public void close() {
impl.close(REASON_CLOSED_BY_USER, "closed by DDPClient#close()");
}
/**
* check WebSocket connectivity with ping.
*/
public Task<Void> ping() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ping(pingId)
.continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d(task.getError(), "ping[%s] xxx failed xxx", pingId);
return Task.forError(task.getError());
} else {
RCLog.d("pong[%s] <", pingId);
return Task.forResult(null);
}
});
}
/**
* check WebSocket connectivity with ping.
*/
public Maybe<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return doPing(pingId);
}
/**
* Connect to WebSocket server with DDP client.
*/
public Task<DDPClientCallback.Connect> connect(@NonNull String hostname, @Nullable String session,
boolean usesSecureConnection) {
final String protocol = usesSecureConnection ? "wss://" : "ws://";
return connect(protocol + hostname + "/websocket", session);
}
/**
* Subscribe with DDP client.
*/
public Task<DDPSubscription.Ready> subscribe(final String name, JSONArray param) {
final String subscriptionId = UUID.randomUUID().toString();
RCLog.d("sub:[%s]> %s(%s)", subscriptionId, name, param);
return sub(subscriptionId, name, param);
}
/**
* Unsubscribe with DDP client.
*/
public Task<DDPSubscription.NoSub> unsubscribe(final String subscriptionId) {
RCLog.d("unsub:[%s]>", subscriptionId);
return unsub(subscriptionId);
}
/**
* Returns Observable for handling DDP subscription.
*/
public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return impl.getDDPSubscription();
}
/**
* Execute raw RPC.
*/
public Task<DDPClientCallback.RPC> rpc(String methodCallId, String methodName, String params,
long timeoutMs) {
TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>();
RCLog.d("rpc:[%s]> %s(%s) timeout=%d", methodCallId, methodName, params, timeoutMs);
if (TextUtils.isEmpty(params)) {
impl.rpc(task, methodName, null, methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
}
return task_;
});
}
/**
* check WebSocket connectivity with ping.
*/
public Maybe<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return doPing(pingId);
}
/**
* Connect to WebSocket server with DDP client.
*/
public Task<DDPClientCallback.Connect> connect(@NonNull String hostname, @Nullable String session,
boolean usesSecureConnection) {
final String protocol = usesSecureConnection ? "wss://" : "ws://";
return connect(protocol + hostname + "/websocket", session);
}
/**
* Subscribe with DDP client.
*/
public Task<DDPSubscription.Ready> subscribe(final String name, JSONArray param) {
final String subscriptionId = UUID.randomUUID().toString();
RCLog.d("sub:[%s]> %s(%s)", subscriptionId, name, param);
return sub(subscriptionId, name, param);
}
/**
* Unsubscribe with DDP client.
*/
public Task<DDPSubscription.NoSub> unsubscribe(final String subscriptionId) {
RCLog.d("unsub:[%s]>", subscriptionId);
return unsub(subscriptionId);
}
/**
* Returns Observable for handling DDP subscription.
*/
public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return impl.getDDPSubscription();
}
/**
* Execute raw RPC.
*/
public Task<DDPClientCallback.RPC> rpc(String methodCallId, String methodName, String params,
long timeoutMs) {
TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>();
RCLog.d("rpc:[%s]> %s(%s) timeout=%d", methodCallId, methodName, params, timeoutMs);
if (TextUtils.isEmpty(params)) {
impl.rpc(task, methodName, null, methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
}
return task_;
});
}
try {
impl.rpc(task, methodName, new JSONArray(params), methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
try {
impl.rpc(task, methodName, new JSONArray(params), methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
}
return task_;
});
} catch (JSONException exception) {
return Task.forError(exception);
}
return task_;
});
} catch (JSONException exception) {
return Task.forError(exception);
}
}
}
......@@ -18,6 +18,7 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.ConnectivityManagerApi;
import chat.rocket.android.service.ServerConnectivity;
import chat.rocket.android.shared.BasePresenter;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.core.PublicSettingsConstants;
import chat.rocket.core.interactors.CanCreateRoomInteractor;
import chat.rocket.core.interactors.RoomInteractor;
......@@ -227,7 +228,9 @@ public class MainPresenter extends BasePresenter<MainContract.View>
view.showConnectionOk();
view.refreshRoom();
} else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
view.showConnectionError();
if (connectivity.code == DDPClient.REASON_NETWORK_ERROR) {
view.showConnectionError();
}
} else {
view.showConnecting();
}
......
......@@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.helper.RxHelper;
import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.models.RealmBasedServerInfo;
import hugo.weaving.DebugLog;
......@@ -79,7 +80,7 @@ import rx.subjects.PublishSubject;
.subscribe(_val -> {
}, error -> {
RCLog.e(error);
notifyConnectionLost(hostname, REASON_NETWORK_ERROR);
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
});
}
......@@ -138,10 +139,10 @@ import rx.subjects.PublishSubject;
@DebugLog
@Override
public void notifyConnectionLost(String hostname, int reason) {
public void notifyConnectionLost(String hostname, int code) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED));
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED, code));
}
@DebugLog
......@@ -197,7 +198,7 @@ import rx.subjects.PublishSubject;
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname)
.doOnError(err -> notifyConnectionLost(hostname, REASON_NETWORK_ERROR))
.doOnError(err -> notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR))
.flatMap(_val -> disconnectFromServerIfNeeded(hostname));
}
......
......@@ -8,11 +8,11 @@ import android.os.Binder;
import android.os.IBinder;
import android.support.annotation.Nullable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import chat.rocket.android.helper.Logger;
import chat.rocket.android.log.RCLog;
import chat.rocket.persistence.realm.RealmStore;
import hugo.weaving.DebugLog;
import rx.Observable;
......@@ -24,8 +24,8 @@ import rx.Single;
public class RocketChatService extends Service implements ConnectivityServiceInterface {
private ConnectivityManagerInternal connectivityManager;
private static volatile ConcurrentHashMap<String, RocketChatWebSocketThread> webSocketThreads;
private static volatile Semaphore webSocketThreadLock = new Semaphore(1);
private static volatile RocketChatWebSocketThread currentWebSocketThread;
public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() {
......@@ -57,7 +57,6 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
super.onCreate();
connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
connectivityManager.resetConnectivityStateList();
webSocketThreads = new ConcurrentHashMap<>();
}
@DebugLog
......@@ -72,7 +71,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
return getOrCreateWebSocketThread(hostname)
.doOnError(err -> {
err.printStackTrace();
webSocketThreads.remove(hostname);
currentWebSocketThread = null;
// connectivityManager.notifyConnectionLost(hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR);
})
.flatMap(webSocketThreads -> webSocketThreads.keepAlive());
......@@ -81,17 +80,15 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
@Override
public Single<Boolean> disconnectFromServer(String hostname) { //called via binder.
return Single.defer(() -> {
if (!webSocketThreads.containsKey(hostname)) {
if (!threadCreatedForHostname(hostname)) {
return Single.just(true);
}
RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
if (thread != null) {
return thread.terminate()
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
// after disconnection from server
.doAfterTerminate(() -> {
// remove RCWebSocket key from HashMap
webSocketThreads.remove(hostname);
currentWebSocketThread = null;
// remove RealmConfiguration key from HashMap
RealmStore.sStore.remove(hostname);
});
......@@ -108,24 +105,52 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
webSocketThreadLock.acquire();
int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname);
boolean isConnected = connectivityState == ServerConnectivity.STATE_CONNECTED;
if (webSocketThreads.containsKey(hostname) && isConnected) {
RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
if (currentWebSocketThread != null && threadCreatedForHostname(hostname)) {
webSocketThreadLock.release();
return Single.just(thread);
return Single.just(currentWebSocketThread);
}
connectivityManager.notifyConnecting(hostname);
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
.doOnError(RCLog::e)
.flatMap(terminated ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
})
);
}
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
webSocketThreads.put(hostname, thread);
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
});
});
}
private boolean threadCreatedForHostname(String hostname) {
if (hostname == null || currentWebSocketThread == null) {
return false;
}
return currentWebSocketThread.getName().equals("RC_thread_" + hostname);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
......
......@@ -35,6 +35,7 @@ import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore;
......@@ -155,14 +156,14 @@ public class RocketChatWebSocketThread extends HandlerThread {
RCLog.d("thread %s: terminated()", Thread.currentThread().getId());
unregisterListenersAndClose();
connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_CLOSED_BY_USER);
DDPClient.REASON_CLOSED_BY_USER);
RocketChatWebSocketThread.super.quit();
emitter.onSuccess(true);
});
});
} else {
connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_NETWORK_ERROR);
DDPClient.REASON_NETWORK_ERROR);
super.quit();
return Single.just(true);
}
......@@ -206,7 +207,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
Exception error = task.getError();
RCLog.e(error);
connectivityManager.notifyConnectionLost(
hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR);
hostname, DDPClient.REASON_CLOSED_BY_USER);
emitter.onError(error);
} else {
keepAliveTimer.update();
......@@ -258,42 +259,43 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
RCLog.d("DDPClient#connect");
DDPClient.get().connect(hostname, info.getSession(), info.isSecure())
.onSuccessTask(task -> {
final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
if (_task.getResult().code != 1000) {
reconnect();
}
return null;
});
return realmHelper.executeTransaction(realm -> {
RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst();
if (sessionObj == null) {
realm.createOrUpdateObjectFromJson(RealmSession.class,
new JSONObject().put(RealmSession.ID, RealmSession.DEFAULT_ID));
} else {
// invalidate login token.
if (!TextUtils.isEmpty(sessionObj.getToken()) && sessionObj.isTokenVerified()) {
sessionObj.setTokenVerified(false);
sessionObj.setError(null);
}
.onSuccessTask(task -> {
final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
RxWebSocketCallback.Close result = _task.getResult();
if (result.code == DDPClient.REASON_NETWORK_ERROR) {
reconnect();
}
return null;
});
return realmHelper.executeTransaction(realm -> {
RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst();
if (sessionObj == null) {
realm.createOrUpdateObjectFromJson(RealmSession.class,
new JSONObject().put(RealmSession.ID, RealmSession.DEFAULT_ID));
} else {
// invalidate login token.
if (!TextUtils.isEmpty(sessionObj.getToken()) && sessionObj.isTokenVerified()) {
sessionObj.setTokenVerified(false);
sessionObj.setError(null);
}
}
return null;
});
})
.continueWith(task -> {
if (task.isFaulted()) {
emitter.onError(task.getError());
} else {
emitter.onSuccess(true);
}
return null;
});
}
return null;
});
})
.continueWith(task -> {
if (task.isFaulted()) {
emitter.onError(task.getError());
} else {
emitter.onSuccess(true);
}
return null;
});
}));
}
......@@ -318,8 +320,10 @@ public class RocketChatWebSocketThread extends HandlerThread {
error -> {
logErrorAndUnsubscribe(reconnectSubscription, error);
connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_NETWORK_ERROR);
new Handler(getLooper()).post(this::unregisterListeners);
DDPClient.REASON_CLOSED_BY_USER);
if (isAlive()) {
new Handler(getLooper()).post(this::unregisterListeners);
}
}
)
);
......@@ -445,10 +449,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog
private void unregisterListenersAndClose() {
unregisterListeners();
if (DDPClient.get() != null) {
DDPClient.get().close();
}
unregisterListeners();
DDPClient.get().close();
}
@DebugLog
......
......@@ -11,10 +11,18 @@ public class ServerConnectivity {
public final String hostname;
public final int state;
public final int code;
public ServerConnectivity(String hostname, int state) {
ServerConnectivity(String hostname, int state) {
this.hostname = hostname;
this.state = state;
this.code = -1;
}
ServerConnectivity(String hostname, int state, int code) {
this.hostname = hostname;
this.state = state;
this.code = code;
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment