RocketChatWebSocketThread.java 9.65 KB
Newer Older
1 2 3 4 5
package chat.rocket.android.service;

import android.content.Context;
import android.os.Handler;
import android.os.HandlerThread;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
6 7 8 9 10
import org.json.JSONObject;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Iterator;
11
import bolts.Continuation;
12 13
import bolts.Task;
import bolts.TaskCompletionSource;
14
import chat.rocket.android.api.DDPClientWrapper;
Tiago Cunha's avatar
Tiago Cunha committed
15
import chat.rocket.android.api.MethodCallHelper;
16
import chat.rocket.android.helper.LogcatIfError;
17
import chat.rocket.android.helper.TextUtils;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
18
import chat.rocket.android.log.RCLog;
19
import chat.rocket.android.model.ServerConfig;
20 21 22
import chat.rocket.android.model.internal.Session;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.realm_helper.RealmStore;
23 24 25 26
import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber;
import chat.rocket.android.service.ddp.base.LoginServiceConfigurationSubscriber;
import chat.rocket.android.service.ddp.base.UserDataSubscriber;
import chat.rocket.android.service.observer.CurrentUserObserver;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
27
import chat.rocket.android.service.observer.FileUploadingToS3Observer;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
28
import chat.rocket.android.service.observer.FileUploadingWithUfsObserver;
29
import chat.rocket.android.service.observer.GcmPushRegistrationObserver;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
30
import chat.rocket.android.service.observer.GetUsersOfRoomsProcedureObserver;
31
import chat.rocket.android.service.observer.LoadMessageProcedureObserver;
32
import chat.rocket.android.service.observer.MethodCallObserver;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
33
import chat.rocket.android.service.observer.NewMessageObserver;
Tiago Cunha's avatar
Tiago Cunha committed
34
import chat.rocket.android.service.observer.PushSettingsObserver;
35
import chat.rocket.android.service.observer.ReactiveNotificationManager;
36
import chat.rocket.android.service.observer.SessionObserver;
37
import chat.rocket.android.service.observer.TokenLoginObserver;
38
import chat.rocket.android_ddp.DDPClientCallback;
39 40
import hugo.weaving.DebugLog;

41 42 43
/**
 * Thread for handling WebSocket connection.
 */
44
public class RocketChatWebSocketThread extends HandlerThread {
45
  private static final Class[] REGISTERABLE_CLASSES = {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
46
      LoginServiceConfigurationSubscriber.class,
47
      ActiveUsersSubscriber.class,
48
      UserDataSubscriber.class,
49
      TokenLoginObserver.class,
50
      MethodCallObserver.class,
51
      SessionObserver.class,
52
      LoadMessageProcedureObserver.class,
Yusuke Iwaki's avatar
Yusuke Iwaki committed
53
      GetUsersOfRoomsProcedureObserver.class,
54
      NewMessageObserver.class,
55 56
      CurrentUserObserver.class,
      ReactiveNotificationManager.class,
Yusuke Iwaki's avatar
Yusuke Iwaki committed
57
      FileUploadingToS3Observer.class,
Tiago Cunha's avatar
Tiago Cunha committed
58
      FileUploadingWithUfsObserver.class,
59 60
      PushSettingsObserver.class,
      GcmPushRegistrationObserver.class
61
  };
Yusuke Iwaki's avatar
Yusuke Iwaki committed
62 63
  /** Context handed to every listener constructed in registerListeners(). */
  private final Context appContext;
  /** ID of the ServerConfig record this thread works for; also used to build the thread name. */
  private final String serverConfigId;
64 65
  /** Helper obtained from RealmStore.getDefault(); queried here for ServerConfig records. */
  private final RealmHelper defaultRealm;
  /** Helper obtained from RealmStore.getOrCreate(serverConfigId); queried here for Session. */
  private final RealmHelper serverConfigRealm;
66 67
  /** Listeners added by registerListeners() and removed again by unregisterListeners(). */
  private final ArrayList<Registrable> listeners = new ArrayList<>();
  /** Current DDP client; assigned in prepareWebSocket() and reset to null on teardown. */
  private DDPClientWrapper ddpClient;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
68
  /** Set by registerListeners() and cleared by unregisterListeners(); guards repeated calls. */
  private boolean listenersRegistered;
69 70 71

  /**
   * Builds a thread named {@code "RC_thread_" + serverConfigId} and resolves the two Realm
   * helpers it works with. The constructor is private; within this class it is invoked from
   * {@code getStarted}.
   *
   * @param appContext context stored for later listener construction
   * @param serverConfigId ID of the ServerConfig this thread belongs to
   */
  private RocketChatWebSocketThread(Context appContext, String serverConfigId) {
    super("RC_thread_" + serverConfigId);
    this.serverConfigId = serverConfigId;
    this.appContext = appContext;
    this.defaultRealm = RealmStore.getDefault();
    this.serverConfigRealm = RealmStore.getOrCreate(serverConfigId);
  }

  /**
   * create new Thread.
   */
Yusuke Iwaki's avatar
Yusuke Iwaki committed
81 82 83
  @DebugLog
  public static Task<RocketChatWebSocketThread> getStarted(Context appContext,
                                                           ServerConfig config) {
84
    TaskCompletionSource<RocketChatWebSocketThread> task = new TaskCompletionSource<>();
Yusuke Iwaki's avatar
Yusuke Iwaki committed
85
    new RocketChatWebSocketThread(appContext, config.getServerConfigId()) {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
86 87
      @Override
      protected void onLooperPrepared() {
88 89 90 91 92 93 94 95
        try {
          super.onLooperPrepared();
          task.setResult(this);
        } catch (Exception exception) {
          task.setError(exception);
        }
      }
    }.start();
96 97 98
    return task.getTask()
        .onSuccessTask(_task ->
            _task.getResult().connect().onSuccessTask(__task -> _task));
99 100
  }

Yusuke Iwaki's avatar
Yusuke Iwaki committed
101 102 103 104 105 106 107 108 109 110
  /**
   * Requests shutdown of the given thread by calling {@code quit()} on it.
   *
   * @param thread thread to stop; {@code null} is ignored
   */
  @DebugLog
  public static void destroy(RocketChatWebSocketThread thread) {
    if (thread == null) {
      // Nothing was started, so there is nothing to tear down.
      return;
    }
    thread.quit();
  }

  @Override
  protected void onLooperPrepared() {
111 112
    super.onLooperPrepared();
    forceInvalidateTokens();
113 114
  }

115 116
  private void forceInvalidateTokens() {
    serverConfigRealm.executeTransaction(realm -> {
117
      Session session = Session.queryDefaultSession(realm).findFirst();
118 119 120 121 122 123 124 125
      if (session != null
          && !TextUtils.isEmpty(session.getToken())
          && (session.isTokenVerified() || !TextUtils.isEmpty(session.getError()))) {
        session.setTokenVerified(false);
        session.setError(null);
      }
      return null;
    }).continueWith(new LogcatIfError());
126 127
  }

Yusuke Iwaki's avatar
Yusuke Iwaki committed
128 129
  @Override
  public boolean quit() {
130
    if (isAlive()) {
131
      new Handler(getLooper()).post(() -> {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
132
        RCLog.d("thread %s: quit()", Thread.currentThread().getId());
133 134 135
        unregisterListeners();
        RocketChatWebSocketThread.super.quit();
      });
136 137 138
      return true;
    } else {
      return super.quit();
139
    }
140
  }
141

142 143 144
  /**
   * synchronize the state of the thread with ServerConfig.
   */
Yusuke Iwaki's avatar
Yusuke Iwaki committed
145
  @DebugLog
Tiago Cunha's avatar
Tiago Cunha committed
146
  public void keepAlive() {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
147 148 149
    if (ddpClient == null || !ddpClient.isConnected()) {
      defaultRealm.executeTransaction(realm -> {
        ServerConfig config = realm.where(ServerConfig.class)
Tiago Cunha's avatar
Tiago Cunha committed
150
            .equalTo(ServerConfig.ID, serverConfigId)
Yusuke Iwaki's avatar
Yusuke Iwaki committed
151 152 153 154 155 156 157 158
            .findFirst();
        if (config != null && config.getState() == ServerConfig.STATE_CONNECTED) {
          config.setState(ServerConfig.STATE_READY);
          quit();
        }
        return null;
      });
    }
159 160
  }

161
  private void prepareWebSocket(String hostname) {
162
    if (ddpClient == null || !ddpClient.isConnected()) {
163
      ddpClient = DDPClientWrapper.create(hostname);
164
    }
165
  }
166

Yusuke Iwaki's avatar
Yusuke Iwaki committed
167 168
  @DebugLog
  private Task<Void> connect() {
169
    final ServerConfig config = defaultRealm.executeTransactionForRead(realm ->
Tiago Cunha's avatar
Tiago Cunha committed
170
        realm.where(ServerConfig.class).equalTo(ServerConfig.ID, serverConfigId).findFirst());
171

172
    prepareWebSocket(config.getHostname());
173
    return ddpClient.connect(config.getSession(), config.usesSecureConnection())
Tiago Cunha's avatar
Tiago Cunha committed
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
        .onSuccessTask(task -> {
          final String session = task.getResult().session;
          defaultRealm.executeTransaction(realm ->
              realm.createOrUpdateObjectFromJson(ServerConfig.class, new JSONObject()
                  .put("serverConfigId", serverConfigId)
                  .put("session", session))
          ).onSuccess(_task -> serverConfigRealm.executeTransaction(realm -> {
            Session sessionObj = Session.queryDefaultSession(realm).findFirst();

            if (sessionObj == null) {
              realm.createOrUpdateObjectFromJson(Session.class,
                  new JSONObject().put("sessionId", Session.DEFAULT_ID));
            }
            return null;
          })).continueWith(new LogcatIfError());
          return task;
        })
        .onSuccess(new Continuation<DDPClientCallback.Connect, Void>() {
          // TODO type detection doesn't work due to retrolambda's bug...
          @Override
          public Void then(Task<DDPClientCallback.Connect> task)
              throws Exception {
            fetchPublicSettings();
            registerListeners();

            // handling WebSocket#onClose() callback.
            task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
              quit();
              return null;
            }).continueWithTask(_task -> {
              if (_task.isFaulted()) {
                ServerConfig.logConnectionError(serverConfigId, _task.getError());
              }
              return _task;
            });

            return null;
          }
        })
        .continueWithTask(task -> {
          if (task.isFaulted()) {
            Exception error = task.getError();
            if (error instanceof DDPClientCallback.Connect.Timeout) {
              ServerConfig.logConnectionError(serverConfigId, new Exception("Connection Timeout"));
            } else {
              ServerConfig.logConnectionError(serverConfigId, task.getError());
            }
221
          }
Tiago Cunha's avatar
Tiago Cunha committed
222
          return task;
223
        });
