Commit 8af265c7 authored by Leonardo Aramaki's avatar Leonardo Aramaki

Refresh opened rooms if conn state is SESSION_ESTABLISHED loading

missed messages for that channel
parent 85c02cc3
......@@ -18,6 +18,7 @@ import android.widget.TextView;
import com.facebook.drawee.view.SimpleDraweeView;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import chat.rocket.android.LaunchUtil;
import chat.rocket.android.R;
......@@ -49,7 +50,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
private RoomToolbar toolbar;
private SlidingPaneLayout pane;
private MainContract.Presenter presenter;
private volatile Snackbar statusTicker;
private volatile AtomicReference<Snackbar> statusTicker = new AtomicReference<>();
@Override
public int getLayoutContainerForFragment() {
......@@ -95,7 +96,9 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
presenter.release();
}
// Dismiss any status ticker
if (statusTicker != null) statusTicker.dismiss();
if (statusTicker.get() != null) {
statusTicker.get().dismiss();
}
super.onPause();
}
......@@ -250,31 +253,47 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
}
@Override
public synchronized void showConnectionError() {
dismissStatusTickerIfShowing();
statusTicker = Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.fragment_retry_login_error_title, Snackbar.LENGTH_INDEFINITE)
.setAction(R.string.fragment_retry_login_retry_title, view ->
ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer());
statusTicker.show();
public void showConnectionError() {
if (statusTicker.get() != null && statusTicker.get().isShown()) {
statusTicker.get().setText(R.string.fragment_retry_login_error_title)
.setAction(R.string.fragment_retry_login_retry_title, view -> {
statusTicker.set(null);
showConnecting();
ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer();
});
} else {
Snackbar newStatusTicker = Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.fragment_retry_login_error_title, Snackbar.LENGTH_INDEFINITE)
.setAction(R.string.fragment_retry_login_retry_title, view -> {
statusTicker.set(null);
showConnecting();
ConnectivityManager.getInstance(getApplicationContext()).keepAliveServer();
});
statusTicker.set(newStatusTicker);
statusTicker.get().show();
}
}
@Override
public synchronized void showConnecting() {
dismissStatusTickerIfShowing();
statusTicker = Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.server_config_activity_authenticating, Snackbar.LENGTH_INDEFINITE);
statusTicker.show();
public void showConnecting() {
if (statusTicker.get() != null && statusTicker.get().isShown()) {
statusTicker.get().setText(R.string.server_config_activity_authenticating);
} else {
Snackbar newStatusTicker = Snackbar.make(findViewById(getLayoutContainerForFragment()),
R.string.server_config_activity_authenticating, Snackbar.LENGTH_INDEFINITE);
statusTicker.set(newStatusTicker);
statusTicker.get().show();
}
}
@Override
public synchronized void showConnectionOk() {
public void showConnectionOk() {
dismissStatusTickerIfShowing();
}
private void dismissStatusTickerIfShowing() {
if (statusTicker != null) {
statusTicker.dismiss();
statusTicker.get().dismiss();
}
}
......@@ -334,7 +353,7 @@ public class MainActivity extends AbstractAuthedActivity implements MainContract
Fragment fragment = getSupportFragmentManager().findFragmentById(getLayoutContainerForFragment());
if (fragment != null && fragment instanceof RoomFragment) {
RoomFragment roomFragment = (RoomFragment) fragment;
roomFragment.loadMessages();
roomFragment.loadMissedMessages();
}
}
......
......@@ -212,7 +212,7 @@ public class MainPresenter extends BasePresenter<MainContract.View>
return;
}
view.showConnectionOk();
// view.showConnectionOk();
},
Logger::report
);
......@@ -227,12 +227,15 @@ public class MainPresenter extends BasePresenter<MainContract.View>
.subscribe(
connectivity -> {
if (connectivity.state == ServerConnectivity.STATE_CONNECTED) {
view.showConnectionOk();
view.refreshRoom();
//TODO: notify almost connected or something like that.
// view.showConnectionOk();
} else if (connectivity.state == ServerConnectivity.STATE_DISCONNECTED) {
if (connectivity.code == DDPClient.REASON_NETWORK_ERROR) {
view.showConnectionError();
}
} else if (connectivity.state == ServerConnectivity.STATE_SESSION_ESTABLISHED) {
view.refreshRoom();
view.showConnectionOk();
} else {
view.showConnecting();
}
......
......@@ -13,67 +13,69 @@ import chat.rocket.core.models.User;
public interface RoomContract {
interface View extends BaseContract.View {
interface View extends BaseContract.View {
void setupWith(RocketChatAbsoluteUrl rocketChatAbsoluteUrl);
void setupWith(RocketChatAbsoluteUrl rocketChatAbsoluteUrl);
void render(Room room);
void render(Room room);
void showUserStatus(User user);
void showUserStatus(User user);
void updateHistoryState(boolean hasNext, boolean isLoaded);
void updateHistoryState(boolean hasNext, boolean isLoaded);
void onMessageSendSuccessfully();
void onMessageSendSuccessfully();
void disableMessageInput();
void disableMessageInput();
void enableMessageInput();
void enableMessageInput();
void showUnreadCount(int count);
void showUnreadCount(int count);
void showMessages(List<Message> messages);
void showMessages(List<Message> messages);
void showMessageSendFailure(Message message);
void showMessageSendFailure(Message message);
void showMessageDeleteFailure(Message message);
void showMessageDeleteFailure(Message message);
void autoloadImages();
void autoloadImages();
void manualLoadImages();
void manualLoadImages();
void onReply(AbsoluteUrl absoluteUrl, String markdown, Message message);
void onReply(AbsoluteUrl absoluteUrl, String markdown, Message message);
void onCopy(String message);
void onCopy(String message);
void showMessageActions(Message message);
}
void showMessageActions(Message message);
}
interface Presenter extends BaseContract.Presenter<View> {
interface Presenter extends BaseContract.Presenter<View> {
void loadMessages();
void loadMessages();
void loadMoreMessages();
void loadMoreMessages();
void onMessageSelected(@Nullable Message message);
void onMessageSelected(@Nullable Message message);
void onMessageTap(@Nullable Message message);
void onMessageTap(@Nullable Message message);
void sendMessage(String messageText);
void sendMessage(String messageText);
void resendMessage(@NonNull Message message);
void resendMessage(@NonNull Message message);
void updateMessage(@NonNull Message message, String content);
void updateMessage(@NonNull Message message, String content);
void deleteMessage(@NonNull Message message);
void deleteMessage(@NonNull Message message);
void onUnreadCount();
void onUnreadCount();
void onMarkAsRead();
void onMarkAsRead();
void refreshRoom();
void refreshRoom();
void replyMessage(@NonNull Message message, boolean justQuote);
void replyMessage(@NonNull Message message, boolean justQuote);
void acceptMessageDeleteFailure(Message message);
}
void acceptMessageDeleteFailure(Message message);
void loadMissedMessages();
}
}
......@@ -716,7 +716,7 @@ public class RoomFragment extends AbstractChatRoomFragment implements
}
}
public void loadMessages() {
presenter.loadMessages();
public void loadMissedMessages() {
presenter.loadMissedMessages();
}
}
\ No newline at end of file
......@@ -6,11 +6,17 @@ import android.support.v4.util.Pair;
import com.hadisatrio.optional.Optional;
import org.json.JSONException;
import org.json.JSONObject;
import chat.rocket.android.BackgroundLooper;
import chat.rocket.android.RocketChatApplication;
import chat.rocket.android.RocketChatCache;
import chat.rocket.android.api.MethodCallHelper;
import chat.rocket.android.helper.AbsoluteUrlHelper;
import chat.rocket.android.helper.LogIfError;
import chat.rocket.android.helper.Logger;
import chat.rocket.android.log.RCLog;
import chat.rocket.android.service.ConnectivityManagerApi;
import chat.rocket.android.shared.BasePresenter;
import chat.rocket.core.SyncState;
......@@ -27,393 +33,415 @@ import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
public class RoomPresenter extends BasePresenter<RoomContract.View>
implements RoomContract.Presenter {
private final String roomId;
private final MessageInteractor messageInteractor;
private final UserRepository userRepository;
private final RoomRepository roomRepository;
private final AbsoluteUrlHelper absoluteUrlHelper;
private final MethodCallHelper methodCallHelper;
private final ConnectivityManagerApi connectivityManagerApi;
private Room currentRoom;
public RoomPresenter(String roomId,
UserRepository userRepository,
MessageInteractor messageInteractor,
RoomRepository roomRepository,
AbsoluteUrlHelper absoluteUrlHelper,
MethodCallHelper methodCallHelper,
ConnectivityManagerApi connectivityManagerApi) {
this.roomId = roomId;
this.userRepository = userRepository;
this.messageInteractor = messageInteractor;
this.roomRepository = roomRepository;
this.absoluteUrlHelper = absoluteUrlHelper;
this.methodCallHelper = methodCallHelper;
this.connectivityManagerApi = connectivityManagerApi;
}
@Override
public void bindView(@NonNull RoomContract.View view) {
super.bindView(view);
refreshRoom();
}
@Override
public void refreshRoom() {
getRoomRoles();
getRoomInfo();
getRoomHistoryStateInfo();
getMessages();
getUserPreferences();
}
@Override
public void loadMessages() {
final Disposable subscription = getSingleRoom()
.flatMap(messageInteractor::loadMessages)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (!success) {
connectivityManagerApi.keepAliveServer();
}
},
Logger::report
);
implements RoomContract.Presenter {
private final String roomId;
private final MessageInteractor messageInteractor;
private final UserRepository userRepository;
private final RoomRepository roomRepository;
private final AbsoluteUrlHelper absoluteUrlHelper;
private final MethodCallHelper methodCallHelper;
private final ConnectivityManagerApi connectivityManagerApi;
private Room currentRoom;
/* package */RoomPresenter(String roomId,
UserRepository userRepository,
MessageInteractor messageInteractor,
RoomRepository roomRepository,
AbsoluteUrlHelper absoluteUrlHelper,
MethodCallHelper methodCallHelper,
ConnectivityManagerApi connectivityManagerApi) {
this.roomId = roomId;
this.userRepository = userRepository;
this.messageInteractor = messageInteractor;
this.roomRepository = roomRepository;
this.absoluteUrlHelper = absoluteUrlHelper;
this.methodCallHelper = methodCallHelper;
this.connectivityManagerApi = connectivityManagerApi;
}
addSubscription(subscription);
}
@Override
public void loadMoreMessages() {
final Disposable subscription = getSingleRoom()
.flatMap(messageInteractor::loadMoreMessages)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (!success) {
connectivityManagerApi.keepAliveServer();
}
},
Logger::report
);
@Override
public void bindView(@NonNull RoomContract.View view) {
super.bindView(view);
refreshRoom();
}
addSubscription(subscription);
}
@Override
public void refreshRoom() {
getRoomRoles();
getRoomInfo();
getRoomHistoryStateInfo();
getMessages();
getUserPreferences();
}
@Override
public void onMessageSelected(@Nullable Message message) {
if (message == null) {
return;
@Override
public void loadMessages() {
final Disposable subscription = getSingleRoom()
.flatMap(messageInteractor::loadMessages)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (!success) {
connectivityManagerApi.keepAliveServer();
}
},
Logger::report
);
addSubscription(subscription);
}
if (message.getSyncState() == SyncState.DELETE_FAILED) {
view.showMessageDeleteFailure(message);
} else if (message.getSyncState() == SyncState.FAILED) {
view.showMessageSendFailure(message);
} else if (message.getType() == null && message.getSyncState() == SyncState.SYNCED) {
// If message is not a system message show applicable actions.
view.showMessageActions(message);
@Override
public void loadMoreMessages() {
final Disposable subscription = getSingleRoom()
.flatMap(messageInteractor::loadMoreMessages)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (!success) {
connectivityManagerApi.keepAliveServer();
}
},
Logger::report
);
addSubscription(subscription);
}
}
@Override
public void onMessageTap(@Nullable Message message) {
if (message == null) {
return;
@Override
public void onMessageSelected(@Nullable Message message) {
if (message == null) {
return;
}
if (message.getSyncState() == SyncState.DELETE_FAILED) {
view.showMessageDeleteFailure(message);
} else if (message.getSyncState() == SyncState.FAILED) {
view.showMessageSendFailure(message);
} else if (message.getType() == null && message.getSyncState() == SyncState.SYNCED) {
// If message is not a system message show applicable actions.
view.showMessageActions(message);
}
}
if (message.getSyncState() == SyncState.FAILED) {
view.showMessageSendFailure(message);
@Override
public void onMessageTap(@Nullable Message message) {
if (message == null) {
return;
}
if (message.getSyncState() == SyncState.FAILED) {
view.showMessageSendFailure(message);
}
}
}
@Override
public void replyMessage(@NonNull Message message, boolean justQuote) {
final Disposable subscription = this.absoluteUrlHelper.getRocketChatAbsoluteUrl()
.cache()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
serverUrl -> {
if (serverUrl.isPresent()) {
RocketChatAbsoluteUrl absoluteUrl = serverUrl.get();
String baseUrl = absoluteUrl.getBaseUrl();
view.onReply(absoluteUrl, buildReplyOrQuoteMarkdown(baseUrl, message, justQuote), message);
}
},
Logger::report
);
addSubscription(subscription);
}
public void acceptMessageDeleteFailure(Message message) {
final Disposable subscription = messageInteractor.acceptDeleteFailure(message)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
addSubscription(subscription);
}
private String buildReplyOrQuoteMarkdown(String baseUrl, Message message, boolean justQuote) {
if (currentRoom == null || message.getUser() == null) {
return "";
@Override
public void replyMessage(@NonNull Message message, boolean justQuote) {
final Disposable subscription = this.absoluteUrlHelper.getRocketChatAbsoluteUrl()
.cache()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
serverUrl -> {
if (serverUrl.isPresent()) {
RocketChatAbsoluteUrl absoluteUrl = serverUrl.get();
String baseUrl = absoluteUrl.getBaseUrl();
view.onReply(absoluteUrl, buildReplyOrQuoteMarkdown(baseUrl, message, justQuote), message);
}
},
Logger::report
);
addSubscription(subscription);
}
if (currentRoom.isDirectMessage()) {
return String.format("[ ](%s/direct/%s?msg=%s) ", baseUrl,
message.getUser().getUsername(),
message.getId());
} else {
return String.format("[ ](%s/channel/%s?msg=%s) %s", baseUrl,
currentRoom.getName(),
message.getId(),
justQuote ? "" : "@" + message.getUser().getUsername() + " ");
public void acceptMessageDeleteFailure(Message message) {
final Disposable subscription = messageInteractor.acceptDeleteFailure(message)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
addSubscription(subscription);
}
}
@Override
public void sendMessage(String messageText) {
view.disableMessageInput();
final Disposable subscription = getRoomUserPair()
.flatMap(pair -> messageInteractor.send(pair.first, pair.second, messageText))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (success) {
view.onMessageSendSuccessfully();
}
view.enableMessageInput();
},
throwable -> {
view.enableMessageInput();
Logger.report(throwable);
}
);
addSubscription(subscription);
}
@Override
public void resendMessage(@NonNull Message message) {
final Disposable subscription = getCurrentUser()
.flatMap(user -> messageInteractor.resend(message, user))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
addSubscription(subscription);
}
@Override
public void updateMessage(@NonNull Message message, String content) {
view.disableMessageInput();
final Disposable subscription = getCurrentUser()
.flatMap(user -> messageInteractor.update(message, user, content))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (success) {
view.onMessageSendSuccessfully();
}
view.enableMessageInput();
},
throwable -> {
view.enableMessageInput();
Logger.report(throwable);
@Override
public void loadMissedMessages() {
RocketChatApplication appContext = RocketChatApplication.getInstance();
JSONObject openedRooms = new RocketChatCache(appContext).getOpenedRooms();
if (openedRooms.has(roomId)) {
try {
JSONObject room = openedRooms.getJSONObject(roomId);
String rid = room.optString("rid");
long ls = room.optLong("ls");
methodCallHelper.loadMissedMessages(rid, ls)
.continueWith(new LogIfError());
} catch (JSONException e) {
RCLog.e(e);
}
);
}
}
addSubscription(subscription);
}
@Override
public void deleteMessage(@NonNull Message message) {
final Disposable subscription = messageInteractor.delete(message)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
addSubscription(subscription);
}
@Override
public void onUnreadCount() {
final Disposable subscription = getRoomUserPair()
.flatMap(roomUserPair -> messageInteractor
.unreadCountFor(roomUserPair.first, roomUserPair.second))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
count -> view.showUnreadCount(count),
Logger::report
);
private String buildReplyOrQuoteMarkdown(String baseUrl, Message message, boolean justQuote) {
if (currentRoom == null || message.getUser() == null) {
return "";
}
if (currentRoom.isDirectMessage()) {
return String.format("[ ](%s/direct/%s?msg=%s) ", baseUrl,
message.getUser().getUsername(),
message.getId());
} else {
return String.format("[ ](%s/channel/%s?msg=%s) %s", baseUrl,
currentRoom.getName(),
message.getId(),
justQuote ? "" : "@" + message.getUser().getUsername() + " ");
}
}
addSubscription(subscription);
}
@Override
public void onMarkAsRead() {
final Disposable subscription = roomRepository.getById(roomId)
.filter(Optional::isPresent)
.map(Optional::get)
.firstElement()
.filter(Room::isAlert)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
room -> methodCallHelper.readMessages(room.getRoomId())
.continueWith(new LogIfError()),
Logger::report
);
@Override
public void sendMessage(String messageText) {
view.disableMessageInput();
final Disposable subscription = getRoomUserPair()
.flatMap(pair -> messageInteractor.send(pair.first, pair.second, messageText))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (success) {
view.onMessageSendSuccessfully();
}
view.enableMessageInput();
},
throwable -> {
view.enableMessageInput();
Logger.report(throwable);
}
);
addSubscription(subscription);
}
@Override
public void resendMessage(@NonNull Message message) {
final Disposable subscription = getCurrentUser()
.flatMap(user -> messageInteractor.resend(message, user))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
addSubscription(subscription);
}
private void getRoomRoles() {
methodCallHelper.getRoomRoles(roomId);
}
private void getRoomInfo() {
final Disposable subscription = roomRepository.getById(roomId)
.distinctUntilChanged()
.filter(Optional::isPresent)
.map(Optional::get)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::processRoom, Logger::report);
addSubscription(subscription);
}
private void processRoom(Room room) {
this.currentRoom = room;
view.render(room);
if (room.isDirectMessage()) {
getUserByUsername(room.getName());
addSubscription(subscription);
}
}
private void getUserByUsername(String username) {
final Disposable disposable = userRepository.getByUsername(username)
.distinctUntilChanged()
.filter(Optional::isPresent)
.map(Optional::get)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(view::showUserStatus, Logger::report);
addSubscription(disposable);
}
private void getRoomHistoryStateInfo() {
final Disposable subscription = roomRepository.getHistoryStateByRoomId(roomId)
.distinctUntilChanged()
.filter(Optional::isPresent)
.map(Optional::get)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
roomHistoryState -> {
int syncState = roomHistoryState.getSyncState();
view.updateHistoryState(
!roomHistoryState.isComplete(),
syncState == SyncState.SYNCED || syncState == SyncState.FAILED
);
},
Logger::report
);
addSubscription(subscription);
}
private void getMessages() {
final Disposable subscription = Flowable.zip(roomRepository.getById(roomId),
absoluteUrlHelper.getRocketChatAbsoluteUrl().toFlowable().cache(), Pair::new)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.map(pair -> {
view.setupWith(pair.second.orNull());
return pair.first;
})
.filter(Optional::isPresent)
.map(Optional::get)
.flatMap(messageInteractor::getAllFrom)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
view::showMessages,
Logger::report
);
@Override
public void updateMessage(@NonNull Message message, String content) {
view.disableMessageInput();
final Disposable subscription = getCurrentUser()
.flatMap(user -> messageInteractor.update(message, user, content))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> {
if (success) {
view.onMessageSendSuccessfully();
}
view.enableMessageInput();
},
throwable -> {
view.enableMessageInput();
Logger.report(throwable);
}
);
addSubscription(subscription);
}
addSubscription(subscription);
}
private void getUserPreferences() {
final Disposable subscription = userRepository.getCurrent()
.filter(Optional::isPresent)
.map(Optional::get)
.filter(user -> user.getSettings() != null)
.map(User::getSettings)
.filter(settings -> settings.getPreferences() != null)
.map(Settings::getPreferences)
.distinctUntilChanged()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
preferences -> {
if (preferences.isAutoImageLoad()) {
view.autoloadImages();
} else {
view.manualLoadImages();
}
},
Logger::report
);
@Override
public void deleteMessage(@NonNull Message message) {
final Disposable subscription = messageInteractor.delete(message)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
addSubscription(subscription);
}
addSubscription(subscription);
}
@Override
public void onUnreadCount() {
final Disposable subscription = getRoomUserPair()
.flatMap(roomUserPair -> messageInteractor
.unreadCountFor(roomUserPair.first, roomUserPair.second))
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
count -> view.showUnreadCount(count),
Logger::report
);
addSubscription(subscription);
}
private void getAbsoluteUrl() {
final Disposable subscription = absoluteUrlHelper.getRocketChatAbsoluteUrl()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
it -> view.setupWith(it.orNull()),
Logger::report
@Override
public void onMarkAsRead() {
final Disposable subscription = roomRepository.getById(roomId)
.filter(Optional::isPresent)
.map(Optional::get)
.firstElement()
.filter(Room::isAlert)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
room -> methodCallHelper.readMessages(room.getRoomId())
.continueWith(new LogIfError()),
Logger::report
);
addSubscription(subscription);
}
private void getRoomRoles() {
methodCallHelper.getRoomRoles(roomId);
}
private void getRoomInfo() {
final Disposable subscription = roomRepository.getById(roomId)
.distinctUntilChanged()
.filter(Optional::isPresent)
.map(Optional::get)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::processRoom, Logger::report);
addSubscription(subscription);
}
private void processRoom(Room room) {
this.currentRoom = room;
view.render(room);
if (room.isDirectMessage()) {
getUserByUsername(room.getName());
}
}
private void getUserByUsername(String username) {
final Disposable disposable = userRepository.getByUsername(username)
.distinctUntilChanged()
.filter(Optional::isPresent)
.map(Optional::get)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(view::showUserStatus, Logger::report);
addSubscription(disposable);
}
private void getRoomHistoryStateInfo() {
final Disposable subscription = roomRepository.getHistoryStateByRoomId(roomId)
.distinctUntilChanged()
.filter(Optional::isPresent)
.map(Optional::get)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
roomHistoryState -> {
int syncState = roomHistoryState.getSyncState();
view.updateHistoryState(
!roomHistoryState.isComplete(),
syncState == SyncState.SYNCED || syncState == SyncState.FAILED
);
},
Logger::report
);
addSubscription(subscription);
}
private void getMessages() {
final Disposable subscription = Flowable.zip(roomRepository.getById(roomId),
absoluteUrlHelper.getRocketChatAbsoluteUrl().toFlowable().cache(), Pair::new)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.map(pair -> {
view.setupWith(pair.second.orNull());
return pair.first;
})
.filter(Optional::isPresent)
.map(Optional::get)
.map(room -> {
new RocketChatCache(RocketChatApplication.getInstance())
.addOpenedRoom(room.getRoomId(), room.getLastSeen());
return room;
})
.flatMap(messageInteractor::getAllFrom)
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
view::showMessages,
Logger::report
);
addSubscription(subscription);
}
private void getUserPreferences() {
final Disposable subscription = userRepository.getCurrent()
.filter(Optional::isPresent)
.map(Optional::get)
.filter(user -> user.getSettings() != null)
.map(User::getSettings)
.filter(settings -> settings.getPreferences() != null)
.map(Settings::getPreferences)
.distinctUntilChanged()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
preferences -> {
if (preferences.isAutoImageLoad()) {
view.autoloadImages();
} else {
view.manualLoadImages();
}
},
Logger::report
);
addSubscription(subscription);
}
private void getAbsoluteUrl() {
final Disposable subscription = absoluteUrlHelper.getRocketChatAbsoluteUrl()
.subscribeOn(AndroidSchedulers.from(BackgroundLooper.get()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
it -> view.setupWith(it.orNull()),
Logger::report
);
addSubscription(subscription);
}
private Single<Pair<Room, User>> getRoomUserPair() {
return Single.zip(
getSingleRoom(),
getCurrentUser(),
Pair::new
);
}
addSubscription(subscription);
}
private Single<Pair<Room, User>> getRoomUserPair() {
return Single.zip(
getSingleRoom(),
getCurrentUser(),
Pair::new
);
}
private Single<Room> getSingleRoom() {
return roomRepository.getById(roomId)
.filter(Optional::isPresent)
.map(Optional::get)
.firstElement()
.toSingle();
}
private Single<User> getCurrentUser() {
return userRepository.getCurrent()
.filter(Optional::isPresent)
.map(Optional::get)
.firstElement()
.toSingle();
}
private Single<Room> getSingleRoom() {
return roomRepository.getById(roomId)
.filter(Optional::isPresent)
.map(Optional::get)
.firstElement()
.toSingle();
}
private Single<User> getCurrentUser() {
return userRepository.getCurrent()
.filter(Optional::isPresent)
.map(Optional::get)
.firstElement()
.toSingle();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment