Commit f04760f6 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Add a heartbeat method that pings the server at a timed interval. Does

this using the newly added ping call without the use of bolts'
callbacks.
Implemented a TaskService class that is used to do the work offloaded
by the GcmNetworkManager which is used here to schedule
keep-alive recurring tasks.
parent 40fb5adc
......@@ -69,5 +69,13 @@
<action android:name="com.google.android.gms.iid.InstanceID"/>
</intent-filter>
</service>
<service android:name=".service.TaskService"
android:permission="com.google.android.gms.permission.BIND_NETWORK_TASK_SERVICE"
android:exported="true">
<intent-filter>
<action android:name="com.google.android.gms.gcm.ACTION_TASK_READY"/>
</intent-filter>
</service>
</application>
</manifest>
\ No newline at end of file
......@@ -4,11 +4,15 @@ import android.content.Context;
import android.os.Handler;
import android.os.HandlerThread;
import com.google.android.gms.gcm.GcmNetworkManager;
import com.google.android.gms.gcm.PeriodicTask;
import org.json.JSONObject;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import bolts.Task;
......@@ -32,11 +36,15 @@ import chat.rocket.android.service.observer.NewMessageObserver;
import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android.service.observer.TokenLoginObserver;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.internal.RealmSession;
import hugo.weaving.DebugLog;
import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable;
import rx.Completable;
import rx.Single;
import rx.subscriptions.CompositeSubscription;
......@@ -48,7 +56,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
LoginServiceConfigurationSubscriber.class,
ActiveUsersSubscriber.class,
UserDataSubscriber.class,
TokenLoginObserver.class,
MethodCallObserver.class,
SessionObserver.class,
LoadMessageProcedureObserver.class,
......@@ -60,11 +67,14 @@ public class RocketChatWebSocketThread extends HandlerThread {
PushSettingsObserver.class,
GcmPushRegistrationObserver.class
};
private static final long HEARTBEAT_PERIOD_MS = 20000;
private final Context appContext;
private final String hostname;
private final RealmHelper realmHelper;
private final ConnectivityManagerInternal connectivityManager;
private final ArrayList<Registrable> listeners = new ArrayList<>();
private final CompositeDisposable hearbeatDisposable = new CompositeDisposable();
private final CompositeSubscription reconnectSubscription = new CompositeSubscription();
private DDPClientWrapper ddpClient;
private boolean listenersRegistered;
private final DDPClientRef ddpClientRef = new DDPClientRef() {
......@@ -184,6 +194,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
.flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff());
}
@DebugLog
private Single<Boolean> checkIfConnectionAlive() {
if (ddpClient == null) {
return Single.just(false);
......@@ -202,7 +213,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (task.isFaulted()) {
Exception error = task.getError();
RCLog.e(error);
emitter.onSuccess(false);
emitter.onError(error);
} else {
keepAliveTimer.update();
emitter.onSuccess(true);
......@@ -214,8 +225,28 @@ public class RocketChatWebSocketThread extends HandlerThread {
});
}
@DebugLog
private Flowable<Boolean> heartbeat(long interval) {
return Flowable.interval(interval, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(tick -> ddpClient.doPing().toFlowable())
.map(callback -> {
if (callback instanceof DDPClientCallback.Ping) {
return true;
}
// ideally we should never get here. We should always receive a DDPClientCallback.Ping
// because we just received a pong. But maybe we received a pong from an unmatched
// ping id which we should ignore. In this case or any other random error, log and
// send false downstream
RCLog.d("heartbeat pong < %s", callback.toString());
return false;
});
}
private Single<Boolean> prepareDDPClient() {
return checkIfConnectionAlive()
// TODO: temporarily replaced checkIfConnectionAlive() call for this single checking if ddpClient is
// null or not. In case it is, create a new client, otherwise just keep connecting with existing one.
return Single.just(ddpClient != null)
.doOnSuccess(alive -> {
if (!alive) {
RCLog.d("DDPClient#create");
......@@ -236,24 +267,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
ddpClient.close();
forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
// Needed to use subscriptions because of legacy code.
// TODO: Should update to RxJava 2
final CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add(
connectWithExponentialBackoff()
.subscribe(
connected -> {
if (!connected) {
connectivityManager.notifyConnecting(hostname);
}
subscriptions.clear();
},
err -> logErrorAndUnsubscribe(subscriptions, err)
)
);
reconnect();
return null;
});
......@@ -284,6 +298,30 @@ public class RocketChatWebSocketThread extends HandlerThread {
}));
}
private void reconnect() {
// if we are already trying to reconnect then return.
if (reconnectSubscription.hasSubscriptions()) {
return;
}
ddpClient.close();
forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
// Needed to use subscriptions because of legacy code.
// TODO: Should update to RxJava 2
reconnectSubscription.add(
connectWithExponentialBackoff()
.subscribe(
connected -> {
if (!connected) {
connectivityManager.notifyConnecting(hostname);
}
reconnectSubscription.clear();
},
err -> logErrorAndUnsubscribe(reconnectSubscription, err)
)
);
}
private void logErrorAndUnsubscribe(CompositeSubscription subscriptions, Throwable err) {
RCLog.e(err);
subscriptions.clear();
......@@ -323,8 +361,44 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (listenersRegistered) {
unregisterListeners();
}
listenersRegistered = true;
List<RealmSession> sessions = realmHelper.executeTransactionForReadResults(realm ->
realm.where(RealmSession.class)
.isNotNull(RealmSession.TOKEN)
.equalTo(RealmSession.TOKEN_VERIFIED, false)
.isNull(RealmSession.ERROR)
.findAll());
if (sessions != null && sessions.size() > 0) {
// if we have a session try to resume it. At this point we're probably recovering from
// a disconnection state
final CompositeSubscription subscriptions = new CompositeSubscription();
MethodCallHelper methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
subscriptions.add(
Completable.defer(() -> {
Task<Void> result = methodCall.loginWithToken(sessions.get(0).getToken());
if (result.isFaulted()) {
return Completable.error(result.getError());
} else {
return Completable.complete();
}
}).retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS))
.subscribe(
() -> {
createObserversAndRegister();
subscriptions.clear();
},
error -> logErrorAndUnsubscribe(subscriptions, error)
)
);
} else {
// if we don't have any session then just create the observers and register normally
createObserversAndRegister();
}
}
@DebugLog
private void createObserversAndRegister() {
for (Class clazz : REGISTERABLE_CLASSES) {
try {
Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class,
......@@ -336,11 +410,57 @@ public class RocketChatWebSocketThread extends HandlerThread {
registrable.register();
listeners.add(registrable);
}
} catch (Exception exception) {
RCLog.w(exception, "Failed to register listeners!!");
}
}
listenersRegistered = true;
startHeartBeat();
}
private void startHeartBeat() {
// This task is scheduled to guarantee that RocketChatService is still bound at the application
// process. This is necessary due to the way the keep-alive assertions are currently architectured.
// By doing those keep-alives periodically we try to ensure that we have the app alive
// if for some reason its process gets killed (for any reason).
// TODO: should set this at another point; we should specify a reasonable time-window.
// TODO: should check and handle the case Google Play Services isn't available.
// TODO: consider on using https://github.com/evernote/android-job for this as it allows much more
// customisation like running at exponential backoff and others. We should alos use it more
// extensively throughout the app on all the common tasks, like i.e. sending a message
GcmNetworkManager gcmNetworkManager = GcmNetworkManager.getInstance(appContext);
gcmNetworkManager.schedule(
new PeriodicTask.Builder()
.setRequiresCharging(false)
.setUpdateCurrent(true)
.setRequiredNetwork(com.google.android.gms.gcm.Task.NETWORK_STATE_ANY)
.setTag(TaskService.TAG_KEEP_ALIVE)
.setService(TaskService.class)
.setPeriod(30)
.setFlex(15)
.build()
);
hearbeatDisposable.clear();
hearbeatDisposable.add(
heartbeat(HEARTBEAT_PERIOD_MS)
.subscribe(
ponged -> {
if (!ponged) {
RCLog.d("Pong received but didn't match ping id");
}
},
error -> {
RCLog.e(error);
// Stop pinging
hearbeatDisposable.clear();
if (error instanceof DDPClientCallback.Closed) {
RCLog.d("Hearbeat failure: retrying connection...");
reconnect();
}
}
)
);
}
@DebugLog
......@@ -360,6 +480,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
registrable.unregister();
iterator.remove();
}
hearbeatDisposable.clear();
listenersRegistered = false;
}
}
package chat.rocket.android.service;
import com.google.android.gms.gcm.GcmNetworkManager;
import com.google.android.gms.gcm.GcmTaskService;
import com.google.android.gms.gcm.TaskParams;
public class TaskService extends GcmTaskService {
public static final String TAG_KEEP_ALIVE = "TAG_KEEP_ALIVE";
@Override
public int onRunTask(TaskParams taskParams) {
switch (taskParams.getTag()) {
case TAG_KEEP_ALIVE:
ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer();
return GcmNetworkManager.RESULT_SUCCESS;
default:
return GcmNetworkManager.RESULT_FAILURE;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment