package chat.rocket.android.service.ddp;

import android.content.Context;
import android.text.TextUtils;
import io.realm.Realm;
import io.realm.RealmObject;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import java.util.Iterator;
import chat.rocket.android.api.DDPClientWrapper;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.log.RCLog;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registrable;
import chat.rocket.android_ddp.DDPSubscription;
import rx.Subscription;

public abstract class AbstractDDPDocEventSubscriber implements Registrable {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
21
  protected final Context context;
22
  protected final String hostname;
23
  protected final RealmHelper realmHelper;
24
  protected final DDPClientWrapper ddpClient;
25
  private boolean isUnsubscribed;
Yusuke Iwaki's avatar
Yusuke Iwaki committed
26 27
  private String subscriptionId;
  private Subscription rxSubscription;
28

29
  protected AbstractDDPDocEventSubscriber(Context context, String hostname,
30
                                          RealmHelper realmHelper, DDPClientWrapper ddpClient) {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
31
    this.context = context;
32
    this.hostname = hostname;
33
    this.realmHelper = realmHelper;
34
    this.ddpClient = ddpClient;
35 36 37 38
  }

  protected abstract String getSubscriptionName();

39 40 41 42 43 44 45
  protected abstract JSONArray getSubscriptionParams() throws JSONException;

  protected boolean shouldTruncateTableOnInitialize() {
    return false;
  }

  protected abstract boolean isTarget(String callbackName);
46 47 48

  protected abstract Class<? extends RealmObject> getModelClass();

49
  protected JSONObject customizeFieldJson(JSONObject json) throws JSONException {
50 51 52
    return json;
  }

Yusuke Iwaki's avatar
Yusuke Iwaki committed
53 54
  protected void onRegister() {
  }
55

Yusuke Iwaki's avatar
Yusuke Iwaki committed
56 57
  protected void onUnregister() {
  }
58

Yusuke Iwaki's avatar
Yusuke Iwaki committed
59 60
  @Override
  public final void register() {
61
    isUnsubscribed = false;
62 63 64 65 66 67 68 69
    JSONArray params = null;
    try {
      params = getSubscriptionParams();
    } catch (JSONException exception) {
      // just ignore.
    }

    ddpClient.subscribe(getSubscriptionName(), params).onSuccess(task -> {
70 71 72 73 74
      if (isUnsubscribed) {
        ddpClient.unsubscribe(task.getResult().id).continueWith(new LogcatIfError());
      } else {
        subscriptionId = task.getResult().id;
      }
75 76 77
      return null;
    }).continueWith(task -> {
      if (task.isFaulted()) {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
78
        RCLog.w(task.getError(), "DDP subscription failed.");
79 80 81 82
      }
      return null;
    });

83 84 85 86 87 88 89 90 91 92 93
    if (shouldTruncateTableOnInitialize()) {
      realmHelper.executeTransaction(realm -> {
        realm.delete(getModelClass());
        return null;
      }).onSuccess(task -> {
        rxSubscription = subscribe();
        return null;
      }).continueWith(new LogcatIfError());
    } else {
      rxSubscription = subscribe();
    }
94
    onRegister();
95 96
  }

97 98
  protected Subscription subscribe() {
    return ddpClient.getSubscriptionCallback()
99 100
        .filter(event -> event instanceof DDPSubscription.DocEvent)
        .cast(DDPSubscription.DocEvent.class)
101
        .filter(event -> isTarget(event.collection))
102 103 104 105 106 107 108 109 110 111 112 113
        .subscribe(docEvent -> {
          try {
            if (docEvent instanceof DDPSubscription.Added.Before) {
              onDocumentAdded((DDPSubscription.Added) docEvent); //ignore Before
            } else if (docEvent instanceof DDPSubscription.Added) {
              onDocumentAdded((DDPSubscription.Added) docEvent);
            } else if (docEvent instanceof DDPSubscription.Removed) {
              onDocumentRemoved((DDPSubscription.Removed) docEvent);
            } else if (docEvent instanceof DDPSubscription.Changed) {
              onDocumentChanged((DDPSubscription.Changed) docEvent);
            } else if (docEvent instanceof DDPSubscription.MovedBefore) {
              //ignore movedBefore
114
            }
Yusuke Iwaki's avatar
Yusuke Iwaki committed
115
          } catch (Exception exception) {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
116
            RCLog.w(exception, "failed to handle subscription callback");
117
          }
118
        });
119 120 121
  }

