import { HubConnection, HubConnectionBuilder, HubConnectionState, LogLevel } from '@aspnet/signalr';

import { LoggerService, LoggerTopic } from '@statera/sdk/logger';
import { BehaviorSubject, interval, Observable, Subject } from 'rxjs';
import { filter, finalize, switchMap, tap } from 'rxjs/operators';

import { WebsocketResponse } from './websocket.model';

interface Listener<T> {
  counter: number;
  connection: HubConnection;
  observer: Subject<WebsocketResponse<T>>;
}

export class WebsocketConnection<T> {
  private static readonly _defaultReconnectAttemptsInterval: number = 2_000;
  private static readonly _defaultReconnectAttemptsLimit: number = 10;

  private _connectionSubject: BehaviorSubject<HubConnection>;

  private _reconnectAttempt: number;

  private readonly _endpoint: string;
  private readonly _reconnectAttemptsInterval: number;
  private readonly _reconnectAttemptsLimit: number;

  private readonly _listeners: {[method: string]: Listener<T>};

  private readonly _connectionTerminatedObserver: Subject<Error>;

  private readonly _loggerService: LoggerService;

  constructor(endpoint: string, reconnectAttemptsInterval: number, reconnectAttemptsLimit: number, loggerService: LoggerService) {
    this._endpoint = endpoint;
    this._reconnectAttemptsInterval = reconnectAttemptsInterval || WebsocketConnection._defaultReconnectAttemptsInterval;
    this._reconnectAttemptsLimit = reconnectAttemptsLimit || WebsocketConnection._defaultReconnectAttemptsLimit;

    this._reconnectAttempt = 1;

    this._listeners = {};

    this._connectionTerminatedObserver = new Subject<Error>();

    this._loggerService = loggerService;
  }

  connectionTerminated(): Observable<Error> {
    return this._connectionTerminatedObserver;
  }

  listen(method: string): Observable<WebsocketResponse<T>> {
    return this._getConnection()
      .pipe(
        filter(connection => Boolean(connection)),
        switchMap(connection => {
          // Signalr has a listener limit, so we only need to keep one listener per method
          let listener = this._listeners[method];
          if (!listener || !listener.counter) {
            listener = <Listener<T>>{
              counter: 0,
              connection: connection,
              observer: new Subject<WebsocketResponse<T>>(),
            };

            this._listeners[method] = listener;

            connection.on(method, payload => {
              if (!listener || !listener.observer) {
                return;
              }

              this._loggerService
                .debug(LoggerTopic.WebSocket,
                  `Connection with '${this._endpoint}' and method '${method}' endpoint received payload`,
                  payload);

              listener.observer.next({
                model: payload,
                isPulling: false,
              });
            });
          }

          // Due to listeners limit we need to keep count of our observers
          // It will increase for each listener registered in our code and decrease when the listener is closed
          this._listeners[method].counter++;

          this._loggerService
            .debug(LoggerTopic.WebSocket, `Listener registered`, {method, listener: {...this._listeners[method]}});

          return this._listeners[method].observer;
        }),
        finalize(() => {
          const listener = this._listeners[method];
          if (!listener) {
            return;
          }

          // Decreases count of listeners
          listener.counter--;
          if (0 === listener.counter) {
            if (listener.connection) {
              listener.connection.off(method);
            }

            this._loggerService
              .debug(LoggerTopic.WebSocket, `Listener removed`, {method, listener: {...listener}});

            delete this._listeners[method];
          }

          // Closes an established connection if there are no more listeners
          const methods = Object.keys(this._listeners);
          if (!methods || !methods.length) {
            this._disconnect();
          }
        }),
      );
  }

  private _getConnection(): Observable<HubConnection> {
    // Returns an existing connection only if it is established
    if (this._connectionSubject && !this._connectionSubject.closed && !this._connectionSubject.isStopped) {
      this._loggerService
        .debug(LoggerTopic.WebSocket, `Trying to reopen connection '${this._endpoint}', switch to existing`, this._connectionSubject);

      return this._connectionSubject;
    }

    // Attempts to create a connection
    // If connection is not established observable object will not be fired
    this._connectionSubject = this._createConnection();

    return this._connectionSubject;
  }

  private _createConnection(): BehaviorSubject<HubConnection> {
    // An observable that must be returned in order to keep the established connection
    // It will be fired after after establishing a connection with the hub
    const connectionSubject = new BehaviorSubject<HubConnection>(null);

    // Builds a hub connection with the passed endpoint and a preconfigured logger
    const hubConnection = new HubConnectionBuilder()
      .configureLogging(LogLevel.None)
      .withUrl(this._endpoint)
      .build();

    let unsafeUnlock: () => void = (() => null);

    // Unlocks locked tab
    const unlock: () => void = () => {
      if (typeof unsafeUnlock === 'function') {
        unsafeUnlock();

        this._loggerService.debug(LoggerTopic.WebSocket, `Tab unlocked`);
      }
    };

    // Locks browser tab to prevent from freezing
    // For more information see: https://developer.mozilla.org/en-US/docs/Web/API/Web_Locks_API
    const unsafeBrowserAPI = <Navigator & { locks?: any }>navigator;
    if (unsafeBrowserAPI && unsafeBrowserAPI.locks && typeof unsafeBrowserAPI.locks.request === 'function') {
      const now = Date.now();

      unsafeBrowserAPI.locks.request(`statera_websocket_lock_${now}`, { mode: 'shared' }, () => {
        return new Promise(resolve => unsafeUnlock = <any>resolve);
      });

      this._loggerService.debug(LoggerTopic.WebSocket, `Tab locked due to open websocket connection`);
    }

    // Connects to the hub and fires the connectionSubject after the connection is established
    hubConnection
      .start()
      .then(() => {
        this._loggerService
          .debug(LoggerTopic.WebSocket, `Connection with '${this._endpoint}' endpoint opened`);

        connectionSubject.next(hubConnection);
      });

    // Tracks closing of connection
    // If the connection is closed without any error, this does nothing,
    // but if an error occurs it starts the reconnect process
    hubConnection
      .onclose(error => {
        if (!error) {
          unlock();

          this._loggerService
            .debug(LoggerTopic.WebSocket, `Connection with '${this._endpoint}' endpoint closed`);

          return;
        }

        this._loggerService
          .error(LoggerTopic.WebSocket, `Connection with '${this._endpoint}' endpoint closed due to an error`, error);

        // Attempts to establish connection every {_reconnectAttemptsInterval * _reconnectionAttempt}ms
        // until the {_reconnectAttemptsLimit} limit is reached
        const startInterval = (delay: number) => {
          let intervalSubscription = interval(delay)
            .pipe(
              tap(() => {
                if (intervalSubscription) {
                  intervalSubscription.unsubscribe();
                  intervalSubscription = null;
                }

                this._loggerService.debug(
                  LoggerTopic.WebSocket,
                  `Attempts to establish connection with '${this._endpoint}' endpoint`,
                  {
                    attempt: this._reconnectAttempt,
                    limit: this._reconnectAttemptsLimit,
                  },
                );

                const increaseAttemptAndRestartInterval = () => {
                  // Checks the limit of attempts, when the limit is reached
                  // unlocks the browser tab and starts {_connectionTerminatedObserver}
                  if (this._reconnectAttemptsLimit <= this._reconnectAttempt) {
                    this._loggerService.error(
                      LoggerTopic.WebSocket,
                      `Connection with '${this._endpoint}' endpoint closed due to reached reconnection limit`,
                    );

                    unlock();

                    // Fires an event to notify all modules that a WebSocket connection was terminated with an error
                    this._connectionTerminatedObserver.next(error);

                    return;
                  }

                  this._reconnectAttempt += 1;

                  // Restarts interval with increased delay
                  startInterval(this._reconnectAttemptsInterval * (this._reconnectAttempt));
                };

                hubConnection
                  .start()
                  .then(() => {
                    if (hubConnection.state !== HubConnectionState.Connected) {
                      increaseAttemptAndRestartInterval();
                      return;
                    }

                    this._reconnectAttempt = 1;

                    this._loggerService
                      .debug(LoggerTopic.WebSocket, `Connection with '${this._endpoint}' endpoint restored`);

                    // Starts pulling process for all methods on this endpoint
                    if (this._listeners) {
                      const methods = Object.keys(this._listeners);

                      for (let i = 0, num = methods.length; i < num; i++) {
                        const method = methods[i];
                        const listener = this._listeners[method];

                        if (!listener || !listener.observer) {
                          continue;
                        }

                        listener.observer.next({
                          model: null,
                          isPulling: true,
                        });
                      }
                    }
                  })
                  .catch(() => increaseAttemptAndRestartInterval());
              }),
            )
            .subscribe();
        };

        startInterval(0); // starts immediately
      });

    return connectionSubject;
  }

  private _disconnect(): void {
    if (!this._connectionSubject || !this._connectionSubject.value) {
      return;
    }

    const hubConnection = this._connectionSubject.value;

    // Closes an established connection and connection observer
    hubConnection
      .stop()
      .then(() => {
        if (!this._connectionSubject) {
          return;
        }

        this._connectionSubject.complete();
        this._connectionSubject = null;
      })
      .catch(error => {
        this._loggerService
          .error(LoggerTopic.WebSocket, `Cannot disconnect from '${this._endpoint}' due to an error`, error);
      });
  }
}
