Commit dfe73eb4 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Change reconnection algorithm

parent 095e2e5e
...@@ -23,7 +23,7 @@ public class DDPClient { ...@@ -23,7 +23,7 @@ public class DDPClient {
// reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js // reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js
private static volatile DDPClient singleton; private static volatile DDPClient singleton;
private static OkHttpClient client; private static volatile OkHttpClient client;
private final DDPClientImpl impl; private final DDPClientImpl impl;
private final AtomicReference<String> hostname = new AtomicReference<>(); private final AtomicReference<String> hostname = new AtomicReference<>();
...@@ -49,9 +49,8 @@ public class DDPClient { ...@@ -49,9 +49,8 @@ public class DDPClient {
} }
public Task<DDPClientCallback.Connect> connect(String url, String session) { public Task<DDPClientCallback.Connect> connect(String url, String session) {
String oldHostname = hostname.get(); String oldHostname = hostname.getAndSet(url);
hostname.set(url); if (oldHostname != null && !oldHostname.equalsIgnoreCase(url)) {
if (oldHostname != null && !url.equalsIgnoreCase(oldHostname)) {
close(); close();
} }
TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>(); TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>();
......
...@@ -59,7 +59,7 @@ public class DDPClientImpl { ...@@ -59,7 +59,7 @@ public class DDPClientImpl {
CompositeDisposable disposables = new CompositeDisposable(); CompositeDisposable disposables = new CompositeDisposable();
disposables.add( disposables.add(
flowable.retry().filter(callback -> callback instanceof RxWebSocketCallback.Open) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open)
.subscribe( .subscribe(
callback -> callback ->
sendMessage("connect", sendMessage("connect",
...@@ -115,6 +115,7 @@ public class DDPClientImpl { ...@@ -115,6 +115,7 @@ public class DDPClientImpl {
if (requested) { if (requested) {
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.filter(response -> "pong".equalsIgnoreCase(extractMsg(response))) .filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
......
...@@ -42,7 +42,6 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity { ...@@ -42,7 +42,6 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity {
} }
updateHostnameIfNeeded(rocketChatCache.getSelectedServerHostname()); updateHostnameIfNeeded(rocketChatCache.getSelectedServerHostname());
updateRoomIdIfNeeded(rocketChatCache.getSelectedRoomId());
} }
@Override @Override
...@@ -93,12 +92,14 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity { ...@@ -93,12 +92,14 @@ abstract class AbstractAuthedActivity extends AbstractFragmentActivity {
if (hostname == null) { if (hostname == null) {
if (newHostname != null && assertServerRealmStoreExists(newHostname)) { if (newHostname != null && assertServerRealmStoreExists(newHostname)) {
updateHostname(newHostname); updateHostname(newHostname);
updateRoomIdIfNeeded(rocketChatCache.getSelectedRoomId());
} else { } else {
recoverFromHostnameError(); recoverFromHostnameError();
} }
} else { } else {
if (hostname.equals(newHostname)) { if (hostname.equals(newHostname)) {
updateHostname(newHostname); updateHostname(newHostname);
updateRoomIdIfNeeded(rocketChatCache.getSelectedRoomId());
return; return;
} }
......
...@@ -61,6 +61,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -61,6 +61,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
statusTicker = new StatusTicker(); statusTicker = new StatusTicker();
pane = (SlidingPaneLayout) findViewById(R.id.sliding_pane); pane = (SlidingPaneLayout) findViewById(R.id.sliding_pane);
setupToolbar(); setupToolbar();
closeSidebarIfNeeded();
} }
@Override @Override
...@@ -249,7 +250,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -249,7 +250,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
Snackbar.make(findViewById(getLayoutContainerForFragment()), Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.fragment_retry_login_error_title, Snackbar.LENGTH_INDEFINITE) R.string.fragment_retry_login_error_title, Snackbar.LENGTH_INDEFINITE)
.setAction(R.string.fragment_retry_login_retry_title, view -> .setAction(R.string.fragment_retry_login_retry_title, view ->
presenter.onRetryLogin())); ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer()));
} }
@Override @Override
......
...@@ -226,9 +226,11 @@ public class MainPresenter extends BasePresenter<MainContract.View> ...@@ -226,9 +226,11 @@ public class MainPresenter extends BasePresenter<MainContract.View>
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) { if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk(); view.showConnectionOk();
view.refreshRoom(); view.refreshRoom();
return; } else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
view.showConnectionError();
} else {
view.showConnecting();
} }
view.showConnecting();
}, },
Logger::report Logger::report
); );
......
...@@ -21,6 +21,7 @@ import chat.rocket.persistence.realm.models.RealmBasedServerInfo; ...@@ -21,6 +21,7 @@ import chat.rocket.persistence.realm.models.RealmBasedServerInfo;
import hugo.weaving.DebugLog; import hugo.weaving.DebugLog;
import rx.Observable; import rx.Observable;
import rx.Single; import rx.Single;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject; import rx.subjects.PublishSubject;
/** /**
...@@ -28,7 +29,7 @@ import rx.subjects.PublishSubject; ...@@ -28,7 +29,7 @@ import rx.subjects.PublishSubject;
*/ */
/*package*/ class RealmBasedConnectivityManager /*package*/ class RealmBasedConnectivityManager
implements ConnectivityManagerApi, ConnectivityManagerInternal { implements ConnectivityManagerApi, ConnectivityManagerInternal {
private final ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>(); private volatile ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>();
private final PublishSubject<ServerConnectivity> connectivitySubject = PublishSubject.create(); private final PublishSubject<ServerConnectivity> connectivitySubject = PublishSubject.create();
private Context appContext; private Context appContext;
private final ServiceConnection serviceConnection = new ServiceConnection() { private final ServiceConnection serviceConnection = new ServiceConnection() {
...@@ -70,9 +71,16 @@ import rx.subjects.PublishSubject; ...@@ -70,9 +71,16 @@ import rx.subjects.PublishSubject;
@Override @Override
public void ensureConnections() { public void ensureConnections() {
String hostname = new RocketChatCache(appContext).getSelectedServerHostname(); String hostname = new RocketChatCache(appContext).getSelectedServerHostname();
if (hostname == null) {
return;
}
connectToServerIfNeeded(hostname, true/* force connect */) connectToServerIfNeeded(hostname, true/* force connect */)
.subscribeOn(Schedulers.io())
.subscribe(_val -> { .subscribe(_val -> {
}, RCLog::e); }, error -> {
RCLog.e(error);
notifyConnectionLost(hostname, REASON_NETWORK_ERROR);
});
} }
@Override @Override
...@@ -167,13 +175,17 @@ import rx.subjects.PublishSubject; ...@@ -167,13 +175,17 @@ import rx.subjects.PublishSubject;
.flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect)); .flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect));
} }
if (connectivity == ServerConnectivity.STATE_CONNECTING) { // if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname); // return waitForConnected(hostname)
// .doOnError(error -> notifyConnectionLost(hostname, REASON_NETWORK_ERROR));
// }
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
notifyConnecting(hostname);
} }
return connectToServer(hostname) return connectToServer(hostname);
.doOnError(RCLog::e) // .retryWhen(RxHelper.exponentialBackoff(1, 500, TimeUnit.MILLISECONDS));
.retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS));
}); });
} }
...@@ -186,7 +198,7 @@ import rx.subjects.PublishSubject; ...@@ -186,7 +198,7 @@ import rx.subjects.PublishSubject;
if (connectivity == ServerConnectivity.STATE_CONNECTING) { if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname) return waitForConnected(hostname)
.onErrorReturn(err -> true) .doOnError(err -> notifyConnectionLost(hostname, REASON_NETWORK_ERROR))
.flatMap(_val -> disconnectFromServerIfNeeded(hostname)); .flatMap(_val -> disconnectFromServerIfNeeded(hostname));
} }
...@@ -195,8 +207,7 @@ import rx.subjects.PublishSubject; ...@@ -195,8 +207,7 @@ import rx.subjects.PublishSubject;
} }
return disconnectFromServer(hostname) return disconnectFromServer(hostname)
//.doOnError(RCLog::e) .retryWhen(RxHelper.exponentialBackoff(1, 500, TimeUnit.MILLISECONDS));
.retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS));
}); });
} }
...@@ -237,7 +248,7 @@ import rx.subjects.PublishSubject; ...@@ -237,7 +248,7 @@ import rx.subjects.PublishSubject;
if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) { if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) {
// Mark as CONNECTING except for the case [forceConnect && connected] because // Mark as CONNECTING except for the case [forceConnect && connected] because
// ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected. // ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected.
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING); // serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
} }
if (serviceInterface != null) { if (serviceInterface != null) {
......
...@@ -8,7 +8,7 @@ import android.os.Binder; ...@@ -8,7 +8,7 @@ import android.os.Binder;
import android.os.IBinder; import android.os.IBinder;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -24,8 +24,8 @@ import rx.Single; ...@@ -24,8 +24,8 @@ import rx.Single;
public class RocketChatService extends Service implements ConnectivityServiceInterface { public class RocketChatService extends Service implements ConnectivityServiceInterface {
private ConnectivityManagerInternal connectivityManager; private ConnectivityManagerInternal connectivityManager;
private HashMap<String, RocketChatWebSocketThread> webSocketThreads; private static volatile ConcurrentHashMap<String, RocketChatWebSocketThread> webSocketThreads;
private Semaphore webSocketThreadLock = new Semaphore(1); private static volatile Semaphore webSocketThreadLock = new Semaphore(1);
public class LocalBinder extends Binder { public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() { ConnectivityServiceInterface getServiceInterface() {
...@@ -57,7 +57,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -57,7 +57,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
super.onCreate(); super.onCreate();
connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext()); connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
connectivityManager.resetConnectivityStateList(); connectivityManager.resetConnectivityStateList();
webSocketThreads = new HashMap<>(); webSocketThreads = new ConcurrentHashMap<>();
} }
@DebugLog @DebugLog
...@@ -71,8 +71,9 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -71,8 +71,9 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
public Single<Boolean> ensureConnectionToServer(String hostname) { //called via binder. public Single<Boolean> ensureConnectionToServer(String hostname) { //called via binder.
return getOrCreateWebSocketThread(hostname) return getOrCreateWebSocketThread(hostname)
.doOnError(err -> { .doOnError(err -> {
err.printStackTrace();
webSocketThreads.remove(hostname); webSocketThreads.remove(hostname);
connectivityManager.notifyConnectionLost(hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR); // connectivityManager.notifyConnectionLost(hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR);
}) })
.flatMap(webSocketThreads -> webSocketThreads.keepAlive()); .flatMap(webSocketThreads -> webSocketThreads.keepAlive());
} }
......
...@@ -11,6 +11,7 @@ import java.util.ArrayList; ...@@ -11,6 +11,7 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import bolts.Task; import bolts.Task;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
...@@ -301,7 +302,6 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -301,7 +302,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (reconnectSubscription.hasSubscriptions()) { if (reconnectSubscription.hasSubscriptions()) {
return; return;
} }
DDPClient.get().close();
forceInvalidateTokens(); forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname); connectivityManager.notifyConnecting(hostname);
// Needed to use subscriptions because of legacy code. // Needed to use subscriptions because of legacy code.
...@@ -315,7 +315,11 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -315,7 +315,11 @@ public class RocketChatWebSocketThread extends HandlerThread {
} }
reconnectSubscription.clear(); reconnectSubscription.clear();
}, },
err -> logErrorAndUnsubscribe(reconnectSubscription, err) err -> {
logErrorAndUnsubscribe(reconnectSubscription, err);
connectivityManager.notifyConnectionLost(hostname,
ConnectivityManagerInternal.REASON_NETWORK_ERROR);
}
) )
); );
} }
...@@ -326,7 +330,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -326,7 +330,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
} }
private Single<Boolean> connectWithExponentialBackoff() { private Single<Boolean> connectWithExponentialBackoff() {
return connect().retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS)); return connect().retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS));
} }
@DebugLog @DebugLog
...@@ -429,7 +433,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -429,7 +433,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
RCLog.e(error); RCLog.e(error);
// Stop pinging // Stop pinging
hearbeatDisposable.clear(); hearbeatDisposable.clear();
if (error instanceof DDPClientCallback.Closed) { if (error instanceof DDPClientCallback.Closed || error instanceof TimeoutException) {
RCLog.d("Hearbeat failure: retrying connection..."); RCLog.d("Hearbeat failure: retrying connection...");
reconnect(); reconnect();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment