Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
AloqaIM-Android
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
AloqaIM-Android
Commits
9daf0a31
Unverified
Commit
9daf0a31
authored
Nov 22, 2017
by
Leonardo Aramaki
Committed by
GitHub
Nov 22, 2017
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #581 from RocketChat/fix/servers-logging-out
Fix servers logging out on app update
parents
975511e9
c20777bc
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
402 additions
and
413 deletions
+402
-413
build.gradle
app/build.gradle
+3
-3
RealmBasedConnectivityManager.java
...rocket/android/service/RealmBasedConnectivityManager.java
+9
-1
RocketChatWebSocketThread.java
...hat/rocket/android/service/RocketChatWebSocketThread.java
+370
-390
build.gradle
build.gradle
+3
-3
Migration.java
...rc/main/java/chat/rocket/persistence/realm/Migration.java
+8
-0
RealmStore.java
...c/main/java/chat/rocket/persistence/realm/RealmStore.java
+2
-4
RealmMessageRepository.java
...ersistence/realm/repositories/RealmMessageRepository.java
+1
-2
RealmSessionRepository.java
...ersistence/realm/repositories/RealmSessionRepository.java
+3
-7
RealmSpotlightRepository.kt
...ersistence/realm/repositories/RealmSpotlightRepository.kt
+2
-2
build.gradle
rocket-chat-core/build.gradle
+1
-1
No files found.
app/build.gradle
View file @
9daf0a31
...
...
@@ -20,8 +20,8 @@ android {
applicationId
"chat.rocket.android"
minSdkVersion
16
targetSdkVersion
rootProject
.
ext
.
targetSdkVersion
versionCode
5
0
versionName
"1.0.2
7
"
versionCode
5
2
versionName
"1.0.2
9
"
testInstrumentationRunner
"android.support.test.runner.AndroidJUnitRunner"
vectorDrawables
.
useSupportLibrary
=
true
multiDexEnabled
true
...
...
@@ -144,7 +144,7 @@ dependencies {
testCompile
"org.jetbrains.kotlin:kotlin-test:$rootProject.ext.kotlinVersion"
testCompile
"org.jetbrains.kotlin:kotlin-test-junit:$rootProject.ext.kotlinVersion"
testCompile
"org.jetbrains.kotlin:kotlin-reflect:$rootProject.ext.kotlinVersion"
testCompile
"com.nhaarman:mockito-kotlin:1.
1
.0"
testCompile
"com.nhaarman:mockito-kotlin:1.
5
.0"
testCompile
'org.amshove.kluent:kluent:1.14'
}
apply
plugin:
'com.google.gms.google-services'
app/src/main/java/chat/rocket/android/service/RealmBasedConnectivityManager.java
View file @
9daf0a31
package
chat
.
rocket
.
android
.
service
;
import
android.annotation.SuppressLint
;
import
android.content.ComponentName
;
import
android.content.Context
;
import
android.content.ServiceConnection
;
...
...
@@ -68,6 +69,7 @@ import io.reactivex.subjects.PublishSubject;
}
}
@SuppressLint
(
"RxLeakedSubscription"
)
@DebugLog
@Override
public
void
ensureConnections
()
{
...
...
@@ -84,6 +86,7 @@ import io.reactivex.subjects.PublishSubject;
});
}
@SuppressLint
(
"RxLeakedSubscription"
)
@Override
public
void
addOrUpdateServer
(
String
hostname
,
@Nullable
String
name
,
boolean
insecure
)
{
RealmBasedServerInfo
.
addOrUpdate
(
hostname
,
name
,
insecure
);
...
...
@@ -95,6 +98,7 @@ import io.reactivex.subjects.PublishSubject;
},
RCLog:
:
e
);
}
@SuppressLint
(
"RxLeakedSubscription"
)
@Override
public
void
removeServer
(
String
hostname
)
{
RealmBasedServerInfo
.
remove
(
hostname
);
...
...
@@ -166,7 +170,11 @@ import io.reactivex.subjects.PublishSubject;
@DebugLog
private
Single
<
Boolean
>
connectToServerIfNeeded
(
String
hostname
,
boolean
forceConnect
)
{
return
Single
.
defer
(()
->
{
final
int
connectivity
=
serverConnectivityList
.
get
(
hostname
);
Integer
state
=
serverConnectivityList
.
get
(
hostname
);
if
(
state
==
null
)
{
state
=
ServerConnectivity
.
STATE_DISCONNECTED
;
}
final
int
connectivity
=
state
;
if
(!
forceConnect
&&
connectivity
==
ServerConnectivity
.
STATE_CONNECTED
)
{
return
Single
.
just
(
true
);
}
...
...
app/src/main/java/chat/rocket/android/service/RocketChatWebSocketThread.java
View file @
9daf0a31
...
...
@@ -50,418 +50,398 @@ import io.reactivex.disposables.CompositeDisposable;
* Thread for handling WebSocket connection.
*/
public
class
RocketChatWebSocketThread
extends
HandlerThread
{
private
static
final
Class
[]
REGISTERABLE_CLASSES
=
{
LoginServiceConfigurationSubscriber
.
class
,
ActiveUsersSubscriber
.
class
,
UserDataSubscriber
.
class
,
MethodCallObserver
.
class
,
SessionObserver
.
class
,
LoadMessageProcedureObserver
.
class
,
GetUsersOfRoomsProcedureObserver
.
class
,
NewMessageObserver
.
class
,
DeletedMessageObserver
.
class
,
CurrentUserObserver
.
class
,
FileUploadingToUrlObserver
.
class
,
FileUploadingWithUfsObserver
.
class
,
PushSettingsObserver
.
class
,
GcmPushRegistrationObserver
.
class
};
private
static
final
long
HEARTBEAT_PERIOD_MS
=
20000
;
private
final
Context
appContext
;
private
final
String
hostname
;
private
final
RealmHelper
realmHelper
;
private
final
ConnectivityManagerInternal
connectivityManager
;
private
final
ArrayList
<
Registrable
>
listeners
=
new
ArrayList
<>();
private
final
CompositeDisposable
hearbeatDisposable
=
new
CompositeDisposable
();
private
final
CompositeDisposable
reconnectSubscription
=
new
CompositeDisposable
();
private
boolean
listenersRegistered
;
private
static
class
KeepAliveTimer
{
private
long
lastTime
;
private
final
long
thresholdMs
;
public
KeepAliveTimer
(
long
thresholdMs
)
{
this
.
thresholdMs
=
thresholdMs
;
lastTime
=
System
.
currentTimeMillis
();
private
static
final
Class
[]
REGISTERABLE_CLASSES
=
{
LoginServiceConfigurationSubscriber
.
class
,
ActiveUsersSubscriber
.
class
,
UserDataSubscriber
.
class
,
MethodCallObserver
.
class
,
SessionObserver
.
class
,
LoadMessageProcedureObserver
.
class
,
GetUsersOfRoomsProcedureObserver
.
class
,
NewMessageObserver
.
class
,
DeletedMessageObserver
.
class
,
CurrentUserObserver
.
class
,
FileUploadingToUrlObserver
.
class
,
FileUploadingWithUfsObserver
.
class
,
PushSettingsObserver
.
class
,
GcmPushRegistrationObserver
.
class
};
private
static
final
long
HEARTBEAT_PERIOD_MS
=
20000
;
private
final
Context
appContext
;
private
final
String
hostname
;
private
final
RealmHelper
realmHelper
;
private
final
ConnectivityManagerInternal
connectivityManager
;
private
final
ArrayList
<
Registrable
>
listeners
=
new
ArrayList
<>();
private
final
CompositeDisposable
heartbeatDisposable
=
new
CompositeDisposable
();
private
final
CompositeDisposable
reconnectDisposable
=
new
CompositeDisposable
();
private
boolean
listenersRegistered
;
private
static
class
KeepAliveTimer
{
private
long
lastTime
;
private
final
long
thresholdMs
;
public
KeepAliveTimer
(
long
thresholdMs
)
{
this
.
thresholdMs
=
thresholdMs
;
lastTime
=
System
.
currentTimeMillis
();
}
public
boolean
shouldCheckPrecisely
()
{
return
lastTime
+
thresholdMs
<
System
.
currentTimeMillis
();
}
public
void
update
()
{
lastTime
=
System
.
currentTimeMillis
();
}
}
private
final
KeepAliveTimer
keepAliveTimer
=
new
KeepAliveTimer
(
20000
);
private
RocketChatWebSocketThread
(
Context
appContext
,
String
hostname
)
{
super
(
"RC_thread_"
+
hostname
);
this
.
appContext
=
appContext
;
this
.
hostname
=
hostname
;
this
.
realmHelper
=
RealmStore
.
getOrCreate
(
hostname
);
this
.
connectivityManager
=
ConnectivityManager
.
getInstanceForInternal
(
appContext
);
}
/**
* build new Thread.
*/
@DebugLog
public
static
Single
<
RocketChatWebSocketThread
>
getStarted
(
Context
appContext
,
String
hostname
)
{
return
Single
.<
RocketChatWebSocketThread
>
create
(
objectSingleEmitter
->
{
new
RocketChatWebSocketThread
(
appContext
,
hostname
)
{
@Override
protected
void
onLooperPrepared
()
{
try
{
super
.
onLooperPrepared
();
objectSingleEmitter
.
onSuccess
(
this
);
}
catch
(
Exception
exception
)
{
objectSingleEmitter
.
onError
(
exception
);
}
}
}.
start
();
}).
flatMap
(
webSocket
->
webSocket
.
connectWithExponentialBackoff
().
map
(
_val
->
webSocket
));
}
public
boolean
shouldCheckPrecisely
()
{
return
lastTime
+
thresholdMs
<
System
.
currentTimeMillis
();
@Override
protected
void
onLooperPrepared
()
{
super
.
onLooperPrepared
();
forceInvalidateTokens
();
}
public
void
update
()
{
lastTime
=
System
.
currentTimeMillis
();
private
void
forceInvalidateTokens
()
{
realmHelper
.
executeTransaction
(
realm
->
{
RealmSession
session
=
RealmSession
.
queryDefaultSession
(
realm
).
findFirst
();
if
(
session
!=
null
&&
!
TextUtils
.
isEmpty
(
session
.
getToken
())
&&
(
session
.
isTokenVerified
()
||
!
TextUtils
.
isEmpty
(
session
.
getError
())))
{
session
.
setTokenVerified
(
false
);
session
.
setError
(
null
);
}
return
null
;
}).
continueWith
(
new
LogIfError
());
}
}
private
final
KeepAliveTimer
keepAliveTimer
=
new
KeepAliveTimer
(
20000
);
private
RocketChatWebSocketThread
(
Context
appContext
,
String
hostname
)
{
super
(
"RC_thread_"
+
hostname
);
this
.
appContext
=
appContext
;
this
.
hostname
=
hostname
;
this
.
realmHelper
=
RealmStore
.
getOrCreate
(
hostname
);
this
.
connectivityManager
=
ConnectivityManager
.
getInstanceForInternal
(
appContext
);
}
/**
* build new Thread.
*/
@DebugLog
public
static
Single
<
RocketChatWebSocketThread
>
getStarted
(
Context
appContext
,
String
hostname
)
{
return
Single
.<
RocketChatWebSocketThread
>
fromPublisher
(
objectSingleEmitter
->
{
new
RocketChatWebSocketThread
(
appContext
,
hostname
)
{
@Override
protected
void
onLooperPrepared
()
{
try
{
super
.
onLooperPrepared
();
objectSingleEmitter
.
onNext
(
this
);
objectSingleEmitter
.
onComplete
();
}
catch
(
Exception
exception
)
{
objectSingleEmitter
.
onError
(
exception
);
}
/**
* terminate WebSocket thread.
*/
@DebugLog
public
Single
<
Boolean
>
terminate
()
{
if
(
isAlive
())
{
return
Single
.
create
(
emitter
->
{
new
Handler
(
getLooper
()).
post
(()
->
{
RCLog
.
d
(
"thread %s: terminated()"
,
Thread
.
currentThread
().
getId
());
unregisterListenersAndClose
();
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_CLOSED_BY_USER
);
RocketChatWebSocketThread
.
super
.
quit
();
emitter
.
onSuccess
(
true
);
});
});
}
else
{
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_NETWORK_ERROR
);
super
.
quit
();
return
Single
.
just
(
true
);
}
}.
start
();
}).
flatMap
(
webSocket
->
webSocket
.
connectWithExponentialBackoff
().
map
(
_val
->
webSocket
));
}
@Override
protected
void
onLooperPrepared
()
{
super
.
onLooperPrepared
();
forceInvalidateTokens
();
}
private
void
forceInvalidateTokens
()
{
realmHelper
.
executeTransaction
(
realm
->
{
RealmSession
session
=
RealmSession
.
queryDefaultSession
(
realm
).
findFirst
();
if
(
session
!=
null
&&
!
TextUtils
.
isEmpty
(
session
.
getToken
())
&&
(
session
.
isTokenVerified
()
||
!
TextUtils
.
isEmpty
(
session
.
getError
())))
{
session
.
setTokenVerified
(
false
);
session
.
setError
(
null
);
}
return
null
;
}).
continueWith
(
new
LogIfError
());
}
/**
* terminate WebSocket thread.
*/
@DebugLog
public
Single
<
Boolean
>
terminate
()
{
if
(
isAlive
())
{
return
Single
.
fromPublisher
(
emitter
->
{
new
Handler
(
getLooper
()).
post
(()
->
{
RCLog
.
d
(
"thread %s: terminated()"
,
Thread
.
currentThread
().
getId
());
unregisterListenersAndClose
();
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_CLOSED_BY_USER
);
RocketChatWebSocketThread
.
super
.
quit
();
emitter
.
onNext
(
true
);
emitter
.
onComplete
();
});
});
}
else
{
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_NETWORK_ERROR
);
super
.
quit
();
return
Single
.
just
(
true
);
}
}
/**
* THIS METHOD THROWS EXCEPTION!! Use terminate() instead!!
*/
@Deprecated
@Override
public
final
boolean
quit
()
{
throw
new
UnsupportedOperationException
();
}
/**
* synchronize the state of the thread with ServerConfig.
*/
@DebugLog
public
Single
<
Boolean
>
keepAlive
()
{
return
checkIfConnectionAlive
()
.
flatMap
(
alive
->
alive
?
Single
.
just
(
true
)
:
connectWithExponentialBackoff
());
}
@DebugLog
private
Single
<
Boolean
>
checkIfConnectionAlive
()
{
if
(
DDPClient
.
get
()
==
null
)
{
return
Single
.
just
(
false
);
/**
* THIS METHOD THROWS EXCEPTION!! Use terminate() instead!!
*/
@Deprecated
@Override
public
final
boolean
quit
()
{
throw
new
UnsupportedOperationException
();
}
if
(!
keepAliveTimer
.
shouldCheckPrecisely
())
{
return
Single
.
just
(
true
);
/**
* synchronize the state of the thread with ServerConfig.
*/
@DebugLog
public
Single
<
Boolean
>
keepAlive
()
{
return
checkIfConnectionAlive
()
.
flatMap
(
alive
->
alive
?
Single
.
just
(
true
)
:
connectWithExponentialBackoff
());
}
keepAliveTimer
.
update
();
return
Single
.
fromPublisher
(
emitter
->
{
new
Thread
()
{
@Override
public
void
run
()
{
DDPClient
.
get
().
ping
().
continueWith
(
task
->
{
if
(
task
.
isFaulted
())
{
Exception
error
=
task
.
getError
();
RCLog
.
e
(
error
);
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_CLOSED_BY_USER
);
emitter
.
onError
(
error
);
}
else
{
keepAliveTimer
.
update
();
emitter
.
onNext
(
true
);
emitter
.
onComplete
();
}
return
null
;
});
@DebugLog
private
Single
<
Boolean
>
checkIfConnectionAlive
()
{
if
(
DDPClient
.
get
()
==
null
)
{
return
Single
.
just
(
false
);
}
}.
start
();
});
}
@DebugLog
private
Flowable
<
Boolean
>
heartbeat
(
long
interval
)
{
return
Flowable
.
interval
(
interval
,
TimeUnit
.
MILLISECONDS
)
.
onBackpressureDrop
()
.
flatMap
(
tick
->
DDPClient
.
get
().
doPing
().
toFlowable
())
.
map
(
callback
->
{
if
(
callback
instanceof
DDPClientCallback
.
Ping
)
{
return
true
;
}
// ideally we should never get here. We should always receive a DDPClientCallback.Ping
// because we just received a pong. But maybe we received a pong from an unmatched
// ping id which we should ignore. In this case or any other random error, log and
// send false downstream
RCLog
.
d
(
"heartbeat pong < %s"
,
callback
.
toString
());
return
false
;
});
}
private
Single
<
Boolean
>
prepareDDPClient
()
{
// TODO: temporarily replaced checkIfConnectionAlive() call for this single checking if ddpClient is
// null or not. In case it is, build a new client, otherwise just keep connecting with existing one.
return
Single
.
just
(
DDPClient
.
get
()
!=
null
)
.
doOnSuccess
(
alive
->
{
if
(!
alive
)
{
RCLog
.
d
(
"DDPClient#build"
);
}
});
}
private
Single
<
Boolean
>
connectDDPClient
()
{
return
prepareDDPClient
()
.
flatMap
(
_val
->
Single
.
fromPublisher
(
emitter
->
{
ServerInfo
info
=
connectivityManager
.
getServerInfoForHost
(
hostname
);
if
(
info
==
null
)
{
emitter
.
onNext
(
false
);
emitter
.
onComplete
();
return
;
}
RCLog
.
d
(
"DDPClient#connect"
);
DDPClient
.
get
().
connect
(
hostname
,
info
.
getSession
(),
info
.
isSecure
())
.
onSuccessTask
(
task
->
{
final
String
newSession
=
task
.
getResult
().
session
;
connectivityManager
.
notifyConnectionEstablished
(
hostname
,
newSession
);
// handling WebSocket#onClose() callback.
task
.
getResult
().
client
.
getOnCloseCallback
().
onSuccess
(
_task
->
{
RxWebSocketCallback
.
Close
result
=
_task
.
getResult
();
if
(
result
.
code
==
DDPClient
.
REASON_NETWORK_ERROR
)
{
reconnect
();
}
return
null
;
});
return
realmHelper
.
executeTransaction
(
realm
->
{
RealmSession
sessionObj
=
RealmSession
.
queryDefaultSession
(
realm
).
findFirst
();
if
(
sessionObj
==
null
)
{
realm
.
createOrUpdateObjectFromJson
(
RealmSession
.
class
,
new
JSONObject
().
put
(
RealmSession
.
ID
,
RealmSession
.
DEFAULT_ID
));
}
else
{
// invalidate login token.
if
(!
TextUtils
.
isEmpty
(
sessionObj
.
getToken
())
&&
sessionObj
.
isTokenVerified
())
{
sessionObj
.
setTokenVerified
(
false
);
sessionObj
.
setError
(
null
);
if
(!
keepAliveTimer
.
shouldCheckPrecisely
())
{
return
Single
.
just
(
true
);
}
keepAliveTimer
.
update
();
return
Single
.
create
(
emitter
->
{
new
Thread
()
{
@Override
public
void
run
()
{
DDPClient
.
get
().
ping
().
continueWith
(
task
->
{
if
(
task
.
isFaulted
())
{
Exception
error
=
task
.
getError
();
RCLog
.
e
(
error
);
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_CLOSED_BY_USER
);
emitter
.
onError
(
error
);
}
else
{
keepAliveTimer
.
update
();
emitter
.
onSuccess
(
true
);
}
}
return
null
;
return
null
;
});
})
.
continueWith
(
task
->
{
if
(
task
.
isFaulted
())
{
emitter
.
onError
(
task
.
getError
());
}
else
{
emitter
.
onNext
(
true
);
emitter
.
onComplete
();
}
return
null
;
});
}));
}
private
void
reconnect
()
{
// if we are already trying to reconnect then return.
if
(
reconnectSubscription
.
size
()
>
0
)
{
return
;
}
}.
start
();
});
}
forceInvalidateTokens
();
connectivityManager
.
notifyConnecting
(
hostname
);
reconnectSubscription
.
add
(
connectWithExponentialBackoff
()
.
subscribe
(
connected
->
{
if
(!
connected
)
{
connectivityManager
.
notifyConnecting
(
hostname
);
@DebugLog
private
Flowable
<
Boolean
>
heartbeat
(
long
interval
)
{
return
Flowable
.
interval
(
interval
,
TimeUnit
.
MILLISECONDS
)
.
onBackpressureDrop
()
.
flatMap
(
tick
->
DDPClient
.
get
().
doPing
().
toFlowable
())
.
map
(
callback
->
{
if
(
callback
instanceof
DDPClientCallback
.
Ping
)
{
return
true
;
}
reconnectSubscription
.
clear
();
},
error
->
{
logErrorAndUnsubscribe
(
reconnectSubscription
,
error
);
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_NETWORK_ERROR
);
}
)
);
}
private
void
logErrorAndUnsubscribe
(
CompositeDisposable
disposables
,
Throwable
err
)
{
RCLog
.
e
(
err
);
disposables
.
clear
();
}
private
Single
<
Boolean
>
connectWithExponentialBackoff
()
{
return
connect
().
retryWhen
(
RxHelper
.
exponentialBackoff
(
3
,
500
,
TimeUnit
.
MILLISECONDS
));
}
@DebugLog
private
Single
<
Boolean
>
connect
()
{
return
connectDDPClient
()
.
flatMap
(
_val
->
Single
.
fromPublisher
(
emitter
->
{
fetchPublicSettings
();
fetchPermissions
();
registerListeners
();
emitter
.
onNext
(
true
);
emitter
.
onComplete
();
}));
}
private
Task
<
Void
>
fetchPublicSettings
()
{
return
new
MethodCallHelper
(
appContext
,
realmHelper
).
getPublicSettings
(
hostname
);
}
private
Task
<
Void
>
fetchPermissions
()
{
return
new
MethodCallHelper
(
realmHelper
).
getPermissions
();
}
@DebugLog
private
void
registerListeners
()
{
if
(!
Thread
.
currentThread
().
getName
().
equals
(
"RC_thread_"
+
hostname
))
{
// execute in Looper.
new
Handler
(
getLooper
()).
post
(
this
::
registerListeners
);
return
;
// ideally we should never get here. We should always receive a DDPClientCallback.Ping
// because we just received a pong. But maybe we received a pong from an unmatched
// ping id which we should ignore. In this case or any other random error, log and
// send false downstream
RCLog
.
d
(
"heartbeat pong < %s"
,
callback
.
toString
());
return
false
;
});
}
if
(
listenersRegistered
)
{
unregisterListeners
();
private
Single
<
Boolean
>
connectDDPClient
()
{
return
Single
.
create
(
emitter
->
{
ServerInfo
info
=
connectivityManager
.
getServerInfoForHost
(
hostname
);
if
(
info
==
null
)
{
emitter
.
onSuccess
(
false
);
return
;
}
RCLog
.
d
(
"DDPClient#connect"
);
DDPClient
.
get
().
connect
(
hostname
,
info
.
getSession
(),
info
.
isSecure
())
.
onSuccessTask
(
task
->
{
final
String
newSession
=
task
.
getResult
().
session
;
connectivityManager
.
notifyConnectionEstablished
(
hostname
,
newSession
);
// handling WebSocket#onClose() callback.
task
.
getResult
().
client
.
getOnCloseCallback
().
onSuccess
(
_task
->
{
RxWebSocketCallback
.
Close
result
=
_task
.
getResult
();
if
(
result
.
code
==
DDPClient
.
REASON_NETWORK_ERROR
)
{
reconnect
();
}
return
null
;
});
return
realmHelper
.
executeTransaction
(
realm
->
{
RealmSession
sessionObj
=
RealmSession
.
queryDefaultSession
(
realm
).
findFirst
();
if
(
sessionObj
==
null
)
{
realm
.
createOrUpdateObjectFromJson
(
RealmSession
.
class
,
new
JSONObject
().
put
(
RealmSession
.
ID
,
RealmSession
.
DEFAULT_ID
));
}
else
{
// invalidate login token.
if
(!
TextUtils
.
isEmpty
(
sessionObj
.
getToken
())
&&
sessionObj
.
isTokenVerified
())
{
sessionObj
.
setTokenVerified
(
false
);
sessionObj
.
setError
(
null
);
}
}
return
null
;
});
})
.
continueWith
(
task
->
{
if
(
task
.
isFaulted
())
{
emitter
.
onError
(
task
.
getError
());
}
else
{
emitter
.
onSuccess
(
true
);
}
return
null
;
});
});
}
List
<
RealmSession
>
sessions
=
realmHelper
.
executeTransactionForReadResults
(
realm
->
realm
.
where
(
RealmSession
.
class
)
.
isNotNull
(
RealmSession
.
TOKEN
)
.
equalTo
(
RealmSession
.
TOKEN_VERIFIED
,
false
)
.
isNull
(
RealmSession
.
ERROR
)
.
findAll
());
if
(
sessions
!=
null
&&
sessions
.
size
()
>
0
)
{
// if we have a session try to resume it. At this point we're probably recovering from
// a disconnection state
final
CompositeDisposable
disposables
=
new
CompositeDisposable
();
MethodCallHelper
methodCall
=
new
MethodCallHelper
(
realmHelper
);
disposables
.
add
(
Completable
.
defer
(()
->
{
Task
<
Void
>
result
=
methodCall
.
loginWithToken
(
sessions
.
get
(
0
).
getToken
());
if
(
result
.
isFaulted
())
{
return
Completable
.
error
(
result
.
getError
());
}
else
{
return
Completable
.
complete
();
}
}).
retryWhen
(
RxHelper
.
exponentialBackoff
(
Integer
.
MAX_VALUE
,
500
,
TimeUnit
.
MILLISECONDS
))
.
subscribe
(
()
->
{
createObserversAndRegister
();
disposables
.
clear
();
},
error
->
logErrorAndUnsubscribe
(
disposables
,
error
)
)
);
}
else
{
// if we don't have any session then just build the observers and register normally
createObserversAndRegister
();
private
void
reconnect
()
{
// if we are already trying to reconnect then return.
if
(
reconnectDisposable
.
size
()
>
0
)
{
return
;
}
forceInvalidateTokens
();
connectivityManager
.
notifyConnecting
(
hostname
);
reconnectDisposable
.
add
(
connectWithExponentialBackoff
()
.
subscribe
(
connected
->
{
if
(!
connected
)
{
connectivityManager
.
notifyConnecting
(
hostname
);
}
reconnectDisposable
.
clear
();
},
error
->
{
logErrorAndUnsubscribe
(
reconnectDisposable
,
error
);
connectivityManager
.
notifyConnectionLost
(
hostname
,
DDPClient
.
REASON_NETWORK_ERROR
);
}
)
);
}
private
void
logErrorAndUnsubscribe
(
CompositeDisposable
disposables
,
Throwable
err
)
{
RCLog
.
e
(
err
);
disposables
.
clear
();
}
private
Single
<
Boolean
>
connectWithExponentialBackoff
()
{
return
connect
().
retryWhen
(
RxHelper
.
exponentialBackoff
(
3
,
500
,
TimeUnit
.
MILLISECONDS
));
}
@DebugLog
private
Single
<
Boolean
>
connect
()
{
return
connectDDPClient
()
.
flatMap
(
_val
->
Single
.
create
(
emitter
->
{
fetchPublicSettings
();
fetchPermissions
();
registerListeners
();
emitter
.
onSuccess
(
true
);
}));
}
private
Task
<
Void
>
fetchPublicSettings
()
{
return
new
MethodCallHelper
(
appContext
,
realmHelper
).
getPublicSettings
(
hostname
);
}
private
Task
<
Void
>
fetchPermissions
()
{
return
new
MethodCallHelper
(
realmHelper
).
getPermissions
();
}
@DebugLog
private
void
registerListeners
()
{
if
(!
Thread
.
currentThread
().
getName
().
equals
(
"RC_thread_"
+
hostname
))
{
// execute in Looper.
new
Handler
(
getLooper
()).
post
(
this
::
registerListeners
);
return
;
}
if
(
listenersRegistered
)
{
unregisterListeners
();
}
List
<
RealmSession
>
sessions
=
realmHelper
.
executeTransactionForReadResults
(
realm
->
realm
.
where
(
RealmSession
.
class
)
.
isNotNull
(
RealmSession
.
TOKEN
)
.
equalTo
(
RealmSession
.
TOKEN_VERIFIED
,
false
)
.
isNull
(
RealmSession
.
ERROR
)
.
findAll
());
if
(
sessions
!=
null
&&
sessions
.
size
()
>
0
)
{
// if we have a session try to resume it. At this point we're probably recovering from
// a disconnection state
final
CompositeDisposable
disposables
=
new
CompositeDisposable
();
MethodCallHelper
methodCall
=
new
MethodCallHelper
(
realmHelper
);
disposables
.
add
(
Completable
.
defer
(()
->
{
Task
<
Void
>
result
=
methodCall
.
loginWithToken
(
sessions
.
get
(
0
).
getToken
());
if
(
result
.
isFaulted
())
{
return
Completable
.
error
(
result
.
getError
());
}
else
{
return
Completable
.
complete
();
}
}).
retryWhen
(
RxHelper
.
exponentialBackoff
(
3
,
500
,
TimeUnit
.
MILLISECONDS
))
.
subscribe
(
()
->
{
createObserversAndRegister
();
disposables
.
clear
();
},
error
->
logErrorAndUnsubscribe
(
disposables
,
error
)
)
);
}
else
{
// if we don't have any session then just build the observers and register normally
createObserversAndRegister
();
}
}
}
@DebugLog
private
void
createObserversAndRegister
()
{
for
(
Class
clazz
:
REGISTERABLE_CLASSES
)
{
try
{
Constructor
ctor
=
clazz
.
getConstructor
(
Context
.
class
,
String
.
class
,
RealmHelper
.
class
);
Object
obj
=
ctor
.
newInstance
(
appContext
,
hostname
,
realmHelper
);
if
(
obj
instanceof
Registrable
)
{
Registrable
registrable
=
(
Registrable
)
obj
;
registrable
.
register
();
listeners
.
add
(
registrable
);
@DebugLog
private
void
createObserversAndRegister
()
{
for
(
Class
clazz
:
REGISTERABLE_CLASSES
)
{
try
{
Constructor
ctor
=
clazz
.
getConstructor
(
Context
.
class
,
String
.
class
,
RealmHelper
.
class
);
Object
obj
=
ctor
.
newInstance
(
appContext
,
hostname
,
realmHelper
);
if
(
obj
instanceof
Registrable
)
{
Registrable
registrable
=
(
Registrable
)
obj
;
registrable
.
register
();
listeners
.
add
(
registrable
);
}
}
catch
(
Exception
exception
)
{
RCLog
.
w
(
exception
,
"Failed to register listeners!!"
);
}
}
}
catch
(
Exception
exception
)
{
RCLog
.
w
(
exception
,
"Failed to register listeners!!"
);
}
listenersRegistered
=
true
;
startHeartBeat
();
}
listenersRegistered
=
true
;
startHeartBeat
();
}
private
void
startHeartBeat
()
{
hearbeatDisposable
.
clear
();
hearbeatDisposable
.
add
(
heartbeat
(
HEARTBEAT_PERIOD_MS
)
.
subscribe
(
ponged
->
{
if
(!
ponged
)
{
RCLog
.
d
(
"Pong received but didn't match ping id"
);
}
},
error
->
{
RCLog
.
e
(
error
);
// Stop pinging
hearbeatDisposable
.
clear
();
if
(
error
instanceof
DDPClientCallback
.
Closed
||
error
instanceof
TimeoutException
)
{
RCLog
.
d
(
"Hearbeat failure: retrying connection..."
);
reconnect
();
}
}
)
);
}
@DebugLog
private
void
unregisterListenersAndClose
()
{
unregisterListeners
();
DDPClient
.
get
().
close
();
}
@DebugLog
private
void
unregisterListeners
()
{
Iterator
<
Registrable
>
iterator
=
listeners
.
iterato
r
();
while
(
iterator
.
hasNext
())
{
Registrable
registrable
=
iterator
.
next
();
registrable
.
unregiste
r
();
iterator
.
remove
()
;
private
void
startHeartBeat
()
{
heartbeatDisposable
.
clear
();
heartbeatDisposable
.
add
(
heartbeat
(
HEARTBEAT_PERIOD_MS
)
.
subscribe
(
ponged
->
{
if
(!
ponged
)
{
RCLog
.
d
(
"Pong received but didn't match ping id"
);
}
},
error
->
{
RCLog
.
e
(
error
);
// Stop pinging
heartbeatDisposable
.
clear
();
if
(
error
instanceof
DDPClientCallback
.
Closed
||
error
instanceof
TimeoutException
)
{
RCLog
.
d
(
"Hearbeat failure: retrying connection..."
);
reconnect
();
}
}
)
);
}
@DebugLog
private
void
unregisterListenersAndClose
()
{
unregisterListeners
();
DDPClient
.
get
().
close
();
}
@DebugLog
private
void
unregisterListeners
()
{
Iterator
<
Registrable
>
iterator
=
listeners
.
iterator
();
while
(
iterator
.
hasNext
())
{
Registrable
registrable
=
iterator
.
next
();
registrable
.
unregiste
r
();
iterator
.
remove
();
}
heartbeatDisposable
.
clea
r
();
listenersRegistered
=
false
;
}
hearbeatDisposable
.
clear
();
listenersRegistered
=
false
;
}
}
build.gradle
View file @
9daf0a31
...
...
@@ -15,9 +15,9 @@ buildscript {
maven
{
url
'http://oss.jfrog.org/artifactory/oss-snapshot-local'
}
}
dependencies
{
classpath
'com.android.tools.build:gradle:3.0.
0
'
classpath
"org.jetbrains.kotlin:kotlin-gradle-plugin:1.1.
51
"
classpath
'io.realm:realm-gradle-plugin:4.2.0
-SNAPSHOT
'
classpath
'com.android.tools.build:gradle:3.0.
1
'
classpath
"org.jetbrains.kotlin:kotlin-gradle-plugin:1.1.
60
"
classpath
'io.realm:realm-gradle-plugin:4.2.0'
classpath
'com.jakewharton.hugo:hugo-plugin:1.2.1'
classpath
'com.google.gms:google-services:3.0.0'
classpath
'com.github.triplet.gradle:play-publisher:1.1.5'
...
...
persistence-realm/src/main/java/chat/rocket/persistence/realm/Migration.java
View file @
9daf0a31
...
...
@@ -6,6 +6,7 @@ import chat.rocket.persistence.realm.models.ddp.RealmRole;
import
chat.rocket.persistence.realm.models.ddp.RealmRoomRole
;
import
chat.rocket.persistence.realm.models.ddp.RealmSpotlightRoom
;
import
chat.rocket.persistence.realm.models.ddp.RealmSpotlightUser
;
import
chat.rocket.persistence.realm.models.ddp.RealmUser
;
import
io.realm.DynamicRealm
;
import
io.realm.FieldAttribute
;
import
io.realm.RealmMigration
;
...
...
@@ -67,6 +68,13 @@ public class Migration implements RealmMigration {
if
(
oldVersion
==
4
)
{
RealmObjectSchema
messageSchema
=
schema
.
get
(
"RealmMessage"
);
messageSchema
.
addField
(
RealmMessage
.
EDITED_AT
,
long
.
class
);
oldVersion
++;
}
if
(
oldVersion
==
5
)
{
RealmObjectSchema
userSchema
=
schema
.
get
(
"RealmUser"
);
userSchema
.
addField
(
RealmUser
.
NAME
,
String
.
class
);
}
}
...
...
persistence-realm/src/main/java/chat/rocket/persistence/realm/RealmStore.java
View file @
9daf0a31
...
...
@@ -15,9 +15,7 @@ public class RealmStore {
.
name
(
name
+
".realm"
)
.
modules
(
new
RocketChatLibraryModule
())
.
migration
(
new
Migration
())
.
schemaVersion
(
5
)
// Just in case
.
deleteRealmIfMigrationNeeded
()
.
schemaVersion
(
6
)
.
build
();
}
...
...
@@ -49,7 +47,7 @@ public class RealmStore {
sStore
.
put
(
name
,
new
RealmConfiguration
.
Builder
()
.
name
(
name
+
".realm"
)
.
modules
(
new
RocketChatServerModule
())
.
deleteRealmIfMigrationNeeded
().
build
());
.
build
());
}
return
new
RealmHelper
(
sStore
.
get
(
name
));
}
...
...
persistence-realm/src/main/java/chat/rocket/persistence/realm/repositories/RealmMessageRepository.java
View file @
9daf0a31
...
...
@@ -41,8 +41,7 @@ public class RealmMessageRepository extends RealmRepository implements MessageRe
return
Flowable
.
empty
();
}
return
pair
.
first
.
where
(
RealmMessage
.
class
)
return
pair
.
first
.
where
(
RealmMessage
.
class
)
.
equalTo
(
RealmMessage
.
ID
,
messageId
)
.
findAll
()
.<
RealmResults
<
RealmMessage
>>
asFlowable
();
...
...
persistence-realm/src/main/java/chat/rocket/persistence/realm/repositories/RealmSessionRepository.java
View file @
9daf0a31
...
...
@@ -7,6 +7,7 @@ import com.hadisatrio.optional.Optional;
import
chat.rocket.core.models.Session
;
import
chat.rocket.core.repositories.SessionRepository
;
import
chat.rocket.persistence.realm.RealmHelper
;
import
chat.rocket.persistence.realm.RealmStore
;
import
chat.rocket.persistence.realm.models.internal.RealmSession
;
import
io.reactivex.Flowable
;
...
...
@@ -44,7 +45,7 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
if
(
realmSessions
.
size
()
==
0
)
{
return
Optional
.
absent
();
}
return
Optional
.
of
(
realmSessions
.
get
(
0
).
asSession
());
return
Optional
.
of
(
realmSessions
.
get
(
0
).
asSession
());
}));
}
...
...
@@ -73,14 +74,9 @@ public class RealmSessionRepository extends RealmRepository implements SessionRe
realmSession
.
setTokenVerified
(
session
.
isTokenVerified
());
realmSession
.
setError
(
session
.
getError
());
realm
.
beginTransaction
();
return
realm
.
copyToRealmOrUpdate
(
realmSession
)
.
asFlowable
()
return
RealmHelper
.
copyToRealmOrUpdate
(
realm
,
realmSession
)
.
filter
(
it
->
it
!=
null
&&
it
.
isLoaded
()
&&
it
.
isValid
())
.
firstElement
()
.
doOnSuccess
(
it
->
realm
.
commitTransaction
())
.
doOnError
(
throwable
->
realm
.
cancelTransaction
())
.
doOnEvent
((
realmObject
,
throwable
)
->
close
(
realm
,
looper
))
.
toSingle
()
.
map
(
realmObject
->
true
);
...
...
persistence-realm/src/main/java/chat/rocket/persistence/realm/repositories/RealmSpotlightRepository.kt
View file @
9daf0a31
...
...
@@ -24,8 +24,8 @@ class RealmSpotlightRepository(private val hostname: String) : RealmRepository()
}
return
@using
pair
.
first
.
where
(
RealmSpotlight
::
class
.
java
)
.
findAllSorted
(
Columns
.
TYPE
,
Sort
.
DESCENDING
)
.
asFlowable
()
.
findAllSorted
(
Columns
.
TYPE
,
Sort
.
DESCENDING
)
.
asFlowable
()
})
{
pair
->
close
(
pair
.
first
,
pair
.
second
)
}
.
unsubscribeOn
(
AndroidSchedulers
.
from
(
Looper
.
myLooper
()
!!
))
.
filter
{
realmSpotlightResults
->
realmSpotlightResults
.
isLoaded
&&
realmSpotlightResults
.
isValid
}
...
...
rocket-chat-core/build.gradle
View file @
9daf0a31
...
...
@@ -6,7 +6,7 @@ dependencies {
compile
extraDependencies
.
rxJava
compile
extraDependencies
.
optional
compile
"org.jetbrains.kotlin:kotlin-stdlib-jre7:$rootProject.ext.kotlinVersion"
compile
'com.google.code.findbugs:jsr305:3.0.
1
'
compile
'com.google.code.findbugs:jsr305:3.0.
2
'
compileOnly
'com.google.auto.value:auto-value:1.3'
kapt
'com.google.auto.value:auto-value:1.3'
kapt
'com.gabrielittner.auto.value:auto-value-with:1.0.0'
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment