import { Injectable, OnDestroy } from '@angular/core';
import { retryBackoff } from 'backoff-rxjs';
import { KeycloakService } from 'keycloak-angular';
import { Observable, Observer, Subject, Subscription } from 'rxjs';
import { delay, filter, take, throttleTime } from 'rxjs/operators';
import { WebSocketSubject, webSocket } from 'rxjs/webSocket';
import { PlotDataRequest } from 'src/app/model/backend';
import { IPojoDevice } from 'src/app/model/device/IPojoDevice';
import { ILogResult } from 'src/app/model/log';
import { INotification, Severity } from 'src/app/model/notification/notification';
import { IThingie } from 'src/app/model/thingie/thingie';
import { delayedShareReplay, throttleTimeBatched } from 'src/app/utility/rxjs';
import { DefaultConfig } from 'src/assets/default.config';
import { SubSink } from 'subsink';
import { v4 } from 'uuid';
import { LogService } from '../loggers/logger.service';

/**
 * Payload with lots of data
 * This has time as unix timestamps, because the database can quickly generate that.
 */
export interface IDataPoints {
  output: string;
  points: {
    time: number;
    data: any;
  }[];
}

export interface IDataEvent<T> {
  event: 'DataEvent';
  data: T;
  className: string;
  key?: Partial<ITaskOutputKey> & Partial<IOfflineDataKey>;
  uuidList?: string[];
}

export interface IAuthenticationEvent {
  event: 'AuthenticationEvent';
  uuid: any;
  accessToken: string;
  newTokenNeeded: boolean;
}

export interface IDeviceFilter {
  filterType: 'DeviceFilter';
  _id?: string;
  address?: string;
  deviceType?: string;
}

export interface IDeviceCreationFilter {
  filterType: 'DeviceCreationFilter';
}

export interface IDeviceDataFilter {
  filterType: 'DeviceDataFilter';
  deviceAddress?: string;
  packetType?: string;
  config?: any;
  property?: string;
  dataRequest?: PlotDataRequest;
  from?: string;
  to?: string;
}


export interface IThingieFilter {
  filterType: 'ThingieFilter';
  _id?: string;
}

export interface IThingieCreationFilter {
  filterType: 'ThingieCreationFilter';
}

export interface IThingieStateFilter {
  filterType: 'ThingieStateFilter';
}

export interface ILogFilter {
  filterType: 'LogFilter';
}

export interface INotificationFilter {
  filterType: 'NotificationFilter';
  source?: string;
  severity?: Severity;
  heading?: string;
  from?: string;
  to?: string;
}

export interface ITaskOutputFilter {
  filterType: 'TaskOutputFilter';
  thingie?: string;
  processIdx?: number;
  taskIdx?: number;
  outputName?: string;
  dataRequest?: PlotDataRequest;
  from?: string;
  to?: string;
}

export interface IOfflineDataFilter {
  filterType: 'OfflineDataFilter';
  thingie?: string;
  processIdx?: number;
  taskIdx?: number;
  inputName?: string;
  outputName?: string;
  dataRequest?: PlotDataRequest;
  from?: string;
  to?: string;
}

export interface ITaskOutputKey {
  keyType: 'taskOutput';
  task: {
    thingie?: string;
    processIdx: number;
    taskIdx: number;
    dataRequest?: PlotDataRequest;
  };
  name: string;
}

export interface IUserInputKey {
  keyType: 'userInput';
  task: {
    thingie: string;
    processIdx: number;
    taskIdx: number;
  };
  input: string;
}

export interface IOfflineDataKey {
  keyType: 'offlineData';
  task: {
    thingie: string | undefined;
    processIdx: number;
    taskIdx: number;
  };
  input: string;
}

export type IFilter =
  IDeviceFilter
  | IDeviceCreationFilter
  | IDeviceDataFilter
  | ITaskOutputFilter
  | IOfflineDataFilter
  | INotificationFilter
  | IThingieFilter
  | IThingieCreationFilter
  | IThingieStateFilter
  | ILogFilter;

interface IHeartbeatEvent {
  event: 'HeartbeatEvent';
}

const heartbeat: IHeartbeatEvent = {
  event: 'HeartbeatEvent',
};

export type IWebsocketEvent =
  IDataEvent<IThingie> |
  IDataEvent<IPojoDevice> |
  IDataEvent<IDataPoints> |
  IDataEvent<INotification> |
  IDataEvent<ILogResult> |
  IHeartbeatEvent |
  IAuthenticationEvent;

@Injectable({
  providedIn: 'root'
})
export class WebsocketConnectionService implements OnDestroy {

  /**
   * This is the handshake protocol:
   * - Send Heartbeat to check if authorisation is needed.
   * - When Heartbeat returns, all is well, subscribe
   * - All AuthorisationEvents that request a token are answered,
   *   followed by one more Heartbeat that will be answered
   *
   * Frontend ---(wants to subscribe, unauthorised)---> Backend
   *    | --------> Heartbeat ------------------------>    |
   *    |     <---- AuthorisationEvent: need token <------ |
   *    | --------> AuthorisationEvent: with token --->    |
   *    | --------> Heartbeat ------------------------>    |
   *    |     <---- Heartbeat <--------------------------- |
   *    | --------> SubscribeEvent ------------------->    |
   *    |     <---- DataEvent <--------------------------- |
   *
   * Frontend -----(wants to subscribe, authorised)---> Backend
   *    | --------> Heartbeat ------------------------>    |
   *    |     <---- Heartbeat <--------------------------- |
   *    | --------> SubscribeEvent ------------------->    |
   *    |     <---- DataEvent <--------------------------- |
   */

  instances: { [k: string]: WebSocketSubject<IWebsocketEvent> } = {};
  heartbeatQueues: { [k: string]: Subject<void> } = {};
  heartbeatQueueSubscriptions: { [k: string]: Subscription } = {};
  heartbeatSubscriptions: { [k: string]: Subscription } = {};
  reauthenticationSubscriptions: { [k: string]: Subscription } = {};
  subscriptions = new SubSink();

  ngOnDestroy(): void {
    this.subscriptions.unsubscribe();
  }

  constructor(private log: LogService, private keycloak: KeycloakService) { }

  private getSubject(uri: string) {
    // have one connection per uri
    let subject = this.instances[uri];
    // if it does not exist, connect
    if (subject === undefined) {
      subject = webSocket({
        url: uri,
        closeObserver: { next: (value) => this.log.debug('closing websocket ', value) },
      });

      const heartbeatQueue = new Subject<void>();
      this.heartbeatQueues[uri] = heartbeatQueue;
      this.heartbeatQueueSubscriptions[uri] = heartbeatQueue
        .pipe(throttleTimeBatched(200, { leading: true, trailing: true }))
        .subscribe(() => {
          subject.next(heartbeat);
        });

      this.reauthenticationSubscriptions[uri] = subject
        .pipe(filter(evt => evt.event === 'AuthenticationEvent' && evt.newTokenNeeded === true))
        .pipe(retryBackoff({
          initialInterval: 1000,
          resetOnSuccess: true
        }))
        .subscribe(() => void this.createAuthenticationEvent().then(evt => {
          subject.next(evt);
          heartbeatQueue.next();
        }));

      // keep the connection open
      // if we don't do this, there can be a timing problem when quickly resubscribing to the subject.
      // the heartbeat is implemented as an echo, so every time the frontend sends a heartbeat,
      // the backend responds with a heartbeat. As we don't really subscribe anything, the unsubscribe
      // message can be whatever, so just send another heartbeat.
      this.heartbeatSubscriptions[uri] = subject
        .multiplex(() => heartbeat, () => heartbeat, (data) => data.event === heartbeat.event)
        // If a heartbeat arrives, wait a moment and then send a heartbeat back, to receive the next one.
        .pipe(
          retryBackoff({
            initialInterval: 1000,
            resetOnSuccess: true
          }),
          throttleTime(DefaultConfig.websocket.heartbeatInterval * 0.9),
          delay(DefaultConfig.websocket.heartbeatInterval)
        )
        .subscribe(() => heartbeatQueue.next());

      this.instances[uri] = subject;
    }
    return subject;
  }