  protected void onDocumentAdded(DDPSubscription.Added docEvent) {
122
    realmHelper.executeTransaction(realm -> {
123 124 125 126 127
      onDocumentAdded(realm, docEvent);
      return null;
    }).continueWith(new LogcatIfError());
  }

Yusuke Iwaki's avatar
Yusuke Iwaki committed
128 129
  private void onDocumentAdded(Realm realm, DDPSubscription.Added docEvent) throws JSONException {
    //executed in RealmTransaction
130
    JSONObject json = new JSONObject().put("_id", docEvent.docID);
Yusuke Iwaki's avatar
Yusuke Iwaki committed
131 132
    mergeJson(json, docEvent.fields);
    realm.createOrUpdateObjectFromJson(getModelClass(), customizeFieldJson(json));
133 134
  }

Yusuke Iwaki's avatar
Yusuke Iwaki committed
135
  protected void onDocumentChanged(DDPSubscription.Changed docEvent) {
136
    realmHelper.executeTransaction(realm -> {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
137
      onDocumentChanged(realm, docEvent);
138 139 140 141 142 143 144
      return null;
    }).continueWith(new LogcatIfError());
  }

  private void onDocumentChanged(Realm realm, DDPSubscription.Changed docEvent)
      throws JSONException {
    //executed in RealmTransaction
145
    JSONObject json = new JSONObject().put("_id", docEvent.docID);
146 147 148 149 150
    if (docEvent.cleared != null) {
      for (int i = 0; i < docEvent.cleared.length(); i++) {
        String fieldToDelete = docEvent.cleared.getString(i);
        json.put(fieldToDelete, JSONObject.NULL);
      }
151
    }
Yusuke Iwaki's avatar
Yusuke Iwaki committed
152 153 154 155 156
    mergeJson(json, docEvent.fields);
    realm.createOrUpdateObjectFromJson(getModelClass(), customizeFieldJson(json));
  }

  protected void onDocumentRemoved(DDPSubscription.Removed docEvent) {
157
    realmHelper.executeTransaction(realm -> {
Yusuke Iwaki's avatar
Yusuke Iwaki committed
158 159 160
      onDocumentRemoved(realm, docEvent);
      return null;
    }).continueWith(new LogcatIfError());
161
  }
162

163 164 165
  private void onDocumentRemoved(Realm realm, DDPSubscription.Removed docEvent)
      throws JSONException {
    //executed in RealmTransaction
166
    realm.where(getModelClass()).equalTo("_id", docEvent.docID).findAll().deleteAllFromRealm();
167
  }
168

Yusuke Iwaki's avatar
Yusuke Iwaki committed
169 170 171 172 173 174 175 176
  private void mergeJson(JSONObject target, JSONObject src) throws JSONException {
    Iterator<String> iterator = src.keys();
    while (iterator.hasNext()) {
      String key = iterator.next();
      target.put(key, src.get(key));
    }
  }

Yusuke Iwaki's avatar
Yusuke Iwaki committed
177 178
  @Override
  public final void unregister() {
179
    isUnsubscribed = true;
180
    onUnregister();
Yusuke Iwaki's avatar
Yusuke Iwaki committed
181 182
    if (rxSubscription != null) {
      rxSubscription.unsubscribe();
183
    }
Yusuke Iwaki's avatar
Yusuke Iwaki committed
184
    if (!TextUtils.isEmpty(subscriptionId)) {
185
      ddpClient.unsubscribe(subscriptionId).continueWith(new LogcatIfError());
186
    }
187
  }
188
}