Unverified Commit 92ee59e8 authored by Rafael Kellermann Streit's avatar Rafael Kellermann Streit Committed by GitHub

Merge pull request #586 from RocketChat/fix/indefinite-status-ticker

[FIX] Infinite status ticker
parents 61707e2d cfa5323f
...@@ -50,30 +50,30 @@ public class DDPClient { ...@@ -50,30 +50,30 @@ public class DDPClient {
impl = new DDPClientImpl(this, client); impl = new DDPClientImpl(this, client);
} }
public Task<DDPClientCallback.Connect> connect(String url, String session) { private Task<DDPClientCallback.Connect> connect(String url, String session) {
hostname.set(url); hostname.set(url);
TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>(); TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>();
impl.connect(task, url, session); impl.connect(task, url, session);
return task.getTask(); return task.getTask();
} }
public Task<DDPClientCallback.Ping> ping(@Nullable String id) { private Task<DDPClientCallback.Ping> ping(@Nullable String id) {
TaskCompletionSource<DDPClientCallback.Ping> task = new TaskCompletionSource<>(); TaskCompletionSource<DDPClientCallback.Ping> task = new TaskCompletionSource<>();
impl.ping(task, id); impl.ping(task, id);
return task.getTask(); return task.getTask();
} }
public Maybe<DDPClientCallback.Base> doPing(@Nullable String id) { private Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id); return impl.ping(id);
} }
public Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) { private Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) {
TaskCompletionSource<DDPSubscription.Ready> task = new TaskCompletionSource<>(); TaskCompletionSource<DDPSubscription.Ready> task = new TaskCompletionSource<>();
impl.sub(task, name, params, id); impl.sub(task, name, params, id);
return task.getTask(); return task.getTask();
} }
public Task<DDPSubscription.NoSub> unsub(String id) { private Task<DDPSubscription.NoSub> unsub(String id) {
TaskCompletionSource<DDPSubscription.NoSub> task = new TaskCompletionSource<>(); TaskCompletionSource<DDPSubscription.NoSub> task = new TaskCompletionSource<>();
impl.unsub(task, id); impl.unsub(task, id);
return task.getTask(); return task.getTask();
......
...@@ -52,7 +52,7 @@ public class DDPClientImpl { ...@@ -52,7 +52,7 @@ public class DDPClientImpl {
} }
} }
public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url, /* package */ void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
String session) { String session) {
try { try {
flowable = websocket.connect(url).autoConnect(2); flowable = websocket.connect(url).autoConnect(2);
......
...@@ -47,9 +47,9 @@ import hugo.weaving.DebugLog; ...@@ -47,9 +47,9 @@ import hugo.weaving.DebugLog;
*/ */
public class MainActivity extends AbstractAuthedActivity implements MainContract.View { public class MainActivity extends AbstractAuthedActivity implements MainContract.View {
private RoomToolbar toolbar; private RoomToolbar toolbar;
private StatusTicker statusTicker;
private SlidingPaneLayout pane; private SlidingPaneLayout pane;
private MainContract.Presenter presenter; private MainContract.Presenter presenter;
private volatile Snackbar statusTicker;
@Override @Override
public int getLayoutContainerForFragment() { public int getLayoutContainerForFragment() {
...@@ -61,7 +61,6 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -61,7 +61,6 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
super.onCreate(savedInstanceState); super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main); setContentView(R.layout.activity_main);
toolbar = findViewById(R.id.activity_main_toolbar); toolbar = findViewById(R.id.activity_main_toolbar);
statusTicker = new StatusTicker();
pane = findViewById(R.id.sliding_pane); pane = findViewById(R.id.sliding_pane);
setupToolbar(); setupToolbar();
} }
...@@ -95,6 +94,8 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -95,6 +94,8 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
if (presenter != null) { if (presenter != null) {
presenter.release(); presenter.release();
} }
// Dismiss any status ticker
if (statusTicker != null) statusTicker.dismiss();
super.onPause(); super.onPause();
} }
...@@ -245,28 +246,36 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -245,28 +246,36 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
@Override @Override
public void showLoginScreen() { public void showLoginScreen() {
LaunchUtil.showLoginActivity(this, hostname); LaunchUtil.showLoginActivity(this, hostname);
statusTicker.updateStatus(StatusTicker.STATUS_DISMISS, null); showConnectionOk();
} }
@Override @Override
public void showConnectionError() { public synchronized void showConnectionError() {
statusTicker.updateStatus(StatusTicker.STATUS_CONNECTION_ERROR, dismissStatusTickerIfShowing();
Snackbar.make(findViewById(getLayoutContainerForFragment()), statusTicker = Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.fragment_retry_login_error_title, Snackbar.LENGTH_INDEFINITE) R.string.fragment_retry_login_error_title, Snackbar.LENGTH_INDEFINITE)
.setAction(R.string.fragment_retry_login_retry_title, view -> .setAction(R.string.fragment_retry_login_retry_title, view ->
ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer())); ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer());
statusTicker.show();
} }
@Override @Override
public void showConnecting() { public synchronized void showConnecting() {
statusTicker.updateStatus(StatusTicker.STATUS_TOKEN_LOGIN, dismissStatusTickerIfShowing();
Snackbar.make(findViewById(getLayoutContainerForFragment()), statusTicker = Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.server_config_activity_authenticating, Snackbar.LENGTH_INDEFINITE)); R.string.server_config_activity_authenticating, Snackbar.LENGTH_INDEFINITE);
statusTicker.show();
} }
@Override @Override
public void showConnectionOk() { public synchronized void showConnectionOk() {
statusTicker.updateStatus(StatusTicker.STATUS_DISMISS, null); dismissStatusTickerIfShowing();
}
private void dismissStatusTickerIfShowing() {
if (statusTicker != null) {
statusTicker.dismiss();
}
} }
@Override @Override
...@@ -349,34 +358,4 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract ...@@ -349,34 +358,4 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
public void beforeLogoutCleanUp() { public void beforeLogoutCleanUp() {
presenter.beforeLogoutCleanUp(); presenter.beforeLogoutCleanUp();
} }
//TODO: consider this class to define in layouthelper for more complicated operation.
private static class StatusTicker {
static final int STATUS_DISMISS = 0;
static final int STATUS_CONNECTION_ERROR = 1;
static final int STATUS_TOKEN_LOGIN = 2;
private int status;
private Snackbar snackbar;
StatusTicker() {
status = STATUS_DISMISS;
}
void updateStatus(int status, Snackbar snackbar) {
if (status == this.status) {
return;
}
this.status = status;
if (this.snackbar != null) {
this.snackbar.dismiss();
}
if (status != STATUS_DISMISS) {
this.snackbar = snackbar;
if (this.snackbar != null) {
this.snackbar.show();
}
}
}
}
} }
...@@ -33,224 +33,225 @@ import io.reactivex.android.schedulers.AndroidSchedulers; ...@@ -33,224 +33,225 @@ import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposable;
public class MainPresenter extends BasePresenter<MainContract.View> public class MainPresenter extends BasePresenter<MainContract.View>
implements MainContract.Presenter { implements MainContract.Presenter {
private final CanCreateRoomInteractor canCreateRoomInteractor; private final CanCreateRoomInteractor canCreateRoomInteractor;
private final RoomInteractor roomInteractor; private final RoomInteractor roomInteractor;
private final SessionInteractor sessionInteractor; private final SessionInteractor sessionInteractor;
private final MethodCallHelper methodCallHelper; private final MethodCallHelper methodCallHelper;
private final ConnectivityManagerApi connectivityManagerApi; private final ConnectivityManagerApi connectivityManagerApi;
private final RocketChatCache rocketChatCache; private final RocketChatCache rocketChatCache;
private final PublicSettingRepository publicSettingRepository; private final PublicSettingRepository publicSettingRepository;
public MainPresenter(RoomInteractor roomInteractor, public MainPresenter(RoomInteractor roomInteractor,
CanCreateRoomInteractor canCreateRoomInteractor, CanCreateRoomInteractor canCreateRoomInteractor,
SessionInteractor sessionInteractor, SessionInteractor sessionInteractor,
MethodCallHelper methodCallHelper, MethodCallHelper methodCallHelper,
ConnectivityManagerApi connectivityManagerApi, ConnectivityManagerApi connectivityManagerApi,
RocketChatCache rocketChatCache, PublicSettingRepository publicSettingRepository) { RocketChatCache rocketChatCache, PublicSettingRepository publicSettingRepository) {
this.roomInteractor = roomInteractor; this.roomInteractor = roomInteractor;
this.canCreateRoomInteractor = canCreateRoomInteractor; this.canCreateRoomInteractor = canCreateRoomInteractor;
this.sessionInteractor = sessionInteractor; this.sessionInteractor = sessionInteractor;
this.methodCallHelper = methodCallHelper; this.methodCallHelper = methodCallHelper;
this.connectivityManagerApi = connectivityManagerApi; this.connectivityManagerApi = connectivityManagerApi;
this.rocketChatCache = rocketChatCache; this.rocketChatCache = rocketChatCache;
this.publicSettingRepository = publicSettingRepository; this.publicSettingRepository = publicSettingRepository;
}
@Override
public void bindViewOnly(@NonNull MainContract.View view) {
super.bindView(view);
subscribeToUnreadCount();
subscribeToSession();
setUserOnline();
}
@Override
public void loadSignedInServers(@NonNull String hostname) {
final Disposable disposable = publicSettingRepository.getById(PublicSettingsConstants.Assets.LOGO)
.zipWith(publicSettingRepository.getById(PublicSettingsConstants.General.SITE_NAME), Pair::new)
.map(this::getLogoAndSiteNamePair)
.map(settings -> getServerList(hostname, settings))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
view::showSignedInServers,
RCLog::e
);
addSubscription(disposable);
}
@Override
public void bindView(@NonNull MainContract.View view) {
super.bindView(view);
if (shouldLaunchAddServerActivity()) {
view.showAddServerScreen();
return;
} }
openRoom(); @Override
public void bindViewOnly(@NonNull MainContract.View view) {
subscribeToNetworkChanges(); super.bindView(view);
subscribeToUnreadCount(); subscribeToUnreadCount();
subscribeToSession(); subscribeToSession();
setUserOnline(); setUserOnline();
}
@Override
public void release() {
setUserAway();
super.release();
}
@Override
public void onOpenRoom(String hostname, String roomId) {
final Disposable subscription = canCreateRoomInteractor.canCreate(roomId)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
allowed -> {
if (allowed) {
view.showRoom(hostname, roomId);
} else {
view.showHome();
}
},
Logger::report
);
addSubscription(subscription);
}
@Override
public void onRetryLogin() {
final Disposable subscription = sessionInteractor.retryLogin()
.subscribe();
addSubscription(subscription);
}
@Override
public void beforeLogoutCleanUp() {
clearSubscriptions();
}
private Pair<String, String> getLogoAndSiteNamePair(Pair<Optional<PublicSetting>, Optional<PublicSetting>> settingsPair) {
String logoUrl = "";
String siteName = "";
if (settingsPair.first.isPresent()) {
logoUrl = settingsPair.first.get().getValue();
}
if (settingsPair.second.isPresent()) {
siteName = settingsPair.second.get().getValue();
}
return new Pair<>(logoUrl, siteName);
}
private List<Pair<String, Pair<String, String>>> getServerList(String hostname, Pair<String, String> serverInfoPair) throws JSONException {
JSONObject jsonObject = new JSONObject(serverInfoPair.first);
String logoUrl = (jsonObject.has("url")) ?
jsonObject.optString("url") : jsonObject.optString("defaultUrl");
String siteName = serverInfoPair.second;
rocketChatCache.addHostname(hostname.toLowerCase(), logoUrl, siteName);
return rocketChatCache.getServerList();
}
private void openRoom() {
String hostname = rocketChatCache.getSelectedServerHostname();
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId == null || roomId.length() == 0) {
view.showHome();
return;
} }
onOpenRoom(hostname, roomId); @Override
} public void loadSignedInServers(@NonNull String hostname) {
final Disposable disposable = publicSettingRepository.getById(PublicSettingsConstants.Assets.LOGO)
private void subscribeToUnreadCount() { .zipWith(publicSettingRepository.getById(PublicSettingsConstants.General.SITE_NAME), Pair::new)
final Disposable subscription = Flowable.combineLatest( .map(this::getLogoAndSiteNamePair)
roomInteractor.getTotalUnreadRoomsCount(), .map(settings -> getServerList(hostname, settings))
roomInteractor.getTotalUnreadMentionsCount(), .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
(Pair::new) .observeOn(AndroidSchedulers.mainThread())
) .subscribe(
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) view::showSignedInServers,
.observeOn(AndroidSchedulers.mainThread()) RCLog::e
.subscribe( );
pair -> view.showUnreadCount(pair.first, pair.second),
Logger::report addSubscription(disposable);
); }
addSubscription(subscription); @Override
} public void bindView(@NonNull MainContract.View view) {
super.bindView(view);
private void subscribeToSession() {
final Disposable subscription = sessionInteractor.getDefault() if (shouldLaunchAddServerActivity()) {
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get())) view.showAddServerScreen();
.observeOn(AndroidSchedulers.mainThread()) return;
.subscribe( }
sessionOptional -> {
Session session = sessionOptional.orNull(); openRoom();
if (session == null || session.getToken() == null) {
view.showLoginScreen(); subscribeToNetworkChanges();
return; subscribeToUnreadCount();
} subscribeToSession();
setUserOnline();
String error = session.getError(); }
if (error != null && error.length() != 0) {
view.showConnectionError(); @Override
return; public void release() {
} setUserAway();
if (!session.isTokenVerified()) { super.release();
view.showConnecting(); }
return;
} @Override
public void onOpenRoom(String hostname, String roomId) {
view.showConnectionOk(); final Disposable subscription = canCreateRoomInteractor.canCreate(roomId)
}, .subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
Logger::report .observeOn(AndroidSchedulers.mainThread())
); .subscribe(
allowed -> {
addSubscription(subscription); if (allowed) {
} view.showRoom(hostname, roomId);
} else {
private void subscribeToNetworkChanges() { view.showHome();
Disposable disposable = connectivityManagerApi.getServerConnectivityAsObservable() }
.observeOn(AndroidSchedulers.mainThread()) },
.subscribe( Logger::report
connectivity -> { );
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk(); addSubscription(subscription);
view.refreshRoom(); }
} else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
if (connectivity.code == DDPClient.REASON_NETWORK_ERROR) { @Override
view.showConnectionError(); public void onRetryLogin() {
} final Disposable subscription = sessionInteractor.retryLogin()
} else { .subscribe();
view.showConnecting();
} addSubscription(subscription);
}, }
Logger::report
); @Override
public void beforeLogoutCleanUp() {
addSubscription(disposable); clearSubscriptions();
} }
private void setUserOnline() { private Pair<String, String> getLogoAndSiteNamePair(Pair<Optional<PublicSetting>, Optional<PublicSetting>> settingsPair) {
methodCallHelper.setUserPresence(User.STATUS_ONLINE) String logoUrl = "";
.continueWith(new LogIfError()); String siteName = "";
} if (settingsPair.first.isPresent()) {
logoUrl = settingsPair.first.get().getValue();
private void setUserAway() { }
methodCallHelper.setUserPresence(User.STATUS_AWAY) if (settingsPair.second.isPresent()) {
.continueWith(new LogIfError()); siteName = settingsPair.second.get().getValue();
} }
return new Pair<>(logoUrl, siteName);
private boolean shouldLaunchAddServerActivity() { }
return connectivityManagerApi.getServerList().isEmpty();
} private List<Pair<String, Pair<String, String>>> getServerList(String hostname, Pair<String, String> serverInfoPair) throws JSONException {
JSONObject jsonObject = new JSONObject(serverInfoPair.first);
String logoUrl = (jsonObject.has("url")) ?
jsonObject.optString("url") : jsonObject.optString("defaultUrl");
String siteName = serverInfoPair.second;
rocketChatCache.addHostname(hostname.toLowerCase(), logoUrl, siteName);
return rocketChatCache.getServerList();
}
private void openRoom() {
String hostname = rocketChatCache.getSelectedServerHostname();
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId == null || roomId.length() == 0) {
view.showHome();
return;
}
onOpenRoom(hostname, roomId);
}
private void subscribeToUnreadCount() {
final Disposable subscription = Flowable.combineLatest(
roomInteractor.getTotalUnreadRoomsCount(),
roomInteractor.getTotalUnreadMentionsCount(),
(Pair::new)
)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
pair -> view.showUnreadCount(pair.first, pair.second),
Logger::report
);
addSubscription(subscription);
}
private void subscribeToSession() {
final Disposable subscription = sessionInteractor.getDefault()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
sessionOptional -> {
Session session = sessionOptional.orNull();
if (session == null || session.getToken() == null) {
view.showLoginScreen();
return;
}
String error = session.getError();
if (error != null && error.length() != 0) {
view.showConnectionError();
return;
}
if (!session.isTokenVerified()) {
view.showConnecting();
return;
}
view.showConnectionOk();
},
Logger::report
);
addSubscription(subscription);
}
private void subscribeToNetworkChanges() {
Disposable disposable = connectivityManagerApi.getServerConnectivityAsObservable()
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
connectivity -> {
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk();
view.refreshRoom();
} else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
if (connectivity.code == DDPClient.REASON_NETWORK_ERROR) {
view.showConnectionError();
}
} else {
view.showConnecting();
}
},
Logger::report
);
addSubscription(disposable);
}
private void setUserOnline() {
methodCallHelper.setUserPresence(User.STATUS_ONLINE)
.continueWith(new LogIfError());
}
private void setUserAway() {
methodCallHelper.setUserPresence(User.STATUS_AWAY)
.continueWith(new LogIfError());
}
private boolean shouldLaunchAddServerActivity() {
return connectivityManagerApi.getServerList().isEmpty();
}
} }
...@@ -6,26 +6,26 @@ import android.support.annotation.Nullable; ...@@ -6,26 +6,26 @@ import android.support.annotation.Nullable;
import java.util.List; import java.util.List;
import chat.rocket.core.models.ServerInfo; import chat.rocket.core.models.ServerInfo;
import io.reactivex.Observable; import io.reactivex.Flowable;
import io.reactivex.Single; import io.reactivex.Single;
/** /**
* interfaces used for Activity/Fragment and other UI-related logic. * interfaces used for Activity/Fragment and other UI-related logic.
*/ */
public interface ConnectivityManagerApi { public interface ConnectivityManagerApi {
void keepAliveServer(); void keepAliveServer();
void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure); void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure);
void removeServer(String hostname); void removeServer(String hostname);
Single<Boolean> connect(String hostname); Single<Boolean> connect(String hostname);
List<ServerInfo> getServerList(); List<ServerInfo> getServerList();
Observable<ServerConnectivity> getServerConnectivityAsObservable(); Flowable<ServerConnectivity> getServerConnectivityAsObservable();
int getConnectivityState(@NonNull String hostname); int getConnectivityState(@NonNull String hostname);
void resetConnectivityStateList(); void resetConnectivityStateList();
} }
...@@ -8,10 +8,6 @@ import chat.rocket.core.models.ServerInfo; ...@@ -8,10 +8,6 @@ import chat.rocket.core.models.ServerInfo;
* interfaces used for RocketChatService and RocketChatwebSocketThread. * interfaces used for RocketChatService and RocketChatwebSocketThread.
*/ */
/*package*/ interface ConnectivityManagerInternal { /*package*/ interface ConnectivityManagerInternal {
int REASON_CLOSED_BY_USER = 101;
int REASON_NETWORK_ERROR = 102;
int REASON_SERVER_ERROR = 103;
int REASON_UNKNOWN = 104;
void resetConnectivityStateList(); void resetConnectivityStateList();
......
...@@ -21,261 +21,275 @@ import chat.rocket.android_ddp.DDPClient; ...@@ -21,261 +21,275 @@ import chat.rocket.android_ddp.DDPClient;
import chat.rocket.core.models.ServerInfo; import chat.rocket.core.models.ServerInfo;
import chat.rocket.persistence.realm.models.RealmBasedServerInfo; import chat.rocket.persistence.realm.models.RealmBasedServerInfo;
import hugo.weaving.DebugLog; import hugo.weaving.DebugLog;
import io.reactivex.Observable; import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Single; import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers; import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject; import io.reactivex.subjects.BehaviorSubject;
/** /**
* Connectivity management implementation. * Connectivity management implementation.
*/ */
/*package*/ class RealmBasedConnectivityManager /*package*/ class RealmBasedConnectivityManager
implements ConnectivityManagerApi, ConnectivityManagerInternal { implements ConnectivityManagerApi, ConnectivityManagerInternal {
private volatile ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>(); private volatile ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>();
private final PublishSubject<ServerConnectivity> connectivitySubject = PublishSubject.create(); private volatile BehaviorSubject<ServerConnectivity> connectivitySubject = BehaviorSubject.createDefault(ServerConnectivity.CONNECTED);
private Context appContext; private Context appContext;
private final ServiceConnection serviceConnection = new ServiceConnection() { private final ServiceConnection serviceConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName componentName, IBinder binder) {
serviceInterface = ((RocketChatService.LocalBinder) binder).getServiceInterface();
}
@Override
public void onServiceDisconnected(ComponentName componentName) {
serviceInterface = null;
}
};
private ConnectivityServiceInterface serviceInterface;
/*package*/ RealmBasedConnectivityManager setContext(Context appContext) {
this.appContext = appContext.getApplicationContext();
return this;
}
@Override @Override
public void onServiceConnected(ComponentName componentName, IBinder binder) { public void resetConnectivityStateList() {
serviceInterface = ((RocketChatService.LocalBinder) binder).getServiceInterface(); serverConnectivityList.clear();
for (ServerInfo serverInfo : RealmBasedServerInfo.getServerInfoList()) {
serverConnectivityList.put(serverInfo.getHostname(), ServerConnectivity.STATE_DISCONNECTED);
}
} }
@Override @Override
public void onServiceDisconnected(ComponentName componentName) { public void keepAliveServer() {
serviceInterface = null; RocketChatService.keepAlive(appContext);
if (serviceInterface == null) {
RocketChatService.bind(appContext, serviceConnection);
}
} }
};
private ConnectivityServiceInterface serviceInterface;
@SuppressLint("RxLeakedSubscription")
@DebugLog
@Override
public void ensureConnections() {
String hostname = new RocketChatCache(appContext).getSelectedServerHostname();
if (hostname == null) {
return;
}
connectToServerIfNeeded(hostname, true/* force connect */)
.subscribeOn(Schedulers.io())
.subscribe(connected -> {
if (!connected) {
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
}
}, error -> {
RCLog.e(error);
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
});
}
/*package*/ RealmBasedConnectivityManager setContext(Context appContext) { @SuppressLint("RxLeakedSubscription")
this.appContext = appContext.getApplicationContext(); @Override
return this; public void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure) {
} RealmBasedServerInfo.addOrUpdate(hostname, name, insecure);
if (!serverConnectivityList.containsKey(hostname)) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
}
connectToServerIfNeeded(hostname, false)
.subscribe(connected -> {
}, RCLog::e);
}
@Override @SuppressLint("RxLeakedSubscription")
public void resetConnectivityStateList() { @Override
serverConnectivityList.clear(); public void removeServer(String hostname) {
for (ServerInfo serverInfo : RealmBasedServerInfo.getServerInfoList()) { RealmBasedServerInfo.remove(hostname);
serverConnectivityList.put(serverInfo.getHostname(), ServerConnectivity.STATE_DISCONNECTED); if (serverConnectivityList.containsKey(hostname)) {
disconnectFromServerIfNeeded(hostname)
.subscribe(_val -> {
}, RCLog::e);
}
} }
}
@Override @Override
public void keepAliveServer() { public Single<Boolean> connect(String hostname) {
RocketChatService.keepAlive(appContext); return connectToServerIfNeeded(hostname, false);
if (serviceInterface == null) {
RocketChatService.bind(appContext, serviceConnection);
} }
}
@Override
@SuppressLint("RxLeakedSubscription") public List<ServerInfo> getServerList() {
@DebugLog return RealmBasedServerInfo.getServerInfoList();
@Override
public void ensureConnections() {
String hostname = new RocketChatCache(appContext).getSelectedServerHostname();
if (hostname == null) {
return;
} }
connectToServerIfNeeded(hostname, true/* force connect */)
.subscribeOn(Schedulers.io()) @Override
.subscribe(_val -> { public ServerInfo getServerInfoForHost(String hostname) {
}, error -> { return RealmBasedServerInfo.getServerInfoForHost(hostname);
RCLog.e(error); }
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
}); private List<ServerConnectivity> getCurrentConnectivityList() {
} ArrayList<ServerConnectivity> list = new ArrayList<>();
for (Map.Entry<String, Integer> entry : serverConnectivityList.entrySet()) {
@SuppressLint("RxLeakedSubscription") list.add(new ServerConnectivity(entry.getKey(), entry.getValue()));
@Override }
public void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure) { return list;
RealmBasedServerInfo.addOrUpdate(hostname, name, insecure); }
if (!serverConnectivityList.containsKey(hostname)) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED); @DebugLog
@Override
public void notifyConnectionEstablished(String hostname, String session) {
if (session != null) {
RealmBasedServerInfo.updateSession(hostname, session);
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTED));
} }
connectToServerIfNeeded(hostname, false)
.subscribe(_val -> { @DebugLog
}, RCLog::e); @Override
} public void notifyConnectionLost(String hostname, int code) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
@SuppressLint("RxLeakedSubscription") connectivitySubject.onNext(
@Override new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED, code));
public void removeServer(String hostname) {
RealmBasedServerInfo.remove(hostname);
if (serverConnectivityList.containsKey(hostname)) {
disconnectFromServerIfNeeded(hostname)
.subscribe(_val -> {
}, RCLog::e);
} }
}
@DebugLog
@Override @Override
public Single<Boolean> connect(String hostname) { public void notifyConnecting(String hostname) {
return connectToServerIfNeeded(hostname, false); serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
} connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTING));
@Override }
public List<ServerInfo> getServerList() {
return RealmBasedServerInfo.getServerInfoList(); @Override
} public Flowable<ServerConnectivity> getServerConnectivityAsObservable() {
return connectivitySubject.toFlowable(BackpressureStrategy.LATEST);
@Override }
public ServerInfo getServerInfoForHost(String hostname) {
return RealmBasedServerInfo.getServerInfoForHost(hostname); @Override
} public int getConnectivityState(@NonNull String hostname) {
return serverConnectivityList.get(hostname);
private List<ServerConnectivity> getCurrentConnectivityList() { }
ArrayList<ServerConnectivity> list = new ArrayList<>();
for (Map.Entry<String, Integer> entry : serverConnectivityList.entrySet()) { @DebugLog
list.add(new ServerConnectivity(entry.getKey(), entry.getValue())); private Single<Boolean> connectToServerIfNeeded(String hostname, boolean forceConnect) {
return Single.defer(() -> {
Integer state = serverConnectivityList.get(hostname);
if (state == null) {
state = ServerConnectivity.STATE_DISCONNECTED;
}
final int connectivity = state;
if (!forceConnect && connectivity == ServerConnectivity.STATE_CONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname)
.flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
// notifyConnecting(hostname);
}
return connectToServer(hostname)
.retry(exception -> exception instanceof ThreadLooperNotPreparedException)
.onErrorResumeNext(Single.just(false));
});
}
private Single<Boolean> disconnectFromServerIfNeeded(String hostname) {
return Single.defer(() -> {
final int connectivity = serverConnectivityList.get(hostname);
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname)
.doOnError(err -> notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR))
.flatMap(_val -> disconnectFromServerIfNeeded(hostname));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname);
}
return disconnectFromServer(hostname)
.retryWhen(RxHelper.exponentialBackoff(1, 500, TimeUnit.MILLISECONDS));
});
}
@DebugLog
private Single<Boolean> waitForConnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state ->
state == ServerConnectivity.STATE_CONNECTED
|| state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.flatMap(state ->
state == ServerConnectivity.STATE_CONNECTED
? Single.just(true)
: Single.error(new ServerConnectivity.DisconnectedException()));
}
@DebugLog
private Single<Boolean> waitForDisconnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state -> state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.map(state -> true);
}
@DebugLog
private Single<Boolean> connectToServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) {
// Mark as CONNECTING except for the case [forceConnect && connected] because
// ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected.
// serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
}
if (serviceInterface != null) {
return serviceInterface.ensureConnectionToServer(hostname);
} else {
return Single.error(new ThreadLooperNotPreparedException("not prepared"));
}
});
}
private Single<Boolean> disconnectFromServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTING);
if (serviceInterface != null) {
return serviceInterface.disconnectFromServer(hostname)
// //after disconnection from server, remove HOSTNAME key from HashMap
.doAfterTerminate(() -> serverConnectivityList.remove(hostname));
} else {
return Single.error(new IllegalStateException("not prepared"));
}
});
}
private static class ThreadLooperNotPreparedException extends IllegalStateException {
ThreadLooperNotPreparedException(String message) {
super(message);
}
} }
return list;
}
@DebugLog
@Override
public void notifyConnectionEstablished(String hostname, String session) {
RealmBasedServerInfo.updateSession(hostname, session);
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTED));
}
@DebugLog
@Override
public void notifyConnectionLost(String hostname, int code) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED, code));
}
@DebugLog
@Override
public void notifyConnecting(String hostname) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTING));
}
@Override
public Observable<ServerConnectivity> getServerConnectivityAsObservable() {
return Observable.concat(Observable.fromIterable(getCurrentConnectivityList()), connectivitySubject);
}
@Override
public int getConnectivityState(@NonNull String hostname) {
return serverConnectivityList.get(hostname);
}
@DebugLog
private Single<Boolean> connectToServerIfNeeded(String hostname, boolean forceConnect) {
return Single.defer(() -> {
Integer state = serverConnectivityList.get(hostname);
if (state == null) {
state = ServerConnectivity.STATE_DISCONNECTED;
}
final int connectivity = state;
if (!forceConnect && connectivity == ServerConnectivity.STATE_CONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname)
.flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
notifyConnecting(hostname);
}
return connectToServer(hostname);
});
}
private Single<Boolean> disconnectFromServerIfNeeded(String hostname) {
return Single.defer(() -> {
final int connectivity = serverConnectivityList.get(hostname);
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname)
.doOnError(err -> notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR))
.flatMap(_val -> disconnectFromServerIfNeeded(hostname));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname);
}
return disconnectFromServer(hostname)
.retryWhen(RxHelper.exponentialBackoff(1, 500, TimeUnit.MILLISECONDS));
});
}
@DebugLog
private Single<Boolean> waitForConnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state ->
state == ServerConnectivity.STATE_CONNECTED
|| state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.flatMap(state ->
state == ServerConnectivity.STATE_CONNECTED
? Single.just(true)
: Single.error(new ServerConnectivity.DisconnectedException()));
}
@DebugLog
private Single<Boolean> waitForDisconnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state -> state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.map(state -> true);
}
@DebugLog
private Single<Boolean> connectToServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) {
// Mark as CONNECTING except for the case [forceConnect && connected] because
// ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected.
// serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
}
if (serviceInterface != null) {
return serviceInterface.ensureConnectionToServer(hostname);
} else {
return Single.error(new IllegalStateException("not prepared"));
}
});
}
private Single<Boolean> disconnectFromServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTING);
if (serviceInterface != null) {
return serviceInterface.disconnectFromServer(hostname)
// //after disconnection from server, remove HOSTNAME key from HashMap
.doAfterTerminate(() -> serverConnectivityList.remove(hostname));
} else {
return Single.error(new IllegalStateException("not prepared"));
}
});
}
} }
\ No newline at end of file
...@@ -23,137 +23,130 @@ import io.reactivex.Single; ...@@ -23,137 +23,130 @@ import io.reactivex.Single;
*/ */
public class RocketChatService extends Service implements ConnectivityServiceInterface { public class RocketChatService extends Service implements ConnectivityServiceInterface {
private ConnectivityManagerInternal connectivityManager; private ConnectivityManagerInternal connectivityManager;
private static volatile Semaphore webSocketThreadLock = new Semaphore(1); private static volatile Semaphore webSocketThreadLock = new Semaphore(1);
private static volatile RocketChatWebSocketThread currentWebSocketThread; private static volatile RocketChatWebSocketThread currentWebSocketThread;
public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() {
return RocketChatService.this;
}
}
private final LocalBinder localBinder = new LocalBinder();
/**
* ensure RocketChatService alive.
*/
/*package*/static void keepAlive(Context context) {
context.startService(new Intent(context, RocketChatService.class));
}
public static void bind(Context context, ServiceConnection serviceConnection) {
context.bindService(
new Intent(context, RocketChatService.class), serviceConnection, Context.BIND_AUTO_CREATE);
}
public static void unbind(Context context, ServiceConnection serviceConnection) {
context.unbindService(serviceConnection);
}
@DebugLog
@Override
public void onCreate() {
super.onCreate();
connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
connectivityManager.resetConnectivityStateList();
}
public class LocalBinder extends Binder { @DebugLog
ConnectivityServiceInterface getServiceInterface() { @Override
return RocketChatService.this; public int onStartCommand(Intent intent, int flags, int startId) {
connectivityManager.ensureConnections();
return START_NOT_STICKY;
} }
}
@Override
private final LocalBinder localBinder = new LocalBinder(); public Single<Boolean> ensureConnectionToServer(String hostname) { //called via binder.
return getOrCreateWebSocketThread(hostname)
/** .flatMap(RocketChatWebSocketThread::keepAlive);
* ensure RocketChatService alive. }
*/
/*package*/ static void keepAlive(Context context) { @Override
context.startService(new Intent(context, RocketChatService.class)); public Single<Boolean> disconnectFromServer(String hostname) { //called via binder.
} return Single.defer(() -> {
if (!existsThreadForHostname(hostname)) {
public static void bind(Context context, ServiceConnection serviceConnection) { return Single.just(true);
context.bindService( }
new Intent(context, RocketChatService.class), serviceConnection, Context.BIND_AUTO_CREATE);
} if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
public static void unbind(Context context, ServiceConnection serviceConnection) { // after disconnection from server
context.unbindService(serviceConnection); .doAfterTerminate(() -> {
}
@DebugLog
@Override
public void onCreate() {
super.onCreate();
connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
connectivityManager.resetConnectivityStateList();
}
@DebugLog
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
connectivityManager.ensureConnections();
return START_NOT_STICKY;
}
@Override
public Single<Boolean> ensureConnectionToServer(String hostname) { //called via binder.
return getOrCreateWebSocketThread(hostname)
.doOnError(err -> {
err.printStackTrace();
currentWebSocketThread = null;
})
.flatMap(webSocketThreads -> webSocketThreads.keepAlive());
}
@Override
public Single<Boolean> disconnectFromServer(String hostname) { //called via binder.
return Single.defer(() -> {
if (!existsThreadForHostname(hostname)) {
return Single.just(true);
}
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
// after disconnection from server
.doAfterTerminate(() -> {
currentWebSocketThread = null;
// remove RealmConfiguration key from HashMap
RealmStore.sStore.remove(hostname);
});
} else {
return Observable.timer(1, TimeUnit.SECONDS).singleOrError()
.flatMap(_val -> disconnectFromServer(hostname));
}
});
}
@DebugLog
private Single<RocketChatWebSocketThread> getOrCreateWebSocketThread(String hostname) {
return Single.defer(() -> {
webSocketThreadLock.acquire();
int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname);
boolean isDisconnected = connectivityState != ServerConnectivity.STATE_CONNECTED;
if (currentWebSocketThread != null && existsThreadForHostname(hostname) && !isDisconnected) {
webSocketThreadLock.release();
return Single.just(currentWebSocketThread);
}
connectivityManager.notifyConnecting(hostname);
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
.doAfterTerminate(() -> currentWebSocketThread = null)
.doOnError(RCLog::e)
.flatMap(terminated ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null; currentWebSocketThread = null;
RCLog.e(throwable); // remove RealmConfiguration key from HashMap
Logger.report(throwable); RealmStore.sStore.remove(hostname);
webSocketThreadLock.release(); });
}) } else {
); return Observable.timer(1, TimeUnit.SECONDS).singleOrError()
} .flatMap(_val -> disconnectFromServer(hostname));
}
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname) });
.doOnSuccess(thread -> { }
currentWebSocketThread = thread;
webSocketThreadLock.release(); @DebugLog
}) private Single<RocketChatWebSocketThread> getOrCreateWebSocketThread(String hostname) {
.doOnError(throwable -> { return Single.defer(() -> {
currentWebSocketThread = null; webSocketThreadLock.acquire();
RCLog.e(throwable); int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname);
Logger.report(throwable); boolean isDisconnected = connectivityState != ServerConnectivity.STATE_CONNECTED;
webSocketThreadLock.release(); if (currentWebSocketThread != null && existsThreadForHostname(hostname) && !isDisconnected) {
}); webSocketThreadLock.release();
}); return Single.just(currentWebSocketThread);
} }
private boolean existsThreadForHostname(String hostname) { if (currentWebSocketThread != null) {
if (hostname == null || currentWebSocketThread == null) { return currentWebSocketThread.terminate()
return false; .doAfterTerminate(() -> currentWebSocketThread = null)
.flatMap(terminated ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
})
);
}
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
});
});
}
private boolean existsThreadForHostname(String hostname) {
if (hostname == null || currentWebSocketThread == null) {
return false;
}
return currentWebSocketThread.getName().equals("RC_thread_" + hostname);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return localBinder;
} }
return currentWebSocketThread.getName().equals("RC_thread_" + hostname);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return localBinder;
}
} }
...@@ -76,26 +76,6 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -76,26 +76,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
private final CompositeDisposable reconnectDisposable = new CompositeDisposable(); private final CompositeDisposable reconnectDisposable = new CompositeDisposable();
private boolean listenersRegistered; private boolean listenersRegistered;
private static class KeepAliveTimer {
private long lastTime;
private final long thresholdMs;
public KeepAliveTimer(long thresholdMs) {
this.thresholdMs = thresholdMs;
lastTime = System.currentTimeMillis();
}
public boolean shouldCheckPrecisely() {
return lastTime + thresholdMs < System.currentTimeMillis();
}
public void update() {
lastTime = System.currentTimeMillis();
}
}
private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(20000);
private RocketChatWebSocketThread(Context appContext, String hostname) { private RocketChatWebSocketThread(Context appContext, String hostname) {
super("RC_thread_" + hostname); super("RC_thread_" + hostname);
this.appContext = appContext; this.appContext = appContext;
...@@ -108,7 +88,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -108,7 +88,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
* build new Thread. * build new Thread.
*/ */
@DebugLog @DebugLog
public static Single<RocketChatWebSocketThread> getStarted(Context appContext, String hostname) { /* package */ static Single<RocketChatWebSocketThread> getStarted(Context appContext, String hostname) {
return Single.<RocketChatWebSocketThread>create(objectSingleEmitter -> { return Single.<RocketChatWebSocketThread>create(objectSingleEmitter -> {
new RocketChatWebSocketThread(appContext, hostname) { new RocketChatWebSocketThread(appContext, hostname) {
@Override @Override
...@@ -148,7 +128,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -148,7 +128,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
* terminate WebSocket thread. * terminate WebSocket thread.
*/ */
@DebugLog @DebugLog
public Single<Boolean> terminate() { /* package */ Single<Boolean> terminate() {
if (isAlive()) { if (isAlive()) {
return Single.create(emitter -> { return Single.create(emitter -> {
new Handler(getLooper()).post(() -> { new Handler(getLooper()).post(() -> {
...@@ -181,7 +161,7 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -181,7 +161,7 @@ public class RocketChatWebSocketThread extends HandlerThread {
* synchronize the state of the thread with ServerConfig. * synchronize the state of the thread with ServerConfig.
*/ */
@DebugLog @DebugLog
public Single<Boolean> keepAlive() { /* package */ Single<Boolean> keepAlive() {
return checkIfConnectionAlive() return checkIfConnectionAlive()
.flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff()); .flatMap(alive -> alive ? Single.just(true) : connectWithExponentialBackoff());
} }
...@@ -192,11 +172,6 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -192,11 +172,6 @@ public class RocketChatWebSocketThread extends HandlerThread {
return Single.just(false); return Single.just(false);
} }
if (!keepAliveTimer.shouldCheckPrecisely()) {
return Single.just(true);
}
keepAliveTimer.update();
return Single.create(emitter -> { return Single.create(emitter -> {
new Thread() { new Thread() {
@Override @Override
...@@ -207,9 +182,8 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -207,9 +182,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
RCLog.e(error); RCLog.e(error);
connectivityManager.notifyConnectionLost( connectivityManager.notifyConnectionLost(
hostname, DDPClient.REASON_CLOSED_BY_USER); hostname, DDPClient.REASON_CLOSED_BY_USER);
emitter.onError(error); emitter.onSuccess(false);
} else { } else {
keepAliveTimer.update();
emitter.onSuccess(true); emitter.onSuccess(true);
} }
return null; return null;
...@@ -245,11 +219,11 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -245,11 +219,11 @@ public class RocketChatWebSocketThread extends HandlerThread {
return; return;
} }
RCLog.d("DDPClient#connect"); RCLog.d("DDPClient#connect");
connectivityManager.notifyConnecting(hostname);
DDPClient.get().connect(hostname, info.getSession(), info.isSecure()) DDPClient.get().connect(hostname, info.getSession(), info.isSecure())
.onSuccessTask(task -> { .onSuccessTask(task -> {
final String newSession = task.getResult().session; final String newSession = task.getResult().session;
connectivityManager.notifyConnectionEstablished(hostname, newSession); connectivityManager.notifyConnectionEstablished(hostname, newSession);
// handling WebSocket#onClose() callback. // handling WebSocket#onClose() callback.
task.getResult().client.getOnCloseCallback().onSuccess(_task -> { task.getResult().client.getOnCloseCallback().onSuccess(_task -> {
RxWebSocketCallback.Close result = _task.getResult(); RxWebSocketCallback.Close result = _task.getResult();
...@@ -292,18 +266,18 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -292,18 +266,18 @@ public class RocketChatWebSocketThread extends HandlerThread {
return; return;
} }
forceInvalidateTokens(); forceInvalidateTokens();
connectivityManager.notifyConnecting(hostname);
reconnectDisposable.add( reconnectDisposable.add(
connectWithExponentialBackoff() connectWithExponentialBackoff()
.subscribe(connected -> { .subscribe(connected -> {
if (!connected) { if (!connected) {
connectivityManager.notifyConnecting(hostname); connectivityManager.notifyConnectionLost(hostname,
DDPClient.REASON_NETWORK_ERROR);
} }
reconnectDisposable.clear(); reconnectDisposable.clear();
}, error -> { }, error -> {
logErrorAndUnsubscribe(reconnectDisposable, error);
connectivityManager.notifyConnectionLost(hostname, connectivityManager.notifyConnectionLost(hostname,
DDPClient.REASON_NETWORK_ERROR); DDPClient.REASON_NETWORK_ERROR);
logErrorAndUnsubscribe(reconnectDisposable, error);
} }
) )
); );
...@@ -315,7 +289,9 @@ public class RocketChatWebSocketThread extends HandlerThread { ...@@ -315,7 +289,9 @@ public class RocketChatWebSocketThread extends HandlerThread {
} }
private Single<Boolean> connectWithExponentialBackoff() { private Single<Boolean> connectWithExponentialBackoff() {
return connect().retryWhen(RxHelper.exponentialBackoff(3, 500, TimeUnit.MILLISECONDS)); return connect()
.retryWhen(RxHelper.exponentialBackoff(1, 250, TimeUnit.MILLISECONDS))
.onErrorResumeNext(Single.just(false));
} }
@DebugLog @DebugLog
......
...@@ -4,10 +4,13 @@ package chat.rocket.android.service; ...@@ -4,10 +4,13 @@ package chat.rocket.android.service;
* pair with server's hostname and its connectivity state. * pair with server's hostname and its connectivity state.
*/ */
public class ServerConnectivity { public class ServerConnectivity {
public static final int STATE_CONNECTED = 1; public static final int STATE_CONNECTED = 1;
public static final int STATE_DISCONNECTED = 2; public static final int STATE_DISCONNECTED = 2;
public static final int STATE_CONNECTING = 3; public static final int STATE_CONNECTING = 3;
/*package*/ static final int STATE_DISCONNECTING = 4; /*package*/ static final int STATE_DISCONNECTING = 4;
public static final ServerConnectivity CONNECTED = new ServerConnectivity(null, STATE_CONNECTED);
public final String hostname; public final String hostname;
public final int state; public final int state;
...@@ -25,6 +28,21 @@ public class ServerConnectivity { ...@@ -25,6 +28,21 @@ public class ServerConnectivity {
this.code = code; this.code = code;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ServerConnectivity that = (ServerConnectivity) o;
return state == that.state;
}
@Override
public int hashCode() {
return state;
}
/** /**
* This exception should be thrown when connection is lost during waiting for CONNECTED. * This exception should be thrown when connection is lost during waiting for CONNECTED.
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment