Commit 87461e2a authored by Leonardo Aramaki's avatar Leonardo Aramaki

Refactor indentation

parent 184cfda5
......@@ -33,227 +33,227 @@ import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
public class MainPresenter extends BasePresenter<MainContract.View>
implements MainContract.Presenter {
private final CanCreateRoomInteractor canCreateRoomInteractor;
private final RoomInteractor roomInteractor;
private final SessionInteractor sessionInteractor;
private final MethodCallHelper methodCallHelper;
private final ConnectivityManagerApi connectivityManagerApi;
private final RocketChatCache rocketChatCache;
private final PublicSettingRepository publicSettingRepository;
public MainPresenter(RoomInteractor roomInteractor,
CanCreateRoomInteractor canCreateRoomInteractor,
SessionInteractor sessionInteractor,
MethodCallHelper methodCallHelper,
ConnectivityManagerApi connectivityManagerApi,
RocketChatCache rocketChatCache, PublicSettingRepository publicSettingRepository) {
this.roomInteractor = roomInteractor;
this.canCreateRoomInteractor = canCreateRoomInteractor;
this.sessionInteractor = sessionInteractor;
this.methodCallHelper = methodCallHelper;
this.connectivityManagerApi = connectivityManagerApi;
this.rocketChatCache = rocketChatCache;
this.publicSettingRepository = publicSettingRepository;
}
@Override
public void bindViewOnly(@NonNull MainContract.View view) {
super.bindView(view);
subscribeToUnreadCount();
subscribeToSession();
setUserOnline();
}
@Override
public void loadSignedInServers(@NonNull String hostname) {
final Disposable disposable = publicSettingRepository.getById(PublicSettingsConstants.Assets.LOGO)
.zipWith(publicSettingRepository.getById(PublicSettingsConstants.General.SITE_NAME), Pair::new)
.map(this::getLogoAndSiteNamePair)
.map(settings -> getServerList(hostname, settings))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
view::showSignedInServers,
RCLog::e
);
addSubscription(disposable);
}
@Override
public void bindView(@NonNull MainContract.View view) {
super.bindView(view);
if (shouldLaunchAddServerActivity()) {
view.showAddServerScreen();
return;
implements MainContract.Presenter {
private final CanCreateRoomInteractor canCreateRoomInteractor;
private final RoomInteractor roomInteractor;
private final SessionInteractor sessionInteractor;
private final MethodCallHelper methodCallHelper;
private final ConnectivityManagerApi connectivityManagerApi;
private final RocketChatCache rocketChatCache;
private final PublicSettingRepository publicSettingRepository;
public MainPresenter(RoomInteractor roomInteractor,
CanCreateRoomInteractor canCreateRoomInteractor,
SessionInteractor sessionInteractor,
MethodCallHelper methodCallHelper,
ConnectivityManagerApi connectivityManagerApi,
RocketChatCache rocketChatCache, PublicSettingRepository publicSettingRepository) {
this.roomInteractor = roomInteractor;
this.canCreateRoomInteractor = canCreateRoomInteractor;
this.sessionInteractor = sessionInteractor;
this.methodCallHelper = methodCallHelper;
this.connectivityManagerApi = connectivityManagerApi;
this.rocketChatCache = rocketChatCache;
this.publicSettingRepository = publicSettingRepository;
}
openRoom();
subscribeToNetworkChanges();
subscribeToUnreadCount();
subscribeToSession();
setUserOnline();
}
@Override
public void release() {
setUserAway();
super.release();
}
@Override
public void onOpenRoom(String hostname, String roomId) {
final Disposable subscription = canCreateRoomInteractor.canCreate(roomId)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
allowed -> {
if (allowed) {
view.showRoom(hostname, roomId);
} else {
view.showHome();
}
},
Logger::report
);
addSubscription(subscription);
}
@Override
public void onRetryLogin() {
final Disposable subscription = sessionInteractor.retryLogin()
.subscribe();
addSubscription(subscription);
}
@Override
public void beforeLogoutCleanUp() {
clearSubscriptions();
}
private Pair<String, String> getLogoAndSiteNamePair(Pair<Optional<PublicSetting>, Optional<PublicSetting>> settingsPair) {
String logoUrl = "";
String siteName = "";
if (settingsPair.first.isPresent()) {
logoUrl = settingsPair.first.get().getValue();
}
if (settingsPair.second.isPresent()) {
siteName = settingsPair.second.get().getValue();
}
return new Pair<>(logoUrl, siteName);
}
private List<Pair<String, Pair<String, String>>> getServerList(String hostname, Pair<String, String> serverInfoPair) throws JSONException {
JSONObject jsonObject = new JSONObject(serverInfoPair.first);
String logoUrl = (jsonObject.has("url")) ?
jsonObject.optString("url") : jsonObject.optString("defaultUrl");
String siteName = serverInfoPair.second;
rocketChatCache.addHostname(hostname.toLowerCase(), logoUrl, siteName);
return rocketChatCache.getServerList();
}
private void openRoom() {
String hostname = rocketChatCache.getSelectedServerHostname();
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId == null || roomId.length() == 0) {
view.showHome();
return;
@Override
public void bindViewOnly(@NonNull MainContract.View view) {
super.bindView(view);
subscribeToUnreadCount();
subscribeToSession();
setUserOnline();
}
onOpenRoom(hostname, roomId);
}
private void subscribeToUnreadCount() {
final Disposable subscription = Flowable.combineLatest(
roomInteractor.getTotalUnreadRoomsCount(),
roomInteractor.getTotalUnreadMentionsCount(),
(Pair::new)
)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
pair -> view.showUnreadCount(pair.first, pair.second),
Logger::report
);
addSubscription(subscription);
}
private void subscribeToSession() {
final Disposable subscription = sessionInteractor.getDefault()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
sessionOptional -> {
Session session = sessionOptional.orNull();
if (session == null || session.getToken() == null) {
view.showLoginScreen();
return;
}
String error = session.getError();
if (error != null && error.length() != 0) {
view.showConnectionError();
return;
}
if (!session.isTokenVerified()) {
view.showConnecting();
return;
}
view.showConnectionOk();
},
Logger::report
);
addSubscription(subscription);
}
private void subscribeToNetworkChanges() {
Disposable disposable = connectivityManagerApi.getServerConnectivityAsObservable()
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
connectivity -> {
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk();
view.refreshRoom();
} else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
if (connectivity.code == DDPClient.REASON_NETWORK_ERROR) {
view.showConnectionError();
} else {
view.showConnectionOk();
}
} else {
view.showConnecting();
}
},
Logger::report
);
addSubscription(disposable);
}
private void setUserOnline() {
methodCallHelper.setUserPresence(User.STATUS_ONLINE)
.continueWith(new LogIfError());
}
private void setUserAway() {
methodCallHelper.setUserPresence(User.STATUS_AWAY)
.continueWith(new LogIfError());
}
private boolean shouldLaunchAddServerActivity() {
return connectivityManagerApi.getServerList().isEmpty();
}
@Override
public void loadSignedInServers(@NonNull String hostname) {
final Disposable disposable = publicSettingRepository.getById(PublicSettingsConstants.Assets.LOGO)
.zipWith(publicSettingRepository.getById(PublicSettingsConstants.General.SITE_NAME), Pair::new)
.map(this::getLogoAndSiteNamePair)
.map(settings -> getServerList(hostname, settings))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
view::showSignedInServers,
RCLog::e
);
addSubscription(disposable);
}
@Override
public void bindView(@NonNull MainContract.View view) {
super.bindView(view);
if (shouldLaunchAddServerActivity()) {
view.showAddServerScreen();
return;
}
openRoom();
subscribeToNetworkChanges();
subscribeToUnreadCount();
subscribeToSession();
setUserOnline();
}
@Override
public void release() {
setUserAway();
super.release();
}
@Override
public void onOpenRoom(String hostname, String roomId) {
final Disposable subscription = canCreateRoomInteractor.canCreate(roomId)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
allowed -> {
if (allowed) {
view.showRoom(hostname, roomId);
} else {
view.showHome();
}
},
Logger::report
);
addSubscription(subscription);
}
@Override
public void onRetryLogin() {
final Disposable subscription = sessionInteractor.retryLogin()
.subscribe();
addSubscription(subscription);
}
@Override
public void beforeLogoutCleanUp() {
clearSubscriptions();
}
private Pair<String, String> getLogoAndSiteNamePair(Pair<Optional<PublicSetting>, Optional<PublicSetting>> settingsPair) {
String logoUrl = "";
String siteName = "";
if (settingsPair.first.isPresent()) {
logoUrl = settingsPair.first.get().getValue();
}
if (settingsPair.second.isPresent()) {
siteName = settingsPair.second.get().getValue();
}
return new Pair<>(logoUrl, siteName);
}
private List<Pair<String, Pair<String, String>>> getServerList(String hostname, Pair<String, String> serverInfoPair) throws JSONException {
JSONObject jsonObject = new JSONObject(serverInfoPair.first);
String logoUrl = (jsonObject.has("url")) ?
jsonObject.optString("url") : jsonObject.optString("defaultUrl");
String siteName = serverInfoPair.second;
rocketChatCache.addHostname(hostname.toLowerCase(), logoUrl, siteName);
return rocketChatCache.getServerList();
}
private void openRoom() {
String hostname = rocketChatCache.getSelectedServerHostname();
String roomId = rocketChatCache.getSelectedRoomId();
if (roomId == null || roomId.length() == 0) {
view.showHome();
return;
}
onOpenRoom(hostname, roomId);
}
private void subscribeToUnreadCount() {
final Disposable subscription = Flowable.combineLatest(
roomInteractor.getTotalUnreadRoomsCount(),
roomInteractor.getTotalUnreadMentionsCount(),
(Pair::new)
)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
pair -> view.showUnreadCount(pair.first, pair.second),
Logger::report
);
addSubscription(subscription);
}
private void subscribeToSession() {
final Disposable subscription = sessionInteractor.getDefault()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
sessionOptional -> {
Session session = sessionOptional.orNull();
if (session == null || session.getToken() == null) {
view.showLoginScreen();
return;
}
String error = session.getError();
if (error != null && error.length() != 0) {
view.showConnectionError();
return;
}
if (!session.isTokenVerified()) {
view.showConnecting();
return;
}
view.showConnectionOk();
},
Logger::report
);
addSubscription(subscription);
}
private void subscribeToNetworkChanges() {
Disposable disposable = connectivityManagerApi.getServerConnectivityAsObservable()
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
connectivity -> {
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk();
view.refreshRoom();
} else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
if (connectivity.code == DDPClient.REASON_NETWORK_ERROR) {
view.showConnectionError();
} else {
view.showConnectionOk();
}
} else {
view.showConnecting();
}
},
Logger::report
);
addSubscription(disposable);
}
private void setUserOnline() {
methodCallHelper.setUserPresence(User.STATUS_ONLINE)
.continueWith(new LogIfError());
}
private void setUserAway() {
methodCallHelper.setUserPresence(User.STATUS_AWAY)
.continueWith(new LogIfError());
}
private boolean shouldLaunchAddServerActivity() {
return connectivityManagerApi.getServerList().isEmpty();
}
}
......@@ -13,19 +13,19 @@ import io.reactivex.Single;
* interfaces used for Activity/Fragment and other UI-related logic.
*/
public interface ConnectivityManagerApi {
void keepAliveServer();
void keepAliveServer();
void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure);
void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure);
void removeServer(String hostname);
void removeServer(String hostname);
Single<Boolean> connect(String hostname);
Single<Boolean> connect(String hostname);
List<ServerInfo> getServerList();
List<ServerInfo> getServerList();
Flowable<ServerConnectivity> getServerConnectivityAsObservable();
Flowable<ServerConnectivity> getServerConnectivityAsObservable();
int getConnectivityState(@NonNull String hostname);
int getConnectivityState(@NonNull String hostname);
void resetConnectivityStateList();
void resetConnectivityStateList();
}
......@@ -23,264 +23,262 @@ import chat.rocket.persistence.realm.models.RealmBasedServerInfo;
import hugo.weaving.DebugLog;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
/**
* Connectivity management implementation.
*/
/*package*/ class RealmBasedConnectivityManager
implements ConnectivityManagerApi, ConnectivityManagerInternal {
private volatile ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>();
private volatile BehaviorSubject<ServerConnectivity> connectivitySubject = BehaviorSubject.createDefault(ServerConnectivity.CONNECTED);
private Context appContext;
private final ServiceConnection serviceConnection = new ServiceConnection() {
implements ConnectivityManagerApi, ConnectivityManagerInternal {
private volatile ConcurrentHashMap<String, Integer> serverConnectivityList = new ConcurrentHashMap<>();
private volatile BehaviorSubject<ServerConnectivity> connectivitySubject = BehaviorSubject.createDefault(ServerConnectivity.CONNECTED);
private Context appContext;
private final ServiceConnection serviceConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName componentName, IBinder binder) {
serviceInterface = ((RocketChatService.LocalBinder) binder).getServiceInterface();
}
@Override
public void onServiceDisconnected(ComponentName componentName) {
serviceInterface = null;
}
};
private ConnectivityServiceInterface serviceInterface;
/*package*/ RealmBasedConnectivityManager setContext(Context appContext) {
this.appContext = appContext.getApplicationContext();
return this;
}
@Override
public void resetConnectivityStateList() {
serverConnectivityList.clear();
for (ServerInfo serverInfo : RealmBasedServerInfo.getServerInfoList()) {
serverConnectivityList.put(serverInfo.getHostname(), ServerConnectivity.STATE_DISCONNECTED);
}
}
@Override
public void keepAliveServer() {
RocketChatService.keepAlive(appContext);
if (serviceInterface == null) {
RocketChatService.bind(appContext, serviceConnection);
}
}
@SuppressLint("RxLeakedSubscription")
@DebugLog
@Override
public void ensureConnections() {
String hostname = new RocketChatCache(appContext).getSelectedServerHostname();
if (hostname == null) {
return;
}
connectToServerIfNeeded(hostname, true/* force connect */)
.subscribeOn(Schedulers.io())
.subscribe(_val -> {
}, error -> {
RCLog.e(error);
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
});
}
@SuppressLint("RxLeakedSubscription")
@Override
public void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure) {
RealmBasedServerInfo.addOrUpdate(hostname, name, insecure);
if (!serverConnectivityList.containsKey(hostname)) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
}
connectToServerIfNeeded(hostname, false)
.subscribe(_val -> {
}, RCLog::e);
}
@SuppressLint("RxLeakedSubscription")
@Override
public void removeServer(String hostname) {
RealmBasedServerInfo.remove(hostname);
if (serverConnectivityList.containsKey(hostname)) {
disconnectFromServerIfNeeded(hostname)
.subscribe(_val -> {
}, RCLog::e);
}
}
@Override
public Single<Boolean> connect(String hostname) {
return connectToServerIfNeeded(hostname, false);
}
@Override
public void onServiceConnected(ComponentName componentName, IBinder binder) {
serviceInterface = ((RocketChatService.LocalBinder) binder).getServiceInterface();
public List<ServerInfo> getServerList() {
return RealmBasedServerInfo.getServerInfoList();
}
@Override
public void onServiceDisconnected(ComponentName componentName) {
serviceInterface = null;
public ServerInfo getServerInfoForHost(String hostname) {
return RealmBasedServerInfo.getServerInfoForHost(hostname);
}
};
private ConnectivityServiceInterface serviceInterface;
private List<ServerConnectivity> getCurrentConnectivityList() {
ArrayList<ServerConnectivity> list = new ArrayList<>();
for (Map.Entry<String, Integer> entry : serverConnectivityList.entrySet()) {
list.add(new ServerConnectivity(entry.getKey(), entry.getValue()));
}
return list;
}
/*package*/ RealmBasedConnectivityManager setContext(Context appContext) {
this.appContext = appContext.getApplicationContext();
return this;
}
@DebugLog
@Override
public void notifyConnectionEstablished(String hostname, String session) {
if (session != null) {
RealmBasedServerInfo.updateSession(hostname, session);
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTED));
}
@Override
public void resetConnectivityStateList() {
serverConnectivityList.clear();
for (ServerInfo serverInfo : RealmBasedServerInfo.getServerInfoList()) {
serverConnectivityList.put(serverInfo.getHostname(), ServerConnectivity.STATE_DISCONNECTED);
@DebugLog
@Override
public void notifyConnectionLost(String hostname, int code) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED, code));
}
}
@Override
public void keepAliveServer() {
RocketChatService.keepAlive(appContext);
if (serviceInterface == null) {
RocketChatService.bind(appContext, serviceConnection);
@DebugLog
@Override
public void notifyConnecting(String hostname) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTING));
}
}
@SuppressLint("RxLeakedSubscription")
@DebugLog
@Override
public void ensureConnections() {
String hostname = new RocketChatCache(appContext).getSelectedServerHostname();
if (hostname == null) {
return;
@Override
public Flowable<ServerConnectivity> getServerConnectivityAsObservable() {
return connectivitySubject.toFlowable(BackpressureStrategy.LATEST);
}
connectToServerIfNeeded(hostname, true/* force connect */)
.subscribeOn(Schedulers.io())
.subscribe(_val -> {
}, error -> {
RCLog.e(error);
notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR);
});
}
@SuppressLint("RxLeakedSubscription")
@Override
public void addOrUpdateServer(String hostname, @Nullable String name, boolean insecure) {
RealmBasedServerInfo.addOrUpdate(hostname, name, insecure);
if (!serverConnectivityList.containsKey(hostname)) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
@Override
public int getConnectivityState(@NonNull String hostname) {
return serverConnectivityList.get(hostname);
}
connectToServerIfNeeded(hostname, false)
.subscribe(_val -> {
}, RCLog::e);
}
@SuppressLint("RxLeakedSubscription")
@Override
public void removeServer(String hostname) {
RealmBasedServerInfo.remove(hostname);
if (serverConnectivityList.containsKey(hostname)) {
disconnectFromServerIfNeeded(hostname)
.subscribe(_val -> {
}, RCLog::e);
@DebugLog
private Single<Boolean> connectToServerIfNeeded(String hostname, boolean forceConnect) {
return Single.defer(() -> {
Integer state = serverConnectivityList.get(hostname);
if (state == null) {
state = ServerConnectivity.STATE_DISCONNECTED;
}
final int connectivity = state;
if (!forceConnect && connectivity == ServerConnectivity.STATE_CONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname)
.flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
// notifyConnecting(hostname);
}
return connectToServer(hostname);
});
}
}
@Override
public Single<Boolean> connect(String hostname) {
return connectToServerIfNeeded(hostname, false);
}
@Override
public List<ServerInfo> getServerList() {
return RealmBasedServerInfo.getServerInfoList();
}
@Override
public ServerInfo getServerInfoForHost(String hostname) {
return RealmBasedServerInfo.getServerInfoForHost(hostname);
}
private List<ServerConnectivity> getCurrentConnectivityList() {
ArrayList<ServerConnectivity> list = new ArrayList<>();
for (Map.Entry<String, Integer> entry : serverConnectivityList.entrySet()) {
list.add(new ServerConnectivity(entry.getKey(), entry.getValue()));
private Single<Boolean> disconnectFromServerIfNeeded(String hostname) {
return Single.defer(() -> {
final int connectivity = serverConnectivityList.get(hostname);
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname)
.doOnError(err -> notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR))
.flatMap(_val -> disconnectFromServerIfNeeded(hostname));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname);
}
return disconnectFromServer(hostname)
.retryWhen(RxHelper.exponentialBackoff(1, 500, TimeUnit.MILLISECONDS));
});
}
return list;
}
@DebugLog
@Override
public void notifyConnectionEstablished(String hostname, String session) {
if (session != null) {
RealmBasedServerInfo.updateSession(hostname, session);
@DebugLog
private Single<Boolean> waitForConnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state ->
state == ServerConnectivity.STATE_CONNECTED
|| state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.flatMap(state ->
state == ServerConnectivity.STATE_CONNECTED
? Single.just(true)
: Single.error(new ServerConnectivity.DisconnectedException()));
}
@DebugLog
private Single<Boolean> waitForDisconnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state -> state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.map(state -> true);
}
@DebugLog
private Single<Boolean> connectToServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) {
// Mark as CONNECTING except for the case [forceConnect && connected] because
// ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected.
// serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
}
if (serviceInterface != null) {
return serviceInterface.ensureConnectionToServer(hostname);
} else {
return Single.error(new IllegalStateException("not prepared"));
}
});
}
private Single<Boolean> disconnectFromServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTING);
if (serviceInterface != null) {
return serviceInterface.disconnectFromServer(hostname)
// //after disconnection from server, remove HOSTNAME key from HashMap
.doAfterTerminate(() -> serverConnectivityList.remove(hostname));
} else {
return Single.error(new IllegalStateException("not prepared"));
}
});
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTED));
}
@DebugLog
@Override
public void notifyConnectionLost(String hostname, int code) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTED);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_DISCONNECTED, code));
}
@DebugLog
@Override
public void notifyConnecting(String hostname) {
serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
connectivitySubject.onNext(
new ServerConnectivity(hostname, ServerConnectivity.STATE_CONNECTING));
}
@Override
public Flowable<ServerConnectivity> getServerConnectivityAsObservable() {
return connectivitySubject.toFlowable(BackpressureStrategy.LATEST);
}
@Override
public int getConnectivityState(@NonNull String hostname) {
return serverConnectivityList.get(hostname);
}
@DebugLog
private Single<Boolean> connectToServerIfNeeded(String hostname, boolean forceConnect) {
return Single.defer(() -> {
Integer state = serverConnectivityList.get(hostname);
if (state == null) {
state = ServerConnectivity.STATE_DISCONNECTED;
}
final int connectivity = state;
if (!forceConnect && connectivity == ServerConnectivity.STATE_CONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname)
.flatMap(_val -> connectToServerIfNeeded(hostname, forceConnect));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
// notifyConnecting(hostname);
}
return connectToServer(hostname);
});
}
private Single<Boolean> disconnectFromServerIfNeeded(String hostname) {
return Single.defer(() -> {
final int connectivity = serverConnectivityList.get(hostname);
if (connectivity == ServerConnectivity.STATE_DISCONNECTED) {
return Single.just(true);
}
if (connectivity == ServerConnectivity.STATE_CONNECTING) {
return waitForConnected(hostname)
.doOnError(err -> notifyConnectionLost(hostname, DDPClient.REASON_NETWORK_ERROR))
.flatMap(_val -> disconnectFromServerIfNeeded(hostname));
}
if (connectivity == ServerConnectivity.STATE_DISCONNECTING) {
return waitForDisconnected(hostname);
}
return disconnectFromServer(hostname)
.retryWhen(RxHelper.exponentialBackoff(1, 500, TimeUnit.MILLISECONDS));
});
}
@DebugLog
private Single<Boolean> waitForConnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state ->
state == ServerConnectivity.STATE_CONNECTED
|| state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.flatMap(state ->
state == ServerConnectivity.STATE_CONNECTED
? Single.just(true)
: Single.error(new ServerConnectivity.DisconnectedException()));
}
@DebugLog
private Single<Boolean> waitForDisconnected(String hostname) {
return connectivitySubject
.filter(serverConnectivity -> hostname.equals(serverConnectivity.hostname))
.map(serverConnectivity -> serverConnectivity.state)
.filter(state -> state == ServerConnectivity.STATE_DISCONNECTED)
.firstElement()
.toSingle()
.map(state -> true);
}
@DebugLog
private Single<Boolean> connectToServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
if (serverConnectivityList.get(hostname) != ServerConnectivity.STATE_CONNECTED) {
// Mark as CONNECTING except for the case [forceConnect && connected] because
// ensureConnectionToServer doesn't notify ConnectionEstablished/Lost is already connected.
// serverConnectivityList.put(hostname, ServerConnectivity.STATE_CONNECTING);
}
if (serviceInterface != null) {
return serviceInterface.ensureConnectionToServer(hostname);
} else {
return Single.error(new IllegalStateException("not prepared"));
}
});
}
private Single<Boolean> disconnectFromServer(String hostname) {
return Single.defer(() -> {
if (!serverConnectivityList.containsKey(hostname)) {
return Single.error(new IllegalArgumentException("hostname not found"));
}
serverConnectivityList.put(hostname, ServerConnectivity.STATE_DISCONNECTING);
if (serviceInterface != null) {
return serviceInterface.disconnectFromServer(hostname)
// //after disconnection from server, remove HOSTNAME key from HashMap
.doAfterTerminate(() -> serverConnectivityList.remove(hostname));
} else {
return Single.error(new IllegalStateException("not prepared"));
}
});
}
}
\ No newline at end of file
......@@ -23,130 +23,130 @@ import io.reactivex.Single;
*/
public class RocketChatService extends Service implements ConnectivityServiceInterface {
private ConnectivityManagerInternal connectivityManager;
private static volatile Semaphore webSocketThreadLock = new Semaphore(1);
private static volatile RocketChatWebSocketThread currentWebSocketThread;
private ConnectivityManagerInternal connectivityManager;
private static volatile Semaphore webSocketThreadLock = new Semaphore(1);
private static volatile RocketChatWebSocketThread currentWebSocketThread;
public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() {
return RocketChatService.this;
}
}
private final LocalBinder localBinder = new LocalBinder();
/**
* ensure RocketChatService alive.
*/
/*package*/static void keepAlive(Context context) {
context.startService(new Intent(context, RocketChatService.class));
}
public static void bind(Context context, ServiceConnection serviceConnection) {
context.bindService(
new Intent(context, RocketChatService.class), serviceConnection, Context.BIND_AUTO_CREATE);
}
public static void unbind(Context context, ServiceConnection serviceConnection) {
context.unbindService(serviceConnection);
}
@DebugLog
@Override
public void onCreate() {
super.onCreate();
connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
connectivityManager.resetConnectivityStateList();
}
public class LocalBinder extends Binder {
ConnectivityServiceInterface getServiceInterface() {
return RocketChatService.this;
@DebugLog
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
connectivityManager.ensureConnections();
return START_NOT_STICKY;
}
}
private final LocalBinder localBinder = new LocalBinder();
/**
* ensure RocketChatService alive.
*/
/*package*/ static void keepAlive(Context context) {
context.startService(new Intent(context, RocketChatService.class));
}
public static void bind(Context context, ServiceConnection serviceConnection) {
context.bindService(
new Intent(context, RocketChatService.class), serviceConnection, Context.BIND_AUTO_CREATE);
}
public static void unbind(Context context, ServiceConnection serviceConnection) {
context.unbindService(serviceConnection);
}
@DebugLog
@Override
public void onCreate() {
super.onCreate();
connectivityManager = ConnectivityManager.getInstanceForInternal(getApplicationContext());
connectivityManager.resetConnectivityStateList();
}
@DebugLog
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
connectivityManager.ensureConnections();
return START_NOT_STICKY;
}
@Override
public Single<Boolean> ensureConnectionToServer(String hostname) { //called via binder.
return getOrCreateWebSocketThread(hostname)
.flatMap(RocketChatWebSocketThread::keepAlive);
}
@Override
public Single<Boolean> disconnectFromServer(String hostname) { //called via binder.
return Single.defer(() -> {
if (!existsThreadForHostname(hostname)) {
return Single.just(true);
}
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
// after disconnection from server
.doAfterTerminate(() -> {
currentWebSocketThread = null;
// remove RealmConfiguration key from HashMap
RealmStore.sStore.remove(hostname);
});
} else {
return Observable.timer(1, TimeUnit.SECONDS).singleOrError()
.flatMap(_val -> disconnectFromServer(hostname));
}
});
}
@DebugLog
private Single<RocketChatWebSocketThread> getOrCreateWebSocketThread(String hostname) {
return Single.defer(() -> {
webSocketThreadLock.acquire();
int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname);
boolean isDisconnected = connectivityState != ServerConnectivity.STATE_CONNECTED;
if (currentWebSocketThread != null && existsThreadForHostname(hostname) && !isDisconnected) {
webSocketThreadLock.release();
return Single.just(currentWebSocketThread);
}
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
.doAfterTerminate(() -> currentWebSocketThread = null)
.flatMap(terminated ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
@Override
public Single<Boolean> ensureConnectionToServer(String hostname) { //called via binder.
return getOrCreateWebSocketThread(hostname)
.flatMap(RocketChatWebSocketThread::keepAlive);
}
@Override
public Single<Boolean> disconnectFromServer(String hostname) { //called via binder.
return Single.defer(() -> {
if (!existsThreadForHostname(hostname)) {
return Single.just(true);
}
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
// after disconnection from server
.doAfterTerminate(() -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
})
);
}
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
});
});
}
private boolean existsThreadForHostname(String hostname) {
if (hostname == null || currentWebSocketThread == null) {
return false;
// remove RealmConfiguration key from HashMap
RealmStore.sStore.remove(hostname);
});
} else {
return Observable.timer(1, TimeUnit.SECONDS).singleOrError()
.flatMap(_val -> disconnectFromServer(hostname));
}
});
}
@DebugLog
private Single<RocketChatWebSocketThread> getOrCreateWebSocketThread(String hostname) {
return Single.defer(() -> {
webSocketThreadLock.acquire();
int connectivityState = ConnectivityManager.getInstance(getApplicationContext()).getConnectivityState(hostname);
boolean isDisconnected = connectivityState != ServerConnectivity.STATE_CONNECTED;
if (currentWebSocketThread != null && existsThreadForHostname(hostname) && !isDisconnected) {
webSocketThreadLock.release();
return Single.just(currentWebSocketThread);
}
if (currentWebSocketThread != null) {
return currentWebSocketThread.terminate()
.doAfterTerminate(() -> currentWebSocketThread = null)
.flatMap(terminated ->
RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
})
);
}
return RocketChatWebSocketThread.getStarted(getApplicationContext(), hostname)
.doOnSuccess(thread -> {
currentWebSocketThread = thread;
webSocketThreadLock.release();
})
.doOnError(throwable -> {
currentWebSocketThread = null;
RCLog.e(throwable);
Logger.report(throwable);
webSocketThreadLock.release();
});
});
}
private boolean existsThreadForHostname(String hostname) {
if (hostname == null || currentWebSocketThread == null) {
return false;
}
return currentWebSocketThread.getName().equals("RC_thread_" + hostname);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return localBinder;
}
return currentWebSocketThread.getName().equals("RC_thread_" + hostname);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return localBinder;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment