Commit 436b385e authored by Yusuke Iwaki's avatar Yusuke Iwaki

implement stream-notify-user (subscriptions-changed)

parent d9834a6c
...@@ -215,7 +215,7 @@ public class MethodCallHelper { ...@@ -215,7 +215,7 @@ public class MethodCallHelper {
/** /**
* request "subscriptions/get". * request "subscriptions/get".
*/ */
public Task<Void> getRooms() { public Task<Void> getRoomSubscriptions() {
return call("subscriptions/get", TIMEOUT_MS).onSuccessTask(CONVERT_TO_JSON_ARRAY) return call("subscriptions/get", TIMEOUT_MS).onSuccessTask(CONVERT_TO_JSON_ARRAY)
.onSuccessTask(task -> { .onSuccessTask(task -> {
final JSONArray result = task.getResult(); final JSONArray result = task.getResult();
......
...@@ -67,7 +67,7 @@ public class SidebarMainFragment extends AbstractFragment { ...@@ -67,7 +67,7 @@ public class SidebarMainFragment extends AbstractFragment {
.setOnUpdateListener(list -> roomListManager.setRooms(list)); .setOnUpdateListener(list -> roomListManager.setRooms(list));
currentUserObserver = realmHelper currentUserObserver = realmHelper
.createObjectObserver(realm -> realm.where(User.class).isNotEmpty("emails")) .createObjectObserver(User::queryCurrentUser)
.setOnUpdateListener(this::onRenderCurrentUser); .setOnUpdateListener(this::onRenderCurrentUser);
methodCallHelper = new MethodCallHelper(getContext(), serverConfigId); methodCallHelper = new MethodCallHelper(getContext(), serverConfigId);
......
package chat.rocket.android.model.ddp; package chat.rocket.android.model.ddp;
import io.realm.Realm;
import io.realm.RealmList; import io.realm.RealmList;
import io.realm.RealmObject; import io.realm.RealmObject;
import io.realm.RealmQuery;
import io.realm.annotations.PrimaryKey; import io.realm.annotations.PrimaryKey;
/** /**
...@@ -60,4 +62,8 @@ public class User extends RealmObject { ...@@ -60,4 +62,8 @@ public class User extends RealmObject {
public void setEmails(RealmList<Email> emails) { public void setEmails(RealmList<Email> emails) {
this.emails = emails; this.emails = emails;
} }
public static RealmQuery<User> queryCurrentUser(Realm realm) {
return realm.where(User.class).isNotEmpty("emails");
}
} }
...@@ -13,9 +13,10 @@ import chat.rocket.android.model.ServerConfig; ...@@ -13,9 +13,10 @@ import chat.rocket.android.model.ServerConfig;
import chat.rocket.android.model.internal.Session; import chat.rocket.android.model.internal.Session;
import chat.rocket.android.realm_helper.RealmHelper; import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.realm_helper.RealmStore; import chat.rocket.android.realm_helper.RealmStore;
import chat.rocket.android.service.ddp.ActiveUsersSubscriber; import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber;
import chat.rocket.android.service.ddp.LoginServiceConfigurationSubscriber; import chat.rocket.android.service.ddp.base.LoginServiceConfigurationSubscriber;
import chat.rocket.android.service.ddp.UserDataSubscriber; import chat.rocket.android.service.ddp.base.UserDataSubscriber;
import chat.rocket.android.service.observer.CurrentUserObserver;
import chat.rocket.android.service.observer.GetUsersOfRoomsProcedureObserver; import chat.rocket.android.service.observer.GetUsersOfRoomsProcedureObserver;
import chat.rocket.android.service.observer.LoadMessageProcedureObserver; import chat.rocket.android.service.observer.LoadMessageProcedureObserver;
import chat.rocket.android.service.observer.MethodCallObserver; import chat.rocket.android.service.observer.MethodCallObserver;
...@@ -43,7 +44,8 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -43,7 +44,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
SessionObserver.class, SessionObserver.class,
LoadMessageProcedureObserver.class, LoadMessageProcedureObserver.class,
GetUsersOfRoomsProcedureObserver.class, GetUsersOfRoomsProcedureObserver.class,
NewMessageObserver.class NewMessageObserver.class,
CurrentUserObserver.class
}; };
private final Context appContext; private final Context appContext;
private final String serverConfigId; private final String serverConfigId;
......
...@@ -2,20 +2,21 @@ package chat.rocket.android.service.ddp; ...@@ -2,20 +2,21 @@ package chat.rocket.android.service.ddp;
import android.content.Context; import android.content.Context;
import android.text.TextUtils; import android.text.TextUtils;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.helper.LogcatIfError; import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.realm_helper.RealmHelper; import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable; import chat.rocket.android.service.Registerable;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android_ddp.DDPSubscription; import chat.rocket.android_ddp.DDPSubscription;
import io.realm.Realm; import io.realm.Realm;
import io.realm.RealmObject; import io.realm.RealmObject;
import java.util.Iterator; import java.util.Iterator;
import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import rx.Subscription; import rx.Subscription;
import timber.log.Timber; import timber.log.Timber;
abstract class AbstractDDPDocEventSubscriber implements Registerable { public abstract class AbstractDDPDocEventSubscriber implements Registerable {
protected final Context context; protected final Context context;
protected final RealmHelper realmHelper; protected final RealmHelper realmHelper;
protected final DDPClientWraper ddpClient; protected final DDPClientWraper ddpClient;
...@@ -31,7 +32,13 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable { ...@@ -31,7 +32,13 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable {
protected abstract String getSubscriptionName(); protected abstract String getSubscriptionName();
protected abstract String getSubscriptionCallbackName(); protected abstract JSONArray getSubscriptionParams() throws JSONException;
protected boolean shouldTruncateTableOnInitialize() {
return false;
}
protected abstract boolean isTarget(String callbackName);
protected abstract Class<? extends RealmObject> getModelClass(); protected abstract Class<? extends RealmObject> getModelClass();
...@@ -39,8 +46,15 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable { ...@@ -39,8 +46,15 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable {
return json; return json;
} }
@Override public void register() { @Override public final void register() {
ddpClient.subscribe(getSubscriptionName(), null).onSuccess(task -> { JSONArray params = null;
try {
params = getSubscriptionParams();
} catch (JSONException exception) {
// just ignore.
}
ddpClient.subscribe(getSubscriptionName(), params).onSuccess(task -> {
subscriptionId = task.getResult().id; subscriptionId = task.getResult().id;
return null; return null;
}).continueWith(task -> { }).continueWith(task -> {
...@@ -50,20 +64,24 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable { ...@@ -50,20 +64,24 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable {
return null; return null;
}); });
if (shouldTruncateTableOnInitialize()) {
realmHelper.executeTransaction(realm -> { realmHelper.executeTransaction(realm -> {
realm.delete(getModelClass()); realm.delete(getModelClass());
return null; return null;
}).onSuccess(task -> { }).onSuccess(task -> {
registerSubscriptionCallback(); rxSubscription = subscribe();
return null; return null;
}).continueWith(new LogcatIfError()); }).continueWith(new LogcatIfError());
} else {
rxSubscription = subscribe();
}
} }
private void registerSubscriptionCallback() { protected Subscription subscribe() {
rxSubscription = ddpClient.getSubscriptionCallback() return ddpClient.getSubscriptionCallback()
.filter(event -> event instanceof DDPSubscription.DocEvent) .filter(event -> event instanceof DDPSubscription.DocEvent)
.cast(DDPSubscription.DocEvent.class) .cast(DDPSubscription.DocEvent.class)
.filter(event -> getSubscriptionCallbackName().equals(event.collection)) .filter(event -> isTarget(event.collection))
.subscribe(docEvent -> { .subscribe(docEvent -> {
try { try {
if (docEvent instanceof DDPSubscription.Added.Before) { if (docEvent instanceof DDPSubscription.Added.Before) {
...@@ -139,7 +157,7 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable { ...@@ -139,7 +157,7 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable {
} }
} }
@Override public void unregister() { @Override public final void unregister() {
if (rxSubscription != null) { if (rxSubscription != null) {
rxSubscription.unsubscribe(); rxSubscription.unsubscribe();
} }
......
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.ddp.AbstractDDPDocEventSubscriber;
import org.json.JSONArray;
abstract class AbstractBaseSubscriber extends AbstractDDPDocEventSubscriber {
protected AbstractBaseSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
}
@Override protected final JSONArray getSubscriptionParams() {
return null;
}
@Override protected final boolean shouldTruncateTableOnInitialize() {
return true;
}
protected abstract String getSubscriptionCallbackName();
@Override protected final boolean isTarget(String callbackName) {
return getSubscriptionCallbackName().equals(callbackName);
}
}
package chat.rocket.android.service.ddp; package chat.rocket.android.service.ddp.base;
import android.content.Context; import android.content.Context;
import chat.rocket.android.model.ddp.User; import chat.rocket.android.model.ddp.User;
...@@ -9,7 +9,7 @@ import io.realm.RealmObject; ...@@ -9,7 +9,7 @@ import io.realm.RealmObject;
/** /**
* "activeUsers" subscriber. * "activeUsers" subscriber.
*/ */
public class ActiveUsersSubscriber extends AbstractDDPDocEventSubscriber { public class ActiveUsersSubscriber extends AbstractBaseSubscriber {
public ActiveUsersSubscriber(Context context, RealmHelper realmHelper, public ActiveUsersSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) { DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient); super(context, realmHelper, ddpClient);
......
package chat.rocket.android.service.ddp; package chat.rocket.android.service.ddp.base;
import android.content.Context; import android.content.Context;
import chat.rocket.android.model.ddp.MeteorLoginServiceConfiguration; import chat.rocket.android.model.ddp.MeteorLoginServiceConfiguration;
...@@ -9,7 +9,7 @@ import io.realm.RealmObject; ...@@ -9,7 +9,7 @@ import io.realm.RealmObject;
/** /**
* meteor.loginServiceConfiguration subscriber * meteor.loginServiceConfiguration subscriber
*/ */
public class LoginServiceConfigurationSubscriber extends AbstractDDPDocEventSubscriber { public class LoginServiceConfigurationSubscriber extends AbstractBaseSubscriber {
public LoginServiceConfigurationSubscriber(Context context, RealmHelper realmHelper, public LoginServiceConfigurationSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) { DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient); super(context, realmHelper, ddpClient);
......
package chat.rocket.android.service.ddp; package chat.rocket.android.service.ddp.base;
import android.content.Context; import android.content.Context;
import chat.rocket.android.api.DDPClientWraper; import chat.rocket.android.api.DDPClientWraper;
...@@ -9,7 +9,7 @@ import io.realm.RealmObject; ...@@ -9,7 +9,7 @@ import io.realm.RealmObject;
/** /**
* "userData" subscriber. * "userData" subscriber.
*/ */
public class UserDataSubscriber extends AbstractDDPDocEventSubscriber { public class UserDataSubscriber extends AbstractBaseSubscriber {
public UserDataSubscriber(Context context, RealmHelper realmHelper, public UserDataSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) { DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient); super(context, realmHelper, ddpClient);
......
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.ddp.AbstractDDPDocEventSubscriber;
import chat.rocket.android_ddp.DDPSubscription;
import org.json.JSONArray;
import org.json.JSONObject;
import timber.log.Timber;
abstract class AbstractStreamNotifyEventSubscriber extends AbstractDDPDocEventSubscriber {
protected AbstractStreamNotifyEventSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
}
@Override protected final boolean shouldTruncateTableOnInitialize() {
return false;
}
@Override protected final boolean isTarget(String callbackName) {
return getSubscriptionName().equals(callbackName);
}
protected abstract String getPrimaryKeyForModel();
@Override protected void onDocumentChanged(DDPSubscription.Changed docEvent) {
try {
JSONArray args = docEvent.fields.getJSONArray("args");
String msg = args.getString(0);
JSONObject target = args.getJSONObject(1);
if ("removed".equals(msg)) {
realmHelper.executeTransaction(realm ->
realm.where(getModelClass())
.equalTo(getPrimaryKeyForModel(), target.getString(getPrimaryKeyForModel()))
.findAll().deleteAllFromRealm()
).continueWith(new LogcatIfError());
} else { //inserted, updated
realmHelper.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(getModelClass(), customizeFieldJson(target))
).continueWith(new LogcatIfError());
}
} catch (Exception exception) {
Timber.w(exception, "failed to save stream-notify event.");
}
}
}
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.model.ddp.RoomSubscription;
import chat.rocket.android.realm_helper.RealmHelper;
import io.realm.RealmObject;
import org.json.JSONArray;
import org.json.JSONException;
public class StreamNotifyUserSubscriptionsChanged extends AbstractStreamNotifyEventSubscriber {
private final String userId;
public StreamNotifyUserSubscriptionsChanged(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient, String userId) {
super(context, realmHelper, ddpClient);
this.userId = userId;
}
@Override protected String getSubscriptionName() {
return "stream-notify-user";
}
@Override protected JSONArray getSubscriptionParams() throws JSONException {
return new JSONArray()
.put(userId + "/subscriptions-changed")
.put(false);
}
@Override protected Class<? extends RealmObject> getModelClass() {
return RoomSubscription.class;
}
@Override protected String getPrimaryKeyForModel() {
return "rid";
}
}
package chat.rocket.android.service.observer;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.model.ddp.User;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable;
import chat.rocket.android.service.ddp.stream.StreamNotifyUserSubscriptionsChanged;
import hugo.weaving.DebugLog;
import io.realm.Realm;
import io.realm.RealmResults;
import java.util.ArrayList;
import java.util.List;
/**
* observe the user with emails.
*/
public class CurrentUserObserver extends AbstractModelObserver<User> {
private boolean currentUserExists;
private final MethodCallHelper methodCall;
private ArrayList<Registerable> listeners;
public CurrentUserObserver(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
methodCall = new MethodCallHelper(realmHelper, ddpClient);
currentUserExists = false;
}
@Override public RealmResults<User> queryItems(Realm realm) {
return User.queryCurrentUser(realm).findAll();
}
@Override public void onUpdateResults(List<User> results) {
boolean exists = !results.isEmpty();
if (currentUserExists != exists) {
if (exists) {
onLogin(results.get(0));
} else {
onLogout();
}
currentUserExists = exists;
}
}
@DebugLog
private void onLogin(User user) {
if (listeners != null) {
onLogout();
}
listeners = new ArrayList<>();
final String userId = user.get_id();
// get and observe Room subscriptions.
methodCall.getRoomSubscriptions().onSuccess(task -> {
Registerable listener = new StreamNotifyUserSubscriptionsChanged(
context, realmHelper, ddpClient, userId);
listener.register();
listeners.add(listener);
return null;
});
}
@DebugLog
private void onLogout() {
if (listeners != null) {
for (Registerable listener : listeners) {
listener.unregister();
}
}
listeners = null;
}
}
...@@ -2,8 +2,8 @@ package chat.rocket.android.service.observer; ...@@ -2,8 +2,8 @@ package chat.rocket.android.service.observer;
import android.content.Context; import android.content.Context;
import chat.rocket.android.api.DDPClientWraper; import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogcatIfError; import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.model.internal.GetUsersOfRoomsProcedure;
import chat.rocket.android.model.internal.LoadMessageProcedure; import chat.rocket.android.model.internal.LoadMessageProcedure;
import chat.rocket.android.model.internal.MethodCall; import chat.rocket.android.model.internal.MethodCall;
import chat.rocket.android.model.internal.Session; import chat.rocket.android.model.internal.Session;
...@@ -17,7 +17,6 @@ import java.util.List; ...@@ -17,7 +17,6 @@ import java.util.List;
* Observes user is logged into server. * Observes user is logged into server.
*/ */
public class SessionObserver extends AbstractModelObserver<Session> { public class SessionObserver extends AbstractModelObserver<Session> {
private final MethodCallHelper methodCall;
private int count; private int count;
/** /**
...@@ -25,7 +24,6 @@ public class SessionObserver extends AbstractModelObserver<Session> { ...@@ -25,7 +24,6 @@ public class SessionObserver extends AbstractModelObserver<Session> {
*/ */
public SessionObserver(Context context, RealmHelper realmHelper, DDPClientWraper ddpClient) { public SessionObserver(Context context, RealmHelper realmHelper, DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient); super(context, realmHelper, ddpClient);
methodCall = new MethodCallHelper(realmHelper, ddpClient);
count = 0; count = 0;
} }
...@@ -57,7 +55,6 @@ public class SessionObserver extends AbstractModelObserver<Session> { ...@@ -57,7 +55,6 @@ public class SessionObserver extends AbstractModelObserver<Session> {
} }
@DebugLog private void onLogin() { @DebugLog private void onLogin() {
methodCall.getRooms().continueWith(new LogcatIfError());
} }
...@@ -66,6 +63,7 @@ public class SessionObserver extends AbstractModelObserver<Session> { ...@@ -66,6 +63,7 @@ public class SessionObserver extends AbstractModelObserver<Session> {
// remove all tables. ONLY INTERNAL TABLES!. // remove all tables. ONLY INTERNAL TABLES!.
realm.delete(MethodCall.class); realm.delete(MethodCall.class);
realm.delete(LoadMessageProcedure.class); realm.delete(LoadMessageProcedure.class);
realm.delete(GetUsersOfRoomsProcedure.class);
return null; return null;
}).continueWith(new LogcatIfError()); }).continueWith(new LogcatIfError());
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment