Commit 57830c54 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Turn DDPClient class a singleton and get rid of DDPClientRef and DDPClientWrapper

parent d4f4c5b4
package chat.rocket.android_ddp;
import android.text.TextUtils;
import chat.rocket.android.log.RCLog;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.UUID;
import bolts.Task;
import bolts.TaskCompletionSource;
......@@ -14,9 +21,28 @@ import okhttp3.OkHttpClient;
public class DDPClient {
// reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js
private static volatile DDPClient singleton;
private static OkHttpClient client;
private final DDPClientImpl impl;
public DDPClient(OkHttpClient client) {
public static void initialize(OkHttpClient okHttpClient) {
client = okHttpClient;
}
public static DDPClient get() {
DDPClient result = singleton;
if (result == null) {
synchronized (DDPClient.class) {
result = singleton;
if (result == null) {
singleton = result = new DDPClient(client);
}
}
}
return result;
}
private DDPClient(OkHttpClient client) {
impl = new DDPClientImpl(this, client);
}
......@@ -59,10 +85,6 @@ public class DDPClient {
return task.getTask();
}
public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return impl.getDDPSubscription();
}
public Task<RxWebSocketCallback.Close> getOnCloseCallback() {
return impl.getOnCloseCallback();
}
......@@ -70,4 +92,98 @@ public class DDPClient {
public void close() {
impl.close(1000, "closed by DDPClient#close()");
}
/**
* check WebSocket connectivity with ping.
*/
public Task<Void> ping() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ping(pingId)
.continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d(task.getError(), "ping[%s] xxx failed xxx", pingId);
return Task.forError(task.getError());
} else {
RCLog.d("pong[%s] <", pingId);
return Task.forResult(null);
}
});
}
/**
* check WebSocket connectivity with ping.
*/
public Maybe<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return doPing(pingId);
}
/**
* Connect to WebSocket server with DDP client.
*/
public Task<DDPClientCallback.Connect> connect(@NonNull String hostname, @Nullable String session,
boolean usesSecureConnection) {
final String protocol = usesSecureConnection ? "wss://" : "ws://";
return connect(protocol + hostname + "/websocket", session);
}
/**
* Subscribe with DDP client.
*/
public Task<DDPSubscription.Ready> subscribe(final String name, JSONArray param) {
final String subscriptionId = UUID.randomUUID().toString();
RCLog.d("sub:[%s]> %s(%s)", subscriptionId, name, param);
return sub(subscriptionId, name, param);
}
/**
* Unsubscribe with DDP client.
*/
public Task<DDPSubscription.NoSub> unsubscribe(final String subscriptionId) {
RCLog.d("unsub:[%s]>", subscriptionId);
return unsub(subscriptionId);
}
/**
* Returns Observable for handling DDP subscription.
*/
public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return impl.getDDPSubscription();
}
/**
* Execute raw RPC.
*/
public Task<DDPClientCallback.RPC> rpc(String methodCallId, String methodName, String params,
long timeoutMs) {
TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>();
RCLog.d("rpc:[%s]> %s(%s) timeout=%d", methodCallId, methodName, params, timeoutMs);
if (TextUtils.isEmpty(params)) {
impl.rpc(task, methodName, null, methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
}
return task_;
});
}
try {
impl.rpc(task, methodName, new JSONArray(params), methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
}
return task_;
});
} catch (JSONException exception) {
return Task.forError(exception);
}
}
}
......@@ -7,6 +7,7 @@ import android.support.v7.app.AppCompatDelegate;
import chat.rocket.android.helper.OkHttpHelper;
import com.crashlytics.android.Crashlytics;
import chat.rocket.android_ddp.DDPClient;
import io.fabric.sdk.android.Fabric;
import java.util.List;
import chat.rocket.persistence.realm.RealmStore;
......@@ -29,6 +30,7 @@ public class RocketChatApplication extends MultiDexApplication {
@Override
public void onCreate() {
super.onCreate();
DDPClient.initialize(OkHttpHelper.INSTANCE.getClientForWebSocket());
Fabric.with(this, new Crashlytics());
RocketChatPersistenceRealm.init(this);
......
package chat.rocket.android.api;
import android.support.annotation.Nullable;
import chat.rocket.android.helper.OkHttpHelper;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.UUID;
import bolts.Task;
import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.DDPSubscription;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
/**
* DDP client wrapper.
*/
public class DDPClientWrapper {
private final DDPClient ddpClient;
private final String hostname;
private DDPClientWrapper(String hostname) {
ddpClient = new DDPClient(OkHttpHelper.INSTANCE.getClientForWebSocket());
this.hostname = hostname;
}
/**
* build new API client instance.
*/
public static DDPClientWrapper create(String hostname) {
return new DDPClientWrapper(hostname);
}
/**
* Connect to WebSocket server with DDP client.
*/
public Task<DDPClientCallback.Connect> connect(@Nullable String session,
boolean usesSecureConnection) {
final String protocol = usesSecureConnection ? "wss://" : "ws://";
return ddpClient.connect(protocol + hostname + "/websocket", session);
}
/**
* close connection.
*/
public void close() {
ddpClient.close();
}
/**
* Subscribe with DDP client.
*/
public Task<DDPSubscription.Ready> subscribe(final String name, JSONArray param) {
final String subscriptionId = UUID.randomUUID().toString();
RCLog.d("sub:[%s]> %s(%s)", subscriptionId, name, param);
return ddpClient.sub(subscriptionId, name, param);
}
/**
* Unsubscribe with DDP client.
*/
public Task<DDPSubscription.NoSub> unsubscribe(final String subscriptionId) {
RCLog.d("unsub:[%s]>", subscriptionId);
return ddpClient.unsub(subscriptionId);
}
/**
* Returns Observable for handling DDP subscription.
*/
public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return ddpClient.getSubscriptionCallback();
}
/**
* Execute raw RPC.
*/
public Task<DDPClientCallback.RPC> rpc(String methodCallId, String methodName, String params,
long timeoutMs) {
RCLog.d("rpc:[%s]> %s(%s) timeout=%d", methodCallId, methodName, params, timeoutMs);
if (TextUtils.isEmpty(params)) {
return ddpClient.rpc(methodName, null, methodCallId, timeoutMs).continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task.getResult().result);
}
return task;
});
}
try {
return ddpClient.rpc(methodName, new JSONArray(params), methodCallId, timeoutMs)
.continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task.getResult().result);
}
return task;
});
} catch (JSONException exception) {
return Task.forError(exception);
}
}
/**
* check WebSocket connectivity with ping.
*/
public Task<Void> ping() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ddpClient.ping(pingId)
.continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d(task.getError(), "ping[%s] xxx failed xxx", pingId);
return Task.forError(task.getError());
} else {
RCLog.d("pong[%s] <", pingId);
return Task.forResult(null);
}
});
}
/**
* check WebSocket connectivity with ping.
*/
public Maybe<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ddpClient.doPing(pingId);
}
}
package chat.rocket.android.api;
import android.content.Context;
import org.json.JSONArray;
import org.json.JSONObject;
import bolts.Task;
import chat.rocket.android.helper.TextUtils;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
/**
* MethodCall for uploading file.
......@@ -17,8 +17,8 @@ public class FileUploadingHelper extends MethodCallHelper {
super(context, hostname);
}
public FileUploadingHelper(RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(realmHelper, ddpClientRef);
public FileUploadingHelper(RealmHelper realmHelper) {
super(realmHelper);
}
public Task<JSONObject> uploadS3Request(String filename, long filesize, String mimeType,
......
......@@ -15,7 +15,7 @@ import chat.rocket.android.RocketChatCache;
import chat.rocket.android.helper.CheckSum;
import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.service.ConnectivityManager;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.core.PublicSettingsConstants;
import chat.rocket.core.SyncState;
......@@ -48,7 +48,6 @@ public class MethodCallHelper {
task -> Task.forResult(new JSONArray(task.getResult()));
protected final Context context;
protected final RealmHelper realmHelper;
protected final DDPClientRef ddpClientRef;
/**
* initialize with Context and hostname.
......@@ -56,28 +55,25 @@ public class MethodCallHelper {
public MethodCallHelper(Context context, String hostname) {
this.context = context.getApplicationContext();
this.realmHelper = RealmStore.getOrCreate(hostname);
ddpClientRef = null;
}
/**
* initialize with RealmHelper and DDPClient.
*/
public MethodCallHelper(RealmHelper realmHelper, DDPClientRef ddpClientRef) {
public MethodCallHelper(RealmHelper realmHelper) {
this.context = null;
this.realmHelper = realmHelper;
this.ddpClientRef = ddpClientRef;
}
public MethodCallHelper(Context context, RealmHelper realmHelper, DDPClientRef ddpClientRef) {
public MethodCallHelper(Context context, RealmHelper realmHelper) {
this.context = context.getApplicationContext();
this.realmHelper = realmHelper;
this.ddpClientRef = ddpClientRef;
}
@DebugLog
private Task<String> executeMethodCall(String methodName, String param, long timeout) {
if (ddpClientRef != null) {
return ddpClientRef.get().rpc(UUID.randomUUID().toString(), methodName, param, timeout)
if (DDPClient.get() != null) {
return DDPClient.get().rpc(UUID.randomUUID().toString(), methodName, param, timeout)
.onSuccessTask(task -> Task.forResult(task.getResult().result));
} else {
return MethodCall.execute(realmHelper, methodName, param, timeout)
......
......@@ -3,21 +3,20 @@ package chat.rocket.android.api;
import android.content.Context;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import org.json.JSONArray;
import org.json.JSONObject;
import bolts.Task;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
public class RaixPushHelper extends MethodCallHelper {
public RaixPushHelper(Context context, String hostname) {
super(context, hostname);
}
public RaixPushHelper(RealmHelper realmHelper,
DDPClientRef ddpClientRef) {
super(realmHelper, ddpClientRef);
public RaixPushHelper(RealmHelper realmHelper) {
super(realmHelper);
}
public Task<Void> pushUpdate(@NonNull String pushId, @NonNull String gcmToken,
......
package chat.rocket.android.service;
import chat.rocket.android.api.DDPClientWrapper;
/**
* reference to get fresh DDPClient instance.
*/
public interface DDPClientRef {
DDPClientWrapper get();
}
......@@ -13,7 +13,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import bolts.Task;
import chat.rocket.android.api.DDPClientWrapper;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.helper.RxHelper;
......@@ -33,6 +32,7 @@ import chat.rocket.android.service.observer.MethodCallObserver;
import chat.rocket.android.service.observer.NewMessageObserver;
import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.RealmHelper;
......@@ -73,14 +73,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
private final ArrayList<Registrable> listeners = new ArrayList<>();
private final CompositeDisposable hearbeatDisposable = new CompositeDisposable();
private final CompositeSubscription reconnectSubscription = new CompositeSubscription();
private DDPClientWrapper ddpClient;
private boolean listenersRegistered;
private final DDPClientRef ddpClientRef = new DDPClientRef() {
@Override
public DDPClientWrapper get() {
return ddpClient;
}
};
private static class KeepAliveTimer {
private long lastTime;
......@@ -194,7 +187,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog
private Single<Boolean> checkIfConnectionAlive() {
if (ddpClient == null) {
if (DDPClient.get() == null) {
return Single.just(false);
}
......@@ -207,7 +200,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
new Thread() {
@Override
public void run() {
ddpClient.ping().continueWith(task -> {
DDPClient.get().ping().continueWith(task -> {
if (task.isFaulted()) {
Exception error = task.getError();
RCLog.e(error);
......@@ -229,7 +222,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
private Flowable<Boolean> heartbeat(long interval) {
return Flowable.interval(interval, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(tick -> ddpClient.doPing().toFlowable())
.flatMap(tick -> DDPClient.get().doPing().toFlowable())
.map(callback -> {
if (callback instanceof DDPClientCallback.Ping) {
return true;
......@@ -246,11 +239,10 @@ public class RocketChatWebSocketThread extends HandlerThread {
private Single<Boolean> prepareDDPClient() {
// TODO: temporarily replaced checkIfConnectionAlive() call for this single checking if ddpClient is
// null or not. In case it is, build a new client, otherwise just keep connecting with existing one.
return Single.just(ddpClient != null)
return Single.just(DDPClient.get() != null)
.doOnSuccess(alive -> {
if (!alive) {
RCLog.d("DDPClient#build");
ddpClient = DDPClientWrapper.create(hostname);
}
});
}
......@@ -264,7 +256,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
return;
}
RCLog.d("DDPClient#connect");
ddpClient.connect(info.getSession(), info.isSecure())
DDPClient.get().connect(hostname, info.getSession(), info.isSecure())
.onSuccessTask(task -> {
final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession);
......@@ -307,7 +299,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (reconnectSubscription.hasSubscriptions()) {
return;
}
ddpClient.close();
DDPClient.get().close();
forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
// Needed to use subscriptions because of legacy code.
......@@ -347,11 +339,11 @@ public class RocketChatWebSocketThread extends HandlerThread {
}
private Task<Void> fetchPublicSettings() {
return new MethodCallHelper(appContext, realmHelper, ddpClientRef).getPublicSettings(hostname);
return new MethodCallHelper(appContext, realmHelper).getPublicSettings(hostname);
}
private Task<Void> fetchPermissions() {
return new MethodCallHelper(realmHelper, ddpClientRef).getPermissions();
return new MethodCallHelper(realmHelper).getPermissions();
}
@DebugLog
......@@ -377,7 +369,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
// if we have a session try to resume it. At this point we're probably recovering from
// a disconnection state
final CompositeSubscription subscriptions = new CompositeSubscription();
MethodCallHelper methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
MethodCallHelper methodCall = new MethodCallHelper(realmHelper);
subscriptions.add(
Completable.defer(() -> {
Task<Void> result = methodCall.loginWithToken(sessions.get(0).getToken());
......@@ -405,9 +397,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
private void createObserversAndRegister() {
for (Class clazz : REGISTERABLE_CLASSES) {
try {
Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class,
DDPClientRef.class);
Object obj = ctor.newInstance(appContext, hostname, realmHelper, ddpClientRef);
Constructor ctor = clazz.getConstructor(Context.class, String.class, RealmHelper.class);
Object obj = ctor.newInstance(appContext, hostname, realmHelper);
if (obj instanceof Registrable) {
Registrable registrable = (Registrable) obj;
......@@ -448,9 +439,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
@DebugLog
private void unregisterListenersAndClose() {
unregisterListeners();
if (ddpClient != null) {
ddpClient.close();
ddpClient = null;
if (DDPClient.get() != null) {
DDPClient.get().close();
}
}
......
......@@ -3,38 +3,37 @@ package chat.rocket.android.service.ddp;
import android.content.Context;
import android.text.TextUtils;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import io.reactivex.disposables.Disposable;
import io.realm.Realm;
import io.realm.RealmObject;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.Iterator;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.log.RCLog;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.Registrable;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPSubscription;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import io.reactivex.disposables.Disposable;
import io.realm.Realm;
import io.realm.RealmObject;
import io.realm.RealmResults;
public abstract class AbstractDDPDocEventSubscriber implements Registrable {
protected final Context context;
protected final String hostname;
protected final RealmHelper realmHelper;
protected final DDPClientRef ddpClientRef;
private boolean isUnsubscribed;
private String subscriptionId;
private Disposable rxSubscription;
protected AbstractDDPDocEventSubscriber(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
RealmHelper realmHelper) {
this.context = context;
this.hostname = hostname;
this.realmHelper = realmHelper;
this.ddpClientRef = ddpClientRef;
}
protected abstract String getSubscriptionName();
......@@ -69,9 +68,9 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
// just ignore.
}
ddpClientRef.get().subscribe(getSubscriptionName(), params).onSuccess(task -> {
DDPClient.get().subscribe(getSubscriptionName(), params).onSuccess(task -> {
if (isUnsubscribed) {
ddpClientRef.get().unsubscribe(task.getResult().id).continueWith(new LogIfError());
DDPClient.get().unsubscribe(task.getResult().id).continueWith(new LogIfError());
} else {
subscriptionId = task.getResult().id;
}
......@@ -98,7 +97,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
}
protected Disposable subscribe() {
return ddpClientRef.get().getSubscriptionCallback()
return DDPClient.get().getSubscriptionCallback()
.filter(event -> event instanceof DDPSubscription.DocEvent)
.cast(DDPSubscription.DocEvent.class)
.filter(event -> isTarget(event.collection))
......@@ -197,8 +196,8 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
if (rxSubscription != null) {
rxSubscription.dispose();
}
if (!TextUtils.isEmpty(subscriptionId) && ddpClientRef.get() != null) {
ddpClientRef.get().unsubscribe(subscriptionId).continueWith(new LogIfError());
if (!TextUtils.isEmpty(subscriptionId) && DDPClient.get() != null) {
DDPClient.get().unsubscribe(subscriptionId).continueWith(new LogIfError());
}
}
}
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import org.json.JSONArray;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.ddp.AbstractDDPDocEventSubscriber;
import chat.rocket.persistence.realm.RealmHelper;
abstract class AbstractBaseSubscriber extends AbstractDDPDocEventSubscriber {
protected AbstractBaseSubscriber(Context context, String hostname, RealmHelper realmHelper,
DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
protected AbstractBaseSubscriber(Context context, String hostname, RealmHelper realmHelper) {
super(context, hostname, realmHelper);
}
@Override
......
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import io.realm.RealmObject;
import org.json.JSONException;
import org.json.JSONObject;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import io.realm.RealmObject;
/**
* "activeUsers" subscriber.
*/
public class ActiveUsersSubscriber extends AbstractBaseSubscriber {
public ActiveUsersSubscriber(Context context, String hostname, RealmHelper realmHelper,
DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
public ActiveUsersSubscriber(Context context, String hostname, RealmHelper realmHelper) {
super(context, hostname, realmHelper);
}
@Override
......
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import io.realm.RealmObject;
import chat.rocket.persistence.realm.models.ddp.RealmMeteorLoginServiceConfiguration;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.ddp.RealmMeteorLoginServiceConfiguration;
import io.realm.RealmObject;
/**
* meteor.loginServiceConfiguration subscriber
*/
public class LoginServiceConfigurationSubscriber extends AbstractBaseSubscriber {
public LoginServiceConfigurationSubscriber(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
}
@Override
......
package chat.rocket.android.service.ddp.base;
import android.content.Context;
import io.realm.RealmObject;
import org.json.JSONException;
import org.json.JSONObject;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import io.realm.RealmObject;
/**
* "userData" subscriber.
*/
public class UserDataSubscriber extends AbstractBaseSubscriber {
public UserDataSubscriber(Context context, String hostname, RealmHelper realmHelper,
DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
public UserDataSubscriber(Context context, String hostname, RealmHelper realmHelper) {
super(context, hostname, realmHelper);
}
@Override
......
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.log.RCLog;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.ddp.AbstractDDPDocEventSubscriber;
import chat.rocket.android_ddp.DDPSubscription;
import chat.rocket.persistence.realm.RealmHelper;
abstract class AbstractStreamNotifyEventSubscriber extends AbstractDDPDocEventSubscriber {
protected AbstractStreamNotifyEventSubscriber(Context context, String hostname,
RealmHelper realmHelper,
DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
}
@Override
......
......@@ -3,15 +3,13 @@ package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
abstract class AbstractStreamNotifyUserEventSubscriber extends AbstractStreamNotifyEventSubscriber {
protected final String userId;
protected AbstractStreamNotifyUserEventSubscriber(Context context, String hostname,
RealmHelper realmHelper,
DDPClientRef ddpClientRef, String userId) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper, String userId) {
super(context, hostname, realmHelper);
this.userId = userId;
}
......
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import io.realm.RealmObject;
import org.json.JSONException;
import org.json.JSONObject;
import chat.rocket.persistence.realm.models.ddp.RealmRoom;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.ddp.RealmRoom;
import io.realm.RealmObject;
public class StreamNotifyUserSubscriptionsChanged extends AbstractStreamNotifyUserEventSubscriber {
public StreamNotifyUserSubscriptionsChanged(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef,
RealmHelper realmHelper,
String userId) {
super(context, hostname, realmHelper, ddpClientRef, userId);
super(context, hostname, realmHelper, userId);
}
@Override
......
package chat.rocket.android.service.ddp.stream;
import android.content.Context;
import io.realm.RealmObject;
import org.json.JSONException;
import org.json.JSONObject;
import chat.rocket.persistence.realm.models.ddp.RealmMessage;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.ddp.RealmMessage;
import io.realm.RealmObject;
/**
* stream-room-message subscriber.
......@@ -16,8 +16,8 @@ public class StreamRoomMessage extends AbstractStreamNotifyEventSubscriber {
private String roomId;
public StreamRoomMessage(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef, String roomId) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper, String roomId) {
super(context, hostname, realmHelper);
this.roomId = roomId;
}
......
......@@ -5,10 +5,9 @@ import android.os.Handler;
import android.os.Looper;
import chat.rocket.android.RocketChatCache;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.Registrable;
import chat.rocket.android.service.ddp.stream.StreamRoomMessage;
import chat.rocket.persistence.realm.RealmHelper;
/**
* wrapper for managing stream-notify-message depending on RocketChatCache.
......@@ -17,18 +16,16 @@ public class StreamRoomMessageManager implements Registrable {
private final Context context;
private final String hostname;
private final RealmHelper realmHelper;
private final DDPClientRef ddpClientRef;
private final AbstractRocketChatCacheObserver cacheObserver;
private final Handler handler;
private final RocketChatCache rocketChatCache;
private StreamRoomMessage streamRoomMessage;
public StreamRoomMessageManager(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
RealmHelper realmHelper) {
this.context = context;
this.hostname = hostname;
this.realmHelper = realmHelper;
this.ddpClientRef = ddpClientRef;
this.rocketChatCache = new RocketChatCache(context);
cacheObserver = new AbstractRocketChatCacheObserver(context, realmHelper) {
......@@ -43,7 +40,7 @@ public class StreamRoomMessageManager implements Registrable {
private void registerStreamNotifyMessage(String roomId) {
handler.post(() -> {
streamRoomMessage = new StreamRoomMessage(context, hostname, realmHelper, ddpClientRef, roomId);
streamRoomMessage = new StreamRoomMessage(context, hostname, realmHelper, roomId);
streamRoomMessage.register();
});
}
......
package chat.rocket.android.service.observer;
import android.content.Context;
import io.realm.RealmObject;
import chat.rocket.android.service.Registrable;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.RealmListObserver;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.Registrable;
import io.realm.RealmObject;
abstract class AbstractModelObserver<T extends RealmObject>
implements Registrable, RealmListObserver.Query<T>, RealmListObserver.OnUpdateListener<T> {
......@@ -14,15 +13,13 @@ abstract class AbstractModelObserver<T extends RealmObject>
protected final Context context;
protected final String hostname;
protected final RealmHelper realmHelper;
protected final DDPClientRef ddpClientRef;
private final RealmListObserver observer;
protected AbstractModelObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
RealmHelper realmHelper) {
this.context = context;
this.hostname = hostname;
this.realmHelper = realmHelper;
this.ddpClientRef = ddpClientRef;
observer = realmHelper.createListObserver(this).setOnUpdateListener(this);
}
......
package chat.rocket.android.service.observer;
import android.content.Context;
import io.realm.Realm;
import io.realm.RealmResults;
import java.util.ArrayList;
import java.util.List;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.Registrable;
import chat.rocket.android.service.ddp.stream.StreamNotifyUserSubscriptionsChanged;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import hugo.weaving.DebugLog;
import io.realm.Realm;
import io.realm.RealmResults;
/**
* observe the user with emails.
......@@ -24,9 +24,9 @@ public class CurrentUserObserver extends AbstractModelObserver<RealmUser> {
private ArrayList<Registrable> listeners;
public CurrentUserObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new MethodCallHelper(realmHelper);
currentUserExists = false;
}
......@@ -62,7 +62,7 @@ public class CurrentUserObserver extends AbstractModelObserver<RealmUser> {
methodCall.getRoomSubscriptions().onSuccess(task -> {
if (listeners != null) {
Registrable listener = new StreamNotifyUserSubscriptionsChanged(
context, hostname, realmHelper, ddpClientRef, userId);
context, hostname, realmHelper, userId);
listener.register();
listeners.add(listener);
}
......
......@@ -9,7 +9,6 @@ import java.util.List;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.ddp.RealmMessage;
......@@ -23,9 +22,9 @@ public class DeletedMessageObserver extends AbstractModelObserver<RealmMessage>
private final MethodCallHelper methodCall;
public DeletedMessageObserver(Context context, String hostname, RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
public DeletedMessageObserver(Context context, String hostname, RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new MethodCallHelper(realmHelper);
realmHelper.executeTransaction(realm -> {
// resume pending operations.
......
......@@ -2,23 +2,24 @@ package chat.rocket.android.service.observer;
import android.content.Context;
import android.net.Uri;
import chat.rocket.android.helper.OkHttpHelper;
import io.realm.Realm;
import io.realm.RealmResults;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import bolts.Task;
import chat.rocket.android.api.FileUploadingHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.helper.OkHttpHelper;
import chat.rocket.android.log.RCLog;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.models.internal.FileUploading;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.internal.FileUploading;
import io.realm.Realm;
import io.realm.RealmResults;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.Request;
......@@ -35,9 +36,9 @@ public class FileUploadingToUrlObserver extends AbstractModelObserver<FileUpload
private FileUploadingHelper methodCall;
public FileUploadingToUrlObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new FileUploadingHelper(realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new FileUploadingHelper(realmHelper);
realmHelper.executeTransaction(realm -> {
// resume pending operations.
......
......@@ -2,23 +2,24 @@ package chat.rocket.android.service.observer;
import android.content.Context;
import android.net.Uri;
import chat.rocket.android.helper.OkHttpHelper;
import io.realm.Realm;
import io.realm.RealmResults;
import org.json.JSONObject;
import java.io.InputStream;
import java.util.List;
import bolts.Task;
import chat.rocket.android.api.FileUploadingHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.helper.OkHttpHelper;
import chat.rocket.android.log.RCLog;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
import chat.rocket.persistence.realm.models.internal.FileUploading;
import chat.rocket.persistence.realm.models.internal.RealmSession;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import io.realm.Realm;
import io.realm.RealmResults;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
......@@ -31,9 +32,9 @@ public class FileUploadingWithUfsObserver extends AbstractModelObserver<FileUplo
private FileUploadingHelper methodCall;
public FileUploadingWithUfsObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new FileUploadingHelper(realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new FileUploadingHelper(realmHelper);
realmHelper.executeTransaction(realm -> {
// resume pending operations.
......
......@@ -13,7 +13,6 @@ import chat.rocket.android.R;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.api.RaixPushHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.ddp.RealmUser;
......@@ -26,9 +25,8 @@ import io.realm.RealmResults;
*/
public class GcmPushRegistrationObserver extends AbstractModelObserver<GcmPushRegistration> {
public GcmPushRegistrationObserver(Context context, String hostname,
RealmHelper realmHelper,
DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
}
@Override
......@@ -72,7 +70,7 @@ public class GcmPushRegistrationObserver extends AbstractModelObserver<GcmPushRe
final String userId = currentUser != null ? currentUser.getId() : null;
final String pushId = new RocketChatCache(context).getOrCreatePushId();
return new RaixPushHelper(realmHelper, ddpClientRef)
return new RaixPushHelper(realmHelper)
.pushUpdate(pushId, gcmToken, userId);
}
......
package chat.rocket.android.service.observer;
import android.content.Context;
import io.realm.Realm;
import io.realm.RealmResults;
import org.json.JSONObject;
import java.util.List;
import bolts.Task;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.log.RCLog;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.models.internal.GetUsersOfRoomsProcedure;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.internal.GetUsersOfRoomsProcedure;
import io.realm.Realm;
import io.realm.RealmResults;
/**
* Model observer for executing getUsersOfRooms.
......@@ -23,9 +24,9 @@ public class GetUsersOfRoomsProcedureObserver
private final MethodCallHelper methodCall;
public GetUsersOfRoomsProcedureObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new MethodCallHelper(realmHelper);
}
@Override
......
package chat.rocket.android.service.observer;
import android.content.Context;
import org.json.JSONObject;
import java.util.List;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.ddp.RealmMessage;
......@@ -11,8 +15,6 @@ import chat.rocket.persistence.realm.models.internal.LoadMessageProcedure;
import io.realm.Realm;
import io.realm.RealmResults;
import io.realm.Sort;
import java.util.List;
import org.json.JSONObject;
/**
* Background process for loading messages.
......@@ -22,9 +24,9 @@ public class LoadMessageProcedureObserver extends AbstractModelObserver<LoadMess
private final MethodCallHelper methodCall;
public LoadMessageProcedureObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new MethodCallHelper(realmHelper);
}
@Override
......
......@@ -8,7 +8,7 @@ import java.util.List;
import chat.rocket.android.helper.CheckSum;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.RealmHelper;
......@@ -27,8 +27,8 @@ public class MethodCallObserver extends AbstractModelObserver<MethodCall> {
* constructor.
*/
public MethodCallObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
realmHelper.executeTransaction(realm -> {
// resume pending operations.
RealmResults<MethodCall> pendingMethodCalls = realm.where(MethodCall.class)
......@@ -99,7 +99,7 @@ public class MethodCallObserver extends AbstractModelObserver<MethodCall> {
.put(MethodCall.ID, methodCallId)
.put(MethodCall.SYNC_STATE, SyncState.SYNCING))
).onSuccessTask(task ->
ddpClientRef.get().rpc(methodCallId, methodName, params, timeout)
DDPClient.get().rpc(methodCallId, methodName, params, timeout)
.onSuccessTask(_task -> realmHelper.executeTransaction(realm -> {
String json = _task.getResult().result;
return realm.createOrUpdateObjectFromJson(MethodCall.class, new JSONObject()
......
package chat.rocket.android.service.observer;
import android.content.Context;
import io.realm.Realm;
import io.realm.RealmResults;
import org.json.JSONObject;
import java.util.List;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.log.RCLog;
import chat.rocket.core.SyncState;
import chat.rocket.persistence.realm.models.ddp.RealmMessage;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.ddp.RealmMessage;
import io.realm.Realm;
import io.realm.RealmResults;
/**
* Observe messages for sending.
......@@ -21,9 +22,9 @@ public class NewMessageObserver extends AbstractModelObserver<RealmMessage> {
private final MethodCallHelper methodCall;
public NewMessageObserver(Context context, String hostname, RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
public NewMessageObserver(Context context, String hostname, RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new MethodCallHelper(realmHelper);
realmHelper.executeTransaction(realm -> {
// resume pending operations.
......
package chat.rocket.android.service.observer;
import android.content.Context;
import io.realm.Realm;
import io.realm.RealmResults;
import java.util.List;
import chat.rocket.android.helper.GcmPushSettingHelper;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.ddp.RealmPublicSetting;
import chat.rocket.persistence.realm.models.internal.GcmPushRegistration;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import io.realm.Realm;
import io.realm.RealmResults;
public class PushSettingsObserver extends AbstractModelObserver<RealmPublicSetting> {
public PushSettingsObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
}
@Override
......
package chat.rocket.android.service.observer;
import android.content.Context;
import io.realm.Realm;
import io.realm.RealmResults;
import java.util.List;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.api.RaixPushHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.service.internal.StreamRoomMessageManager;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.persistence.realm.models.internal.GetUsersOfRoomsProcedure;
import chat.rocket.persistence.realm.models.internal.LoadMessageProcedure;
import chat.rocket.persistence.realm.models.internal.MethodCall;
import chat.rocket.persistence.realm.models.internal.RealmSession;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.internal.StreamRoomMessageManager;
import hugo.weaving.DebugLog;
import io.realm.Realm;
import io.realm.RealmResults;
/**
* Observes user is logged into server.
......@@ -29,13 +29,13 @@ public class SessionObserver extends AbstractModelObserver<RealmSession> {
* constructor.
*/
public SessionObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
count = 0;
streamNotifyMessage =
new StreamRoomMessageManager(context, hostname, realmHelper, ddpClientRef);
pushHelper = new RaixPushHelper(realmHelper, ddpClientRef);
new StreamRoomMessageManager(context, hostname, realmHelper);
pushHelper = new RaixPushHelper(realmHelper);
}
@Override
......
package chat.rocket.android.service.observer;
import android.content.Context;
import io.realm.Realm;
import io.realm.RealmResults;
import java.util.List;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.persistence.realm.models.internal.RealmSession;
import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef;
import chat.rocket.persistence.realm.models.internal.RealmSession;
import io.realm.Realm;
import io.realm.RealmResults;
public class TokenLoginObserver extends AbstractModelObserver<RealmSession> {
private final MethodCallHelper methodCall;
public TokenLoginObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) {
super(context, hostname, realmHelper, ddpClientRef);
methodCall = new MethodCallHelper(realmHelper, ddpClientRef);
RealmHelper realmHelper) {
super(context, hostname, realmHelper);
methodCall = new MethodCallHelper(realmHelper);
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment