import type { OperatorFunction, UnaryFunction } from 'rxjs';
import { Observable, pipe } from 'rxjs';

/** Pipe creation process for batch updating, based on a batch complete condition and a handler pipe.
 *
 * Incoming values are accumulated in a buffer. Each time `batchCondition` returns `true` for the
 * value that was just buffered, the buffer (including that value) is emitted as one array and a
 * fresh buffer is started. The stream of batches is then passed through `batchHandlerPipe`.
 *
 * Values still buffered when the source completes or errors are discarded: only batches closed
 * by `batchCondition` are ever emitted.
 *
 * @param batchCondition The condition to check if the batch is complete; if it throws, the error
 * is forwarded downstream as an error notification
 * @param batchHandlerPipe The initial mapper pipe to apply to the batched data
 * @returns An operator turning a stream of input values into the output of `batchHandlerPipe`
 */
export function getBatchedPipe<TOutputValue, TInputValue = TOutputValue>({
  batchCondition,
  batchHandlerPipe,
}: {
  /** The condition to check if the batch is complete */
  batchCondition: (value: TInputValue) => boolean;
  /** The initial mapper pipe to apply to the batched data */
  batchHandlerPipe: UnaryFunction<Observable<TInputValue[]>, Observable<TOutputValue>>;
}): OperatorFunction<TInputValue, TOutputValue> {
  const inputBatcher = (source: Observable<TInputValue>) =>
    new Observable<TInputValue[]>(observer => {
      let buffer: TInputValue[] = [];

      return source.subscribe({
        next(value) {
          // A synchronously emitting source can keep pushing values after the downstream
          // has already been closed (e.g. by the error path below); ignore them.
          if (observer.closed) {
            return;
          }

          buffer.push(value);

          let isBatchComplete: boolean;
          try {
            isBatchComplete = batchCondition(value);
          } catch (err) {
            // An exception thrown inside a `next` callback is not propagated downstream by
            // RxJS, so it has to be routed to the error channel explicitly.
            buffer = [];
            observer.error(err);
            return;
          }

          if (isBatchComplete) {
            // Detach the buffer before emitting so a re-entrant emission triggered by the
            // consumer lands in a fresh buffer instead of the array already handed out.
            const batch = buffer;
            buffer = [];
            observer.next(batch);
          }
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        },
      });
    });

  return pipe(inputBatcher, batchHandlerPipe);
}
