Commit 35de4115 authored by Yusuke Iwaki's avatar Yusuke Iwaki

implement stream-room-messages

parent 436b385e
......@@ -42,10 +42,14 @@ public abstract class AbstractDDPDocEventSubscriber implements Registerable {
protected abstract Class<? extends RealmObject> getModelClass();
protected JSONObject customizeFieldJson(JSONObject json) {
protected JSONObject customizeFieldJson(JSONObject json) throws JSONException {
return json;
}
protected void onRegister() {}
protected void onUnregister() {}
@Override public final void register() {
JSONArray params = null;
try {
......@@ -75,6 +79,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registerable {
} else {
rxSubscription = subscribe();
}
onRegister();
}
protected Subscription subscribe() {
......@@ -158,6 +163,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registerable {
}
@Override public final void unregister() {
onUnregister();
if (rxSubscription != null) {
rxSubscription.unsubscribe();
}
......
......@@ -29,8 +29,8 @@ abstract class AbstractStreamNotifyEventSubscriber extends AbstractDDPDocEventSu
@Override protected void onDocumentChanged(DDPSubscription.Changed docEvent) {
try {
JSONArray args = docEvent.fields.getJSONArray("args");
String msg = args.getString(0);
JSONObject target = args.getJSONObject(1);
String msg = args.length() > 0 ? args.getString(0) : null;
JSONObject target = args.getJSONObject(args.length() - 1);
if ("removed".equals(msg)) {
realmHelper.executeTransaction(realm ->
realm.where(getModelClass())
......
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.model.ddp.Message;
import chat.rocket.android.realm_helper.RealmHelper;
import io.realm.RealmObject;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
/**
* stream-room-message subscriber.
*/
public class StreamRoomMessage extends AbstractStreamNotifyEventSubscriber {
private String roomId;
public StreamRoomMessage(Context context, RealmHelper realmHelper, DDPClientWraper ddpClient,
String roomId) {
super(context, realmHelper, ddpClient);
this.roomId = roomId;
}
@Override protected String getSubscriptionName() {
return "stream-room-messages";
}
@Override protected JSONArray getSubscriptionParams() throws JSONException {
return new JSONArray()
.put(roomId)
.put(false);
}
@Override protected Class<? extends RealmObject> getModelClass() {
return Message.class;
}
@Override protected String getPrimaryKeyForModel() {
return "_id";
}
@Override protected JSONObject customizeFieldJson(JSONObject json) throws JSONException {
return Message.customizeJson(super.customizeFieldJson(json));
}
}
package chat.rocket.android.service.internal;
import android.content.Context;
import android.content.SharedPreferences;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.model.ddp.RoomSubscription;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable;
public abstract class AbstractRocketChatCacheObserver implements Registerable {
private final Context context;
private final RealmHelper realmHelper;
private String roomId;
protected AbstractRocketChatCacheObserver(Context context, RealmHelper realmHelper) {
this.context = context;
this.realmHelper = realmHelper;
}
private void updateRoomIdWith(SharedPreferences prefs) {
String roomId = prefs.getString(RocketChatCache.KEY_SELECTED_ROOM_ID, null);
if (!TextUtils.isEmpty(roomId)) {
RoomSubscription room = realmHelper.executeTransactionForRead(realm ->
realm.where(RoomSubscription.class).equalTo("rid", roomId).findFirst());
if (room != null) {
if (this.roomId == null || !this.roomId.equals(roomId)) {
this.roomId = roomId;
onRoomIdUpdated(roomId);
}
return;
}
}
if (this.roomId != null) {
this.roomId = null;
onRoomIdUpdated(null);
}
}
protected abstract void onRoomIdUpdated(String roomId);
private SharedPreferences.OnSharedPreferenceChangeListener listener =
(prefs, key) -> {
if (RocketChatCache.KEY_SELECTED_ROOM_ID.equals(key)) {
updateRoomIdWith(prefs);
}
};
@Override public final void register() {
SharedPreferences prefs = RocketChatCache.get(context);
prefs.registerOnSharedPreferenceChangeListener(listener);
updateRoomIdWith(prefs);
}
@Override public final void unregister() {
RocketChatCache.get(context).unregisterOnSharedPreferenceChangeListener(listener);
}
}
package chat.rocket.android.service.internal;
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.Registerable;
import chat.rocket.android.service.ddp.stream.StreamRoomMessage;
/**
* wrapper for managing stream-notify-message depending on RocketChatCache.
*/
public class StreamRoomMessageManager implements Registerable {
private StreamRoomMessage streamRoomMessage;
private final Context context;
private final RealmHelper realmHelper;
private final DDPClientWraper ddpClient;
private final AbstractRocketChatCacheObserver cacheObserver;
private final Handler handler;
public StreamRoomMessageManager(Context context, RealmHelper realmHelper,
DDPClientWraper ddpClient) {
this.context = context;
this.realmHelper = realmHelper;
this.ddpClient = ddpClient;
cacheObserver = new AbstractRocketChatCacheObserver(context, realmHelper) {
@Override protected void onRoomIdUpdated(String roomId) {
unregisterStreamNotifyMessageIfNeeded();
registerStreamNotifyMessage(roomId);
}
};
handler = new Handler(Looper.myLooper());
}
private void registerStreamNotifyMessage(String roomId) {
streamRoomMessage = new StreamRoomMessage(context, realmHelper, ddpClient, roomId);
handler.post(() -> {
streamRoomMessage.register();
});
}
private void unregisterStreamNotifyMessageIfNeeded() {
if (streamRoomMessage != null) {
handler.post(() -> {
streamRoomMessage.unregister();
});
streamRoomMessage = null;
}
}
@Override public void register() {
cacheObserver.register();
}
@Override public void unregister() {
unregisterStreamNotifyMessageIfNeeded();
cacheObserver.unregister();
}
}
......@@ -8,6 +8,7 @@ import chat.rocket.android.model.internal.LoadMessageProcedure;
import chat.rocket.android.model.internal.MethodCall;
import chat.rocket.android.model.internal.Session;
import chat.rocket.android.realm_helper.RealmHelper;
import chat.rocket.android.service.internal.StreamRoomMessageManager;
import hugo.weaving.DebugLog;
import io.realm.Realm;
import io.realm.RealmResults;
......@@ -19,12 +20,16 @@ import java.util.List;
public class SessionObserver extends AbstractModelObserver<Session> {
private int count;
private final StreamRoomMessageManager streamNotifyMessage;
/**
* constructor.
*/
public SessionObserver(Context context, RealmHelper realmHelper, DDPClientWraper ddpClient) {
super(context, realmHelper, ddpClient);
count = 0;
streamNotifyMessage = new StreamRoomMessageManager(context, realmHelper, ddpClient);
}
@Override public RealmResults<Session> queryItems(Realm realm) {
......@@ -55,10 +60,12 @@ public class SessionObserver extends AbstractModelObserver<Session> {
}
@DebugLog private void onLogin() {
streamNotifyMessage.register();
}
@DebugLog private void onLogout() {
streamNotifyMessage.unregister();
realmHelper.executeTransaction(realm -> {
// remove all tables. ONLY INTERNAL TABLES!.
realm.delete(MethodCall.class);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment