import { Observable, Subscriber, bufferTime, filter, mergeAll, scan, share, toArray } from "rxjs";
import Immutable from "immutable";
import { SensorReading, SensorType } from "../../proto/common";
import { DeviceServices } from "./DeviceManager";
import { Duration } from "moment";
import moment from "moment";

/**
 * A source of sensor readings, exposed as one observable of reading arrays
 * per sensor type.
 */
export interface ISensorReadingsSource {
  /** Maps each sensor type to an observable that emits arrays of readings of that type. */
  readonly data: Immutable.Map<SensorType, Observable<SensorReading[]>>;
  /** `false` for `StaticSensorReadingsSource`, `true` for `StreamingSensorReadingsSource`. */
  readonly isStreaming: boolean;
}

/**
 * One-shot source of sensor readings for a single device.
 *
 * A single `getReadings` request is issued when the shared source is first
 * subscribed to. Each entry of `data` filters the shared result down to one
 * sensor type and emits it as a single array once the request has completed.
 * If the request fails, the failure is delivered to subscribers as an
 * observable error.
 */
export class StaticSensorReadingsSource implements ISensorReadingsSource {
  private readonly deviceId: string;
  private readonly sensorTypes: SensorType[];
  private readonly deviceServices: DeviceServices;

  /** Per-sensor-type observables; each emits one array of readings and then completes. */
  public readonly data: Immutable.Map<SensorType, Observable<SensorReading[]>>;
  public readonly isStreaming = false

  // Shared so that the per-sensor-type observables subscribed together reuse one request.
  // `void` marks the fire-and-forget call; loadReadings reports failures through the subscriber.
  private readonly source$ = new Observable<SensorReading>(x => { void this.loadReadings(x) }).pipe(share())

  /**
   * @param deviceId Device whose readings are requested.
   * @param sensorTypes Sensor types to request; `data` gets one entry per type.
   * @param deviceServices Provides the sensor client and the request metadata.
   */
  constructor(deviceId: string, sensorTypes: SensorType[], deviceServices: DeviceServices) {
    this.deviceId = deviceId
    this.sensorTypes = sensorTypes
    this.deviceServices = deviceServices

    const map = new Map<SensorType, Observable<SensorReading[]>>()

    for (const sensorType of sensorTypes) {
      map.set(sensorType, this.source$.pipe(
        filter(x => x.sensorType === sensorType),
        toArray()
      ))
    }

    this.data = Immutable.Map<SensorType, Observable<SensorReading[]>>(map)
  }

  /**
   * Fetches the readings and pushes them to `obs` one by one, then completes it.
   * The request covers a period of 6 * 30 days with a one-hour aggregation window.
   * Unsubscribing aborts the in-flight request.
   *
   * @param obs Subscriber that receives the readings, the completion, or the error.
   */
  private async loadReadings(obs: Subscriber<SensorReading>): Promise<void> {
    console.log(`Fetching sensor readings for ${this.deviceId}, ${this.sensorTypes}`)
    // Although this is a one-shot request, it can take a long time if there is a lot of data
    const abortController = new AbortController()
    obs.add(() => {
      abortController.abort()
    })
    try {
      const result = await this.deviceServices.sensorClient.getReadings(
        {
          deviceId: this.deviceId,
          sensorTypes: this.sensorTypes,
          periodMs: BigInt(moment.duration(6 * 30, 'days').asMilliseconds()),
          aggregationWindowMs: BigInt(moment.duration(1, 'hours').asMilliseconds()),
        },
        {
          meta: await this.deviceServices.getMeta(),
          abort: abortController.signal,
        }
      )
      for (const reading of result.response.readings) {
        obs.next(reading)
      }
      obs.complete()
    } catch (error) {
      console.log("Error loading readings: ", error);
      // Forward the failure; otherwise the toArray() consumers would wait forever.
      // This is a no-op when the subscriber has already unsubscribed (e.g. after an abort).
      obs.error(error)
    }
  }
}

/**
 * Live source of sensor readings for a single device.
 *
 * A `streamReadings` call is opened when the shared source is first subscribed
 * to. Each entry of `data` keeps a newest-first array of the readings of one
 * sensor type and emits the updated array after every reading. Stream errors
 * and stream completion are forwarded to subscribers.
 */
export class StreamingSensorReadingsSource implements ISensorReadingsSource {
  private readonly deviceServices: DeviceServices;
  private readonly deviceId: string;
  private readonly duration: moment.Duration;
  private readonly sensorTypes: SensorType[];

  // When we first load, we get a lot of points close together. bufferTime groups them into
  // 200 ms windows and mergeAll re-emits each window's readings individually, so readings
  // reach subscribers in bursts rather than as they arrive.
  // `void` marks the fire-and-forget call; getReadings reports failures through the subscriber.
  private readonly source$ = new Observable<SensorReading>((obs) => {
    void this.getReadings(obs)
  }).pipe(bufferTime(200), mergeAll(), share())

  /** Per-sensor-type observables; each emits the accumulated newest-first array of readings. */
  public readonly data: Immutable.Map<SensorType, Observable<SensorReading[]>>
  public readonly isStreaming = true

  /**
   * @param deviceId Device whose readings are streamed.
   * @param sensorTypes Sensor types to request; `data` gets one entry per type.
   * @param duration Sent as the initial period of the stream request and used as the
   *   retention window: readings older than `now - duration` are dropped.
   * @param deviceServices Provides the sensor client and the request metadata.
   */
  constructor(deviceId: string, sensorTypes: SensorType[], duration: Duration, deviceServices: DeviceServices) {
    this.deviceId = deviceId
    this.duration = duration
    this.sensorTypes = sensorTypes
    this.deviceServices = deviceServices

    const map = new Map<SensorType, Observable<SensorReading[]>>()

    for (const sensorType of sensorTypes) {
      map.set(sensorType, this.source$.pipe(
        filter(x => x.sensorType === sensorType),
        scan((acc: SensorReading[], item) => {
          // Compute the cut-off once per reading and compare plain numbers below.
          const thresholdMs = moment().subtract(duration).valueOf()
          // The javascript engine has probably optimized this, so use it then remove the few
          // stale values from the end (which probably doesn't require reallocation).
          // Only the tail is trimmed, so this relies on the oldest readings being at the end.
          const result = [item, ...acc]
          while (result.length > 0 && Number(result[result.length - 1].timestamp!.milliseconds) < thresholdMs) {
            result.pop()
          }
          return result
        }, [])
      ))
    }

    this.data = Immutable.Map<SensorType, Observable<SensorReading[]>>(map)
  }

  /**
   * Opens the streaming call and forwards each received reading to `obs`.
   * Unsubscribing aborts the call. Failures while setting up the call, stream
   * errors and stream completion are all forwarded to `obs`.
   *
   * @param obs Subscriber that receives the readings, the completion, or the error.
   */
  private async getReadings(obs: Subscriber<SensorReading>): Promise<void> {
    console.log(`Fetching sensor readings: ${this.deviceId}, ${this.sensorTypes}`)
    const abortController = new AbortController()
    obs.add(() => {
      abortController.abort()
    })

    try {
      const meta = await this.deviceServices.getMeta()
      if (obs.closed) {
        // Unsubscribed while waiting for the metadata; there is nobody left to stream to.
        return
      }

      const call = this.deviceServices.sensorClient.streamReadings(
        {
          deviceId: this.deviceId,
          sensorTypes: this.sensorTypes,
          initialPeriodMs: BigInt(this.duration.asMilliseconds()),
        },
        {
          meta,
          abort: abortController.signal,
        }
      )
      call.responses.onMessage(x => {
        // Skip messages that carry no reading instead of pushing undefined downstream.
        if (x.reading !== undefined) {
          obs.next(x.reading)
        }
      })
      call.responses.onError(error => {
        if (obs.closed) {
          // Already unsubscribed (e.g. the error is the result of our own abort).
          return
        }
        console.log("Error streaming readings: ", error)
        obs.error(error)
      })
      call.responses.onComplete(() => {
        obs.complete()
      })
    } catch (error) {
      // getMeta() rejected or the call could not be started.
      console.log("Error streaming readings: ", error)
      obs.error(error)
    }
  }
}