Commit 40a217f0 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Added a semaphore lock in RocketChatService method getStarted()

preventing multiple threads inconsistently creating multiple websocket
threads for the same host and thus avoiding connecting to server multiple times unnecessarily.
Changed network changing fallback implemention to try to reconnect to
automatically and force-invalidating the session token before doing
this.
parent 971b7569
package chat.rocket.android_ddp; package chat.rocket.android_ddp;
import android.os.SystemClock;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.text.TextUtils; import android.text.TextUtils;
...@@ -9,7 +8,6 @@ import org.json.JSONArray; ...@@ -9,7 +8,6 @@ import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
...@@ -21,11 +19,8 @@ import chat.rocket.android_ddp.rx.RxWebSocketCallback; ...@@ -21,11 +19,8 @@ import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Response;
public class DDPClientImpl { public class DDPClientImpl {
public static final int CLOSED_NORMALLY = 1000;
public static final int CLOSED_NOT_ALIVE = 1001;
private final DDPClient client; private final DDPClient client;
private RxWebSocket websocket; private RxWebSocket websocket;
private Flowable<RxWebSocketCallback.Base> flowable; private Flowable<RxWebSocketCallback.Base> flowable;
...@@ -123,6 +118,7 @@ public class DDPClientImpl { ...@@ -123,6 +118,7 @@ public class DDPClientImpl {
disposables.add( disposables.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe( .subscribe(
...@@ -291,7 +287,6 @@ public class DDPClientImpl { ...@@ -291,7 +287,6 @@ public class DDPClientImpl {
response -> { response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("ping".equals(msg)) { if ("ping".equals(msg)) {
SystemClock.sleep(8000);
if (response.isNull("id")) { if (response.isNull("id")) {
sendMessage("pong", null); sendMessage("pong", null);
} else { } else {
......
...@@ -4,7 +4,6 @@ import java.io.IOException; ...@@ -4,7 +4,6 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.DDPClientImpl;
import io.reactivex.BackpressureStrategy; import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe; import io.reactivex.FlowableOnSubscribe;
...@@ -17,10 +16,7 @@ import okhttp3.WebSocket; ...@@ -17,10 +16,7 @@ import okhttp3.WebSocket;
import okhttp3.WebSocketListener; import okhttp3.WebSocketListener;
public class RxWebSocket { public class RxWebSocket {
public static final int REASON_CLOSED_BY_USER = 101; public static final int REASON_NETWORK_ERROR = 100;
public static final int REASON_NETWORK_ERROR = 102;
public static final int REASON_SERVER_ERROR = 103;
public static final int REASON_UNKNOWN = 104;
private OkHttpClient httpClient; private OkHttpClient httpClient;
private WebSocket webSocket; private WebSocket webSocket;
private boolean hadErrorsBefore; private boolean hadErrorsBefore;
...@@ -62,17 +58,7 @@ public class RxWebSocket { ...@@ -62,17 +58,7 @@ public class RxWebSocket {
@Override @Override
public void onClosed(WebSocket webSocket, int code, String reason) { public void onClosed(WebSocket webSocket, int code, String reason) {
switch (code) { emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason));
case DDPClientImpl.CLOSED_NORMALLY:
emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason));
emitter.onComplete();
break;
case DDPClientImpl.CLOSED_NOT_ALIVE:
emitter.onNext(new RxWebSocketCallback.Failure(webSocket, new Exception(reason), null));
break;
default:
RCLog.e("Websocket closed abnormally");
}
} }
}), }),
BackpressureStrategy.BUFFER BackpressureStrategy.BUFFER
......
...@@ -163,10 +163,15 @@ public class MainPresenter extends BasePresenter<MainContract.View> ...@@ -163,10 +163,15 @@ public class MainPresenter extends BasePresenter<MainContract.View>
private void subscribeToNetworkChanges() { private void subscribeToNetworkChanges() {
Disposable disposable = RxJavaInterop.toV2Flowable(connectivityManagerApi.getServerConnectivityAsObservable()) Disposable disposable = RxJavaInterop.toV2Flowable(connectivityManagerApi.getServerConnectivityAsObservable())
.filter(connectivity -> connectivity.state == ServerConnectivity.STATE_DISCONNECTED)
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.subscribe( .subscribe(
a -> view.showConnectionError(), connectivity -> {
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk();
return;
}
view.showConnecting();
},
Logger::report Logger::report
); );
......
...@@ -9,6 +9,7 @@ import android.os.IBinder; ...@@ -9,6 +9,7 @@ import android.os.IBinder;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import chat.rocket.android.activity.MainActivity; import chat.rocket.android.activity.MainActivity;
...@@ -24,6 +25,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -24,6 +25,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
private ConnectivityManagerInternal connectivityManager; private ConnectivityManagerInternal connectivityManager;
private HashMap<String, RocketChatWebSocketThread> webSocketThreads; private HashMap<String, RocketChatWebSocketThread> webSocketThreads;
private Semaphore webSocketThreadLock = new Semaphore(1);
public class LocalBinder extends Binder { public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() { ConnectivityServiceInterface getServiceInterface() {
...@@ -49,6 +51,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -49,6 +51,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
context.unbindService(serviceConnection); context.unbindService(serviceConnection);
} }
@DebugLog
@Override @Override
public void onCreate() { public void onCreate() {
super.onCreate(); super.onCreate();
...@@ -57,6 +60,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -57,6 +60,7 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
webSocketThreads = new HashMap<>(); webSocketThreads = new HashMap<>();
} }
@DebugLog
@Override @Override
public int onStartCommand(Intent intent, int flags, int startId) { public int onStartCommand(Intent intent, int flags, int startId) {
connectivityManager.ensureConnections(); connectivityManager.ensureConnections();
...@@ -106,12 +110,17 @@ public class RocketChatService extends Service implements ConnectivityServiceInt ...@@ -106,12 +110,17 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
@DebugLog @DebugLog
private Single<RocketChatWebSocketThread> getOrCreateWebSocketThread(String hostname) { private Single<RocketChatWebSocketThread> getOrCreateWebSocketThread(String hostname) {
return Single.defer(() -> { return Single.defer(() -> {
webSocketThreadLock.acquire();
if (webSocketThreads.containsKey(hostname)) { if (webSocketThreads.containsKey(hostname)) {
RocketChatWebSocketThread thread = webSocketThreads.get(hostname); RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
webSocketThreadLock.release();
return Single.just(thread); return Single.just(thread);
} }
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname) return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> webSocketThreads.put(hostname, thread)); .doOnSuccess(thread -> {
webSocketThreads.put(hostname, thread);
webSocketThreadLock.release();
});
}); });
} }
......
...@@ -9,12 +9,14 @@ import org.json.JSONObject; ...@@ -9,12 +9,14 @@ import org.json.JSONObject;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import bolts.Task; import bolts.Task;
import chat.rocket.android.RocketChatCache; import chat.rocket.android.RocketChatCache;
import chat.rocket.android.api.DDPClientWrapper; import chat.rocket.android.api.DDPClientWrapper;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError; import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.helper.RxHelper;
import chat.rocket.android.helper.TextUtils; import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber; import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber;
...@@ -37,8 +39,8 @@ import chat.rocket.persistence.realm.RealmHelper; ...@@ -37,8 +39,8 @@ import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.internal.RealmSession; import chat.rocket.persistence.realm.models.internal.RealmSession;
import hugo.weaving.DebugLog; import hugo.weaving.DebugLog;
import io.reactivex.disposables.CompositeDisposable;
import rx.Single; import rx.Single;
import rx.subscriptions.CompositeSubscription;
/** /**
* Thread for handling WebSocket connection. * Thread for handling WebSocket connection.
...@@ -65,17 +67,16 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -65,17 +67,16 @@ public class RocketChatWebSocketThread extends HandlerThread {
private final RealmHelper realmHelper; private final RealmHelper realmHelper;
private final ConnectivityManagerInternal connectivityManager; private final ConnectivityManagerInternal connectivityManager;
private final ArrayList<Registrable> listeners = new ArrayList<>(); private final ArrayList<Registrable> listeners = new ArrayList<>();
private static DDPClientWrapper ddpClient; private DDPClientWrapper ddpClient;
private boolean listenersRegistered; private boolean listenersRegistered;
private RocketChatCache rocketChatCache; private RocketChatCache rocketChatCache;
private DDPClientRef ddpClientRef = new DDPClientRef() { private final DDPClientRef ddpClientRef = new DDPClientRef() {
@Override @Override
public DDPClientWrapper get() { public DDPClientWrapper get() {
return ddpClient; return ddpClient;
} }
}; };
private static class KeepAliveTimer { private static class KeepAliveTimer {
private long lastTime; private long lastTime;
private final long thresholdMs; private final long thresholdMs;
...@@ -223,12 +224,6 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -223,12 +224,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (!alive) { if (!alive) {
RCLog.d("DDPClient#create"); RCLog.d("DDPClient#create");
ddpClient = DDPClientWrapper.create(hostname); ddpClient = DDPClientWrapper.create(hostname);
ddpClientRef = new DDPClientRef() {
@Override
public DDPClientWrapper get() {
return ddpClient;
}
};
} }
}); });
} }
...@@ -246,9 +241,25 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -246,9 +241,25 @@ public class RocketChatWebSocketThread extends HandlerThread {
// handling WebSocket#onClose() callback. // handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> { task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
ddpClient.close(); ddpClient.close();
ddpClient = null; forceInvalidateTokens();
connectivityManager.notifyConnectionLost(hostname, connectivityManager.notifyConnecting(hostname);
ConnectivityManagerInternal.REASON_NETWORK_ERROR); // Needed to use subscriptions because of legacy code.
// TODO: Should update to RxJava 2
final CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add(
connect().retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS))
.subscribe(
connected -> {
if (!connected) {
connectivityManager.notifyConnectionLost(
hostname, ConnectivityManagerInternal.REASON_NETWORK_ERROR
);
}
subscriptions.clear();
},
err -> logErrorAndUnsubscribe(subscriptions, err)
)
);
return null; return null;
}); });
...@@ -279,20 +290,9 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -279,20 +290,9 @@ public class RocketChatWebSocketThread extends HandlerThread {
})); }));
} }
private void logErrorAndDispose(Throwable throwable, CompositeDisposable disposables) { private void logErrorAndUnsubscribe(CompositeSubscription subscriptions, Throwable err) {
RCLog.e(throwable); RCLog.e(err);
disposables.clear(); subscriptions.clear();
}
private void forceRegisteringListeners() {
Iterator<Registrable> iterator = listeners.iterator();
while (iterator.hasNext()) {
Registrable registrable = iterator.next();
registrable.unregister();
iterator.remove();
}
listenersRegistered = false;
registerListeners();
} }
@DebugLog @DebugLog
...@@ -340,7 +340,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -340,7 +340,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
} }
// Register for room stream messages // Register for room stream messages
String roomId = rocketChatCache.getSelectedRoomId(); String roomId = rocketChatCache.getSelectedRoomId();
if (roomId != null) { if (roomId != null && !roomId.isEmpty()) {
StreamRoomMessage streamRoomMessage = new StreamRoomMessage( StreamRoomMessage streamRoomMessage = new StreamRoomMessage(
appContext, hostname, realmHelper, ddpClientRef, roomId appContext, hostname, realmHelper, ddpClientRef, roomId
); );
......
...@@ -185,7 +185,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable { ...@@ -185,7 +185,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
if (rxSubscription != null) { if (rxSubscription != null) {
rxSubscription.dispose(); rxSubscription.dispose();
} }
if (!TextUtils.isEmpty(subscriptionId)) { if (!TextUtils.isEmpty(subscriptionId) && ddpClientRef.get() != null) {
ddpClientRef.get().unsubscribe(subscriptionId).continueWith(new LogIfError()); ddpClientRef.get().unsubscribe(subscriptionId).continueWith(new LogIfError());
} }
} }
......
...@@ -2,6 +2,7 @@ package chat.rocket.android.service.internal; ...@@ -2,6 +2,7 @@ package chat.rocket.android.service.internal;
import android.content.Context; import android.content.Context;
import chat.rocket.android.log.RCLog;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
import chat.rocket.android.RocketChatCache; import chat.rocket.android.RocketChatCache;
...@@ -47,7 +48,7 @@ public abstract class AbstractRocketChatCacheObserver implements Registrable { ...@@ -47,7 +48,7 @@ public abstract class AbstractRocketChatCacheObserver implements Registrable {
compositeDisposable.add( compositeDisposable.add(
new RocketChatCache(context) new RocketChatCache(context)
.getSelectedRoomIdPublisher() .getSelectedRoomIdPublisher()
.subscribe(this::updateRoomIdWith) .subscribe(this::updateRoomIdWith, RCLog::e)
); );
} }
......
...@@ -33,5 +33,6 @@ public class RocketChatWidgets { ...@@ -33,5 +33,6 @@ public class RocketChatWidgets {
CustomImageFormatConfigurator.addCustomDrawableFactories(draweeConfigBuilder); CustomImageFormatConfigurator.addCustomDrawableFactories(draweeConfigBuilder);
Fresco.initialize(context, imagePipelineConfig, draweeConfigBuilder.build()); Fresco.initialize(context, imagePipelineConfig, draweeConfigBuilder.build());
FLog.setMinimumLoggingLevel(FLog.ERROR);
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment