import { Ticker } from '@shared/protos/ticker';
import { Stream, StreamEvent, Subscription } from './Stream';

/** Listener signature used throughout this module: receives one ticker per call and returns nothing. */
export type TickerCallback = (ticker: Ticker) => void;

/**
 * Per-product fan-out state: the listeners registered for one product and the
 * entry points used to feed tickers to them.
 */
interface ProductStream {
  /** Feeds one incoming ticker into the stream so it can be forwarded to the registered callbacks. */
  onMessage: TickerCallback;
  /** Registers a callback for this product's tickers. */
  subscribe: (callback: TickerCallback) => void;
  /**
   * Currently registered callbacks. Exposed so the owning service can remove
   * entries and check whether any listeners remain.
   */
  callbacks: Set<TickerCallback>;
}

/**
 * Creates the fan-out state for a single product: a set of listeners plus a
 * cache of the latest ticker seen for each ticker symbol.
 *
 * @param product - Symbol of the product this stream belongs to. Not read by
 *   the implementation; kept so the factory signature stays unchanged.
 * @returns The `onMessage` / `subscribe` entry points and the live callback set.
 */
const productStream = (product: string): ProductStream => {
  const callbacks = new Set<TickerCallback>();
  // Cache of tickers - required for callbacks that subscribe after the server snapshot.
  // A Map is used instead of a plain object because symbols come from the server:
  // with an object, a key such as "__proto__" would rewrite the prototype instead
  // of storing the ticker, and that ticker would never be replayed.
  const cache = new Map<string, Ticker>();

  // Replays every cached ticker to the new callback, then registers it for live updates.
  const subscribe = (callback: TickerCallback) => {
    // Iterate over a snapshot so the replay is unaffected by cache writes made by the callback.
    for (const cached of [...cache.values()]) {
      callback(cached);
    }
    callbacks.add(callback);
  };

  // Stores the ticker as the latest value for its symbol and forwards it to all listeners.
  const onMessage = (ticker: Ticker) => {
    cache.set(ticker.symbol as string, ticker);
    // Dispatch over a snapshot: a callback registered while we are dispatching has
    // already received this ticker through the cache replay in `subscribe`, so
    // visiting it again here would deliver the same ticker twice.
    for (const callback of [...callbacks]) {
      // Skip listeners that an earlier callback removed during this dispatch.
      if (callbacks.has(callback)) {
        callback(ticker);
      }
    }
  };

  return {
    onMessage,
    subscribe,
    callbacks,
  };
};

/**
 * Service for subscribing to products.
 *
 * Multiplexes any number of listeners per product onto a single upstream
 * ticker subscription per product. The upstream subscription is opened when
 * the first listener for a product arrives and dropped after a grace period
 * once the last listener has gone.
 *
 * @param stream - Stream used to subscribe/unsubscribe upstream and to receive
 *   `tickers` events and connect notifications.
 * @returns `subscribe` and `unsubscribe` functions taking a list of product
 *   symbols and the listener to add or remove.
 */
export const streamProductService = (stream: Stream) => {
  // Grace period before an idle product is unsubscribed upstream, so a quick
  // unsubscribe/re-subscribe does not churn the upstream subscription.
  const unsubscribeDelayMs = 5000;

  // Maps rather than plain objects: product symbols are dynamic, and with an
  // object a symbol such as "constructor" would resolve to an inherited
  // Object.prototype member instead of `undefined`.
  const productStreams = new Map<string, ProductStream>();
  // Pending idle timers, one per product at most.
  const pendingUnsubscribes = new Map<string, ReturnType<typeof setTimeout>>();

  // Cancels the pending idle timer for a product, if there is one.
  const cancelUnsubscribe = (productSymbol: string) => {
    const timer = pendingUnsubscribes.get(productSymbol);
    if (timer !== undefined) {
      clearTimeout(timer);
      pendingUnsubscribes.delete(productSymbol);
    }
  };

  // Drops the product after the grace period if it still has no listeners.
  const scheduleUnsubscribe = (productSymbol: string) => {
    cancelUnsubscribe(productSymbol);
    const timer = setTimeout(() => {
      pendingUnsubscribes.delete(productSymbol);
      const ps = productStreams.get(productSymbol);
      if (ps && ps.callbacks.size === 0) {
        productStreams.delete(productSymbol);
        stream.unsubscribe(Subscription.tickers([productSymbol]));
      }
    }, unsubscribeDelayMs);
    pendingUnsubscribes.set(productSymbol, timer);
  };

  // manage all subscriptions: route each incoming ticker to its product's stream
  stream.onEvent('tickers', (event: StreamEvent) => {
    const tickers = event.asTickers();
    if (!tickers) return;
    tickers.forEach(ticker => {
      productStreams.get(ticker.product_symbol)?.onMessage(ticker);
    });
  });

  // manage disconnection: on every connect, re-subscribe to all tracked products
  stream.onConnect(() => {
    const products = [...productStreams.keys()];
    if (products.length > 0) {
      stream.subscribe(Subscription.tickers(products));
    }
  });

  // Subscribe a callback to the given product symbols
  const subscribe = (productSymbols: string[], callback: TickerCallback) => {
    const toSubscribe: string[] = [];
    for (const productSymbol of productSymbols) {
      let ps = productStreams.get(productSymbol);
      if (!ps) {
        ps = productStream(productSymbol);
        productStreams.set(productSymbol, ps);
        toSubscribe.push(productSymbol);
      }
      // The product has a listener again: a stale idle timer must not fire
      // later and cut short the grace period of a subsequent unsubscribe.
      cancelUnsubscribe(productSymbol);
      ps.subscribe(callback);
    }
    // subscribe to the stream only for products not already subscribed
    if (toSubscribe.length > 0) {
      stream.subscribe(Subscription.tickers(toSubscribe));
    }
  };

  // Unsubscribe a callback from the given product symbols
  const unsubscribe = (productSymbols: string[], callback: TickerCallback) => {
    productSymbols.forEach((productSymbol: string) => {
      const ps = productStreams.get(productSymbol);
      if (ps) {
        ps.callbacks.delete(callback);
        // Start the grace period when the last listener leaves; a repeated
        // unsubscribe while a timer is already pending leaves that timer alone.
        if (ps.callbacks.size === 0 && !pendingUnsubscribes.has(productSymbol)) {
          scheduleUnsubscribe(productSymbol);
        }
      }
    });
  };

  return {
    subscribe,
    unsubscribe,
  };
};
