Commit 760fe05b authored by Yusuke Iwaki's avatar Yusuke Iwaki

FIX #58 Merge branch 'stream-notify' into develop

parents d9834a6c 35de4115
......@@ -215,7 +215,7 @@ public class MethodCallHelper {
/**
* request "subscriptions/get".
*/
public Task<Void> getRooms() {
public Task<Void> getRoomSubscriptions() {
return call("subscriptions/get", TIMEOUT_MS).onSuccessTask(CONVERT_TO_JSON_ARRAY)
.onSuccessTask(task -> {
final JSONArray result = task.getResult();
......
......@@ -67,7 +67,7 @@ public class SidebarMainFragment extends AbstractFragment {
.setOnUpdateListener(list -> roomListManager.setRooms(list));
currentUserObserver = realmHelper
.createObjectObserver(realm -> realm.where(User.class).isNotEmpty("emails"))
.createObjectObserver(User::queryCurrentUser)
.setOnUpdateListener(this::onRenderCurrentUser);
methodCallHelper = new MethodCallHelper(getContext(), serverConfigId);
......
package chat.rocket.android.model.ddp;
import io.realm.Realm;
import io.realm.RealmList;
import io.realm.RealmObject;
import io.realm.RealmQuery;
import io.realm.annotations.PrimaryKey;
/**
......@@ -60,4 +62,8 @@ public class User extends RealmObject {
public void setEmails(RealmList<Email> emails) {
this.emails = emails;
}
public static RealmQuery<User> queryCurrentUser(Realm realm) {
return realm.where(User.class).isNotEmpty("emails");
}
}
......@@ -13,9 +13,10 @@ import chat.rocket.android.model.ServerConfig;
import chat.rocket.android.model.internal.Session;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.realm_helper.RealmStore;
import chat.rocket.android.service.ddp.ActiveUsersSubscriber;
import chat.rocket.android.service.ddp.LoginServiceConfigurationSubscriber;
import chat.rocket.android.service.ddp.UserDataSubscriber;
import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber;
import chat.rocket.android.service.ddp.base.LoginServiceConfigurationSubscriber;
import chat.rocket.android.service.ddp.base.UserDataSubscriber;
import chat.rocket.android.service.observer.CurrentUserObserver;
import chat.rocket.android.service.observer.GetUsersOfRoomsProcedureObserver;
import chat.rocket.android.service.observer.LoadMessageProcedureObserver;
import chat.rocket.android.service.observer.MethodCallObserver;
......@@ -43,7 +44,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
SessionObserver.class,
LoadMessageProcedureObserver.class,
GetUsersOfRoomsProcedureObserver.class,
NewMessageObserver.class
NewMessageObserver.class,
CurrentUserObserver.class
};
private final Context appContext;
private final String serverConfigId;
......
......@@ -2,20 +2,21 @@ package chat.rocket.android.service.ddp;
import android.content.Context;
import android.text.TextUtils;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android_ddp.DDPSubscription;
import io.realm.Realm;
import io.realm.RealmObject;
import java.util.Iterator;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import rx.Subscription;
import timber.log.Timber;
abstract class AbstractDDPDocEventSubscriber implements Registerable {
public abstract class AbstractDDPDocEventSubscriber implements Registerable {
protected final Context context;
protected final RealmHelper realmHelper;
protected final DDPClientWraper ddpClient;
......@@ -31,16 +32,33 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable {
protected abstract String getSubscriptionName();
protected abstract String getSubscriptionCallbackName();
protected abstract JSONArray getSubscriptionParams() throws JSONException;
protected boolean shouldTruncateTableOnInitialize() {
return false;
}
protected abstract boolean isTarget(String callbackName);
protected abstract Class<? extends RealmObject> getModelClass();
protected JSONObject customizeFieldJson(JSONObject json) {
protected JSONObject customizeFieldJson(JSONObject json) throws JSONException {
return json;
}
@Override public void register() {
ddpClient.subscribe(getSubscriptionName(), null).onSuccess(task -> {
protected void onRegister() {}
protected void onUnregister() {}
@Override public final void register() {
JSONArray params = null;
try {
params = getSubscriptionParams();
} catch (JSONException exception) {
// just ignore.
}
ddpClient.subscribe(getSubscriptionName(), params).onSuccess(task -> {
subscriptionId = task.getResult().id;
return null;
}).continueWith(task -> {
......@@ -50,20 +68,25 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable {
return null;
});
realmHelper.executeTransaction(realm -> {
realm.delete(getModelClass());
return null;
}).onSuccess(task -> {
registerSubscriptionCallback();
return null;
}).continueWith(new LogcatIfError());
if (shouldTruncateTableOnInitialize()) {
realmHelper.executeTransaction(realm -> {
realm.delete(getModelClass());
return null;
}).onSuccess(task -> {
rxSubscription = subscribe();
return null;
}).continueWith(new LogcatIfError());
} else {
rxSubscription = subscribe();
}
onRegister();
}
private void registerSubscriptionCallback() {
rxSubscription = ddpClient.getSubscriptionCallback()
protected Subscription subscribe() {
return ddpClient.getSubscriptionCallback()
.filter(event -> event instanceof DDPSubscription.DocEvent)
.cast(DDPSubscription.DocEvent.class)
.filter(event -> getSubscriptionCallbackName().equals(event.collection))
.filter(event -> isTarget(event.collection))
.subscribe(docEvent -> {
try {
if (docEvent instanceof DDPSubscription.Added.Before) {
......@@ -139,7 +162,8 @@ abstract class AbstractDDPDocEventSubscriber implements Registerable {
}
}
@Override public void unregister() {
@Override public final void unregister() {
onUnregister();
if (rxSubscription != null) {
rxSubscription.unsubscribe();
}
......
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.ddp.AbstractDDPDocEventSubscriber;
import org.json.JSONArray;
abstract class AbstractBaseSubscriber extends AbstractDDPDocEventSubscriber {
protected AbstractBaseSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
}
@Override protected final JSONArray getSubscriptionParams() {
return null;
}
@Override protected final boolean shouldTruncateTableOnInitialize() {
return true;
}
protected abstract String getSubscriptionCallbackName();
@Override protected final boolean isTarget(String callbackName) {
return getSubscriptionCallbackName().equals(callbackName);
}
}
package chat.rocket.android.service.ddp;
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import chat.rocket.android.model.ddp.User;
......@@ -9,7 +9,7 @@ import io.realm.RealmObject;
/**
* "activeUsers" subscriber.
*/
public class ActiveUsersSubscriber extends AbstractDDPDocEventSubscriber {
public class ActiveUsersSubscriber extends AbstractBaseSubscriber {
public ActiveUsersSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
......
package chat.rocket.android.service.ddp;
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import chat.rocket.android.model.ddp.MeteorLoginServiceConfiguration;
......@@ -9,7 +9,7 @@ import io.realm.RealmObject;
/**
* meteor.loginServiceConfiguration subscriber
*/
public class LoginServiceConfigurationSubscriber extends AbstractDDPDocEventSubscriber {
public class LoginServiceConfigurationSubscriber extends AbstractBaseSubscriber {
public LoginServiceConfigurationSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
......
package chat.rocket.android.service.ddp;
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
......@@ -9,7 +9,7 @@ import io.realm.RealmObject;
/**
* "userData" subscriber.
*/
public class UserDataSubscriber extends AbstractDDPDocEventSubscriber {
public class UserDataSubscriber extends AbstractBaseSubscriber {
public UserDataSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
......
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.ddp.AbstractDDPDocEventSubscriber;
import chat.rocket.android_ddp.DDPSubscription;
import org.json.JSONArray;
import org.json.JSONObject;
import timber.log.Timber;
abstract class AbstractStreamNotifyEventSubscriber extends AbstractDDPDocEventSubscriber {
protected AbstractStreamNotifyEventSubscriber(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
}
@Override protected final boolean shouldTruncateTableOnInitialize() {
return false;
}
@Override protected final boolean isTarget(String callbackName) {
return getSubscriptionName().equals(callbackName);
}
protected abstract String getPrimaryKeyForModel();
@Override protected void onDocumentChanged(DDPSubscription.Changed docEvent) {
try {
JSONArray args = docEvent.fields.getJSONArray("args");
String msg = args.length() > 0 ? args.getString(0) : null;
JSONObject target = args.getJSONObject(args.length() - 1);
if ("removed".equals(msg)) {
realmHelper.executeTransaction(realm ->
realm.where(getModelClass())
.equalTo(getPrimaryKeyForModel(), target.getString(getPrimaryKeyForModel()))
.findAll().deleteAllFromRealm()
).continueWith(new LogcatIfError());
} else { //inserted, updated
realmHelper.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(getModelClass(), customizeFieldJson(target))
).continueWith(new LogcatIfError());
}
} catch (Exception exception) {
Timber.w(exception, "failed to save stream-notify event.");
}
}
}
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.model.ddp.RoomSubscription;
import chat.rocket.android.realm_helper.RealmHelper;
import io.realm.RealmObject;
import org.json.JSONArray;
import org.json.JSONException;
public class StreamNotifyUserSubscriptionsChanged extends AbstractStreamNotifyEventSubscriber {
private final String userId;
public StreamNotifyUserSubscriptionsChanged(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient, String userId) {
super(context, realmHelper, ddpClient);
this.userId = userId;
}
@Override protected String getSubscriptionName() {
return "stream-notify-user";
}
@Override protected JSONArray getSubscriptionParams() throws JSONException {
return new JSONArray()
.put(userId + "/subscriptions-changed")
.put(false);
}
@Override protected Class<? extends RealmObject> getModelClass() {
return RoomSubscription.class;
}
@Override protected String getPrimaryKeyForModel() {
return "rid";
}
}
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.model.ddp.Message;
import chat.rocket.android.realm_helper.RealmHelper;
import io.realm.RealmObject;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
/**
* stream-room-message subscriber.
*/
public class StreamRoomMessage extends AbstractStreamNotifyEventSubscriber {
private String roomId;
public StreamRoomMessage(Context context, RealmHelper realmHelper, DDPClientWraper ddpClient,
String roomId) {
super(context, realmHelper, ddpClient);
this.roomId = roomId;
}
@Override protected String getSubscriptionName() {
return "stream-room-messages";
}
@Override protected JSONArray getSubscriptionParams() throws JSONException {
return new JSONArray()
.put(roomId)
.put(false);
}
@Override protected Class<? extends RealmObject> getModelClass() {
return Message.class;
}
@Override protected String getPrimaryKeyForModel() {
return "_id";
}
@Override protected JSONObject customizeFieldJson(JSONObject json) throws JSONException {
return Message.customizeJson(super.customizeFieldJson(json));
}
}
package chat.rocket.android.service.internal;
import android.content.Context;
import android.content.SharedPreferences;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.model.ddp.RoomSubscription;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable;
public abstract class AbstractRocketChatCacheObserver implements Registerable {
private final Context context;
private final RealmHelper realmHelper;
private String roomId;
protected AbstractRocketChatCacheObserver(Context context, RealmHelper realmHelper) {
this.context = context;
this.realmHelper = realmHelper;
}
private void updateRoomIdWith(SharedPreferences prefs) {
String roomId = prefs.getString(RocketChatCache.KEY_SELECTED_ROOM_ID, null);
if (!TextUtils.isEmpty(roomId)) {
RoomSubscription room = realmHelper.executeTransactionForRead(realm ->
realm.where(RoomSubscription.class).equalTo("rid", roomId).findFirst());
if (room != null) {
if (this.roomId == null || !this.roomId.equals(roomId)) {
this.roomId = roomId;
onRoomIdUpdated(roomId);
}
return;
}
}
if (this.roomId != null) {
this.roomId = null;
onRoomIdUpdated(null);
}
}
protected abstract void onRoomIdUpdated(String roomId);
private SharedPreferences.OnSharedPreferenceChangeListener listener =
(prefs, key) -> {
if (RocketChatCache.KEY_SELECTED_ROOM_ID.equals(key)) {
updateRoomIdWith(prefs);
}
};
@Override public final void register() {
SharedPreferences prefs = RocketChatCache.get(context);
prefs.registerOnSharedPreferenceChangeListener(listener);
updateRoomIdWith(prefs);
}
@Override public final void unregister() {
RocketChatCache.get(context).unregisterOnSharedPreferenceChangeListener(listener);
}
}
package chat.rocket.android.service.internal;
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable;
import chat.rocket.android.service.ddp.stream.StreamRoomMessage;
/**
* wrapper for managing stream-notify-message depending on RocketChatCache.
*/
public class StreamRoomMessageManager implements Registerable {
private StreamRoomMessage streamRoomMessage;
private final Context context;
private final RealmHelper realmHelper;
private final DDPClientWraper ddpClient;
private final AbstractRocketChatCacheObserver cacheObserver;
private final Handler handler;
public StreamRoomMessageManager(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
this.context = context;
this.realmHelper = realmHelper;
this.ddpClient = ddpClient;
cacheObserver = new AbstractRocketChatCacheObserver(context, realmHelper) {
@Override protected void onRoomIdUpdated(String roomId) {
unregisterStreamNotifyMessageIfNeeded();
registerStreamNotifyMessage(roomId);
}
};
handler = new Handler(Looper.myLooper());
}
private void registerStreamNotifyMessage(String roomId) {
streamRoomMessage = new StreamRoomMessage(context, realmHelper, ddpClient, roomId);
handler.post(() -> {
streamRoomMessage.register();
});
}
private void unregisterStreamNotifyMessageIfNeeded() {
if (streamRoomMessage != null) {
handler.post(() -> {
streamRoomMessage.unregister();
});
streamRoomMessage = null;
}
}
@Override public void register() {
cacheObserver.register();
}
@Override public void unregister() {
unregisterStreamNotifyMessageIfNeeded();
cacheObserver.unregister();
}
}
package chat.rocket.android.service.observer;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.model.ddp.User;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable;
import chat.rocket.android.service.ddp.stream.StreamNotifyUserSubscriptionsChanged;
import hugo.weaving.DebugLog;
import io.realm.Realm;
import io.realm.RealmResults;
import java.util.ArrayList;
import java.util.List;
/**
* observe the user with emails.
*/
public class CurrentUserObserver extends AbstractModelObserver<User> {
private boolean currentUserExists;
private final MethodCallHelper methodCall;
private ArrayList<Registerable> listeners;
public CurrentUserObserver(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
methodCall = new MethodCallHelper(realmHelper, ddpClient);
currentUserExists = false;
}
@Override public RealmResults<User> queryItems(Realm realm) {
return User.queryCurrentUser(realm).findAll();
}
@Override public void onUpdateResults(List<User> results) {
boolean exists = !results.isEmpty();
if (currentUserExists != exists) {
if (exists) {
onLogin(results.get(0));
} else {
onLogout();
}
currentUserExists = exists;
}
}
@DebugLog
private void onLogin(User user) {
if (listeners != null) {
onLogout();
}
listeners = new ArrayList<>();
final String userId = user.get_id();
// get and observe Room subscriptions.
methodCall.getRoomSubscriptions().onSuccess(task -> {
Registerable listener = new StreamNotifyUserSubscriptionsChanged(
context, realmHelper, ddpClient, userId);
listener.register();
listeners.add(listener);
return null;
});
}
@DebugLog
private void onLogout() {
if (listeners != null) {
for (Registerable listener : listeners) {
listener.unregister();
}
}
listeners = null;
}
}
......@@ -2,12 +2,13 @@ package chat.rocket.android.service.observer;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.model.internal.GetUsersOfRoomsProcedure;
import chat.rocket.android.model.internal.LoadMessageProcedure;
import chat.rocket.android.model.internal.MethodCall;
import chat.rocket.android.model.internal.Session;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.internal.StreamRoomMessageManager;
import hugo.weaving.DebugLog;
import io.realm.Realm;
import io.realm.RealmResults;
......@@ -17,16 +18,18 @@ import java.util.List;
* Observes user is logged into server.
*/
public class SessionObserver extends AbstractModelObserver<Session> {
private final MethodCallHelper methodCall;
private int count;
private final StreamRoomMessageManager streamNotifyMessage;
/**
* constructor.
*/
public SessionObserver(Context context, RealmHelper realmHelper, DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
methodCall = new MethodCallHelper(realmHelper, ddpClient);
count = 0;
streamNotifyMessage = new StreamRoomMessageManager(context, realmHelper, ddpClient);
}
@Override public RealmResults<Session> queryItems(Realm realm) {
......@@ -57,15 +60,17 @@ public class SessionObserver extends AbstractModelObserver<Session> {
}
@DebugLog private void onLogin() {
methodCall.getRooms().continueWith(new LogcatIfError());
streamNotifyMessage.register();
}
@DebugLog private void onLogout() {
streamNotifyMessage.unregister();
realmHelper.executeTransaction(realm -> {
// remove all tables. ONLY INTERNAL TABLES!.
realm.delete(MethodCall.class);
realm.delete(LoadMessageProcedure.class);
realm.delete(GetUsersOfRoomsProcedure.class);
return null;
}).continueWith(new LogcatIfError());
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment