Commit 1ed4286b authored by Leonardo Aramaki's avatar Leonardo Aramaki

Synchrounously execute transactions for copyToRealmOrUpdate

parent 0d1677b1
...@@ -21,6 +21,7 @@ import chat.rocket.core.models.Settings; ...@@ -21,6 +21,7 @@ import chat.rocket.core.models.Settings;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import chat.rocket.core.repositories.RoomRepository; import chat.rocket.core.repositories.RoomRepository;
import chat.rocket.core.repositories.UserRepository; import chat.rocket.core.repositories.UserRepository;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Single; import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposable;
......
...@@ -8,15 +8,20 @@ import org.json.JSONException; ...@@ -8,15 +8,20 @@ import org.json.JSONException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import bolts.Task; import bolts.Task;
import bolts.TaskCompletionSource; import bolts.TaskCompletionSource;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import io.reactivex.Flowable;
import io.realm.Realm; import io.realm.Realm;
import io.realm.RealmConfiguration; import io.realm.RealmConfiguration;
import io.realm.RealmModel;
import io.realm.RealmObject; import io.realm.RealmObject;
import io.realm.RealmQuery; import io.realm.RealmQuery;
import io.realm.RealmResults; import io.realm.RealmResults;
import io.realm.log.RealmLog;
@SuppressLint("NewApi") @SuppressLint("NewApi")
public class RealmHelper { public class RealmHelper {
...@@ -166,6 +171,25 @@ public class RealmHelper { ...@@ -166,6 +171,25 @@ public class RealmHelper {
return constructor.getNewInstance(context).initializeWith(this, filter); return constructor.getNewInstance(context).initializeWith(this, filter);
} }
public static <T extends RealmModel> Flowable<T> copyToRealmOrUpdate(Realm realm, T objectToCopy) {
return Flowable.defer(() -> {
realm.beginTransaction();
try {
T object = realm.copyToRealmOrUpdate(objectToCopy);
realm.commitTransaction();
return Flowable.just(object);
} catch (Throwable e) {
if (realm.isInTransaction()) {
realm.cancelTransaction();
} else {
RealmLog.warn("Could not cancel transaction, not currently in a transaction.");
}
throw e;
}
});
}
public interface Transaction<T> { public interface Transaction<T> {
T execute(Realm realm) throws JSONException; T execute(Realm realm) throws JSONException;
} }
......
...@@ -13,6 +13,7 @@ import chat.rocket.core.models.Message; ...@@ -13,6 +13,7 @@ import chat.rocket.core.models.Message;
import chat.rocket.core.models.Room; import chat.rocket.core.models.Room;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import chat.rocket.core.repositories.MessageRepository; import chat.rocket.core.repositories.MessageRepository;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.ddp.RealmMessage; import chat.rocket.persistence.realm.models.ddp.RealmMessage;
import chat.rocket.persistence.realm.models.ddp.RealmUser; import chat.rocket.persistence.realm.models.ddp.RealmUser;
...@@ -89,16 +90,12 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe ...@@ -89,16 +90,12 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe
} }
realmMessage.setUser(realmUser); realmMessage.setUser(realmUser);
realm.beginTransaction(); final RealmMessage messageToSave = realmMessage;
return realm.copyToRealmOrUpdate(realmMessage) return RealmHelper.copyToRealmOrUpdate(realm, messageToSave)
.asFlowable()
.filter(it -> it.isLoaded() && it.isValid()) .filter(it -> it.isLoaded() && it.isValid())
.firstElement() .first(new RealmMessage())
.doOnSuccess(it -> realm.commitTransaction())
.doOnError(throwable -> realm.cancelTransaction())
.doOnEvent((realmObject, throwable) -> close(realm, looper)) .doOnEvent((realmObject, throwable) -> close(realm, looper))
.toSingle()
.map(realmObject -> true); .map(realmObject -> true);
}); });
} }
......
...@@ -12,6 +12,7 @@ import chat.rocket.core.SortDirection; ...@@ -12,6 +12,7 @@ import chat.rocket.core.SortDirection;
import chat.rocket.core.models.Room; import chat.rocket.core.models.Room;
import chat.rocket.core.models.RoomHistoryState; import chat.rocket.core.models.RoomHistoryState;
import chat.rocket.core.repositories.RoomRepository; import chat.rocket.core.repositories.RoomRepository;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.ddp.RealmRoom; import chat.rocket.persistence.realm.models.ddp.RealmRoom;
import chat.rocket.persistence.realm.models.internal.LoadMessageProcedure; import chat.rocket.persistence.realm.models.internal.LoadMessageProcedure;
...@@ -22,7 +23,6 @@ import io.realm.Case; ...@@ -22,7 +23,6 @@ import io.realm.Case;
import io.realm.Realm; import io.realm.Realm;
import io.realm.RealmResults; import io.realm.RealmResults;
import io.realm.Sort; import io.realm.Sort;
import io.realm.log.RealmLog;
public class RealmRoomRepository extends RealmRepository implements RoomRepository { public class RealmRoomRepository extends RealmRepository implements RoomRepository {
...@@ -139,24 +139,9 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito ...@@ -139,24 +139,9 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito
loadMessage.setHasNext(!roomHistoryState.isComplete()); loadMessage.setHasNext(!roomHistoryState.isComplete());
loadMessage.setTimestamp(roomHistoryState.getTimestamp()); loadMessage.setTimestamp(roomHistoryState.getTimestamp());
return Flowable.defer(() -> { return RealmHelper.copyToRealmOrUpdate(realm, loadMessage)
realm.beginTransaction();
try {
LoadMessageProcedure loadMessageProcedure = realm.copyToRealmOrUpdate(loadMessage);
realm.commitTransaction();
return Flowable.just(loadMessageProcedure);
} catch (Throwable e) {
if (realm.isInTransaction()) {
realm.cancelTransaction();
} else {
RealmLog.warn("Could not cancel transaction, not currently in a transaction.");
}
throw e;
}
})
.filter(realmObject -> realmObject.isLoaded() && realmObject.isValid()) .filter(realmObject -> realmObject.isLoaded() && realmObject.isValid())
.firstElement() .firstElement()
.doOnError(throwable -> realm.cancelTransaction())
.doOnEvent((realmObject, throwable) -> close(realm, looper)) .doOnEvent((realmObject, throwable) -> close(realm, looper))
.toSingle() .toSingle()
.map(realmObject -> true); .map(realmObject -> true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment