import { merge, Observable, pipe, Subject, timer } from 'rxjs';
import { delayWhen, map, repeatWhen, retryWhen, share, takeWhile } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

import { defined } from '../utils/runtypes';

import { Utils } from '../utils/utils';

export const RESET = 'RESET' as const;
export type RESET = typeof RESET;

const RETRY_INITIAL_DELAY = 1000;
const RETRY_BACKOFF = 2;
const RETRY_MAX_DELAY = 60000;

export class RetryableWebSocket<In, Out = In> {
  // WebSocketSubject is insufficiently generic. See https://github.com/ReactiveX/rxjs/discussions/6018
  private readonly websocket: WebSocketSubject<In | Out>;
  private readonly source: Observable<In | RESET>;

  private readonly reset = new Subject<RESET>();

  constructor(
    url: string,
    onOpen = () => {
      // Nothing to do
    },
    keepTryingCheck = (_code?: number) => true
  ) {
    // In case the socket is initially closed
    let connectionRetryDelay = RETRY_INITIAL_DELAY;

    // Unfortunately WebSocketSubject doesn't treat server error codes as errors
    const errors = new Subject<CloseEvent>();

    this.websocket = webSocket({
      url: Utils.getWebSocketProtocol() + window.location.host + url,
      binaryType: 'arraybuffer',
      deserializer: mixedDeserialiser,
      openObserver: {
        next: () => {
          // reset retry delay on successful connection
          connectionRetryDelay = RETRY_INITIAL_DELAY;
          this.reset.next(RESET);
          onOpen();
        },
      },
      closeObserver: {
        next: (msg) => {
          // WebSocketSubject already throws an error for 1006
          if (msg.code > 1000 && msg.code !== 1006) {
            errors.next(msg);
          }
        },
      },
    });

    const retry = pipe(
      map((error) => {
        if (keepTryingCheck((error instanceof CloseEvent && error.code) || undefined)) {
          const delay = connectionRetryDelay;
          connectionRetryDelay = Math.min(connectionRetryDelay * RETRY_BACKOFF, RETRY_MAX_DELAY);
          return delay;
        }
        return undefined;
      }),
      takeWhile(defined),
      delayWhen(timer)
    );

    this.source = merge(
      this.websocket.asObservable() as Observable<In>,
      errors.pipe(
        // Turn all error events into real errors. This can't be done by calling errors.error() as that will also
        // complete the Subject
        map((e) => {
          throw e;
        })
      )
    ).pipe(
      // retry on error?
      retryWhen((close) => close.pipe(retry)),

      // retry on server clean disconnect?
      repeatWhen((close) => close.pipe(retry)),

      share()
    );
  }

  send(msg: Out): void {
    this.websocket.next(msg);
  }

  observe(): Observable<In | typeof RESET> {
    return merge(this.source, this.reset);
  }
}

const mixedDeserialiser = (msg: MessageEvent) => {
  // The server decides whether text or binary is sent, usually based on what we have requested.
  const data =
    msg.data instanceof ArrayBuffer
      ? // We are currently expecting binary to just be UTF-8 encoded strings.
        // Binary is prefeerd as it is more optimal on the server side.
        new TextDecoder().decode(msg.data)
      : msg.data;
  return JSON.parse(data);
};

export const withoutResetNotifications = <T>(msg: T | RESET): msg is T => msg !== RESET;
