Commit 9dab2bf7 authored by Rafael Kellermann Streit's avatar Rafael Kellermann Streit Committed by GitHub

Merge branch 'develop' into iss321

parents e38ca8d4 388a4f9c
package chat.rocket.android_ddp; package chat.rocket.android_ddp;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import io.reactivex.Flowable;
import org.json.JSONArray; import org.json.JSONArray;
import bolts.Task; import bolts.Task;
import bolts.TaskCompletionSource; import bolts.TaskCompletionSource;
import chat.rocket.android_ddp.rx.RxWebSocketCallback; import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
public class DDPClient { public class DDPClient {
...@@ -34,6 +36,10 @@ public class DDPClient { ...@@ -34,6 +36,10 @@ public class DDPClient {
return task.getTask(); return task.getTask();
} }
public Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id);
}
public Task<DDPClientCallback.RPC> rpc(String method, JSONArray params, String id, public Task<DDPClientCallback.RPC> rpc(String method, JSONArray params, String id,
long timeoutMs) { long timeoutMs) {
TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>(); TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>();
......
package chat.rocket.android_ddp; package chat.rocket.android_ddp;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import org.json.JSONObject; import org.json.JSONObject;
...@@ -53,6 +54,15 @@ public class DDPClientCallback { ...@@ -53,6 +54,15 @@ public class DDPClientCallback {
this.id = id; this.id = id;
} }
public static class UnMatched extends Base {
@NonNull public String id;
public UnMatched(DDPClient client, @NonNull String id) {
super(client);
this.id = id;
}
}
public static class Timeout extends BaseException { public static class Timeout extends BaseException {
public Timeout(DDPClient client) { public Timeout(DDPClient client) {
super(Timeout.class, client); super(Timeout.class, client);
......
...@@ -17,6 +17,7 @@ import chat.rocket.android.log.RCLog; ...@@ -17,6 +17,7 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.rx.RxWebSocket; import chat.rocket.android_ddp.rx.RxWebSocket;
import chat.rocket.android_ddp.rx.RxWebSocketCallback; import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
...@@ -106,40 +107,75 @@ public class DDPClientImpl { ...@@ -106,40 +107,75 @@ public class DDPClientImpl {
} }
} }
public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task, public Maybe<DDPClientCallback.Base> ping(@Nullable final String id) {
@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ? final boolean requested = (TextUtils.isEmpty(id)) ?
sendMessage("ping", null) : sendMessage("ping", null) :
sendMessage("ping", json -> json.put("id", id)); sendMessage("ping", json -> json.put("id", id));
if (requested) { if (requested) {
CompositeDisposable disposables = new CompositeDisposable(); return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe( .filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
response -> { .doOnError(error -> {
String msg = extractMsg(response); RCLog.e(error, "Heartbeat ping[%s] xxx failed xxx", id);
if ("pong".equals(msg)) { })
if (response.isNull("id")) { .map(response -> {
task.setResult(new DDPClientCallback.Ping(client, null)); String msg = extractMsg(response);
disposables.clear(); if ("pong".equals(msg)) {
} else { RCLog.d("pong[%s] <", id);
String _id = response.optString("id"); if (response.isNull("id")) {
if (id.equals(_id)) { return new DDPClientCallback.Ping(client, null);
task.setResult(new DDPClientCallback.Ping(client, id)); } else {
disposables.clear(); String _id = response.optString("id");
} if (id.equals(_id)) {
} return new DDPClientCallback.Ping(client, _id);
disposables.clear(); } else {
return new DDPClientCallback.Ping.UnMatched(client, _id);
} }
}, }
err -> task.setError(new DDPClientCallback.Ping.Timeout(client)) }
) // if we receive anything other than a pong throw an exception
throw new DDPClientCallback.RPC.Error(client, id, response);
}).firstElement();
} else {
return Maybe.error(new DDPClientCallback.Closed(client));
}
}
public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
@Nullable final String id) {
final boolean requested = (TextUtils.isEmpty(id)) ?
sendMessage("ping", null) :
sendMessage("ping", json -> json.put("id", id));
if (requested) {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.subscribe(
response -> {
String msg = extractMsg(response);
if ("pong".equals(msg)) {
if (response.isNull("id")) {
task.setResult(new DDPClientCallback.Ping(client, null));
} else {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPClientCallback.Ping(client, id));
}
}
disposables.clear();
}
},
err -> task.setError(new DDPClientCallback.Ping.Timeout(client))
)
); );
addErrorCallback(disposables, task); addErrorCallback(disposables, task);
...@@ -368,12 +404,11 @@ public class DDPClientImpl { ...@@ -368,12 +404,11 @@ public class DDPClientImpl {
try { try {
JSONObject origJson = new JSONObject().put("msg", msg); JSONObject origJson = new JSONObject().put("msg", msg);
String msg2 = (json == null ? origJson : json.create(origJson)).toString(); String msg2 = (json == null ? origJson : json.create(origJson)).toString();
websocket.sendText(msg2); return websocket.sendText(msg2);
} catch (Exception e) { } catch (Exception e) {
RCLog.e(e); RCLog.e(e);
return false; return false;
} }
return true; // ignore exception here.
} }
private void sendMessage(String msg, @Nullable JSONBuilder json, private void sendMessage(String msg, @Nullable JSONBuilder json,
...@@ -387,6 +422,9 @@ public class DDPClientImpl { ...@@ -387,6 +422,9 @@ public class DDPClientImpl {
disposables.add( disposables.add(
flowable.subscribe( flowable.subscribe(
base -> { base -> {
if (base instanceof RxWebSocketCallback.Close) {
task.trySetError(new Exception(((RxWebSocketCallback.Close) base).reason));
}
}, },
err -> { err -> {
task.trySetError(new Exception(err)); task.trySetError(new Exception(err));
......
...@@ -62,14 +62,20 @@ public class RxWebSocket { ...@@ -62,14 +62,20 @@ public class RxWebSocket {
} }
}), }),
BackpressureStrategy.BUFFER BackpressureStrategy.BUFFER
).delay(4, TimeUnit.SECONDS).publish(); ).publish();
} }
public boolean sendText(String message) throws IOException { public boolean sendText(String message) throws IOException {
if (webSocket == null) {
return false;
}
return webSocket.send(message); return webSocket.send(message);
} }
public boolean close(int code, String reason) throws IOException { public boolean close(int code, String reason) throws IOException {
if (webSocket == null) {
return false;
}
return webSocket.close(code, reason); return webSocket.close(code, reason);
} }
} }
...@@ -142,9 +142,15 @@ dependencies { ...@@ -142,9 +142,15 @@ dependencies {
compile "com.github.hotchemi:permissionsdispatcher:$permissionsdispatcherVersion" compile "com.github.hotchemi:permissionsdispatcher:$permissionsdispatcherVersion"
annotationProcessor "com.github.hotchemi:permissionsdispatcher-processor:$permissionsdispatcherVersion" annotationProcessor "com.github.hotchemi:permissionsdispatcher-processor:$permissionsdispatcherVersion"
compile('com.crashlytics.sdk.android:crashlytics:2.6.8@aar') { compile('com.crashlytics.sdk.android:crashlytics:2.6.8@aar') {
transitive = true; transitive = true;
} }
provided 'com.parse.bolts:bolts-tasks:1.4.0'
provided 'io.reactivex.rxjava2:rxjava:2.1.0'
provided 'io.reactivex:rxjava:1.3.0'
provided "com.github.akarnokd:rxjava2-interop:0.10.2"
} }
apply plugin: 'com.google.gms.google-services' apply plugin: 'com.google.gms.google-services'
...@@ -69,8 +69,18 @@ ...@@ -69,8 +69,18 @@
<action android:name="com.google.android.gms.iid.InstanceID"/> <action android:name="com.google.android.gms.iid.InstanceID"/>
</intent-filter> </intent-filter>
</service> </service>
<service android:name=".service.TaskService"
android:permission="com.google.android.gms.permission.BIND_NETWORK_TASK_SERVICE"
android:exported="true">
<intent-filter>
<action android:name="com.google.android.gms.gcm.ACTION_TASK_READY"/>
</intent-filter>
</service>
<meta-data <meta-data
android:name="io.fabric.ApiKey" android:name="io.fabric.ApiKey"
android:value="12ac6e94f850aaffcdff52001af77ca415d06a43" /> android:value="12ac6e94f850aaffcdff52001af77ca415d06a43" />
</application> </application>
</manifest> </manifest>
\ No newline at end of file
...@@ -3,11 +3,13 @@ package chat.rocket.android; ...@@ -3,11 +3,13 @@ package chat.rocket.android;
import android.content.Context; import android.content.Context;
import android.content.SharedPreferences; import android.content.SharedPreferences;
import io.reactivex.BackpressureStrategy; import com.hadisatrio.optional.Optional;
import io.reactivex.Flowable;
import java.util.UUID; import java.util.UUID;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
/** /**
* sharedpreference-based cache. * sharedpreference-based cache.
*/ */
...@@ -51,11 +53,11 @@ public class RocketChatCache { ...@@ -51,11 +53,11 @@ public class RocketChatCache {
return preferences.getString(KEY_PUSH_ID, null); return preferences.getString(KEY_PUSH_ID, null);
} }
public Flowable<String> getSelectedServerHostnamePublisher() { public Flowable<Optional<String>> getSelectedServerHostnamePublisher() {
return getValuePublisher(KEY_SELECTED_SERVER_HOSTNAME); return getValuePublisher(KEY_SELECTED_SERVER_HOSTNAME);
} }
public Flowable<String> getSelectedRoomIdPublisher() { public Flowable<Optional<String>> getSelectedRoomIdPublisher() {
return getValuePublisher(KEY_SELECTED_ROOM_ID); return getValuePublisher(KEY_SELECTED_ROOM_ID);
} }
...@@ -75,12 +77,13 @@ public class RocketChatCache { ...@@ -75,12 +77,13 @@ public class RocketChatCache {
getEditor().putString(key, value).apply(); getEditor().putString(key, value).apply();
} }
private Flowable<String> getValuePublisher(final String key) { private Flowable<Optional<String>> getValuePublisher(final String key) {
return Flowable.create(emitter -> { return Flowable.create(emitter -> {
SharedPreferences.OnSharedPreferenceChangeListener SharedPreferences.OnSharedPreferenceChangeListener
listener = (sharedPreferences, changedKey) -> { listener = (sharedPreferences, changedKey) -> {
if (key.equals(changedKey) && !emitter.isCancelled()) { if (key.equals(changedKey) && !emitter.isCancelled()) {
emitter.onNext(getString(key, null)); String value = getString(key, null);
emitter.onNext(Optional.of(value));
} }
}; };
......
...@@ -4,6 +4,8 @@ import android.content.Intent; ...@@ -4,6 +4,8 @@ import android.content.Intent;
import android.os.Bundle; import android.os.Bundle;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import com.hadisatrio.optional.Optional;
import java.util.List; import java.util.List;
import chat.rocket.android.LaunchUtil; import chat.rocket.android.LaunchUtil;
...@@ -179,6 +181,7 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity { ...@@ -179,6 +181,7 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity {
private void subscribeToConfigChanges() { private void subscribeToConfigChanges() {
compositeDisposable.add( compositeDisposable.add(
rocketChatCache.getSelectedServerHostnamePublisher() rocketChatCache.getSelectedServerHostnamePublisher()
.map(Optional::get)
.distinctUntilChanged() .distinctUntilChanged()
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -190,6 +193,7 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity { ...@@ -190,6 +193,7 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity {
compositeDisposable.add( compositeDisposable.add(
rocketChatCache.getSelectedRoomIdPublisher() rocketChatCache.getSelectedRoomIdPublisher()
.map(Optional::get)
.distinctUntilChanged() .distinctUntilChanged()
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
......
...@@ -34,7 +34,6 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -34,7 +34,6 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
private StatusTicker statusTicker; private StatusTicker statusTicker;
private MainContract.Presenter presenter; private MainContract.Presenter presenter;
private RoomFragment roomFragment;
@Override @Override
protected int getLayoutContainerForFragment() { protected int getLayoutContainerForFragment() {
...@@ -180,8 +179,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -180,8 +179,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
@Override @Override
public void showRoom(String hostname, String roomId) { public void showRoom(String hostname, String roomId) {
roomFragment = RoomFragment.create(hostname, roomId); showFragment(RoomFragment.create(hostname, roomId));
showFragment(roomFragment);
closeSidebarIfNeeded(); closeSidebarIfNeeded();
KeyboardHelper.hideSoftKeyboard(this); KeyboardHelper.hideSoftKeyboard(this);
} }
...@@ -224,9 +222,6 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -224,9 +222,6 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
@Override @Override
public void showConnectionOk() { public void showConnectionOk() {
statusTicker.updateStatus(StatusTicker.STATUS_DISMISS, null); statusTicker.updateStatus(StatusTicker.STATUS_DISMISS, null);
if (roomFragment != null) {
roomFragment.refreshRoom();
}
} }
//TODO: consider this class to define in layouthelper for more complicated operation. //TODO: consider this class to define in layouthelper for more complicated operation.
......
...@@ -98,8 +98,10 @@ public class MainPresenter extends BasePresenter<MainContract.View> ...@@ -98,8 +98,10 @@ public class MainPresenter extends BasePresenter<MainContract.View>
@Override @Override
public void onRetryLogin() { public void onRetryLogin() {
view.showConnecting(); final Disposable subscription = sessionInteractor.retryLogin()
connectivityManagerApi.keepAliveServer(); .subscribe();
addSubscription(subscription);
} }
private void openRoom() { private void openRoom() {
......
...@@ -15,6 +15,7 @@ import chat.rocket.android_ddp.DDPClient; ...@@ -15,6 +15,7 @@ import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback; import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.DDPSubscription; import chat.rocket.android_ddp.DDPSubscription;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.Maybe;
/** /**
* DDP client wrapper. * DDP client wrapper.
...@@ -124,4 +125,13 @@ public class DDPClientWrapper { ...@@ -124,4 +125,13 @@ public class DDPClientWrapper {
} }
}); });
} }
/**
* check WebSocket connectivity with ping.
*/
public Maybe<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ddpClient.doPing(pingId);
}
} }
...@@ -83,6 +83,10 @@ public class MethodCallHelper { ...@@ -83,6 +83,10 @@ public class MethodCallHelper {
return task.continueWithTask(_task -> { return task.continueWithTask(_task -> {
if (_task.isFaulted()) { if (_task.isFaulted()) {
Exception exception = _task.getError(); Exception exception = _task.getError();
// If wet get any error, close the socket to let the RocketChatWebSocketThread aware of it.
// FIXME: when rewriting the network layer we should get rid of this MethodCallHelper
// monolith concept. It decouples a lot the socket from the rest of the app.
ddpClientRef.get().close();
if (exception instanceof MethodCall.Error) { if (exception instanceof MethodCall.Error) {
String errMessageJson = exception.getMessage(); String errMessageJson = exception.getMessage();
if (TextUtils.isEmpty(errMessageJson)) { if (TextUtils.isEmpty(errMessageJson)) {
...@@ -94,7 +98,6 @@ public class MethodCallHelper { ...@@ -94,7 +98,6 @@ public class MethodCallHelper {
if (TwoStepAuthException.TYPE.equals(errType)) { if (TwoStepAuthException.TYPE.equals(errType)) {
return Task.forError(new TwoStepAuthException(errMessage)); return Task.forError(new TwoStepAuthException(errMessage));
} }
return Task.forError(new Exception(errMessage)); return Task.forError(new Exception(errMessage));
} else if (exception instanceof DDPClientCallback.RPC.Error) { } else if (exception instanceof DDPClientCallback.RPC.Error) {
String errMessage = ((DDPClientCallback.RPC.Error) exception).error.getString("message"); String errMessage = ((DDPClientCallback.RPC.Error) exception).error.getString("message");
......
...@@ -602,8 +602,4 @@ public class RoomFragment extends AbstractChatRoomFragment implements ...@@ -602,8 +602,4 @@ public class RoomFragment extends AbstractChatRoomFragment implements
edittingMessage = message; edittingMessage = message;
messageFormManager.setEditMessage(message.getMessage()); messageFormManager.setEditMessage(message.getMessage());
} }
public void refreshRoom() {
presenter.loadMessages();
}
} }
\ No newline at end of file
...@@ -63,6 +63,7 @@ import rx.subjects.PublishSubject; ...@@ -63,6 +63,7 @@ import rx.subjects.PublishSubject;
} }
} }
@DebugLog
@Override @Override
public void ensureConnections() { public void ensureConnections() {
for (String hostname : serverConnectivityList.keySet()) { for (String hostname : serverConnectivityList.keySet()) {
...@@ -146,6 +147,7 @@ import rx.subjects.PublishSubject; ...@@ -146,6 +147,7 @@ import rx.subjects.PublishSubject;
return Observable.concat(Observable.from(getCurrentConnectivityList()), connectivitySubject); return Observable.concat(Observable.from(getCurrentConnectivityList()), connectivitySubject);
} }
@DebugLog
private Single<Boolean> connectToServerIfNeeded(String hostname, boolean forceConnect) { private Single<Boolean> connectToServerIfNeeded(String hostname, boolean forceConnect) {
return Single.defer(() -> { return Single.defer(() -> {
final int connectivity = serverConnectivityList.get(hostname); final int connectivity = serverConnectivityList.get(hostname);
...@@ -163,8 +165,8 @@ import rx.subjects.PublishSubject; ...@@ -163,8 +165,8 @@ import rx.subjects.PublishSubject;
} }
return connectToServer(hostname) return connectToServer(hostname)
//.doOnError(RCLog::e) .doOnError(RCLog::e)
.retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS)); .retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS));
}); });
} }
...@@ -191,7 +193,7 @@ import rx.subjects.PublishSubject; ...@@ -191,7 +193,7 @@ import rx.subjects.PublishSubject;
}); });
} }
@DebugLog
private Single<Boolean> waitForConnected(String hostname) { private Single<Boolean> waitForConnected(String hostname) {
return connectivitySubject return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname)) .filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
...@@ -207,6 +209,7 @@ import rx.subjects.PublishSubject; ...@@ -207,6 +209,7 @@ import rx.subjects.PublishSubject;
: Single.error(new ServerConnectivity.DisconnectedException())); : Single.error(new ServerConnectivity.DisconnectedException()));
} }
@DebugLog
private Single<Boolean> waitForDisconnected(String hostname) { private Single<Boolean> waitForDisconnected(String hostname) {
return connectivitySubject return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname)) .filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
......
...@@ -4,15 +4,18 @@ import android.content.Context; ...@@ -4,15 +4,18 @@ import android.content.Context;
import android.os.Handler; import android.os.Handler;
import android.os.HandlerThread; import android.os.HandlerThread;
import com.google.android.gms.gcm.GcmNetworkManager;
import com.google.android.gms.gcm.PeriodicTask;
import org.json.JSONObject; import org.json.JSONObject;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import bolts.Task; import bolts.Task;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.api.DDPClientWrapper; import chat.rocket.android.api.DDPClientWrapper;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError; import chat.rocket.android.helper.LogIfError;
...@@ -22,7 +25,6 @@ import chat.rocket.android.log.RCLog; ...@@ -22,7 +25,6 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber; import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber;
import chat.rocket.android.service.ddp.base.LoginServiceConfigurationSubscriber; import chat.rocket.android.service.ddp.base.LoginServiceConfigurationSubscriber;
import chat.rocket.android.service.ddp.base.UserDataSubscriber; import chat.rocket.android.service.ddp.base.UserDataSubscriber;
import chat.rocket.android.service.ddp.stream.StreamRoomMessage;
import chat.rocket.android.service.observer.CurrentUserObserver; import chat.rocket.android.service.observer.CurrentUserObserver;
import chat.rocket.android.service.observer.FileUploadingToUrlObserver; import chat.rocket.android.service.observer.FileUploadingToUrlObserver;
import chat.rocket.android.service.observer.FileUploadingWithUfsObserver; import chat.rocket.android.service.observer.FileUploadingWithUfsObserver;
...@@ -33,12 +35,15 @@ import chat.rocket.android.service.observer.MethodCallObserver; ...@@ -33,12 +35,15 @@ import chat.rocket.android.service.observer.MethodCallObserver;
import chat.rocket.android.service.observer.NewMessageObserver; import chat.rocket.android.service.observer.NewMessageObserver;
import chat.rocket.android.service.observer.PushSettingsObserver; import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver; import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android.service.observer.TokenLoginObserver; import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.core.models.ServerInfo; import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.RealmHelper; import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.internal.RealmSession; import chat.rocket.persistence.realm.models.internal.RealmSession;
import hugo.weaving.DebugLog; import hugo.weaving.DebugLog;
import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable;
import rx.Completable;
import rx.Single; import rx.Single;
import rx.subscriptions.CompositeSubscription; import rx.subscriptions.CompositeSubscription;
...@@ -50,7 +55,6 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -50,7 +55,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
LoginServiceConfigurationSubscriber.class, LoginServiceConfigurationSubscriber.class,
ActiveUsersSubscriber.class, ActiveUsersSubscriber.class,
UserDataSubscriber.class, UserDataSubscriber.class,
TokenLoginObserver.class,
MethodCallObserver.class, MethodCallObserver.class,
SessionObserver.class, SessionObserver.class,
LoadMessageProcedureObserver.class, LoadMessageProcedureObserver.class,
...@@ -62,14 +66,16 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -62,14 +66,16 @@ public class RocketChatWebSocketThread extends HandlerThread {
PushSettingsObserver.class, PushSettingsObserver.class,
GcmPushRegistrationObserver.class GcmPushRegistrationObserver.class
}; };
private static final long HEARTBEAT_PERIOD_MS = 20000;
private final Context appContext; private final Context appContext;
private final String hostname; private final String hostname;
private final RealmHelper realmHelper; private final RealmHelper realmHelper;
private final ConnectivityManagerInternal connectivityManager; private final ConnectivityManagerInternal connectivityManager;
private final ArrayList<Registrable> listeners = new ArrayList<>(); private final ArrayList<Registrable> listeners = new ArrayList<>();
private final CompositeDisposable hearbeatDisposable = new CompositeDisposable();
private final CompositeSubscription reconnectSubscription = new CompositeSubscription();
private DDPClientWrapper ddpClient; private DDPClientWrapper ddpClient;
private boolean listenersRegistered; private boolean listenersRegistered;
private RocketChatCache rocketChatCache;
private final DDPClientRef ddpClientRef = new DDPClientRef() { private final DDPClientRef ddpClientRef = new DDPClientRef() {
@Override @Override
public DDPClientWrapper get() { public DDPClientWrapper get() {
...@@ -103,7 +109,6 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -103,7 +109,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
this.hostname = hostname; this.hostname = hostname;
this.realmHelper = RealmStore.getOrCreate(hostname); this.realmHelper = RealmStore.getOrCreate(hostname);
this.connectivityManager = ConnectivityManager.getInstanceForInternal(appContext); this.connectivityManager = ConnectivityManager.getInstanceForInternal(appContext);
this.rocketChatCache = new RocketChatCache(appContext);
} }
/** /**
...@@ -185,9 +190,10 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -185,9 +190,10 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog @DebugLog
public Single<Boolean> keepAlive() { public Single<Boolean> keepAlive() {
return checkIfConnectionAlive() return checkIfConnectionAlive()
.flatMap(alive -> alive ? Single.just(true) : connect()); .flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff());
} }
@DebugLog
private Single<Boolean> checkIfConnectionAlive() { private Single<Boolean> checkIfConnectionAlive() {
if (ddpClient == null) { if (ddpClient == null) {
return Single.just(false); return Single.just(false);
...@@ -206,7 +212,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -206,7 +212,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (task.isFaulted()) { if (task.isFaulted()) {
Exception error = task.getError(); Exception error = task.getError();
RCLog.e(error); RCLog.e(error);
emitter.onSuccess(false); emitter.onError(error);
} else { } else {
keepAliveTimer.update(); keepAliveTimer.update();
emitter.onSuccess(true); emitter.onSuccess(true);
...@@ -218,8 +224,28 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -218,8 +224,28 @@ public class RocketChatWebSocketThread extends HandlerThread {
}); });
} }
@DebugLog
private Flowable<Boolean> heartbeat(long interval) {
return Flowable.interval(interval, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(tick -> ddpClient.doPing().toFlowable())
.map(callback -> {
if (callback instanceof DDPClientCallback.Ping) {
return true;
}
// ideally we should never get here. We should always receive a DDPClientCallback.Ping
// because we just received a pong. But maybe we received a pong from an unmatched
// ping id which we should ignore. In this case or any other random error, log and
// send false downstream
RCLog.d("heartbeat pong < %s", callback.toString());
return false;
});
}
private Single<Boolean> prepareDDPClient() { private Single<Boolean> prepareDDPClient() {
return checkIfConnectionAlive() // TODO: temporarily replaced checkIfConnectionAlive() call for this single checking if ddpClient is
// null or not. In case it is, create a new client, otherwise just keep connecting with existing one.
return Single.just(ddpClient != null)
.doOnSuccess(alive -> { .doOnSuccess(alive -> {
if (!alive) { if (!alive) {
RCLog.d("DDPClient#create"); RCLog.d("DDPClient#create");
...@@ -240,26 +266,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -240,26 +266,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
// handling WebSocket#onClose() callback. // handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> { task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
ddpClient.close(); reconnect();
forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
// Needed to use subscriptions because of legacy code.
// TODO: Should update to RxJava 2
final CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add(
connect().retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS))
.subscribe(
connected -> {
if (!connected) {
connectivityManager.notifyConnectionLost(
hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR
);
}
subscriptions.clear();
},
err -> logErrorAndUnsubscribe(subscriptions, err)
)
);
return null; return null;
}); });
...@@ -290,11 +297,39 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -290,11 +297,39 @@ public class RocketChatWebSocketThread extends HandlerThread {
})); }));
} }
private void reconnect() {
// if we are already trying to reconnect then return.
if (reconnectSubscription.hasSubscriptions()) {
return;
}
ddpClient.close();
forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
// Needed to use subscriptions because of legacy code.
// TODO: Should update to RxJava 2
reconnectSubscription.add(
connectWithExponentialBackoff()
.subscribe(
connected -> {
if (!connected) {
connectivityManager.notifyConnecting(hostname);
}
reconnectSubscription.clear();
},
err -> logErrorAndUnsubscribe(reconnectSubscription, err)
)
);
}
private void logErrorAndUnsubscribe(CompositeSubscription subscriptions, Throwable err) { private void logErrorAndUnsubscribe(CompositeSubscription subscriptions, Throwable err) {
RCLog.e(err); RCLog.e(err);
subscriptions.clear(); subscriptions.clear();
} }
private Single<Boolean> connectWithExponentialBackoff() {
return connect().retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS));
}
@DebugLog @DebugLog
private Single<Boolean> connect() { private Single<Boolean> connect() {
return connectDDPClient() return connectDDPClient()
...@@ -325,12 +360,48 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -325,12 +360,48 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (listenersRegistered) { if (listenersRegistered) {
unregisterListeners(); unregisterListeners();
} }
listenersRegistered = true;
List<RealmSession> sessions = realmHelper.executeTransactionForReadResults(realm ->
realm.where(RealmSession.class)
.isNotNull(RealmSession.TOKEN)
.equalTo(RealmSession.TOKEN_VERIFIED, false)
.isNull(RealmSession.ERROR)
.findAll());
if (sessions != null && sessions.size() > 0) {
// if we have a session try to resume it. At this point we're probably recovering from
// a disconnection state
final CompositeSubscription subscriptions = new CompositeSubscription();
MethodCallHelper methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
subscriptions.add(
Completable.defer(() -> {
Task<Void> result = methodCall.loginWithToken(sessions.get(0).getToken());
if (result.isFaulted()) {
return Completable.error(result.getError());
} else {
return Completable.complete();
}
}).retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS))
.subscribe(
() -> {
createObserversAndRegister();
subscriptions.clear();
},
error -> logErrorAndUnsubscribe(subscriptions, error)
)
);
} else {
// if we don't have any session then just create the observers and register normally
createObserversAndRegister();
}
}
@DebugLog
private void createObserversAndRegister() {
for (Class clazz : REGISTERABLE_CLASSES) { for (Class clazz : REGISTERABLE_CLASSES) {
try { try {
Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class, Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class,
DDPClientRef.class); DDPClientRef.class);
Object obj = ctor.newInstance(appContext, hostname, realmHelper, ddpClientRef); Object obj = ctor.newInstance(appContext, hostname, realmHelper, ddpClientRef);
if (obj instanceof Registrable) { if (obj instanceof Registrable) {
...@@ -338,19 +409,57 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -338,19 +409,57 @@ public class RocketChatWebSocketThread extends HandlerThread {
registrable.register(); registrable.register();
listeners.add(registrable); listeners.add(registrable);
} }
// Register for room stream messages
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId != null && !roomId.isEmpty()) {
StreamRoomMessage streamRoomMessage = new StreamRoomMessage(
appContext, hostname, realmHelper, ddpClientRef, roomId
);
streamRoomMessage.register();
listeners.add(streamRoomMessage);
}
} catch (Exception exception) { } catch (Exception exception) {
RCLog.w(exception, "Failed to register listeners!!"); RCLog.w(exception, "Failed to register listeners!!");
} }
} }
listenersRegistered = true;
startHeartBeat();
}
private void startHeartBeat() {
// This task is scheduled to guarantee that RocketChatService is still bound at the application
// process. This is necessary due to the way the keep-alive assertions are currently architectured.
// By doing those keep-alives periodically we try to ensure that we have the app alive
// if for some reason its process gets killed (for any reason).
// TODO: should set this at another point; we should specify a reasonable time-window.
// TODO: should check and handle the case Google Play Services isn't available.
// TODO: consider on using https://github.com/evernote/android-job for this as it allows much more
// customisation like running at exponential backoff and others. We should alos use it more
// extensively throughout the app on all the common tasks, like i.e. sending a message
GcmNetworkManager gcmNetworkManager = GcmNetworkManager.getInstance(appContext);
gcmNetworkManager.schedule(
new PeriodicTask.Builder()
.setRequiresCharging(false)
.setUpdateCurrent(true)
.setRequiredNetwork(com.google.android.gms.gcm.Task.NETWORK_STATE_ANY)
.setTag(TaskService.TAG_KEEP_ALIVE)
.setService(TaskService.class)
.setPeriod(30)
.setFlex(15)
.build()
);
hearbeatDisposable.clear();
hearbeatDisposable.add(
heartbeat(HEARTBEAT_PERIOD_MS)
.subscribe(
ponged -> {
if (!ponged) {
RCLog.d("Pong received but didn't match ping id");
}
},
error -> {
RCLog.e(error);
// Stop pinging
hearbeatDisposable.clear();
if (error instanceof DDPClientCallback.Closed) {
RCLog.d("Hearbeat failure: retrying connection...");
reconnect();
}
}
)
);
} }
@DebugLog @DebugLog
...@@ -370,6 +479,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -370,6 +479,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
registrable.unregister(); registrable.unregister();
iterator.remove(); iterator.remove();
} }
hearbeatDisposable.clear();
listenersRegistered = false; listenersRegistered = false;
} }
} }
package chat.rocket.android.service;
import com.google.android.gms.gcm.GcmNetworkManager;
import com.google.android.gms.gcm.GcmTaskService;
import com.google.android.gms.gcm.TaskParams;
public class TaskService extends GcmTaskService {
public static final String TAG_KEEP_ALIVE = "TAG_KEEP_ALIVE";
@Override
public int onRunTask(TaskParams taskParams) {
switch (taskParams.getTag()) {
case TAG_KEEP_ALIVE:
ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer();
return GcmNetworkManager.RESULT_SUCCESS;
default:
return GcmNetworkManager.RESULT_FAILURE;
}
}
}
...@@ -20,7 +20,7 @@ abstract class AbstractBaseSubscriber extends AbstractDDPDocEventSubscriber { ...@@ -20,7 +20,7 @@ abstract class AbstractBaseSubscriber extends AbstractDDPDocEventSubscriber {
@Override @Override
protected final boolean shouldTruncateTableOnInitialize() { protected final boolean shouldTruncateTableOnInitialize() {
return true; return false;
} }
protected abstract String getSubscriptionCallbackName(); protected abstract String getSubscriptionCallbackName();
......
...@@ -2,6 +2,8 @@ package chat.rocket.android.service.internal; ...@@ -2,6 +2,8 @@ package chat.rocket.android.service.internal;
import android.content.Context; import android.content.Context;
import com.hadisatrio.optional.Optional;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
...@@ -48,6 +50,7 @@ public abstract class AbstractRocketChatCacheObserver implements Registrable { ...@@ -48,6 +50,7 @@ public abstract class AbstractRocketChatCacheObserver implements Registrable {
compositeDisposable.add( compositeDisposable.add(
new RocketChatCache(context) new RocketChatCache(context)
.getSelectedRoomIdPublisher() .getSelectedRoomIdPublisher()
.map(Optional::get)
.subscribe(this::updateRoomIdWith, RCLog::e) .subscribe(this::updateRoomIdWith, RCLog::e)
); );
} }
......
...@@ -4,6 +4,7 @@ import android.content.Context; ...@@ -4,6 +4,7 @@ import android.content.Context;
import android.os.Handler; import android.os.Handler;
import android.os.Looper; import android.os.Looper;
import chat.rocket.android.RocketChatCache;
import chat.rocket.persistence.realm.RealmHelper; import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef; import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.Registrable; import chat.rocket.android.service.Registrable;
...@@ -19,6 +20,7 @@ public class StreamRoomMessageManager implements Registrable { ...@@ -19,6 +20,7 @@ public class StreamRoomMessageManager implements Registrable {
private final DDPClientRef ddpClientRef; private final DDPClientRef ddpClientRef;
private final AbstractRocketChatCacheObserver cacheObserver; private final AbstractRocketChatCacheObserver cacheObserver;
private final Handler handler; private final Handler handler;
private final RocketChatCache rocketChatCache;
private StreamRoomMessage streamRoomMessage; private StreamRoomMessage streamRoomMessage;
public StreamRoomMessageManager(Context context, String hostname, public StreamRoomMessageManager(Context context, String hostname,
...@@ -27,6 +29,7 @@ public class StreamRoomMessageManager implements Registrable { ...@@ -27,6 +29,7 @@ public class StreamRoomMessageManager implements Registrable {
this.hostname = hostname; this.hostname = hostname;
this.realmHelper = realmHelper; this.realmHelper = realmHelper;
this.ddpClientRef = ddpClientRef; this.ddpClientRef = ddpClientRef;
this.rocketChatCache = new RocketChatCache(context);
cacheObserver = new AbstractRocketChatCacheObserver(context, realmHelper) { cacheObserver = new AbstractRocketChatCacheObserver(context, realmHelper) {
@Override @Override
...@@ -57,6 +60,11 @@ public class StreamRoomMessageManager implements Registrable { ...@@ -57,6 +60,11 @@ public class StreamRoomMessageManager implements Registrable {
@Override @Override
public void register() { public void register() {
cacheObserver.register(); cacheObserver.register();
String selectedRoomId = rocketChatCache.getSelectedRoomId();
if (selectedRoomId == null) {
return;
}
registerStreamNotifyMessage(selectedRoomId);
} }
@Override @Override
......
...@@ -135,13 +135,15 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe ...@@ -135,13 +135,15 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe
() -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()), () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
pair -> RxJavaInterop.toV2Flowable(pair.first.where(RealmMessage.class) pair -> RxJavaInterop.toV2Flowable(pair.first.where(RealmMessage.class)
.equalTo(RealmMessage.ROOM_ID, room.getRoomId()) .equalTo(RealmMessage.ROOM_ID, room.getRoomId())
.isNotNull(RealmMessage.USER)
.findAllSorted(RealmMessage.TIMESTAMP, Sort.DESCENDING) .findAllSorted(RealmMessage.TIMESTAMP, Sort.DESCENDING)
.asObservable()), .asObservable()),
pair -> close(pair.first, pair.second) pair -> close(pair.first, pair.second)
) )
.unsubscribeOn(AndroidSchedulers.from(Looper.myLooper())) .unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.filter(it -> it.isLoaded() && it.isValid()) .filter(it -> it.isLoaded() && it.isValid())
.map(this::toList)); .map(this::toList)
.distinctUntilChanged());
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment