Commit 7f01c5ed authored by Tiago Cunha's avatar Tiago Cunha

Using a backpressure enable observable

parent abcedb84
...@@ -7,9 +7,10 @@ import okhttp3.Request; ...@@ -7,9 +7,10 @@ import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
import okhttp3.WebSocket; import okhttp3.WebSocket;
import okhttp3.WebSocketListener; import okhttp3.WebSocketListener;
import rx.Emitter;
import rx.Observable; import rx.Observable;
import rx.Subscriber;
import rx.exceptions.OnErrorNotImplementedException; import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.observables.ConnectableObservable; import rx.observables.ConnectableObservable;
public class RxWebSocket { public class RxWebSocket {
...@@ -23,38 +24,41 @@ public class RxWebSocket { ...@@ -23,38 +24,41 @@ public class RxWebSocket {
public ConnectableObservable<RxWebSocketCallback.Base> connect(String url) { public ConnectableObservable<RxWebSocketCallback.Base> connect(String url) {
final Request request = new Request.Builder().url(url).build(); final Request request = new Request.Builder().url(url).build();
return Observable.create(new Observable.OnSubscribe<RxWebSocketCallback.Base>() { return Observable.fromEmitter(
@Override new Action1<Emitter<RxWebSocketCallback.Base>>() {
public void call(Subscriber<? super RxWebSocketCallback.Base> subscriber) {
httpClient.newWebSocket(request, new WebSocketListener() {
@Override @Override
public void onOpen(WebSocket webSocket, Response response) { public void call(Emitter<RxWebSocketCallback.Base> emitter) {
RxWebSocket.this.webSocket = webSocket; httpClient.newWebSocket(request, new WebSocketListener() {
subscriber.onNext(new RxWebSocketCallback.Open(RxWebSocket.this.webSocket, response)); @Override
} public void onOpen(WebSocket webSocket, Response response) {
RxWebSocket.this.webSocket = webSocket;
emitter.onNext(new RxWebSocketCallback.Open(RxWebSocket.this.webSocket, response));
}
@Override @Override
public void onFailure(WebSocket webSocket, Throwable err, Response response) { public void onFailure(WebSocket webSocket, Throwable err, Response response) {
try { try {
subscriber.onError(new RxWebSocketCallback.Failure(webSocket, err, response)); emitter.onError(new RxWebSocketCallback.Failure(webSocket, err, response));
} catch (OnErrorNotImplementedException ex) { } catch (OnErrorNotImplementedException ex) {
RCLog.w(ex, "OnErrorNotImplementedException ignored"); RCLog.w(ex, "OnErrorNotImplementedException ignored");
} }
} }
@Override @Override
public void onMessage(WebSocket webSocket, String text) { public void onMessage(WebSocket webSocket, String text) {
subscriber.onNext(new RxWebSocketCallback.Message(webSocket, text)); emitter.onNext(new RxWebSocketCallback.Message(webSocket, text));
} }
@Override @Override
public void onClosed(WebSocket webSocket, int code, String reason) { public void onClosed(WebSocket webSocket, int code, String reason) {
subscriber.onNext(new RxWebSocketCallback.Close(webSocket, code, reason)); emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason));
subscriber.onCompleted(); emitter.onCompleted();
}
});
} }
}); },
} Emitter.BackpressureMode.BUFFER
}).publish(); ).publish();
} }
public boolean sendText(String message) throws IOException { public boolean sendText(String message) throws IOException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment