Commit d13702d9 authored by Yusuke Iwaki's avatar Yusuke Iwaki

Merge branch 'origin/okhttp3.5.0' into develop

parents aa7eb67a 8e235f66
...@@ -37,7 +37,7 @@ android { ...@@ -37,7 +37,7 @@ android {
dependencies { dependencies {
compile project(':log-wrapper') compile project(':log-wrapper')
compile rootProject.ext.supportAnnotations compile rootProject.ext.supportAnnotations
compile 'com.squareup.okhttp3:okhttp-ws:3.4.1' compile rootProject.ext.okhttp3
compile rootProject.ext.rxJava compile rootProject.ext.rxJava
compile rootProject.ext.boltsTask compile rootProject.ext.boltsTask
} }
...@@ -15,7 +15,8 @@ public class DDPClientCallback { ...@@ -15,7 +15,8 @@ public class DDPClientCallback {
public static abstract class BaseException extends Exception { public static abstract class BaseException extends Exception {
public DDPClient client; public DDPClient client;
public BaseException(DDPClient client) { public BaseException(Class<? extends BaseException> clazz, DDPClient client) {
super(clazz.getName());
this.client = client; this.client = client;
} }
} }
...@@ -32,14 +33,14 @@ public class DDPClientCallback { ...@@ -32,14 +33,14 @@ public class DDPClientCallback {
public String version; public String version;
public Failed(DDPClient client, String version) { public Failed(DDPClient client, String version) {
super(client); super(Failed.class, client);
this.version = version; this.version = version;
} }
} }
public static class Timeout extends BaseException { public static class Timeout extends BaseException {
public Timeout(DDPClient client) { public Timeout(DDPClient client) {
super(client); super(Timeout.class, client);
} }
} }
} }
...@@ -54,7 +55,7 @@ public class DDPClientCallback { ...@@ -54,7 +55,7 @@ public class DDPClientCallback {
public static class Timeout extends BaseException { public static class Timeout extends BaseException {
public Timeout(DDPClient client) { public Timeout(DDPClient client) {
super(client); super(Timeout.class, client);
} }
} }
} }
...@@ -74,7 +75,7 @@ public class DDPClientCallback { ...@@ -74,7 +75,7 @@ public class DDPClientCallback {
public JSONObject error; public JSONObject error;
public Error(DDPClient client, String id, JSONObject error) { public Error(DDPClient client, String id, JSONObject error) {
super(client); super(Error.class, client);
this.id = id; this.id = id;
this.error = error; this.error = error;
} }
...@@ -82,8 +83,14 @@ public class DDPClientCallback { ...@@ -82,8 +83,14 @@ public class DDPClientCallback {
public static class Timeout extends BaseException { public static class Timeout extends BaseException {
public Timeout(DDPClient client) { public Timeout(DDPClient client) {
super(client); super(Timeout.class, client);
} }
} }
} }
public static class Closed extends BaseException {
public Closed(DDPClient client) {
super(Closed.class, client);
}
}
} }
...@@ -59,7 +59,8 @@ public class DDPClientImpl { ...@@ -59,7 +59,8 @@ public class DDPClientImpl {
.subscribe(callback -> { .subscribe(callback -> {
sendMessage("connect", sendMessage("connect",
json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)).put( json -> (TextUtils.isEmpty(session) ? json : json.put("session", session)).put(
"version", "pre2").put("support", new JSONArray().put("pre2").put("pre1"))); "version", "pre2").put("support", new JSONArray().put("pre2").put("pre1")),
task);
}, err -> { }, err -> {
})); }));
...@@ -71,20 +72,20 @@ public class DDPClientImpl { ...@@ -71,20 +72,20 @@ public class DDPClientImpl {
.subscribe(response -> { .subscribe(response -> {
String msg = extractMsg(response); String msg = extractMsg(response);
if ("connected".equals(msg) && !response.isNull("session")) { if ("connected".equals(msg) && !response.isNull("session")) {
task.setResult( task.trySetResult(
new DDPClientCallback.Connect(client, response.optString("session"))); new DDPClientCallback.Connect(client, response.optString("session")));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
} else if ("error".equals(msg) && "Already connected".equals( } else if ("error".equals(msg) && "Already connected".equals(
response.optString("reason"))) { response.optString("reason"))) {
task.setResult(new DDPClientCallback.Connect(client, null)); task.trySetResult(new DDPClientCallback.Connect(client, null));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
} else if ("failed".equals(msg)) { } else if ("failed".equals(msg)) {
task.setError( task.trySetError(
new DDPClientCallback.Connect.Failed(client, response.optString("version"))); new DDPClientCallback.Connect.Failed(client, response.optString("version")));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
} }
}, err -> { }, err -> {
task.setError(new DDPClientCallback.Connect.Timeout(client)); task.trySetError(new DDPClientCallback.Connect.Timeout(client));
})); }));
addErrorCallback(subscriptions, task); addErrorCallback(subscriptions, task);
...@@ -101,135 +102,146 @@ public class DDPClientImpl { ...@@ -101,135 +102,146 @@ public class DDPClientImpl {
public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task, public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
@Nullable final String id) { @Nullable final String id) {
CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add( final boolean requested = (TextUtils.isEmpty(id)) ?
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) sendMessage("ping", null) :
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString) sendMessage("ping", json -> json.put("id", id));
.map(DDPClientImpl::toJson)
.timeout(4, TimeUnit.SECONDS) if (requested) {
.subscribe(response -> { CompositeSubscription subscriptions = new CompositeSubscription();
String msg = extractMsg(response);
if ("pong".equals(msg)) { subscriptions.add(
if (response.isNull("id")) { observable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
task.setResult(new DDPClientCallback.Ping(client, null)); .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
subscriptions.unsubscribe(); .map(DDPClientImpl::toJson)
} else { .timeout(4, TimeUnit.SECONDS)
String _id = response.optString("id"); .subscribe(response -> {
if (id.equals(_id)) { String msg = extractMsg(response);
task.setResult(new DDPClientCallback.Ping(client, id)); if ("pong".equals(msg)) {
if (response.isNull("id")) {
task.setResult(new DDPClientCallback.Ping(client, null));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
} else {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPClientCallback.Ping(client, id));
subscriptions.unsubscribe();
}
} }
} }
} }, err -> {
}, err -> { task.setError(new DDPClientCallback.Ping.Timeout(client));
task.setError(new DDPClientCallback.Ping.Timeout(client)); }));
}));
addErrorCallback(subscriptions, task);
if (TextUtils.isEmpty(id)) { addErrorCallback(subscriptions, task);
sendMessage("ping", null);
} else {
sendMessage("ping", json -> json.put("id", id));
} }
} }
public void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name, public void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name,
JSONArray params, String id) { JSONArray params, String id) {
CompositeSubscription subscriptions = new CompositeSubscription(); final boolean requested =
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));
subscriptions.add( if (requested) {
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) CompositeSubscription subscriptions = new CompositeSubscription();
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) subscriptions.add(
.subscribe(response -> { observable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
String msg = extractMsg(response); .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
if ("ready".equals(msg) && !response.isNull("subs")) { .map(DDPClientImpl::toJson)
JSONArray ids = response.optJSONArray("subs"); .subscribe(response -> {
for (int i = 0; i < ids.length(); i++) { String msg = extractMsg(response);
String _id = ids.optString(i); if ("ready".equals(msg) && !response.isNull("subs")) {
JSONArray ids = response.optJSONArray("subs");
for (int i = 0; i < ids.length(); i++) {
String _id = ids.optString(i);
if (id.equals(_id)) {
task.setResult(new DDPSubscription.Ready(client, id));
subscriptions.unsubscribe();
break;
}
}
} else if ("nosub".equals(msg) && !response.isNull("id") && !response.isNull(
"error")) {
String _id = response.optString("id");
if (id.equals(_id)) { if (id.equals(_id)) {
task.setResult(new DDPSubscription.Ready(client, id)); task.setError(new DDPSubscription.NoSub.Error(client, id,
response.optJSONObject("error")));
subscriptions.unsubscribe(); subscriptions.unsubscribe();
break;
} }
} }
} else if ("nosub".equals(msg) && !response.isNull("id") && !response.isNull( }, err -> {
"error")) { }));
String _id = response.optString("id");
if (id.equals(_id)) {
task.setError(new DDPSubscription.NoSub.Error(client, id,
response.optJSONObject("error")));
subscriptions.unsubscribe();
}
}
}, err -> {
}));
addErrorCallback(subscriptions, task);
sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params)); addErrorCallback(subscriptions, task);
}
} }
public void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task, public void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task,
@Nullable final String id) { @Nullable final String id) {
CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.add( final boolean requested = sendMessage("unsub", json -> json.put("id", id));
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.subscribe(response -> {
String msg = extractMsg(response);
if ("nosub".equals(msg) && response.isNull("error") && !response.isNull("id")) {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPSubscription.NoSub(client, id));
subscriptions.unsubscribe();
}
}
}, err -> {
}));
addErrorCallback(subscriptions, task); if (requested) {
CompositeSubscription subscriptions = new CompositeSubscription();
sendMessage("unsub", json -> json.put("id", id)); subscriptions.add(
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.subscribe(response -> {
String msg = extractMsg(response);
if ("nosub".equals(msg) && response.isNull("error") && !response.isNull("id")) {
String _id = response.optString("id");
if (id.equals(_id)) {
task.setResult(new DDPSubscription.NoSub(client, id));
subscriptions.unsubscribe();
}
}
}, err -> {
}));
addErrorCallback(subscriptions, task);
}
} }
public void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method, public void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method,
JSONArray params, String id, long timeoutMs) { JSONArray params, String id, long timeoutMs) {
CompositeSubscription subscriptions = new CompositeSubscription(); final boolean requested =
sendMessage("method",
json -> json.put("method", method).put("params", params).put("id", id));
subscriptions.add( if (requested) {
observable.filter(callback -> callback instanceof RxWebSocketCallback.Message) CompositeSubscription subscriptions = new CompositeSubscription();
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson) subscriptions.add(
.timeout(timeoutMs, TimeUnit.MILLISECONDS) observable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.subscribe(response -> { .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
String msg = extractMsg(response); .map(DDPClientImpl::toJson)
if ("result".equals(msg)) { .timeout(timeoutMs, TimeUnit.MILLISECONDS)
String _id = response.optString("id"); .subscribe(response -> {
if (id.equals(_id)) { String msg = extractMsg(response);
if (!response.isNull("error")) { if ("result".equals(msg)) {
task.setError(new DDPClientCallback.RPC.Error(client, id, String _id = response.optString("id");
response.optJSONObject("error"))); if (id.equals(_id)) {
} else { if (!response.isNull("error")) {
String result = response.optString("result"); task.setError(new DDPClientCallback.RPC.Error(client, id,
task.setResult(new DDPClientCallback.RPC(client, id, result)); response.optJSONObject("error")));
} else {
String result = response.optString("result");
task.setResult(new DDPClientCallback.RPC(client, id, result));
}
subscriptions.unsubscribe();
} }
subscriptions.unsubscribe();
} }
} }, err -> {
}, err -> { if (err instanceof TimeoutException) {
if (err instanceof TimeoutException) { task.setError(new DDPClientCallback.RPC.Timeout(client));
task.setError(new DDPClientCallback.RPC.Timeout(client)); }
} }));
}));
addErrorCallback(subscriptions, task);
sendMessage("method", json -> json.put("method", method).put("params", params).put("id", id)); addErrorCallback(subscriptions, task);
}
} }
private void subscribeBaseListeners() { private void subscribeBaseListeners() {
...@@ -325,14 +337,22 @@ public class DDPClientImpl { ...@@ -325,14 +337,22 @@ public class DDPClientImpl {
}); });
} }
private void sendMessage(String msg, @Nullable JSONBuilder json) { private boolean sendMessage(String msg, @Nullable JSONBuilder json) {
try { try {
JSONObject origJson = new JSONObject().put("msg", msg); JSONObject origJson = new JSONObject().put("msg", msg);
String msg2 = (json == null ? origJson : json.create(origJson)).toString(); String msg2 = (json == null ? origJson : json.create(origJson)).toString();
websocket.sendText(msg2); return websocket.sendText(msg2);
} catch (Exception e) { } catch (Exception e) {
RCLog.e(e); RCLog.e(e);
} }
return true; // ignore exception here.
}
private void sendMessage(String msg, @Nullable JSONBuilder json,
TaskCompletionSource<?> taskForSetError) {
if (!sendMessage(msg, json)) {
taskForSetError.trySetError(new DDPClientCallback.Closed(client));
}
} }
private void addErrorCallback(CompositeSubscription subscriptions, TaskCompletionSource<?> task) { private void addErrorCallback(CompositeSubscription subscriptions, TaskCompletionSource<?> task) {
......
...@@ -4,13 +4,9 @@ import java.io.IOException; ...@@ -4,13 +4,9 @@ import java.io.IOException;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response; import okhttp3.Response;
import okhttp3.ResponseBody; import okhttp3.WebSocket;
import okhttp3.ws.WebSocket; import okhttp3.WebSocketListener;
import okhttp3.ws.WebSocketCall;
import okhttp3.ws.WebSocketListener;
import okio.Buffer;
import rx.Observable; import rx.Observable;
import rx.Subscriber; import rx.Subscriber;
import rx.exceptions.OnErrorNotImplementedException; import rx.exceptions.OnErrorNotImplementedException;
...@@ -28,12 +24,11 @@ public class RxWebSocket { ...@@ -28,12 +24,11 @@ public class RxWebSocket {
public ConnectableObservable<RxWebSocketCallback.Base> connect(String url) { public ConnectableObservable<RxWebSocketCallback.Base> connect(String url) {
final Request request = new Request.Builder().url(url).build(); final Request request = new Request.Builder().url(url).build();
WebSocketCall call = WebSocketCall.create(httpClient, request);
return Observable.create(new Observable.OnSubscribe<RxWebSocketCallback.Base>() { return Observable.create(new Observable.OnSubscribe<RxWebSocketCallback.Base>() {
@Override @Override
public void call(Subscriber<? super RxWebSocketCallback.Base> subscriber) { public void call(Subscriber<? super RxWebSocketCallback.Base> subscriber) {
call.enqueue(new WebSocketListener() { httpClient.newWebSocket(request, new WebSocketListener() {
@Override @Override
public void onOpen(WebSocket webSocket, Response response) { public void onOpen(WebSocket webSocket, Response response) {
isConnected = true; isConnected = true;
...@@ -42,29 +37,23 @@ public class RxWebSocket { ...@@ -42,29 +37,23 @@ public class RxWebSocket {
} }
@Override @Override
public void onFailure(IOException e, Response response) { public void onFailure(WebSocket webSocket, Throwable err, Response response) {
try { try {
isConnected = false; isConnected = false;
subscriber.onError(new RxWebSocketCallback.Failure(webSocket, e, response)); subscriber.onError(new RxWebSocketCallback.Failure(webSocket, err, response));
} catch (OnErrorNotImplementedException ex) { } catch (OnErrorNotImplementedException ex) {
RCLog.w(ex, "OnErrorNotImplementedException ignored"); RCLog.w(ex, "OnErrorNotImplementedException ignored");
} }
} }
@Override @Override
public void onMessage(ResponseBody responseBody) throws IOException { public void onMessage(WebSocket webSocket, String text) {
isConnected = true; isConnected = true;
subscriber.onNext(new RxWebSocketCallback.Message(webSocket, responseBody)); subscriber.onNext(new RxWebSocketCallback.Message(webSocket, text));
} }
@Override @Override
public void onPong(Buffer payload) { public void onClosed(WebSocket webSocket, int code, String reason) {
isConnected = true;
subscriber.onNext(new RxWebSocketCallback.Pong(webSocket, payload));
}
@Override
public void onClose(int code, String reason) {
isConnected = false; isConnected = false;
subscriber.onNext(new RxWebSocketCallback.Close(webSocket, code, reason)); subscriber.onNext(new RxWebSocketCallback.Close(webSocket, code, reason));
subscriber.onCompleted(); subscriber.onCompleted();
...@@ -74,15 +63,15 @@ public class RxWebSocket { ...@@ -74,15 +63,15 @@ public class RxWebSocket {
}).publish(); }).publish();
} }
public void sendText(String message) throws IOException { public boolean sendText(String message) throws IOException {
webSocket.sendMessage(RequestBody.create(WebSocket.TEXT, message)); return webSocket.send(message);
} }
public boolean isConnected() { public boolean isConnected() {
return isConnected; return isConnected;
} }
public void close(int code, String reason) throws IOException { public boolean close(int code, String reason) throws IOException {
webSocket.close(code, reason); return webSocket.close(code, reason);
} }
} }
...@@ -2,12 +2,9 @@ package chat.rocket.android_ddp.rx; ...@@ -2,12 +2,9 @@ package chat.rocket.android_ddp.rx;
import static android.R.attr.type; import static android.R.attr.type;
import java.io.IOException;
import chat.rocket.android.log.RCLog; import chat.rocket.android.log.RCLog;
import okhttp3.Response; import okhttp3.Response;
import okhttp3.ResponseBody; import okhttp3.WebSocket;
import okhttp3.ws.WebSocket;
import okio.Buffer;
public class RxWebSocketCallback { public class RxWebSocketCallback {
public static abstract class Base { public static abstract class Base {
...@@ -38,8 +35,8 @@ public class RxWebSocketCallback { ...@@ -38,8 +35,8 @@ public class RxWebSocketCallback {
public WebSocket ws; public WebSocket ws;
public Response response; public Response response;
public Failure(WebSocket websocket, IOException e, Response response) { public Failure(WebSocket websocket, Throwable err, Response response) {
super(e); super(err);
this.ws = websocket; this.ws = websocket;
this.response = response; this.response = response;
} }
...@@ -57,10 +54,10 @@ public class RxWebSocketCallback { ...@@ -57,10 +54,10 @@ public class RxWebSocketCallback {
public static class Message extends Base { public static class Message extends Base {
public String responseBodyString; public String responseBodyString;
public Message(WebSocket websocket, ResponseBody responseBody) { public Message(WebSocket websocket, String responseBody) {
super("Message", websocket); super("Message", websocket);
try { try {
this.responseBodyString = responseBody.string(); this.responseBodyString = responseBody;
} catch (Exception e) { } catch (Exception e) {
RCLog.e(e, "error in reading response(Message)"); RCLog.e(e, "error in reading response(Message)");
} }
...@@ -72,15 +69,6 @@ public class RxWebSocketCallback { ...@@ -72,15 +69,6 @@ public class RxWebSocketCallback {
} }
} }
public static class Pong extends Base {
public Buffer payload;
public Pong(WebSocket websocket, Buffer payload) {
super("Pong", websocket);
this.payload = payload;
}
}
public static class Close extends Base { public static class Close extends Base {
public int code; public int code;
public String reason; public String reason;
......
...@@ -15,7 +15,7 @@ ext { ...@@ -15,7 +15,7 @@ ext {
rxJava = 'io.reactivex:rxjava:1.2.2' rxJava = 'io.reactivex:rxjava:1.2.2'
boltsTask = 'com.parse.bolts:bolts-tasks:1.4.0' boltsTask = 'com.parse.bolts:bolts-tasks:1.4.0'
okhttp3 = 'com.squareup.okhttp3:okhttp:3.4.1' okhttp3 = 'com.squareup.okhttp3:okhttp:3.5.0'
picasso = 'com.squareup.picasso:picasso:2.5.2' picasso = 'com.squareup.picasso:picasso:2.5.2'
picasso2Okhttp3Downloader = 'com.jakewharton.picasso:picasso2-okhttp3-downloader:1.1.0' picasso2Okhttp3Downloader = 'com.jakewharton.picasso:picasso2-okhttp3-downloader:1.1.0'
textDrawable = 'com.amulyakhare:com.amulyakhare.textdrawable:1.0.1' textDrawable = 'com.amulyakhare:com.amulyakhare.textdrawable:1.0.1'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment