package chat.rocket.android_ddp.rx; import java.io.IOException; import chat.rocket.android.log.RCLog; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.WebSocket; import okhttp3.WebSocketListener; import rx.Emitter; import rx.Observable; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; import rx.observables.ConnectableObservable; public class RxWebSocket { private OkHttpClient httpClient; private WebSocket webSocket; public RxWebSocket(OkHttpClient client) { httpClient = client; } public ConnectableObservable<RxWebSocketCallback.Base> connect(String url) { final Request request = new Request.Builder().url(url).build(); return Observable.fromEmitter( new Action1<Emitter<RxWebSocketCallback.Base>>() { @Override public void call(Emitter<RxWebSocketCallback.Base> emitter) { httpClient.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, Response response) { RxWebSocket.this.webSocket = webSocket; emitter.onNext(new RxWebSocketCallback.Open(RxWebSocket.this.webSocket, response)); } @Override public void onFailure(WebSocket webSocket, Throwable err, Response response) { try { emitter.onError(new RxWebSocketCallback.Failure(webSocket, err, response)); } catch (OnErrorNotImplementedException ex) { RCLog.w(ex, "OnErrorNotImplementedException ignored"); } } @Override public void onMessage(WebSocket webSocket, String text) { emitter.onNext(new RxWebSocketCallback.Message(webSocket, text)); } @Override public void onClosed(WebSocket webSocket, int code, String reason) { emitter.onNext(new RxWebSocketCallback.Close(webSocket, code, reason)); emitter.onCompleted(); } }); } }, Emitter.BackpressureMode.BUFFER ).publish(); } public boolean sendText(String message) throws IOException { return webSocket.send(message); } public boolean close(int code, String reason) throws IOException { return webSocket.close(code, reason); } }