import { EventChannel, buffers } from "redux-saga";
import {
  all,
  call,
  put,
  takeEvery,
  takeLeading,
  cancelled,
  delay,
  Effect,
} from "redux-saga/effects";
import { fromUnixTime, getUnixTime } from "date-fns";

import {
  DeviceSelection,
  QueryStateRequest,
  QueryStateResponse,
  StreamStateRequest,
  StreamStateResponse,
  TimeSeriesRangeSelection as TimeSeriesRangeSelectionPB,
} from "../../generated/sora/app/v1beta/service_pb";

import { connectStream, takeDebounced } from "./channel/stream";
import {
  GroupedStates,
  fromPB as fromAnnotatedDeviceStatePB,
  queryDeviceStateFailed,
  queryDeviceStateRequested,
  queryDeviceStateSucceeded,
  startDeviceStateStream,
  upsertGroupedStates,
  computeChunkBounds,
  computeChunksInRange,
  chunkGroupedItems,
} from "../reducers/device-state";
import {
  TimeSeriesRangeSelection,
  toPB,
} from "../reducers/time-series-range-selection";
import { callConfig } from "../../api/callConfig";
import { ProjectId } from "../reducers/projects";
import { ChunkId, objectEntries } from "../reducers/chunks";

/**
 * Progress of a chunk query as tracked in this module's request map.
 *
 * A chunk with no entry at all is considered "new" (never requested), which is
 * why there is no explicit NEW member.
 */
enum RequestState {
  // NEW = undefined,
  IN_PROGRESS = "IN_PROGRESS",
  DONE = "DONE",
}

/**
 * Root saga for device state.
 *
 * Wires two watchers:
 * - every `queryDeviceStateRequested` action loads the not-yet-seen chunks that
 *   cover the requested time range (`takeEvery`);
 * - `startDeviceStateStream` opens the live state stream, ignoring further
 *   start actions while one stream saga is running (`takeLeading`).
 */
export default function* deviceStateSaga() {
  /**
   * Loads all pages of device states for a single chunk and upserts each page
   * into the store as it arrives.
   *
   * @param chunkId   chunk whose bounds define the queried time range
   * @param projectId project to query
   * @returns `true` when every page was loaded; `false` when the app service is
   *          unavailable or a request failed. In the `false` case a
   *          `queryDeviceStateFailed` action has already been dispatched.
   */
  function* queryDeviceStateChunk(
    chunkId: ChunkId,
    projectId: ProjectId,
  ): Generator<Effect<unknown>, boolean, unknown> {
    const appService = callConfig.call.appServiceContext?.appService;
    if (!appService) {
      // Report the chunk as not loaded instead of silently pretending it was,
      // so the caller does not record it as done.
      yield put(
        queryDeviceStateFailed(new Error("app service is not available")),
      );
      return false;
    }

    const bounds = computeChunkBounds(chunkId);

    const startRange: TimeSeriesRangeSelection = {
      time: {
        startIncl: fromUnixTime(bounds.startIncl),
        // The api defines time queries as inclusive on both ends, so asking for
        // endExcl over-fetches slightly; the surplus is filtered out below.
        endIncl: fromUnixTime(bounds.endExcl),
      },
      cursor: null,
    };
    let rangePB: TimeSeriesRangeSelectionPB | null = toPB(startRange);

    // Each response may carry a continuation range; keep going until it doesn't.
    while (rangePB !== null) {
      console.debug(
        `querying device states for chunk ${chunkId} with range ${rangePB
          .getStartTime()
          ?.toDate()}-${rangePB
          .getEndTime()
          ?.toDate()} < ${rangePB.getEndCursor()}`,
      );

      const devices = new DeviceSelection().setUuidsList([]);
      const queryStateReq = new QueryStateRequest()
        .setDevices(devices)
        .setProjectId(projectId)
        .setRange(rangePB);

      let queryStateResponse: QueryStateResponse;
      try {
        queryStateResponse = (yield call(
          (x) => appService.queryState(x),
          queryStateReq,
        )) as QueryStateResponse;
      } catch (e) {
        console.error(e);
        // Thrown values are not guaranteed to be Error instances.
        const error = e instanceof Error ? e : new Error(String(e));
        yield put(queryDeviceStateFailed(error));
        return false;
      }

      const groupedStates: GroupedStates = {};
      queryStateResponse.getGroupedStatesMap().forEach((statePB, id) => {
        // We asked for endIncl, but our chunk definition requires endExcl:
        // keep only the states that belong to the current chunk. Filtering
        // (rather than cutting at an index) also covers the cases where no
        // state is inside the chunk or the list is not sorted ascending.
        groupedStates[id] = statePB
          .getStatesList()
          .map(fromAnnotatedDeviceStatePB)
          .filter((ds) => ds.time < bounds.endExcl);
      });

      // A continuation range means more pages are pending for this chunk.
      const nextRange: TimeSeriesRangeSelectionPB | null =
        queryStateResponse.hasCont()
          ? (queryStateResponse.getCont() ?? null)
          : null;
      const requestState =
        nextRange !== null
          ? ({ type: "pending" } as const)
          : ({ type: "succeeded" } as const);
      rangePB = nextRange;

      yield put(
        upsertGroupedStates({ chunkId, states: groupedStates, requestState }),
      );
    }

    return true;
  }

  // Chunks that were already requested by this saga. A missing entry means the
  // chunk has never been requested (or its last attempt did not complete).
  // The `| undefined` is not really necessary, but makes clear that undefined has meaning.
  const seenRequests = new Map<ChunkId, RequestState | undefined>();

  type QueryAction = ReturnType<typeof queryDeviceStateRequested>;

  /**
   * Handles one `queryDeviceStateRequested` action: loads every chunk in the
   * requested range that has not been requested before, then dispatches
   * `queryDeviceStateSucceeded`. Stops at the first chunk that fails to load
   * (the failure action is dispatched by the chunk loader) without dispatching
   * success.
   */
  function* queryDeviceStateSaga(action: QueryAction) {
    const { range, projectId } = action.payload;

    // Chunk order is reversed relative to computeChunksInRange
    // (presumably so the most recent chunk loads first — TODO confirm).
    const chunksToQuery = Array.from(
      computeChunksInRange(getUnixTime(range.start), getUnixTime(range.end)),
    )
      .filter((chunkId) => !seenRequests.has(chunkId))
      .reverse();

    for (const chunkId of chunksToQuery) {
      // if this has been started some other way in the meantime, skip it
      if (seenRequests.get(chunkId)) {
        continue;
      }

      seenRequests.set(chunkId, RequestState.IN_PROGRESS);
      let loaded = false;
      try {
        loaded = yield* queryDeviceStateChunk(chunkId, projectId);
      } finally {
        // Runs on success, failure, thrown error and cancellation alike: only a
        // fully loaded chunk is remembered; anything else is forgotten so that
        // a later request can retry it.
        if (loaded) {
          seenRequests.set(chunkId, RequestState.DONE);
        } else {
          seenRequests.delete(chunkId);
        }
      }

      if (!loaded) {
        // The failure has already been reported; do not follow it with success.
        return;
      }
    }

    yield put(queryDeviceStateSucceeded(action.payload));
  }

  type StreamAction = ReturnType<typeof startDeviceStateStream>;

  /**
   * Subscribes to the live device state stream for the action's project and
   * upserts incoming states in debounced batches. Reconnects after a delay when
   * the stream ends or errors; stops when the saga is cancelled.
   */
  function* streamDeviceStateSaga(action: StreamAction) {
    type Res = StreamStateResponse;

    const appService = callConfig.call.appServiceContext?.appService;
    if (!appService) {
      return;
    }

    const projectId = action.payload.projectId;

    const deviceSelection = new DeviceSelection().setUuidsList([]);
    const req = new StreamStateRequest()
      .setDevice(deviceSelection)
      .setProjectId(projectId);

    let deviceStateChan: EventChannel<Res> | null = null;
    // connection retry loop
    connection: while (true) {
      try {
        deviceStateChan = connectStream(
          appService.streamState(req),
          buffers.expanding(10),
        );

        while (true) {
          // simple debounce: wait 1 sec, ...
          const resps: Res[] = yield* takeDebounced(1, deviceStateChan);

          // Group the whole batch by device id before touching the store.
          const groupedStates: GroupedStates = {};
          for (const r of resps) {
            for (const annotatedState of r.getStatesList()) {
              const state = fromAnnotatedDeviceStatePB(annotatedState);
              const bucket = groupedStates[state.id] ?? [];
              bucket.push(state);
              groupedStates[state.id] = bucket;
            }
          }

          const chunkedGroupedStates = chunkGroupedItems(groupedStates);

          // now we can update redux with our batch of devicestates all at once, which is
          // pretty important when our map takes like 50ms to render every update :(
          yield all(
            objectEntries(chunkedGroupedStates.chunks).map(
              ([chunkId, states]) =>
                put(
                  upsertGroupedStates({
                    chunkId,
                    states,
                    requestState: null,
                  }),
                ),
            ),
          );
        }
      } catch (e) {
        console.error("error in device state stream", e);
      } finally {
        console.log("device state stream closed");
        // Always release the current channel: a retry opens a fresh one, so
        // leaving the old one open would leak its subscription.
        deviceStateChan?.close();
        deviceStateChan = null;

        // NOTE(review): the break/continue inside this finally block override
        // whatever completion is pending. That looks intentional — it is what
        // lets the loop reconnect after the stream ends — so confirm before
        // restructuring.
        const c: boolean = yield cancelled();
        if (c) {
          console.log("device state stream cancelled");
          break connection;
        } else {
          console.log("retrying device state connection after a few secons");
          yield delay(3000);
          continue connection;
        }
      }
    }
  }

  yield all([
    takeEvery(queryDeviceStateRequested, queryDeviceStateSaga),
    takeLeading(startDeviceStateStream, streamDeviceStateSaga),
  ]);
}
