// A custom websocket client implementing our own protocol.
// Originally we used the GraphQL protocol and client (Apollo), but several unique requirements made it more of a burden than a benefit.
// After abandoning the GraphQL protocol we customized and used `subscriptions-transport-ws`, which was originally written for the GraphQL transport: https://github.com/apollographql/subscriptions-transport-ws
// We attempted to fix several critical bugs in it, most of which occurred while reconnecting, but that effort was also abandoned.
// To remedy the problem, our implementation keeps a similar API but focuses on being transparent and predictable.

import Backoff from 'backo2';
import { default as EventEmitterType, EventEmitter, ListenerFn } from 'eventemitter3';
import $$observable from 'symbol-observable';

/**
 * Shape of the values delivered to `next` by the observable that `WebSocketClient.request()` returns.
 * It is the payload of a `data` message; when that payload carries `errors`, the client normalizes
 * them into an array before delivery.
 */
export interface ExecutionResult {
  /** Errors reported alongside (or instead of) the data. */
  errors?: ReadonlyArray<Error>;
  /** The result data, if any. */
  data?: { [key: string]: any } | null;
  /** Free-form extra information attached to the result. */
  extensions?: { [key: string]: any };
}

/**
 * Callback set accepted by `Observable.subscribe()`. Every callback is optional; a missing callback
 * means the corresponding notification is dropped.
 */
export interface Observer<T> {
  /** Receives each delivered value. */
  next?: (value: T) => void;
  /** Receives the failure of the operation. */
  error?: (error: Error) => void;
  /** Invoked when the operation has finished and no more values will follow. */
  complete?: () => void;
}

/**
 * Minimal observable contract returned by `WebSocketClient.request()`.
 */
export interface Observable<T> {
  /**
   * Attaches the observer and returns a handle whose `unsubscribe` cancels the subscription.
   */
  subscribe(observer: Observer<T>): {
    unsubscribe: () => void;
  };
}

/**
 * Description of an operation to run on the server. The object is sent as the payload of the
 * `start` message; any additional keys are passed along as they are.
 */
export interface OperationOptions {
  query?: string;
  variables?: Object;
  operationName?: string;
  // arbitrary extra fields are allowed and forwarded with the payload
  [key: string]: any;
}

/**
 * Book-keeping entry of an operation that has been started on the server.
 */
export interface Operation {
  /** The options the operation was started with. */
  options: OperationOptions;
  /**
   * Result callback. The client calls it with `(null, payload)` for data, with `(errors, null)` for
   * failures and with `(null, null)` when the operation completes.
   */
  handler: (error: Error[], result?: any) => void;
}

/**
 * Registry of the active operations, keyed by the operation id used on the wire.
 */
export interface Operations {
  [id: string]: Operation;
}

/**
 * Free-form parameters sent to the server as the payload of the `connection_init` message.
 */
export type ConnectionParams = {
  [paramName: string]: any;
};

/**
 * Error value produced by the client's error normalization. `originalError` keeps the raw value
 * when it could not be recognized as an error (or a list of errors).
 */
export type FormattedError = Error & {
  originalError?: any;
};

/**
 * Internal connection state of `WebSocketClient`. The current value is passed to the listeners of
 * the `state` event on every transition.
 */
export enum WebSocketClientState {
  IDLE, // no socket is alive; may transition to CONNECTING
  CONNECTING, // socket created, waiting for conn-ack while the connect-timeout is running; may transition to BACKOFF or OPEN
  BACKOFF, // waiting out the backoff delay before the next connection attempt; returns to IDLE, from where the next attempt starts
  OPEN, // conn-ack has been received
  CLOSING, // temporary state that is only used while the socket is being shut down
}

/**
 * Self-managing websocket client that speaks a `subscriptions-transport-ws`-like protocol over a
 * binary transport.
 *
 * The client is driven by a small "manager" loop (see `_manage_client`) that is re-run on timers and
 * whenever something interesting happens (socket closed, timeout fired, `start()`/`stop()` called).
 * The loop moves the client through the `WebSocketClientState` states:
 *
 *   IDLE -> CONNECTING -> OPEN
 *              |  ^
 *              v  |  (via IDLE)
 *            BACKOFF
 *
 * Users call `start()` / `stop()` to express what they want, listen to the events (`onOpen`,
 * `onDisconnected`, ...) to follow what happens and use `request()` to run operations while the
 * client is open.
 */
export class WebSocketClient {
  /** The live websocket instance, or `null` when there is none. */
  public client: any;
  /** Operations that were started on the server and have not finished yet, keyed by id. */
  public operations: Operations;
  private wsImpl: any;
  private nextOperationId: number;
  private connectionParamsCallback: any;
  private eventEmitter: EventEmitterType;
  private wsProtocols: string | string[];
  private anyServerMessageIsKeepalive: boolean;
  private wsOptionArguments: any[];

  private url: string;
  // Promise chain that serializes the processing of incoming messages.
  private dataReceiver: Promise<void>;
  private dataLoadImpl: Function;
  private dataDumpImpl: Function;
  private minConnectTimeout: number;
  private serverTimeout: number;
  private connecting_backoff_waiter: any;
  private connecting_timeout_waiter: any;

  // what the user wants: set by start() / stop(), consumed by the manager loop
  private activate: boolean;
  private deactivate: boolean;
  // timer ids of the periodic and of the "as soon as possible" manager run
  private manageNormalId: any;
  private manageNowId: any;
  private client_state: WebSocketClientState;
  // flag + timer: waiting for the conn-ack took too long
  private connecting_timeout: boolean;
  private connecting_timeout_id: any;
  // flag + timer: the backoff delay has elapsed
  private connecting_backoff: boolean;
  private connecting_backoff_id: any;
  // flag + timer: the server has been heard from within `serverTimeout`
  private connection_alive: boolean;
  private connection_alive_id: any;

  // to support event handlers with relevant state-change info
  private client_connected: boolean;
  private client_opened: boolean;
  private client_close_code: number | null;

  public static GQL_CONNECTION_INIT = 'connection_init'; // Client -> Server
  public static GQL_CONNECTION_ACK = 'connection_ack'; // Server -> Client
  public static GQL_CONNECTION_ERROR = 'connection_error'; // Server -> Client
  public static GQL_CONNECTION_TERMINATE = 'connection_terminate'; // Client -> Server
  public static GQL_CONNECTION_KEEP_ALIVE = 'ka'; // Server -> Client
  public static GQL_START = 'start'; // Client -> Server
  public static GQL_DATA = 'data'; // Server -> Client
  public static GQL_ERROR = 'error'; // Server -> Client
  public static GQL_COMPLETE = 'complete'; // Server -> Client
  public static GQL_STOP = 'stop'; // Client -> Server

  /**
   * @param url address of the websocket server
   * @param dataLoadImpl deserializer: receives the bytes of a message (`Uint8Array`), returns the message object
   * @param dataDumpImpl serializer: receives a message object, returns what is handed to `socket.send()`
   * @param connectionParamsCallback source of the `connection_init` payload: a function (may be async)
   *   that is called on every connection attempt, or a promise / plain value that is used as it is
   * @param webSocketImpl websocket constructor to use; defaults to the global `WebSocket`
   * @param minConnectTimeout shortest time (ms) to wait for the conn-ack; defaults to 2500
   * @param serverTimeout longest time (ms) to wait for the conn-ack, and also the keepalive timeout; defaults to 25000
   * @param anyServerMessageIsKeepalive when true every message from the server counts as keepalive
   * @param wsOptionArguments extra arguments appended to the websocket constructor call
   * @throws Error when no websocket implementation is available
   */
  constructor(
    url: string,
    dataLoadImpl: any,
    dataDumpImpl: any,
    connectionParamsCallback: Function | Promise<ConnectionParams>,
    webSocketImpl?: any,
    minConnectTimeout?: number,
    serverTimeout?: number, // max connect timeout and keepalive timeout
    anyServerMessageIsKeepalive?: boolean,
    wsOptionArguments?: any[]
  ) {
    // The `typeof` guard is required: merely referencing an undeclared global throws a
    // ReferenceError, which would hide the descriptive error below in non-browser environments.
    const nativeImpl = typeof WebSocket !== 'undefined' ? WebSocket : undefined;
    this.wsImpl = webSocketImpl || nativeImpl;
    if (!this.wsImpl) {
      throw new Error(
        'Unable to find native implementation, or alternative implementation for WebSocket!'
      );
    }

    this.client = null;
    this.operations = {};
    this.nextOperationId = 0;
    this.connectionParamsCallback = connectionParamsCallback;
    this.eventEmitter = new EventEmitter();
    this.wsProtocols = 'graphql-ws';
    this.anyServerMessageIsKeepalive = anyServerMessageIsKeepalive || false;
    this.wsOptionArguments = wsOptionArguments || [];

    this.url = url;
    this.dataReceiver = new Promise((resolve) => resolve());
    this.dataLoadImpl = dataLoadImpl;
    this.dataDumpImpl = dataDumpImpl;
    this.minConnectTimeout = minConnectTimeout || 2500;
    this.serverTimeout = serverTimeout || 25000;
    this.connecting_backoff_waiter = new Backoff({ jitter: 0.5 });
    this.connecting_timeout_waiter = new Backoff({
      min: this.minConnectTimeout,
      max: this.serverTimeout,
      factor: 1.25,
    });

    this.activate = false;
    this.deactivate = false;
    this.manageNormalId = null;
    this.manageNowId = null;
    this.client_state = WebSocketClientState.IDLE;
    this.client_connected = false;
    this.client_opened = false;
    this.client_close_code = null;
    this.connecting_timeout = false;
    this.connecting_timeout_id = null;
    this.connecting_backoff = false;
    this.connecting_backoff_id = null;
    this.connection_alive = false;
    this.connection_alive_id = null;
  }

  /**
   * Returns the current state of the websocket (the `readyState` of the socket), or the `CLOSED`
   * constant of the websocket implementation when there is no socket.
   */
  public get_ws_status() {
    if (this.client === null) {
      return this.wsImpl.CLOSED;
    }
    return this.client.readyState;
  }

  /**
   * Returns true when the websocket is open, but only after the connection-ack has been received.
   */
  public is_open() {
    return (
      this.client_state === WebSocketClientState.OPEN && this.get_ws_status() === this.wsImpl.OPEN
    );
  }

  /**
   * Registers a listener for the named event.
   * @returns a function that removes the listener again
   */
  public on(eventName: string, callback: ListenerFn, context?: any): Function {
    const handler = this.eventEmitter.on(eventName, callback, context);

    return () => {
      handler.off(eventName, callback, context);
    };
  }

  /**
   * Operational
   * Emitted when the socket connection has been established and is ready for use.
   * Provides the conn-ack message payload as the first parameter.
   */
  public onOpen(callback: ListenerFn, context?: any): Function {
    return this.on('open', callback, context);
  }

  /**
   * Operational
   * Emitted when the client has reset to default state on demand.
   */
  public onClosed(callback: ListenerFn, context?: any): Function {
    return this.on('closed', callback, context);
  }

  /**
   * Informational
   * Emitted when the socket has just connected to the server. The client is not ready for use yet.
   */
  public onConnected(callback: ListenerFn, context?: any): Function {
    return this.on('connected', callback, context);
  }

  /**
   * Informational
   * Emitted when the socket has just been created to connect to the server.
   */
  public onConnecting(callback: ListenerFn, context?: any): Function {
    return this.on('connecting', callback, context);
  }

  /**
   * Operational
   * Emitted when the socket client has been discarded. The arguments:
   *  - boolean indicating if the websocket connected
   *  - boolean indicating if the client connection opened (init-ack handshake successful)
   *  - close-code that the server sent, this is only available if the server initiates the termination
   */
  public onDisconnected(callback: ListenerFn, context?: any): Function {
    return this.on('disconnected', callback, context);
  }

  /**
   * The first argument is a string that categorizes the error, then more values and errors:
   *  - If the websocket can't be instantiated: 'conn-create', Error
   *  - If obtaining the connection params or sending the init message failed: 'conn-init', Error
   *  - If the websocket instance raised an error during close: 'conn-close', Error
   *  - Connection error received from server: 'conn-err', message-payload
   *  - Connection ack received from server, but in incorrect state: 'conn-late-ack', message-payload, client-state
   *  - For errors received directly from the websocket instance: 'socket', Error
   *  - When a string message is received on the websocket (instead of a binary blob): 'recv-str', message
   *  - When a binary message fails to deserialize: 'recv-load', message, Error
   *  - When a message is received with unexpected type: 'recv-type', message
   */
  public onError(callback: ListenerFn, context?: any): Function {
    return this.on('error', callback, context);
  }

  /**
   * Informational
   * Emitted each time the client transitions between states.
   */
  public onState(callback: ListenerFn, context?: any): Function {
    return this.on('state', callback, context);
  }

  /**
   * Stores the new state and emits the `state` event, but only when the state actually changes.
   * @private
   */
  private _change_state(new_state: WebSocketClientState) {
    if (this.client_state !== new_state) {
      this.client_state = new_state;
      this._safe_emit('state', new_state);
    }
  }

  /**
   * This emit wrapper shields all internal logic from rogue errors thrown by event handler code.
   * @private
   */
  private _safe_emit(event: string, ...args: any[]) {
    try {
      this.eventEmitter.emit(event, ...args);
    } catch (e) {
      // don't really care about event handler issues
      console.warn('Error from handling event', event, e);
    }
  }

  /**
   * Activates the client to want to establish its connection. The websocket will be opened asynchronously in
   * the background. When connection issues arise, the websocket will be re-opened automatically in the background.
   */
  public start() {
    this.activate = true;
    this._ensure_manage_now();
  }

  /**
   * Deactivates the client. The connection will be closed in the background and all active operations stopped.
   */
  public stop() {
    this.activate = false;
    this.deactivate = true;
    this._ensure_manage_now();
  }

  /**
   * Schedules a manager run as soon as possible, unless one is already scheduled.
   * @private
   */
  private _ensure_manage_now() {
    if (this.manageNowId === null) {
      this.manageNowId = setTimeout(this._manage_now_cb.bind(this), 1);
    }
  }

  /**
   * Timer callback of the "as soon as possible" manager run; it replaces the pending periodic run.
   * @private
   */
  private _manage_now_cb() {
    this.manageNowId = null;
    if (this.manageNormalId !== null) {
      // clear normal, it will re-schedule itself if necessary
      clearTimeout(this.manageNormalId);
    }
    this._manage_normal_cb();
  }

  /**
   * Runs the manager once and re-schedules itself with the delay the manager asked for. When the
   * manager throws, the run is retried after 500 ms.
   * @private
   */
  private _manage_normal_cb() {
    let schedule_time = 500;
    try {
      schedule_time = this._manage_client();
    } catch (e) {
      console.error('May be unexpected internal error from WebSocketClient', e);
    } finally {
      if (schedule_time > 0) {
        this.manageNormalId = setTimeout(this._manage_normal_cb.bind(this), schedule_time);
      } else {
        this.manageNormalId = null;
      }
    }
  }

  /**
   * One step of the manager: moves the client towards what the user asked for.
   * Returns the delay (ms) until the next run. Must return 0 to stop the manager timeout.
   * @private
   */
  private _manage_client(): number {
    if (this.deactivate || !this.activate) {
      if (this._relinquish_client()) {
        this._safe_emit('closed');
        this.deactivate = false;
        if (this.activate) {
          return 1; // reopen
        }
        return 0; // closed, stop manager timeout
      } else {
        return 100; // closing, keep checking frequently what to do
      }
    }
    if (this._establish_client()) {
      return 2500; // open and connected, keep checking though
    } else {
      return 100; // opening, keep checking frequently what to do
    }
  }

  /**
   * Advances the state machine one step towards the OPEN state.
   * Return true when the target state is reached.
   * @private
   */
  private _establish_client(): boolean {
    const status = this.get_ws_status();
    switch (this.client_state) {
      case WebSocketClientState.OPEN:
        if (status === this.wsImpl.OPEN && this.connection_alive) {
          return true;
        }
        // A good connection has ended for some reason.
        this._manage_state_to_idle_from_any();
        break;
      case WebSocketClientState.IDLE:
        this._manage_state_to_connecting_from_idle();
        break;
      case WebSocketClientState.CONNECTING:
        if (this.connecting_timeout) {
          // go to backoff at timeout
          this._manage_state_to_backoff_from_connecting();
        }
        break;
      case WebSocketClientState.BACKOFF:
        if (this.connecting_backoff) {
          // go to idle at timeout
          this._manage_state_to_idle_from_backoff();
        }
        break;
      case WebSocketClientState.CLOSING:
        console.error(
          'WebSocketClient internal state got to CLOSING in manager loop, which is not intended'
        );
        this._manage_state_to_idle_from_any();
        break;
    }
    return false;
  }

  /**
   * Closes the client. Return true when the target state is reached.
   * @private
   */
  private _relinquish_client(): boolean {
    this._manage_state_to_idle_from_any();
    return true;
  }

  /**
   * Cancels the connect-timeout timer if it is pending. The `connecting_timeout` flag is left alone.
   * @private
   */
  private _cancel_connecting_timeout() {
    if (this.connecting_timeout_id !== null) {
      clearTimeout(this.connecting_timeout_id);
      this.connecting_timeout_id = null;
    }
  }

  /**
   * Call this to enter CONNECTING state. Creates the socket, starts the connect-timeout and wires
   * up the socket event handlers.
   * @throws whatever the websocket constructor throws (after emitting the 'conn-create' error)
   * @private
   */
  private _manage_state_to_connecting_from_idle() {
    this.connecting_timeout = false;
    this._cancel_connecting_timeout();
    let socket: any;
    try {
      socket = new this.wsImpl(this.url, this.wsProtocols, ...this.wsOptionArguments);
    } catch (e) {
      this._safe_emit('error', 'conn-create', e);
      throw e; // propagate to retry at least
    }
    this.client = socket;
    this.connecting_timeout_id = setTimeout(
      this._connecting_timeout_cb.bind(this),
      this.connecting_timeout_waiter.duration()
    );
    this._change_state(WebSocketClientState.CONNECTING);

    socket.onopen = async () => {
      if (this.client !== socket || this.get_ws_status() !== this.wsImpl.OPEN) {
        return;
      }
      this._safe_emit('connected');
      this.client_connected = true;
      try {
        // The params source may be a function (sync or async) or an already available promise/value.
        const connectionParams: ConnectionParams =
          typeof this.connectionParamsCallback === 'function'
            ? await this.connectionParamsCallback()
            : await this.connectionParamsCallback;
        if (this.client !== socket) {
          // The socket was discarded (or replaced) while waiting for the params; nothing to init.
          return;
        }
        this._send_message(undefined, WebSocketClient.GQL_CONNECTION_INIT, connectionParams);
      } catch (error) {
        if (this.client !== socket) {
          // A failure that belongs to a discarded socket must not affect the current attempt.
          return;
        }
        this.connecting_timeout = true; // treat as timeout happened and transition to backoff by closing the socket immediately
        this._safe_emit('error', 'conn-init', error);
        this._ensure_manage_now(); // wake the manager so that the transition really is immediate
      }
    };

    socket.onclose = (event: any) => {
      if (event) {
        this.client_close_code = event.code || null;
      }
      this._ensure_manage_now();
    };

    socket.onerror = (err: Error) => {
      this._safe_emit('error', 'socket', err);
    };

    socket.onmessage = ({ data }: { data: any }) => {
      if (typeof data === 'string') {
        // we do not use text transport
        this._safe_emit('error', 'recv-str', data);
        return;
      }
      // Message arrived on binary transport; data is a Blob.
      // Chaining on the previous promise keeps the messages in their arrival order.
      this.dataReceiver = this.dataReceiver
        .catch(() => null) // ignore error of previous message processing if any; it is irrelevant now
        .then(() => data.arrayBuffer()) // no error handler, socket read will likely trigger disconnect
        .then((buffer: ArrayBuffer) => {
          if (this.client !== socket) {
            // The socket has been discarded since the message arrived; a stale message must not
            // touch the state (or the keepalive timer) of the current connection.
            return;
          }
          this._process_received_data(new Uint8Array(buffer));
        });
    };

    this._safe_emit('connecting');
  }

  /**
   * Timer callback: waiting for the conn-ack took too long.
   * @private
   */
  private _connecting_timeout_cb() {
    this.connecting_timeout_id = null;
    this.connecting_timeout = true;
    this._ensure_manage_now();
  }

  /**
   * Call this to enter BACKOFF state after a failed connection attempt. Starts the backoff timer
   * and discards the socket.
   * @private
   */
  private _manage_state_to_backoff_from_connecting() {
    this.connecting_backoff = false;
    if (this.connecting_backoff_id !== null) {
      clearTimeout(this.connecting_backoff_id);
    }
    this.connecting_backoff_id = setTimeout(
      this._connecting_backoff_cb.bind(this),
      this.connecting_backoff_waiter.duration()
    );
    this._close_client(WebSocketClientState.BACKOFF);
  }

  /**
   * Timer callback: the backoff delay has elapsed.
   * @private
   */
  private _connecting_backoff_cb() {
    this.connecting_backoff_id = null;
    this.connecting_backoff = true;
    this._ensure_manage_now();
  }

  /**
   * Call this to leave BACKOFF state; the next manager run starts a new connection attempt.
   * @private
   */
  private _manage_state_to_idle_from_backoff() {
    this._change_state(WebSocketClientState.IDLE);
  }

  /**
   * Call this to forcibly return to IDLE state. Use for de-activation and before reconnecting in OPEN state.
   * @private
   */
  private _manage_state_to_idle_from_any() {
    this.connecting_timeout_waiter.reset();
    this.connecting_backoff_waiter.reset();
    // A pending backoff timer has no meaning in IDLE state; left alone it would wake the manager
    // later, which re-runs the close sequence (and re-emits its events) on a stopped client.
    if (this.connecting_backoff_id !== null) {
      clearTimeout(this.connecting_backoff_id);
      this.connecting_backoff_id = null;
    }
    this._close_client(WebSocketClientState.IDLE);
  }

  /**
   * Discards the socket (if any), fails all active operations with 'Client is closing', moves to
   * the requested state and emits the `disconnected` event.
   * @param state_after the state to be in after the socket is discarded
   * @private
   */
  private _close_client(state_after: WebSocketClientState) {
    this._change_state(WebSocketClientState.CLOSING);
    // Timers that belong to the connection being discarded must not fire afterwards: they wake the
    // manager, which would emit spurious 'disconnected' / 'closed' events on a stopped client.
    this._cancel_connecting_timeout();
    if (this.connection_alive_id !== null) {
      clearTimeout(this.connection_alive_id);
      this.connection_alive_id = null;
    }
    this.connection_alive = false;
    if (this.client !== null) {
      try {
        this._send_message(undefined, WebSocketClient.GQL_CONNECTION_TERMINATE, null);
      } catch {
        // nothing to do here, the terminate message is only a courtesy
      }
      try {
        this.client.close();
      } catch (e) {
        this._safe_emit('error', 'conn-close', e);
      }
      // detach the handlers, events of a discarded socket are not interesting
      this.client.onopen = null;
      this.client.onclose = null;
      this.client.onerror = null;
      this.client.onmessage = null;
      this.client = null;
    }
    Object.keys(this.operations).forEach((opId) => {
      try {
        this.operations[opId].handler(this._format_errors('Client is closing'), null);
      } catch (e) {
        console.warn('Error from operation handler while closing client', e);
      } finally {
        delete this.operations[opId];
      }
    });
    this._change_state(state_after);
    this._safe_emit(
      'disconnected',
      this.client_connected,
      this.client_opened,
      this.client_close_code
    );
    this.client_connected = false;
    this.client_opened = false;
    this.client_close_code = null;
  }

  /**
   * Deserializes one received message and dispatches it by its type.
   * Deserialization problems are reported with the 'recv-load' error event, unknown message types
   * with 'recv-type'. Errors thrown by operation handlers propagate to the caller.
   * @param receivedData the raw message, in the form that `dataLoadImpl` accepts
   */
  public _process_received_data(receivedData: any) {
    let parsedMessage: any;
    let opId: string;

    try {
      parsedMessage = this.dataLoadImpl(receivedData);
      opId = parsedMessage.id;
    } catch (e) {
      this._safe_emit('error', 'recv-load', receivedData, e);
      return;
    }

    if (this.anyServerMessageIsKeepalive) {
      this._handle_connection_alive();
    }

    let op: Operation;
    switch (parsedMessage.type) {
      case WebSocketClient.GQL_CONNECTION_ERROR:
        this._safe_emit('error', 'conn-err', parsedMessage.payload);
        break;

      case WebSocketClient.GQL_CONNECTION_ACK:
        if (this.client_state === WebSocketClientState.CONNECTING) {
          // The handshake is done, the connect-timeout has nothing left to guard.
          this._cancel_connecting_timeout();
          // Advance to open state.
          this._change_state(WebSocketClientState.OPEN);
          if (!this.anyServerMessageIsKeepalive) {
            this._handle_connection_alive();
          }
          this._safe_emit('open', parsedMessage.payload);
          this.client_opened = true;
        } else {
          this._safe_emit('error', 'conn-late-ack', parsedMessage.payload, this.client_state);
        }
        break;

      case WebSocketClient.GQL_COMPLETE:
        op = this.operations[opId];
        if (op) {
          try {
            op.handler.call(this, null, null);
          } finally {
            // ensure cleanup in spite of handler issues
            delete this.operations[opId];
          }
        }
        break;

      case WebSocketClient.GQL_ERROR:
        op = this.operations[opId];
        if (op) {
          op.handler(this._format_errors(parsedMessage.payload), null);
        } else {
          // rogue or broken-abandoned op
          this._send_message(opId, WebSocketClient.GQL_STOP, undefined);
        }
        break;

      case WebSocketClient.GQL_DATA:
        op = this.operations[opId];
        if (op) {
          const parsedPayload = !parsedMessage.payload.errors
            ? parsedMessage.payload
            : {
                ...parsedMessage.payload,
                errors: this._format_errors(parsedMessage.payload.errors),
              };
          op.handler(null, parsedPayload);
        } else {
          // rogue or broken-abandoned op
          this._send_message(opId, WebSocketClient.GQL_STOP, undefined);
        }
        break;

      case WebSocketClient.GQL_CONNECTION_KEEP_ALIVE:
        if (!this.anyServerMessageIsKeepalive) {
          this._handle_connection_alive();
        }
        break;

      default:
        this._safe_emit('error', 'recv-type', parsedMessage);
    }
  }

  /**
   * Marks the connection alive and (re)starts the keepalive timer of `serverTimeout` ms.
   * @private
   */
  private _handle_connection_alive() {
    if (this.connection_alive_id !== null) {
      clearTimeout(this.connection_alive_id);
    }
    this.connection_alive = true;
    this.connection_alive_id = setTimeout(this._connection_alive_cb.bind(this), this.serverTimeout);
  }

  /**
   * Timer callback: the server has been silent for too long.
   * @private
   */
  private _connection_alive_cb() {
    this.connection_alive_id = null;
    this.connection_alive = false;
    this._ensure_manage_now();
  }

  /**
   * Creates an observable for the operation. Nothing is sent until `subscribe()` is called; every
   * `subscribe()` call starts its own operation on the server.
   *
   * `subscribe()` accepts an observer object or the `next`, `error`, `complete` callbacks as
   * separate arguments. The observer's `error` receives the first error only. When the client is
   * not open, the observer's `error` is called right away.
   *
   * @param request the operation to run
   * @returns the observable; the `unsubscribe` of a subscription stops its operation
   */
  public request(request: OperationOptions): Observable<ExecutionResult> {
    const getObserver = this._get_observer.bind(this);
    const executeOperation = this._execute_operation.bind(this);
    const unsubscribe = this._unsubscribe.bind(this);

    return {
      [$$observable]() {
        return this;
      },
      subscribe(
        observerOrNext: Observer<ExecutionResult> | ((v: ExecutionResult) => void),
        onError?: (error: Error) => void,
        onComplete?: () => void
      ) {
        const observer = getObserver(observerOrNext, onError, onComplete);

        // The id belongs to this subscription alone. Sharing it between the subscriptions of the
        // observable would make `unsubscribe` stop the operation of the latest subscriber.
        let opId: string | null | undefined = executeOperation(
          request,
          (error: Error[], result: any) => {
            if (error === null && result === null) {
              if (observer.complete) {
                observer.complete();
              }
            } else if (error) {
              if (observer.error) {
                observer.error(error[0]);
              }
            } else {
              if (observer.next) {
                observer.next(result);
              }
            }
          }
        );

        return {
          unsubscribe: () => {
            if (opId) {
              unsubscribe(opId);
              opId = null;
            }
          },
        };
      },
    };
  }

