import { retry } from '@lifeomic/attempt';
import { Snapshot } from '@protos/snapshots';
import { Order, ProductRisk } from '@protos/trading';
import { Dashboards } from '@shared/protos/dashboard';
import { MESSAGE_TYPE } from '@shared/protos/options';
import { RFQTicker, Ticker } from '@shared/protos/ticker';
import { OrderResponse, OtcOrder } from '@shared/protos/trading';
import AsyncLock from 'async-lock';
import { gridUpdateService } from './GridUpdateService';
import { toastifyService } from './ToastifyService';
import { logger } from './context';

// Shape of a response message, i.e. a message without a `channel` field.
// Used in this file to type the response handed to the resolver that
// `Stream.modifyDashboard` registers.
export interface StreamResponse {
  // id of the request being answered; Stream looks up the pending resolver by it
  id: string;
  // request method echoed back; Stream checks it against 'auth'
  method: string;
  // true when the request failed
  error: boolean;
  // response body; on errors Stream reads `message.Message` from it
  message: Record<string, any>;
}

// A single message received on a channel, together with the raw websocket
// event that carried it. The `as*` accessors return the message typed for a
// given channel, or null when the event belongs to a different channel.
export class StreamEvent {
  channel: string;
  message: any;
  raw: MessageEvent;
  // name looked up in MESSAGE_TYPE, 'unknown' when the lookup yields nothing
  messageType = 'unknown';
  // timeStamp of the raw websocket event
  timestamp = 0;

  constructor(channel: string, message: any, messageType: number, event: MessageEvent) {
    const { timeStamp } = event;
    this.raw = event;
    this.timestamp = timeStamp;
    this.channel = channel;
    this.message = message;
    const typeName = MESSAGE_TYPE[messageType];
    this.messageType = typeName ? typeName : 'unknown';
  }

  // message as a ticker list, only for the 'tickers' channel
  asTickers(): Ticker[] | null {
    return this.channel === 'tickers' ? (this.message as Ticker[]) : null;
  }

  // message as an RFQ ticker list, only for the 'rfq' channel
  asRFQTickers(): RFQTicker[] | null {
    return this.channel === 'rfq' ? (this.message as RFQTicker[]) : null;
  }

  // message as product risk, only for the 'product_risk' channel
  asProductRisk(): ProductRisk | null {
    return this.channel === 'product_risk' ? (this.message as ProductRisk) : null;
  }

  // message as an order, only for the 'orders' channel
  asOrder(): Order | null {
    return this.channel === 'orders' ? (this.message as Order) : null;
  }

  // message as a snapshot, only for the 'snapshots' channel
  asSnapshots(): Snapshot | null {
    return this.channel === 'snapshots' ? (this.message as Snapshot) : null;
  }

  // builds a new Dashboards from the message, only for the 'dashboards' channel
  asDashboards(): Dashboards | null {
    return this.channel === 'dashboards' ? new Dashboards(this.message, this.messageType) : null;
  }
}

// Handler invoked with every StreamEvent dispatched on a channel.
export type StreamEventCallback = (event: StreamEvent) => void;
// Callback taking no arguments; used for on-connect hooks and for the
// unsubscribe function returned by Stream.onEvent.
export type VoidCallback = () => void;
// Front/back legs and size of a spread RFQ.
// NOTE(review): not referenced elsewhere in this file — confirm it is still used.
export type RFQDataSpread = { front: string; back: string; size: number };
// Product / account pair carried by a 'product_risk' subscription.
export type RiskPayload = { productSymbol: string; account_id: string };
// RFQ request: `rfqInfo` is a dash-separated string that Subscription.payload
// splits into symbol(s) and size. `exchange` is not put into the payload
// (the code that did so is commented out there).
export type RFQData = { rfqInfo: string; exchange: string };

// Describes what to subscribe to and builds the `channel` field of the
// subscribe / unsubscribe request for it (see `payload`).
export class Subscription {
  // channel name; the factories below produce: tickers, rfq, product_risk,
  // orders, server_info, dashboards, snapshots
  channel: string;
  // optional product symbols for tickers
  products?: string[];
  // RFQ description, only used by the 'rfq' channel
  rfqData?: RFQData;
  // product / account pair, only used by the 'product_risk' channel
  riskData?: RiskPayload;

  constructor(channel: string, products?: string[], rfqData?: RFQData, riskData?: RiskPayload) {
    this.channel = channel;
    this.products = products;
    this.rfqData = rfqData;
    this.riskData = riskData;
  }

  // Build the request `channel` value.
  //
  // - tickers:      { tickers: { products? } }
  // - rfq:          { rfq: { symbol, size } } where `rfqInfo` is split on '-':
  //                 exactly two parts are `symbol-size`, anything else is read
  //                 as `front-back-size` and sent as a { front, back } symbol.
  //                 `size` is always sent as a number.
  // - product_risk: { product_risk: { product_symbol, account_id } }, or an
  //                 empty object when either value is missing
  // - anything else: the bare channel name
  payload(): string | Record<string, any> {
    switch (this.channel) {
      case 'tickers': {
        const payload: Record<string, unknown> = {};
        if (this.products) payload.products = this.products;
        return { tickers: payload };
      }
      case 'rfq': {
        const payload: Record<string, unknown> = {};
        const parts = this.rfqData?.rfqInfo.split('-');

        // const exchange = this.rfqData?.exchange;
        // if (exchange) {
        //   payload['exchange'] = exchange;
        // }

        if (parts) {
          if (parts.length === 2) {
            const [symbol, size] = parts;
            payload.symbol = symbol;
            payload.size = Number(size);
          } else {
            const [front, back, size] = parts;
            payload.symbol = { front, back };
            // Convert like the single-symbol branch does; a missing size stays
            // undefined so it is still omitted when the request is serialized.
            payload.size = size === undefined ? undefined : Number(size);
          }
        }

        return { rfq: payload };
      }
      case 'product_risk': {
        const { productSymbol, account_id } = this.riskData || {};
        if (!productSymbol || !account_id) return { product_risk: {} };
        return { product_risk: { product_symbol: productSymbol, account_id } };
      }
      default:
        return this.channel;
    }
  }

  // Subscription to the 'tickers' channel for the given product symbols.
  static tickers(products: string[]) {
    return new Subscription('tickers', products);
  }

  // Subscription to the 'rfq' channel for the given RFQ description.
  static rfq(rfqData: RFQData) {
    return new Subscription('rfq', undefined, rfqData);
  }

  // Subscription to the 'product_risk' channel for a product / account pair.
  static product_risk(productSymbol: string, account_id: string) {
    return new Subscription('product_risk', undefined, undefined, { productSymbol, account_id });
  }

  // Subscription to the 'orders' channel.
  static order() {
    return new Subscription('orders');
  }

  // Subscription to the 'server_info' channel.
  static serverInfo() {
    return new Subscription('server_info');
  }

  // Subscription to the 'dashboards' channel.
  static dashboards() {
    return new Subscription('dashboards');
  }

  // Subscription to the 'snapshots' channel.
  static snapshots() {
    return new Subscription('snapshots');
  }
}

// A named channel: the handlers registered for it and the timestamp of the
// last event it dispatched.
class Channel {
  id: string;
  // timestamp of the most recently dispatched event, 0 before the first one
  lastUpdate = 0;
  handlers: Set<StreamEventCallback>;

  constructor(id: string) {
    this.id = id;
    this.handlers = new Set<StreamEventCallback>();
  }

  // Deliver an event to every handler. A handler that throws is logged and
  // does not stop delivery to the remaining handlers.
  dispatch(msg: StreamEvent) {
    this.lastUpdate = msg.timestamp;
    this.handlers.forEach(deliver => {
      try {
        deliver(msg);
      } catch (err) {
        logger.error('could not deliver stream event', msg, err);
      }
    });
  }

  // Unregister a handler; once no handlers remain the channel removes itself
  // from the store's channel map.
  removeHandler(store: Stream, handler: StreamEventCallback) {
    const { handlers, id } = this;
    handlers.delete(handler);
    if (handlers.size > 0) return;
    store.channels.delete(id);
  }
}

// Connection state exposed as `Stream.status`.
export enum Status {
  // set when the websocket has opened
  Connected = 'connected',
  // NOTE(review): never assigned by the Stream class in this file — confirm whether it is used elsewhere
  Reconnecting = 'reconnecting',
  // initial state, and the state after the stream is reset
  Disconnected = 'disconnected',
}

// WebSocket client that carries channel events and request/response messages
// over one socket.
//
// - Incoming messages with a `channel` field are wrapped in a StreamEvent and
//   dispatched to the handlers registered through `onEvent`.
// - Every other incoming message is treated as a response and matched by `id`
//   against the callbacks stored in `resolvers`.
// - The socket is opened lazily by `getSocket` and re-opened from the socket's
//   `onclose` handler.
export class Stream {
  // serializes socket creation so concurrent senders share one connection attempt
  private lock: AsyncLock;
  private wsUrl: string;
  // counter behind `newId`
  private messageId = 0;
  // last error seen on the socket or in a response; nothing in this class clears it
  private socketError: { hasError: boolean; message: string } | undefined = undefined;
  private authenticated: boolean = false;
  // callbacks waiting for a response, keyed by request id
  private resolvers: Map<string, any>;
  onConnects: Set<VoidCallback>;
  channels: Map<string, Channel>;
  private token?: string = undefined;
  socket?: WebSocket = undefined;
  status: Status;
  uid = '';

  constructor(wsUrl: string) {
    this.wsUrl = wsUrl;
    this.onConnects = new Set<VoidCallback>();
    this.channels = new Map<string, Channel>();
    this.resolvers = new Map<string, any>();
    this.status = Status.Disconnected;
    this.lock = new AsyncLock();
  }

  // register a callback to be called when the stream is connected
  // (the callbacks run after each `authenticate` call, see `reSubscribe`)
  onConnect(handler: VoidCallback) {
    this.onConnects.add(handler);
  }

  // register a callback to be called when a stream event is received on a channel
  // returns a function that unregisters the handler again
  onEvent(channel: string, handler: StreamEventCallback): VoidCallback {
    this.getChannel(channel).handlers.add(handler);
    return () => this.removeHandler(channel, handler);
  }

  // Is the stream authenticated?
  get isAuthenticated(): boolean {
    return this.authenticated;
  }

  // The authentication token if any
  get authToken(): string | undefined {
    return this.token;
  }

  // Authenticate the stream with a token, then run the on-connect callbacks
  // so they can re-subscribe to their channels.
  // NOTE: `rpc` also resolves on an error response, so the callbacks run in
  // that case too; their subscribe calls are then dropped by `maybeSendJson`
  // because the stream is not authenticated.
  async authenticate(token: string) {
    this.token = token;
    await this.rpc('auth', { token });
    await this.reSubscribe();
  }

  // Send a subscribe request; dropped when the stream cannot send (see `maybeSendJson`)
  async subscribe(sub: Subscription) {
    await this.maybeSendJson({
      id: this.newId(),
      method: 'subscribe',
      channel: sub.payload(),
    });
  }

  // Send an unsubscribe request; dropped when the stream cannot send (see `maybeSendJson`)
  async unsubscribe(sub: Subscription) {
    await this.maybeSendJson({
      id: this.newId(),
      method: 'unsubscribe',
      channel: sub.payload(),
    });
  }

  // Send an OTC order. The response is shown as a trade confirmation toast
  // and, when it carries an order, pushed to the grid.
  async placeOtcOrder(order: OtcOrder) {
    // The request would be dropped by maybeSendJson, so no response can ever
    // arrive: registering a resolver here would leave it in `resolvers` forever.
    if (!this.canSend) return;

    const id = this.newId();
    this.resolvers.set(id, (response: OrderResponse) => {
      toastifyService.showTradeConfirmation(response);
      if (response.error || typeof response.message === 'string') return;
      gridUpdateService.setUpdatedRow(response.message.Order);
    });
    await this.maybeSendJson({
      id,
      method: 'order',
      ...order,
    });
  }

  // Send a dashboard update and return the dashboard object that was built
  // from `id` and `body`. The object is returned even when the request is
  // dropped because the stream cannot send.
  async modifyDashboard(id: number, body: any) {
    const messageId = this.newId();

    const newDashboard = {
      id,
      ...body,
    };

    // Only wait for a response when the request is really going out,
    // otherwise the resolver would never be removed from `resolvers`.
    if (this.canSend) {
      this.resolvers.set(messageId, (response: StreamResponse) => {
        const { error } = response;
        if (error) {
          logger.error('stream response error', response);
        }
      });

      await this.maybeSendJson({
        id: messageId,
        method: 'update_dashboard',
        dashboard: newDashboard,
      });
    }
    return newDashboard;
  }

  // Resubscribe to all channels by running every registered on-connect callback
  private reSubscribe() {
    for (const onConnect of this.onConnects) {
      onConnect();
    }
  }

  // Unregister a channel handler; unknown channels are ignored
  removeHandler(channel: string, handler: StreamEventCallback) {
    const c = this.channels.get(channel);
    if (c) c.removeHandler(this, handler);
  }

  // Low level API

  // Send a request and wait until the response with the same id arrives.
  // The returned promise resolves for error responses as well; it never rejects.
  async rpc(method: string, payload: Record<string, unknown>) {
    const id = this.newId();
    const promise = new Promise(resolve => {
      this.resolvers.set(id, resolve);
    });
    await this.sendJson({
      id,
      method,
      ...payload,
    });
    await promise;
  }

  // True when `maybeSendJson` would actually send: authenticated with a socket
  private get canSend(): boolean {
    return this.authenticated && !!this.socket;
  }

  // Send the payload only when authenticated and connected; otherwise drop it silently
  async maybeSendJson(payload: Record<string, unknown>) {
    if (this.canSend) {
      await this.sendJson(payload);
    }
  }

  // Log and send the payload as JSON, opening the socket if needed
  async sendJson(payload: Record<string, unknown>) {
    logger.info('stream - request ', payload);
    const msg = JSON.stringify(payload);
    await this.send(msg);
  }

  // Send a raw message; nothing is sent when no socket could be obtained
  async send(msg: string) {
    const socket = await this.getSocket();
    if (socket) socket.send(msg);
  }

  // Next request id, unique per Stream instance
  newId(): string {
    this.messageId += 1;
    return `flux-ws-${this.messageId}`;
  }

  // Get the channel with this id, creating and registering it when missing
  private getChannel(channelId: string): Channel {
    let channel = this.channels.get(channelId);
    if (!channel) {
      channel = new Channel(channelId);
      this.channels.set(channelId, channel);
    }
    return channel;
  }

  // Handle a response message: record errors, mark the stream authenticated
  // on a successful 'auth' response, then hand the message to the resolver
  // registered for its id (if any). The resolver is removed either way.
  private onResponse(data: any) {
    const resolver = this.resolvers.get(data.id);
    this.resolvers.delete(data.id);
    if (data.error) {
      logger.error('stream response error', data);
      this.socketError = { hasError: data.error, message: data.message?.Message || '' };
    } else {
      logger.log('stream - response: ', data);
      if (data.method === 'auth') {
        this.authenticated = true;
      }
    }
    if (resolver) resolver(data);
  }

  // Drop the current connection state, obtain a socket again and
  // re-authenticate with the stored token.
  private async reconnect(): Promise<WebSocket | undefined> {
    this.reset();
    const sock = await this.getSocket();
    // Without a socket the auth request could not be sent and its rpc promise
    // would never settle, so only authenticate when a socket was obtained.
    if (sock && this.token) await this.authenticate(this.token);
    return sock;
  }

  // Return the open socket, connecting first when there is none.
  // Connection attempts are serialized by the lock and retried with
  // exponential backoff; maxAttempts 0 means no limit in @lifeomic/attempt.
  // Resolves to undefined, without connecting, when the last recorded error
  // message contains '429' (presumably rate limiting — TODO confirm).
  private async getSocket(): Promise<WebSocket | undefined> {
    return await this.lock.acquire(
      'websocket',
      async () =>
        await retry(
          async () => {
            if (this.socket) return this.socket;
            if (this.socketError?.hasError && this.socketError?.message.includes('429')) return;
            return await this.connect();
          },
          { delay: 200, factor: 2, maxAttempts: 0 }
        )
    );
  }

  // Open a new websocket. Resolves once it is open, rejects when it closes
  // before opening. The returned socket's handlers keep the Stream state up to
  // date and route incoming messages.
  private connect(): Promise<WebSocket> {
    return new Promise((resolve, reject) => {
      const socket = new WebSocket(this.wsUrl);
      socket.onopen = () => {
        this.reset();
        this.socket = socket;
        logger.info('stream - new websocket connection', socket);
        this.status = Status.Connected;
        resolve(socket);
      };
      socket.onerror = () => {
        this.reset();
        this.socketError = { hasError: true, message: 'Websocket connection error' };
      };
      socket.onclose = event => {
        if (this.socket) {
          logger.warn('lost websocket connection, reconnecting');
          this.dispatchError({ error: 'Websocket lost connection' });
        } else reject(event);

        // fire and forget, but never leave a rejection unhandled
        this.reconnect().catch(e => logger.error('stream - reconnect failed', e));
      };
      socket.onmessage = (event: MessageEvent) => {
        let data: any;
        try {
          data = JSON.parse(event.data);
        } catch (e) {
          // a malformed frame must not throw out of the event handler
          logger.error('stream - could not parse message', event.data, e);
          return;
        }
        if (data === null || typeof data !== 'object') {
          logger.error('stream - unexpected message', data);
          return;
        }
        if (data.channel) {
          const streamEvent = new StreamEvent(data.channel, data.message, data.message_type, event);
          this.dispatch(streamEvent);
        } else {
          this.onResponse(data);
        }
      };
    });
  }

  // Route a channel event to its channel; events without a message or for a
  // channel nobody listens to are dropped
  private dispatch(msg: StreamEvent) {
    if (!msg.message) return;
    const channel = this.channels.get(msg.channel);
    if (channel) channel.dispatch(msg);
  }

  // Forget the current socket and authentication state
  private reset() {
    this.status = Status.Disconnected;
    this.socket = undefined;
    this.authenticated = false;
  }

  // Placeholder: connection errors are currently not forwarded anywhere
  private dispatchError(msg: any) {}

  // Last error recorded from the socket or from an error response, if any
  public getSocketError() {
    return this.socketError;
  }
}
