import { useEffect } from 'react';
import {
  getCachedToken,
  getQueryDateFormatted,
  nonProdDebugLog,
} from '../../../util';
import { decode } from '@msgpack/msgpack';
import { StreamType } from 'types';
import dayjs from 'dayjs';

const STREAM_HOST =
  process.env.REACT_APP_STREAMING_HOST ?? 'bbg.stream.spotgamma.com';
// const STREAM_HOST = 'bbg.staging.stream.spotgamma.com';
const MAX_CLOSE_RETRIES = 3;

export const useStreamingSocket = (
  enabled: boolean,
  hiroStreamingRefs: any,
  dataRefs: any,
  showTotal: boolean,
  updateCandleCallback: (candle: any, streamType: StreamType) => void,
  streamingSubscribeBitmask: number,
  endDate: dayjs.Dayjs,
  token?: string,
) => {
  const fetchStreamingEndpoint = async (endpoint: string) => {
    const url = `https://${STREAM_HOST}${endpoint}`;
    nonProdDebugLog('Fetching', url);
    return await fetch(url, {
      headers: {
        Authorization: `Bearer ${token ?? getCachedToken()}`,
      },
    });
  };

  const fetchAndDecodeStreamingEndpoint = async (endpoint: string) => {
    const response = await fetchStreamingEndpoint(endpoint);
    return decode(await response.arrayBuffer(), {});
  };

  const sendSocketEvent = (msg: any) => {
    if (hiroStreamingRefs.socket.current?.readyState !== WebSocket.OPEN) {
      nonProdDebugLog('Attempted to send msg but websocket is not open', msg);
      return;
    }

    hiroStreamingRefs.socket.current.send(JSON.stringify(msg));
  };

  const socketSubscribeToSym = () => {
    nonProdDebugLog('Subsribing to ', hiroStreamingRefs.currentSymbol.current);
    const msg = {
      action: 'subscribe',
      underlyings: [hiroStreamingRefs.currentSymbol.current],
      stream_types: streamingSubscribeBitmask,
    };
    sendSocketEvent(msg);
  };

  const onOpen = (_evt: any) => {
    nonProdDebugLog(
      '%c Websocket connected. Subscribing...',
      'background: #222; color: #bada55',
    );
    socketSubscribeToSym();
  };

  const onMessage = async (event: any) => {
    const { data } = event;
    if (typeof data === 'string') {
      console.warn('Ack from server: ', data);
    } else {
      const signalTuple: any = decode(await data.arrayBuffer(), {
        useBigInt64: true,
      });
      const [stream, signal] = signalTuple;
      // console.log(`onMessage event ${streamTypeEnumName(stream)}`, signal);
      updateCandleCallback(signal, stream);
    }
  };

  const onError = (event: any) => {
    // the event does not give us any information that is useful unfortunately
    console.error(event);
  };

  const onClose = (event: any) => {
    if (!enabled) {
      return;
    }

    const socketRetries = hiroStreamingRefs.socketRetries.current;

    if (socketRetries >= MAX_CLOSE_RETRIES) {
      // TODO - surface error to user via UX since the chart will be stalled
      console.warn(
        'Socket closed. Reached maximum number of retries to re-open, not restarting.',
        event,
      );
      return;
    }

    console.warn(
      `Socket closed. Retry #${socketRetries + 1} - restarting in 1 second...`,
      event,
    );
    hiroStreamingRefs.socketRetries.current = socketRetries + 1;
    setTimeout(() => {
      if (!isSocketOpen()) {
        authenticateAndConnect(false);
      }
    }, 1_000);
  };
  const isSocketOpen = () => {
    return hiroStreamingRefs.socket.current?.readyState === WebSocket.OPEN;
  };

  const cleanupSocket = (socketRef: any) => {
    return () => {
      nonProdDebugLog('Clearing out socket');
      if (socketRef.current != null) {
        socketRef.current.removeEventListener('open', onOpen);
        socketRef.current.removeEventListener('message', onMessage);
        socketRef.current.removeEventListener('error', onError);
        socketRef.current.removeEventListener('close', onClose);
        socketRef.current.close();
        socketRef.current = null;
      }
    };
  };

  const authenticateAndConnect = async (isNewConnection = true) => {
    const streamingToken = token ?? getCachedToken();
    if (streamingToken == null) {
      nonProdDebugLog('Found null streamingToken in authenticateAndConnect');
      return;
    }
    let socket = hiroStreamingRefs.socket.current;

    if (socket != null) {
      cleanupSocket(hiroStreamingRefs.socket)();
    }
    if (isNewConnection) {
      hiroStreamingRefs.socketRetries.current = 0;
    }
    socket = new WebSocket(
      `wss://${STREAM_HOST}/stream?token=${encodeURIComponent(streamingToken)}`,
    );

    socket.addEventListener('open', onOpen);
    socket.addEventListener('message', onMessage);
    socket.addEventListener('error', onError);
    socket.addEventListener('close', onClose);
    hiroStreamingRefs.socket.current = socket;

    nonProdDebugLog('Created Socket');
  };

  const reconnectSocket = () => {
    if (!enabled) {
      return;
    }

    if (isSocketOpen()) {
      sendSocketEvent({
        action: 'unsubscribe_all',
      });
      socketSubscribeToSym();
    } else if (endDate.isSame(getQueryDateFormatted(true), 'day')) {
      authenticateAndConnect();
    }

    return cleanupSocket(hiroStreamingRefs.socket);
  };

  const closeSocket = () => {
    if (isSocketOpen()) {
      sendSocketEvent({
        action: 'unsubscribe_all',
      });
      cleanupSocket(hiroStreamingRefs.socket)();
    }
  };

  useEffect(() => {
    // in case enabled state changes, close any open socket
    if (!enabled) {
      closeSocket();
    }
  }, [enabled]);

  return {
    fetchStreamingEndpoint,
    fetchAndDecodeStreamingEndpoint,
    reconnectSocket,
  };
};
