Unverified Commit 028f1cd8 authored by Rafael Kellermann Streit's avatar Rafael Kellermann Streit Committed by GitHub

Merge pull request #556 from RocketChat/fix/irreversible_offline_state

[WIP][BUG] Fix sync state when sending new messages after reconnection
parents e5198165 6fbbd3c1
......@@ -59,7 +59,7 @@ public class DDPClientImpl {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
flowable.retry().filter(callback -> callback instanceof RxWebSocketCallback.Open)
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open)
.subscribe(
callback ->
sendMessage("connect",
......@@ -115,6 +115,7 @@ public class DDPClientImpl {
if (requested) {
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
......
......@@ -9,6 +9,7 @@ import com.hadisatrio.optional.Optional;
import java.util.List;
import chat.rocket.android.LaunchUtil;
import chat.rocket.android.R;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.helper.Logger;
import chat.rocket.android.push.PushManager;
......@@ -42,7 +43,6 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity {
}
updateHostnameIfNeeded(rocketChatCache.getSelectedServerHostname());
updateRoomIdIfNeeded(rocketChatCache.getSelectedRoomId());
}
@Override
......@@ -93,17 +93,22 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity {
if (hostname == null) {
if (newHostname != null && assertServerRealmStoreExists(newHostname)) {
updateHostname(newHostname);
updateRoomIdIfNeeded(rocketChatCache.getSelectedRoomId());
} else {
recoverFromHostnameError();
}
} else {
if (hostname.equals(newHostname)) {
updateHostname(newHostname);
updateRoomIdIfNeeded(rocketChatCache.getSelectedRoomId());
return;
}
if (assertServerRealmStoreExists(newHostname)) {
recreate();
Intent intent = new Intent(this, MainActivity.class);
startActivity(intent);
finish();
overridePendingTransition(R.anim.slide_in, R.anim.slide_out);
} else {
recoverFromHostnameError();
}
......
......@@ -138,6 +138,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
});
}
}
closeSidebarIfNeeded();
}
private boolean closeSidebarIfNeeded() {
......@@ -249,7 +250,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.fragment_retry_login_error_title, Snackbar.LENGTH_INDEFINITE)
.setAction(R.string.fragment_retry_login_retry_title, view ->
presenter.onRetryLogin()));
ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer()));
}
@Override
......
......@@ -18,6 +18,7 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.ConnectivityManagerApi;
import chat.rocket.android.service.ServerConnectivity;
import chat.rocket.android.shared.BasePresenter;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.core.PublicSettingsConstants;
import chat.rocket.core.interactors.CanCreateRoomInteractor;
import chat.rocket.core.interactors.RoomInteractor;
......@@ -226,9 +227,13 @@ public class MainPresenter extends BasePresenter<MainContract.View>
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk();
view.refreshRoom();
return;
} else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
if (connectivity.code == DDPClient.REASON_NETWORK_ERROR) {
view.showConnectionError();
}
} else {
view.showConnecting();
}
view.showConnecting();
},
Logger::report
);
......
......@@ -16,11 +16,13 @@ import java.util.concurrent.TimeUnit;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.helper.RxHelper;
import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.models.RealmBasedServerInfo;
import hugo.weaving.DebugLog;
import rx.Observable;
import rx.Single;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
/**
......@@ -28,7 +30,7 @@ import rx.subjects.PublishSubject;
*/
/*package*/ class RealmBasedConnectivityManager
implements ConnectivityManagerApi, ConnectivityManagerInternal {
private final ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>();
private volatile ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>();
private final PublishSubject<ServerConnectivity> connectivitySubject = PublishSubject.create();
private Context appContext;
private final ServiceConnection serviceConnection = new ServiceConnection() {
......@@ -70,9 +72,16 @@ import rx.subjects.PublishSubject;
@Override
public void ensureConnections() {
String hostname = new RocketChatCache(appContext).getSelectedServerHostname();
if (hostname == null) {
return;
}
connectToServerIfNeeded(hostname, true/* force connect */)
.subscribeOn(Schedulers.io())
.subscribe(_val -> {
}, RCLog::e);
}, error -> {
RCLog.e(error);
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
});
}
@Override
......@@ -130,10 +139,10 @@ import rx.subjects.PublishSubject;
@DebugLog
@Override
public void notifyConnectionLost(String hostname, int reason) {
public void notifyConnectionLost(String hostname, int code) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED));
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED, code));
}
@DebugLog
......@@ -167,13 +176,16 @@ import rx.subjects.PublishSubject;
.flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect));
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname);
// if (connectivity == ServerConnectivity.STATE_CONNECTING) {
// return waitForConnected(hostname)
// .doOnError(error -> notifyConnectionLost(hostname, REASON_NETWORK_ERROR));
// }
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
notifyConnecting(hostname);
}
return connectToServer(hostname)
.doOnError(RCLog::e)
.retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS));
return connectToServer(hostname);
});
}
......@@ -186,7 +198,7 @@ import rx.subjects.PublishSubject;
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname)
.onErrorReturn(err -> true)
.doOnError(err -> notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR))
.flatMap(_val -> disconnectFromServerIfNeeded(hostname));
}
......@@ -195,8 +207,7 @@ import rx.subjects.PublishSubject;
}
return disconnectFromServer(hostname)
//.doOnError(RCLog::e)
.retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS));
.retryWhen(RxHelper.exponentialBackoff(1, 500, TimeUnit.MILLISECONDS));
});
}
......@@ -237,7 +248,7 @@ import rx.subjects.PublishSubject;
if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) {
// Mark as CONNECTING except for the case [forceConnect && connected] because
// ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected.
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
// serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
}
if (serviceInterface != null) {
......
......@@ -8,11 +8,11 @@ import android.os.Binder;
import android.os.IBinder;
import android.support.annotation.Nullable;
import java.util.HashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import chat.rocket.android.helper.Logger;
import chat.rocket.android.log.RCLog;
import chat.rocket.persistence.realm.RealmStore;
import hugo.weaving.DebugLog;
import rx.Observable;
......@@ -24,8 +24,8 @@ import rx.Single;
public class RocketChatService extends Service implements ConnectivityServiceInterface {
private ConnectivityManagerInternal connectivityManager;
private HashMap<String, RocketChatWebSocketThread> webSocketThreads;
private Semaphore webSocketThreadLock = new Semaphore(1);
private static volatile Semaphore webSocketThreadLock = new Semaphore(1);
private static volatile RocketChatWebSocketThread currentWebSocketThread;
public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() {
......@@ -57,7 +57,6 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
super.onCreate();
connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
connectivityManager.resetConnectivityStateList();
webSocketThreads = new HashMap<>();
}
@DebugLog
......@@ -71,8 +70,9 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
public Single<Boolean> ensureConnectionToServer(String hostname) { //called via binder.
return getOrCreateWebSocketThread(hostname)
.doOnError(err -> {
webSocketThreads.remove(hostname);
connectivityManager.notifyConnectionLost(hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR);
err.printStackTrace();
currentWebSocketThread = null;
// connectivityManager.notifyConnectionLost(hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR);
})
.flatMap(webSocketThreads -> webSocketThreads.keepAlive());
}
......@@ -80,17 +80,15 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
@Override
public Single<Boolean> disconnectFromServer(String hostname) { //called via binder.
return Single.defer(() -> {
if (!webSocketThreads.containsKey(hostname)) {
if (!existsThreadForHostname(hostname)) {
return Single.just(true);
}
RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
if (thread != null) {
return thread.terminate()
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
// after disconnection from server
.doAfterTerminate(() -> {
// remove RCWebSocket key from HashMap
webSocketThreads.remove(hostname);
currentWebSocketThread = null;
// remove RealmConfiguration key from HashMap
RealmStore.sStore.remove(hostname);
});
......@@ -106,25 +104,54 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
return Single.defer(() -> {
webSocketThreadLock.acquire();
int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname);
boolean isConnected = connectivityState == ServerConnectivity.STATE_CONNECTED;
if (webSocketThreads.containsKey(hostname) && isConnected) {
RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
boolean isDisconnected = connectivityState != ServerConnectivity.STATE_CONNECTED;
if (currentWebSocketThread != null && existsThreadForHostname(hostname) && !isDisconnected) {
webSocketThreadLock.release();
return Single.just(thread);
return Single.just(currentWebSocketThread);
}
connectivityManager.notifyConnecting(hostname);
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
.doAfterTerminate(() -> currentWebSocketThread = null)
.doOnError(RCLog::e)
.flatMap(terminated ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
})
);
}
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
webSocketThreads.put(hostname, thread);
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
});
});
}
private boolean existsThreadForHostname(String hostname) {
if (hostname == null || currentWebSocketThread == null) {
return false;
}
return currentWebSocketThread.getName().equals("RC_thread_" + hostname);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
......
......@@ -11,6 +11,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import bolts.Task;
import chat.rocket.android.api.MethodCallHelper;
......@@ -34,6 +35,7 @@ import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore;
......@@ -121,7 +123,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
}.start();
}).flatMap(webSocket ->
webSocket.connect().map(_val -> webSocket));
webSocket.connectWithExponentialBackoff().map(_val -> webSocket));
}
@Override
......@@ -154,14 +156,14 @@ public class RocketChatWebSocketThread extends HandlerThread {
RCLog.d("thread %s: terminated()", Thread.currentThread().getId());
unregisterListenersAndClose();
connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_CLOSED_BY_USER);
DDPClient.REASON_CLOSED_BY_USER);
RocketChatWebSocketThread.super.quit();
emitter.onSuccess(true);
});
});
} else {
connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_NETWORK_ERROR);
DDPClient.REASON_NETWORK_ERROR);
super.quit();
return Single.just(true);
}
......@@ -205,7 +207,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
Exception error = task.getError();
RCLog.e(error);
connectivityManager.notifyConnectionLost(
hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR);
hostname, DDPClient.REASON_CLOSED_BY_USER);
emitter.onError(error);
} else {
keepAliveTimer.update();
......@@ -257,42 +259,43 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
RCLog.d("DDPClient#connect");
DDPClient.get().connect(hostname, info.getSession(), info.isSecure())
.onSuccessTask(task -> {
final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
if (_task.getResult().code != 1000) {
reconnect();
}
return null;
});
return realmHelper.executeTransaction(realm -> {
RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst();
if (sessionObj == null) {
realm.createOrUpdateObjectFromJson(RealmSession.class,
new JSONObject().put(RealmSession.ID, RealmSession.DEFAULT_ID));
} else {
// invalidate login token.
if (!TextUtils.isEmpty(sessionObj.getToken()) && sessionObj.isTokenVerified()) {
sessionObj.setTokenVerified(false);
sessionObj.setError(null);
}
.onSuccessTask(task -> {
final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
RxWebSocketCallback.Close result = _task.getResult();
if (result.code == DDPClient.REASON_NETWORK_ERROR) {
reconnect();
}
return null;
});
return realmHelper.executeTransaction(realm -> {
RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst();
if (sessionObj == null) {
realm.createOrUpdateObjectFromJson(RealmSession.class,
new JSONObject().put(RealmSession.ID, RealmSession.DEFAULT_ID));
} else {
// invalidate login token.
if (!TextUtils.isEmpty(sessionObj.getToken()) && sessionObj.isTokenVerified()) {
sessionObj.setTokenVerified(false);
sessionObj.setError(null);
}
}
return null;
});
})
.continueWith(task -> {
if (task.isFaulted()) {
emitter.onError(task.getError());
} else {
emitter.onSuccess(true);
}
return null;
});
}
return null;
});
})
.continueWith(task -> {
if (task.isFaulted()) {
emitter.onError(task.getError());
} else {
emitter.onSuccess(true);
}
return null;
});
}));
}
......@@ -301,22 +304,25 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (reconnectSubscription.hasSubscriptions()) {
return;
}
DDPClient.get().close();
forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
// Needed to use subscriptions because of legacy code.
// TODO: Should update to RxJava 2
reconnectSubscription.add(
connectWithExponentialBackoff()
.subscribe(
connected -> {
if (!connected) {
connectivityManager.notifyConnecting(hostname);
}
reconnectSubscription.clear();
},
err -> logErrorAndUnsubscribe(reconnectSubscription, err)
)
connectWithExponentialBackoff()
.subscribe(
connected -> {
if (!connected) {
connectivityManager.notifyConnecting(hostname);
}
reconnectSubscription.clear();
},
error -> {
logErrorAndUnsubscribe(reconnectSubscription, error);
connectivityManager.notifyConnectionLost(hostname,
DDPClient.REASON_NETWORK_ERROR);
}
)
);
}
......@@ -326,7 +332,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
private Single<Boolean> connectWithExponentialBackoff() {
return connect().retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS));
return connect().retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS));
}
@DebugLog
......@@ -429,7 +435,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
RCLog.e(error);
// Stop pinging
hearbeatDisposable.clear();
if (error instanceof DDPClientCallback.Closed) {
if (error instanceof DDPClientCallback.Closed || error instanceof TimeoutException) {
RCLog.d("Hearbeat failure: retrying connection...");
reconnect();
}
......@@ -440,10 +446,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog
private void unregisterListenersAndClose() {
unregisterListeners();
if (DDPClient.get() != null) {
DDPClient.get().close();
}
unregisterListeners();
DDPClient.get().close();
}
@DebugLog
......
......@@ -11,10 +11,18 @@ public class ServerConnectivity {
public final String hostname;
public final int state;
public final int code;
public ServerConnectivity(String hostname, int state) {
ServerConnectivity(String hostname, int state) {
this.hostname = hostname;
this.state = state;
this.code = -1;
}
ServerConnectivity(String hostname, int state, int code) {
this.hostname = hostname;
this.state = state;
this.code = code;
}
/**
......
<?xml version="1.0" encoding="utf-8"?>
<set xmlns:android="http://schemas.android.com/apk/res/android"
android:shareInterpolator="true"
android:interpolator="@android:anim/decelerate_interpolator">
<translate
android:duration="@android:integer/config_mediumAnimTime"
android:fromYDelta="100%p"
android:toYDelta="0">
</translate>
</set>
\ No newline at end of file
<?xml version="1.0" encoding="utf-8"?>
<set xmlns:android="http://schemas.android.com/apk/res/android">
<translate
xmlns:android="http://schemas.android.com/apk/res/android"
android:duration="@android:integer/config_mediumAnimTime"
android:fromYDelta="0%p"
android:toYDelta="0">
</translate>
</set>
\ No newline at end of file
......@@ -14,5 +14,4 @@
org.gradle.jvmargs=-Xmx6144M
#org.gradle.parallel=true
android.enableBuildCache=true
android.enableAapt2=false
\ No newline at end of file
android.enableBuildCache=true
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment