import { buffers, EventChannel } from "redux-saga";
import {
  all,
  call,
  cancelled,
  CancelledEffect,
  delay,
  Effect,
  put,
  takeEvery,
  takeLatest,
} from "redux-saga/effects";
import { fromUnixTime, getUnixTime } from "date-fns";

import { connectStream, takeDebounced } from "./channel/stream";
import * as loading from "../reducers/loading";

import { callConfig } from "../../api/callConfig";
import {
  DeviceSelection,
  QueryEventsRequest,
  QueryEventsResponse,
  StreamEventsRequest,
  StreamEventsResponse,
  TimeSeriesRangeSelection as TimeSeriesRangeSelectionPB,
} from "../../generated/sora/app/v1beta/service_pb";

import {
  fromPB as fromEventPB,
  addEvents,
  queryEventsRequested,
  queryEventsSucceeded,
  queryEventsFailed,
  startEventStream,
  computeChunksInRange,
  computeChunkBounds,
  computeChunkId,
} from "../reducers/events";
import {
  TimeSeriesRangeSelection,
  toPB,
} from "../reducers/time-series-range-selection";
import { ProjectId } from "../reducers/projects";
import { ChunkId, objectEntries } from "../reducers/chunks";

/**
 * Lifecycle marker for a chunk query tracked by this saga.
 *
 * A chunk that has never been requested has no marker at all (`undefined`),
 * which is why there is no explicit NEW member.
 */
enum RequestState {
  // NEW = undefined,
  IN_PROGRESS = "IN_PROGRESS",
  DONE = "DONE",
}

export default function* eventsSaga() {
  /**
   * Fetches every page of events for one chunk and dispatches them to the store.
   *
   * Pages are requested until a response carries no continuation range. Each
   * page is dispatched through `addEvents`, tagged "pending" while more pages
   * remain and "succeeded" for the final page. If a request throws,
   * `queryEventsFailed` is dispatched and the generator stops. Nothing happens
   * when no app service is configured.
   *
   * @param projectId - project whose events are queried.
   * @param chunkId - chunk to load; its time bounds come from `computeChunkBounds`.
   */
  function* queryEventsChunk(
    projectId: ProjectId,
    chunkId: ChunkId,
  ): Generator<Effect<unknown>, void, unknown> {
    const appService = callConfig.call.appServiceContext?.appService;
    if (!appService) {
      return;
    }

    const bounds = computeChunkBounds(chunkId);

    // The API treats both ends of a time range as inclusive, so asking for
    // [startIncl, endExcl] can return events sitting exactly on the chunk end.
    // Those are removed again after each response (see below).
    const startRange: TimeSeriesRangeSelection = {
      time: {
        startIncl: fromUnixTime(bounds.startIncl),
        endIncl: fromUnixTime(bounds.endExcl),
      },
      cursor: null,
    };
    let rangePB: TimeSeriesRangeSelectionPB | null = toPB(startRange);

    while (rangePB != null) {
      console.debug(
        `querying events for chunk ${chunkId} with range ${rangePB
          .getStartTime()
          ?.toDate()}-${rangePB
          .getEndTime()
          ?.toDate()}, < ${rangePB.getEndCursor()}`,
      );

      // NOTE(review): an empty UUID list presumably selects all devices — confirm.
      const devices = new DeviceSelection().setUuidsList([]);
      const queryEventsReq = new QueryEventsRequest()
        .setDevices(devices)
        .setProjectId(projectId)
        .setRange(rangePB);

      let queryEventsResponse: QueryEventsResponse;
      try {
        queryEventsResponse = (yield call(
          (x) => appService.queryEvents(x),
          queryEventsReq,
        )) as QueryEventsResponse;
      } catch (e) {
        console.error(e);
        yield put(queryEventsFailed(e as Error));
        return;
      }

      // We asked for an inclusive end, but a chunk's end is exclusive: keep only
      // events strictly before the chunk end so boundary events land solely in
      // the following chunk. Filtering (instead of truncating after the last
      // in-range element) also handles pages where no event is inside the chunk
      // and does not rely on the ordering of the response.
      const events = queryEventsResponse
        .getEventsList()
        .map((e) => fromEventPB(e))
        .filter((ev) => ev.time < bounds.endExcl);

      let requestState: loading.RequestState;
      if (queryEventsResponse.hasCont()) {
        // More pages remain: continue from the range handed back by the server.
        rangePB = queryEventsResponse.getCont()!; // eslint-disable-line @typescript-eslint/no-non-null-assertion
        requestState = { type: "pending" };
      } else {
        rangePB = null;
        requestState = { type: "succeeded" };
      }

      try {
        yield put(addEvents({ chunkId, events, requestState }));
      } catch (e) {
        // TODO: track per-chunk request state? currently
        // this just whinges a bit here then sets it as done.
        console.error(`querying chunk ${chunkId} failed: ${e}`);
        return;
      }
    }
  }

  // Per-chunk bookkeeping shared by every queryEventsSaga run. A missing entry
  // (or an explicit undefined) means the chunk has never been requested.
  const seenRequests = new Map<ChunkId, RequestState | undefined>();

  type QueryAction = ReturnType<typeof queryEventsRequested>;

  /**
   * Worker for `queryEventsRequested`: loads every chunk overlapping the
   * requested range that has not been claimed yet, one chunk at a time, in the
   * reverse of the order produced by `computeChunksInRange`. Dispatches
   * `queryEventsSucceeded` with the original payload once the loop finishes.
   *
   * Note that `queryEventsChunk` reports no result, so a chunk is marked DONE
   * (and the success action is dispatched) even when its query failed.
   *
   * @param action - the request action; its payload provides `projectId` and `range`.
   */
  function* queryEventsSaga(action: QueryAction) {
    const { projectId, range } = action.payload;

    const candidateIds = Array.from(
      computeChunksInRange(getUnixTime(range.start), getUnixTime(range.end)),
    )
      .filter((id) => !seenRequests.get(id))
      .reverse();

    for (const id of candidateIds) {
      // Re-check: another run may have claimed this chunk while this one was
      // busy loading an earlier chunk.
      const alreadyClaimed = Boolean(seenRequests.get(id));
      if (!alreadyClaimed) {
        seenRequests.set(id, RequestState.IN_PROGRESS);
        yield* queryEventsChunk(projectId, id);
        seenRequests.set(id, RequestState.DONE);
      }
    }

    yield put(queryEventsSucceeded(action.payload));
  }

  type StartStream = ReturnType<typeof startEventStream>;

  /**
   * Worker for `startEventStream`: opens the server event stream for the
   * action's project, groups incoming events by chunk id and dispatches one
   * `addEvents` action per chunk. When the stream errors it logs, waits three
   * seconds and reconnects; when the saga is cancelled it closes the channel
   * and stops. Does nothing when no app service is configured.
   *
   * @param action - the start action; its payload provides `projectId`.
   */
  function* streamEventsSaga(action: StartStream) {
    type Res = StreamEventsResponse;

    const appService = callConfig.call.appServiceContext?.appService;
    if (!appService) {
      return;
    }

    const projectId = action.payload.projectId;
    // NOTE(review): an empty UUID list presumably selects all devices — confirm.
    const deviceSelection = new DeviceSelection().setUuidsList([]);
    const req = new StreamEventsRequest()
      .setDevice(deviceSelection)
      .setProjectId(projectId);

    // Declared outside the loop so the `finally` block can close whichever
    // channel was opened most recently.
    let eventChan: EventChannel<Res> | null = null;
    // Each iteration of this labelled loop is one connection attempt.
    connection: while (true) {
      try {
        eventChan = connectStream(
          appService.streamEvents(req),
          buffers.expanding(10),
        );

        while (true) {
          // Presumably collects the responses received within the debounce
          // window into one batch — confirm semantics/units in ./channel/stream.
          const resps: Res[] = yield* takeDebounced(1, eventChan);

          // Flatten the batch, order by event time, then bucket by chunk id.
          // NOTE(review): `Array.prototype.group` is not a standard built-in;
          // presumably provided by a polyfill — verify.
          const chunkedEvents = resps
            .flatMap((r) => r.getEventsList().map((e) => fromEventPB(e)))
            .sort((e1, e2) => e1.time - e2.time)
            .group((e) => computeChunkId(e.time));

          // Streamed events are added with a null request state.
          const actions = objectEntries(chunkedEvents).map(
            ([chunkId, events]) => ({
              chunkId,
              events,
              requestState: null,
            }),
          );

          yield all(actions.map((a) => put(addEvents(a))));
        }
      } catch (e) {
        console.error("error in event stream", e);
      } finally {
        // Runs after an error and also when the generator is forced to return.
        // NOTE(review): `break`/`continue` inside `finally` override any pending
        // return or throw; that looks deliberate here (it lets the loop decide
        // whether to reconnect) — confirm before refactoring.
        eventChan?.close();
        // NOTE(review): the resumed value is used as a boolean below; the
        // CancelledEffect annotation names the effect rather than its result.
        const c: CancelledEffect = yield cancelled();
        console.log("event stream closed");
        if (c) {
          console.log("event stream cancelled");
          break connection;
        } else {
          console.log("restarting event stream after a few seconds");
          yield delay(3000);
          continue connection;
        }
      }
    }
  }

  // Wire up the watchers: each query request gets its own worker, while the
  // event stream is started through takeLatest.
  const queryWatcher = takeEvery(queryEventsRequested, queryEventsSaga);
  const streamWatcher = takeLatest(startEventStream, streamEventsSaga);
  yield all([queryWatcher, streamWatcher]);
}
