Commit 9ac83d10 authored by Tiago Cunha's avatar Tiago Cunha

Untagles the modules and updates dependencies

Using RxJava2
parent 1c0e06e7
...@@ -6,23 +6,23 @@ buildscript { ...@@ -6,23 +6,23 @@ buildscript {
jcenter() jcenter()
} }
dependencies { dependencies {
classpath rootProject.ext.androidPlugin classpath 'com.android.tools.build:gradle:2.2.3'
classpath rootProject.ext.retroLambdaPlugin classpath 'me.tatarka:gradle-retrolambda:3.5.0'
classpath rootProject.ext.retroLambdaPatch classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
} }
} }
android { android {
compileSdkVersion rootProject.ext.compileSdkVersion compileSdkVersion 25
buildToolsVersion rootProject.ext.buildToolsVersion buildToolsVersion '25.0.2'
compileOptions { compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8 sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8 targetCompatibility JavaVersion.VERSION_1_8
} }
defaultConfig { defaultConfig {
minSdkVersion rootProject.ext.minSdkVersion minSdkVersion 16
targetSdkVersion rootProject.ext.compileSdkVersion targetSdkVersion 25
versionCode 1 versionCode 1
versionName "0.0.8" versionName "0.0.8"
} }
...@@ -34,10 +34,18 @@ android { ...@@ -34,10 +34,18 @@ android {
} }
} }
ext {
supportVersion = '25.1.1'
}
dependencies { dependencies {
compile project(':log-wrapper') compile project(':log-wrapper')
compile rootProject.ext.supportAnnotations
compile rootProject.ext.okhttp3 compile "com.android.support:support-annotations:$supportVersion"
compile rootProject.ext.rxJava
compile rootProject.ext.boltsTask compile 'com.squareup.okhttp3:okhttp:3.6.0'
compile 'io.reactivex.rxjava2:rxjava:2.0.6'
compile 'com.parse.bolts:bolts-tasks:1.4.0'
} }
package chat.rocket.android_ddp; package chat.rocket.android_ddp;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import io.reactivex.Flowable;
import org.json.JSONArray; import org.json.JSONArray;
import bolts.Task; import bolts.Task;
import bolts.TaskCompletionSource; import bolts.TaskCompletionSource;
import chat.rocket.android_ddp.rx.RxWebSocketCallback; import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import rx.Observable;
public class DDPClient { public class DDPClient {
// reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js // reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js
...@@ -53,7 +53,7 @@ public class DDPClient { ...@@ -53,7 +53,7 @@ public class DDPClient {
return task.getTask(); return task.getTask();
} }
public Observable<DDPSubscription.Event> getSubscriptionCallback() { public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return impl.getDDPSubscription(); return impl.getDDPSubscription();
} }
......
...@@ -3,6 +3,8 @@ package chat.rocket.android_ddp; ...@@ -3,6 +3,8 @@ package chat.rocket.android_ddp;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.text.TextUtils; import android.text.TextUtils;
import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
...@@ -15,15 +17,12 @@ import chat.rocket.android.log.RCLog; ...@@ -15,15 +17,12 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.rx.RxWebSocket; import chat.rocket.android_ddp.rx.RxWebSocket;
import chat.rocket.android_ddp.rx.RxWebSocketCallback; import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import rx.Observable;
import rx.functions.Func1;
import rx.subscriptions.CompositeSubscription;
public class DDPClientImpl { public class DDPClientImpl {
private final DDPClient client; private final DDPClient client;
private final RxWebSocket websocket; private final RxWebSocket websocket;
private Observable<RxWebSocketCallback.Base> observable; private Flowable<RxWebSocketCallback.Base> flowable;
private CompositeSubscription subscriptions; private CompositeDisposable subscriptions;
public DDPClientImpl(DDPClient self, OkHttpClient client) { public DDPClientImpl(DDPClient self, OkHttpClient client) {
websocket = new RxWebSocket(client); websocket = new RxWebSocket(client);
...@@ -52,10 +51,10 @@ public class DDPClientImpl { ...@@ -52,10 +51,10 @@ public class DDPClientImpl {
public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url, public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
String session) { String session) {
try { try {
observable = websocket.connect(url).autoConnect(); flowable = websocket.connect(url).autoConnect();
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeDisposable subscriptions = new CompositeDisposable();
subscriptions.add(observable.filter(callback -> callback instanceof RxWebSocketCallback.Open) subscriptions.add(flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open)
.subscribe(callback -> { .subscribe(callback -> {
sendMessage("connect", sendMessage("connect",
json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)).put( json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)).put(
...@@ -65,7 +64,7 @@ public class DDPClientImpl { ...@@ -65,7 +64,7 @@ public class DDPClientImpl {
})); }));
subscriptions.add( subscriptions.add(
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.timeout(7, TimeUnit.SECONDS) .timeout(7, TimeUnit.SECONDS)
...@@ -74,15 +73,15 @@ public class DDPClientImpl { ...@@ -74,15 +73,15 @@ public class DDPClientImpl {
if ("connected".equals(msg) && !response.isNull("session")) { if ("connected".equals(msg) && !response.isNull("session")) {
task.trySetResult( task.trySetResult(
new DDPClientCallback.Connect(client, response.optString("session"))); new DDPClientCallback.Connect(client, response.optString("session")));
subscriptions.unsubscribe(); subscriptions.dispose();
} else if ("error".equals(msg) && "Already connected".equals( } else if ("error".equals(msg) && "Already connected".equals(
response.optString("reason"))) { response.optString("reason"))) {
task.trySetResult(new DDPClientCallback.Connect(client, null)); task.trySetResult(new DDPClientCallback.Connect(client, null));
subscriptions.unsubscribe(); subscriptions.dispose();
} else if ("failed".equals(msg)) { } else if ("failed".equals(msg)) {
task.trySetError( task.trySetError(
new DDPClientCallback.Connect.Failed(client, response.optString("version"))); new DDPClientCallback.Connect.Failed(client, response.optString("version")));
subscriptions.unsubscribe(); subscriptions.dispose();
} }
}, err -> { }, err -> {
task.trySetError(new DDPClientCallback.Connect.Timeout(client)); task.trySetError(new DDPClientCallback.Connect.Timeout(client));
...@@ -104,10 +103,10 @@ public class DDPClientImpl { ...@@ -104,10 +103,10 @@ public class DDPClientImpl {
sendMessage("ping", json -> json.put("id", id)); sendMessage("ping", json -> json.put("id", id));
if (requested) { if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeDisposable subscriptions = new CompositeDisposable();
subscriptions.add( subscriptions.add(
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.timeout(4, TimeUnit.SECONDS) .timeout(4, TimeUnit.SECONDS)
...@@ -116,12 +115,12 @@ public class DDPClientImpl { ...@@ -116,12 +115,12 @@ public class DDPClientImpl {
if ("pong".equals(msg)) { if ("pong".equals(msg)) {
if (response.isNull("id")) { if (response.isNull("id")) {
task.setResult(new DDPClientCallback.Ping(client, null)); task.setResult(new DDPClientCallback.Ping(client, null));
subscriptions.unsubscribe(); subscriptions.dispose();
} else { } else {
String _id = response.optString("id"); String _id = response.optString("id");
if (id.equals(_id)) { if (id.equals(_id)) {
task.setResult(new DDPClientCallback.Ping(client, id)); task.setResult(new DDPClientCallback.Ping(client, id));
subscriptions.unsubscribe(); subscriptions.dispose();
} }
} }
} }
...@@ -141,10 +140,10 @@ public class DDPClientImpl { ...@@ -141,10 +140,10 @@ public class DDPClientImpl {
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params)); sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));
if (requested) { if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeDisposable subscriptions = new CompositeDisposable();
subscriptions.add( subscriptions.add(
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe(response -> { .subscribe(response -> {
...@@ -155,7 +154,7 @@ public class DDPClientImpl { ...@@ -155,7 +154,7 @@ public class DDPClientImpl {
String _id = ids.optString(i); String _id = ids.optString(i);
if (id.equals(_id)) { if (id.equals(_id)) {
task.setResult(new DDPSubscription.Ready(client, id)); task.setResult(new DDPSubscription.Ready(client, id));
subscriptions.unsubscribe(); subscriptions.dispose();
break; break;
} }
} }
...@@ -165,7 +164,7 @@ public class DDPClientImpl { ...@@ -165,7 +164,7 @@ public class DDPClientImpl {
if (id.equals(_id)) { if (id.equals(_id)) {
task.setError(new DDPSubscription.NoSub.Error(client, id, task.setError(new DDPSubscription.NoSub.Error(client, id,
response.optJSONObject("error"))); response.optJSONObject("error")));
subscriptions.unsubscribe(); subscriptions.dispose();
} }
} }
}, err -> { }, err -> {
...@@ -183,10 +182,10 @@ public class DDPClientImpl { ...@@ -183,10 +182,10 @@ public class DDPClientImpl {
final boolean requested = sendMessage("unsub", json -> json.put("id", id)); final boolean requested = sendMessage("unsub", json -> json.put("id", id));
if (requested) { if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeDisposable subscriptions = new CompositeDisposable();
subscriptions.add( subscriptions.add(
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe(response -> { .subscribe(response -> {
...@@ -195,7 +194,7 @@ public class DDPClientImpl { ...@@ -195,7 +194,7 @@ public class DDPClientImpl {
String _id = response.optString("id"); String _id = response.optString("id");
if (id.equals(_id)) { if (id.equals(_id)) {
task.setResult(new DDPSubscription.NoSub(client, id)); task.setResult(new DDPSubscription.NoSub(client, id));
subscriptions.unsubscribe(); subscriptions.dispose();
} }
} }
}, err -> { }, err -> {
...@@ -214,10 +213,10 @@ public class DDPClientImpl { ...@@ -214,10 +213,10 @@ public class DDPClientImpl {
json -> json.put("method", method).put("params", params).put("id", id)); json -> json.put("method", method).put("params", params).put("id", id));
if (requested) { if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription(); CompositeDisposable subscriptions = new CompositeDisposable();
subscriptions.add( subscriptions.add(
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.timeout(timeoutMs, TimeUnit.MILLISECONDS) .timeout(timeoutMs, TimeUnit.MILLISECONDS)
...@@ -233,7 +232,7 @@ public class DDPClientImpl { ...@@ -233,7 +232,7 @@ public class DDPClientImpl {
String result = response.optString("result"); String result = response.optString("result");
task.setResult(new DDPClientCallback.RPC(client, id, result)); task.setResult(new DDPClientCallback.RPC(client, id, result));
} }
subscriptions.unsubscribe(); subscriptions.dispose();
} }
} }
}, err -> { }, err -> {
...@@ -250,13 +249,13 @@ public class DDPClientImpl { ...@@ -250,13 +249,13 @@ public class DDPClientImpl {
private void subscribeBaseListeners() { private void subscribeBaseListeners() {
if (subscriptions != null && if (subscriptions != null &&
subscriptions.hasSubscriptions() && !subscriptions.isUnsubscribed()) { subscriptions.size() > 0 && !subscriptions.isDisposed()) {
return; return;
} }
subscriptions = new CompositeSubscription(); subscriptions = new CompositeDisposable();
subscriptions.add( subscriptions.add(
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.subscribe(response -> { .subscribe(response -> {
...@@ -272,9 +271,9 @@ public class DDPClientImpl { ...@@ -272,9 +271,9 @@ public class DDPClientImpl {
})); }));
} }
public Observable<DDPSubscription.Event> getDDPSubscription() { public Flowable<DDPSubscription.Event> getDDPSubscription() {
String[] targetMsgs = {"added", "changed", "removed", "addedBefore", "movedBefore"}; String[] targetMsgs = {"added", "changed", "removed", "addedBefore", "movedBefore"};
return observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) .map(DDPClientImpl::toJson)
.filter(response -> { .filter(response -> {
...@@ -286,7 +285,7 @@ public class DDPClientImpl { ...@@ -286,7 +285,7 @@ public class DDPClientImpl {
} }
return false; return false;
}) })
.map((Func1<JSONObject, DDPSubscription.Event>) response -> { .map(response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("added".equals(msg)) { if ("added".equals(msg)) {
return new DDPSubscription.Added(client, response.optString("collection"), return new DDPSubscription.Added(client, response.optString("collection"),
...@@ -312,20 +311,19 @@ public class DDPClientImpl { ...@@ -312,20 +311,19 @@ public class DDPClientImpl {
} }
return null; return null;
}) });
.asObservable();
} }
public void unsubscribeBaseListeners() { public void unsubscribeBaseListeners() {
if (subscriptions.hasSubscriptions() && !subscriptions.isUnsubscribed()) { if (subscriptions.size() > 0 || !subscriptions.isDisposed()) {
subscriptions.unsubscribe(); subscriptions.dispose();
} }
} }
public Task<RxWebSocketCallback.Close> getOnCloseCallback() { public Task<RxWebSocketCallback.Close> getOnCloseCallback() {
TaskCompletionSource<RxWebSocketCallback.Close> task = new TaskCompletionSource<>(); TaskCompletionSource<RxWebSocketCallback.Close> task = new TaskCompletionSource<>();
observable.filter(callback -> callback instanceof RxWebSocketCallback.Close) flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close)
.cast(RxWebSocketCallback.Close.class) .cast(RxWebSocketCallback.Close.class)
.subscribe(task::setResult, err -> { .subscribe(task::setResult, err -> {
if (err instanceof Exception) { if (err instanceof Exception) {
...@@ -359,11 +357,11 @@ public class DDPClientImpl { ...@@ -359,11 +357,11 @@ public class DDPClientImpl {
} }
} }
private void addErrorCallback(CompositeSubscription subscriptions, TaskCompletionSource<?> task) { private void addErrorCallback(CompositeDisposable subscriptions, TaskCompletionSource<?> task) {
subscriptions.add(observable.subscribe(base -> { subscriptions.add(flowable.subscribe(base -> {
}, err -> { }, err -> {
task.trySetError(new Exception(err)); task.trySetError(new Exception(err));
subscriptions.unsubscribe(); subscriptions.dispose();
})); }));
} }
......
package chat.rocket.android_ddp.rx; package chat.rocket.android_ddp.rx;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.flowables.ConnectableFlowable;
import java.io.IOException; import java.io.IOException;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
...@@ -7,11 +14,6 @@ import okhttp3.Request; ...@@ -7,11 +14,6 @@ import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
import okhttp3.WebSocket; import okhttp3.WebSocket;
import okhttp3.WebSocketListener; import okhttp3.WebSocketListener;
import rx.Emitter;
import rx.Observable;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;
public class RxWebSocket { public class RxWebSocket {
private OkHttpClient httpClient; private OkHttpClient httpClient;
...@@ -21,13 +23,14 @@ public class RxWebSocket { ...@@ -21,13 +23,14 @@ public class RxWebSocket {
httpClient = client; httpClient = client;
} }
public ConnectableObservable<RxWebSocketCallback.Base> connect(String url) { public ConnectableFlowable<RxWebSocketCallback.Base> connect(String url) {
final Request request = new Request.Builder().url(url).build(); final Request request = new Request.Builder().url(url).build();
return Observable.fromEmitter( return Flowable.create(
new Action1<Emitter<RxWebSocketCallback.Base>>() { new FlowableOnSubscribe<RxWebSocketCallback.Base>() {
@Override @Override
public void call(Emitter<RxWebSocketCallback.Base> emitter) { public void subscribe(FlowableEmitter<RxWebSocketCallback.Base> emitter)
throws Exception {
httpClient.newWebSocket(request, new WebSocketListener() { httpClient.newWebSocket(request, new WebSocketListener() {
@Override @Override
public void onOpen(WebSocket webSocket, Response response) { public void onOpen(WebSocket webSocket, Response response) {
...@@ -52,12 +55,11 @@ public class RxWebSocket { ...@@ -52,12 +55,11 @@ public class RxWebSocket {
@Override @Override
public void onClosed(WebSocket webSocket, int code, String reason) { public void onClosed(WebSocket webSocket, int code, String reason) {
emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason)); emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason));
emitter.onCompleted(); emitter.onComplete();
} }
}); });
} }
}, }, BackpressureStrategy.BUFFER
Emitter.BackpressureMode.BUFFER
).publish(); ).publish();
} }
......
...@@ -10,13 +10,13 @@ buildscript { ...@@ -10,13 +10,13 @@ buildscript {
mavenCentral() mavenCentral()
} }
dependencies { dependencies {
classpath rootProject.ext.androidPlugin classpath 'com.android.tools.build:gradle:2.2.3'
// NOTE: Do not place your application dependencies here; they belong // NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files // in the individual module build.gradle files
classpath rootProject.ext.retroLambdaPlugin classpath 'me.tatarka:gradle-retrolambda:3.5.0'
classpath rootProject.ext.retroLambdaPatch classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
classpath rootProject.ext.realmPlugin classpath 'io.realm:realm-gradle-plugin:2.3.1'
classpath 'com.jakewharton.hugo:hugo-plugin:1.2.1' classpath 'com.jakewharton.hugo:hugo-plugin:1.2.1'
classpath 'com.google.gms:google-services:3.0.0' classpath 'com.google.gms:google-services:3.0.0'
classpath 'com.github.triplet.gradle:play-publisher:1.1.5' classpath 'com.github.triplet.gradle:play-publisher:1.1.5'
...@@ -27,12 +27,12 @@ buildscript { ...@@ -27,12 +27,12 @@ buildscript {
} }
android { android {
compileSdkVersion rootProject.ext.compileSdkVersion compileSdkVersion 25
buildToolsVersion rootProject.ext.buildToolsVersion buildToolsVersion '25.0.2'
defaultConfig { defaultConfig {
applicationId "chat.rocket.android" applicationId "chat.rocket.android"
minSdkVersion rootProject.ext.minSdkVersion minSdkVersion 16
targetSdkVersion rootProject.ext.compileSdkVersion targetSdkVersion 25
versionCode 9 versionCode 9
versionName "1.0.0" versionName "1.0.0"
testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner" testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
...@@ -66,6 +66,7 @@ android { ...@@ -66,6 +66,7 @@ android {
packagingOptions { packagingOptions {
exclude 'META-INF/LICENSE.txt' exclude 'META-INF/LICENSE.txt'
exclude 'META-INF/NOTICE.txt' exclude 'META-INF/NOTICE.txt'
exclude 'META-INF/rxjava.properties'
} }
compileOptions { compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8 sourceCompatibility JavaVersion.VERSION_1_8
...@@ -81,45 +82,56 @@ play { ...@@ -81,45 +82,56 @@ play {
track = "${track}" track = "${track}"
} }
ext {
supportVersion = '25.1.1'
playLibVersion = '10.2.0'
stethoVersion = '1.4.2'
rxbindingVersion = '1.0.0'
rxlifecycleVersion = '2.0.1'
icepickVersion = '3.2.0'
permissionsdispatcherVersion = '2.3.1'
}
dependencies { dependencies {
compile project(':log-wrapper') compile project(':log-wrapper')
compile project(':android-ddp') compile project(':android-ddp')
compile project(':rocket-chat-core') compile project(':rocket-chat-core')
compile project(':rocket-chat-android-widgets') compile project(':rocket-chat-android-widgets')
compile project(':persistence-realm') compile project(':persistence-realm')
compile rootProject.ext.supportAppCompat
compile rootProject.ext.supportDesign compile "com.android.support:appcompat-v7:$supportVersion"
compile "com.android.support:design:$supportVersion"
qaCompile('com.instabug.library:instabug:3.1.0') { qaCompile('com.instabug.library:instabug:3.1.0') {
exclude group: 'io.reactivex' exclude group: 'io.reactivex'
} }
compile 'com.android.support:multidex:1.0.1' compile 'com.android.support:multidex:1.0.1'
compile 'com.google.firebase:firebase-core:10.2.0' compile "com.google.firebase:firebase-core:$playLibVersion"
compile 'com.google.firebase:firebase-crash:10.2.0' compile "com.google.firebase:firebase-crash:$playLibVersion"
compile 'com.google.android.gms:play-services-gcm:10.2.0' compile "com.google.android.gms:play-services-gcm:$playLibVersion"
compile rootProject.ext.okhttp3 compile 'com.squareup.okhttp3:okhttp:3.6.0'
compile 'com.facebook.stetho:stetho:1.4.1' compile "com.facebook.stetho:stetho:$stethoVersion"
compile 'com.facebook.stetho:stetho-okhttp3:1.4.1' compile "com.facebook.stetho:stetho-okhttp3:$stethoVersion"
compile 'com.uphyca:stetho_realm:2.0.1' compile 'com.uphyca:stetho_realm:2.0.1'
compile 'com.jakewharton.rxbinding:rxbinding:1.0.0' compile "com.jakewharton.rxbinding:rxbinding:$rxbindingVersion"
compile 'com.jakewharton.rxbinding:rxbinding-support-v4:1.0.0' compile "com.jakewharton.rxbinding:rxbinding-support-v4:$rxbindingVersion"
compile 'com.trello:rxlifecycle:1.0' compile "com.trello.rxlifecycle2:rxlifecycle:$rxlifecycleVersion"
compile 'com.trello:rxlifecycle-android:1.0' compile "com.trello.rxlifecycle2:rxlifecycle-android:$rxlifecycleVersion"
compile 'com.trello:rxlifecycle-components:1.0' compile "com.trello.rxlifecycle2:rxlifecycle-components:$rxlifecycleVersion"
compile rootProject.ext.textDrawable compile 'com.amulyakhare:com.amulyakhare.textdrawable:1.0.1'
compile 'frankiesardo:icepick:3.2.0' compile "frankiesardo:icepick:$icepickVersion"
provided 'frankiesardo:icepick-processor:3.2.0' provided "frankiesardo:icepick-processor:$icepickVersion"
compile 'com.github.hotchemi:permissionsdispatcher:2.3.0' compile "com.github.hotchemi:permissionsdispatcher:$permissionsdispatcherVersion"
annotationProcessor 'com.github.hotchemi:permissionsdispatcher-processor:2.3.0' annotationProcessor "com.github.hotchemi:permissionsdispatcher-processor:$permissionsdispatcherVersion"
} }
apply plugin: 'com.google.gms.google-services' apply plugin: 'com.google.gms.google-services'
...@@ -6,7 +6,8 @@ import android.support.annotation.Nullable; ...@@ -6,7 +6,8 @@ import android.support.annotation.Nullable;
import android.support.v4.app.Fragment; import android.support.v4.app.Fragment;
import android.support.v4.app.FragmentManager; import android.support.v4.app.FragmentManager;
import android.view.MotionEvent; import android.view.MotionEvent;
import com.trello.rxlifecycle.components.support.RxAppCompatActivity;
import com.trello.rxlifecycle2.components.support.RxAppCompatActivity;
import chat.rocket.android.helper.OnBackPressListener; import chat.rocket.android.helper.OnBackPressListener;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
......
...@@ -2,12 +2,13 @@ package chat.rocket.android.activity; ...@@ -2,12 +2,13 @@ package chat.rocket.android.activity;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import chat.rocket.android.BackgroundLooper; import chat.rocket.android.BackgroundLooper;
import chat.rocket.android.service.ConnectivityManagerApi; import chat.rocket.android.service.ConnectivityManagerApi;
import chat.rocket.android.shared.BasePresenter; import chat.rocket.android.shared.BasePresenter;
import chat.rocket.core.interactors.SessionInteractor; import chat.rocket.core.interactors.SessionInteractor;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
public class LoginPresenter extends BasePresenter<LoginContract.View> public class LoginPresenter extends BasePresenter<LoginContract.View>
implements LoginContract.Presenter { implements LoginContract.Presenter {
...@@ -39,7 +40,7 @@ public class LoginPresenter extends BasePresenter<LoginContract.View> ...@@ -39,7 +40,7 @@ public class LoginPresenter extends BasePresenter<LoginContract.View>
} }
private void loadSessionState() { private void loadSessionState() {
final Subscription subscription = sessionInteractor.getSessionState() final Disposable subscription = sessionInteractor.getSessionState()
.distinctUntilChanged() .distinctUntilChanged()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
......
...@@ -219,10 +219,10 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -219,10 +219,10 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
} }
@Override @Override
public void showUnreadCount(int roomsCount, int mentionsCount) { public void showUnreadCount(long roomsCount, int mentionsCount) {
RoomToolbar toolbar = (RoomToolbar) findViewById(R.id.activity_main_toolbar); RoomToolbar toolbar = (RoomToolbar) findViewById(R.id.activity_main_toolbar);
if (toolbar != null) { if (toolbar != null) {
toolbar.setUnreadBudge(roomsCount, mentionsCount); toolbar.setUnreadBudge((int) roomsCount, mentionsCount);
} }
} }
......
...@@ -10,7 +10,7 @@ public interface MainContract { ...@@ -10,7 +10,7 @@ public interface MainContract {
void showRoom(String hostname, String roomId); void showRoom(String hostname, String roomId);
void showUnreadCount(int roomsCount, int mentionsCount); void showUnreadCount(long roomsCount, int mentionsCount);
void showLoginScreen(); void showLoginScreen();
......
...@@ -3,14 +3,16 @@ package chat.rocket.android.activity; ...@@ -3,14 +3,16 @@ package chat.rocket.android.activity;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.v4.util.Pair; import android.support.v4.util.Pair;
import io.reactivex.Flowable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import chat.rocket.android.BackgroundLooper; import chat.rocket.android.BackgroundLooper;
import chat.rocket.android.shared.BasePresenter; import chat.rocket.android.shared.BasePresenter;
import chat.rocket.core.interactors.CanCreateRoomInteractor; import chat.rocket.core.interactors.CanCreateRoomInteractor;
import chat.rocket.core.interactors.RoomInteractor; import chat.rocket.core.interactors.RoomInteractor;
import chat.rocket.core.interactors.SessionInteractor; import chat.rocket.core.interactors.SessionInteractor;
import rx.Observable; import chat.rocket.core.models.Session;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
public class MainPresenter extends BasePresenter<MainContract.View> public class MainPresenter extends BasePresenter<MainContract.View>
implements MainContract.Presenter { implements MainContract.Presenter {
...@@ -37,7 +39,7 @@ public class MainPresenter extends BasePresenter<MainContract.View> ...@@ -37,7 +39,7 @@ public class MainPresenter extends BasePresenter<MainContract.View>
@Override @Override
public void onOpenRoom(String hostname, String roomId) { public void onOpenRoom(String hostname, String roomId) {
final Subscription subscription = canCreateRoomInteractor.canCreate(roomId) final Disposable subscription = canCreateRoomInteractor.canCreate(roomId)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.subscribe(allowed -> { .subscribe(allowed -> {
...@@ -53,14 +55,14 @@ public class MainPresenter extends BasePresenter<MainContract.View> ...@@ -53,14 +55,14 @@ public class MainPresenter extends BasePresenter<MainContract.View>
@Override @Override
public void onRetryLogin() { public void onRetryLogin() {
final Subscription subscription = sessionInteractor.retryLogin() final Disposable subscription = sessionInteractor.retryLogin()
.subscribe(); .subscribe();
addSubscription(subscription); addSubscription(subscription);
} }
private void subscribeToUnreadCount() { private void subscribeToUnreadCount() {
final Subscription subscription = Observable.combineLatest( final Disposable subscription = Flowable.combineLatest(
roomInteractor.getTotalUnreadRoomsCount(), roomInteractor.getTotalUnreadRoomsCount(),
roomInteractor.getTotalUnreadMentionsCount(), roomInteractor.getTotalUnreadMentionsCount(),
(Pair::new) (Pair::new)
...@@ -73,10 +75,11 @@ public class MainPresenter extends BasePresenter<MainContract.View> ...@@ -73,10 +75,11 @@ public class MainPresenter extends BasePresenter<MainContract.View>
} }
private void subscribeToSession() { private void subscribeToSession() {
final Subscription subscription = sessionInteractor.getDefault() final Disposable subscription = sessionInteractor.getDefault()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.subscribe(session -> { .subscribe(sessionOptional -> {
Session session = sessionOptional.orNull();
if (session == null || session.getToken() == null) { if (session == null || session.getToken() == null) {
view.showLoginScreen(); view.showLoginScreen();
return; return;
......
package chat.rocket.android.api; package chat.rocket.android.api;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import io.reactivex.Flowable;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
...@@ -12,7 +13,6 @@ import chat.rocket.android.log.RCLog; ...@@ -12,7 +13,6 @@ import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.DDPClient; import chat.rocket.android_ddp.DDPClient;
import chat.rocket.android_ddp.DDPClientCallback; import chat.rocket.android_ddp.DDPClientCallback;
import chat.rocket.android_ddp.DDPSubscription; import chat.rocket.android_ddp.DDPSubscription;
import rx.Observable;
/** /**
* DDP client wrapper. * DDP client wrapper.
...@@ -69,7 +69,7 @@ public class DDPClientWrapper { ...@@ -69,7 +69,7 @@ public class DDPClientWrapper {
/** /**
* Returns Observable for handling DDP subscription. * Returns Observable for handling DDP subscription.
*/ */
public Observable<DDPSubscription.Event> getSubscriptionCallback() { public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return ddpClient.getSubscriptionCallback(); return ddpClient.getSubscriptionCallback();
} }
......
...@@ -2,6 +2,9 @@ package chat.rocket.android.api.rest; ...@@ -2,6 +2,9 @@ package chat.rocket.android.api.rest;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import org.json.JSONObject; import org.json.JSONObject;
import java.io.IOException; import java.io.IOException;
...@@ -9,8 +12,6 @@ import okhttp3.Call; ...@@ -9,8 +12,6 @@ import okhttp3.Call;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.ResponseBody; import okhttp3.ResponseBody;
import rx.Emitter;
import rx.Observable;
public class DefaultServerPolicyApi implements ServerPolicyApi { public class DefaultServerPolicyApi implements ServerPolicyApi {
...@@ -25,23 +26,23 @@ public class DefaultServerPolicyApi implements ServerPolicyApi { ...@@ -25,23 +26,23 @@ public class DefaultServerPolicyApi implements ServerPolicyApi {
} }
@Override @Override
public Observable<Response<JSONObject>> getApiInfoSecurely() { public Flowable<Response<JSONObject>> getApiInfoSecurely() {
return getApiInfo(SECURE_PROTOCOL); return getApiInfo(SECURE_PROTOCOL);
} }
@Override @Override
public Observable<Response<JSONObject>> getApiInfoInsecurely() { public Flowable<Response<JSONObject>> getApiInfoInsecurely() {
return getApiInfo(INSECURE_PROTOCOL); return getApiInfo(INSECURE_PROTOCOL);
} }
private Observable<Response<JSONObject>> getApiInfo(@NonNull String protocol) { private Flowable<Response<JSONObject>> getApiInfo(@NonNull String protocol) {
return Observable.fromEmitter(responseEmitter -> { return Flowable.create(responseEmitter -> {
final Call call = client.newCall(createRequest(protocol)); final Call call = client.newCall(createRequest(protocol));
call.enqueue(getOkHttpCallback(responseEmitter, protocol)); call.enqueue(getOkHttpCallback(responseEmitter, protocol));
responseEmitter.setCancellation(call::cancel); responseEmitter.setCancellable(call::cancel);
}, Emitter.BackpressureMode.LATEST); }, BackpressureStrategy.LATEST);
} }
private Request createRequest(@NonNull String protocol) { private Request createRequest(@NonNull String protocol) {
...@@ -51,7 +52,7 @@ public class DefaultServerPolicyApi implements ServerPolicyApi { ...@@ -51,7 +52,7 @@ public class DefaultServerPolicyApi implements ServerPolicyApi {
.build(); .build();
} }
private okhttp3.Callback getOkHttpCallback(@NonNull Emitter<Response<JSONObject>> emitter, private okhttp3.Callback getOkHttpCallback(@NonNull FlowableEmitter<Response<JSONObject>> emitter,
@NonNull String protocol) { @NonNull String protocol) {
return new okhttp3.Callback() { return new okhttp3.Callback() {
@Override @Override
...@@ -63,14 +64,14 @@ public class DefaultServerPolicyApi implements ServerPolicyApi { ...@@ -63,14 +64,14 @@ public class DefaultServerPolicyApi implements ServerPolicyApi {
public void onResponse(Call call, okhttp3.Response response) throws IOException { public void onResponse(Call call, okhttp3.Response response) throws IOException {
if (!response.isSuccessful()) { if (!response.isSuccessful()) {
emitter.onNext(new Response<>(false, protocol, null)); emitter.onNext(new Response<>(false, protocol, null));
emitter.onCompleted(); emitter.onComplete();
return; return;
} }
final ResponseBody body = response.body(); final ResponseBody body = response.body();
if (body == null || body.contentLength() == 0) { if (body == null || body.contentLength() == 0) {
emitter.onNext(new Response<>(false, protocol, null)); emitter.onNext(new Response<>(false, protocol, null));
emitter.onCompleted(); emitter.onComplete();
return; return;
} }
...@@ -80,7 +81,7 @@ public class DefaultServerPolicyApi implements ServerPolicyApi { ...@@ -80,7 +81,7 @@ public class DefaultServerPolicyApi implements ServerPolicyApi {
emitter.onNext(new Response<>(false, protocol, null)); emitter.onNext(new Response<>(false, protocol, null));
} }
emitter.onCompleted(); emitter.onComplete();
} }
}; };
} }
......
package chat.rocket.android.api.rest; package chat.rocket.android.api.rest;
import io.reactivex.Flowable;
import org.json.JSONObject; import org.json.JSONObject;
import rx.Observable;
public interface ServerPolicyApi { public interface ServerPolicyApi {
String SECURE_PROTOCOL = "https://"; String SECURE_PROTOCOL = "https://";
String INSECURE_PROTOCOL = "http://"; String INSECURE_PROTOCOL = "http://";
Observable<Response<JSONObject>> getApiInfoSecurely(); Flowable<Response<JSONObject>> getApiInfoSecurely();
Observable<Response<JSONObject>> getApiInfoInsecurely(); Flowable<Response<JSONObject>> getApiInfoInsecurely();
} }
...@@ -6,7 +6,7 @@ import android.support.annotation.Nullable; ...@@ -6,7 +6,7 @@ import android.support.annotation.Nullable;
import android.view.LayoutInflater; import android.view.LayoutInflater;
import android.view.View; import android.view.View;
import android.view.ViewGroup; import android.view.ViewGroup;
import com.trello.rxlifecycle.components.support.RxFragment; import com.trello.rxlifecycle2.components.support.RxFragment;
/** /**
* Fragment base class for this Application. * Fragment base class for this Application.
......
...@@ -2,6 +2,10 @@ package chat.rocket.android.fragment.add_server; ...@@ -2,6 +2,10 @@ package chat.rocket.android.fragment.add_server;
import android.content.SharedPreferences; import android.content.SharedPreferences;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import chat.rocket.android.RocketChatCache; import chat.rocket.android.RocketChatCache;
import chat.rocket.android.api.rest.DefaultServerPolicyApi; import chat.rocket.android.api.rest.DefaultServerPolicyApi;
import chat.rocket.android.api.rest.ServerPolicyApi; import chat.rocket.android.api.rest.ServerPolicyApi;
...@@ -10,9 +14,6 @@ import chat.rocket.android.helper.ServerPolicyApiValidationHelper; ...@@ -10,9 +14,6 @@ import chat.rocket.android.helper.ServerPolicyApiValidationHelper;
import chat.rocket.android.helper.ServerPolicyHelper; import chat.rocket.android.helper.ServerPolicyHelper;
import chat.rocket.android.service.ConnectivityManagerApi; import chat.rocket.android.service.ConnectivityManagerApi;
import chat.rocket.android.shared.BasePresenter; import chat.rocket.android.shared.BasePresenter;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
public class InputHostnamePresenter extends BasePresenter<InputHostnameContract.View> public class InputHostnamePresenter extends BasePresenter<InputHostnameContract.View>
implements InputHostnameContract.Presenter { implements InputHostnameContract.Presenter {
...@@ -40,9 +41,9 @@ public class InputHostnamePresenter extends BasePresenter<InputHostnameContract. ...@@ -40,9 +41,9 @@ public class InputHostnamePresenter extends BasePresenter<InputHostnameContract.
final ServerPolicyApiValidationHelper validationHelper = final ServerPolicyApiValidationHelper validationHelper =
new ServerPolicyApiValidationHelper(serverPolicyApi); new ServerPolicyApiValidationHelper(serverPolicyApi);
clearSubscripions(); clearSubscriptions();
final Subscription subscription = ServerPolicyHelper.isApiVersionValid(validationHelper) final Disposable subscription = ServerPolicyHelper.isApiVersionValid(validationHelper)
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.doOnTerminate(() -> view.hideLoader()) .doOnTerminate(() -> view.hideLoader())
......
...@@ -19,6 +19,7 @@ import android.support.v7.app.AlertDialog; ...@@ -19,6 +19,7 @@ import android.support.v7.app.AlertDialog;
import android.support.v7.widget.LinearLayoutManager; import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView; import android.support.v7.widget.RecyclerView;
import android.view.View; import android.view.View;
import com.jakewharton.rxbinding.support.v4.widget.RxDrawerLayout; import com.jakewharton.rxbinding.support.v4.widget.RxDrawerLayout;
import java.lang.reflect.Field; import java.lang.reflect.Field;
...@@ -56,6 +57,7 @@ import chat.rocket.persistence.realm.RealmStore; ...@@ -56,6 +57,7 @@ import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.android.service.ConnectivityManager; import chat.rocket.android.service.ConnectivityManager;
import chat.rocket.android.widget.internal.ExtraActionPickerDialogFragment; import chat.rocket.android.widget.internal.ExtraActionPickerDialogFragment;
import chat.rocket.android.widget.message.MessageFormLayout; import chat.rocket.android.widget.message.MessageFormLayout;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import permissions.dispatcher.NeedsPermission; import permissions.dispatcher.NeedsPermission;
import permissions.dispatcher.RuntimePermissions; import permissions.dispatcher.RuntimePermissions;
...@@ -233,7 +235,7 @@ public class RoomFragment extends AbstractChatRoomFragment ...@@ -233,7 +235,7 @@ public class RoomFragment extends AbstractChatRoomFragment
DrawerLayout drawerLayout = (DrawerLayout) rootView.findViewById(R.id.drawer_layout); DrawerLayout drawerLayout = (DrawerLayout) rootView.findViewById(R.id.drawer_layout);
SlidingPaneLayout pane = (SlidingPaneLayout) getActivity().findViewById(R.id.sliding_pane); SlidingPaneLayout pane = (SlidingPaneLayout) getActivity().findViewById(R.id.sliding_pane);
if (drawerLayout != null && pane != null) { if (drawerLayout != null && pane != null) {
RxDrawerLayout.drawerOpen(drawerLayout, GravityCompat.END) RxJavaInterop.toV2Flowable(RxDrawerLayout.drawerOpen(drawerLayout, GravityCompat.END))
.compose(bindToLifecycle()) .compose(bindToLifecycle())
.subscribe(opened -> { .subscribe(opened -> {
try { try {
......
...@@ -4,6 +4,11 @@ import android.support.annotation.NonNull; ...@@ -4,6 +4,11 @@ import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.support.v4.util.Pair; import android.support.v4.util.Pair;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import chat.rocket.android.BackgroundLooper; import chat.rocket.android.BackgroundLooper;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError; import chat.rocket.android.helper.LogIfError;
...@@ -16,9 +21,6 @@ import chat.rocket.core.models.User; ...@@ -16,9 +21,6 @@ import chat.rocket.core.models.User;
import chat.rocket.core.repositories.RoomRepository; import chat.rocket.core.repositories.RoomRepository;
import chat.rocket.core.repositories.UserRepository; import chat.rocket.core.repositories.UserRepository;
import chat.rocket.android.service.ConnectivityManagerApi; import chat.rocket.android.service.ConnectivityManagerApi;
import rx.Single;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
public class RoomPresenter extends BasePresenter<RoomContract.View> public class RoomPresenter extends BasePresenter<RoomContract.View>
implements RoomContract.Presenter { implements RoomContract.Presenter {
...@@ -55,7 +57,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -55,7 +57,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
@Override @Override
public void loadMessages() { public void loadMessages() {
final Subscription subscription = getSingleRoom() final Disposable subscription = getSingleRoom()
.flatMap(messageInteractor::loadMessages) .flatMap(messageInteractor::loadMessages)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -71,7 +73,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -71,7 +73,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
@Override @Override
public void loadMoreMessages() { public void loadMoreMessages() {
final Subscription subscription = getSingleRoom() final Disposable subscription = getSingleRoom()
.flatMap(messageInteractor::loadMoreMessages) .flatMap(messageInteractor::loadMoreMessages)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -97,7 +99,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -97,7 +99,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
@Override @Override
public void sendMessage(String messageText) { public void sendMessage(String messageText) {
final Subscription subscription = getRoomUserPair() final Disposable subscription = getRoomUserPair()
.flatMap(pair -> messageInteractor.send(pair.first, pair.second, messageText)) .flatMap(pair -> messageInteractor.send(pair.first, pair.second, messageText))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -112,7 +114,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -112,7 +114,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
@Override @Override
public void resendMessage(Message message) { public void resendMessage(Message message) {
final Subscription subscription = messageInteractor.resend(message) final Disposable subscription = messageInteractor.resend(message)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.subscribe(); .subscribe();
...@@ -122,7 +124,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -122,7 +124,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
@Override @Override
public void deleteMessage(Message message) { public void deleteMessage(Message message) {
final Subscription subscription = messageInteractor.delete(message) final Disposable subscription = messageInteractor.delete(message)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.subscribe(); .subscribe();
...@@ -132,7 +134,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -132,7 +134,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
@Override @Override
public void onUnreadCount() { public void onUnreadCount() {
final Subscription subscription = getRoomUserPair() final Disposable subscription = getRoomUserPair()
.flatMap(roomUserPair -> messageInteractor .flatMap(roomUserPair -> messageInteractor
.unreadCountFor(roomUserPair.first, roomUserPair.second)) .unreadCountFor(roomUserPair.first, roomUserPair.second))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
...@@ -146,8 +148,8 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -146,8 +148,8 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
@Override @Override
public void onMarkAsRead() { public void onMarkAsRead() {
final Subscription subscription = roomRepository.getById(roomId) final Disposable subscription = roomRepository.getById(roomId)
.first() .firstElement()
.filter(room -> room != null && room.isAlert()) .filter(room -> room != null && room.isAlert())
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -160,7 +162,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -160,7 +162,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
} }
private void getRoomInfo() { private void getRoomInfo() {
final Subscription subscription = roomRepository.getById(roomId) final Disposable subscription = roomRepository.getById(roomId)
.distinctUntilChanged() .distinctUntilChanged()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -172,7 +174,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -172,7 +174,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
} }
private void getRoomHistoryStateInfo() { private void getRoomHistoryStateInfo() {
final Subscription subscription = roomRepository.getHistoryStateByRoomId(roomId) final Disposable subscription = roomRepository.getHistoryStateByRoomId(roomId)
.distinctUntilChanged() .distinctUntilChanged()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -190,8 +192,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -190,8 +192,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
} }
private void getMessages() { private void getMessages() {
final Subscription subscription = roomRepository.getById(roomId) final Disposable subscription = roomRepository.getById(roomId)
.first()
.flatMap(messageInteractor::getAllFrom) .flatMap(messageInteractor::getAllFrom)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -204,8 +205,9 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -204,8 +205,9 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
return Single.zip( return Single.zip(
getSingleRoom(), getSingleRoom(),
userRepository.getCurrent() userRepository.getCurrent()
.filter(user -> user != null) .filter(Optional::isPresent)
.first() .map(Optional::get)
.firstElement()
.toSingle(), .toSingle(),
Pair::new Pair::new
); );
...@@ -213,7 +215,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View> ...@@ -213,7 +215,7 @@ public class RoomPresenter extends BasePresenter<RoomContract.View>
private Single<Room> getSingleRoom() { private Single<Room> getSingleRoom() {
return roomRepository.getById(roomId) return roomRepository.getById(roomId)
.first() .firstElement()
.toSingle(); .toSingle();
} }
} }
...@@ -8,7 +8,6 @@ import android.widget.CompoundButton; ...@@ -8,7 +8,6 @@ import android.widget.CompoundButton;
import android.widget.ImageView; import android.widget.ImageView;
import android.widget.LinearLayout; import android.widget.LinearLayout;
import android.widget.TextView; import android.widget.TextView;
import com.jakewharton.rxbinding.view.RxView;
import com.jakewharton.rxbinding.widget.RxCompoundButton; import com.jakewharton.rxbinding.widget.RxCompoundButton;
import java.util.List; import java.util.List;
...@@ -29,6 +28,7 @@ import chat.rocket.android.renderer.UserRenderer; ...@@ -29,6 +28,7 @@ import chat.rocket.android.renderer.UserRenderer;
import chat.rocket.persistence.realm.repositories.RealmRoomRepository; import chat.rocket.persistence.realm.repositories.RealmRoomRepository;
import chat.rocket.persistence.realm.repositories.RealmUserRepository; import chat.rocket.persistence.realm.repositories.RealmUserRepository;
import chat.rocket.android.widget.RocketChatAvatar; import chat.rocket.android.widget.RocketChatAvatar;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
public class SidebarMainFragment extends AbstractFragment implements SidebarMainContract.View { public class SidebarMainFragment extends AbstractFragment implements SidebarMainContract.View {
...@@ -115,9 +115,12 @@ public class SidebarMainFragment extends AbstractFragment implements SidebarMain ...@@ -115,9 +115,12 @@ public class SidebarMainFragment extends AbstractFragment implements SidebarMain
rootView.findViewById(R.id.user_info_container).setOnClickListener(view -> { rootView.findViewById(R.id.user_info_container).setOnClickListener(view -> {
toggleUserAction.toggle(); toggleUserAction.toggle();
}); });
RxCompoundButton.checkedChanges(toggleUserAction) RxJavaInterop.toV2Flowable(RxCompoundButton.checkedChanges(toggleUserAction))
.compose(bindToLifecycle()) .compose(bindToLifecycle())
.subscribe(RxView.visibility(rootView.findViewById(R.id.user_action_outer_container))); .subscribe(aBoolean -> {
rootView.findViewById(R.id.user_action_outer_container)
.setVisibility(aBoolean ? View.VISIBLE : View.GONE);
});
} }
private void setupUserStatusButtons() { private void setupUserStatusButtons() {
......
...@@ -2,6 +2,9 @@ package chat.rocket.android.fragment.sidebar; ...@@ -2,6 +2,9 @@ package chat.rocket.android.fragment.sidebar;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import chat.rocket.android.BackgroundLooper; import chat.rocket.android.BackgroundLooper;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError; import chat.rocket.android.helper.LogIfError;
...@@ -10,8 +13,6 @@ import chat.rocket.android.shared.BasePresenter; ...@@ -10,8 +13,6 @@ import chat.rocket.android.shared.BasePresenter;
import chat.rocket.core.interactors.RoomInteractor; import chat.rocket.core.interactors.RoomInteractor;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import chat.rocket.core.repositories.UserRepository; import chat.rocket.core.repositories.UserRepository;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
public class SidebarMainPresenter extends BasePresenter<SidebarMainContract.View> public class SidebarMainPresenter extends BasePresenter<SidebarMainContract.View>
implements SidebarMainContract.Presenter { implements SidebarMainContract.Presenter {
...@@ -72,7 +73,7 @@ public class SidebarMainPresenter extends BasePresenter<SidebarMainContract.View ...@@ -72,7 +73,7 @@ public class SidebarMainPresenter extends BasePresenter<SidebarMainContract.View
} }
private void subscribeToRooms() { private void subscribeToRooms() {
final Subscription subscription = roomInteractor.getOpenRooms() final Disposable subscription = roomInteractor.getOpenRooms()
.distinctUntilChanged() .distinctUntilChanged()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
...@@ -84,11 +85,11 @@ public class SidebarMainPresenter extends BasePresenter<SidebarMainContract.View ...@@ -84,11 +85,11 @@ public class SidebarMainPresenter extends BasePresenter<SidebarMainContract.View
} }
private void subscribeToUser() { private void subscribeToUser() {
final Subscription subscription = userRepository.getCurrent() final Disposable subscription = userRepository.getCurrent()
.distinctUntilChanged() .distinctUntilChanged()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.subscribe(user -> view.showUser(user)); .subscribe(userOptional -> view.showUser(userOptional.orNull()));
addSubscription(subscription); addSubscription(subscription);
} }
......
...@@ -7,7 +7,8 @@ import android.support.annotation.NonNull; ...@@ -7,7 +7,8 @@ import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.view.View; import android.view.View;
import android.widget.Toast; import android.widget.Toast;
import com.trello.rxlifecycle.components.support.RxAppCompatDialogFragment;
import com.trello.rxlifecycle2.components.support.RxAppCompatDialogFragment;
import bolts.Task; import bolts.Task;
import chat.rocket.android.R; import chat.rocket.android.R;
......
...@@ -4,12 +4,12 @@ import android.os.Bundle; ...@@ -4,12 +4,12 @@ import android.os.Bundle;
import android.view.View; import android.view.View;
import android.widget.CompoundButton; import android.widget.CompoundButton;
import android.widget.TextView; import android.widget.TextView;
import com.jakewharton.rxbinding.view.RxView;
import com.jakewharton.rxbinding.widget.RxTextView; import com.jakewharton.rxbinding.widget.RxTextView;
import bolts.Task; import bolts.Task;
import chat.rocket.android.R; import chat.rocket.android.R;
import chat.rocket.android.helper.TextUtils; import chat.rocket.android.helper.TextUtils;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
/** /**
* add Channel, add Private-group. * add Channel, add Private-group.
...@@ -36,10 +36,11 @@ public class AddChannelDialogFragment extends AbstractAddRoomDialogFragment { ...@@ -36,10 +36,11 @@ public class AddChannelDialogFragment extends AbstractAddRoomDialogFragment {
protected void onSetupDialog() { protected void onSetupDialog() {
View buttonAddChannel = getDialog().findViewById(R.id.btn_add_channel); View buttonAddChannel = getDialog().findViewById(R.id.btn_add_channel);
RxTextView.textChanges((TextView) getDialog().findViewById(R.id.editor_channel_name)) RxJavaInterop.toV2Flowable(
RxTextView.textChanges((TextView) getDialog().findViewById(R.id.editor_channel_name)))
.map(text -> !TextUtils.isEmpty(text)) .map(text -> !TextUtils.isEmpty(text))
.compose(bindToLifecycle()) .compose(bindToLifecycle())
.subscribe(RxView.enabled(buttonAddChannel)); .subscribe(buttonAddChannel::setEnabled);
buttonAddChannel.setOnClickListener(view -> createRoom()); buttonAddChannel.setOnClickListener(view -> createRoom());
} }
......
...@@ -4,7 +4,6 @@ import android.os.Bundle; ...@@ -4,7 +4,6 @@ import android.os.Bundle;
import android.view.View; import android.view.View;
import android.widget.AutoCompleteTextView; import android.widget.AutoCompleteTextView;
import android.widget.TextView; import android.widget.TextView;
import com.jakewharton.rxbinding.view.RxView;
import com.jakewharton.rxbinding.widget.RxTextView; import com.jakewharton.rxbinding.widget.RxTextView;
import io.realm.Case; import io.realm.Case;
...@@ -14,6 +13,7 @@ import chat.rocket.android.helper.TextUtils; ...@@ -14,6 +13,7 @@ import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.layouthelper.sidebar.dialog.SuggestUserAdapter; import chat.rocket.android.layouthelper.sidebar.dialog.SuggestUserAdapter;
import chat.rocket.persistence.realm.models.ddp.RealmUser; import chat.rocket.persistence.realm.models.ddp.RealmUser;
import chat.rocket.persistence.realm.RealmAutoCompleteAdapter; import chat.rocket.persistence.realm.RealmAutoCompleteAdapter;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
/** /**
* add Direct RealmMessage. * add Direct RealmMessage.
...@@ -39,17 +39,18 @@ public class AddDirectMessageDialogFragment extends AbstractAddRoomDialogFragmen ...@@ -39,17 +39,18 @@ public class AddDirectMessageDialogFragment extends AbstractAddRoomDialogFragmen
AutoCompleteTextView autoCompleteTextView = AutoCompleteTextView autoCompleteTextView =
(AutoCompleteTextView) getDialog().findViewById(R.id.editor_username); (AutoCompleteTextView) getDialog().findViewById(R.id.editor_username);
RealmAutoCompleteAdapter<RealmUser> adapter = realmHelper.createAutoCompleteAdapter(getContext(), RealmAutoCompleteAdapter<RealmUser> adapter =
realmHelper.createAutoCompleteAdapter(getContext(),
(realm, text) -> realm.where(RealmUser.class) (realm, text) -> realm.where(RealmUser.class)
.contains(RealmUser.USERNAME, text, Case.INSENSITIVE) .contains(RealmUser.USERNAME, text, Case.INSENSITIVE)
.findAllSorted(RealmUser.USERNAME), .findAllSorted(RealmUser.USERNAME),
context -> new SuggestUserAdapter(context, hostname)); context -> new SuggestUserAdapter(context, hostname));
autoCompleteTextView.setAdapter(adapter); autoCompleteTextView.setAdapter(adapter);
RxTextView.textChanges(autoCompleteTextView) RxJavaInterop.toV2Flowable(RxTextView.textChanges(autoCompleteTextView))
.map(text -> !TextUtils.isEmpty(text)) .map(text -> !TextUtils.isEmpty(text))
.compose(bindToLifecycle()) .compose(bindToLifecycle())
.subscribe(RxView.enabled(buttonAddDirectMessage)); .subscribe(buttonAddDirectMessage::setEnabled);
buttonAddDirectMessage.setOnClickListener(view -> createRoom()); buttonAddDirectMessage.setOnClickListener(view -> createRoom());
} }
......
...@@ -2,8 +2,9 @@ package chat.rocket.android.helper; ...@@ -2,8 +2,9 @@ package chat.rocket.android.helper;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import io.reactivex.Flowable;
import chat.rocket.android.api.rest.ServerPolicyApi; import chat.rocket.android.api.rest.ServerPolicyApi;
import rx.Observable;
public class ServerPolicyApiValidationHelper { public class ServerPolicyApiValidationHelper {
...@@ -13,7 +14,7 @@ public class ServerPolicyApiValidationHelper { ...@@ -13,7 +14,7 @@ public class ServerPolicyApiValidationHelper {
this.serverPolicyApi = serverPolicyApi; this.serverPolicyApi = serverPolicyApi;
} }
public Observable<ServerPolicyHelper.ServerInfoResponse> getApiVersion() { public Flowable<ServerPolicyHelper.ServerInfoResponse> getApiVersion() {
return serverPolicyApi.getApiInfoSecurely() return serverPolicyApi.getApiInfoSecurely()
.onErrorResumeNext(serverPolicyApi.getApiInfoInsecurely()) .onErrorResumeNext(serverPolicyApi.getApiInfoInsecurely())
.map(response -> new ServerPolicyHelper.ServerInfoResponse( .map(response -> new ServerPolicyHelper.ServerInfoResponse(
......
...@@ -2,10 +2,9 @@ package chat.rocket.android.helper; ...@@ -2,10 +2,9 @@ package chat.rocket.android.helper;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import io.reactivex.Flowable;
import org.json.JSONObject; import org.json.JSONObject;
import rx.Observable;
public class ServerPolicyHelper { public class ServerPolicyHelper {
private static final String DEFAULT_HOST = ".rocket.chat"; private static final String DEFAULT_HOST = ".rocket.chat";
...@@ -19,7 +18,7 @@ public class ServerPolicyHelper { ...@@ -19,7 +18,7 @@ public class ServerPolicyHelper {
return removeTrailingSlash(removeProtocol(enforceDefaultHost(hostname))); return removeTrailingSlash(removeProtocol(enforceDefaultHost(hostname)));
} }
public static Observable<ServerValidation> isApiVersionValid( public static Flowable<ServerValidation> isApiVersionValid(
@NonNull ServerPolicyApiValidationHelper serverPolicyApiValidationHelper) { @NonNull ServerPolicyApiValidationHelper serverPolicyApiValidationHelper) {
return serverPolicyApiValidationHelper.getApiVersion() return serverPolicyApiValidationHelper.getApiVersion()
.map(serverInfo -> .map(serverInfo ->
......
...@@ -2,6 +2,7 @@ package chat.rocket.android.service.ddp; ...@@ -2,6 +2,7 @@ package chat.rocket.android.service.ddp;
import android.content.Context; import android.content.Context;
import android.text.TextUtils; import android.text.TextUtils;
import io.reactivex.disposables.Disposable;
import io.realm.Realm; import io.realm.Realm;
import io.realm.RealmObject; import io.realm.RealmObject;
import org.json.JSONArray; import org.json.JSONArray;
...@@ -15,7 +16,6 @@ import chat.rocket.persistence.realm.RealmHelper; ...@@ -15,7 +16,6 @@ import chat.rocket.persistence.realm.RealmHelper;
import chat.rocket.android.service.DDPClientRef; import chat.rocket.android.service.DDPClientRef;
import chat.rocket.android.service.Registrable; import chat.rocket.android.service.Registrable;
import chat.rocket.android_ddp.DDPSubscription; import chat.rocket.android_ddp.DDPSubscription;
import rx.Subscription;
public abstract class AbstractDDPDocEventSubscriber implements Registrable { public abstract class AbstractDDPDocEventSubscriber implements Registrable {
protected final Context context; protected final Context context;
...@@ -24,7 +24,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable { ...@@ -24,7 +24,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
protected final DDPClientRef ddpClientRef; protected final DDPClientRef ddpClientRef;
private boolean isUnsubscribed; private boolean isUnsubscribed;
private String subscriptionId; private String subscriptionId;
private Subscription rxSubscription; private Disposable rxSubscription;
protected AbstractDDPDocEventSubscriber(Context context, String hostname, protected AbstractDDPDocEventSubscriber(Context context, String hostname,
RealmHelper realmHelper, DDPClientRef ddpClientRef) { RealmHelper realmHelper, DDPClientRef ddpClientRef) {
...@@ -94,7 +94,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable { ...@@ -94,7 +94,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
onRegister(); onRegister();
} }
protected Subscription subscribe() { protected Disposable subscribe() {
return ddpClientRef.get().getSubscriptionCallback() return ddpClientRef.get().getSubscriptionCallback()
.filter(event -> event instanceof DDPSubscription.DocEvent) .filter(event -> event instanceof DDPSubscription.DocEvent)
.cast(DDPSubscription.DocEvent.class) .cast(DDPSubscription.DocEvent.class)
...@@ -179,7 +179,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable { ...@@ -179,7 +179,7 @@ public abstract class AbstractDDPDocEventSubscriber implements Registrable {
isUnsubscribed = true; isUnsubscribed = true;
onUnregister(); onUnregister();
if (rxSubscription != null) { if (rxSubscription != null) {
rxSubscription.unsubscribe(); rxSubscription.dispose();
} }
if (!TextUtils.isEmpty(subscriptionId)) { if (!TextUtils.isEmpty(subscriptionId)) {
ddpClientRef.get().unsubscribe(subscriptionId).continueWith(new LogIfError()); ddpClientRef.get().unsubscribe(subscriptionId).continueWith(new LogIfError());
......
...@@ -2,14 +2,14 @@ package chat.rocket.android.shared; ...@@ -2,14 +2,14 @@ package chat.rocket.android.shared;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import rx.Subscription; import io.reactivex.disposables.CompositeDisposable;
import rx.subscriptions.CompositeSubscription; import io.reactivex.disposables.Disposable;
public abstract class BasePresenter<T extends BaseContract.View> public abstract class BasePresenter<T extends BaseContract.View>
implements BaseContract.Presenter<T> { implements BaseContract.Presenter<T> {
protected T view; protected T view;
private CompositeSubscription compositeSubscription = new CompositeSubscription(); private CompositeDisposable compositeSubscription = new CompositeDisposable();
@Override @Override
public void bindView(@NonNull T view) { public void bindView(@NonNull T view) {
...@@ -22,11 +22,11 @@ public abstract class BasePresenter<T extends BaseContract.View> ...@@ -22,11 +22,11 @@ public abstract class BasePresenter<T extends BaseContract.View>
view = null; view = null;
} }
protected void addSubscription(Subscription subscription) { protected void addSubscription(Disposable subscription) {
compositeSubscription.add(subscription); compositeSubscription.add(subscription);
} }
protected void clearSubscripions() { protected void clearSubscriptions() {
compositeSubscription.clear(); compositeSubscription.clear();
} }
} }
ext { ext {
androidPlugin = 'com.android.tools.build:gradle:2.2.3'
realmPlugin = 'io.realm:realm-gradle-plugin:2.2.1'
retroLambdaPlugin = 'me.tatarka:gradle-retrolambda:3.3.1'
retroLambdaPatch = 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
compileSdkVersion = 25
buildToolsVersion = '25.0.1'
minSdkVersion = 16
supportVersion = '25.0.1'
supportAnnotations = "com.android.support:support-annotations:$supportVersion"
supportRecyclerView = "com.android.support:recyclerview-v7:$supportVersion"
supportAppCompat = "com.android.support:appcompat-v7:$supportVersion"
supportV13 = "com.android.support:support-v13:$supportVersion"
supportDesign = "com.android.support:design:$supportVersion"
frescoVersion = '1.0.1'
frescoBase = "com.facebook.fresco:fresco:$frescoVersion"
frescoAnimatedGif = "com.facebook.fresco:animated-gif:$frescoVersion"
frescoAnimatedWebp = "com.facebook.fresco:animated-webp:$frescoVersion"
frescoWebp = "com.facebook.fresco:webpsupport:$frescoVersion"
frescoImagePipelineOkHttp3 = "com.facebook.fresco:imagepipeline-okhttp3:$frescoVersion"
rxJava = 'io.reactivex:rxjava:1.2.3'
boltsTask = 'com.parse.bolts:bolts-tasks:1.4.0'
okhttp3 = 'com.squareup.okhttp3:okhttp:3.5.0'
textDrawable = 'com.amulyakhare:com.amulyakhare.textdrawable:1.0.1'
preDexLibs = !"true".equals(System.getenv("CI")) preDexLibs = !"true".equals(System.getenv("CI"))
} }
subprojects { project ->
project.configurations.all {
resolutionStrategy {
eachDependency { details ->
if (details.requested.group == 'com.android.support'
&& details.requested.name.indexOf("multidex") == -1) {
details.useVersion(rootProject.ext.supportVersion)
}
}
}
}
}
subprojects { subprojects {
project.plugins.whenPluginAdded { plugin -> project.plugins.whenPluginAdded { plugin ->
if ("com.android.build.gradle.AppPlugin".equals(plugin.class.name)) { if ("com.android.build.gradle.AppPlugin".equals(plugin.class.name)) {
......
...@@ -5,17 +5,17 @@ buildscript { ...@@ -5,17 +5,17 @@ buildscript {
jcenter() jcenter()
} }
dependencies { dependencies {
classpath rootProject.ext.androidPlugin classpath 'com.android.tools.build:gradle:2.2.3'
} }
} }
android { android {
compileSdkVersion rootProject.ext.compileSdkVersion compileSdkVersion 25
buildToolsVersion rootProject.ext.buildToolsVersion buildToolsVersion '25.0.2'
defaultConfig { defaultConfig {
minSdkVersion rootProject.ext.minSdkVersion minSdkVersion 16
targetSdkVersion rootProject.ext.compileSdkVersion targetSdkVersion 25
versionCode 1 versionCode 1
versionName "1" versionName "1"
} }
...@@ -26,7 +26,3 @@ android { ...@@ -26,7 +26,3 @@ android {
} }
} }
} }
dependencies {
compile rootProject.ext.supportAnnotations
}
\ No newline at end of file
...@@ -8,25 +8,25 @@ buildscript { ...@@ -8,25 +8,25 @@ buildscript {
jcenter() jcenter()
} }
dependencies { dependencies {
classpath rootProject.ext.androidPlugin classpath 'com.android.tools.build:gradle:2.2.3'
classpath rootProject.ext.realmPlugin classpath 'io.realm:realm-gradle-plugin:2.3.1'
classpath rootProject.ext.retroLambdaPlugin classpath 'me.tatarka:gradle-retrolambda:3.5.0'
classpath rootProject.ext.retroLambdaPatch classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
classpath 'com.jakewharton.hugo:hugo-plugin:1.2.1' classpath 'com.jakewharton.hugo:hugo-plugin:1.2.1'
} }
} }
android { android {
compileSdkVersion rootProject.ext.compileSdkVersion compileSdkVersion 25
buildToolsVersion rootProject.ext.buildToolsVersion buildToolsVersion '25.0.2'
compileOptions { compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8 sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8 targetCompatibility JavaVersion.VERSION_1_8
} }
defaultConfig { defaultConfig {
minSdkVersion rootProject.ext.minSdkVersion minSdkVersion 16
targetSdkVersion rootProject.ext.compileSdkVersion targetSdkVersion 25
versionCode 1 versionCode 1
versionName "1" versionName "1"
} }
...@@ -38,14 +38,24 @@ android { ...@@ -38,14 +38,24 @@ android {
} }
} }
ext {
supportVersion = '25.1.1'
}
dependencies { dependencies {
testCompile 'junit:junit:4.12'
compile project(':log-wrapper') compile project(':log-wrapper')
compile project(':rocket-chat-core') compile project(':rocket-chat-core')
compile 'io.reactivex:rxjava:1.2.3'
compile 'io.reactivex:rxandroid:1.2.1' compile "com.android.support:support-annotations:$supportVersion"
compile rootProject.ext.boltsTask compile "com.android.support:appcompat-v7:$supportVersion"
compile rootProject.ext.supportAnnotations compile "com.android.support:design:$supportVersion"
compile rootProject.ext.supportAppCompat
compile rootProject.ext.supportDesign compile 'io.reactivex.rxjava2:rxjava:2.0.6'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'com.github.akarnokd:rxjava2-interop:0.9.1'
compile 'com.parse.bolts:bolts-tasks:1.4.0'
testCompile 'junit:junit:4.12'
} }
package chat.rocket.persistence.realm.repositories; package chat.rocket.persistence.realm.repositories;
import android.os.Looper; import android.os.Looper;
import android.support.v4.util.Pair;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.realm.Realm; import io.realm.Realm;
import io.realm.RealmResults; import io.realm.RealmResults;
import io.realm.Sort; import io.realm.Sort;
...@@ -14,9 +19,7 @@ import chat.rocket.core.repositories.MessageRepository; ...@@ -14,9 +19,7 @@ import chat.rocket.core.repositories.MessageRepository;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.ddp.RealmMessage; import chat.rocket.persistence.realm.models.ddp.RealmMessage;
import chat.rocket.persistence.realm.models.ddp.RealmUser; import chat.rocket.persistence.realm.models.ddp.RealmUser;
import rx.Observable; import hu.akarnokd.rxjava.interop.RxJavaInterop;
import rx.Single;
import rx.android.schedulers.AndroidSchedulers;
public class RealmMessageRepository extends RealmRepository implements MessageRepository { public class RealmMessageRepository extends RealmRepository implements MessageRepository {
...@@ -27,34 +30,28 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe ...@@ -27,34 +30,28 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe
} }
@Override @Override
public Single<Message> getById(String messageId) { public Single<Optional<Message>> getById(String messageId) {
return Single.defer(() -> { return Single.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(
pair.first.where(RealmMessage.class)
if (realm == null || looper == null) {
return Single.just(null);
}
final RealmMessage realmMessage = realm.where(RealmMessage.class)
.equalTo(RealmMessage.ID, messageId) .equalTo(RealmMessage.ID, messageId)
.findFirst(); .findAll()
.<RealmResults<RealmMessage>>asObservable()),
if (realmMessage == null) { pair -> close(pair.first, pair.second)
realm.close(); )
return Single.just(null); .unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
}
return realmMessage
.<RealmMessage>asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(it -> it != null && it.isLoaded() .filter(it -> it != null && it.isLoaded()
&& it.isValid()) && it.isValid())
.first() .map(realmMessages -> {
.toSingle() if (realmMessages.size() > 0) {
.map(RealmMessage::asMessage); return Optional.of(realmMessages.get(0).asMessage());
}); }
return Optional.<Message>absent();
})
.firstElement()
.toSingle());
} }
@Override @Override
...@@ -92,13 +89,13 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe ...@@ -92,13 +89,13 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe
realm.beginTransaction(); realm.beginTransaction();
return realm.copyToRealmOrUpdate(realmMessage) return RxJavaInterop.toV2Flowable(realm.copyToRealmOrUpdate(realmMessage)
.asObservable() .asObservable())
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(it -> it != null && it.isLoaded() && it.isValid()) .filter(it -> it != null && it.isLoaded() && it.isValid())
.first() .firstElement()
.doOnNext(it -> realm.commitTransaction()) .doOnSuccess(it -> realm.commitTransaction())
.doOnError(throwable -> realm.cancelTransaction())
.doOnEvent((realmObject, throwable) -> close(realm, looper))
.toSingle() .toSingle()
.map(realmObject -> true); .map(realmObject -> true);
}); });
...@@ -116,71 +113,58 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe ...@@ -116,71 +113,58 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe
realm.beginTransaction(); realm.beginTransaction();
return realm.where(RealmMessage.class) return RxJavaInterop.toV2Flowable(realm.where(RealmMessage.class)
.equalTo(RealmMessage.ID, message.getId()) .equalTo(RealmMessage.ID, message.getId())
.findAll() .findAll()
.<RealmResults<RealmMessage>>asObservable() .<RealmResults<RealmMessage>>asObservable())
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(realmObject -> realmObject != null .filter(realmObject -> realmObject != null
&& realmObject.isLoaded() && realmObject.isValid()) && realmObject.isLoaded() && realmObject.isValid())
.first() .firstElement()
.toSingle() .toSingle()
.flatMap(realmMessages -> Single.just(realmMessages.deleteAllFromRealm())) .flatMap(realmMessages -> Single.just(realmMessages.deleteAllFromRealm()))
.doOnEach(notification -> { .doOnEvent((success, throwable) -> {
if (notification.getValue()) { if (success) {
realm.commitTransaction(); realm.commitTransaction();
} else { } else {
realm.cancelTransaction(); realm.cancelTransaction();
} }
close(realm, looper);
}); });
}); });
} }
@Override @Override
public Observable<List<Message>> getAllFrom(Room room) { public Flowable<List<Message>> getAllFrom(Room room) {
return Observable.defer(() -> { return Flowable.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(pair.first.where(RealmMessage.class)
if (realm == null || looper == null) {
return Observable.just(null);
}
return realm.where(RealmMessage.class)
.equalTo(RealmMessage.ROOM_ID, room.getRoomId()) .equalTo(RealmMessage.ROOM_ID, room.getRoomId())
.findAllSorted(RealmMessage.TIMESTAMP, Sort.DESCENDING) .findAllSorted(RealmMessage.TIMESTAMP, Sort.DESCENDING)
.asObservable() .asObservable()),
.unsubscribeOn(AndroidSchedulers.from(looper)) pair -> close(pair.first, pair.second)
.doOnUnsubscribe(() -> close(realm, looper)) )
.unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.filter(it -> it != null .filter(it -> it != null
&& it.isLoaded() && it.isValid()) && it.isLoaded() && it.isValid())
.map(this::toList); .map(this::toList));
});
} }
@Override @Override
public Single<Integer> unreadCountFor(Room room, User user) { public Single<Integer> unreadCountFor(Room room, User user) {
return Single.defer(() -> { return Single.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(pair.first.where(RealmMessage.class)
if (realm == null || looper == null) {
return Single.just(0);
}
return realm.where(RealmMessage.class)
.equalTo(RealmMessage.ROOM_ID, room.getId()) .equalTo(RealmMessage.ROOM_ID, room.getId())
.greaterThanOrEqualTo(RealmMessage.TIMESTAMP, room.getLastSeen()) .greaterThanOrEqualTo(RealmMessage.TIMESTAMP, room.getLastSeen())
.notEqualTo(RealmMessage.USER_ID, user.getId()) .notEqualTo(RealmMessage.USER_ID, user.getId())
.findAll() .findAll()
.asObservable() .asObservable()),
.unsubscribeOn(AndroidSchedulers.from(looper)) pair -> close(pair.first, pair.second)
.doOnUnsubscribe(() -> close(realm, looper)) )
.unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.map(RealmResults::size) .map(RealmResults::size)
.first() .firstElement()
.toSingle(); .toSingle());
});
} }
private List<Message> toList(RealmResults<RealmMessage> realmMessages) { private List<Message> toList(RealmResults<RealmMessage> realmMessages) {
......
...@@ -7,6 +7,9 @@ import io.realm.Realm; ...@@ -7,6 +7,9 @@ import io.realm.Realm;
public class RealmRepository { public class RealmRepository {
protected void close(Realm realm, Looper looper) { protected void close(Realm realm, Looper looper) {
if (realm == null || looper == null) {
return;
}
new Handler(looper).post(realm::close); new Handler(looper).post(realm::close);
} }
} }
package chat.rocket.persistence.realm.repositories; package chat.rocket.persistence.realm.repositories;
import android.os.Looper; import android.os.Looper;
import android.support.v4.util.Pair;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.realm.Realm; import io.realm.Realm;
import io.realm.RealmResults; import io.realm.RealmResults;
...@@ -12,9 +16,7 @@ import chat.rocket.core.repositories.RoomRepository; ...@@ -12,9 +16,7 @@ import chat.rocket.core.repositories.RoomRepository;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.ddp.RealmRoom; import chat.rocket.persistence.realm.models.ddp.RealmRoom;
import chat.rocket.persistence.realm.models.internal.LoadMessageProcedure; import chat.rocket.persistence.realm.models.internal.LoadMessageProcedure;
import rx.Observable; import hu.akarnokd.rxjava.interop.RxJavaInterop;
import rx.Single;
import rx.android.schedulers.AndroidSchedulers;
public class RealmRoomRepository extends RealmRepository implements RoomRepository { public class RealmRoomRepository extends RealmRepository implements RoomRepository {
...@@ -25,68 +27,53 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito ...@@ -25,68 +27,53 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito
} }
@Override @Override
public Observable<List<Room>> getAll() { public Flowable<List<Room>> getAll() {
return Observable.defer(() -> { return Flowable.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(
pair.first.where(RealmRoom.class)
if (realm == null || looper == null) {
return Observable.just(null);
}
return realm.where(RealmRoom.class)
.findAll() .findAll()
.asObservable() .asObservable()),
.unsubscribeOn(AndroidSchedulers.from(looper)) pair -> close(pair.first, pair.second)
.doOnUnsubscribe(() -> close(realm, looper)) )
.unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.filter(roomSubscriptions -> roomSubscriptions != null && roomSubscriptions.isLoaded() .filter(roomSubscriptions -> roomSubscriptions != null && roomSubscriptions.isLoaded()
&& roomSubscriptions.isValid()) && roomSubscriptions.isValid())
.map(this::toList); .map(this::toList));
});
} }
@Override @Override
public Observable<Room> getById(String roomId) { public Flowable<Room> getById(String roomId) {
return Observable.defer(() -> { return Flowable.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(
pair.first.where(RealmRoom.class)
if (realm == null || looper == null) {
return Observable.just(null);
}
return realm.where(RealmRoom.class)
.equalTo(RealmRoom.ROOM_ID, roomId) .equalTo(RealmRoom.ROOM_ID, roomId)
.findFirst() .findFirst()
.<RealmRoom>asObservable() .<RealmRoom>asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(roomSubscription -> roomSubscription != null && roomSubscription.isLoaded() .filter(roomSubscription -> roomSubscription != null && roomSubscription.isLoaded()
&& roomSubscription.isValid()) && roomSubscription.isValid())),
.map(RealmRoom::asRoom); pair -> close(pair.first, pair.second)
}); )
.unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.map(RealmRoom::asRoom));
} }
@Override @Override
public Observable<RoomHistoryState> getHistoryStateByRoomId(String roomId) { public Flowable<RoomHistoryState> getHistoryStateByRoomId(String roomId) {
return Observable.defer(() -> { return Flowable.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(
pair.first.where(LoadMessageProcedure.class)
if (realm == null || looper == null) {
return Observable.just(null);
}
return realm.where(LoadMessageProcedure.class)
.equalTo(LoadMessageProcedure.ID, roomId) .equalTo(LoadMessageProcedure.ID, roomId)
.findFirst() .findFirst()
.<LoadMessageProcedure>asObservable() .<LoadMessageProcedure>asObservable()
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(loadMessageProcedure -> loadMessageProcedure != null .filter(loadMessageProcedure -> loadMessageProcedure != null
&& loadMessageProcedure.isLoaded() && loadMessageProcedure.isValid()) && loadMessageProcedure.isLoaded() && loadMessageProcedure.isValid())),
.map(LoadMessageProcedure::asRoomHistoryState); pair -> close(pair.first, pair.second)
}); )
.unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.map(LoadMessageProcedure::asRoomHistoryState));
} }
@Override @Override
...@@ -109,14 +96,14 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito ...@@ -109,14 +96,14 @@ public class RealmRoomRepository extends RealmRepository implements RoomReposito
realm.beginTransaction(); realm.beginTransaction();
return realm.copyToRealmOrUpdate(loadMessage) return RxJavaInterop.toV2Flowable(realm.copyToRealmOrUpdate(loadMessage)
.asObservable() .asObservable())
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(realmObject -> realmObject != null .filter(realmObject -> realmObject != null
&& realmObject.isLoaded() && realmObject.isValid()) && realmObject.isLoaded() && realmObject.isValid())
.first() .firstElement()
.doOnNext(realmObject -> realm.commitTransaction()) .doOnSuccess(it -> realm.commitTransaction())
.doOnError(throwable -> realm.cancelTransaction())
.doOnEvent((realmObject, throwable) -> close(realm, looper))
.toSingle() .toSingle()
.map(realmObject -> true); .map(realmObject -> true);
}); });
......
package chat.rocket.persistence.realm.repositories; package chat.rocket.persistence.realm.repositories;
import android.os.Looper; import android.os.Looper;
import android.support.v4.util.Pair;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.realm.Realm; import io.realm.Realm;
import chat.rocket.core.models.Session; import chat.rocket.core.models.Session;
import chat.rocket.core.repositories.SessionRepository; import chat.rocket.core.repositories.SessionRepository;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.internal.RealmSession; import chat.rocket.persistence.realm.models.internal.RealmSession;
import rx.Observable; import hu.akarnokd.rxjava.interop.RxJavaInterop;
import rx.Single;
import rx.android.schedulers.AndroidSchedulers;
public class RealmSessionRepository extends RealmRepository implements SessionRepository { public class RealmSessionRepository extends RealmRepository implements SessionRepository {
...@@ -20,29 +23,24 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe ...@@ -20,29 +23,24 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
} }
@Override @Override
public Observable<Session> getById(int id) { public Flowable<Optional<Session>> getById(int id) {
return Observable.defer(() -> { return Flowable.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(
pair.first.where(RealmSession.class)
if (realm == null || looper == null) {
return Observable.just(null);
}
return realm.where(RealmSession.class)
.equalTo(RealmSession.ID, id) .equalTo(RealmSession.ID, id)
.findAll() .findAll()
.<RealmSession>asObservable() .<RealmSession>asObservable()),
.unsubscribeOn(AndroidSchedulers.from(looper)) pair -> close(pair.first, pair.second)
.doOnUnsubscribe(() -> close(realm, looper)) )
.unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.filter(it -> it != null && it.isLoaded() && it.isValid()) .filter(it -> it != null && it.isLoaded() && it.isValid())
.map(realmSessions -> { .map(realmSessions -> {
if (realmSessions.size() == 0) { if (realmSessions.size() == 0) {
return null; return Optional.absent();
} }
return realmSessions.get(0).asSession(); return Optional.of(realmSessions.get(0).asSession());
}); }));
});
} }
@Override @Override
...@@ -52,7 +50,7 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe ...@@ -52,7 +50,7 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
final Looper looper = Looper.myLooper(); final Looper looper = Looper.myLooper();
if (realm == null || looper == null) { if (realm == null || looper == null) {
return Single.just(null); return Single.just(false);
} }
RealmSession realmSession = realm.where(RealmSession.class) RealmSession realmSession = realm.where(RealmSession.class)
...@@ -72,13 +70,13 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe ...@@ -72,13 +70,13 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
realm.beginTransaction(); realm.beginTransaction();
return realm.copyToRealmOrUpdate(realmSession) return RxJavaInterop.toV2Flowable(realm.copyToRealmOrUpdate(realmSession)
.asObservable() .asObservable())
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(it -> it != null && it.isLoaded() && it.isValid()) .filter(it -> it != null && it.isLoaded() && it.isValid())
.first() .firstElement()
.doOnNext(it -> realm.commitTransaction()) .doOnSuccess(it -> realm.commitTransaction())
.doOnError(throwable -> realm.cancelTransaction())
.doOnEvent((realmObject, throwable) -> close(realm, looper))
.toSingle() .toSingle()
.map(realmObject -> true); .map(realmObject -> true);
}); });
......
package chat.rocket.persistence.realm.repositories; package chat.rocket.persistence.realm.repositories;
import android.os.Looper; import android.os.Looper;
import io.realm.Realm; import android.support.v4.util.Pair;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Flowable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.realm.RealmResults;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import chat.rocket.core.repositories.UserRepository; import chat.rocket.core.repositories.UserRepository;
import chat.rocket.persistence.realm.RealmStore; import chat.rocket.persistence.realm.RealmStore;
import chat.rocket.persistence.realm.models.ddp.RealmUser; import chat.rocket.persistence.realm.models.ddp.RealmUser;
import rx.Observable; import hu.akarnokd.rxjava.interop.RxJavaInterop;
import rx.android.schedulers.AndroidSchedulers;
public class RealmUserRepository extends RealmRepository implements UserRepository { public class RealmUserRepository extends RealmRepository implements UserRepository {
...@@ -19,30 +22,25 @@ public class RealmUserRepository extends RealmRepository implements UserReposito ...@@ -19,30 +22,25 @@ public class RealmUserRepository extends RealmRepository implements UserReposito
} }
@Override @Override
public Observable<User> getCurrent() { public Flowable<Optional<User>> getCurrent() {
return Observable.defer(() -> { return Flowable.defer(() -> Flowable.using(
final Realm realm = RealmStore.getRealm(hostname); () -> new Pair<>(RealmStore.getRealm(hostname), Looper.myLooper()),
final Looper looper = Looper.myLooper(); pair -> RxJavaInterop.toV2Flowable(
pair.first.where(RealmUser.class)
if (realm == null || looper == null) {
return Observable.just(null);
}
final RealmUser realmUser = realm.where(RealmUser.class)
.isNotEmpty(RealmUser.EMAILS) .isNotEmpty(RealmUser.EMAILS)
.findFirst(); .findAll()
.<RealmResults<RealmUser>>asObservable()),
if (realmUser == null) { pair -> close(pair.first, pair.second)
realm.close(); )
return Observable.just(null); .unsubscribeOn(AndroidSchedulers.from(Looper.myLooper()))
.filter(it -> it != null && it.isLoaded()
&& it.isValid())
.map(realmUsers -> {
if (realmUsers.size() > 0) {
return Optional.of(realmUsers.get(0).asUser());
} }
return realmUser return Optional.<User>absent();
.<RealmUser>asObservable() }));
.unsubscribeOn(AndroidSchedulers.from(looper))
.doOnUnsubscribe(() -> close(realm, looper))
.filter(it -> it != null && it.isLoaded() && it.isValid())
.map(RealmUser::asUser);
});
} }
} }
...@@ -5,17 +5,17 @@ buildscript { ...@@ -5,17 +5,17 @@ buildscript {
jcenter() jcenter()
} }
dependencies { dependencies {
classpath rootProject.ext.androidPlugin classpath 'com.android.tools.build:gradle:2.2.3'
} }
} }
android { android {
compileSdkVersion rootProject.ext.compileSdkVersion compileSdkVersion 25
buildToolsVersion rootProject.ext.buildToolsVersion buildToolsVersion '25.0.2'
defaultConfig { defaultConfig {
minSdkVersion rootProject.ext.minSdkVersion minSdkVersion 16
targetSdkVersion rootProject.ext.compileSdkVersion targetSdkVersion 25
versionCode 1 versionCode 1
versionName "1" versionName "1"
...@@ -29,27 +29,36 @@ android { ...@@ -29,27 +29,36 @@ android {
} }
} }
dependencies { ext {
testCompile 'junit:junit:4.12' supportVersion = '25.1.1'
compile rootProject.ext.supportAnnotations frescoVersion = '1.1.0'
compile rootProject.ext.supportAppCompat }
compile rootProject.ext.supportV13
compile rootProject.ext.supportDesign
dependencies {
compile project(':rocket-chat-core') compile project(':rocket-chat-core')
compile 'org.nibor.autolink:autolink:0.5.0' compile "com.android.support:support-annotations:$supportVersion"
compile "com.android.support:appcompat-v7:$supportVersion"
compile "com.android.support:support-v13:$supportVersion"
compile "com.android.support:design:$supportVersion"
compile 'org.nibor.autolink:autolink:0.6.0'
compile rootProject.ext.textDrawable compile 'com.amulyakhare:com.amulyakhare.textdrawable:1.0.1'
compile rootProject.ext.okhttp3
compile rootProject.ext.boltsTask compile 'com.squareup.okhttp3:okhttp:3.6.0'
compile 'com.parse.bolts:bolts-tasks:1.4.0'
compile 'com.github.yusukeiwaki.android-widget:widget-fontawesome:0.0.1' compile 'com.github.yusukeiwaki.android-widget:widget-fontawesome:0.0.1'
compile rootProject.ext.frescoBase compile "com.facebook.fresco:fresco:$frescoVersion"
compile rootProject.ext.frescoAnimatedGif compile "com.facebook.fresco:animated-gif:$frescoVersion"
compile rootProject.ext.frescoAnimatedWebp compile "com.facebook.fresco:animated-webp:$frescoVersion"
compile rootProject.ext.frescoWebp compile "com.facebook.fresco:webpsupport:$frescoVersion"
compile rootProject.ext.frescoImagePipelineOkHttp3 compile "com.facebook.fresco:imagepipeline-okhttp3:$frescoVersion"
compile 'com.caverock:androidsvg:1.2.1' compile 'com.caverock:androidsvg:1.2.1'
testCompile 'junit:junit:4.12'
} }
...@@ -11,7 +11,9 @@ dependencies { ...@@ -11,7 +11,9 @@ dependencies {
compile 'com.google.code.findbugs:jsr305:3.0.1' compile 'com.google.code.findbugs:jsr305:3.0.1'
compile 'io.reactivex:rxjava:1.2.3' compile 'io.reactivex.rxjava2:rxjava:2.0.6'
compile 'com.fernandocejas:arrow:1.0.0'
compile 'com.google.auto.value:auto-value:1.3' compile 'com.google.auto.value:auto-value:1.3'
apt 'com.google.auto.value:auto-value:1.3' apt 'com.google.auto.value:auto-value:1.3'
......
package chat.rocket.core.interactors; package chat.rocket.core.interactors;
import io.reactivex.Flowable;
import io.reactivex.Single;
import chat.rocket.core.repositories.UserRepository; import chat.rocket.core.repositories.UserRepository;
import rx.Observable;
import rx.Single;
public class CanCreateRoomInteractor { public class CanCreateRoomInteractor {
...@@ -16,13 +17,12 @@ public class CanCreateRoomInteractor { ...@@ -16,13 +17,12 @@ public class CanCreateRoomInteractor {
} }
public Single<Boolean> canCreate(String roomId) { public Single<Boolean> canCreate(String roomId) {
return Observable.zip( return Flowable.zip(
userRepository.getCurrent(), userRepository.getCurrent(),
sessionInteractor.getDefault(), sessionInteractor.getDefault(),
Observable.just(roomId), Flowable.just(roomId),
(user, session, room) -> user != null && session != null && room != null (user, session, room) -> user != null && session != null && room != null
) )
.first() .first(false);
.toSingle();
} }
} }
package chat.rocket.core.interactors; package chat.rocket.core.interactors;
import io.reactivex.Flowable;
import io.reactivex.Single;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import chat.rocket.core.SyncState; import chat.rocket.core.SyncState;
...@@ -9,8 +12,6 @@ import chat.rocket.core.models.RoomHistoryState; ...@@ -9,8 +12,6 @@ import chat.rocket.core.models.RoomHistoryState;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import chat.rocket.core.repositories.MessageRepository; import chat.rocket.core.repositories.MessageRepository;
import chat.rocket.core.repositories.RoomRepository; import chat.rocket.core.repositories.RoomRepository;
import rx.Observable;
import rx.Single;
public class MessageInteractor { public class MessageInteractor {
...@@ -42,7 +43,7 @@ public class MessageInteractor { ...@@ -42,7 +43,7 @@ public class MessageInteractor {
return !roomHistoryState.isComplete() return !roomHistoryState.isComplete()
&& (syncState == SyncState.SYNCED || syncState == SyncState.FAILED); && (syncState == SyncState.SYNCED || syncState == SyncState.FAILED);
}) })
.first() .firstElement()
.toSingle() .toSingle()
.flatMap(roomHistoryState -> roomRepository .flatMap(roomHistoryState -> roomRepository
.setHistoryState(roomHistoryState.withSyncState(SyncState.NOT_SYNCED))); .setHistoryState(roomHistoryState.withSyncState(SyncState.NOT_SYNCED)));
...@@ -75,7 +76,7 @@ public class MessageInteractor { ...@@ -75,7 +76,7 @@ public class MessageInteractor {
return messageRepository.unreadCountFor(room, user); return messageRepository.unreadCountFor(room, user);
} }
public Observable<List<Message>> getAllFrom(Room room) { public Flowable<List<Message>> getAllFrom(Room room) {
return messageRepository.getAllFrom(room); return messageRepository.getAllFrom(room);
} }
} }
package chat.rocket.core.interactors; package chat.rocket.core.interactors;
import io.reactivex.Flowable;
import java.util.List; import java.util.List;
import chat.rocket.core.models.Room; import chat.rocket.core.models.Room;
import chat.rocket.core.repositories.RoomRepository; import chat.rocket.core.repositories.RoomRepository;
import rx.Observable;
public class RoomInteractor { public class RoomInteractor {
...@@ -13,26 +14,29 @@ public class RoomInteractor { ...@@ -13,26 +14,29 @@ public class RoomInteractor {
this.roomRepository = roomRepository; this.roomRepository = roomRepository;
} }
public Observable<Integer> getTotalUnreadMentionsCount() { public Flowable<Integer> getTotalUnreadMentionsCount() {
return roomRepository.getAll() return roomRepository.getAll()
.flatMap(rooms -> Observable.from(rooms) .flatMap(rooms -> Flowable.fromIterable(rooms)
.filter(room -> room.isOpen() && room.isAlert()) .filter(room -> room.isOpen() && room.isAlert())
.map(Room::getUnread) .map(Room::getUnread)
.defaultIfEmpty(0) .defaultIfEmpty(0)
.reduce((unreadCount, unreadCount2) -> unreadCount + unreadCount2)); .reduce((unreadCount, unreadCount2) -> unreadCount + unreadCount2)
.toFlowable());
} }
public Observable<Integer> getTotalUnreadRoomsCount() { public Flowable<Long> getTotalUnreadRoomsCount() {
return roomRepository.getAll() return roomRepository.getAll()
.flatMap(rooms -> Observable.from(rooms) .flatMap(rooms -> Flowable.fromIterable(rooms)
.filter(room -> room.isOpen() && room.isAlert()) .filter(room -> room.isOpen() && room.isAlert())
.count()); .count()
.toFlowable());
} }
public Observable<List<Room>> getOpenRooms() { public Flowable<List<Room>> getOpenRooms() {
return roomRepository.getAll() return roomRepository.getAll()
.flatMap(rooms -> Observable.from(rooms) .flatMap(rooms -> Flowable.fromIterable(rooms)
.filter(Room::isOpen) .filter(Room::isOpen)
.toList()); .toList()
.toFlowable());
} }
} }
package chat.rocket.core.interactors; package chat.rocket.core.interactors;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Flowable;
import io.reactivex.Single;
import chat.rocket.core.models.Session; import chat.rocket.core.models.Session;
import chat.rocket.core.repositories.SessionRepository; import chat.rocket.core.repositories.SessionRepository;
import rx.Observable;
import rx.Single;
public class SessionInteractor { public class SessionInteractor {
...@@ -15,20 +17,23 @@ public class SessionInteractor { ...@@ -15,20 +17,23 @@ public class SessionInteractor {
this.sessionRepository = sessionRepository; this.sessionRepository = sessionRepository;
} }
public Observable<Session> getDefault() { public Flowable<Optional<Session>> getDefault() {
return sessionRepository.getById(DEFAULT_ID); return sessionRepository.getById(DEFAULT_ID);
} }
public Observable<Session.State> getSessionState() { public Flowable<Session.State> getSessionState() {
return getDefault() return getDefault()
.map(this::getStateFrom); .map(sessionOptional -> getStateFrom(sessionOptional.orNull()));
} }
public Single<Boolean> retryLogin() { public Single<Boolean> retryLogin() {
return getDefault() return getDefault()
.filter(Optional::isPresent)
.map(Optional::get)
.filter(session -> session.getToken() != null .filter(session -> session.getToken() != null
&& (!session.isTokenVerified() || session.getError() != null)) && (!session.isTokenVerified() || session.getError() != null))
.map(session -> session.withTokenVerified(false).withError(null)) .map(session -> session.withTokenVerified(false).withError(null))
.firstElement()
.toSingle() .toSingle()
.flatMap(sessionRepository::save); .flatMap(sessionRepository::save);
} }
......
package chat.rocket.core.repositories; package chat.rocket.core.repositories;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Flowable;
import io.reactivex.Single;
import java.util.List; import java.util.List;
import chat.rocket.core.models.Message; import chat.rocket.core.models.Message;
import chat.rocket.core.models.Room; import chat.rocket.core.models.Room;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import rx.Observable;
import rx.Single;
public interface MessageRepository { public interface MessageRepository {
Single<Message> getById(String messageId); Single<Optional<Message>> getById(String messageId);
Single<Boolean> save(Message message); Single<Boolean> save(Message message);
Single<Boolean> delete(Message message); Single<Boolean> delete(Message message);
Observable<List<Message>> getAllFrom(Room room); Flowable<List<Message>> getAllFrom(Room room);
Single<Integer> unreadCountFor(Room room, User user); Single<Integer> unreadCountFor(Room room, User user);
} }
package chat.rocket.core.repositories; package chat.rocket.core.repositories;
import io.reactivex.Flowable;
import io.reactivex.Single;
import java.util.List; import java.util.List;
import chat.rocket.core.models.Room; import chat.rocket.core.models.Room;
import chat.rocket.core.models.RoomHistoryState; import chat.rocket.core.models.RoomHistoryState;
import rx.Observable;
import rx.Single;
public interface RoomRepository { public interface RoomRepository {
Observable<List<Room>> getAll(); Flowable<List<Room>> getAll();
Observable<Room> getById(String roomId); Flowable<Room> getById(String roomId);
Observable<RoomHistoryState> getHistoryStateByRoomId(String roomId); Flowable<RoomHistoryState> getHistoryStateByRoomId(String roomId);
Single<Boolean> setHistoryState(RoomHistoryState roomHistoryState); Single<Boolean> setHistoryState(RoomHistoryState roomHistoryState);
} }
package chat.rocket.core.repositories; package chat.rocket.core.repositories;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Flowable;
import io.reactivex.Single;
import chat.rocket.core.models.Session; import chat.rocket.core.models.Session;
import rx.Observable;
import rx.Single;
public interface SessionRepository { public interface SessionRepository {
Observable<Session> getById(int id); Flowable<Optional<Session>> getById(int id);
Single<Boolean> save(Session session); Single<Boolean> save(Session session);
} }
package chat.rocket.core.repositories; package chat.rocket.core.repositories;
import com.fernandocejas.arrow.optional.Optional;
import io.reactivex.Flowable;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import rx.Observable;
public interface UserRepository { public interface UserRepository {
Observable<User> getCurrent(); Flowable<Optional<User>> getCurrent();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment