Commit ffa584b9 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Band-aid fix for the network issues without removing bolts, yet

parent 3fa0cd78
...@@ -61,6 +61,10 @@ public class DDPClient { ...@@ -61,6 +61,10 @@ public class DDPClient {
return impl.getOnCloseCallback(); return impl.getOnCloseCallback();
} }
public Task<RxWebSocketCallback.Failure> getOnFailureCallback() {
return impl.getOnFailureCallback();
}
public void close() { public void close() {
impl.close(1000, "closed by DDPClient#close()"); impl.close(1000, "closed by DDPClient#close()");
} }
......
...@@ -3,28 +3,34 @@ package chat.rocket.android_ddp; ...@@ -3,28 +3,34 @@ package chat.rocket.android_ddp;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.text.TextUtils; import android.text.TextUtils;
import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import bolts.Task; import bolts.Task;
import bolts.TaskCompletionSource; import bolts.TaskCompletionSource;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.rx.RxWebSocket; import chat.rocket.android_ddp.rx.RxWebSocket;
import chat.rocket.android_ddp.rx.RxWebSocketCallback; import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
public class DDPClientImpl { public class DDPClientImpl {
private final DDPClient client; private final DDPClient client;
private final RxWebSocket websocket; private RxWebSocket websocket;
private Flowable<RxWebSocketCallback.Base> flowable; private Flowable<RxWebSocketCallback.Base> flowable;
private CompositeDisposable subscriptions; private CompositeDisposable subscriptions;
private OkHttpClient okHttpClient;
private String currentSession;
private String url;
public DDPClientImpl(DDPClient self, OkHttpClient client) { public DDPClientImpl(DDPClient self, OkHttpClient client) {
okHttpClient = client;
websocket = new RxWebSocket(client); websocket = new RxWebSocket(client);
this.client = self; this.client = self;
} }
...@@ -51,21 +57,23 @@ public class DDPClientImpl { ...@@ -51,21 +57,23 @@ public class DDPClientImpl {
public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url, public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
String session) { String session) {
try { try {
this.url = url;
flowable = websocket.connect(url).autoConnect(); flowable = websocket.connect(url).autoConnect();
CompositeDisposable subscriptions = new CompositeDisposable(); CompositeDisposable subscriptions = new CompositeDisposable();
subscriptions.add( subscriptions.add(
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open) flowable.retry().filter(callback -> callback instanceof RxWebSocketCallback.Open)
.subscribe( .subscribe(
callback -> { callback -> {
sendMessage("connect", sendMessage("connect",
json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)) json -> (TextUtils.isEmpty(session) ? json : json.put("session", DDPClientImpl.this.currentSession))
.put( .put(
"version", "pre2") "version", "pre2")
.put("support", new JSONArray().put("pre2").put("pre1")), .put("support", new JSONArray().put("pre2").put("pre1")),
task); task);
}, },
err -> { err -> {
System.err.println("Something bad happened!");
} }
) )
); );
...@@ -79,6 +87,7 @@ public class DDPClientImpl { ...@@ -79,6 +87,7 @@ public class DDPClientImpl {
.subscribe(response -> { .subscribe(response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("connected".equals(msg) && !response.isNull("session")) { if ("connected".equals(msg) && !response.isNull("session")) {
currentSession = response.optString("session");
task.trySetResult( task.trySetResult(
new DDPClientCallback.Connect(client, response.optString("session"))); new DDPClientCallback.Connect(client, response.optString("session")));
subscriptions.dispose(); subscriptions.dispose();
...@@ -369,6 +378,28 @@ public class DDPClientImpl { ...@@ -369,6 +378,28 @@ public class DDPClientImpl {
}); });
} }
public Task<RxWebSocketCallback.Failure> getOnFailureCallback() {
TaskCompletionSource<RxWebSocketCallback.Failure> task = new TaskCompletionSource<>();
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Failure)
.cast(RxWebSocketCallback.Failure.class)
.subscribe(
task::setResult,
err -> {
if (err instanceof Exception) {
task.setError((Exception) err);
} else {
task.setError(new Exception(err));
}
}
);
return task.getTask().onSuccessTask(_task -> {
unsubscribeBaseListeners();
return _task;
});
}
private boolean sendMessage(String msg, @Nullable JSONBuilder json) { private boolean sendMessage(String msg, @Nullable JSONBuilder json) {
try { try {
JSONObject origJson = new JSONObject().put("msg", msg); JSONObject origJson = new JSONObject().put("msg", msg);
...@@ -383,7 +414,8 @@ public class DDPClientImpl { ...@@ -383,7 +414,8 @@ public class DDPClientImpl {
private void sendMessage(String msg, @Nullable JSONBuilder json, private void sendMessage(String msg, @Nullable JSONBuilder json,
TaskCompletionSource<?> taskForSetError) { TaskCompletionSource<?> taskForSetError) {
if (!sendMessage(msg, json)) { if (!sendMessage(msg, json)) {
taskForSetError.trySetError(new DDPClientCallback.Closed(client)); if (taskForSetError != null)
taskForSetError.trySetError(new DDPClientCallback.Closed(client));
} }
} }
......
...@@ -7,6 +7,9 @@ import io.reactivex.exceptions.OnErrorNotImplementedException; ...@@ -7,6 +7,9 @@ import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.flowables.ConnectableFlowable;
import java.io.IOException; import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
...@@ -37,7 +40,11 @@ public class RxWebSocket { ...@@ -37,7 +40,11 @@ public class RxWebSocket {
@Override @Override
public void onFailure(WebSocket webSocket, Throwable err, Response response) { public void onFailure(WebSocket webSocket, Throwable err, Response response) {
try { try {
emitter.onError(new RxWebSocketCallback.Failure(webSocket, err, response)); if (err instanceof UnknownHostException) {
emitter.onError(err);
} else {
emitter.onNext(new RxWebSocketCallback.Failure(webSocket, err, response));
}
} catch (OnErrorNotImplementedException ex) { } catch (OnErrorNotImplementedException ex) {
RCLog.w(ex, "OnErrorNotImplementedException ignored"); RCLog.w(ex, "OnErrorNotImplementedException ignored");
} }
...@@ -55,7 +62,7 @@ public class RxWebSocket { ...@@ -55,7 +62,7 @@ public class RxWebSocket {
} }
}), }),
BackpressureStrategy.BUFFER BackpressureStrategy.BUFFER
).publish(); ).delay(2000, TimeUnit.MILLISECONDS).publish();
} }
public boolean sendText(String message) throws IOException { public boolean sendText(String message) throws IOException {
......
...@@ -31,13 +31,11 @@ public class RxWebSocketCallback { ...@@ -31,13 +31,11 @@ public class RxWebSocketCallback {
} }
} }
public static class Failure extends Exception { public static class Failure extends Base {
public WebSocket ws;
public Response response; public Response response;
public Failure(WebSocket websocket, Throwable err, Response response) { public Failure(WebSocket websocket, Throwable err, Response response) {
super(err); super("Failure", websocket);
this.ws = websocket;
this.response = response; this.response = response;
} }
......
...@@ -3,6 +3,9 @@ package chat.rocket.android.activity; ...@@ -3,6 +3,9 @@ package chat.rocket.android.activity;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.v4.util.Pair; import android.support.v4.util.Pair;
import chat.rocket.android.service.RocketChatWebSocketThread;
import chat.rocket.android.service.ServerConnectivity;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposable;
...@@ -19,6 +22,7 @@ import chat.rocket.core.interactors.RoomInteractor; ...@@ -19,6 +22,7 @@ import chat.rocket.core.interactors.RoomInteractor;
import chat.rocket.core.interactors.SessionInteractor; import chat.rocket.core.interactors.SessionInteractor;
import chat.rocket.core.models.Session; import chat.rocket.core.models.Session;
import chat.rocket.core.models.User; import chat.rocket.core.models.User;
import io.reactivex.schedulers.Schedulers;
public class MainPresenter extends BasePresenter<MainContract.View> public class MainPresenter extends BasePresenter<MainContract.View>
implements MainContract.Presenter { implements MainContract.Presenter {
...@@ -159,6 +163,19 @@ public class MainPresenter extends BasePresenter<MainContract.View> ...@@ -159,6 +163,19 @@ public class MainPresenter extends BasePresenter<MainContract.View>
); );
addSubscription(subscription); addSubscription(subscription);
addSubscription(
RxJavaInterop.toV2Observable(connectivityManagerApi.getServerConnectivityAsObservable())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(serverConnectivity -> {
if (serverConnectivity.state == ServerConnectivity.STATE_CONNECTING) {
view.showConnecting();
}
},
err -> {
})
);
} }
private void setUserOnline() { private void setUserOnline() {
......
...@@ -36,7 +36,7 @@ import hugo.weaving.DebugLog; ...@@ -36,7 +36,7 @@ import hugo.weaving.DebugLog;
*/ */
public class MethodCallHelper { public class MethodCallHelper {
protected static final long TIMEOUT_MS = 20000; protected static final long TIMEOUT_MS = 10000;
protected static final Continuation<String, Task<JSONObject>> CONVERT_TO_JSON_OBJECT = protected static final Continuation<String, Task<JSONObject>> CONVERT_TO_JSON_OBJECT =
task -> Task.forResult(new JSONObject(task.getResult())); task -> Task.forResult(new JSONObject(task.getResult()));
protected static final Continuation<String, Task<JSONArray>> CONVERT_TO_JSON_ARRAY = protected static final Continuation<String, Task<JSONArray>> CONVERT_TO_JSON_ARRAY =
......
...@@ -23,4 +23,6 @@ import chat.rocket.core.models.ServerInfo; ...@@ -23,4 +23,6 @@ import chat.rocket.core.models.ServerInfo;
void notifyConnectionEstablished(String hostname, String session); void notifyConnectionEstablished(String hostname, String session);
void notifyConnectionLost(String hostname, int reason); void notifyConnectionLost(String hostname, int reason);
void notifyConnecting(String hostname);
} }
...@@ -133,6 +133,14 @@ import rx.subjects.PublishSubject; ...@@ -133,6 +133,14 @@ import rx.subjects.PublishSubject;
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED)); new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED));
} }
@DebugLog
@Override
public void notifyConnecting(String hostname) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTING));
}
@Override @Override
public Observable<ServerConnectivity> getServerConnectivityAsObservable() { public Observable<ServerConnectivity> getServerConnectivityAsObservable() {
return Observable.concat(Observable.from(getCurrentConnectivityList()), connectivitySubject); return Observable.concat(Observable.from(getCurrentConnectivityList()), connectivitySubject);
......
...@@ -9,11 +9,14 @@ import java.lang.reflect.Constructor; ...@@ -9,11 +9,14 @@ import java.lang.reflect.Constructor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import bolts.Task; import bolts.Task;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.api.DDPClientWrapper; import chat.rocket.android.api.DDPClientWrapper;
import chat.rocket.android.api.MethodCallHelper; import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.LogIfError; import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.helper.Logger;
import chat.rocket.android.helper.TextUtils; import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.ddp.stream.StreamRoomMessage;
import chat.rocket.core.models.ServerInfo; import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.models.internal.RealmSession; import chat.rocket.persistence.realm.models.internal.RealmSession;
import chat.rocket.persistence.realm.RealmHelper; import chat.rocket.persistence.realm.RealmHelper;
...@@ -32,8 +35,11 @@ import chat.rocket.android.service.observer.NewMessageObserver; ...@@ -32,8 +35,11 @@ import chat.rocket.android.service.observer.NewMessageObserver;
import chat.rocket.android.service.observer.PushSettingsObserver; import chat.rocket.android.service.observer.PushSettingsObserver;
import chat.rocket.android.service.observer.SessionObserver; import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android.service.observer.TokenLoginObserver; import chat.rocket.android.service.observer.TokenLoginObserver;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import hugo.weaving.DebugLog; import hugo.weaving.DebugLog;
import io.reactivex.disposables.CompositeDisposable;
import rx.Single; import rx.Single;
import rx.Subscription;
/** /**
* Thread for handling WebSocket connection. * Thread for handling WebSocket connection.
...@@ -62,7 +68,8 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -62,7 +68,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
private final ArrayList<Registrable> listeners = new ArrayList<>(); private final ArrayList<Registrable> listeners = new ArrayList<>();
private DDPClientWrapper ddpClient; private DDPClientWrapper ddpClient;
private boolean listenersRegistered; private boolean listenersRegistered;
private final DDPClientRef ddpClientRef = new DDPClientRef() { private RocketChatCache rocketChatCache;
private DDPClientRef ddpClientRef = new DDPClientRef() {
@Override @Override
public DDPClientWrapper get() { public DDPClientWrapper get() {
return ddpClient; return ddpClient;
...@@ -96,6 +103,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -96,6 +103,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
this.hostname = hostname; this.hostname = hostname;
this.realmHelper = RealmStore.getOrCreate(hostname); this.realmHelper = RealmStore.getOrCreate(hostname);
this.connectivityManager = ConnectivityManager.getInstanceForInternal(appContext); this.connectivityManager = ConnectivityManager.getInstanceForInternal(appContext);
this.rocketChatCache = new RocketChatCache(appContext);
} }
/** /**
...@@ -216,6 +224,12 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -216,6 +224,12 @@ public class RocketChatWebSocketThread extends HandlerThread {
if (!alive) { if (!alive) {
RCLog.d("DDPClient#create"); RCLog.d("DDPClient#create");
ddpClient = DDPClientWrapper.create(hostname); ddpClient = DDPClientWrapper.create(hostname);
ddpClientRef = new DDPClientRef() {
@Override
public DDPClientWrapper get() {
return ddpClient;
}
};
} }
}); });
} }
...@@ -238,6 +252,35 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -238,6 +252,35 @@ public class RocketChatWebSocketThread extends HandlerThread {
return null; return null;
}); });
task.getResult().client.getOnFailureCallback().onSuccess(_task -> {
ddpClient = null;
CompositeDisposable subscriptions = new CompositeDisposable();
connectivityManager.notifyConnecting(hostname);
subscriptions.add(
RxJavaInterop.toV2Single(connect().retry())
.subscribe(
rocketChatWebSocketThread -> {
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId != null) {
StreamRoomMessage streamRoomObserver = new StreamRoomMessage(
appContext,
hostname,
realmHelper,
ddpClientRef,
roomId
);
streamRoomObserver.register();
listeners.add(streamRoomObserver);
}
},
err -> {
subscriptions.dispose();
}
)
);
return null;
});
return realmHelper.executeTransaction(realm -> { return realmHelper.executeTransaction(realm -> {
RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst(); RealmSession sessionObj = RealmSession.queryDefaultSession(realm).findFirst();
if (sessionObj == null) { if (sessionObj == null) {
...@@ -308,6 +351,15 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -308,6 +351,15 @@ public class RocketChatWebSocketThread extends HandlerThread {
registrable.register(); registrable.register();
listeners.add(registrable); listeners.add(registrable);
} }
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId != null) {
StreamRoomMessage streamRoomMessage = new StreamRoomMessage(
appContext, hostname, realmHelper, ddpClientRef, roomId);
streamRoomMessage.register();
listeners.add(streamRoomMessage);
}
} catch (Exception exception) { } catch (Exception exception) {
RCLog.w(exception, "Failed to register listeners!!"); RCLog.w(exception, "Failed to register listeners!!");
} }
......
...@@ -6,7 +6,7 @@ package chat.rocket.android.service; ...@@ -6,7 +6,7 @@ package chat.rocket.android.service;
public class ServerConnectivity { public class ServerConnectivity {
public static final int STATE_CONNECTED = 1; public static final int STATE_CONNECTED = 1;
public static final int STATE_DISCONNECTED = 2; public static final int STATE_DISCONNECTED = 2;
/*package*/ static final int STATE_CONNECTING = 3; public static final int STATE_CONNECTING = 3;
/*package*/ static final int STATE_DISCONNECTING = 4; /*package*/ static final int STATE_DISCONNECTING = 4;
public final String hostname; public final String hostname;
......
...@@ -117,10 +117,10 @@ public class MethodCallObserver extends AbstractModelObserver<MethodCall> { ...@@ -117,10 +117,10 @@ public class MethodCallObserver extends AbstractModelObserver<MethodCall> {
} else if (exception instanceof DDPClientCallback.RPC.Timeout) { } else if (exception instanceof DDPClientCallback.RPC.Timeout) {
// temp "fix"- we need to rewrite the connection layer a bit // temp "fix"- we need to rewrite the connection layer a bit
errMessage = "{\"message\": \"Connection Timeout\"}"; errMessage = "{\"message\": \"Connection Timeout\"}";
} else if (exception instanceof RxWebSocketCallback.Failure) { } /*else if (exception instanceof RxWebSocketCallback.Failure) {
// temp "fix"- we need to rewrite the connection layer a bit // temp "fix"- we need to rewrite the connection layer a bit
errMessage = "{\"message\": \"Connection Failure\"}"; errMessage = "{\"message\": \"Connection Failure\"}";
} else { }*/ else {
errMessage = exception.getMessage(); errMessage = exception.getMessage();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment