package chat.rocket.android_ddp;

import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.text.TextUtils;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import bolts.Task;
import bolts.TaskCompletionSource;
import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.rx.RxWebSocket;
import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.disposables.CompositeDisposable;
import okhttp3.OkHttpClient;

public class DDPClientImpl {
  private final DDPClient client;
  private RxWebSocket websocket;
  private Flowable<RxWebSocketCallback.Base> flowable;
  private CompositeDisposable disposables;
  private String currentSession;

  public DDPClientImpl(DDPClient self, OkHttpClient client) {
    websocket = new RxWebSocket(client);
    this.client = self;
  }

  private static JSONObject toJson(String s) {
    if (TextUtils.isEmpty(s)) {
      return null;
    }
    try {
      return new JSONObject(s);
    } catch (JSONException e) {
      return null;
    }
  }

  private static String extractMsg(JSONObject response) {
    if (response == null || response.isNull("msg")) {
      return null;
    } else {
      return response.optString("msg");
    }
  }

  public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
                      String session) {
    try {
      flowable = websocket.connect(url).autoConnect(2);
      CompositeDisposable disposables = new CompositeDisposable();

      disposables.add(
          flowable.retry().filter(callback -> callback instanceof RxWebSocketCallback.Open)
              .subscribe(
                  callback ->
                    sendMessage("connect",
                        json -> (TextUtils.isEmpty(session) ? json : json.put("session", DDPClientImpl.this.currentSession))
                            .put(
                                "version", "pre2")
                            .put("support", new JSONArray().put("pre2").put("pre1")),
                        task),
                  RCLog::e
              )
      );

      disposables.add(
          flowable.filter(
              callback -> callback instanceof RxWebSocketCallback.Message)
              .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
              .map(DDPClientImpl::toJson)
              .timeout(7, TimeUnit.SECONDS)
              .subscribe(response -> {
                    String msg = extractMsg(response);
                    if ("connected".equals(msg) && !response.isNull("session")) {
                      currentSession = response.optString("session");
                      task.trySetResult(
                          new DDPClientCallback.Connect(client, response.optString("session")));
                      disposables.clear();
                    } else if ("error".equals(msg) && "Already connected".equals(
                        response.optString("reason"))) {
                      task.trySetResult(new DDPClientCallback.Connect(client, null));
                      disposables.clear();
                    } else if ("failed".equals(msg)) {
                      task.trySetError(
                          new DDPClientCallback.Connect.Failed(client, response.optString("version")));
                      disposables.clear();
                    }
                  },
                  err -> task.trySetError(new DDPClientCallback.Connect.Timeout(client))
              )
      );

      addErrorCallback(disposables, task);

      subscribeBaseListeners();
    } catch (Exception e) {
      RCLog.e(e);
    }
  }

  public Maybe<DDPClientCallback.Base> ping(@Nullable final String id) {

    final boolean requested = (TextUtils.isEmpty(id)) ?
        sendMessage("ping", null) :
        sendMessage("ping", json -> json.put("id", id));

    if (requested) {
      return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
              .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
              .map(DDPClientImpl::toJson)
              .filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
              .doOnError(error -> {
                RCLog.e(error, "Heartbeat ping[%s] xxx failed xxx", id);
              })
              .map(response -> {
                String msg = extractMsg(response);
                if ("pong".equals(msg)) {
                  RCLog.d("pong[%s] <", id);
                  if (response.isNull("id")) {
                    return new DDPClientCallback.Ping(client, null);
                  } else {
                    String _id = response.optString("id");
                    if (id.equals(_id)) {
                      return new DDPClientCallback.Ping(client, _id);
                    } else {
                      return new DDPClientCallback.Ping.UnMatched(client, _id);
                    }
                  }
                }
                // if we receive anything other than a pong throw an exception
                throw new DDPClientCallback.RPC.Error(client, id, response);
              }).firstElement();
    } else {
      return Maybe.error(new DDPClientCallback.Closed(client));
    }
  }

  public void ping(final TaskCompletionSource<DDPClientCallback.Ping> task,
                   @Nullable final String id) {

    final boolean requested = (TextUtils.isEmpty(id)) ?
            sendMessage("ping", null) :
            sendMessage("ping", json -> json.put("id", id));

    if (requested) {
      CompositeDisposable disposables = new CompositeDisposable();

      disposables.add(
              flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
                      .timeout(8, TimeUnit.SECONDS)
                      .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
                      .map(DDPClientImpl::toJson)
                      .subscribe(
                              response -> {
                                String msg = extractMsg(response);
                                if ("pong".equals(msg)) {
                                  if (response.isNull("id")) {
                                    task.setResult(new DDPClientCallback.Ping(client, null));
                                  } else {
                                    String _id = response.optString("id");
                                    if (id.equals(_id)) {
                                      task.setResult(new DDPClientCallback.Ping(client, id));
                                    }
                                  }
                                  disposables.clear();
                                }
                              },
                              err -> task.setError(new DDPClientCallback.Ping.Timeout(client))
                      )
      );

      addErrorCallback(disposables, task);
    } else {
      task.trySetError(new DDPClientCallback.Closed(client));
    }
  }

  public void sub(final TaskCompletionSource<DDPSubscription.Ready> task, String name,
                  JSONArray params, String id) {
    final boolean requested =
        sendMessage("sub", json -> json.put("id", id).put("name", name).put("params", params));

    if (requested) {
      CompositeDisposable disposables = new CompositeDisposable();

      disposables.add(
          flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
              .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
              .map(DDPClientImpl::toJson)
              .subscribe(
                  response -> {
                    String msg = extractMsg(response);
                    if ("ready".equals(msg) && !response.isNull("subs")) {
                      JSONArray ids = response.optJSONArray("subs");
                      for (int i = 0; i < ids.length(); i++) {
                        String _id = ids.optString(i);
                        if (id.equals(_id)) {
                          task.setResult(new DDPSubscription.Ready(client, id));
                          disposables.clear();
                          break;
                        }
                      }
                    } else if ("nosub".equals(msg) && !response.isNull("id") && !response.isNull(
                        "error")) {
                      String _id = response.optString("id");
                      if (id.equals(_id)) {
                        task.setError(new DDPSubscription.NoSub.Error(client, id,
                            response.optJSONObject("error")));
                        disposables.clear();
                      }
                    }
                  },
                  RCLog::e
              )
      );

      addErrorCallback(disposables, task);
    } else {
      task.trySetError(new DDPClientCallback.Closed(client));
    }
  }

  public void unsub(final TaskCompletionSource<DDPSubscription.NoSub> task,
                    @Nullable final String id) {

    final boolean requested = sendMessage("unsub", json -> json.put("id", id));

    if (requested) {
      CompositeDisposable disposables = new CompositeDisposable();

      disposables.add(
          flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
              .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
              .map(DDPClientImpl::toJson)
              .subscribe(
                  response -> {
                    String msg = extractMsg(response);
                    if ("nosub".equals(msg) && response.isNull("error") && !response.isNull("id")) {
                      String _id = response.optString("id");
                      if (id.equals(_id)) {
                        task.setResult(new DDPSubscription.NoSub(client, id));
                        disposables.clear();
                      }
                    }
                  },
                  err -> {
                  }
              )
      );

      addErrorCallback(disposables, task);
    } else {
      task.trySetError(new DDPClientCallback.Closed(client));
    }
  }

  public void rpc(final TaskCompletionSource<DDPClientCallback.RPC> task, String method,
                  JSONArray params, String id, long timeoutMs) {
    final boolean requested =
        sendMessage("method",
            json -> json.put("method", method).put("params", params).put("id", id));

    if (requested) {
      CompositeDisposable disposables = new CompositeDisposable();

      disposables.add(
          flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
              .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
              .map(DDPClientImpl::toJson)
              .timeout(timeoutMs, TimeUnit.MILLISECONDS)
              .subscribe(
                  response -> {
                    String msg = extractMsg(response);
                    if ("result".equals(msg)) {
                      String _id = response.optString("id");
                      if (id.equals(_id)) {
                        if (!response.isNull("error")) {
                          task.setError(new DDPClientCallback.RPC.Error(client, id,
                              response.optJSONObject("error")));
                        } else {
                          String result = response.optString("result");
                          task.setResult(new DDPClientCallback.RPC(client, id, result));
                        }
                        disposables.clear();
                      }
                    }
                  },
                  err -> {
                    if (err instanceof TimeoutException) {
                      task.setError(new DDPClientCallback.RPC.Timeout(client));
                    }
                  }
              )
      );

      addErrorCallback(disposables, task);
    } else {
      task.trySetError(new DDPClientCallback.Closed(client));
    }
  }

  private void subscribeBaseListeners() {
    if (disposables != null &&
        disposables.size() > 0 && !disposables.isDisposed()) {
      return;
    }

    disposables = new CompositeDisposable();
    disposables.add(
        flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
            .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
            .map(DDPClientImpl::toJson)
            .subscribe(
                response -> {
                  String msg = extractMsg(response);
                  if ("ping".equals(msg)) {
                    if (response.isNull("id")) {
                      sendMessage("pong", null);
                    } else {
                      sendMessage("pong", json -> json.put("id", response.getString("id")));
                    }
                  }
                },
                RCLog::e
            )
    );
  }

  public Flowable<DDPSubscription.Event> getDDPSubscription() {
    String[] targetMsgs = {"added", "changed", "removed", "addedBefore", "movedBefore"};
    return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
        .map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
        .map(DDPClientImpl::toJson)
        .filter(response -> {
          String msg = extractMsg(response);
          for (String m : targetMsgs) {
            if (m.equals(msg)) {
              return true;
            }
          }
          return false;
        })
        .map(response -> {
          String msg = extractMsg(response);
          if ("added".equals(msg)) {
            return new DDPSubscription.Added(client, response.optString("collection"),
                response.optString("id"),
                response.isNull("fields") ? null : response.optJSONObject("fields"));
          } else if ("addedBefore".equals(msg)) {
            return new DDPSubscription.Added.Before(client, response.optString("collection"),
                response.optString("id"),
                response.isNull("fields") ? null : response.optJSONObject("fields"),
                response.isNull("before") ? null : response.optString("before"));
          } else if ("changed".equals(msg)) {
            return new DDPSubscription.Changed(client, response.optString("collection"),
                response.optString("id"),
                response.isNull("fields") ? null : response.optJSONObject("fields"),
                response.isNull("cleared") ? new JSONArray() : response.optJSONArray("before"));
          } else if ("removed".equals(msg)) {
            return new DDPSubscription.Removed(client, response.optString("collection"),
                response.optString("id"));
          } else if ("movedBefore".equals(msg)) {
            return new DDPSubscription.MovedBefore(client, response.optString("collection"),
                response.optString("id"),
                response.isNull("before") ? null : response.optString("before"));
          }

          return null;
        });
  }

  public void unsubscribeBaseListeners() {
    if (disposables.size() > 0 || !disposables.isDisposed()) {
      disposables.clear();
    }
  }

  public Task<RxWebSocketCallback.Close> getOnCloseCallback() {
    TaskCompletionSource<RxWebSocketCallback.Close> task = new TaskCompletionSource<>();

    flowable.filter(callback -> callback instanceof RxWebSocketCallback.Close)
        .cast(RxWebSocketCallback.Close.class)
        .subscribe(
            task::setResult,
            err -> setTaskError(task, err)
        );

    return task.getTask().onSuccessTask(_task -> {
      unsubscribeBaseListeners();
      return _task;
    });
  }

  private boolean sendMessage(String msg, @Nullable JSONBuilder json) {
    try {
      JSONObject origJson = new JSONObject().put("msg", msg);
      String msg2 = (json == null ? origJson : json.create(origJson)).toString();
      return websocket.sendText(msg2);
    } catch (Exception e) {
      RCLog.e(e);
      return false;
    }
  }

  private void sendMessage(String msg, @Nullable JSONBuilder json,
                           TaskCompletionSource<?> taskForSetError) {
    if (!sendMessage(msg, json) && taskForSetError != null) {
      taskForSetError.trySetError(new DDPClientCallback.Closed(client));
    }
  }

  private void addErrorCallback(CompositeDisposable disposables, TaskCompletionSource<?> task) {
    disposables.add(
        flowable.subscribe(
            base -> {
              if (base instanceof RxWebSocketCallback.Close) {
                task.trySetError(new Exception(((RxWebSocketCallback.Close) base).reason));
              }
            },
            err -> {
              task.trySetError(new Exception(err));
              disposables.clear();
            }
        )
    );
  }

  public void close(int code, String reason) {
    try {
      websocket.close(code, reason);
    } catch (Exception e) {
      RCLog.e(e);
    }
  }

  private void setTaskError(TaskCompletionSource<? extends RxWebSocketCallback.Base> task, Throwable throwable) {
    if (throwable instanceof Exception) {
      task.setError((Exception) throwable);
    } else {
      task.setError(new Exception(throwable));
    }
  }

  private interface JSONBuilder {
    @NonNull
    JSONObject create(JSONObject root) throws JSONException;
  }
}