Tiago Cunha's avatar
Tiago Cunha committed
224
  }
225

Tiago Cunha's avatar
Tiago Cunha committed
226 227
  private Task<Void> fetchPublicSettings() {
    return new MethodCallHelper(serverConfigRealm, ddpClient).getPublicSettings();
228 229 230
  }

  //@DebugLog
231
  private void registerListeners() {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
232 233
    if (!Thread.currentThread().getName().equals("RC_thread_" + serverConfigId)) {
      // execute in Looper.
Tiago Cunha's avatar
Tiago Cunha committed
234
      new Handler(getLooper()).post(this::registerListeners);
Yusuke Iwaki's avatar
Yusuke Iwaki committed
235 236 237
      return;
    }

Yusuke Iwaki's avatar
Yusuke Iwaki committed
238
    if (listenersRegistered) {
239
      return;
240
    }
Yusuke Iwaki's avatar
Yusuke Iwaki committed
241
    listenersRegistered = true;
242

243
    final ServerConfig config = defaultRealm.executeTransactionForRead(realm ->
Tiago Cunha's avatar
Tiago Cunha committed
244
        realm.where(ServerConfig.class).equalTo(ServerConfig.ID, serverConfigId).findFirst());
245 246
    final String hostname = config.getHostname();

247 248
    for (Class clazz : REGISTERABLE_CLASSES) {
      try {
249
        Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class,
250
            DDPClientWrapper.class);
251
        Object obj = ctor.newInstance(appContext, hostname, serverConfigRealm, ddpClient);
252

253
        if (obj instanceof Registrable) {
Tiago Cunha's avatar
Tiago Cunha committed
254 255 256
          Registrable registrable = (Registrable) obj;
          registrable.register();
          listeners.add(registrable);
257
        }
258
      } catch (Exception exception) {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
259
        RCLog.w(exception, "Failed to register listeners!!");
260
      }
261
    }
262
  }
263

264
  @DebugLog
265
  private void unregisterListeners() {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
266
    if (!listenersRegistered) {
267
      return;
268 269
    }

270
    Iterator<Registrable> iterator = listeners.iterator();
271
    while (iterator.hasNext()) {
Tiago Cunha's avatar
Tiago Cunha committed
272 273
      Registrable registrable = iterator.next();
      registrable.unregister();
274
      iterator.remove();
275
    }
276 277 278
    if (ddpClient != null) {
      ddpClient.close();
      ddpClient = null;
279
    }
Yusuke Iwaki's avatar
Yusuke Iwaki committed
280
    listenersRegistered = false;
281
  }
282
}