  async createAuthenticationEvent(): Promise<IWebsocketEvent> {
    try {
      await this.keycloak.updateToken();
      const token = await this.keycloak.getToken();
      if (this.keycloak.isTokenExpired()) {
        await this.keycloak.login({ redirectUri: window.location.href });
        // the `login` navigates away
      }
      return {
        event: 'AuthenticationEvent',
        accessToken: token,
        newTokenNeeded: false,
        uuid: ''
      };
    } catch (e) {
      await this.keycloak.login({ redirectUri: window.location.href });
      // the `login` navigates away, so the following return is just for completeness sake
      // and probably won't be executed.
      await this.keycloak.updateToken();
      const token = await this.keycloak.getToken();
      return {
        event: 'AuthenticationEvent',
        accessToken: token,
        newTokenNeeded: false,
        uuid: ''
      };
    }
  }

  // Custom version of the WebSocketSubject.multiplex function that, in case that the subscription
  // was rejected because the authentication was missing, authenticates and retries to subscribe.
  multiplex(
    uri: string,
    subMsg: () => any,
    unsubMsg: () => any,
    messageFilter: (value: IWebsocketEvent) => boolean
  ): Observable<any> {
    const subject = this.getSubject(uri);

    return new Observable((observer: Observer<IWebsocketEvent>) => {
      let subscribeRequest: Subscription;
      let subscription: Subscription;

      const onError = (err: unknown) => {
        // cancel pending requests
        subscribeRequest?.unsubscribe();
        subscription?.unsubscribe();
        // inform observer
        observer.error(err);
      };

      try {
        // subscribe once the heartbeat got through
        subscribeRequest = subject
          .pipe(
            filter(evt => evt.event === heartbeat.event),
            take(1)
          )
          .subscribe(() => {
            subject.next(subMsg());
          },
          (err: unknown) => onError(err));
        // send the heartbeat
        this.heartbeatQueues[uri].next();
      } catch (err) {
        onError(err);
      }

      subscription = subject.subscribe(
        (x) => {
          try {
            if (messageFilter(x)) {
              observer.next(x);
            }
          } catch (err) {
            onError(err);
          }
        },
        (err: unknown) => onError(err),
        () => observer.complete()
      );

      return () => {
        try {
          subject.next(unsubMsg());
        } catch (err) {
          onError(err);
        }
        subscribeRequest.unsubscribe();
        subscription.unsubscribe();
      };
    });
  }

  subscribeWithFilter<T>(uri: string, filter: IFilter): Observable<IDataEvent<T>> {
    const uuid = v4();
    const subRequest = {
      event: 'SubscribeEvent',
      data: filter,
      uuid,
    };
    const unSubRequest = {
      event: 'UnSubscribeEvent',
      data: filter,
      uuid,
    };

    const updateObservable = this.multiplex(
      uri,
      () => subRequest,
      () => unSubRequest,
      (data) => this.uuidFilter(data, uuid)
    ).pipe(
      retryBackoff({
        initialInterval: 1000,
        resetOnSuccess: true
      }),
      delayedShareReplay(1000, 0)
    );
    return updateObservable;
  }

  uuidFilter(data: IWebsocketEvent, uuid: string): boolean {
    if (data.event !== 'DataEvent') {
      return false;
    }
    if ((data.uuidList === undefined) || (data.uuidList.indexOf(uuid) === -1)) {
      return false;
    }
    // no objections
    return true;
  }
}
