import { defer, Observable, ReplaySubject, timer } from 'rxjs';
import { filter, finalize, share } from 'rxjs/operators';

import { RESET, RetryableWebSocket } from './retryable.websocket';

/**
 * Can't use the multiplexer built into rxjs' WebSocketSubject as it doesn't allow the connection to be kept open for a
 * little while after the last subscriber unsubscribes.
 */
export class MultiplexedWebSocket<In, Out> {
  private subscriptions = new ReplaySubject<Out>();

  private readonly socket: RetryableWebSocket<In, Out>;
  private readonly multiplex: Observable<In | RESET>;

  constructor(url: string) {
    this.socket = new RetryableWebSocket<In, Out>(
      url,
      // send all subscriptions once the socket is open
      () => {
        this.subscriptions.subscribe((sub) => this.socket.send(sub));
      },
      // always try to reconnect on unexpected disconnects as the socket must have listeners or it would have been
      // closed by us (in finalize below)
      () => true
    );

    this.multiplex = this.socket.observe().pipe(
      // clean up after all consumers (see below) are complete
      finalize(() => {
        this.subscriptions.unsubscribe();
        this.subscriptions = new ReplaySubject();
      }),
      // allow multiple consumers and only complete once there are no consumer for 1 second
      share({ resetOnRefCountZero: () => timer(1000) })
    );
  }

  subscribe<S extends In>(
    subscriptionMessage: unknown,
    unsubscriptionMessage: unknown,
    messageFilter: (msg: In) => msg is S
  ): Observable<S>;
  subscribe(subscriptionMessage: Out, unsubscriptionMessage: Out, messageFilter: (msg: In) => boolean): Observable<In | RESET>;
  subscribe(subscriptionMessage: Out, unsubscriptionMessage: Out, messageFilter: (msg: In) => boolean): Observable<In | RESET> {
    // send subscription message once there's a consumer
    return defer(() => {
      this.subscriptions.next(subscriptionMessage);
      return this.multiplex;
    }).pipe(
      filter((msg) => msg === RESET || messageFilter(msg)),
      // clean up once all consumers (see below) are complete
      finalize(() => this.socket.send(unsubscriptionMessage)),
      // allow multiple consumers and only complete once all those consumers are complete
      share()
    );
  }
}
