package chat.rocket.android_ddp.rx;

import java.io.IOException;
import chat.rocket.android.log.RCLog;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.ws.WebSocket;
import okhttp3.ws.WebSocketCall;
import okhttp3.ws.WebSocketListener;
import okio.Buffer;
import rx.Observable;
import rx.Subscriber;
import rx.exceptions.OnErrorNotImplementedException;
import rx.observables.ConnectableObservable;

/**
 * Wraps an OkHttp web socket and exposes its listener callbacks as an Rx stream of
 * {@link RxWebSocketCallback.Base} events.
 *
 * <p>The listener callbacks are registered through the asynchronous
 * {@code WebSocketCall.enqueue}, so the connection state held here is written by the
 * listener and may be read by a different thread through {@link #isConnected()},
 * {@link #sendText(String)} and {@link #close(int, String)}. The mutable fields are
 * therefore {@code volatile}.
 */
public class RxWebSocket {
  private final OkHttpClient httpClient;
  /** Socket handed over by {@code onOpen}; {@code null} until the first successful open. */
  private volatile WebSocket webSocket;
  private volatile boolean isConnected;

  /**
   * Creates a wrapper that is initially not connected.
   *
   * @param client HTTP client used to create the web socket call
   */
  public RxWebSocket(OkHttpClient client) {
    httpClient = client;
    isConnected = false;
  }

  /**
   * Builds an observable that opens a web socket to {@code url} and emits one event per
   * listener callback: {@code Open}, {@code Message}, {@code Pong} and {@code Close} through
   * {@code onNext}, and {@code Failure} through {@code onError}. The stream completes right
   * after the {@code Close} event.
   *
   * <p>Nothing is sent until the returned observable is connected.
   *
   * @param url address of the web socket endpoint
   * @return a published (connectable) observable of web socket events
   */
  public ConnectableObservable<RxWebSocketCallback.Base> connect(String url) {
    final Request request = new Request.Builder().url(url).build();

    return Observable.create(new Observable.OnSubscribe<RxWebSocketCallback.Base>() {
      @Override
      public void call(final Subscriber<? super RxWebSocketCallback.Base> subscriber) {
        // A WebSocketCall can be enqueued only once, so a fresh call is created for every
        // subscription; otherwise connecting the observable a second time would fail.
        final WebSocketCall call = WebSocketCall.create(httpClient, request);

        call.enqueue(new WebSocketListener() {
          @Override
          public void onOpen(WebSocket openedSocket, Response response) {
            isConnected = true;
            RxWebSocket.this.webSocket = openedSocket;
            subscriber.onNext(new RxWebSocketCallback.Open(openedSocket, response));
          }

          @Override
          public void onFailure(IOException e, Response response) {
            try {
              isConnected = false;
              // The socket may still be null here if the failure happened before onOpen.
              subscriber.onError(
                  new RxWebSocketCallback.Failure(RxWebSocket.this.webSocket, e, response));
            } catch (OnErrorNotImplementedException ex) {
              // A subscriber without an error handler must not break the listener.
              RCLog.w(ex, "OnErrorNotImplementedException ignored");
            }
          }

          @Override
          public void onMessage(ResponseBody responseBody) throws IOException {
            // Any inbound traffic is treated as proof that the connection is alive.
            isConnected = true;
            subscriber.onNext(
                new RxWebSocketCallback.Message(RxWebSocket.this.webSocket, responseBody));
          }

          @Override
          public void onPong(Buffer payload) {
            isConnected = true;
            subscriber.onNext(new RxWebSocketCallback.Pong(RxWebSocket.this.webSocket, payload));
          }

          @Override
          public void onClose(int code, String reason) {
            isConnected = false;
            subscriber.onNext(
                new RxWebSocketCallback.Close(RxWebSocket.this.webSocket, code, reason));
            subscriber.onCompleted();
          }
        });
      }
    }).publish();
  }

  /**
   * Sends {@code message} as a text frame over the current socket.
   *
   * @param message text payload to send
   * @throws IOException if no socket has been opened yet, or if sending fails
   */
  public void sendText(String message) throws IOException {
    requireSocket().sendMessage(RequestBody.create(WebSocket.TEXT, message));
  }

  /**
   * Reports the last known connection state: {@code true} after an open, message or pong
   * callback, {@code false} initially and after a failure or close callback.
   *
   * @return whether the socket is currently considered connected
   */
  public boolean isConnected() {
    return isConnected;
  }

  /**
   * Closes the current socket with the given close code and reason.
   *
   * @param code close status code passed to the socket
   * @param reason close reason passed to the socket
   * @throws IOException if no socket has been opened yet, or if closing fails
   */
  public void close(int code, String reason) throws IOException {
    requireSocket().close(code, reason);
  }

  /** Returns the opened socket, or fails with an IOException when none has been opened. */
  private WebSocket requireSocket() throws IOException {
    // Read the volatile field once so the null check and the use see the same value.
    final WebSocket socket = webSocket;
    if (socket == null) {
      throw new IOException("WebSocket is not open");
    }
    return socket;
  }
}