import { FixedSizeList, Type, Vector, Table } from "apache-arrow";
import * as arrow from "apache-arrow";
import assert from "assert";

/**
 * Returns the values of a `Vector<FixedSizeList>` as one flat TypedArray,
 * e.g. [[1,2],[3,4],[5,6]] becomes Float64Array([1,2,3,4,5,6]).
 *
 * - No internal chunks: a new, empty TypedArray of the child type is returned.
 * - Exactly one chunk: a `subarray` view over that chunk's child values is
 *   returned, so nothing is copied.
 * - Several chunks: the child values of every chunk are copied, in order, into
 *   a newly allocated TypedArray.
 *
 * @param input A `Vector<FixedSizeList>` whose list type has a single child.
 * @param zeroCopy When true, the multi-chunk (copying) path throws instead of
 *           copying. Note that this is only checked while the `ASSERTS` flag is
 *           truthy; otherwise the copy happens silently.
 * @returns A TypedArray holding the flattened list values.
 * @throws TypeError if `input` is not a FixedSizeList vector.
 * @throws Error if the list type, or any chunk, does not have exactly one child.
 */
export function flattenFixedSizeList<T extends arrow.Float | arrow.Int>(
  input: Vector<FixedSizeList<T>>,
  zeroCopy = false,
): T["TArray"] {
  const typeId = input.type.typeId;
  if (typeId !== Type.FixedSizeList) {
    throw new TypeError(
      `This only works on FixedSizeLists, you passed a ${input.type.toString()}`,
    );
  }
  if (input.type.children.length !== 1) {
    throw new Error("unexpected input type.children.length");
  }

  const valueField = input.type.children[0];
  const chunks = input.data;
  const chunkCount = chunks.length;

  // No chunks at all: nothing to flatten.
  if (chunkCount === 0) {
    return new valueField.type.ArrayType();
  }

  // A single chunk is already contiguous, so hand back a view over it.
  if (chunkCount === 1) {
    const onlyChunk = chunks[0];
    ASSERTS && assert(onlyChunk.children.length == 1); // FSL should guarantee this
    const items = onlyChunk.children[0];
    return items.values.subarray(0, items.length * items.stride);
  }

  if (!(chunkCount > 1)) {
    throw new Error("unreachable");
  }

  // From here on the chunks have to be copied into one buffer.
  if (ASSERTS && zeroCopy) {
    throw new Error("tried to flatten a fsl that was not really flat");
  }

  // The stride of the first chunk's child is used for sizing the result.
  const stride = chunks[0].children[0].stride;
  const totalLength = chunks.reduce((sum, chunk) => {
    if (chunk.children.length !== 1) {
      throw new Error("unexpected input data[?].children.length");
    }
    return sum + chunk.children[0].length;
  }, 0);

  const flat = new valueField.type.ArrayType(totalLength * stride);
  let cursor = 0;
  for (const chunk of chunks) {
    ASSERTS && assert(chunk.children.length == 1); // FSL should guarantee this
    const items = chunk.children[0] as arrow.Data<T>;
    const slice = items.values.subarray(0, items.length * items.stride);
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore - typescript has issues with the type of set() varying with `slice` here.
    flat.set(slice, cursor);
    cursor += slice.length;
  }
  return flat;
}

/**
 * Concatenates the per-batch `Data` of a variable-length (Utf8) column into a
 * single `Data` with one values buffer and one offsets buffer.
 *
 * Each batch contributes the bytes between its first and last value offset;
 * the offsets are rebased so that the combined buffer starts at 0 and the
 * batches follow each other without gaps. A batch whose first value offset is
 * not 0 (for example a sliced batch) is therefore handled as well.
 *
 * Validity information is NOT carried over: the result has no null bitmap and
 * a null count of 0.
 *
 * @param data The per-batch `Data` of one Utf8 column, in batch order. Must
 *             contain at least one element.
 * @returns A new `Data` of the same type covering all rows of all batches.
 */
function _flattenBatchesValueOffsets<T extends arrow.Utf8>(
  data: arrow.Data<T>[],
): arrow.Data<T> {
  const b0 = data[0];
  ASSERTS && assert(b0.children.length == 0);

  // Size the outputs: bytes actually referenced by each batch, and total rows.
  let bufferLength = 0;
  let rowLength = 0;
  for (const batch of data) {
    if (batch.length === 0) {
      continue; // an empty batch references no bytes (and may have no offsets)
    }
    bufferLength += batch.valueOffsets[batch.length] - batch.valueOffsets[0];
    rowLength += batch.length;
  }

  const buf = new b0.ArrayType(bufferLength);
  const nullBitmap = undefined; // TODO?
  const nullCount = 0; // TODO?
  const valueOffsets = new Int32Array(rowLength + 1);

  let byteCursor = 0; // next free byte in `buf`
  let rowCursor = 0; // index in `valueOffsets` of the current batch's first row

  for (const batch of data) {
    if (batch.length === 0) {
      continue;
    }
    // NOTE: in Arrow JS, slicing a Data narrows `valueOffsets` to the slice
    // but leaves `values` whole, so the offsets are read from index 0 and
    // their first entry says where this batch's bytes start in `values`.
    const batchVO = batch.valueOffsets;
    const firstByte = batchVO[0];
    const lastByte = batchVO[batch.length];

    const values: T["TArray"] = batch.values.subarray(firstByte, lastByte);
    buf.set(values, byteCursor);

    // Rebase this batch's offsets onto its position in the combined buffer.
    // The entry at `rowCursor` was already written by the previous batch with
    // the same value (`byteCursor`), so overwriting it is harmless.
    const shift = byteCursor - firstByte;
    for (let i = 0; i <= batch.length; i++) {
      valueOffsets[rowCursor + i] = batchVO[i] + shift;
    }

    byteCursor += lastByte - firstByte;
    rowCursor += batch.length;
  }

  // `buf` is freshly allocated above, so it can be handed over without copying.
  return new arrow.Data(b0.type, 0, rowLength, nullCount, [
    valueOffsets,
    buf,
    nullBitmap,
  ]);
}

/**
 * Concatenates the per-batch `Data` of a fixed-width numeric (Int or Float)
 * column into a single `Data` backed by one contiguous values buffer.
 *
 * Validity information is NOT carried over: the result has no null bitmap and
 * a null count of 0.
 *
 * @param data The per-batch `Data` of one numeric column, in batch order. Must
 *             contain at least one element.
 * @returns A new `Data` of the same type covering all rows of all batches.
 */
function _flattenBatchesNumeric<T extends arrow.Float | arrow.Int>(
  data: arrow.Data<T>[],
): arrow.Data<T> {
  const b0 = data[0];
  ASSERTS && assert(b0.children.length == 0);

  // Each row occupies `stride` slots of the values buffer, so the buffer is
  // sized in slots while the resulting Data length is counted in rows.
  let bufferLength = 0;
  let rowLength = 0;
  for (const batch of data) {
    bufferLength += batch.length * batch.stride;
    rowLength += batch.length;
  }

  const buf = new b0.ArrayType(bufferLength);
  const valueOffsets = undefined;
  const nullBitmap = undefined; // TODO?
  const nullCount = 0; // TODO?

  let offset = 0;
  for (const batch of data) {
    // Same slicing as flattenFixedSizeList: the batch's values start at index
    // 0 of `values` and span `length * stride` slots.
    const values = batch.values.subarray(0, batch.length * batch.stride);
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore - typescript can't cope with set()
    buf.set(values, offset);

    offset += values.length;
  }

  // `buf` is freshly allocated above, so it can be handed over without copying.
  return new arrow.Data(b0.type, 0, rowLength, nullCount, [
    valueOffsets,
    buf,
    nullBitmap,
  ]);
}

/**
 * Concatenates the per-batch `Data` of a nested column (FixedSizeList or
 * Struct) by flattening each child position across all batches and wrapping
 * the flattened children in a new parent `Data`.
 *
 * The parent carries no buffers of its own and reports a null count of 0, so
 * validity information of the parent is not preserved.
 *
 * @param data The per-batch `Data` of one nested column, in batch order. Must
 *             contain at least one element.
 * @returns A new `Data` of the same type whose length is the sum of the batch
 *          lengths.
 */
function _flattenBatchesStructured<T extends CompoundData>(
  data: arrow.Data<T>[],
): arrow.Data<T> {
  const head = data[0];

  let totalRows = 0;
  for (const batch of data) {
    totalRows += batch.length;
  }

  const childCount = head.children.length;
  ASSERTS && assert(childCount > 0);

  // Flatten child i of every batch into a single Data, for each i in turn.
  const children: arrow.Data<T["TChildren"]>[] = [];
  for (let i = 0; i < childCount; i++) {
    const ithChildren = data.map((batch) => batch.children[i]);
    children.push(_flattenBatches(ithChildren));
  }

  return new arrow.Data(
    head.type,
    0,
    totalRows,
    0,
    [undefined, undefined, undefined],
    children,
  );
}

/**
 * The nested Arrow types that `_flattenBatches` accepts for `Data` that has
 * children: fixed-size lists and structs.
 */
type CompoundData = arrow.FixedSizeList | arrow.Struct;

/**
 * Concatenates a list of per-batch `Data` (all for the same column) into a
 * single `Data`, dispatching on the shape of the first element:
 *
 * - has children: must be a FixedSizeList or Struct, flattened child by child;
 * - has value offsets: must be Utf8, flattened as variable-length data;
 * - otherwise: must be Int or Float, flattened as fixed-width data.
 *
 * @param data The per-batch `Data`, in batch order.
 * @returns One `Data` covering all rows of all batches.
 * @throws TypeError if `data` is empty.
 * @throws Error if the type of the first element is not one of the supported
 *         types listed above.
 */
function _flattenBatches<
  D extends
    | arrow.Data<CompoundData>[]
    | arrow.Data<arrow.Utf8>[]
    | arrow.Data<arrow.Float | arrow.Int>[]
    | arrow.Data[],
>(data: D): typeof data[0] {
  const b0 = data[0];
  if (b0 === undefined) {
    // Without a first batch there is no type to build the result from.
    throw new TypeError("cannot flatten an empty list of batches");
  }

  if (b0.children.length > 0) {
    // structured data
    if (
      !(
        arrow.DataType.isFixedSizeList(b0.type) ||
        arrow.DataType.isStruct(b0.type)
      )
    ) {
      throw new Error(`unsupported compound type ${b0.type}`);
    }
    // The runtime check above cannot narrow the generic `D`, hence the cast.
    const _data = data as arrow.Data<CompoundData>[];
    return _flattenBatchesStructured(_data);
  } else if (b0.valueOffsets) {
    // raw varlength data
    if (!arrow.DataType.isUtf8(b0.type)) {
      throw new Error(`unsupported valueoffsets type ${b0.type}`);
    }
    const _data = data as arrow.Data<arrow.Utf8>[];
    return _flattenBatchesValueOffsets(_data);
  } else {
    // everything else
    // Check the DataType (`b0.type`), like the branches above, not the Data.
    if (!(arrow.DataType.isInt(b0.type) || arrow.DataType.isFloat(b0.type))) {
      throw new Error(`unsupported type ${b0.type}`);
    }
    const _data = data as arrow.Data<arrow.Int | arrow.Float>[];
    return _flattenBatchesNumeric(_data);
  }
}

/**
 * Builds a new table with the same schema as `table` in which all record
 * batches have been merged into a single one.
 *
 * @param table The table whose batches should be merged.
 * @returns A new `Table` made of one `RecordBatch` holding the merged data.
 */
export function flattenBatches<TM extends arrow.TypeMap>(
  table: arrow.Table<TM>,
): typeof table {
  const mergedData = _flattenBatches(table.data);
  const singleBatch = new arrow.RecordBatch(table.schema, mergedData);
  return new Table(singleBatch);
}

/**
 * Wraps a flat TypedArray in an arrow `Vector<FixedSizeList>` without copying
 * the values: given [a,b,c,d,e,f] and a type of FixedSizeList(3, Int8), the
 * result reads as [[a,b,c], [d,e,f]].
 *
 * @param data The flat values; its length must be a multiple of `type.listSize`.
 * @param type The FixedSizeList type describing the row width and value type.
 * @returns A single-chunk vector whose child data is backed by `data`.
 * @throws TypeError if `data.length` is not a multiple of `type.listSize`.
 */
export function makeVectorFixedSizeList<T extends arrow.Int | arrow.Float>(
  data: T["TArray"],
  type: FixedSizeList<T>,
): arrow.Vector<typeof type> {
  // Why this is hand-rolled:
  // - arrow.makeVector (zero-copy) doesn't work with FixedSizeList.
  // - arrow.vectorFromArray works great, but copies data, which we're trying to avoid.
  // - arrow.makeData doesn't set data for FixedSizedList on its own.
  //   (probably could raise an Arrow.js bug for this one.)
  const remainder = data.length % type.listSize;
  if (remainder !== 0) {
    throw new TypeError(
      `data length ${data.length} must divide evenly into list row length ${type.listSize}`,
    );
  }

  // Child data: the flat values themselves.
  const valuesData = arrow.makeData({
    offset: 0,
    length: data.length,
    nullCount: 0,
    type: type.valueType,
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore I don't know why this isn't allowed.
    data: data,
  });

  // Parent data: one row per `listSize` values, with the child plugged in by hand.
  const rowCount = data.length / type.listSize;
  const listData = arrow.makeData({
    offset: 0,
    length: rowCount,
    nullCount: 0,
    type: type,
  });
  listData.children[0] = valuesData;

  return new arrow.Vector([listData]);
}