  /**
   * Normalizes the two supported `subscribe()` call styles into an observer object.
   * @private
   */
  private _get_observer<T>(
    observerOrNext: Observer<T> | ((v: T) => void),
    error?: (e: Error) => void,
    complete?: () => void
  ) {
    if (typeof observerOrNext === 'function') {
      return {
        next: (v: T) => observerOrNext(v),
        error: (e: Error) => error && error(e),
        complete: () => complete && complete(),
      };
    }

    return observerOrNext;
  }

  /**
   * Starts the operation on the server and registers its handler.
   * When the client is not open or the send fails, the handler is called with an error right away
   * and nothing is registered.
   * @returns the id of the operation, or `undefined` when the client is not open
   * @private
   */
  private _execute_operation(
    options: OperationOptions,
    handler: (error: Error[], result?: any) => void
  ): string | undefined {
    if (!this.is_open()) {
      handler(this._format_errors('Client is not open'), null);
      return undefined;
    }
    const opId = String(++this.nextOperationId);
    try {
      this._send_message(opId, WebSocketClient.GQL_START, options);
    } catch (e) {
      handler(this._format_errors('Message send failed'), null);
      return opId; // it's not in operations, so unsub callback will be no-op
    }
    this.operations[opId] = { options: options, handler };
    return opId;
  }

  /**
   * Used for user-initiated cancel.
   * @private
   */
  private _unsubscribe(opId: string) {
    if (this.client_state === WebSocketClientState.CLOSING) {
      return; // no reason to do anything in this state
    }
    const op = this.operations[opId];
    if (op) {
      delete this.operations[opId];
      try {
        this._send_message(opId, WebSocketClient.GQL_STOP, undefined);
      } catch {
        // if the stop is not sent, one will be sent from receiving a rogue message
      } finally {
        // ensure closing callback is fired
        op.handler(null, null);
      }
    }
  }

  /**
   * Low level structured message send on the socket. This shall be only called when the client is in appropriate state
   * and thrown errors must be handled as well by caller.
   * @private
   */
  private _send_message(id: string, type: string, payload: any) {
    const message: Uint8Array = this.dataDumpImpl(this._build_message(id, type, payload));
    this.client.send(message);
  }

  /**
   * Ensures we have an array of errors: arrays are returned as they are, `{ errors }` wrappers are
   * unwrapped, a single error-like object (one with a `message`) is wrapped into an array and
   * anything else is turned into an 'Unknown error' that keeps the value in `originalError`.
   * @private
   */
  private _format_errors(errors: any): FormattedError[] {
    if (Array.isArray(errors)) {
      return errors;
    }
    if (errors && errors.errors) {
      return this._format_errors(errors.errors);
    }
    if (errors && errors.message) {
      return [errors];
    }
    return [
      {
        name: 'FormatedError',
        message: 'Unknown error',
        originalError: errors,
      },
    ];
  }

  /**
   * Builds the message envelope. A payload that has a `query` is shallow-copied, other payloads
   * are used as they are.
   * @private
   */
  private _build_message(id: string, type: string, payload: any) {
    const payloadToReturn =
      payload && payload.query
        ? {
            ...payload,
            query: payload.query,
          }
        : payload;

    return {
      id,
      type,
      payload: payloadToReturn,
    };
  }
}
