Commit 4b61077d authored by Lucio Maciel's avatar Lucio Maciel

Create an Operation queue for DB write operations

parent 62a28388
......@@ -174,7 +174,9 @@ class ChatRoomPresenter @Inject constructor(
view.showLoading()
try {
if (offset == 0L) {
val localMessages = messagesRepository.getByRoomId(chatRoomId)
// FIXME - load just 50 messages from DB to speed up. We will reload from Network after that
// FIXME - We need to handle the pagination, first fetch from DB, then from network
val localMessages = messagesRepository.getRecentMessages(chatRoomId, 50)
val oldMessages = mapper.map(
localMessages, RoomUiModel(
roles = chatRoles,
......
......@@ -24,6 +24,7 @@ import chat.rocket.common.model.roomTypeOf
import chat.rocket.common.model.userStatusOf
import chat.rocket.core.model.Room
import chat.rocket.core.model.SpotlightResult
import ru.noties.markwon.Markwon
class RoomUiModelMapper(
private val context: Application,
......@@ -119,6 +120,8 @@ class RoomUiModelMapper(
type is RoomType.DirectMessage) } else { null }
val open = open
val lastMessageMarkdown = lastMessage?.let { Markwon.markdown(context, it.toString()).toString() }
RoomUiModel(
id = id,
name = roomName,
......@@ -128,7 +131,7 @@ class RoomUiModelMapper(
date = timestamp,
unread = unread,
alert = isUnread,
lastMessage = lastMessage,
lastMessage = lastMessageMarkdown,
status = status,
username = if (type is RoomType.DirectMessage) name else null
)
......
......@@ -33,7 +33,7 @@ class RoomViewHolder(itemView: View, private val listener: (RoomUiModel) -> Unit
if (room.lastMessage != null) {
text_last_message.isVisible = true
text_last_message.text = Markwon.markdown(context, room.lastMessage.toString()).toString()
text_last_message.text = room.lastMessage
} else {
text_last_message.isGone = true
}
......
......@@ -19,17 +19,21 @@ class RoomsAdapter(private val listener: (RoomUiModel) -> Unit) : RecyclerView.A
}
override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ViewHolder<*> {
if (viewType == VIEW_TYPE_ROOM) {
val view = parent.inflate(R.layout.item_chat)
return RoomViewHolder(view, listener)
} else if (viewType == VIEW_TYPE_HEADER) {
val view = parent.inflate(R.layout.item_chatroom_header)
return HeaderViewHolder(view)
} else if (viewType == VIEW_TYPE_LOADING) {
val view = parent.inflate(R.layout.item_loading)
return LoadingViewHolder(view)
return when (viewType) {
VIEW_TYPE_ROOM -> {
val view = parent.inflate(R.layout.item_chat)
RoomViewHolder(view, listener)
}
VIEW_TYPE_HEADER -> {
val view = parent.inflate(R.layout.item_chatroom_header)
HeaderViewHolder(view)
}
VIEW_TYPE_LOADING -> {
val view = parent.inflate(R.layout.item_loading)
LoadingViewHolder(view)
}
else -> throw IllegalStateException("View type must be either Room, Header or Loading")
}
throw IllegalStateException("View type must be either Room, Header or Loading")
}
override fun getItemCount() = values.size
......
......@@ -9,11 +9,13 @@ import chat.rocket.android.db.model.MessageChannels
import chat.rocket.android.db.model.MessageEntity
import chat.rocket.android.db.model.MessageFavoritesRelation
import chat.rocket.android.db.model.MessageMentionsRelation
import chat.rocket.android.db.model.MessagesSync
import chat.rocket.android.db.model.ReactionEntity
import chat.rocket.android.db.model.UrlEntity
import chat.rocket.android.db.model.UserEntity
import chat.rocket.android.db.model.UserStatus
import chat.rocket.android.db.model.asEntity
import chat.rocket.android.util.extensions.exhaustive
import chat.rocket.android.util.extensions.removeTrailingSlash
import chat.rocket.android.util.extensions.toEntity
import chat.rocket.android.util.extensions.userId
......@@ -31,6 +33,7 @@ import chat.rocket.core.model.Room
import chat.rocket.core.model.attachment.Attachment
import chat.rocket.core.model.userId
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.newSingleThreadContext
import kotlinx.coroutines.experimental.withContext
......@@ -45,7 +48,11 @@ class DatabaseManager(val context: Application,
RCDatabase::class.java, serverUrl.databaseName())
.fallbackToDestructiveMigration()
.build()
val dbContext = newSingleThreadContext("$serverUrl-db-context")
private val dbContext = newSingleThreadContext("$serverUrl-db-context")
private val dbManagerContext = newSingleThreadContext("$serverUrl-db-manager-context")
private val writeChannel = Channel<Operation>(Channel.UNLIMITED)
private var dbJob: Job? = null
private val insertSubs = HashMap<String, Subscription>()
private val insertRooms = HashMap<String, Room>()
......@@ -56,9 +63,32 @@ class DatabaseManager(val context: Application,
fun userDao(): UserDao = database.userDao()
fun messageDao(): MessageDao = database.messageDao()
init {
start()
}
fun start() {
dbJob?.cancel()
dbJob = launch(dbContext) {
for (operation in writeChannel) {
doOperation(operation)
}
}
}
fun stop() {
dbJob?.cancel()
dbJob = null
}
suspend fun sendOperation(operation: Operation) {
Timber.d("writerChannel: $writeChannel, closedForSend: ${writeChannel.isClosedForSend}, closedForReceive: ${writeChannel.isClosedForReceive}, empty: ${writeChannel.isEmpty}, full: ${writeChannel.isFull}")
writeChannel.send(operation)
}
suspend fun clearUsersStatus() {
withContext(dbContext) {
userDao().clearStatus()
withContext(dbManagerContext) {
sendOperation(Operation.ClearStatus)
}
}
......@@ -66,15 +96,14 @@ class DatabaseManager(val context: Application,
database.clearAllTables()
}
suspend fun getRoom(id: String) = withContext(dbContext) {
suspend fun getRoom(id: String) = withContext(dbManagerContext) {
chatRoomDao().get(id)
}
fun processUsersBatch(users: List<User>) {
launch(dbContext) {
val dao = userDao()
launch(dbManagerContext) {
val list = ArrayList<BaseUserEntity>(users.size)
var time = measureTimeMillis {
val time = measureTimeMillis {
users.forEach { user ->
user.toEntity()?.let { entity ->
list.add(entity)
......@@ -82,9 +111,7 @@ class DatabaseManager(val context: Application,
}
}
Timber.d("Converted users batch(${users.size}) in $time MS")
time = measureTimeMillis { dao.upsert(list) }
Timber.d("Upserted users batch(${users.size}) in $time MS")
sendOperation(Operation.InsertUsers(list))
}
}
......@@ -92,7 +119,7 @@ class DatabaseManager(val context: Application,
* Creates a list of data base operations
*/
fun processChatRoomsBatch(batch: List<StreamMessage<BaseRoom>>) {
launch(dbContext) {
launch(dbManagerContext) {
val toRemove = HashSet<String>()
val toInsert = ArrayList<ChatRoomEntity>(batch.size / 2)
val toUpdate = ArrayList<ChatRoomEntity>(batch.size)
......@@ -116,28 +143,15 @@ class DatabaseManager(val context: Application,
val filteredUpdate = toUpdate.filterNot { toRemove.contains(it.id) }
val filteredInsert = toInsert.filterNot { toRemove.contains(it.id) }
Timber.d("Running ChatRooms transaction: remove: $toRemove - insert: $toInsert - update: $filteredUpdate")
chatRoomDao().update(filteredInsert, filteredUpdate, toRemove.toList())
//updateMessages(batch)
sendOperation(Operation.UpdateRooms(filteredInsert, filteredUpdate, toRemove.toList()))
} catch (ex: Exception) {
Timber.d(ex, "Error updating chatrooms")
}
}
}
private fun updateMessages(batch: List<StreamMessage<BaseRoom>>) {
val list = batch.filterNot { it.type == Type.Removed }
.filter { it.data is Room }
.filterNot { (it.data as Room).lastMessage == null }
.map { (it.data as Room).lastMessage!! }
processMessagesBatch(list)
}
fun updateSelfUser(myself: Myself) {
launch(dbContext) {
launch(dbManagerContext) {
val user = userDao().getUser(myself.id)
val entity = user?.copy(
name = myself.name ?: user.name,
......@@ -147,27 +161,26 @@ class DatabaseManager(val context: Application,
) ?: myself.asUser().toEntity()
Timber.d("UPDATING SELF: $entity")
entity?.let { userDao().upsert(entity) }
entity?.let { sendOperation(Operation.UpsertUser(it)) }
}
}
fun processRooms(rooms: List<ChatRoom>) {
launch(dbContext) {
launch(dbManagerContext) {
val entities = rooms.map { mapChatRoom(it) }
chatRoomDao().insertOrReplace(entities)
sendOperation(Operation.InsertRooms(entities))
}
}
fun processMessagesBatch(messages: List<Message>): Job {
return launch(dbContext) {
val dao = messageDao()
return launch(dbManagerContext) {
val list = mutableListOf<Pair<MessageEntity, List<BaseMessageEntity>>>()
messages.forEach { message ->
val pair = createMessageEntities(message)
list.add(pair)
}
dao.insert(list)
sendOperation(Operation.InsertMessages(list))
}
}
......@@ -499,32 +512,83 @@ class DatabaseManager(val context: Application,
}
suspend fun insert(rooms: List<ChatRoomEntity>) {
withContext(dbContext) {
chatRoomDao().cleanInsert(rooms)
withContext(dbManagerContext) {
sendOperation(Operation.CleanInsertRooms(rooms))
}
}
suspend fun insert(user: UserEntity) {
withContext(dbContext) {
userDao().insert(user)
}
sendOperation(Operation.InsertUser(user))
}
private suspend fun insertUserIfMissing(id: String?) {
if (id != null && findUser(id) == null) {
Timber.d("Missing user, inserting: $id")
insert(UserEntity(id))
sendOperation(Operation.InsertUser(UserEntity(id)))
}
}
private suspend fun insertUserIfMissing(user: SimpleUser?) {
if (user?.id != null && findUser(user.id!!) == null) {
Timber.d("Missing user, inserting: ${user.id}")
insert(UserEntity(user.id!!, user.username, user.name))
sendOperation(Operation.InsertUser(UserEntity(user.id!!, user.username, user.name)))
}
}
fun findUser(userId: String): String? = userDao().findUser(userId)
private fun findUser(userId: String): String? = userDao().findUser(userId)
private fun doOperation(operation: Operation) {
when (operation) {
is Operation.ClearStatus -> userDao().clearStatus()
is Operation.UpdateRooms -> {
Timber.d("Running ChatRooms transaction: remove: ${operation.toRemove} - insert: ${operation.toInsert} - update: ${operation.toUpdate}")
chatRoomDao().update(operation.toInsert, operation.toUpdate, operation.toRemove)
}
is Operation.InsertRooms -> {
chatRoomDao().insertOrReplace(operation.chatRooms)
}
is Operation.CleanInsertRooms -> {
chatRoomDao().cleanInsert(operation.chatRooms)
}
is Operation.InsertUsers -> {
val time = measureTimeMillis { userDao().upsert(operation.users) }
Timber.d("Upserted users batch(${operation.users.size}) in $time MS")
}
is Operation.InsertUser -> {
userDao().insert(operation.user)
}
is Operation.UpsertUser -> {
userDao().upsert(operation.user)
}
is Operation.InsertMessages -> {
messageDao().insert(operation.list)
}
is Operation.SaveLastSync -> {
messageDao().saveLastSync(operation.sync)
}
}.exhaustive
}
}
sealed class Operation {
object ClearStatus : Operation()
data class UpdateRooms(
val toInsert: List<ChatRoomEntity>,
val toUpdate: List<ChatRoomEntity>,
val toRemove: List<String>
) : Operation()
data class InsertRooms(val chatRooms: List<ChatRoomEntity>) : Operation()
data class CleanInsertRooms(val chatRooms: List<ChatRoomEntity>) : Operation()
data class InsertUsers(val users: List<BaseUserEntity>) : Operation()
data class UpsertUser(val user: BaseUserEntity) : Operation()
data class InsertUser(val user: UserEntity) : Operation()
data class InsertMessages(val list: List<Pair<MessageEntity, List<BaseMessageEntity>>>) : Operation()
data class SaveLastSync(val sync: MessagesSync) : Operation()
}
fun User.toEntity(): BaseUserEntity? {
......
package chat.rocket.android.server.infraestructure
import chat.rocket.android.db.DatabaseManager
import chat.rocket.android.db.Operation
import chat.rocket.android.db.model.MessagesSync
import chat.rocket.android.server.domain.MessagesRepository
import chat.rocket.core.model.Message
......@@ -61,9 +62,7 @@ class DatabaseMessagesRepository(
}
override suspend fun saveLastSyncDate(roomId: String, timeMillis: Long) {
withContext(dbManager.dbContext) {
dbManager.messageDao().saveLastSync(MessagesSync(roomId, timeMillis))
}
dbManager.sendOperation(Operation.SaveLastSync(MessagesSync(roomId, timeMillis)))
}
override suspend fun getLastSyncDate(roomId: String): Long? = withContext(CommonPool) {
......
package chat.rocket.android.util.extensions
val <T> T.exhaustive: T
get() = this
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment