Commit 0d0e4556 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Some refactoring

parent 692cd67f
package chat.rocket.android.service; package chat.rocket.android.service;
import android.annotation.SuppressLint;
import android.content.ComponentName; import android.content.ComponentName;
import android.content.Context; import android.content.Context;
import android.content.ServiceConnection; import android.content.ServiceConnection;
...@@ -68,6 +69,7 @@ import io.reactivex.subjects.PublishSubject; ...@@ -68,6 +69,7 @@ import io.reactivex.subjects.PublishSubject;
} }
} }
@SuppressLint("RxLeakedSubscription")
@DebugLog @DebugLog
@Override @Override
public void ensureConnections() { public void ensureConnections() {
...@@ -84,6 +86,7 @@ import io.reactivex.subjects.PublishSubject; ...@@ -84,6 +86,7 @@ import io.reactivex.subjects.PublishSubject;
}); });
} }
@SuppressLint("RxLeakedSubscription")
@Override @Override
public void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure) { public void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure) {
RealmBasedServerInfo.addOrUpdate(hostname, name, insecure); RealmBasedServerInfo.addOrUpdate(hostname, name, insecure);
...@@ -95,6 +98,7 @@ import io.reactivex.subjects.PublishSubject; ...@@ -95,6 +98,7 @@ import io.reactivex.subjects.PublishSubject;
}, RCLog::e); }, RCLog::e);
} }
@SuppressLint("RxLeakedSubscription")
@Override @Override
public void removeServer(String hostname) { public void removeServer(String hostname) {
RealmBasedServerInfo.remove(hostname); RealmBasedServerInfo.remove(hostname);
......
...@@ -50,416 +50,398 @@ import io.reactivex.disposables.CompositeDisposable; ...@@ -50,416 +50,398 @@ import io.reactivex.disposables.CompositeDisposable;
* Thread for handling WebSocket connection. * Thread for handling WebSocket connection.
*/ */
public class RocketChatWebSocketThread extends HandlerThread { public class RocketChatWebSocketThread extends HandlerThread {
private static final Class[] REGISTERABLE_CLASSES = { private static final Class[] REGISTERABLE_CLASSES = {
LoginServiceConfigurationSubscriber.class, LoginServiceConfigurationSubscriber.class,
ActiveUsersSubscriber.class, ActiveUsersSubscriber.class,
UserDataSubscriber.class, UserDataSubscriber.class,
MethodCallObserver.class, MethodCallObserver.class,
SessionObserver.class, SessionObserver.class,
LoadMessageProcedureObserver.class, LoadMessageProcedureObserver.class,
GetUsersOfRoomsProcedureObserver.class, GetUsersOfRoomsProcedureObserver.class,
NewMessageObserver.class, NewMessageObserver.class,
DeletedMessageObserver.class, DeletedMessageObserver.class,
CurrentUserObserver.class, CurrentUserObserver.class,
FileUploadingToUrlObserver.class, FileUploadingToUrlObserver.class,
FileUploadingWithUfsObserver.class, FileUploadingWithUfsObserver.class,
PushSettingsObserver.class, PushSettingsObserver.class,
GcmPushRegistrationObserver.class GcmPushRegistrationObserver.class
}; };
private static final long HEARTBEAT_PERIOD_MS = 20000; private static final long HEARTBEAT_PERIOD_MS = 20000;
private final Context appContext; private final Context appContext;
private final String hostname; private final String hostname;
private final RealmHelper realmHelper; private final RealmHelper realmHelper;
private final ConnectivityManagerInternal connectivityManager; private final ConnectivityManagerInternal connectivityManager;
private final ArrayList<Registrable> listeners = new ArrayList<>(); private final ArrayList<Registrable> listeners = new ArrayList<>();
private final CompositeDisposable hearbeatDisposable = new CompositeDisposable(); private final CompositeDisposable heartbeatDisposable = new CompositeDisposable();
private final CompositeDisposable reconnectSubscription = new CompositeDisposable(); private final CompositeDisposable reconnectDisposable = new CompositeDisposable();
private boolean listenersRegistered; private boolean listenersRegistered;
private static class KeepAliveTimer { private static class KeepAliveTimer {
private long lastTime; private long lastTime;
private final long thresholdMs; private final long thresholdMs;
public KeepAliveTimer(long thresholdMs) { public KeepAliveTimer(long thresholdMs) {
this.thresholdMs = thresholdMs; this.thresholdMs = thresholdMs;
lastTime = System.currentTimeMillis(); lastTime = System.currentTimeMillis();
}
public boolean shouldCheckPrecisely() {
return lastTime + thresholdMs < System.currentTimeMillis();
}
public void update() {
lastTime = System.currentTimeMillis();
}
}
private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(20000);
private RocketChatWebSocketThread(Context appContext, String hostname) {
super("RC_thread_" + hostname);
this.appContext = appContext;
this.hostname = hostname;
this.realmHelper = RealmStore.getOrCreate(hostname);
this.connectivityManager = ConnectivityManager.getInstanceForInternal(appContext);
}
/**
* build new Thread.
*/
@DebugLog
public static Single<RocketChatWebSocketThread> getStarted(Context appContext, String hostname) {
return Single.<RocketChatWebSocketThread>create(objectSingleEmitter -> {
new RocketChatWebSocketThread(appContext, hostname) {
@Override
protected void onLooperPrepared() {
try {
super.onLooperPrepared();
objectSingleEmitter.onSuccess(this);
} catch (Exception exception) {
objectSingleEmitter.onError(exception);
}
}
}.start();
}).flatMap(webSocket ->
webSocket.connectWithExponentialBackoff().map(_val -> webSocket));
} }
public boolean shouldCheckPrecisely() { @Override
return lastTime + thresholdMs < System.currentTimeMillis(); protected void onLooperPrepared() {
super.onLooperPrepared();
forceInvalidateTokens();
} }
public void update() { private void forceInvalidateTokens() {
lastTime = System.currentTimeMillis(); realmHelper.executeTransaction(realm -> {
RealmSession session = RealmSession.queryDefaultSession(realm).findFirst();
if (session != null
&& !TextUtils.isEmpty(session.getToken())
&& (session.isTokenVerified() || !TextUtils.isEmpty(session.getError()))) {
session.setTokenVerified(false);
session.setError(null);
}
return null;
}).continueWith(new LogIfError());
} }
}
/**
private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(20000); * terminate WebSocket thread.
*/
private RocketChatWebSocketThread(Context appContext, String hostname) { @DebugLog
super("RC_thread_" + hostname); public Single<Boolean> terminate() {
this.appContext = appContext; if (isAlive()) {
this.hostname = hostname; return Single.create(emitter -> {
this.realmHelper = RealmStore.getOrCreate(hostname); new Handler(getLooper()).post(() -> {
this.connectivityManager = ConnectivityManager.getInstanceForInternal(appContext); RCLog.d("thread %s: terminated()", Thread.currentThread().getId());
} unregisterListenersAndClose();
connectivityManager.notifyConnectionLost(hostname,
/** DDPClient.REASON_CLOSED_BY_USER);
* build new Thread. RocketChatWebSocketThread.super.quit();
*/ emitter.onSuccess(true);
@DebugLog });
public static Single<RocketChatWebSocketThread> getStarted(Context appContext, String hostname) { });
return Single.<RocketChatWebSocketThread>fromPublisher(objectSingleEmitter -> { } else {
new RocketChatWebSocketThread(appContext, hostname) { connectivityManager.notifyConnectionLost(hostname,
@Override DDPClient.REASON_NETWORK_ERROR);
protected void onLooperPrepared() { super.quit();
try { return Single.just(true);
super.onLooperPrepared();
objectSingleEmitter.onNext(this);
objectSingleEmitter.onComplete();
} catch (Exception exception) {
objectSingleEmitter.onError(exception);
}
} }
}.start();
}).flatMap(webSocket ->
webSocket.connectWithExponentialBackoff().map(_val -> webSocket));
}
@Override
protected void onLooperPrepared() {
super.onLooperPrepared();
}
private void forceInvalidateTokens() {
realmHelper.executeTransaction(realm -> {
RealmSession session = RealmSession.queryDefaultSession(realm).findFirst();
if (session != null
&& !TextUtils.isEmpty(session.getToken())
&& (session.isTokenVerified() || !TextUtils.isEmpty(session.getError()))) {
session.setTokenVerified(false);
session.setError(null);
}
return null;
}).continueWith(new LogIfError());
}
/**
* terminate WebSocket thread.
*/
@DebugLog
public Single<Boolean> terminate() {
if (isAlive()) {
return Single.fromPublisher(emitter -> {
new Handler(getLooper()).post(() -> {
RCLog.d("thread %s: terminated()", Thread.currentThread().getId());
unregisterListenersAndClose();
connectivityManager.notifyConnectionLost(hostname,
DDPClient.REASON_CLOSED_BY_USER);
RocketChatWebSocketThread.super.quit();
emitter.onNext(true);
emitter.onComplete();
});
});
} else {
connectivityManager.notifyConnectionLost(hostname,
DDPClient.REASON_NETWORK_ERROR);
super.quit();
return Single.just(true);
} }
}
/**
/** * THIS METHOD THROWS EXCEPTION!! Use terminate() instead!!
* THIS METHOD THROWS EXCEPTION!! Use terminate() instead!! */
*/ @Deprecated
@Deprecated @Override
@Override public final boolean quit() {
public final boolean quit() { throw new UnsupportedOperationException();
throw new UnsupportedOperationException();
}
/**
* synchronize the state of the thread with ServerConfig.
*/
@DebugLog
public Single<Boolean> keepAlive() {
return checkIfConnectionAlive()
.flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff());
}
@DebugLog
private Single<Boolean> checkIfConnectionAlive() {
if (DDPClient.get() == null) {
return Single.just(false);
} }
if (!keepAliveTimer.shouldCheckPrecisely()) { /**
return Single.just(true); * synchronize the state of the thread with ServerConfig.
*/
@DebugLog
public Single<Boolean> keepAlive() {
return checkIfConnectionAlive()
.flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff());
} }
keepAliveTimer.update();
@DebugLog
return Single.fromPublisher(emitter -> { private Single<Boolean> checkIfConnectionAlive() {
new Thread() { if (DDPClient.get() == null) {
@Override return Single.just(false);
public void run() {
DDPClient.get().ping().continueWith(task -> {
if (task.isFaulted()) {
Exception error = task.getError();
RCLog.e(error);
connectivityManager.notifyConnectionLost(
hostname, DDPClient.REASON_CLOSED_BY_USER);
emitter.onError(error);
} else {
keepAliveTimer.update();
emitter.onNext(true);
emitter.onComplete();
}
return null;
});
} }
}.start();
});
}
@DebugLog
private Flowable<Boolean> heartbeat(long interval) {
return Flowable.interval(interval, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(tick -> DDPClient.get().doPing().toFlowable())
.map(callback -> {
if (callback instanceof DDPClientCallback.Ping) {
return true;
}
// ideally we should never get here. We should always receive a DDPClientCallback.Ping
// because we just received a pong. But maybe we received a pong from an unmatched
// ping id which we should ignore. In this case or any other random error, log and
// send false downstream
RCLog.d("heartbeat pong < %s", callback.toString());
return false;
});
}
private Single<Boolean> prepareDDPClient() {
// TODO: temporarily replaced checkIfConnectionAlive() call for this single checking if ddpClient is
// null or not. In case it is, build a new client, otherwise just keep connecting with existing one.
return Single.just(DDPClient.get() != null)
.doOnSuccess(alive -> {
if (!alive) {
RCLog.d("DDPClient#build");
}
});
}
private Single<Boolean> connectDDPClient() {
return prepareDDPClient()
.flatMap(_val -> Single.fromPublisher(emitter -> {
ServerInfo info = connectivityManager.getServerInfoForHost(hostname);
if (info == null) {
emitter.onNext(false);
emitter.onComplete();
return;
}
RCLog.d("DDPClient#connect");
DDPClient.get().connect(hostname, info.getSession(), info.isSecure())
.onSuccessTask(task -> {
final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
RxWebSocketCallback.Close result = _task.getResult();
if (result.code == DDPClient.REASON_NETWORK_ERROR) {
reconnect();
}
return null;
});
return realmHelper.executeTransaction(realm -> { if (!keepAliveTimer.shouldCheckPrecisely()) {
RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst(); return Single.just(true);
if (sessionObj == null) { }
realm.createOrUpdateObjectFromJson(RealmSession.class, keepAliveTimer.update();
new JSONObject().put(RealmSession.ID, RealmSession.DEFAULT_ID));
} else { return Single.create(emitter -> {
// invalidate login token. new Thread() {
if (!TextUtils.isEmpty(sessionObj.getToken()) && sessionObj.isTokenVerified()) { @Override
sessionObj.setTokenVerified(false); public void run() {
sessionObj.setError(null); DDPClient.get().ping().continueWith(task -> {
if (task.isFaulted()) {
Exception error = task.getError();
RCLog.e(error);
connectivityManager.notifyConnectionLost(
hostname, DDPClient.REASON_CLOSED_BY_USER);
emitter.onError(error);
} else {
keepAliveTimer.update();
emitter.onSuccess(true);
} }
return null;
}
return null;
}); });
}) }
.continueWith(task -> { }.start();
if (task.isFaulted()) { });
emitter.onError(task.getError());
} else {
emitter.onNext(true);
emitter.onComplete();
}
return null;
});
}));
}
private void reconnect() {
// if we are already trying to reconnect then return.
if (reconnectSubscription.size() > 0) {
return;
} }
connectivityManager.notifyConnecting(hostname);
reconnectSubscription.add( @DebugLog
connectWithExponentialBackoff() private Flowable<Boolean> heartbeat(long interval) {
.subscribe( return Flowable.interval(interval, TimeUnit.MILLISECONDS)
connected -> { .onBackpressureDrop()
if (!connected) { .flatMap(tick -> DDPClient.get().doPing().toFlowable())
connectivityManager.notifyConnecting(hostname); .map(callback -> {
if (callback instanceof DDPClientCallback.Ping) {
return true;
} }
reconnectSubscription.clear(); // ideally we should never get here. We should always receive a DDPClientCallback.Ping
}, // because we just received a pong. But maybe we received a pong from an unmatched
error -> { // ping id which we should ignore. In this case or any other random error, log and
logErrorAndUnsubscribe(reconnectSubscription, error); // send false downstream
connectivityManager.notifyConnectionLost(hostname, RCLog.d("heartbeat pong < %s", callback.toString());
DDPClient.REASON_NETWORK_ERROR); return false;
} });
)
);
}
private void logErrorAndUnsubscribe(CompositeDisposable disposables, Throwable err) {
RCLog.e(err);
disposables.clear();
}
private Single<Boolean> connectWithExponentialBackoff() {
return connect().retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS));
}
@DebugLog
private Single<Boolean> connect() {
return connectDDPClient()
.flatMap(_val -> Single.fromPublisher(emitter -> {
fetchPublicSettings();
fetchPermissions();
registerListeners();
emitter.onNext(true);
emitter.onComplete();
}));
}
private Task<Void> fetchPublicSettings() {
return new MethodCallHelper(appContext, realmHelper).getPublicSettings(hostname);
}
private Task<Void> fetchPermissions() {
return new MethodCallHelper(realmHelper).getPermissions();
}
@DebugLog
private void registerListeners() {
if (!Thread.currentThread().getName().equals("RC_thread_" + hostname)) {
// execute in Looper.
new Handler(getLooper()).post(this::registerListeners);
return;
} }
if (listenersRegistered) { private Single<Boolean> connectDDPClient() {
unregisterListeners(); return Single.create(emitter -> {
ServerInfo info = connectivityManager.getServerInfoForHost(hostname);
if (info == null) {
emitter.onSuccess(false);
return;
}
RCLog.d("DDPClient#connect");
DDPClient.get().connect(hostname, info.getSession(), info.isSecure())
.onSuccessTask(task -> {
final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
RxWebSocketCallback.Close result = _task.getResult();
if (result.code == DDPClient.REASON_NETWORK_ERROR) {
reconnect();
}
return null;
});
return realmHelper.executeTransaction(realm -> {
RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst();
if (sessionObj == null) {
realm.createOrUpdateObjectFromJson(RealmSession.class,
new JSONObject().put(RealmSession.ID, RealmSession.DEFAULT_ID));
} else {
// invalidate login token.
if (!TextUtils.isEmpty(sessionObj.getToken()) && sessionObj.isTokenVerified()) {
sessionObj.setTokenVerified(false);
sessionObj.setError(null);
}
}
return null;
});
})
.continueWith(task -> {
if (task.isFaulted()) {
emitter.onError(task.getError());
} else {
emitter.onSuccess(true);
}
return null;
});
});
} }
List<RealmSession> sessions = realmHelper.executeTransactionForReadResults(realm -> private void reconnect() {
realm.where(RealmSession.class) // if we are already trying to reconnect then return.
.isNotNull(RealmSession.TOKEN) if (reconnectDisposable.size() > 0) {
.equalTo(RealmSession.TOKEN_VERIFIED, false) return;
.isNull(RealmSession.ERROR) }
.findAll()); forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
if (sessions != null && sessions.size() > 0) { reconnectDisposable.add(
// if we have a session try to resume it. At this point we're probably recovering from connectWithExponentialBackoff()
// a disconnection state .subscribe(connected -> {
final CompositeDisposable disposables = new CompositeDisposable(); if (!connected) {
MethodCallHelper methodCall = new MethodCallHelper(realmHelper); connectivityManager.notifyConnecting(hostname);
disposables.add( }
Completable.defer(() -> { reconnectDisposable.clear();
Task<Void> result = methodCall.loginWithToken(sessions.get(0).getToken()); }, error -> {
if (result.isFaulted()) { logErrorAndUnsubscribe(reconnectDisposable, error);
return Completable.error(result.getError()); connectivityManager.notifyConnectionLost(hostname,
} else { DDPClient.REASON_NETWORK_ERROR);
return Completable.complete(); }
} )
}).retryWhen(RxHelper.exponentialBackoff(Integer.MAX_VALUE, 500, TimeUnit.MILLISECONDS)) );
.subscribe( }
() -> {
createObserversAndRegister(); private void logErrorAndUnsubscribe(CompositeDisposable disposables, Throwable err) {
disposables.clear(); RCLog.e(err);
}, disposables.clear();
error -> logErrorAndUnsubscribe(disposables, error) }
)
); private Single<Boolean> connectWithExponentialBackoff() {
} else { return connect().retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS));
// if we don't have any session then just build the observers and register normally }
createObserversAndRegister();
@DebugLog
private Single<Boolean> connect() {
return connectDDPClient()
.flatMap(_val -> Single.create(emitter -> {
fetchPublicSettings();
fetchPermissions();
registerListeners();
emitter.onSuccess(true);
}));
}
private Task<Void> fetchPublicSettings() {
return new MethodCallHelper(appContext, realmHelper).getPublicSettings(hostname);
}
private Task<Void> fetchPermissions() {
return new MethodCallHelper(realmHelper).getPermissions();
}
@DebugLog
private void registerListeners() {
if (!Thread.currentThread().getName().equals("RC_thread_" + hostname)) {
// execute in Looper.
new Handler(getLooper()).post(this::registerListeners);
return;
}
if (listenersRegistered) {
unregisterListeners();
}
List<RealmSession> sessions = realmHelper.executeTransactionForReadResults(realm ->
realm.where(RealmSession.class)
.isNotNull(RealmSession.TOKEN)
.equalTo(RealmSession.TOKEN_VERIFIED, false)
.isNull(RealmSession.ERROR)
.findAll());
if (sessions != null && sessions.size() > 0) {
// if we have a session try to resume it. At this point we're probably recovering from
// a disconnection state
final CompositeDisposable disposables = new CompositeDisposable();
MethodCallHelper methodCall = new MethodCallHelper(realmHelper);
disposables.add(
Completable.defer(() -> {
Task<Void> result = methodCall.loginWithToken(sessions.get(0).getToken());
if (result.isFaulted()) {
return Completable.error(result.getError());
} else {
return Completable.complete();
}
}).retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS))
.subscribe(
() -> {
createObserversAndRegister();
disposables.clear();
},
error -> logErrorAndUnsubscribe(disposables, error)
)
);
} else {
// if we don't have any session then just build the observers and register normally
createObserversAndRegister();
}
} }
}
@DebugLog
@DebugLog private void createObserversAndRegister() {
private void createObserversAndRegister() { for (Class clazz : REGISTERABLE_CLASSES) {
for (Class clazz : REGISTERABLE_CLASSES) { try {
try { Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class);
Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class); Object obj = ctor.newInstance(appContext, hostname, realmHelper);
Object obj = ctor.newInstance(appContext, hostname, realmHelper);
if (obj instanceof Registrable) {
if (obj instanceof Registrable) { Registrable registrable = (Registrable) obj;
Registrable registrable = (Registrable) obj; registrable.register();
registrable.register(); listeners.add(registrable);
listeners.add(registrable); }
} catch (Exception exception) {
RCLog.w(exception, "Failed to register listeners!!");
}
} }
} catch (Exception exception) { listenersRegistered = true;
RCLog.w(exception, "Failed to register listeners!!"); startHeartBeat();
}
} }
listenersRegistered = true;
startHeartBeat(); private void startHeartBeat() {
} heartbeatDisposable.clear();
heartbeatDisposable.add(
private void startHeartBeat() { heartbeat(HEARTBEAT_PERIOD_MS)
hearbeatDisposable.clear(); .subscribe(
hearbeatDisposable.add( ponged -> {
heartbeat(HEARTBEAT_PERIOD_MS) if (!ponged) {
.subscribe( RCLog.d("Pong received but didn't match ping id");
ponged -> { }
if (!ponged) { },
RCLog.d("Pong received but didn't match ping id"); error -> {
} RCLog.e(error);
}, // Stop pinging
error -> { heartbeatDisposable.clear();
RCLog.e(error); if (error instanceof DDPClientCallback.Closed || error instanceof TimeoutException) {
// Stop pinging RCLog.d("Hearbeat failure: retrying connection...");
hearbeatDisposable.clear(); reconnect();
if (error instanceof DDPClientCallback.Closed || error instanceof TimeoutException) { }
RCLog.d("Hearbeat failure: retrying connection..."); }
reconnect(); )
} );
} }
)
); @DebugLog
} private void unregisterListenersAndClose() {
unregisterListeners();
@DebugLog DDPClient.get().close();
private void unregisterListenersAndClose() { }
unregisterListeners();
DDPClient.get().close(); @DebugLog
} private void unregisterListeners() {
Iterator<Registrable> iterator = listeners.iterator();
@DebugLog while (iterator.hasNext()) {
private void unregisterListeners() { Registrable registrable = iterator.next();
Iterator<Registrable> iterator = listeners.iterator(); registrable.unregister();
while (iterator.hasNext()) { iterator.remove();
Registrable registrable = iterator.next(); }
registrable.unregister(); heartbeatDisposable.clear();
iterator.remove(); listenersRegistered = false;
} }
hearbeatDisposable.clear();
listenersRegistered = false;
}
} }
...@@ -7,6 +7,7 @@ import com.hadisatrio.optional.Optional; ...@@ -7,6 +7,7 @@ import com.hadisatrio.optional.Optional;
import chat.rocket.core.models.Session; import chat.rocket.core.models.Session;
import chat.rocket.core.repositories.SessionRepository; import chat.rocket.core.repositories.SessionRepository;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.internal.RealmSession; import chat.rocket.persistence.realm.models.internal.RealmSession;
import hu.akarnokd.rxjava.interop.RxJavaInterop; import hu.akarnokd.rxjava.interop.RxJavaInterop;
...@@ -35,7 +36,7 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe ...@@ -35,7 +36,7 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
return RxJavaInterop.toV2Flowable(pair.first.where(RealmSession.class) return RxJavaInterop.toV2Flowable(pair.first.where(RealmSession.class)
.equalTo(RealmSession.ID, id) .equalTo(RealmSession.ID, id)
.findAll() .findAll()
.<RealmSession>asObservable()); .<RealmSession>asObservable().first());
}, },
pair -> close(pair.first, pair.second) pair -> close(pair.first, pair.second)
) )
...@@ -45,7 +46,7 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe ...@@ -45,7 +46,7 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
if (realmSessions.size() == 0) { if (realmSessions.size() == 0) {
return Optional.absent(); return Optional.absent();
} }
return Optional.of(realmSessions.get(0).asSession()); return Optional.of(realmSessions.get(0).asSession());
})); }));
} }
...@@ -74,14 +75,9 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe ...@@ -74,14 +75,9 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
realmSession.setTokenVerified(session.isTokenVerified()); realmSession.setTokenVerified(session.isTokenVerified());
realmSession.setError(session.getError()); realmSession.setError(session.getError());
realm.beginTransaction(); return RealmHelper.copyToRealmOrUpdate(realm, realmSession)
return RxJavaInterop.toV2Flowable(realm.copyToRealmOrUpdate(realmSession)
.asObservable())
.filter(it -> it != null && it.isLoaded() && it.isValid()) .filter(it -> it != null && it.isLoaded() && it.isValid())
.firstElement() .firstElement()
.doOnSuccess(it -> realm.commitTransaction())
.doOnError(throwable -> realm.cancelTransaction())
.doOnEvent((realmObject, throwable) -> close(realm, looper)) .doOnEvent((realmObject, throwable) -> close(realm, looper))
.toSingle() .toSingle()
.map(realmObject -> true); .map(realmObject -> true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment