Commit 3016ef5b authored by Lucio Maciel's avatar Lucio Maciel

Improve socket connection

parent 3b576568
......@@ -6,16 +6,14 @@ import chat.rocket.android.chatroom.domain.UriInteractor
import chat.rocket.android.chatroom.viewmodel.ViewModelMapper
import chat.rocket.android.core.lifecycle.CancelStrategy
import chat.rocket.android.server.domain.*
import chat.rocket.android.server.infraestructure.RocketChatClientFactory
import chat.rocket.android.server.infraestructure.ConnectionManagerFactory
import chat.rocket.android.server.infraestructure.state
import chat.rocket.android.util.extensions.launchUI
import chat.rocket.common.RocketChatException
import chat.rocket.common.model.RoomType
import chat.rocket.common.model.roomTypeOf
import chat.rocket.common.util.ifNull
import chat.rocket.core.internal.realtime.State
import chat.rocket.core.internal.realtime.connect
import chat.rocket.core.internal.realtime.subscribeRoomMessages
import chat.rocket.core.internal.realtime.unsubscribe
import chat.rocket.core.internal.rest.*
import chat.rocket.core.model.Message
import chat.rocket.core.model.Value
......@@ -23,6 +21,7 @@ import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.launch
import org.threeten.bp.Instant
import timber.log.Timber
import javax.inject.Inject
......@@ -33,14 +32,22 @@ class ChatRoomPresenter @Inject constructor(private val view: ChatRoomView,
private val permissions: GetPermissionsInteractor,
private val uriInteractor: UriInteractor,
private val messagesRepository: MessagesRepository,
factory: RocketChatClientFactory,
factory: ConnectionManagerFactory,
private val mapper: ViewModelMapper) {
private val client = factory.create(serverInteractor.get()!!)
private var subId: String? = null
private val currentServer = serverInteractor.get()!!
private val manager = factory.create(currentServer)
private val client = manager.client
private var settings: Map<String, Value<Any>> = getSettingsInteractor.get(serverInteractor.get()!!)!!
private val messagesChannel = Channel<Message>()
private var chatRoomId: String? = null
private var chatRoomType: String? = null
private val stateChannel = Channel<State>()
private var lastState = manager.state
fun loadMessages(chatRoomId: String, chatRoomType: String, offset: Long = 0) {
this.chatRoomId = chatRoomId
this.chatRoomType = chatRoomType
launchUI(strategy) {
view.showLoading()
try {
......@@ -55,10 +62,7 @@ class ChatRoomPresenter @Inject constructor(private val view: ChatRoomView,
val messagesViewModels = mapper.map(messages)
view.showMessages(messagesViewModels)
// Subscribe after getting the first page of messages from REST
if (offset == 0L) {
subscribeMessages(chatRoomId)
}
subscribeMessages(chatRoomId)
} catch (ex: Exception) {
ex.printStackTrace()
ex.message?.let {
......@@ -131,7 +135,7 @@ class ChatRoomPresenter @Inject constructor(private val view: ChatRoomView,
}
}
fun markRoomAsRead(roomId: String) {
private fun markRoomAsRead(roomId: String) {
launchUI(strategy) {
try {
client.markAsRead(roomId)
......@@ -142,57 +146,62 @@ class ChatRoomPresenter @Inject constructor(private val view: ChatRoomView,
}
}
private fun subscribeMessages(roomId: String) {
client.addStateChannel(stateChannel)
manager.subscribeRoomMessages(roomId, messagesChannel)
launch(CommonPool + strategy.jobs) {
for (status in stateChannel) {
Timber.d("Changing status to: $status")
when (status) {
State.Authenticating -> Timber.d("Authenticating")
State.Connected -> {
Timber.d("Connected")
subId = client.subscribeRoomMessages(roomId) {
Timber.d("subscribe messages for $roomId: $it")
}
}
}
for (message in messagesChannel) {
Timber.d("New message for room ${message.roomId}")
updateMessage(message)
}
Timber.d("Done on statusChannel")
}
when (client.state) {
State.Connected -> {
Timber.d("Already connected")
subId = client.subscribeRoomMessages(roomId) {
Timber.d("subscribe messages for $roomId: $it")
lastState = manager.state
manager.addStatusChannel(stateChannel)
launch(CommonPool + strategy.jobs) {
for (state in stateChannel) {
Timber.d("Got new state: $state - last: $lastState")
if (state is State.Connected && lastState != state) {
loadMissingMessages()
}
lastState = state
}
else -> client.connect()
}
}
launchUI(strategy) {
listenMessages(roomId)
}
private fun loadMissingMessages() {
launch(parent = strategy.jobs) {
if (chatRoomId != null && chatRoomType != null) {
val roomType = roomTypeOf(chatRoomType!!)
val lastMessage = messagesRepository.getByRoomId(chatRoomId!!).sortedByDescending { it.timestamp }.first()
val instant = Instant.ofEpochMilli(lastMessage.timestamp)
val messages = client.history(chatRoomId!!, roomType, count = 50,
oldest = instant.toString())
Timber.d("History: $messages")
// TODO - when we have a proper service, we won't need to take care of connection, just
// subscribe and listen...
/*launchUI(strategy) {
subId = client.subscribeRoomMessages(roomId) {
Timber.d("subscribe messages for $roomId: $it")
}
listenMessages(roomId)
}*/
}
if (messages.result.isNotEmpty()) {
val models = mapper.map(messages.result)
messagesRepository.saveAll(messages.result)
fun unsubscribeMessages() {
launch(CommonPool) {
client.removeStateChannel(stateChannel)
subId?.let { subscriptionId ->
client.unsubscribe(subscriptionId)
launchUI(strategy) {
view.showNewMessage(models)
}
if (messages.result.size == 50) {
// we loade at least count messages, try one more to fetch more messages
loadMissingMessages()
}
}
}
}
}
fun unsubscribeMessages(chatRoomId: String) {
manager.removeStatusChannel(stateChannel)
manager.unsubscribeRoomMessages(chatRoomId)
}
/**
* Delete the message with the given id.
*
......@@ -317,17 +326,6 @@ class ChatRoomPresenter @Inject constructor(private val view: ChatRoomView,
}
}
private suspend fun listenMessages(roomId: String) {
launch(CommonPool + strategy.jobs) {
for (message in client.messagesChannel) {
if (message.roomId != roomId) {
Timber.d("Ignoring message for room ${message.roomId}, expecting $roomId")
}
updateMessage(message)
}
}
}
private fun updateMessage(streamedMessage: Message) {
launchUI(strategy) {
val viewModelStreamedMessage = mapper.map(streamedMessage)
......
......@@ -112,7 +112,7 @@ class ChatRoomFragment : Fragment(), ChatRoomView, EmojiKeyboardPopup.Listener {
}
override fun onDestroyView() {
presenter.unsubscribeMessages()
presenter.unsubscribeMessages(chatRoomId)
handler.removeCallbacksAndMessages(null)
unsubscribeTextMessage()
super.onDestroyView()
......
......@@ -2,14 +2,18 @@ package chat.rocket.android.chatrooms.presentation
import chat.rocket.android.core.lifecycle.CancelStrategy
import chat.rocket.android.server.domain.*
import chat.rocket.android.server.infraestructure.RocketChatClientFactory
import chat.rocket.android.server.infraestructure.ConnectionManager
import chat.rocket.android.server.infraestructure.ConnectionManagerFactory
import chat.rocket.android.server.infraestructure.chatRooms
import chat.rocket.android.server.infraestructure.state
import chat.rocket.android.util.extensions.launchUI
import chat.rocket.common.RocketChatException
import chat.rocket.common.model.BaseRoom
import chat.rocket.common.model.RoomType
import chat.rocket.core.RocketChatClient
import chat.rocket.core.internal.model.Subscription
import chat.rocket.core.internal.realtime.*
import chat.rocket.core.internal.rest.chatRooms
import chat.rocket.core.internal.realtime.State
import chat.rocket.core.internal.realtime.StreamMessage
import chat.rocket.core.internal.realtime.Type
import chat.rocket.core.model.ChatRoom
import chat.rocket.core.model.Room
import kotlinx.coroutines.experimental.*
......@@ -25,14 +29,17 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
private val saveChatRoomsInteractor: SaveChatRoomsInteractor,
private val refreshSettingsInteractor: RefreshSettingsInteractor,
settingsRepository: SettingsRepository,
factory: RocketChatClientFactory) {
private val client: RocketChatClient = factory.create(serverInteractor.get()!!)
factory: ConnectionManagerFactory) {
private val manager: ConnectionManager = factory.create(serverInteractor.get()!!)
private val currentServer = serverInteractor.get()!!
private var reloadJob: Deferred<List<ChatRoom>>? = null
private val settings = settingsRepository.get(currentServer)!!
private val subscriptionsChannel = Channel<StreamMessage<BaseRoom>>()
private val stateChannel = Channel<State>()
private var lastState = manager.state
fun loadChatRooms() {
refreshSettingsInteractor.refreshAsync(currentServer)
launchUI(strategy) {
......@@ -75,8 +82,9 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
}
private suspend fun loadRooms(): List<ChatRoom> {
val chatRooms = client.chatRooms().update
val chatRooms = manager.chatRooms().update
val sortedRooms = sortRooms(chatRooms)
Timber.d("Loaded rooms: ${sortedRooms.size}")
saveChatRoomsInteractor.save(currentServer, sortedRooms)
return sortedRooms
}
......@@ -87,6 +95,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
}
private fun updateRooms() {
Timber.d("Updating Rooms")
launch {
view.updateChatRooms(getChatRoomsInteractor.get(currentServer))
}
......@@ -104,7 +113,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
// TODO - Temporary stuff, remove when adding DB support
private suspend fun subscribeRoomUpdates() {
client.addStateChannel(stateChannel)
/*client.addStateChannel(stateChannel)
launch(CommonPool + strategy.jobs) {
for (status in stateChannel) {
Timber.d("Changing status to: $status")
......@@ -124,7 +133,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
Timber.d("Done on statusChannel")
}
when (client.state) {
when (manager.state) {
State.Connected -> {
Timber.d("Already connected")
}
......@@ -143,11 +152,35 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
Timber.d("Got message: $message")
updateSubscription(message)
}
}*/
manager.addStatusChannel(stateChannel)
manager.addRoomsAndSubscriptionsChannel(subscriptionsChannel)
launch(CommonPool + strategy.jobs) {
for (message in subscriptionsChannel) {
Timber.d("Got message: $message")
when (message.data) {
is Room -> updateRoom(message as StreamMessage<Room>)
is Subscription -> updateSubscription(message as StreamMessage<Subscription>)
}
}
}
lastState = manager.state
launch(CommonPool + strategy.jobs) {
for (state in stateChannel) {
Timber.d("Got new state: $state - last: $lastState")
if (state is State.Connected && lastState != state) {
reloadRooms()
updateRooms()
}
lastState = state
}
}
}
private fun updateRoom(message: StreamMessage<Room>) {
launchUI(strategy) {
private suspend fun updateRoom(message: StreamMessage<Room>) {
Timber.d("Update Room: ${message.type} - ${message.data.id} - ${message.data.name}")
//launchUI(strategy) {
when (message.type) {
Type.Removed -> {
removeRoom(message.data.id)
......@@ -163,11 +196,12 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
}
updateRooms()
}
//}
}
private fun updateSubscription(message: StreamMessage<Subscription>) {
launchUI(strategy) {
private suspend fun updateSubscription(message: StreamMessage<Subscription>) {
Timber.d("Update Subscription: ${message.type} - ${message.data.id} - ${message.data.name}")
//launchUI(strategy) {
when (message.type) {
Type.Removed -> {
removeRoom(message.data.roomId)
......@@ -183,7 +217,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
}
updateRooms()
}
//}
}
private suspend fun reloadRooms() {
......@@ -200,6 +234,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
// Update a ChatRoom with a Room information
private fun updateRoom(room: Room) {
Timber.d("Updating Room: ${room.id} - ${room.name}")
val chatRooms = getChatRoomsInteractor.get(currentServer).toMutableList()
val chatRoom = chatRooms.find { chatRoom -> chatRoom.id == room.id }
chatRoom?.apply {
......@@ -230,6 +265,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
// Update a ChatRoom with a Subscription information
private fun updateSubscription(subscription: Subscription) {
Timber.d("Updating subscrition: ${subscription.id} - ${subscription.name}")
val chatRooms = getChatRoomsInteractor.get(currentServer).toMutableList()
val chatRoom = chatRooms.find { chatRoom -> chatRoom.id == subscription.roomId }
chatRoom?.apply {
......@@ -261,6 +297,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
private fun removeRoom(id: String,
chatRooms: MutableList<ChatRoom> = getChatRoomsInteractor.get(currentServer).toMutableList()) {
Timber.d("Removing ROOM: $id")
synchronized(this) {
chatRooms.removeAll { chatRoom -> chatRoom.id == id }
}
......@@ -268,7 +305,7 @@ class ChatRoomsPresenter @Inject constructor(private val view: ChatRoomsView,
}
fun disconnect() {
client.removeStateChannel(stateChannel)
client.disconnect()
manager.removeStatusChannel(stateChannel)
manager.removeRoomsAndSubscriptionsChannel(subscriptionsChannel)
}
}
\ No newline at end of file
......@@ -20,7 +20,10 @@ import chat.rocket.android.widget.DividerItemDecoration
import chat.rocket.core.model.ChatRoom
import dagger.android.support.AndroidSupportInjection
import kotlinx.android.synthetic.main.fragment_chat_rooms.*
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.android.UI
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.launch
import javax.inject.Inject
......@@ -30,6 +33,8 @@ class ChatRoomsFragment : Fragment(), ChatRoomsView {
@Inject lateinit var settingsRepository: SettingsRepository
private var searchView: SearchView? = null
private var listJob: Job? = null
companion object {
fun newInstance() = ChatRoomsFragment()
}
......@@ -74,17 +79,19 @@ class ChatRoomsFragment : Fragment(), ChatRoomsView {
override suspend fun updateChatRooms(newDataSet: List<ChatRoom>) {
activity.apply {
launch(UI) {
listJob?.cancel()
listJob = launch(UI) {
val adapter = recycler_view.adapter as ChatRoomsAdapter
// FIXME https://fabric.io/rocketchat3/android/apps/chat.rocket.android.dev/issues/5a90d4718cb3c2fa63b3f557?time=last-seven-days
// TODO - fix this bug to reenable DiffUtil
/*val diff = async(CommonPool) {
val diff = async(CommonPool) {
DiffUtil.calculateDiff(RoomsDiffCallback(adapter.dataSet, newDataSet))
}.await()*/
}.await()
adapter.updateRooms(newDataSet)
adapter.notifyDataSetChanged()
//diff.dispatchUpdatesTo(adapter)
if (isActive) {
adapter.updateRooms(newDataSet)
diff.dispatchUpdatesTo(adapter)
}
}
}
}
......
......@@ -2,6 +2,7 @@ package chat.rocket.android.main.presentation
import chat.rocket.android.infrastructure.LocalRepository
import chat.rocket.android.server.domain.GetCurrentServerInteractor
import chat.rocket.android.server.infraestructure.ConnectionManagerFactory
import chat.rocket.android.server.infraestructure.RocketChatClientFactory
import chat.rocket.common.RocketChatException
import chat.rocket.core.RocketChatClient
......@@ -13,9 +14,11 @@ import javax.inject.Inject
class MainPresenter @Inject constructor(private val navigator: MainNavigator,
private val serverInteractor: GetCurrentServerInteractor,
private val localRepository: LocalRepository,
managerFactory: ConnectionManagerFactory,
factory: RocketChatClientFactory) {
private val client: RocketChatClient = factory.create(serverInteractor.get()!!)
private val currentServer = serverInteractor.get()!!
private val manager = managerFactory.create(currentServer)
private val client: RocketChatClient = factory.create(currentServer)
fun toChatList() = navigator.toChatList()
......@@ -49,4 +52,12 @@ class MainPresenter @Inject constructor(private val navigator: MainNavigator,
}
localRepository.clearAllFromServer(currentServer)
}
fun connect() {
manager.connect()
}
fun disconnect() {
manager.disconnect()
}
}
\ No newline at end of file
......@@ -29,6 +29,7 @@ class MainActivity : AppCompatActivity(), MainView, HasSupportFragmentInjector {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
presenter.connect()
setupToolbar()
setupNavigationView()
}
......@@ -41,6 +42,13 @@ class MainActivity : AppCompatActivity(), MainView, HasSupportFragmentInjector {
}
}
override fun onDestroy() {
super.onDestroy()
if (isFinishing) {
presenter.disconnect()
}
}
override fun onLogout() {
finish()
val intent = Intent(this, AuthenticationActivity::class.java)
......
package chat.rocket.android.server.infraestructure
import chat.rocket.common.model.BaseRoom
import chat.rocket.core.RocketChatClient
import chat.rocket.core.internal.realtime.*
import chat.rocket.core.internal.rest.chatRooms
import chat.rocket.core.model.Message
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.launch
import timber.log.Timber
class ConnectionManager(internal val client: RocketChatClient) {
private val statusChannelList = ArrayList<Channel<State>>()
private val statusChannel = Channel<State>()
private var connectJob: Job? = null
private val roomAndSubscriptionChannels = ArrayList<Channel<StreamMessage<BaseRoom>>>()
private val roomMessagesChannels = LinkedHashMap<String, Channel<Message>>()
private val subscriptionIdMap = HashMap<String, String>()
private var subscriptionId: String? = null
private var roomsId: String? = null
fun connect() {
if (connectJob?.isActive == true && client.state is State.Connected) {
Timber.d("Already connected, just returning...")
return
}
// cleanup first
client.removeStateChannel(statusChannel)
client.disconnect()
connectJob?.cancel()
// Connect and setup
client.connect()
client.addStateChannel(statusChannel)
connectJob = launch {
for (status in statusChannel) {
Timber.d("Changing status to: $status")
when (status) {
is State.Connected -> {
client.subscribeSubscriptions { _, id ->
Timber.d("Subscribed to subscriptions: $id")
subscriptionId = id
}
client.subscribeRooms { _, id ->
Timber.d("Subscribed to rooms: $id")
roomsId = id
}
resubscribeRooms()
}
is State.Waiting -> {
Timber.d("Connection in: ${status.seconds}")
}
}
for (channel in statusChannelList) {
channel.send(status)
}
}
}
launch(parent = connectJob) {
for (room in client.roomsChannel) {
Timber.d("GOT Room streamed")
for (channel in roomAndSubscriptionChannels) {
channel.send(room)
}
}
}
launch(parent = connectJob) {
for (subscription in client.subscriptionsChannel) {
Timber.d("GOT Subscription streamed")
for (channel in roomAndSubscriptionChannels) {
channel.send(subscription)
}
}
}
launch(parent = connectJob) {
for (message in client.messagesChannel) {
Timber.d("Received new Message for room ${message.roomId}")
val channel = roomMessagesChannels[message.roomId]
channel?.send(message)
}
}
// Broadcast initial state...
val state = client.state
for (channel in statusChannelList) {
channel.offer(state)
}
}
private fun resubscribeRooms() {
roomMessagesChannels.toList().map { (roomId, channel) ->
client.subscribeRoomMessages(roomId) { _, id ->
Timber.d("Subscribed to $roomId: $id")
subscriptionIdMap[roomId] = id
}
}
}
fun disconnect() {
client.removeStateChannel(statusChannel)
client.disconnect()
connectJob?.cancel()
}
fun addStatusChannel(channel: Channel<State>) = statusChannelList.add(channel)
fun removeStatusChannel(channel: Channel<State>) = statusChannelList.remove(channel)
fun addRoomsAndSubscriptionsChannel(channel: Channel<StreamMessage<BaseRoom>>) = roomAndSubscriptionChannels.add(channel)
fun removeRoomsAndSubscriptionsChannel(channel: Channel<StreamMessage<BaseRoom>>) = roomAndSubscriptionChannels.remove(channel)
fun subscribeRoomMessages(roomId: String, channel: Channel<Message>) {
val oldSub = roomMessagesChannels.put(roomId, channel)
if (oldSub != null) {
Timber.d("Room $roomId already subscribed...")
return
}
if (client.state is State.Connected) {
client.subscribeRoomMessages(roomId) { _, id ->
Timber.d("Subscribed to $roomId: $id")
subscriptionIdMap[roomId] = id
}
}
}
fun unsubscribeRoomMessages(roomId: String) {
val sub = roomMessagesChannels.remove(roomId)
if (sub != null) {
val id = subscriptionIdMap.remove(roomId)
id?.let { client.unsubscribe(it) }
}
}
}
suspend fun ConnectionManager.chatRooms(timestamp: Long = 0, filterCustom: Boolean = true)
= client.chatRooms(timestamp, filterCustom)
val ConnectionManager.state: State
get() = client.state
\ No newline at end of file
package chat.rocket.android.server.infraestructure
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class ConnectionManagerFactory @Inject constructor(private val factory: RocketChatClientFactory) {
private val cache = HashMap<String, ConnectionManager>()
fun create(url: String): ConnectionManager {
cache[url]?.let {
Timber.d("Returning CACHED Manager for: $url")
return it
}
Timber.d("Returning FRESH Manager for: $url")
val manager = ConnectionManager(factory.create(url))
cache[url] = manager
return manager
}
}
\ No newline at end of file
......@@ -28,7 +28,7 @@ class RocketChatClientFactory @Inject constructor(val okHttpClient: OkHttpClient
}
Timber.d("Returning NEW client for: $url")
cache.put(url, client)
cache[url] = client
return client
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment