Commit 8926ab87 authored by Yusuke Iwaki's avatar Yusuke Iwaki

add ConnectivityManager skelton.

parent d13702d9
...@@ -114,8 +114,8 @@ dependencies { ...@@ -114,8 +114,8 @@ dependencies {
compile 'com.facebook.stetho:stetho-okhttp3:1.4.1' compile 'com.facebook.stetho:stetho-okhttp3:1.4.1'
compile 'com.uphyca:stetho_realm:2.0.1' compile 'com.uphyca:stetho_realm:2.0.1'
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0' compile 'com.jakewharton.rxbinding:rxbinding:1.0.0'
compile 'com.jakewharton.rxbinding:rxbinding-support-v4:0.4.0' compile 'com.jakewharton.rxbinding:rxbinding-support-v4:1.0.0'
compile 'com.trello:rxlifecycle:1.0' compile 'com.trello:rxlifecycle:1.0'
compile 'com.trello:rxlifecycle-android:1.0' compile 'com.trello:rxlifecycle-android:1.0'
......
package chat.rocket.android.api; package chat.rocket.android.api;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.UUID;
import bolts.Task; import bolts.Task;
import chat.rocket.android.helper.OkHttpHelper; import chat.rocket.android.helper.OkHttpHelper;
import chat.rocket.android.helper.TextUtils; import chat.rocket.android.helper.TextUtils;
...@@ -9,11 +12,6 @@ import chat.rocket.android.log.RCLog; ...@@ -9,11 +12,6 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.DDPClient; import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback; import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.DDPSubscription; import chat.rocket.android_ddp.DDPSubscription;
import java.util.UUID;
import org.json.JSONArray;
import org.json.JSONException;
import rx.Observable; import rx.Observable;
/** /**
...@@ -82,10 +80,6 @@ public class DDPClientWrapper { ...@@ -82,10 +80,6 @@ public class DDPClientWrapper {
return ddpClient.getSubscriptionCallback(); return ddpClient.getSubscriptionCallback();
} }
private String generateId(String method) {
return method + "-" + UUID.randomUUID().toString().replace("-", "");
}
/** /**
* Execute raw RPC. * Execute raw RPC.
*/ */
...@@ -117,4 +111,22 @@ public class DDPClientWrapper { ...@@ -117,4 +111,22 @@ public class DDPClientWrapper {
return Task.forError(exception); return Task.forError(exception);
} }
} }
/**
* check WebSocket connectivity with ping.
*/
public Task<Void> ping() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ddpClient.ping(pingId)
.continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d("ping[%s] xxx failed xxx", pingId);
return Task.forError(task.getError());
} else {
RCLog.d("pong[%s] <");
return Task.forResult(null);
}
});
}
} }
package chat.rocket.android.service;
import android.content.Context;
/**
* Connectivity Manager API Factory.
*/
public class ConnectivityManager {
private static ConnectivityManagerImpl IMPL = new ConnectivityManagerImpl();
public static ConnectivityManagerApi getInstance(Context appContext) {
return IMPL.setContext(appContext);
}
/*package*/ static ConnectivityManagerInternal getInstanceForInternal(Context appContext) {
return IMPL.setContext(appContext);
}
}
package chat.rocket.android.service;
import android.support.annotation.Nullable;
import java.util.List;
import rx.Completable;
import rx.Observable;
/**
* interfaces used for Activity/Fragment and other UI-related logic.
*/
public interface ConnectivityManagerApi {
void keepAliveServer();
void addOrUpdateServer(String hostname, @Nullable String name);
void removeServer(String hostname);
Completable connect(String hostname);
List<ServerInfo> getServerList();
Observable<ServerConnectivity> getServerConnectivityAsObservable();
}
package chat.rocket.android.service;
import android.content.ComponentName;
import android.content.Context;
import android.content.ServiceConnection;
import android.os.IBinder;
import android.support.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import rx.Completable;
import rx.Observable;
import rx.subjects.PublishSubject;
/**
* Connectivity management implementation.
*/
/*package*/ class ConnectivityManagerImpl implements ConnectivityManagerApi, ConnectivityManagerInternal {
private final HashMap<String, Integer> serverConnectivityList = new HashMap<>();
private final PublishSubject<ServerConnectivity> connectivitySubject = PublishSubject.create();
private Context appContext;
private final ServiceConnection serviceConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName componentName, IBinder binder) {
serviceInterface = ((RocketChatService.LocalBinder) binder).getServiceInterface();
}
@Override
public void onServiceDisconnected(ComponentName componentName) {
serviceInterface = null;
}
};
private ConnectivityServiceInterface serviceInterface;
/*package*/ ConnectivityManagerImpl setContext(Context appContext) {
this.appContext = appContext;
return this;
}
@Override
public void resetConnectivityStateList() {
serverConnectivityList.clear();
for (ServerInfo serverInfo : ServerInfoImpl.getAllFromRealm()) {
serverConnectivityList.put(serverInfo.hostname, ServerConnectivity.STATE_DISCONNECTED);
}
}
@Override
public void keepAliveServer() {
RocketChatService.keepAlive(appContext);
if (serviceInterface == null) {
RocketChatService.bind(appContext, serviceConnection);
}
}
@Override
public void ensureConnections() {
for (String hostname : serverConnectivityList.keySet()) {
connectToServer(hostname); //force connect.
}
}
@Override
public void addOrUpdateServer(String hostname, @Nullable String name) {
ServerInfoImpl.addOrUpdate(hostname, name);
if (!serverConnectivityList.containsKey(hostname)) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
}
connectToServerIfNeeded(hostname);
}
@Override
public void removeServer(String hostname) {
ServerInfoImpl.remove(hostname);
if (serverConnectivityList.containsKey(hostname)) {
disconnectFromServerIfNeeded(hostname);
}
}
@Override
public Completable connect(String hostname) {
return connectToServerIfNeeded(hostname);
}
@Override
public List<ServerInfo> getServerList() {
return ServerInfoImpl.getAllFromRealm();
}
@Override
public ServerInfo getServerInfoForHost(String hostname) {
return ServerInfoImpl.getServerInfoForHost(hostname);
}
private List<ServerConnectivity> getCurrentConnectivityList() {
ArrayList<ServerConnectivity> list = new ArrayList<>();
for (Map.Entry<String, Integer> entry : serverConnectivityList.entrySet()) {
list.add(new ServerConnectivity(entry.getKey(), entry.getValue()));
}
return list;
}
@Override
public void notifyConnectionEstablished(String hostname, String session) {
ServerInfoImpl.updateSession(hostname, session);
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTED));
}
@Override
public void notifyConnectionLost(String hostname, int reason) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED));
}
@Override
public Observable<ServerConnectivity> getServerConnectivityAsObservable() {
return Observable.concat(Observable.from(getCurrentConnectivityList()), connectivitySubject);
}
private Completable connectToServerIfNeeded(String hostname) {
final int connectivity = serverConnectivityList.get(hostname);
if (connectivity == ServerConnectivity.STATE_CONNECTED) {
return Completable.complete();
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname).andThen(connectToServerIfNeeded(hostname));
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname);
}
return connectToServer(hostname).retry(2);
}
private Completable disconnectFromServerIfNeeded(String hostname) {
final int connectivity = serverConnectivityList.get(hostname);
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
return Completable.complete();
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname).andThen(disconnectFromServerIfNeeded(hostname));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname);
}
return disconnectFromServer(hostname).retry(2);
}
private Completable waitForConnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> (hostname.equals(serverConnectivity.hostname)
&& serverConnectivity.state == ServerConnectivity.STATE_CONNECTED))
.first()
.toCompletable();
}
private Completable waitForDisconnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> (hostname.equals(serverConnectivity.hostname)
&& serverConnectivity.state == ServerConnectivity.STATE_DISCONNECTED))
.first()
.toCompletable();
}
private Completable connectToServer(String hostname) {
if (!serverConnectivityList.containsKey(hostname)) {
throw new IllegalArgumentException("hostname not found");
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
if (serviceInterface != null) {
return serviceInterface.ensureConnectionToServer(hostname);
} else {
return Completable.error(new IllegalStateException("not prepared"));
}
}
private Completable disconnectFromServer(String hostname) {
if (!serverConnectivityList.containsKey(hostname)) {
throw new IllegalArgumentException("hostname not found");
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTING);
if (serviceInterface != null) {
return serviceInterface.disconnectFromServer(hostname);
} else {
return Completable.error(new IllegalStateException("not prepared"));
}
}
}
package chat.rocket.android.service;
import java.util.List;
/**
* interfaces used for RocketChatService and RocketChatwebSocketThread.
*/
/*package*/ interface ConnectivityManagerInternal {
int REASON_CLOSED_BY_USER = 101;
int REASON_NETWORK_ERROR = 102;
int REASON_SERVER_ERROR = 103;
int REASON_UNKNOWN = 104;
void resetConnectivityStateList();
void ensureConnections();
List<ServerInfo> getServerList();
ServerInfo getServerInfoForHost(String hostname);
void notifyConnectionEstablished(String hostname, String session);
void notifyConnectionLost(String hostname, int reason);
}
package chat.rocket.android.service;
import rx.Completable;
public interface ConnectivityServiceInterface {
Completable ensureConnectionToServer(String hostname);
Completable disconnectFromServer(String hostname);
}
...@@ -3,27 +3,31 @@ package chat.rocket.android.service; ...@@ -3,27 +3,31 @@ package chat.rocket.android.service;
import android.app.Service; import android.app.Service;
import android.content.Context; import android.content.Context;
import android.content.Intent; import android.content.Intent;
import android.content.ServiceConnection;
import android.os.Binder;
import android.os.IBinder; import android.os.IBinder;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import io.realm.RealmResults;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.concurrent.TimeUnit;
import bolts.Task; import rx.Completable;
import chat.rocket.android.helper.LogcatIfError; import rx.Single;
import chat.rocket.android.model.ServerConfig;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.realm_helper.RealmListObserver;
import chat.rocket.android.realm_helper.RealmStore;
/** /**
* Background service for Rocket.Chat.Application class. * Background service for Rocket.Chat.Application class.
*/ */
public class RocketChatService extends Service { public class RocketChatService extends Service implements ConnectivityServiceInterface {
private RealmHelper realmHelper; private ConnectivityManagerInternal connectivityManager;
private HashMap<String, RocketChatWebSocketThread> webSocketThreads; private HashMap<String, RocketChatWebSocketThread> webSocketThreads;
private RealmListObserver<ServerConfig> connectionRequiredServerConfigObserver;
public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() {
return RocketChatService.this;
}
}
private final LocalBinder localBinder = new LocalBinder();
/** /**
* ensure RocketChatService alive. * ensure RocketChatService alive.
...@@ -32,111 +36,73 @@ public class RocketChatService extends Service { ...@@ -32,111 +36,73 @@ public class RocketChatService extends Service {
context.startService(new Intent(context, RocketChatService.class)); context.startService(new Intent(context, RocketChatService.class));
} }
public static void bind(Context context, ServiceConnection serviceConnection) {
context.bindService(
new Intent(context, RocketChatService.class), serviceConnection, Context.BIND_AUTO_CREATE);
}
public static void unbind(Context context, ServiceConnection serviceConnection) {
context.unbindService(serviceConnection);
}
@Override @Override
public void onCreate() { public void onCreate() {
super.onCreate(); super.onCreate();
webSocketThreads = new HashMap<>();
realmHelper = RealmStore.getDefault();
connectionRequiredServerConfigObserver = realmHelper
.createListObserver(realm -> realm.where(ServerConfig.class)
.isNotNull(ServerConfig.HOSTNAME)
.equalTo(ServerConfig.STATE, ServerConfig.STATE_READY)
.findAll())
.setOnUpdateListener(this::connectToServerWithServerConfig);
refreshServerConfigState();
}
private void refreshServerConfigState() { connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
realmHelper.executeTransaction(realm -> { connectivityManager.resetConnectivityStateList();
RealmResults<ServerConfig> configs = realm.where(ServerConfig.class) webSocketThreads = new HashMap<>();
.notEqualTo(ServerConfig.STATE, ServerConfig.STATE_READY)
.findAll();
for (ServerConfig config : configs) {
config.setState(ServerConfig.STATE_READY);
}
return null;
}).continueWith(new LogcatIfError());
;
} }
@Override @Override
public int onStartCommand(Intent intent, int flags, int startId) { public int onStartCommand(Intent intent, int flags, int startId) {
List<ServerConfig> configs = realmHelper.executeTransactionForReadResults(realm -> connectivityManager.ensureConnections();
realm.where(ServerConfig.class)
.equalTo(ServerConfig.STATE, ServerConfig.STATE_CONNECTED)
.findAll());
for (ServerConfig config : configs) {
String serverConfigId = config.getServerConfigId();
if (webSocketThreads.containsKey(serverConfigId)) {
RocketChatWebSocketThread thread = webSocketThreads.get(serverConfigId);
if (thread != null) {
thread.keepAlive();
}
}
}
realmHelper.executeTransaction(realm -> {
RealmResults<ServerConfig> targetConfigs = realm
.where(ServerConfig.class)
.beginGroup()
.equalTo(ServerConfig.STATE, ServerConfig.STATE_CONNECTION_ERROR)
.or()
.isNotNull(ServerConfig.ERROR)
.endGroup()
.isNotNull(ServerConfig.SESSION)
.findAll();
for (ServerConfig config : targetConfigs) {
config.setState(ServerConfig.STATE_READY);
config.setError(null);
}
return null;
}).onSuccessTask(task -> {
connectionRequiredServerConfigObserver.sub();
return null;
});
return START_NOT_STICKY; return START_NOT_STICKY;
} }
private void connectToServerWithServerConfig(List<ServerConfig> configList) { @Override
if (configList.isEmpty()) { public Completable ensureConnectionToServer(String hostname) { //called via binder.
return; return getOrCreateWebSocketThread(hostname)
} .doOnError(err -> {
webSocketThreads.remove(hostname);
ServerConfig config = configList.get(0); connectivityManager.notifyConnectionLost(hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR);
final String serverConfigId = config.getServerConfigId(); })
ServerConfig.updateState(serverConfigId, ServerConfig.STATE_CONNECTING) .flatMapCompletable(webSocketThreads -> webSocketThreads.keepAlive());
.onSuccessTask(task -> createWebSocketThread(config))
.onSuccessTask(task -> {
RocketChatWebSocketThread thread = task.getResult();
if (thread != null) {
thread.keepAlive();
}
return ServerConfig.updateState(serverConfigId, ServerConfig.STATE_CONNECTED);
}).continueWith(new LogcatIfError());
} }
private Task<RocketChatWebSocketThread> createWebSocketThread(final ServerConfig config) { @Override
final String serverConfigId = config.getServerConfigId(); public Completable disconnectFromServer(String hostname) { //called via binder.
webSocketThreads.put(serverConfigId, null); if (!webSocketThreads.containsKey(hostname)) {
return RocketChatWebSocketThread.getStarted(getApplicationContext(), config) return Completable.complete();
.onSuccessTask(task -> { }
webSocketThreads.put(serverConfigId, task.getResult());
return task; RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
}); if (thread != null) {
return thread.terminate();
} else {
return Completable.timer(1, TimeUnit.SECONDS).andThen(disconnectFromServer(hostname));
}
} }
@Override private Single<RocketChatWebSocketThread> getOrCreateWebSocketThread(String hostname) {
public void onDestroy() { if (webSocketThreads.containsKey(hostname)) {
if (connectionRequiredServerConfigObserver != null) { RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
connectionRequiredServerConfigObserver.unsub(); if (thread != null) {
return Single.just(thread);
} else {
return Completable.timer(1, TimeUnit.SECONDS).andThen(getOrCreateWebSocketThread(hostname));
}
} }
super.onDestroy(); webSocketThreads.put(hostname, null);
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
webSocketThreads.put(hostname, thread);
});
} }
@Nullable @Nullable
@Override @Override
public IBinder onBind(Intent intent) { public IBinder onBind(Intent intent) {
return null; return localBinder;
} }
} }
...@@ -8,15 +8,12 @@ import org.json.JSONObject; ...@@ -8,15 +8,12 @@ import org.json.JSONObject;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import bolts.Continuation;
import bolts.Task; import bolts.Task;
import bolts.TaskCompletionSource;
import chat.rocket.android.api.DDPClientWrapper; import chat.rocket.android.api.DDPClientWrapper;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogcatIfError; import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.helper.TextUtils; import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import chat.rocket.android.model.ServerConfig;
import chat.rocket.android.model.internal.Session; import chat.rocket.android.model.internal.Session;
import chat.rocket.android.realm_helper.RealmHelper; import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.realm_helper.RealmStore; import chat.rocket.android.realm_helper.RealmStore;
...@@ -34,8 +31,9 @@ import chat.rocket.android.service.observer.NewMessageObserver; ...@@ -34,8 +31,9 @@ import chat.rocket.android.service.observer.NewMessageObserver;
import chat.rocket.android.service.observer.PushSettingsObserver; import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver; import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android.service.observer.TokenLoginObserver; import chat.rocket.android.service.observer.TokenLoginObserver;
import chat.rocket.android_ddp.DDPClientCallback;
import hugo.weaving.DebugLog; import hugo.weaving.DebugLog;
import rx.Completable;
import rx.Single;
/** /**
* Thread for handling WebSocket connection. * Thread for handling WebSocket connection.
...@@ -58,50 +56,40 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -58,50 +56,40 @@ public class RocketChatWebSocketThread extends HandlerThread {
GcmPushRegistrationObserver.class GcmPushRegistrationObserver.class
}; };
private final Context appContext; private final Context appContext;
private final String serverConfigId; private final String hostname;
private final RealmHelper defaultRealm;
private final RealmHelper serverConfigRealm; private final RealmHelper serverConfigRealm;
private final ConnectivityManagerInternal connectivityManager;
private final ArrayList<Registrable> listeners = new ArrayList<>(); private final ArrayList<Registrable> listeners = new ArrayList<>();
private DDPClientWrapper ddpClient; private DDPClientWrapper ddpClient;
private boolean listenersRegistered; private boolean listenersRegistered;
private RocketChatWebSocketThread(Context appContext, String serverConfigId) { private RocketChatWebSocketThread(Context appContext, String hostname) {
super("RC_thread_" + serverConfigId); super("RC_thread_" + hostname);
this.appContext = appContext; this.appContext = appContext;
this.serverConfigId = serverConfigId; this.hostname = hostname;
defaultRealm = RealmStore.getDefault(); this.serverConfigRealm = RealmStore.getOrCreate(hostname);
serverConfigRealm = RealmStore.getOrCreate(serverConfigId); this.connectivityManager = ConnectivityManager.getInstanceForInternal(appContext);
} }
/** /**
* create new Thread. * create new Thread.
*/ */
@DebugLog @DebugLog
public static Task<RocketChatWebSocketThread> getStarted(Context appContext, public static Single<RocketChatWebSocketThread> getStarted(Context appContext, String hostname) {
ServerConfig config) { return Single.<RocketChatWebSocketThread>fromEmitter(objectSingleEmitter -> {
TaskCompletionSource<RocketChatWebSocketThread> task = new TaskCompletionSource<>(); new RocketChatWebSocketThread(appContext, hostname) {
new RocketChatWebSocketThread(appContext, config.getServerConfigId()) { @Override
@Override protected void onLooperPrepared() {
protected void onLooperPrepared() { try {
try { super.onLooperPrepared();
super.onLooperPrepared(); objectSingleEmitter.onSuccess(this);
task.setResult(this); } catch (Exception exception) {
} catch (Exception exception) { objectSingleEmitter.onError(exception);
task.setError(exception); }
} }
} }.start();
}.start(); }).flatMap(webSocket ->
return task.getTask() webSocket.connect().andThen(Single.just(webSocket)));
.onSuccessTask(_task ->
_task.getResult().connect().onSuccessTask(__task -> _task));
}
/**
* destroy the thread.
*/
@DebugLog
public static void destroy(RocketChatWebSocketThread thread) {
thread.quit();
} }
@Override @Override
...@@ -123,102 +111,115 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -123,102 +111,115 @@ public class RocketChatWebSocketThread extends HandlerThread {
}).continueWith(new LogcatIfError()); }).continueWith(new LogcatIfError());
} }
@Override /**
public boolean quit() { * terminate WebSocket thread.
*/
@DebugLog
public Completable terminate() {
if (isAlive()) { if (isAlive()) {
new Handler(getLooper()).post(() -> { return Completable.fromEmitter(completableEmitter -> {
RCLog.d("thread %s: quit()", Thread.currentThread().getId()); new Handler(getLooper()).post(() -> {
unregisterListeners(); RCLog.d("thread %s: terminated()", Thread.currentThread().getId());
RocketChatWebSocketThread.super.quit(); unregisterListeners();
connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_CLOSED_BY_USER);
RocketChatWebSocketThread.super.quit();
completableEmitter.onCompleted();
});
}); });
return true;
} else { } else {
return super.quit(); connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_NETWORK_ERROR);
super.quit();
return Completable.complete();
} }
} }
/**
* THIS METHOD THROWS EXCEPTION!!
* Use terminate() instead!!
*/
@Deprecated public final boolean quit() {
throw new UnsupportedOperationException();
}
/** /**
* synchronize the state of the thread with ServerConfig. * synchronize the state of the thread with ServerConfig.
*/ */
@DebugLog @DebugLog
public void keepAlive() { public Completable keepAlive() {
if (ddpClient == null || !ddpClient.isConnected()) { return checkIfConnectionAlive()
defaultRealm.executeTransaction(realm -> { .flatMapCompletable(alive -> alive ? Completable.complete() : connect());
ServerConfig config = realm.where(ServerConfig.class)
.equalTo(ServerConfig.ID, serverConfigId)
.findFirst();
if (config != null && config.getState() == ServerConfig.STATE_CONNECTED) {
config.setState(ServerConfig.STATE_READY);
quit();
}
return null;
});
}
} }
private void prepareWebSocket(String hostname) { private Single<Boolean> checkIfConnectionAlive() {
if (ddpClient == null || !ddpClient.isConnected()) { if (ddpClient == null || !ddpClient.isConnected()) {
ddpClient = DDPClientWrapper.create(hostname); return Single.just(false);
} }
return Single.fromEmitter(booleanSingleEmitter -> {
ddpClient.ping().continueWith(task -> {
booleanSingleEmitter.onSuccess(!task.isFaulted());
return null;
});
});
} }
@DebugLog private Completable prepareDDPClient() {
private Task<Void> connect() { return checkIfConnectionAlive()
final ServerConfig config = defaultRealm.executeTransactionForRead(realm -> .doOnSuccess(alive -> {
realm.where(ServerConfig.class).equalTo(ServerConfig.ID, serverConfigId).findFirst()); if (!alive) {
ddpClient = DDPClientWrapper.create(hostname);
}
})
.toCompletable();
}
prepareWebSocket(config.getHostname()); private Completable connectDDPClient() {
return ddpClient.connect(config.getSession(), config.usesSecureConnection()) return prepareDDPClient()
.onSuccessTask(task -> { .andThen(Completable.fromEmitter(completableEmitter -> {
final String session = task.getResult().session; ServerInfo info = connectivityManager.getServerInfoForHost(hostname);
defaultRealm.executeTransaction(realm -> ddpClient.connect(info.session, !info.insecure)
realm.createOrUpdateObjectFromJson(ServerConfig.class, new JSONObject() .onSuccessTask(task -> {
.put("serverConfigId", serverConfigId) final String newSession = task.getResult().session;
.put("session", session)) connectivityManager.notifyConnectionEstablished(hostname, newSession);
).onSuccess(_task -> serverConfigRealm.executeTransaction(realm -> {
Session sessionObj = Session.queryDefaultSession(realm).findFirst();
if (sessionObj == null) { // handling WebSocket#onClose() callback.
realm.createOrUpdateObjectFromJson(Session.class, task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
new JSONObject().put("sessionId", Session.DEFAULT_ID)); if (listenersRegistered) {
} terminate();
return null; }
})).continueWith(new LogcatIfError()); return null;
return task; });
})
.onSuccess(new Continuation<DDPClientCallback.Connect, Void>() {
// TODO type detection doesn't work due to retrolambda's bug...
@Override
public Void then(Task<DDPClientCallback.Connect> task)
throws Exception {
fetchPublicSettings();
registerListeners();
// handling WebSocket#onClose() callback. return serverConfigRealm.executeTransaction(realm -> {
task.getResult().client.getOnCloseCallback().onSuccess(_task -> { Session sessionObj = Session.queryDefaultSession(realm).findFirst();
quit(); if (sessionObj == null) {
return null; realm.createOrUpdateObjectFromJson(Session.class,
}).continueWithTask(_task -> { new JSONObject().put(Session.ID, Session.DEFAULT_ID));
if (_task.isFaulted()) { }
ServerConfig.logConnectionError(serverConfigId, _task.getError()); return null;
} });
return _task; })
}); .continueWith(task -> {
if (task.isFaulted()) {
completableEmitter.onError(task.getError());
} else {
completableEmitter.onCompleted();
}
return null;
});
}));
}
return null; @DebugLog
} private Completable connect() {
}) return connectDDPClient()
.continueWithTask(task -> { .andThen(Completable.fromEmitter(completableEmitter -> {
if (task.isFaulted()) { fetchPublicSettings();
Exception error = task.getError(); registerListeners();
if (error instanceof DDPClientCallback.Connect.Timeout) { completableEmitter.onCompleted();
ServerConfig.logConnectionError(serverConfigId, new Exception("Connection Timeout")); }));
} else {
ServerConfig.logConnectionError(serverConfigId, task.getError());
}
}
return task;
});
} }
private Task<Void> fetchPublicSettings() { private Task<Void> fetchPublicSettings() {
...@@ -227,7 +228,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -227,7 +228,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
//@DebugLog //@DebugLog
private void registerListeners() { private void registerListeners() {
if (!Thread.currentThread().getName().equals("RC_thread_" + serverConfigId)) { if (!Thread.currentThread().getName().equals("RC_thread_" + hostname)) {
// execute in Looper. // execute in Looper.
new Handler(getLooper()).post(this::registerListeners); new Handler(getLooper()).post(this::registerListeners);
return; return;
...@@ -238,10 +239,6 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -238,10 +239,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
} }
listenersRegistered = true; listenersRegistered = true;
final ServerConfig config = defaultRealm.executeTransactionForRead(realm ->
realm.where(ServerConfig.class).equalTo(ServerConfig.ID, serverConfigId).findFirst());
final String hostname = config.getHostname();
for (Class clazz : REGISTERABLE_CLASSES) { for (Class clazz : REGISTERABLE_CLASSES) {
try { try {
Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class, Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class,
...@@ -261,20 +258,16 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -261,20 +258,16 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog @DebugLog
private void unregisterListeners() { private void unregisterListeners() {
if (!listenersRegistered) {
return;
}
Iterator<Registrable> iterator = listeners.iterator(); Iterator<Registrable> iterator = listeners.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Registrable registrable = iterator.next(); Registrable registrable = iterator.next();
registrable.unregister(); registrable.unregister();
iterator.remove(); iterator.remove();
} }
listenersRegistered = false;
if (ddpClient != null) { if (ddpClient != null) {
ddpClient.close(); ddpClient.close();
ddpClient = null; ddpClient = null;
} }
listenersRegistered = false;
} }
} }
package chat.rocket.android.service;
/**
* pair with server's hostname and its connectivity state.
*/
public class ServerConnectivity {
public static final int STATE_CONNECTED = 1;
public static final int STATE_DISCONNECTED = 2;
/*package*/ static final int STATE_CONNECTING = 3;
/*package*/ static final int STATE_DISCONNECTING = 4;
public final String hostname;
public final int state;
public ServerConnectivity(String hostname, int state) {
this.hostname = hostname;
this.state = state;
}
}
package chat.rocket.android.service;
/**
* Stores information just for required for initializing connectivity manager.
*/
public class ServerInfo {
public final String hostname;
public final String name;
/*package*/ final String session;
public final boolean insecure;
public ServerInfo(String hostname, String name, String session, boolean insecure) {
this.hostname = hostname;
this.name = name;
this.session = session;
this.insecure = insecure;
}
}
package chat.rocket.android.service;
import android.support.annotation.Nullable;
import android.text.TextUtils;
import io.realm.RealmObject;
import io.realm.annotations.PrimaryKey;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.List;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.realm_helper.RealmStore;
/**
* Backend implementation to store ServerInfo.
*/
public class ServerInfoImpl extends RealmObject {
private static final String DB_NAME = "serverlist";
@PrimaryKey private String hostname;
private String name;
private String session;
private boolean insecure;
interface ColumnName {
String HOSTNAME = "hostname";
String NAME = "name";
String SESSION = "session";
String INSECURE = "insecure";
}
ServerInfo getServerInfo() {
return new ServerInfo(hostname, name, session, insecure);
}
static RealmHelper getRealm() {
return RealmStore.getOrCreate(DB_NAME);
}
static void addOrUpdate(String hostname, String name) {
getRealm().executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(ServerInfoImpl.class, new JSONObject()
.put(ColumnName.HOSTNAME, hostname)
.put(ColumnName.NAME, TextUtils.isEmpty(name) ? JSONObject.NULL : name)));
}
static void remove(String hostname) {
getRealm().executeTransaction(realm -> {
realm.where(ServerInfoImpl.class).equalTo(ColumnName.HOSTNAME, hostname)
.findAll()
.deleteAllFromRealm();
return null;
});
}
static void updateSession(String hostname, String session) {
ServerInfoImpl impl = getRealm().executeTransactionForRead(realm ->
realm.where(ServerInfoImpl.class).equalTo(ColumnName.HOSTNAME, hostname).findFirst());
if (impl != null) {
impl.session = session;
getRealm().executeTransaction(realm -> {
realm.copyToRealmOrUpdate(impl);
return null;
});
}
}
static @Nullable ServerInfo getServerInfoForHost(String hostname) {
ServerInfoImpl impl = getRealm().executeTransactionForRead(realm ->
realm.where(ServerInfoImpl.class).equalTo(ColumnName.HOSTNAME, hostname).findFirst());
return impl == null ? null : impl.getServerInfo();
}
static void setInsecure(String hostname, boolean insecure) {
ServerInfoImpl impl = getRealm().executeTransactionForRead(realm ->
realm.where(ServerInfoImpl.class).equalTo(ColumnName.HOSTNAME, hostname).findFirst());
if (impl != null) {
impl.insecure = insecure;
getRealm().executeTransaction(realm -> {
realm.copyToRealmOrUpdate(impl);
return null;
});
}
}
static List<ServerInfo> getAllFromRealm() {
List<ServerInfoImpl> results = getRealm().executeTransactionForReadResults(realm ->
realm.where(ServerInfoImpl.class).findAll());
ArrayList<ServerInfo> list = new ArrayList<>();
for (ServerInfoImpl impl : results) {
list.add(impl.getServerInfo());
}
return list;
}
}
...@@ -13,7 +13,7 @@ ext { ...@@ -13,7 +13,7 @@ ext {
supportAppCompat = "com.android.support:appcompat-v7:$supportVersion" supportAppCompat = "com.android.support:appcompat-v7:$supportVersion"
supportDesign = "com.android.support:design:$supportVersion" supportDesign = "com.android.support:design:$supportVersion"
rxJava = 'io.reactivex:rxjava:1.2.2' rxJava = 'io.reactivex:rxjava:1.2.3'
boltsTask = 'com.parse.bolts:bolts-tasks:1.4.0' boltsTask = 'com.parse.bolts:bolts-tasks:1.4.0'
okhttp3 = 'com.squareup.okhttp3:okhttp:3.5.0' okhttp3 = 'com.squareup.okhttp3:okhttp:3.5.0'
picasso = 'com.squareup.picasso:picasso:2.5.2' picasso = 'com.squareup.picasso:picasso:2.5.2'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment