Commit 26615672 authored by Yusuke Iwaki's avatar Yusuke Iwaki

fix keepalive.

parent 5e11a6bd
......@@ -112,6 +112,7 @@ public class MethodCallHelper {
.put("sessionId", Session.DEFAULT_ID)
.put("token", task.getResult())
.put("tokenVerified", true)
.put("error", JSONObject.NULL)
));
}
......
......@@ -13,9 +13,7 @@ import chat.rocket.android.realm_helper.RealmListObserver;
import chat.rocket.android.realm_helper.RealmStore;
import io.realm.RealmResults;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Background service for Rocket.Chat.Application class.
......@@ -42,7 +40,7 @@ public class RocketChatService extends Service {
.isNotNull("hostname")
.equalTo("state", ServerConfig.STATE_READY)
.findAll())
.setOnUpdateListener(this::syncWebSocketThreadsWith);
.setOnUpdateListener(this::connectToServerWithServerConfig);
refreshServerConfigState();
}
......@@ -51,17 +49,37 @@ public class RocketChatService extends Service {
realmHelper.executeTransaction(realm -> {
RealmResults<ServerConfig> configs = realm.where(ServerConfig.class).findAll();
for (ServerConfig config: configs) {
if (config.getState() != ServerConfig.STATE_READY) {
config.setState(ServerConfig.STATE_READY);
}
}
return null;
}).continueWith(new LogcatIfError());;
}
@Override public int onStartCommand(Intent intent, int flags, int startId) {
List<ServerConfig> configs = realmHelper.executeTransactionForReadResults(realm ->
realm.where(ServerConfig.class)
.equalTo("state", ServerConfig.STATE_CONNECTED)
.findAll());
for (ServerConfig config: configs) {
String serverConfigId = config.getServerConfigId();
if (webSocketThreads.containsKey(serverConfigId)) {
RocketChatWebSocketThread thread = webSocketThreads.get(serverConfigId);
if (thread != null) {
thread.keepalive();
}
}
}
realmHelper.executeTransaction(realm -> {
RealmResults<ServerConfig> targetConfigs = realm
.where(ServerConfig.class)
.beginGroup()
.equalTo("state", ServerConfig.STATE_CONNECTION_ERROR)
.or()
.isNotNull("error")
.endGroup()
.isNotNull("session")
.findAll();
for (ServerConfig config : targetConfigs) {
......@@ -76,31 +94,13 @@ public class RocketChatService extends Service {
return START_STICKY;
}
private void syncWebSocketThreadsWith(List<ServerConfig> configList) {
final Iterator<Map.Entry<String, RocketChatWebSocketThread>> iterator =
webSocketThreads.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, RocketChatWebSocketThread> entry = iterator.next();
String serverConfigId = entry.getKey();
boolean found = false;
for (ServerConfig config : configList) {
if (serverConfigId.equals(config.getServerConfigId())) {
found = true;
break;
}
}
if (!found) {
RocketChatWebSocketThread thread = entry.getValue();
if (thread != null) {
RocketChatWebSocketThread.destroy(thread);
}
iterator.remove();
}
private void connectToServerWithServerConfig(List<ServerConfig> configList) {
if (configList.isEmpty()) {
return;
}
for (ServerConfig config : configList) {
findOrCreateWebSocketThread(config).onSuccess(task -> {
ServerConfig config = configList.get(0);
createWebSocketThread(config).onSuccess(task -> {
RocketChatWebSocketThread thread = task.getResult();
if (thread != null) {
thread.keepalive();
......@@ -108,19 +108,13 @@ public class RocketChatService extends Service {
return null;
});
}
}
private Task<RocketChatWebSocketThread> findOrCreateWebSocketThread(final ServerConfig config) {
private Task<RocketChatWebSocketThread> createWebSocketThread(final ServerConfig config) {
final String serverConfigId = config.getServerConfigId();
if (webSocketThreads.containsKey(serverConfigId)) {
return ServerConfig.updateState(serverConfigId, ServerConfig.STATE_CONNECTED)
.onSuccessTask(_task -> Task.forResult(webSocketThreads.get(serverConfigId)));
} else {
return ServerConfig.updateState(serverConfigId, ServerConfig.STATE_CONNECTING)
.onSuccessTask(_task -> {
webSocketThreads.put(serverConfigId, null);
return RocketChatWebSocketThread.getStarted(getApplicationContext(), config);
})
return ServerConfig.updateState(serverConfigId, ServerConfig.STATE_CONNECTING)
.onSuccessTask(_task ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), config))
.onSuccessTask(task ->
ServerConfig.updateState(serverConfigId, ServerConfig.STATE_CONNECTED)
.onSuccessTask(_task -> task))
......@@ -129,6 +123,14 @@ public class RocketChatService extends Service {
return task;
});
}
private Task<RocketChatWebSocketThread> findOrCreateWebSocketThread(final ServerConfig config) {
final String serverConfigId = config.getServerConfigId();
if (webSocketThreads.containsKey(serverConfigId)) {
return Task.forResult(webSocketThreads.get(serverConfigId));
} else {
return createWebSocketThread(config);
}
}
@Nullable
......
......@@ -45,7 +45,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
private final RealmHelper serverConfigRealm;
private final ArrayList<Registerable> listeners = new ArrayList<>();
private DDPClientWraper ddpClient;
private boolean socketExists;
private boolean listenersRegistered;
private RocketChatWebSocketThread(Context appContext, String serverConfigId) {
......@@ -116,22 +115,24 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
}
private Task<Void> ensureConnection() {
if (ddpClient == null || !ddpClient.isConnected()) {
return connect();
} else {
return Task.forResult(null);
}
}
/**
* synchronize the state of the thread with ServerConfig.
*/
@DebugLog public void keepalive() {
ensureConnection().continueWith(task -> {
new Handler(getLooper()).post(this::keepaliveListeners);
if (ddpClient == null || !ddpClient.isConnected()) {
defaultRealm.executeTransaction(realm -> {
ServerConfig config = realm.where(ServerConfig.class)
.equalTo("serverConfigId", serverConfigId)
.findFirst();
if (config != null && config.getState() == ServerConfig.STATE_CONNECTED) {
config.setState(ServerConfig.STATE_READY);
quit();
}
return null;
});
} else {
new Handler(getLooper()).post(this::keepaliveListeners);
}
}
private void prepareWebSocket(String hostname) {
......@@ -141,11 +142,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
@DebugLog private Task<Void> connect() {
if (socketExists) {
return Task.forResult(null);
}
socketExists = true;
final ServerConfig config = defaultRealm.executeTransactionForRead(realm ->
realm.where(ServerConfig.class).equalTo("serverConfigId", serverConfigId).findFirst());
......@@ -229,7 +225,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
//@DebugLog
private void keepaliveListeners() {
if (!socketExists || !listenersRegistered) {
if (!listenersRegistered) {
return;
}
......@@ -240,7 +236,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog
private void unregisterListeners() {
if (!socketExists || !listenersRegistered) {
if (!listenersRegistered) {
return;
}
......@@ -255,6 +251,5 @@ public class RocketChatWebSocketThread extends HandlerThread {
ddpClient = null;
}
listenersRegistered = false;
socketExists = false;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment