Commit 00f243a5 authored by Tiago Cunha's avatar Tiago Cunha

One step back, two forward

parent 50453051
...@@ -130,6 +130,7 @@ dependencies { ...@@ -130,6 +130,7 @@ dependencies {
provided "com.google.auto.value:auto-value:1.3" provided "com.google.auto.value:auto-value:1.3"
annotationProcessor "com.google.auto.value:auto-value:1.3" annotationProcessor "com.google.auto.value:auto-value:1.3"
annotationProcessor 'com.gabrielittner.auto.value:auto-value-with:1.0.0'
} }
apply plugin: 'com.google.gms.google-services' apply plugin: 'com.google.gms.google-services'
...@@ -11,11 +11,23 @@ public interface RoomContract { ...@@ -11,11 +11,23 @@ public interface RoomContract {
void render(Room room); void render(Room room);
void updateHistoryState(boolean hasNext, boolean isLoaded); void updateHistoryState(boolean hasNext, boolean isLoaded);
void onMessageSendSuccessfully();
} }
interface Presenter { interface Presenter {
void bindView(@NonNull View view); void bindView(@NonNull View view);
void release(); void release();
void loadMessages();
void loadMoreMessages();
void sendMessage(String messageText);
void resendMessage(String messageId);
void deleteMessage(String messageId);
} }
} }
...@@ -21,13 +21,10 @@ import android.support.v7.widget.RecyclerView; ...@@ -21,13 +21,10 @@ import android.support.v7.widget.RecyclerView;
import android.view.View; import android.view.View;
import com.jakewharton.rxbinding.support.v4.widget.RxDrawerLayout; import com.jakewharton.rxbinding.support.v4.widget.RxDrawerLayout;
import io.realm.Sort; import io.realm.Sort;
import org.json.JSONObject;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID;
import bolts.Task;
import chat.rocket.android.R; import chat.rocket.android.R;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.fragment.chatroom.dialog.FileUploadProgressDialogFragment; import chat.rocket.android.fragment.chatroom.dialog.FileUploadProgressDialogFragment;
...@@ -55,9 +52,10 @@ import chat.rocket.android.model.core.Room; ...@@ -55,9 +52,10 @@ import chat.rocket.android.model.core.Room;
import chat.rocket.android.model.ddp.RealmMessage; import chat.rocket.android.model.ddp.RealmMessage;
import chat.rocket.android.model.ddp.RoomSubscription; import chat.rocket.android.model.ddp.RoomSubscription;
import chat.rocket.android.model.ddp.RealmUser; import chat.rocket.android.model.ddp.RealmUser;
import chat.rocket.android.model.internal.LoadMessageProcedure;
import chat.rocket.android.model.internal.Session; import chat.rocket.android.model.internal.Session;
import chat.rocket.android.repositories.RealmMessageRepository;
import chat.rocket.android.repositories.RealmRoomRepository; import chat.rocket.android.repositories.RealmRoomRepository;
import chat.rocket.android.repositories.RealmUserRepository;
import chat.rocket.persistence.realm.RealmHelper; import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmModelListAdapter; import chat.rocket.persistence.realm.RealmModelListAdapter;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
...@@ -130,7 +128,10 @@ public class RoomFragment extends AbstractChatRoomFragment ...@@ -130,7 +128,10 @@ public class RoomFragment extends AbstractChatRoomFragment
presenter = new RoomPresenter( presenter = new RoomPresenter(
roomId, roomId,
new RealmRoomRepository(hostname) new RealmUserRepository(hostname),
new RealmRoomRepository(hostname),
new RealmMessageRepository(hostname),
ConnectivityManager.getInstance(getContext().getApplicationContext())
); );
realmHelper = RealmStore.get(hostname); realmHelper = RealmStore.get(hostname);
...@@ -261,20 +262,11 @@ public class RoomFragment extends AbstractChatRoomFragment ...@@ -261,20 +262,11 @@ public class RoomFragment extends AbstractChatRoomFragment
if (syncState == SyncState.FAILED) { if (syncState == SyncState.FAILED) {
final String messageId = pairedMessage.target.getId(); final String messageId = pairedMessage.target.getId();
new AlertDialog.Builder(getContext()) new AlertDialog.Builder(getContext())
.setPositiveButton(R.string.resend, (dialog, which) -> { .setPositiveButton(R.string.resend,
realmHelper.executeTransaction(realm -> (dialog, which) -> presenter.resendMessage(messageId))
realm.createOrUpdateObjectFromJson(RealmMessage.class, new JSONObject()
.put(RealmMessage.ID, messageId)
.put(RealmMessage.SYNC_STATE, SyncState.NOT_SYNCED))
).continueWith(new LogcatIfError());
})
.setNegativeButton(android.R.string.cancel, null) .setNegativeButton(android.R.string.cancel, null)
.setNeutralButton(R.string.discard, (dialog, which) -> { .setNeutralButton(R.string.discard,
realmHelper.executeTransaction(realm -> (dialog, which) -> presenter.deleteMessage(messageId))
realm.where(RealmMessage.class)
.equalTo(RealmMessage.ID, messageId).findAll().deleteAllFromRealm()
).continueWith(new LogcatIfError());
})
.show(); .show();
} }
} }
...@@ -370,40 +362,11 @@ public class RoomFragment extends AbstractChatRoomFragment ...@@ -370,40 +362,11 @@ public class RoomFragment extends AbstractChatRoomFragment
} }
private void initialRequest() { private void initialRequest() {
realmHelper.executeTransaction(realm -> { presenter.loadMessages();
realm.createOrUpdateObjectFromJson(LoadMessageProcedure.class, new JSONObject()
.put(LoadMessageProcedure.ID, roomId)
.put(LoadMessageProcedure.SYNC_STATE, SyncState.NOT_SYNCED)
.put(LoadMessageProcedure.COUNT, 100)
.put(LoadMessageProcedure.RESET, true));
return null;
}).onSuccessTask(task -> {
ConnectivityManager.getInstance(getContext().getApplicationContext())
.keepAliveServer();
return task;
}).continueWith(new LogcatIfError());
} }
private void loadMoreRequest() { private void loadMoreRequest() {
realmHelper.executeTransaction(realm -> { presenter.loadMoreMessages();
LoadMessageProcedure procedure = realm.where(LoadMessageProcedure.class)
.equalTo(LoadMessageProcedure.ID, roomId)
.beginGroup()
.equalTo(LoadMessageProcedure.SYNC_STATE, SyncState.SYNCED)
.or()
.equalTo(LoadMessageProcedure.SYNC_STATE, SyncState.FAILED)
.endGroup()
.equalTo(LoadMessageProcedure.HAS_NEXT, true)
.findFirst();
if (procedure != null) {
procedure.setSyncState(SyncState.NOT_SYNCED);
}
return null;
}).onSuccessTask(task -> {
ConnectivityManager.getInstance(getContext().getApplicationContext())
.keepAliveServer();
return task;
}).continueWith(new LogcatIfError());
} }
private void markAsReadIfNeeded() { private void markAsReadIfNeeded() {
...@@ -501,20 +464,8 @@ public class RoomFragment extends AbstractChatRoomFragment ...@@ -501,20 +464,8 @@ public class RoomFragment extends AbstractChatRoomFragment
return true; return true;
} }
private Task<Void> sendMessage(String messageText) { private void sendMessage(String messageText) {
return realmHelper.executeTransaction(realm -> presenter.sendMessage(messageText);
realm.createOrUpdateObjectFromJson(RealmMessage.class, new JSONObject()
.put(RealmMessage.ID, UUID.randomUUID().toString())
.put(RealmMessage.SYNC_STATE, SyncState.NOT_SYNCED)
.put(RealmMessage.TIMESTAMP, System.currentTimeMillis())
.put(RealmMessage.ROOM_ID, roomId)
.put(RealmMessage.USER, new JSONObject()
.put(RealmUser.ID, userId))
.put(RealmMessage.MESSAGE, messageText)))
.onSuccess(_task -> {
scrollToLatestMessage();
return null;
});
} }
@Override @Override
...@@ -535,4 +486,10 @@ public class RoomFragment extends AbstractChatRoomFragment ...@@ -535,4 +486,10 @@ public class RoomFragment extends AbstractChatRoomFragment
} }
adapter.updateFooter(hasNext, isLoaded); adapter.updateFooter(hasNext, isLoaded);
} }
@Override
public void onMessageSendSuccessfully() {
scrollToLatestMessage();
messageFormManager.onMessageSend();
}
} }
...@@ -2,9 +2,15 @@ package chat.rocket.android.fragment.chatroom; ...@@ -2,9 +2,15 @@ package chat.rocket.android.fragment.chatroom;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import java.util.UUID;
import chat.rocket.android.BackgroundLooper; import chat.rocket.android.BackgroundLooper;
import chat.rocket.android.model.SyncState; import chat.rocket.android.model.SyncState;
import chat.rocket.android.repositories.RoomRepository; import chat.rocket.android.model.core.Message;
import chat.rocket.android.model.core.RoomHistoryState;
import chat.rocket.android.repositories.core.MessageRepository;
import chat.rocket.android.repositories.core.RoomRepository;
import chat.rocket.android.repositories.core.UserRepository;
import chat.rocket.android.service.ConnectivityManagerApi;
import rx.Subscription; import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers; import rx.android.schedulers.AndroidSchedulers;
import rx.subscriptions.CompositeSubscription; import rx.subscriptions.CompositeSubscription;
...@@ -12,14 +18,23 @@ import rx.subscriptions.CompositeSubscription; ...@@ -12,14 +18,23 @@ import rx.subscriptions.CompositeSubscription;
public class RoomPresenter implements RoomContract.Presenter { public class RoomPresenter implements RoomContract.Presenter {
private final String roomId; private final String roomId;
private final UserRepository userRepository;
private final RoomRepository roomRepository; private final RoomRepository roomRepository;
private final MessageRepository messageRepository;
private final ConnectivityManagerApi connectivityManagerApi;
private CompositeSubscription compositeSubscription = new CompositeSubscription(); private CompositeSubscription compositeSubscription = new CompositeSubscription();
private RoomContract.View view; private RoomContract.View view;
public RoomPresenter(String roomId, RoomRepository roomRepository) { public RoomPresenter(String roomId, UserRepository userRepository,
RoomRepository roomRepository,
MessageRepository messageRepository,
ConnectivityManagerApi connectivityManagerApi) {
this.roomId = roomId; this.roomId = roomId;
this.userRepository = userRepository;
this.roomRepository = roomRepository; this.roomRepository = roomRepository;
this.messageRepository = messageRepository;
this.connectivityManagerApi = connectivityManagerApi;
} }
@Override @Override
...@@ -32,10 +47,110 @@ public class RoomPresenter implements RoomContract.Presenter { ...@@ -32,10 +47,110 @@ public class RoomPresenter implements RoomContract.Presenter {
@Override @Override
public void release() { public void release() {
compositeSubscription.unsubscribe(); compositeSubscription.clear();
this.view = null; this.view = null;
} }
@Override
public void loadMessages() {
RoomHistoryState roomHistoryState = RoomHistoryState.builder()
.setRoomId(roomId)
.setSyncState(SyncState.NOT_SYNCED)
.setCount(100)
.setReset(true)
.setComplete(false)
.setTimestamp(0)
.build();
final Subscription subscription = roomRepository.setHistoryState(roomHistoryState)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(success -> {
if (success) {
connectivityManagerApi.keepAliveServer();
}
});
compositeSubscription.add(subscription);
}
@Override
public void loadMoreMessages() {
final Subscription subscription = roomRepository.getHistoryStateByRoomId(roomId)
.filter(roomHistoryState -> {
int syncState = roomHistoryState.getSyncState();
return !roomHistoryState.isComplete()
&& (syncState == SyncState.SYNCED || syncState == SyncState.FAILED);
})
.first()
.toSingle()
.flatMap(roomHistoryState -> roomRepository
.setHistoryState(roomHistoryState.withSyncState(SyncState.NOT_SYNCED)))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(success -> {
if (success) {
connectivityManagerApi.keepAliveServer();
}
});
compositeSubscription.add(subscription);
}
@Override
public void sendMessage(String messageText) {
final Subscription subscription = userRepository.getCurrentUser()
.filter(user -> user != null)
.first()
.toSingle()
.flatMap(user -> {
Message message = Message.builder()
.setId(UUID.randomUUID().toString())
.setSyncState(SyncState.NOT_SYNCED)
.setTimestamp(System.currentTimeMillis())
.setRoomId(roomId)
.setMessage(messageText)
.setGroupable(false)
.setUser(user)
.build();
return messageRepository.save(message);
})
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(success -> {
if (success) {
view.onMessageSendSuccessfully();
}
});
compositeSubscription.add(subscription);
}
@Override
public void resendMessage(String messageId) {
final Subscription subscription = messageRepository.getById(messageId)
.map(message -> message.withSyncState(SyncState.NOT_SYNCED))
.flatMap(message -> messageRepository.resend(message))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
compositeSubscription.add(subscription);
}
@Override
public void deleteMessage(String messageId) {
final Subscription subscription = messageRepository.getById(messageId)
.flatMap(message -> messageRepository.delete(message))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
compositeSubscription.add(subscription);
}
private void getRoomInfo() { private void getRoomInfo() {
final Subscription subscription = roomRepository.getById(roomId) final Subscription subscription = roomRepository.getById(roomId)
.distinctUntilChanged() .distinctUntilChanged()
......
...@@ -7,11 +7,10 @@ import chat.rocket.android.api.MethodCallHelper; ...@@ -7,11 +7,10 @@ import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogcatIfError; import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.helper.TextUtils; import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.model.core.User; import chat.rocket.android.model.core.User;
import chat.rocket.android.repositories.RoomRepository; import chat.rocket.android.repositories.core.RoomRepository;
import chat.rocket.android.repositories.UserRepository; import chat.rocket.android.repositories.core.UserRepository;
import rx.Subscription; import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers; import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription; import rx.subscriptions.CompositeSubscription;
public class SidebarMainPresenter implements SidebarMainContract.Presenter { public class SidebarMainPresenter implements SidebarMainContract.Presenter {
...@@ -50,7 +49,7 @@ public class SidebarMainPresenter implements SidebarMainContract.Presenter { ...@@ -50,7 +49,7 @@ public class SidebarMainPresenter implements SidebarMainContract.Presenter {
@Override @Override
public void release() { public void release() {
compositeSubscription.unsubscribe(); compositeSubscription.clear();
view = null; view = null;
} }
......
package chat.rocket.android.layouthelper.chatroom; package chat.rocket.android.layouthelper.chatroom;
import bolts.Task;
import chat.rocket.android.widget.message.MessageFormLayout; import chat.rocket.android.widget.message.MessageFormLayout;
/** /**
...@@ -29,22 +28,21 @@ public class MessageFormManager { ...@@ -29,22 +28,21 @@ public class MessageFormManager {
messageFormLayout.setText(""); messageFormLayout.setText("");
} }
public void onMessageSend() {
clearComposingText();
messageFormLayout.setEnabled(true);
}
private void sendMessage(String message) { private void sendMessage(String message) {
if (sendMessageCallback == null) { if (sendMessageCallback == null) {
return; return;
} }
messageFormLayout.setEnabled(false); messageFormLayout.setEnabled(false);
sendMessageCallback.onSubmitText(message).onSuccess(task -> { sendMessageCallback.onSubmitText(message);
clearComposingText();
return null;
}).continueWith(task -> {
messageFormLayout.setEnabled(true);
return null;
});
} }
public interface SendMessageCallback { public interface SendMessageCallback {
Task<Void> onSubmitText(String messageText); void onSubmitText(String messageText);
} }
} }
...@@ -22,6 +22,7 @@ public abstract class Message { ...@@ -22,6 +22,7 @@ public abstract class Message {
public abstract String getMessage(); public abstract String getMessage();
@Nullable
public abstract User getUser(); public abstract User getUser();
public abstract boolean isGroupable(); public abstract boolean isGroupable();
...@@ -38,6 +39,8 @@ public abstract class Message { ...@@ -38,6 +39,8 @@ public abstract class Message {
@Nullable @Nullable
public abstract String getAvatar(); public abstract String getAvatar();
public abstract Message withSyncState(int syncState);
public static Builder builder() { public static Builder builder() {
return new AutoValue_Message.Builder(); return new AutoValue_Message.Builder();
} }
......
...@@ -17,6 +17,8 @@ public abstract class RoomHistoryState { ...@@ -17,6 +17,8 @@ public abstract class RoomHistoryState {
public abstract boolean isComplete(); public abstract boolean isComplete();
public abstract RoomHistoryState withSyncState(int syncState);
public static Builder builder() { public static Builder builder() {
return new AutoValue_RoomHistoryState.Builder(); return new AutoValue_RoomHistoryState.Builder();
} }
......
...@@ -168,7 +168,7 @@ public class RealmMessage extends RealmObject { ...@@ -168,7 +168,7 @@ public class RealmMessage extends RealmObject {
.setSyncState(syncstate) .setSyncState(syncstate)
.setTimestamp(ts) .setTimestamp(ts)
.setMessage(msg) .setMessage(msg)
.setUser(u.asUser()) .setUser(u != null ? u.asUser() : null)
.setGroupable(groupable) .setGroupable(groupable)
.setAlias(alias) .setAlias(alias)
.setAvatar(avatar) .setAvatar(avatar)
......
package chat.rocket.android.repositories;
import android.os.Looper;
import io.realm.Realm;
import io.realm.RealmResults;
import org.json.JSONObject;
import chat.rocket.android.model.core.Message;
import chat.rocket.android.model.core.Room;
import chat.rocket.android.model.ddp.RealmMessage;
import chat.rocket.android.model.ddp.RealmUser;
import chat.rocket.android.repositories.core.MessageRepository;
import chat.rocket.persistence.realm.RealmStore;
import rx.Observable;
import rx.Single;
import rx.android.schedulers.AndroidSchedulers;
public class RealmMessageRepository extends RealmRepository implements MessageRepository {
private final String hostname;
public RealmMessageRepository(String hostname) {
this.hostname = hostname;
}
@Override
public Single<Message> getById(String messageId) {
return Single.defer(() -> {
final Realm realm = RealmStore.getRealm(hostname);
final Looper looper = Looper.myLooper();
if (realm == null) {
return Single.just(null);
}
final RealmMessage realmMessage = realm.where(RealmMessage.class)
.equalTo(RealmMessage.ID, messageId)
.findFirst();
if (realmMessage == null) {
realm.close();
return Single.just(null);
}
return realmMessage
.<RealmMessage>asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(it -> it != null && it.isLoaded()
&& it.isValid())
.first()
.toSingle()
.map(it -> it.asMessage());
});
}
@Override
public Single<Boolean> save(Message message) {
return Single.defer(() -> {
final Realm realm = RealmStore.getRealm(hostname);
final Looper looper = Looper.myLooper();
if (realm == null) {
return Single.just(false);
}
// need to improve this for real
final JSONObject messageToSend = new JSONObject()
.put(RealmMessage.ID, message.getId())
.put(RealmMessage.SYNC_STATE, message.getSyncState())
.put(RealmMessage.TIMESTAMP, message.getTimestamp())
.put(RealmMessage.ROOM_ID, message.getRoomId())
.put(RealmMessage.USER, new JSONObject()
.put(RealmUser.ID, message.getUser().getId()))
.put(RealmMessage.MESSAGE, message.getMessage());
realm.beginTransaction();
return realm.createOrUpdateObjectFromJson(RealmMessage.class, messageToSend)
.asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(it -> it != null && it.isLoaded() && it.isValid())
.first()
.doOnNext(it -> realm.commitTransaction())
.toSingle()
.map(realmObject -> true);
});
}
@Override
public Single<Boolean> resend(Message message) {
return Single.defer(() -> {
final Realm realm = RealmStore.getRealm(hostname);
final Looper looper = Looper.myLooper();
if (realm == null) {
return Single.just(false);
}
final JSONObject messageToSend = new JSONObject()
.put(RealmMessage.ID, message.getId())
.put(RealmMessage.SYNC_STATE, message.getSyncState());
realm.beginTransaction();
return realm.createOrUpdateObjectFromJson(RealmMessage.class, messageToSend)
.asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(it -> it != null && it.isLoaded() && it.isValid())
.first()
.doOnNext(it -> realm.commitTransaction())
.toSingle()
.map(realmObject -> true);
});
}
@Override
public Single<Boolean> delete(Message message) {
return Single.defer(() -> {
final Realm realm = RealmStore.getRealm(hostname);
final Looper looper = Looper.myLooper();
if (realm == null) {
return Single.just(false);
}
realm.beginTransaction();
return realm.where(RealmMessage.class)
.equalTo(RealmMessage.ID, message.getId())
.findAll()
.<RealmResults<RealmMessage>>asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(realmObject -> realmObject != null
&& realmObject.isLoaded() && realmObject.isValid())
.first()
.toSingle()
.flatMap(realmMessages -> Single.just(realmMessages.deleteAllFromRealm()))
.doOnEach(notification -> {
if (notification.getValue()) {
realm.commitTransaction();
} else {
realm.cancelTransaction();
}
});
});
}
@Override
public Observable<Message> getAllFrom(Room room) {
return null;
}
@Override
public Single<Integer> unreadCountFrom(Room room) {
return null;
}
}
...@@ -10,8 +10,10 @@ import chat.rocket.android.model.core.Room; ...@@ -10,8 +10,10 @@ import chat.rocket.android.model.core.Room;
import chat.rocket.android.model.core.RoomHistoryState; import chat.rocket.android.model.core.RoomHistoryState;
import chat.rocket.android.model.ddp.RoomSubscription; import chat.rocket.android.model.ddp.RoomSubscription;
import chat.rocket.android.model.internal.LoadMessageProcedure; import chat.rocket.android.model.internal.LoadMessageProcedure;
import chat.rocket.android.repositories.core.RoomRepository;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import rx.Observable; import rx.Observable;
import rx.Single;
import rx.android.schedulers.AndroidSchedulers; import rx.android.schedulers.AndroidSchedulers;
public class RealmRoomRepository extends RealmRepository implements RoomRepository { public class RealmRoomRepository extends RealmRepository implements RoomRepository {
...@@ -56,7 +58,7 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito ...@@ -56,7 +58,7 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito
return realm.where(RoomSubscription.class) return realm.where(RoomSubscription.class)
.equalTo(RoomSubscription.ROOM_ID, roomId) .equalTo(RoomSubscription.ROOM_ID, roomId)
.findFirstAsync() .findFirst()
.<RoomSubscription>asObservable() .<RoomSubscription>asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper)) .unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper)) .doOnUnsubscribe(() -> close(realm, looper))
...@@ -78,7 +80,7 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito ...@@ -78,7 +80,7 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito
return realm.where(LoadMessageProcedure.class) return realm.where(LoadMessageProcedure.class)
.equalTo(LoadMessageProcedure.ID, roomId) .equalTo(LoadMessageProcedure.ID, roomId)
.findFirstAsync() .findFirst()
.<LoadMessageProcedure>asObservable() .<LoadMessageProcedure>asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper)) .unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper)) .doOnUnsubscribe(() -> close(realm, looper))
...@@ -88,6 +90,39 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito ...@@ -88,6 +90,39 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito
}); });
} }
@Override
public Single<Boolean> setHistoryState(RoomHistoryState roomHistoryState) {
return Single.defer(() -> {
final Realm realm = RealmStore.getRealm(hostname);
final Looper looper = Looper.myLooper();
if (realm == null) {
return Single.just(false);
}
LoadMessageProcedure loadMessage = new LoadMessageProcedure();
loadMessage.setRoomId(roomHistoryState.getRoomId());
loadMessage.setSyncState(roomHistoryState.getSyncState());
loadMessage.setCount(roomHistoryState.getCount());
loadMessage.setReset(roomHistoryState.isReset());
loadMessage.setHasNext(!roomHistoryState.isComplete());
loadMessage.setTimestamp(roomHistoryState.getTimestamp());
realm.beginTransaction();
return realm.copyToRealmOrUpdate(loadMessage)
.asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(realmObject -> realmObject != null
&& realmObject.isLoaded() && realmObject.isValid())
.first()
.doOnNext(realmObject -> realm.commitTransaction())
.toSingle()
.map(realmObject -> true);
});
}
private List<Room> toList(RealmResults<RoomSubscription> roomSubscriptions) { private List<Room> toList(RealmResults<RoomSubscription> roomSubscriptions) {
int total = roomSubscriptions.size(); int total = roomSubscriptions.size();
......
...@@ -5,6 +5,7 @@ import io.realm.Realm; ...@@ -5,6 +5,7 @@ import io.realm.Realm;
import chat.rocket.android.model.core.User; import chat.rocket.android.model.core.User;
import chat.rocket.android.model.ddp.RealmUser; import chat.rocket.android.model.ddp.RealmUser;
import chat.rocket.android.repositories.core.UserRepository;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import rx.Observable; import rx.Observable;
import rx.android.schedulers.AndroidSchedulers; import rx.android.schedulers.AndroidSchedulers;
......
package chat.rocket.android.repositories.core;
import chat.rocket.android.model.core.Message;
import chat.rocket.android.model.core.Room;
import rx.Observable;
import rx.Single;
public interface MessageRepository {
Single<Message> getById(String messageId);
Single<Boolean> save(Message message);
Single<Boolean> resend(Message message);
Single<Boolean> delete(Message message);
Observable<Message> getAllFrom(Room room);
Single<Integer> unreadCountFrom(Room room);
}
package chat.rocket.android.repositories; package chat.rocket.android.repositories.core;
import java.util.List; import java.util.List;
import chat.rocket.android.model.core.Room; import chat.rocket.android.model.core.Room;
import chat.rocket.android.model.core.RoomHistoryState; import chat.rocket.android.model.core.RoomHistoryState;
import rx.Observable; import rx.Observable;
import rx.Single;
public interface RoomRepository { public interface RoomRepository {
...@@ -12,4 +13,6 @@ public interface RoomRepository { ...@@ -12,4 +13,6 @@ public interface RoomRepository {
Observable<Room> getById(String roomId); Observable<Room> getById(String roomId);
Observable<RoomHistoryState> getHistoryStateByRoomId(String roomId); Observable<RoomHistoryState> getHistoryStateByRoomId(String roomId);
Single<Boolean> setHistoryState(RoomHistoryState roomHistoryState);
} }
package chat.rocket.android.repositories; package chat.rocket.android.repositories.core;
import chat.rocket.android.model.core.User; import chat.rocket.android.model.core.User;
import rx.Observable; import rx.Observable;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment