import { useEffect, useMemo, useRef, useState } from 'react';
import { useRecoilState, useRecoilValue } from 'recoil';
import {
  ET,
  expectedStartEndForSymbol,
  getDateFormatted,
  hexToRGBA,
  insertIntoSortedArr,
  isBBEnvAvailable,
  isLatestTradingDay,
  isMarketOpen,
  isMarketOpenOnDate,
  isPostMarket,
  isPreMarket,
  lenseForStreamType,
  optionKeyForStreamType,
  streamTypeForLense,
  update,
  getCachedToken,
  splitCandle,
  getParsedManifest,
  utcMsToTzSecs,
} from '../../../util';
import dayjs from 'dayjs';
import {
  bottomCandlesTypeState,
  candleDurationState,
  hiroManifestState,
  negativeTrendColorState,
  positiveTrendColorState,
  streamingPricesOverrideForSymState,
  timezoneState,
} from 'states';
import { useStreamingRefs } from './useStreamingRefs';
import { useStreamingSocket } from './useStreamingSocket';
import {
  BottomCandlesType,
  HiroLense,
  HiroOptionType,
  PriceCandle,
  ProductType,
  StreamType,
} from 'types';
import { useTheme } from '@mui/material/styles';
import useLog from '../../../hooks/useLog';
import usePrices from './../api/usePrices';
import { useSetSym } from '../../hiro';
import { DefaultChartSettings } from '../../../config';

// Maps the `filter` query-param value sent to the streaming `/candles` endpoint
// (keys) to the lense identifier used for the selected lenses in this hook
// (values). The hook looks this table up in reverse: lense -> filter.
const LENSE_FILTER_TO_DATA_KEY = {
  all: 'all',
  next_exp: 'nextExp',
  retail: 'retail',
};

/**
 * Bit flags selecting which kinds of data the hook fetches and subscribes to.
 * Combine several with `|`; test for one with `&`.
 */
export enum INCLUDE_STREAMING_DATA_TYPE {
  HIRO = 0b001,
  PRICE = 0b010,
  BOTTOM_CANDLES = 0b100,
}

/** Options accepted by the `useStreaming` hook. */
export type useStreamingProps = {
  // Streaming auth token; the hook falls back to `getCachedToken()` when omitted.
  token?: string;
  // Chart series refs, indexed as `seriesRefs[lense][option]` and
  // `seriesRefs[lense].histogram`; each entry's `.current` is passed to `update`.
  seriesRefs?: any;
  // Chart API object; the hook uses `api.timeScale()` to read and set the
  // visible logical range.
  api?: any;
  // Ref to the price series; its `.current` receives each new price entry.
  priceSeriesRef?: any;
  // Called with `true` when historical candles start (re)loading and `false`
  // once chart updating is re-enabled.
  setChartLoading: (loading: boolean) => void;
  // Bitmask of data kinds to fetch/subscribe to; defaults to all of them.
  includeStreamingData?: INCLUDE_STREAMING_DATA_TYPE;
  // Called after each price update with the new entry and the previous last entry.
  priceUpdateCallback?: (price: any, lastPrice: any) => void;
  // Called after a live hiro candle update with the latest rolled candle.
  hiroUpdateCallback?: (
    latestHiroVal: any,
    lense: HiroLense,
    option: HiroOptionType,
  ) => void;
  // Candle bucket size in seconds; overrides the `candleDurationState` value.
  candleDurationSecs?: number;
  // Symbol to stream; the hook falls back to `getSym()` when props are omitted.
  sym: string;
  // Whether premarket / postmarket hours are included.
  showPremarket: boolean;
  showPostmarket: boolean;
  // When set, extended hours are only included for the latest trading day.
  onlyShowExtendedHoursForLatestDay?: boolean;
  // When set, price entries carry open/high/low/close; otherwise `{ value: close }`.
  useCandlesForPrice?: boolean;
  // When set, price entries also carry a `datetime` dayjs object.
  useDayjs?: boolean;
  // When set, timestamps are converted with `utcMsToTzSecs` using the
  // `timezoneState` value instead of a plain ms -> seconds division.
  useUtcFakeOffset?: boolean;
  // First and last day of the requested range.
  startDate: dayjs.Dayjs;
  endDate: dayjs.Dayjs;
  // Lenses whose candles are fetched and streamed.
  selectedLenses: HiroLense[];
  // 'total' selects the combined series; any other value selects call + put.
  selectedOptionType: string;
  // Rolling window (seconds) used when computing rolled candles.
  rollingSeconds: number;
  // Forwarded to `expectedStartEndForSymbol` when computing session bounds.
  productType: ProductType;
};

// Step, in seconds, between the timestamps at which historical hiro candles are
// expected when walking a trading session (presumably the server's candle
// granularity — confirm against the streaming API).
const SERVER_CANDLE_DURATION_SECS = 5;

const useStreaming = (enabled: boolean, props?: useStreamingProps) => {
  // Timezone used by `convertToSecs` when `useUtcFakeOffset` is set.
  const timezone = useRecoilValue(timezoneState);
  // `props` is optional, so every destructured value below may be undefined.
  const {
    token,
    seriesRefs,
    api,
    priceSeriesRef,
    setChartLoading,
    showPremarket,
    showPostmarket,
    useCandlesForPrice,
    useDayjs,
    useUtcFakeOffset,
    onlyShowExtendedHoursForLatestDay,
    productType,
  } = props ?? {};
  const selectedLenses = props?.selectedLenses ?? [];
  // Manifest kept in Recoil state; fetched from `/manifest` by an effect below
  // when it is not already set.
  const [hiroManifest, setHiroManifest] = useRecoilState(hiroManifestState);
  // Order-independent key for the selected lenses, used in effect dependency
  // lists. A copy is sorted: `Array.prototype.sort` sorts in place, so sorting
  // `props.selectedLenses` directly would mutate the caller's array (and throw
  // if that array is frozen).
  const selectedLensesStr = useMemo(
    () => [...(props?.selectedLenses ?? [])].sort().join(','),
    [props],
  );

  // Option type and rolling window fall back to the chart defaults when props
  // are omitted.
  const selectedOptionType =
    props?.selectedOptionType ?? DefaultChartSettings.optionType;
  const rollingSeconds =
    props?.rollingSeconds ?? DefaultChartSettings.sumWindow;
  // this is total in a lot of users settings, so we need to support that for now
  // but should be using HiroOptionType from now on
  const showTotal = 'total' === selectedOptionType;

  const streamingPricesOverrideForSym = useRecoilValue(
    streamingPricesOverrideForSymState,
  );
  const bottomCandlesType = useRecoilValue(bottomCandlesTypeState);
  const { getSym } = useSetSym();
  const sym = props?.sym ?? getSym();
  // Last symbol seen by the data-loading effect; used there to detect a symbol
  // change and reset all refs.
  const [lastSym, setLastSym] = useState(sym);
  const candleDurationRecoilVal = useRecoilValue(candleDurationState);
  const theme = useTheme();
  const { logError, nonProdDebugLog } = useLog('useStreaming');
  const positiveTrendColor = useRecoilValue(positiveTrendColorState);
  const negativeTrendColor = useRecoilValue(negativeTrendColorState);

  // Candle bucket size in seconds: the prop wins over the Recoil value.
  const candleDuration = props?.candleDurationSecs ?? candleDurationRecoilVal;
  // NOTE(review): when props omit the dates, a fresh `dayjs()` is created on
  // every render, so effects that list startDate/endDate as dependencies will
  // see a new value each render — confirm callers always pass both dates.
  const startDate = props?.startDate ?? dayjs();
  const endDate = props?.endDate ?? dayjs();

  // by default, include all data types
  const includeStreamingData =
    props?.includeStreamingData ??
    INCLUDE_STREAMING_DATA_TYPE.HIRO |
      INCLUDE_STREAMING_DATA_TYPE.PRICE |
      INCLUDE_STREAMING_DATA_TYPE.BOTTOM_CANDLES;

  // `updatePrice` is a hoisted function declaration defined near the end of
  // this hook.
  usePrices(updatePrice);

  // Historical prices are always requested together with the candles.
  const usePricesHistorical = true;
  const streamingToken = token ?? getCachedToken();
  // Set to true once historical data has been parsed and false while it is
  // (re)loading; gates the socket reconnect effect.
  const [liveStreamBucketingEnabled, setLiveStreamBucketingEnabled] =
    useState(false);

  // Whether premarket hours should be included for `date`: premarket must be
  // switched on, and when extended hours are restricted to the latest day,
  // `date` must be that day.
  const premarketEnabled = (date: dayjs.Dayjs) => {
    if (!showPremarket) {
      return false;
    }
    return !onlyShowExtendedHoursForLatestDay || isLatestTradingDay(date);
  };
  // Whether postmarket hours should be included for `date`: postmarket must be
  // switched on, and when extended hours are restricted to the latest day,
  // `date` must be that day.
  const postmarketEnabled = (date: dayjs.Dayjs) => {
    if (!showPostmarket) {
      return false;
    }
    return !onlyShowExtendedHoursForLatestDay || isLatestTradingDay(date);
  };

  // Ref-backed storage and accessors for prices, unrolled/rolled hiro candles
  // and bottom candles. Live socket callbacks go through these rather than
  // React state.
  const {
    hiroStreamingRefs,
    dataRefs,
    resetPricesRefs,
    resetCandlesRefs,
    getRolledCandles,
    getUnrolledCandles,
    calculateRolledCandles,
    isDataLoadedForLenses,
    getPrices,
    initUnrolledCandle,
    insertCandleIntoUnrolledCandles,
    insertBottomCandle,
    resetAllRefs,
    getBottomCandles,
  } = useStreamingRefs(enabled, showTotal, sym, rollingSeconds);

  // Live updates only apply when the range ends on the latest trading day and
  // the current session (regular, or an enabled extended session) is active.
  const eligibleForLiveUpdate = (): boolean => {
    if (!isLatestTradingDay(endDate)) {
      return false;
    }
    if (isMarketOpen()) {
      return true;
    }
    if (isPreMarket() && !!showPremarket) {
      return true;
    }
    return isPostMarket() && !!showPostmarket;
  };

  // Socket wiring. The socket is only enabled while live updates are eligible.
  // `addAndUpdateChartWithCandle` and `streamingSubscribeBitmask` are hoisted
  // function declarations defined below.
  const {
    fetchStreamingEndpoint,
    fetchAndDecodeStreamingEndpoint,
    reconnectSocket,
  } = useStreamingSocket(
    enabled && eligibleForLiveUpdate(),
    hiroStreamingRefs,
    dataRefs,
    showTotal,
    addAndUpdateChartWithCandle,
    streamingSubscribeBitmask(),
    endDate,
    token,
  );

  /**
   * Builds the bitmask of `StreamType`s to subscribe to, based on the included
   * data kinds, the selected lenses and the bottom-candles type.
   */
  function streamingSubscribeBitmask() {
    const wants = (flag: INCLUDE_STREAMING_DATA_TYPE) =>
      (includeStreamingData & flag) > 0;

    // When the BB environment is available, realtime prices are only streamed
    // for the overridden symbol or when the env flag forces it.
    const realtimePricesAllowed =
      !isBBEnvAvailable() ||
      streamingPricesOverrideForSym ===
        hiroStreamingRefs.currentSymbol.current ||
      process.env.REACT_APP_USE_STREAMING_PRICES_FOR_BBG_REALTIME === 'true';

    let mask = 0;

    if (wants(INCLUDE_STREAMING_DATA_TYPE.HIRO)) {
      for (const lense of selectedLenses) {
        mask |= streamTypeForLense(lense, showTotal);
      }
    }

    if (
      wants(INCLUDE_STREAMING_DATA_TYPE.BOTTOM_CANDLES) &&
      bottomCandlesType === BottomCandlesType.ABSOLUTE_DELTA
    ) {
      mask |= StreamType.AbsoluteTotalDelta;
    }

    if (wants(INCLUDE_STREAMING_DATA_TYPE.PRICE) && realtimePricesAllowed) {
      mask |= StreamType.AbsolutePrice;
    }

    return mask;
  }

  /**
   * Socket callback: decodes a raw candle array and routes it to the price or
   * hiro/bottom-candle update path.
   *
   * @param candleArr `[symbol, timestamp, open, high, low, close]`, or the
   *   5-element form without `open` (in which case `open` is treated as 0).
   * @param stream the stream the candle arrived on.
   */
  function addAndUpdateChartWithCandle(candleArr: any, stream: StreamType) {
    if (!eligibleForLiveUpdate()) {
      nonProdDebugLog('not eligible for live updates, returning');
      return;
    }

    const [symbol, timestamp, ...rest] = candleArr;
    // Shorter arrays carry no open value, so pad one in.
    const [open, high, low, close] =
      candleArr.length >= 6 ? rest : [0, ...rest];

    // Ignore candles for any symbol other than the one being charted.
    if (symbol != sym) {
      return;
    }

    const candle = {
      open,
      high,
      low,
      close,
      time: roundToNearestCandleBucket(convertToSecs(timestamp)),
    };

    if (stream === StreamType.AbsolutePrice) {
      addPriceAndUpdate(candle);
      return;
    }
    addCandleAndUpdate(candle, stream);
  }

  /**
   * Applies a live price candle and, when the chart is showing the latest
   * data, moves the visible logical range.
   *
   * Does nothing unless chart updating is enabled, a price was supplied and
   * live updates are currently eligible.
   */
  const addPriceAndUpdate = (latestPrice: any) => {
    if (
      !hiroStreamingRefs.chartUpdatingEnabled.current ||
      latestPrice == null ||
      !eligibleForLiveUpdate()
    ) {
      return;
    }

    updatePrice(latestPrice);

    if (api == null || selectedLenses.length === 0) {
      return;
    }

    // If current time rolling is looking at latest data, update its
    // rightmost bound to bring in new data
    const timeRange = api.timeScale().getVisibleLogicalRange();
    if (timeRange == null) {
      return;
    }
    const { from, to } = timeRange;
    const unrolledCandles = getUnrolledCandles(
      selectedLenses[0],
      showTotal ? 'TOT' : 'C',
    );
    // Candles for the lense may not be initialised yet; other call sites guard
    // this lookup too, so bail out instead of throwing on `.length`.
    if (unrolledCandles == null || from == null || to == null) {
      return;
    }

    const len = unrolledCandles.length;
    if (to >= len) {
      // NOTE(review): the range is shifted by the full candle count — confirm
      // this is the intended offset.
      api.timeScale().setVisibleLogicalRange({
        from: from === 0 ? 0 : from + len,
        to: to + len,
      });
    }
  };

  /**
   * Merges a freshly received candle with the last stored candle when both
   * fall in the same time bucket.
   *
   * @param newCandle incoming candle (`time`, `high`, `low`, `close`, ...).
   * @param candlesArr stored candles; may be empty or nullish when nothing has
   *   been stored yet.
   * @returns `newCandle` unchanged when there is no stored candle with the
   *   same `time`; otherwise a copy of `newCandle` whose high/low span both
   *   candles and whose close is the sum of both closes.
   */
  const mergeCandles = (newCandle: any, candlesArr: any) => {
    const lastCandle =
      candlesArr?.length > 0 ? candlesArr[candlesArr.length - 1] : null;
    if (lastCandle == null || lastCandle.time !== newCandle.time) {
      return newCandle;
    }
    return {
      ...newCandle,
      high: Math.max(newCandle.high, lastCandle.high),
      low: Math.min(newCandle.low, lastCandle.low),
      close: lastCandle.close + newCandle.close,
    };
  };

  /**
   * Stores a live hiro / bottom candle (merged with the current bucket when
   * applicable) and pushes it to the matching chart series.
   */
  const addCandleAndUpdate = (newCandle: any, stream: StreamType) => {
    const isEmptyCandle = !newCandle || Object.keys(newCandle).length === 0;
    if (isEmptyCandle || !eligibleForLiveUpdate()) {
      return null;
    }

    const lense = lenseForStreamType(stream);

    // Absolute-delta stream feeds the bottom histogram only.
    if (stream === StreamType.AbsoluteTotalDelta) {
      if (bottomCandlesType !== BottomCandlesType.ABSOLUTE_DELTA) {
        return;
      }
      const mergedBottom = mergeCandles(newCandle, getBottomCandles(lense));
      insertBottomCandle(mergedBottom, lense);
      updateHistogram(mergedBottom, lense);
      return;
    }

    const option = optionKeyForStreamType(stream);
    const storedCandles = getUnrolledCandles(lense, option);
    if (!storedCandles) {
      return null;
    }

    const merged = mergeCandles(newCandle, storedCandles);
    insertCandleIntoUnrolledCandles(merged, lense, option);
    updateChart(merged, lense, option);

    // only show histogram for total
    const feedsFilteredHistogram =
      bottomCandlesType === BottomCandlesType.FILTERED_DELTA &&
      option === HiroOptionType.TOT;
    if (feedsFilteredHistogram) {
      updateHistogram(merged, lense);
    }
  };

  /**
   * Recomputes the rolled candles for `lense`/`option` and pushes the latest
   * one to the chart series and to `hiroUpdateCallback`.
   *
   * Only runs when chart updating is enabled, today (ET) is the range's end
   * date, and live updates are eligible.
   */
  const updateChart = (newCandle: any, lense: string, option: string) => {
    // this is called from the socket. don't use states here - everything must be refs
    const shouldUpdate =
      hiroStreamingRefs.chartUpdatingEnabled.current &&
      getDateFormatted(dayjs().tz(ET)) === getDateFormatted(endDate) &&
      eligibleForLiveUpdate();
    if (!shouldUpdate) {
      return;
    }

    const rolledCandles = calculateRolledCandles(
      lense,
      option,
      hiroStreamingRefs.rollingSeconds.current,
    );
    if (rolledCandles.length === 0) {
      return;
    }
    const latestRolled = rolledCandles[rolledCandles.length - 1];

    // A series may not exist for every lense/option pair; skip the chart
    // update instead of throwing, so the callback below still fires.
    const chartRef = seriesRefs?.[lense]?.[option];
    if (chartRef != null) {
      update(chartRef.current, [latestRolled]);
    }

    if (props?.hiroUpdateCallback) {
      props.hiroUpdateCallback(
        latestRolled,
        lense as HiroLense,
        option as HiroOptionType,
      );
    }
  };

  // Pushes a candle to the lense's histogram series, when such a series exists
  // and live chart updates are currently allowed.
  const updateHistogram = (newCandle: any, lense: string) => {
    if (
      !hiroStreamingRefs.chartUpdatingEnabled.current ||
      !eligibleForLiveUpdate()
    ) {
      return;
    }

    const histogramRef = seriesRefs?.[lense]?.histogram;
    if (!histogramRef) {
      return;
    }

    const entry = candleToHistogramEntry(newCandle);
    update(histogramRef.current, [entry]);
  };

  // Converts a millisecond timestamp to seconds: timezone-shifted through
  // `utcMsToTzSecs` when `useUtcFakeOffset` is set, plain floor division
  // otherwise.
  const convertToSecs = (ts: dayjs.Dayjs | number) =>
    useUtcFakeOffset
      ? utcMsToTzSecs(ts, timezone)
      : Math.floor(Number(ts) / 1000);

  /**
   * Rounds `time` down to the start of its candle bucket.
   *
   * @param time timestamp in seconds, or in milliseconds when `ms` is true.
   * @param ms whether `time` is expressed in milliseconds.
   * @returns the bucket start, in the same unit as `time`.
   */
  const roundToNearestCandleBucket = (time: number, ms = false) => {
    const bucketSize = ms ? candleDuration * 1000 : candleDuration;
    return Math.floor(time / bucketSize) * bucketSize;
  };

  // Lense used for bottom candles: none when no lense is selected, the lense
  // itself when exactly one is selected, and 'all' otherwise.
  const getBottomCandlesLense = () => {
    switch (selectedLenses.length) {
      case 0:
        return null;
      case 1:
        return selectedLenses[0];
      default:
        return 'all';
    }
  };

  /**
   * Lists the `{ option, filter, field }` combinations to request from the
   * candles endpoint: every selected lense for each option, always with
   * `field: 'delta'`. Returns an empty list when hiro data is not included.
   */
  const getQueryStreamingCandlesParams = () => {
    const hiroIncluded =
      (includeStreamingData & INCLUDE_STREAMING_DATA_TYPE.HIRO) !== 0;
    if (!hiroIncluded) {
      nonProdDebugLog(
        'hiro not included in streaming options, not fetching candles for it',
      );
      return [];
    }

    // option = 'total' (default) | 'call' | 'put'
    // filter = 'all' (default) | 'next_exp' | 'retail'
    // field = 'delta' ( default) | 'gamma' | 'vega' | 'price'

    // Reverse lookup: lense identifier -> endpoint filter name.
    const filterForLense = (lense: string) =>
      Object.entries(LENSE_FILTER_TO_DATA_KEY).find(
        ([, dataKey]) => dataKey === lense,
      )?.[0] as string;
    const filtersToFetch = selectedLenses.map((lense: string) =>
      filterForLense(lense),
    );

    const params: { option: string; filter: string; field: string }[] = [];
    for (const option of showTotal ? ['total'] : ['call', 'put']) {
      for (const filter of filtersToFetch) {
        params.push({ option, filter, field: 'delta' });
      }
    }
    return params;
  };

  /**
   * Builds a price entry for bucket `ts` and appends it to the prices array.
   *
   * The entry carries open/high/low/close when `useCandlesForPrice` is set and
   * `{ value: close }` otherwise, plus a `datetime` dayjs object when
   * `useDayjs` is set.
   *
   * @returns the appended entry.
   */
  const addToPricesArr = (
    ts: number,
    open: number,
    high: number,
    low: number,
    close: number,
  ) => {
    const newEntry: any = { time: ts };
    if (useDayjs) {
      newEntry.datetime = dayjs.unix(ts);
    }

    if (useCandlesForPrice) {
      Object.assign(newEntry, { open, high, low, close });
    } else {
      newEntry.value = close;
    }

    getPrices().push(newEntry);
    return newEntry;
  };

  const queryStreamingCandlesData = async () => {
    // Nothing can be fetched without a streaming token.
    if (streamingToken == null) {
      nonProdDebugLog('Streaming token is null');
      return;
    }
    // Pause live updates (and flag the chart as loading) while the historical
    // candles are reloaded from scratch.
    disableChartUpdating();
    resetCandlesRefs(sym, true);
    reconnectSocket();
    const symbol = sym;

    // Session start of the first requested day.
    const start = expectedStartEndForSymbol(
      sym,
      startDate,
      premarketEnabled(startDate),
      postmarketEnabled(startDate),
      productType,
    ).start;

    // Session end of the LAST requested day. This must be derived from
    // `endDate` (it previously used `startDate`, which cut a multi-day range
    // down to its first day).
    const end = expectedStartEndForSymbol(
      sym,
      endDate,
      premarketEnabled(endDate),
      postmarketEnabled(endDate),
      productType,
    ).end;

    // option = 'total' (default) | 'call' | 'put'
    // filter = 'all' (default) | 'next_exp' | 'retail'
    // field = 'delta' ( default) | 'gamma' | 'vega' | 'price'
    const endpointPrefix = `/candles?sym=${encodeURIComponent(
      symbol,
    )}&start=${start.valueOf()}&end=${end.valueOf()}`;

    const includePrices = usePricesHistorical;
    const paramsArr = getQueryStreamingCandlesParams();
    // One request per option/filter combination. A failed request is logged
    // and resolves to null so it does not reject the whole batch.
    const candlesPromises = paramsArr.map(async (params: any) => {
      try {
        const endpoint = `${endpointPrefix}&option=${params.option}&filter=${params.filter}&field=${params.field}`;
        nonProdDebugLog('fetching endpoint', endpoint);
        const data = await fetchAndDecodeStreamingEndpoint(endpoint);
        return { ...params, data };
      } catch (e) {
        logError(e, 'queryStreamingCandlesData');
        return null;
      }
    });

    // Fetches historical price candles. Resolves to null when prices are not
    // included or the request fails (the failure is logged).
    const pricePromise = async () => {
      const pricesIncluded =
        (includeStreamingData & INCLUDE_STREAMING_DATA_TYPE.PRICE) !== 0;
      if (!pricesIncluded) {
        return null;
      }

      try {
        const data = await fetchAndDecodeStreamingEndpoint(
          `${endpointPrefix}&field=price`,
        );
        return { field: 'price', data };
      } catch (e) {
        logError(e, 'pricePromise');
        return null;
      }
    };

    // Fetches the data behind the bottom histogram. Resolves to null when
    // bottom candles are not included, when the type is FILTERED_DELTA, when
    // no lense is selected, on an unrecognized type, or on failure.
    const bottomCandlesHistogramPromise = async () => {
      const bottomCandlesIncluded =
        (includeStreamingData & INCLUDE_STREAMING_DATA_TYPE.BOTTOM_CANDLES) !==
        0;
      if (!bottomCandlesIncluded) {
        return null;
      }

      try {
        const lense = getBottomCandlesLense();
        if (
          lense == null ||
          bottomCandlesType === BottomCandlesType.FILTERED_DELTA
        ) {
          return null;
        }

        let endpoint: string;
        switch (bottomCandlesType) {
          case BottomCandlesType.ABSOLUTE_DELTA:
            // NOTE(review): `lense` is sent as-is, whereas hiro candle
            // requests translate lenses to filter names (e.g. `next_exp`) —
            // confirm the endpoint accepts the lense identifier here.
            endpoint = `${endpointPrefix}&filter=${lense}&stream=absolute&field=delta`;
            break;
          case BottomCandlesType.SIZE:
            endpoint = `${endpointPrefix}&field=size`;
            break;
          default:
            logError(
              'Unrecognized bottom candles type, returning...',
              'bottomCandlesHistogramPromise',
              { bottomCandlesType },
            );
            return null;
        }

        const data = await fetchAndDecodeStreamingEndpoint(endpoint);
        return { field: 'bottom_candles', data };
      } catch (e) {
        logError(e, 'bottomCandlesHistogramPromise');
        return null;
      }
    };

    // All requests run in parallel; each promise resolves to null rather than
    // rejecting on failure. The results are processed below in this array
    // order: hiro candles, then prices, then bottom candles.
    const allData = await Promise.all([
      // order here matters. (The original note required candlesPromises to
      // come before `latestCandleEventsPromise`, which is not defined in this
      // file — NOTE(review): confirm whether any ordering constraint remains.)
      ...candlesPromises,
      includePrices ? pricePromise() : Promise.resolve(),
      bottomCandlesHistogramPromise(),
    ]);

    nonProdDebugLog('fetched streaming candles data', allData);

    /**
     * Buckets historical price candles into `candleDuration`-sized entries and
     * appends them to the prices array, skipping candles that fall outside the
     * expected session window of each trading day in the range.
     *
     * Candles sharing a bucket are merged: first open, max high, min low,
     * last close.
     */
    const parsePriceData = (priceData: any) => {
      resetPricesRefs(symbol);

      let currentDay = start;
      let expectedStartEnd = expectedStartEndForSymbol(
        sym,
        currentDay,
        premarketEnabled(currentDay),
        postmarketEnabled(currentDay),
        productType,
      );
      let expectedTsStart = convertToSecs(expectedStartEnd.start.valueOf());
      let expectedTsEnd = convertToSecs(expectedStartEnd.end.valueOf());

      // Bucket currently being accumulated: [bucketTs, open, high, low, close].
      let lastBucketedCandle = null;
      for (let i = 0; i < priceData.length; i++) {
        const price = priceData[i];
        let [timestamp, open, high, low, close] = splitCandle(price);
        const ts = convertToSecs(timestamp);
        const bucketTs = roundToNearestCandleBucket(ts);

        if (ts < expectedTsStart) {
          continue;
        } else if (ts > expectedTsEnd) {
          // Past the current day's session: stop at the last requested day,
          // otherwise move on to the next day the market is open.
          if (getDateFormatted(currentDay) >= getDateFormatted(endDate)) {
            continue;
          }
          currentDay = currentDay.add(1, 'day');
          while (!isMarketOpenOnDate(currentDay)) {
            currentDay = currentDay.add(1, 'day');
          }
          expectedStartEnd = expectedStartEndForSymbol(
            sym,
            currentDay,
            premarketEnabled(currentDay),
            postmarketEnabled(currentDay),
            productType,
          );
          expectedTsStart = convertToSecs(expectedStartEnd.start.valueOf());
          expectedTsEnd = convertToSecs(expectedStartEnd.end.valueOf());
          if (ts < expectedTsStart || ts > expectedTsEnd) {
            continue;
          }
        }

        if (lastBucketedCandle != null) {
          const [lastTs, lastOpen, lastHigh, lastLow, lastClose] =
            splitCandle(lastBucketedCandle);

          if (lastTs === bucketTs) {
            // Same bucket: widen the range and keep the bucket's first open.
            high = Math.max(lastHigh, high);
            low = Math.min(lastLow, low);
            open = lastOpen;
          } else {
            // New bucket started: the previous one is complete.
            addToPricesArr(lastTs, lastOpen, lastHigh, lastLow, lastClose);
          }
        }

        lastBucketedCandle = [bucketTs, open, high, low, close];
      }

      // Flush the bucket still being accumulated. This must also happen when
      // the prices array is empty (all data fell into a single bucket);
      // requiring a non-empty array here used to drop that bucket entirely.
      if (lastBucketedCandle != null) {
        const prices = getPrices();
        const alreadyStored =
          prices.length > 0 &&
          prices[prices.length - 1].time === lastBucketedCandle[0];
        if (!alreadyStored) {
          const [lastTs, lastOpen, lastHigh, lastLow, lastClose] =
            splitCandle(lastBucketedCandle);
          addToPricesArr(lastTs, lastOpen, lastHigh, lastLow, lastClose);
        }
      }

      nonProdDebugLog('price candles after parse', getPrices());
    };

    /**
     * Buckets historical hiro candles for one option/filter combination and
     * inserts them into the unrolled candles for the matching lense + option.
     *
     * For every trading day in the range it walks the expected timestamps in
     * SERVER_CANDLE_DURATION_SECS steps, fills slots the data skips with zero
     * candles, and merges the slots into `candleDuration`-sized buckets
     * (max high, min low, summed close).
     */
    const parseCandleData = (candleDataAndParams: any) => {
      // Translate the endpoint's filter/option names into the keys used by the
      // data refs.
      let filterDataKey = candleDataAndParams.filter;
      if (filterDataKey === 'next_exp') {
        filterDataKey = HiroLense.NextExp;
      }
      let optionDataKey =
        candleDataAndParams.option === 'total'
          ? HiroOptionType.TOT
          : candleDataAndParams.option === 'call'
          ? HiroOptionType.C
          : HiroOptionType.P;

      // once data is loaded, we can initialize the candles for that lense+option to prepare them to have data added
      initUnrolledCandle(filterDataKey, optionDataKey, symbol);

      // Cursor into candleDataAndParams.data; it carries over from one day to
      // the next.
      let i = 0;
      let currentDay = start;

      while (currentDay < end) {
        const expectedStartEnd = expectedStartEndForSymbol(
          sym,
          currentDay,
          premarketEnabled(currentDay),
          postmarketEnabled(currentDay),
          productType,
        );

        const expectedTsStart = expectedStartEnd.start.unix();
        // Never walk past the current time.
        const expectedTsEnd = Math.min(
          dayjs().tz(ET).unix(),
          expectedStartEnd.end.unix(),
        );

        // Bucket currently being accumulated for this day.
        let lastBucketedCandle = null;

        // `es` is the expected timestamp (seconds) of the current slot.
        for (
          let es = expectedTsStart;
          es < expectedTsEnd;
          es += SERVER_CANDLE_DURATION_SECS
        ) {
          let ts = 1;
          let candleData = [];
          let shouldIncrement = true;

          // find starting point
          // Advance the cursor past candles older than this slot. `ts` becomes
          // 0 once the data is exhausted (or a candle has no timestamp), which
          // ends the scan.
          while (ts > 0 && es > ts) {
            candleData =
              i < candleDataAndParams.data.length
                ? candleDataAndParams.data[i]
                : [];
            ts = candleData[0] ? candleData[0] / 1000 : 0;
            if (es > ts) {
              i += 1;
            }
          }

          // The next real candle is later than this slot: synthesize a zero
          // candle for the slot and leave the cursor where it is.
          if (ts > es) {
            candleData = [es * 1000, 0, 0, 0, 0];
            ts = es;
            shouldIncrement = false;
          }

          if (candleData.length > 0 && ts === es) {
            let [timestamp, _open, high, low, close] = splitCandle(candleData);

            if (
              candleData.length > 0 &&
              i < candleDataAndParams.data.length - 1 &&
              timestamp === candleDataAndParams.data[i + 1][0]
            ) {
              // may have duplicate timestamps in array. if so ignore the earliest ones
              continue;
            }

            // Round the ms timestamp down to its bucket, then convert to seconds.
            const bucketTs = convertToSecs(
              roundToNearestCandleBucket(timestamp, true),
            );

            if (lastBucketedCandle != null) {
              if (lastBucketedCandle.time === bucketTs) {
                // Same bucket: widen the range and accumulate the close.
                high = Math.max(lastBucketedCandle.high, high);
                low = Math.min(lastBucketedCandle.low, low);
                close += lastBucketedCandle.close;
              } else {
                // New bucket started: store the completed one.
                insertCandleIntoUnrolledCandles(
                  lastBucketedCandle,
                  filterDataKey,
                  optionDataKey,
                );
              }
            }

            lastBucketedCandle = { time: bucketTs, open: 0, high, low, close };
            if (shouldIncrement) {
              i += 1;
            }
          }
        }

        // Store the day's final bucket.
        // NOTE(review): lastBucketedCandle is still null here when no slot
        // produced a candle for this day — confirm that
        // insertCandleIntoUnrolledCandles tolerates a null candle.
        insertCandleIntoUnrolledCandles(
          lastBucketedCandle,
          filterDataKey,
          optionDataKey,
        );
        // Move on to the next day the market is open.
        currentDay = currentDay.add(1, 'day');
        while (!isMarketOpenOnDate(currentDay)) {
          currentDay = currentDay.add(1, 'day');
        }
      }

      nonProdDebugLog(
        'unrolled hiro candles after parse for ALL TOT',
        getUnrolledCandles(HiroLense.All, HiroOptionType.TOT),
      );
    };

    /**
     * Buckets historical bottom-candle events into `candleDuration`-sized
     * candles (max high, min low, summed close) and inserts them into the
     * bottom candles of the active bottom-candles lense. Events outside the
     * expected session window of each trading day are skipped.
     */
    const parseBottomCandles = (data: any) => {
      const lense = getBottomCandlesLense();
      if (!Array.isArray(data) || lense == null) {
        return;
      }
      let currentDay = start;
      let expectedStartEnd = expectedStartEndForSymbol(
        sym,
        currentDay,
        premarketEnabled(currentDay),
        postmarketEnabled(currentDay),
        productType,
      );
      let expectedTsStart = convertToSecs(expectedStartEnd.start.valueOf());
      let expectedTsEnd = convertToSecs(expectedStartEnd.end.valueOf());
      // Bucket currently being accumulated.
      let lastBucketedCandle = null;

      for (const event of data) {
        if (!Array.isArray(event)) {
          continue;
        }
        let [time, _open, high, low, close] = splitCandle(event);
        const ts = convertToSecs(time);

        if (ts < expectedTsStart) {
          continue;
        } else if (ts > expectedTsEnd) {
          // Past the current day's session: stop at the last requested day,
          // otherwise move on to the next day the market is open.
          if (getDateFormatted(currentDay) >= getDateFormatted(endDate)) {
            continue;
          }
          currentDay = currentDay.add(1, 'day');
          while (!isMarketOpenOnDate(currentDay)) {
            currentDay = currentDay.add(1, 'day');
          }
          expectedStartEnd = expectedStartEndForSymbol(
            sym,
            currentDay,
            premarketEnabled(currentDay),
            postmarketEnabled(currentDay),
            productType,
          );
          expectedTsStart = convertToSecs(expectedStartEnd.start.valueOf());
          expectedTsEnd = convertToSecs(expectedStartEnd.end.valueOf());

          if (ts < expectedTsStart || ts > expectedTsEnd) {
            continue;
          }
        }

        const bucketTs = roundToNearestCandleBucket(ts, false);
        if (lastBucketedCandle != null) {
          if (lastBucketedCandle.time === bucketTs) {
            // Same bucket: widen the range and accumulate the close.
            high = Math.max(lastBucketedCandle.high, high);
            low = Math.min(lastBucketedCandle.low, low);
            close += lastBucketedCandle.close;
          } else {
            // New bucket started: store the completed one.
            insertIntoSortedArr(
              dataRefs[lense].TOT.bottomCandles.current[sym],
              lastBucketedCandle,
              'time',
              { replace: true },
            );
          }
        }

        lastBucketedCandle = { time: bucketTs, open: 0, high, low, close };
      }

      // Store the final bucket. It is still null when `data` was empty or every
      // event was filtered out, in which case there is nothing to insert.
      if (lastBucketedCandle != null) {
        insertIntoSortedArr(
          dataRefs[lense].TOT.bottomCandles.current[sym],
          lastBucketedCandle,
          'time',
          { replace: true },
        );
      }
    };

    // Hand every successful result to the parser for its field (failed or
    // skipped requests resolved to a falsy value), then resume live updates.
    allData.forEach((result: any) => {
      if (!result) {
        return;
      }

      switch (result.field) {
        case 'price':
          parsePriceData(result.data);
          break;
        case 'delta':
          parseCandleData(result);
          break;
        case 'bottom_candles':
          parseBottomCandles(result.data);
          break;
        default:
          break;
      }
    });

    enableChartUpdating();
  };

  // Re-enables live chart updates once historical data is in place and clears
  // the chart's loading flag. No-op when updates are already enabled.
  const enableChartUpdating = () => {
    if (hiroStreamingRefs.chartUpdatingEnabled.current) {
      return;
    }

    hiroStreamingRefs.chartUpdatingEnabled.current = true;
    setLiveStreamBucketingEnabled(true);
    // `props` is optional on this hook, so the callback may be absent.
    setChartLoading?.(false);
  };

  // Pauses live chart updates and flags the chart as loading.
  const disableChartUpdating = () => {
    hiroStreamingRefs.chartUpdatingEnabled.current = false;
    setLiveStreamBucketingEnabled(false);
    // `props` is optional on this hook, so the callback may be absent.
    setChartLoading?.(true);
  };

  // Reconnects the streaming socket whenever the subscription inputs change,
  // but only once historical data has been loaded (liveStreamBucketingEnabled).
  useEffect(() => {
    if (!enabled || !liveStreamBucketingEnabled) {
      return;
    }

    // NOTE(review): whatever reconnectSocket() returns is handed to React as
    // this effect's cleanup — confirm it returns a cleanup function or
    // undefined.
    return reconnectSocket();
  }, [
    liveStreamBucketingEnabled,
    sym,
    selectedLensesStr,
    endDate,
    streamingPricesOverrideForSym,
    bottomCandlesType,
    enabled,
  ]);

  // Reloads the historical candles whenever an input that affects them
  // changes. On a symbol change all refs are reset first.
  useEffect(() => {
    if (!enabled) {
      return;
    }

    const symChanged = sym !== lastSym;
    if (symChanged) {
      hiroStreamingRefs.currentSymbol.current = sym;
      resetAllRefs();
      setLastSym(sym);
    }

    // The query is async and not awaited here, so a failure while parsing
    // would otherwise surface as an unhandled promise rejection.
    queryStreamingCandlesData().catch((err: unknown) => {
      logError(err, 'queryStreamingCandlesData');
    });
  }, [
    bottomCandlesType,
    showPremarket,
    showPostmarket,
    startDate,
    endDate,
    sym,
    selectedLensesStr,
    showTotal,
    candleDuration,
    timezone,
    enabled,
  ]);

  // Fetches the hiro manifest once a streaming token is available and no
  // manifest is stored yet. A failed fetch is retried after 3 seconds, up to
  // 3 attempts in total.
  useEffect(() => {
    if (!enabled || streamingToken == null || hiroManifest != null) {
      return;
    }

    const maxAttempts = 3;
    const retryDelayMs = 3000;
    let cancelled = false;
    let retryTimer: ReturnType<typeof setTimeout> | undefined;

    // The try/catch lives inside the async function: a try/catch around an
    // un-awaited call never sees the rejection, so retries would never run.
    const fetchManifest = async (attempt: number): Promise<void> => {
      try {
        const response = await fetchStreamingEndpoint('/manifest');
        const data = await response.json();
        setHiroManifest(getParsedManifest(data));
      } catch (err) {
        logError(err, 'fetchManifest');
        if (!cancelled && attempt < maxAttempts) {
          retryTimer = setTimeout(() => {
            void fetchManifest(attempt + 1);
          }, retryDelayMs);
        }
      }
    };

    void fetchManifest(1);

    // Stop any pending retry when the effect re-runs or the hook unmounts.
    return () => {
      cancelled = true;
      if (retryTimer !== undefined) {
        clearTimeout(retryTimer);
      }
    };
  }, [streamingToken, enabled]);

  // Recomputes the rolled candles for every selected lense/option pair when
  // the selection or the rolling window changes, once their data is loaded.
  useEffect(() => {
    if (!enabled || !isDataLoadedForLenses(selectedLenses)) {
      return;
    }

    const options = showTotal ? ['TOT'] : ['C', 'P'];
    selectedLenses.forEach((lense) => {
      options.forEach((option) => {
        calculateRolledCandles(lense, option, rollingSeconds);
      });
    });
  }, [selectedLensesStr, showTotal, rollingSeconds, enabled]);

  /**
   * Converts a candle into a histogram entry: the bar height is the absolute
   * close, and the color is semi-transparent.
   *
   * For FILTERED_DELTA the color follows the sign of the close (positive
   * trend color for > 0, negative otherwise); every other type uses the
   * theme's absolute bottom-candles color.
   *
   * @returns the entry, or null when `candle` is null/undefined.
   */
  const candleToHistogramEntry = (candle: PriceCandle | null) => {
    if (candle == null) {
      return null;
    }

    const alpha = 0.6;
    const absoluteColor = theme.palette.hiro.bottomCandles.absolute;
    const usesTrendColor =
      bottomCandlesType === BottomCandlesType.FILTERED_DELTA;
    const color = usesTrendColor
      ? candle.close > 0
        ? positiveTrendColor
        : negativeTrendColor
      : absoluteColor;

    return {
      value: Math.abs(candle.close),
      time: candle.time,
      color: hexToRGBA(color, alpha),
    };
  };

  /**
   * Builds the histogram entries for `lense` from the source matching the
   * bottom-candles type: unrolled total candles for FILTERED_DELTA, stored
   * bottom candles for ABSOLUTE_DELTA. Any other type yields an empty list.
   */
  const getHistogramData = (lense: HiroLense) => {
    let candles: any[] = [];
    switch (bottomCandlesType) {
      case BottomCandlesType.FILTERED_DELTA:
        candles = getUnrolledCandles(lense, 'TOT');
        break;
      case BottomCandlesType.ABSOLUTE_DELTA:
        candles = getBottomCandles(lense);
        break;
      default:
        break;
    }

    if (!candles?.length) {
      return [];
    }

    // Candles that do not convert to an entry are left out.
    const entries: NonNullable<ReturnType<typeof candleToHistogramEntry>>[] =
      [];
    for (const candle of candles) {
      const entry = candleToHistogramEntry(candle);
      if (entry != null) {
        entries.push(entry);
      }
    }
    return entries;
  };

  /**
   * Applies a new price to the prices array and the price series.
   *
   * The price is snapped to its candle bucket. A price older than the last
   * stored bucket is ignored; a price in the same bucket replaces the last
   * entry (merged into it when `useCandlesForPrice` is set). Errors are
   * logged rather than thrown.
   */
  function updatePrice(newPrice: any) {
    if (!newPrice) {
      return;
    }

    try {
      const prices = getPrices();
      const bucketTime = roundToNearestCandleBucket(newPrice.time);
      let price = { ...newPrice, time: bucketTime };

      const hasPrices = prices.length > 0;
      const lastPrice = hasPrices ? prices[prices.length - 1] : null;

      // we already have a more recent price, so no need to update
      if (lastPrice != null && price.time < lastPrice.time) {
        return;
      }

      if (lastPrice != null && price.time === lastPrice.time) {
        // remove last time from data arr to avoid having duplicates
        prices.pop();

        if (useCandlesForPrice) {
          price = {
            high: Math.max(price.high, lastPrice.high),
            low: Math.min(price.low, lastPrice.low),
            open: lastPrice.open,
            close: price.close,
            time: price.time,
          };
        }
      }

      const { time, open, high, low, close } = price;
      const newEntry = addToPricesArr(time, open, high, low, close);

      if (props?.priceUpdateCallback) {
        props.priceUpdateCallback(newEntry, lastPrice);
      }

      if (priceSeriesRef != null) {
        update(priceSeriesRef.current, [newEntry]);
      }
    } catch (e: any) {
      logError(e, 'updatePrice');
    }
  }

  // Public surface of the hook: accessors for the loaded prices and candles
  // plus the helpers for building histogram entries and bucketing timestamps.
  return {
    getPrices,
    getHistogramData,
    getRolledCandles,
    getUnrolledCandles,
    candleToHistogramEntry,
    roundToNearestCandleBucket,
  };
};

export default useStreaming;
