Commit abcedb84 authored by Tiago Cunha's avatar Tiago Cunha Committed by GitHub

Merge pull request #194 from RocketChat/fix_issue_getOrCreateWebSocket

fix issue that getOrCreateWebSocket is unexpectedly called repeatedly.
parents 86adbe67 559a3128
......@@ -64,9 +64,7 @@ import rx.subjects.PublishSubject;
@Override
public void ensureConnections() {
for (String hostname : serverConnectivityList.keySet()) {
connectToServer(hostname) //force connect.
//.doOnError(RCLog::e)
.retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS))
connectToServerIfNeeded(hostname, true/* force connect */)
.subscribe(_val -> { }, RCLog::e);
}
}
......@@ -78,7 +76,7 @@ import rx.subjects.PublishSubject;
if (!serverConnectivityList.containsKey(hostname)) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
}
connectToServerIfNeeded(hostname)
connectToServerIfNeeded(hostname, false)
.subscribe(_val -> { }, RCLog::e);
}
......@@ -93,7 +91,7 @@ import rx.subjects.PublishSubject;
@Override
public Single<Boolean> connect(String hostname) {
return connectToServerIfNeeded(hostname);
return connectToServerIfNeeded(hostname, false);
}
@Override
......@@ -136,16 +134,16 @@ import rx.subjects.PublishSubject;
return Observable.concat(Observable.from(getCurrentConnectivityList()), connectivitySubject);
}
private Single<Boolean> connectToServerIfNeeded(String hostname) {
private Single<Boolean> connectToServerIfNeeded(String hostname, boolean forceConnect) {
return Single.defer(() -> {
final int connectivity = serverConnectivityList.get(hostname);
if (connectivity == ServerConnectivity.STATE_CONNECTED) {
if (!forceConnect && connectivity == ServerConnectivity.STATE_CONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname)
.flatMap(_val -> connectToServerIfNeeded(hostname));
.flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect));
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
......@@ -167,6 +165,7 @@ import rx.subjects.PublishSubject;
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname)
.onErrorReturn(err -> true)
.flatMap(_val -> disconnectFromServerIfNeeded(hostname));
}
......@@ -183,28 +182,40 @@ import rx.subjects.PublishSubject;
private Single<Boolean> waitForConnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> (hostname.equals(serverConnectivity.hostname)
&& serverConnectivity.state == ServerConnectivity.STATE_CONNECTED))
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state ->
state == ServerConnectivity.STATE_CONNECTED || state == ServerConnectivity.STATE_DISCONNECTED)
.first()
.map(serverConnectivity -> true)
.toSingle();
.toSingle()
.flatMap(state ->
state == ServerConnectivity.STATE_CONNECTED
? Single.just(true)
: Single.error(new ServerConnectivity.DisconnectedException()));
}
private Single<Boolean> waitForDisconnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> (hostname.equals(serverConnectivity.hostname)
&& serverConnectivity.state == ServerConnectivity.STATE_DISCONNECTED))
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state -> state == ServerConnectivity.STATE_DISCONNECTED)
.first()
.map(serverConnectivity -> true)
.toSingle();
.toSingle()
.map(state -> true);
}
@DebugLog
private Single<Boolean> connectToServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) {
// Mark as CONNECTING except for the case [forceConnect && connected] because
// ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected.
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
}
if (serviceInterface != null) {
return serviceInterface.ensureConnectionToServer(hostname);
......
......@@ -93,14 +93,8 @@ public class RocketChatService extends Service implements ConnectivityServiceInt
return Single.defer(() -> {
if (webSocketThreads.containsKey(hostname)) {
RocketChatWebSocketThread thread = webSocketThreads.get(hostname);
if (thread != null) {
return Single.just(thread);
} else {
return Observable.timer(1, TimeUnit.SECONDS).toSingle()
.flatMap(_val -> getOrCreateWebSocketThread(hostname));
}
}
webSocketThreads.put(hostname, null);
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> webSocketThreads.put(hostname, thread));
});
......
......@@ -16,4 +16,13 @@ public class ServerConnectivity {
this.hostname = hostname;
this.state = state;
}
/**
* This exception should be thrown when connection is lost during waiting for CONNECTED.
*/
public static class DisconnectedException extends Exception {
public DisconnectedException() {
super("Disconnected");
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment