import { debug, isClient } from "@hornet-web-react/core/utils"
import type { AppConfig } from "./AppConfig"
import LoggerService from "./LoggerService"
import invariant from "tiny-invariant"
import type { MqttClient } from "mqtt"
import { head, propEq, uniq } from "ramda"
import { MutableRefObject } from "react"

/**
 * Identity the `WebsocketService` connects with. A connection is only
 * created when both fields are non-empty.
 */
export interface WebsocketServiceContext {
  /** Sent as the MQTT `username` when connecting. */
  accessToken: string | null
  /**
   * Sent as the MQTT `password` when connecting, and substituted for the
   * `{profileId}` placeholder in listener topics.
   */
  profileId: string | null
}

/**
 * Connection state exposed through `WebsocketService.connectionStatus`.
 *
 * NOTE(review): within this file only "Disconnected" and "Connected" are ever
 * assigned; "Connecting" and "Reconnecting" are declared but currently unused.
 */
type ConnectionStatus =
  | "Disconnected"
  | "Connecting"
  | "Connected"
  | "Reconnecting"

/**
 * Signature of a message listener callback.
 *
 * @param message - the JSON-parsed payload of the received message; the
 *   payload is not validated against `CallbackMessageType` at runtime
 * @param context - the service context at the time the message is dispatched
 */
export type MessageListenerCallback<CallbackMessageType> = (
  message: CallbackMessageType,
  context: WebsocketServiceContext
) => void

/** Internal registration record for one message listener. */
type MessageListener<CallbackMessageType> = {
  /** Unique listener name; used to detect duplicates and to remove the listener. */
  name: string
  /** Topic the listener is matched against, with `{profileId}` already substituted. */
  topic: string
  /**
   * Ref holding the callback; `.current` is read at dispatch time, so whatever
   * function the ref holds when a message arrives is the one invoked.
   */
  callback: MutableRefObject<MessageListenerCallback<CallbackMessageType>>
}

/**
 * Client-side MQTT service.
 *
 * Keeps a single MQTT client that is (re)created whenever the profile id or
 * the availability of the access token changes, dispatches JSON-parsed
 * messages to named listeners by exact topic match, and drops messages whose
 * `pubsub_uuid` was already processed.
 *
 * The registered listeners are the single source of truth for subscriptions:
 * every newly created client subscribes to the topic of every listener that
 * is registered at that moment.
 */
class WebsocketService {
  /** Upper bound for the reconnect period reached through the error back-off. */
  private static readonly MAX_RECONNECT_PERIOD_MS = 5 * 60 * 1000
  /** Number of message ids remembered for duplicate detection. */
  private static readonly MAX_REMEMBERED_MESSAGE_IDS = 1000

  protected readonly _appConfig: AppConfig
  protected readonly _loggerService: LoggerService
  protected _context!: WebsocketServiceContext
  private _connectionStatus: ConnectionStatus = "Disconnected"
  private _client: MqttClient | null
  private _messageListeners: MessageListener<any>[] = []
  private _receivedMessageIds: Set<string>
  /**
   * Incremented on every `createConnection` call; lets a call that was
   * suspended on the dynamic `mqtt` import detect that a newer call took over.
   */
  private _connectionAttemptId = 0

  /**
   * @param appConfig - application config; `appConfig.mqtt.host` is the broker URL
   * @param loggerService - receives listener and protocol errors
   * @param context - initial credentials; a connection attempt starts immediately
   */
  constructor(
    appConfig: AppConfig,
    loggerService: LoggerService,
    context: WebsocketServiceContext
  ) {
    debug(`WebsocketService: constructor`)

    this._appConfig = appConfig
    this._loggerService = loggerService
    this._client = null
    this._receivedMessageIds = new Set()
    this.updateContext(context)
  }

  /** Current connection state as tracked from the client's events. */
  get connectionStatus(): ConnectionStatus {
    return this._connectionStatus
  }

  /**
   * Stores the new context and re-creates the connection when the profile id
   * changed or when the access token became available / unavailable.
   *
   * A token that merely changes value does not trigger a reconnect.
   *
   * @param context - the new credentials
   */
  updateContext(context: WebsocketServiceContext) {
    const previousContext: WebsocketServiceContext | undefined = this._context
    this._context = context

    const isProfileIdChanged = previousContext?.profileId !== context.profileId
    // without this a token that arrives after the profile id would never
    // lead to a connection, and a cleared token would keep the old one open
    const isTokenAvailabilityChanged =
      Boolean(previousContext?.accessToken) !== Boolean(context.accessToken)

    if (isProfileIdChanged || isTokenAvailabilityChanged) {
      this.createConnection().catch(console.error)
    }
  }

  /**
   * Registers a named listener for a topic.
   *
   * `{profileId}` in `topic` is replaced with the current profile id. If a
   * listener with the same name already exists, the problem is reported to
   * the logger and the call is otherwise ignored.
   *
   * @param name - unique listener name, used later for removal
   * @param topic - topic to listen on, may contain `{profileId}`
   * @param callback - ref whose `.current` is invoked for each matching message
   */
  addMessageListener<CallbackMessageType>(
    name: string,
    topic: string,
    callback: MutableRefObject<MessageListenerCallback<CallbackMessageType>>
  ) {
    debug(`WebsocketService: addMessageListener(${name}, ${topic}, ...)`)

    // check so we don't add the same listener twice - we could also just override
    // the instance, but this way we can be more aware of what's going on
    if (this._messageListeners.some((listener) => name === listener.name)) {
      try {
        const error = new Error(
          `Message handler with name ${name} already exists`
        )
        this._loggerService.logExceptionWithSentry(
          error,
          this._loggerService.createLoggingContext({
            service: "WebsocketService",
            method: "addMessageListener",
          })
        )
      } catch {
        // reporting is best-effort; a logger failure must not reach the caller
      }

      return
    }

    const topicWithProfileId = this.addProfileIdToTopic(topic)

    // subscribe to the topic if it's not already subscribed
    const isSubscribedAlready = this._messageListeners.some(
      (handler) => topicWithProfileId === handler.topic
    )

    // add it to the listeners (new array, so an in-flight dispatch loop
    // keeps iterating over the list it started with)
    this._messageListeners = [
      ...this._messageListeners,
      { name, topic: topicWithProfileId, callback },
    ]

    // subscribe if it's first topic listener
    if (!isSubscribedAlready) {
      this.subscribeToTopic(topicWithProfileId)
    }
  }

  /**
   * Removes the listener registered under `name` and unsubscribes from its
   * topic when no other listener uses it. An unknown name is reported to the
   * logger and otherwise ignored.
   *
   * @param name - the name the listener was registered with
   */
  removeMessageListener(name: string) {
    debug(`WebsocketService: removeMessageListener(${name})`)

    const listener = head(this._messageListeners.filter(propEq("name", name)))

    if (!listener) {
      try {
        const error = new Error(
          `Message handler with name ${name} does not exist`
        )
        this._loggerService.logExceptionWithSentry(
          error,
          this._loggerService.createLoggingContext({
            service: "WebsocketService",
            method: "removeMessageListener",
          })
        )
      } catch {
        // reporting is best-effort; a logger failure must not reach the caller
      }
      return
    }

    // remove the listener
    this._messageListeners = this._messageListeners.filter(
      (handler) => name !== handler.name
    )

    // unsubscribe from the topic if no other listener is using it
    const isTopicUsed = this._messageListeners.some(
      (handler) => listener.topic === handler.topic
    )

    if (!isTopicUsed) {
      this.unsubscribeFromTopic(listener.topic)
    }
  }

  /** Replaces the first `{profileId}` placeholder with the current profile id (or ""). */
  private addProfileIdToTopic(topic: string) {
    return topic.replace("{profileId}", this._context.profileId || "")
  }

  /** Exact string comparison; MQTT wildcards are not interpreted. */
  private checkTopicMatch(websocketTopic: string, listenerTopic: string) {
    return websocketTopic === listenerTopic
  }

  /**
   * Subscribes on the live client. Without a client nothing is needed: the
   * listener is already registered and `createConnection` subscribes to all
   * listener topics when it creates the client.
   */
  private subscribeToTopic(topic: string) {
    if (this._client) {
      this._client.subscribe(topic, { qos: 1 })
    }
  }

  /**
   * Unsubscribes on the live client. Without a client nothing is needed: the
   * listener is gone, so the next client will not subscribe to the topic.
   */
  private unsubscribeFromTopic(topic: string) {
    if (this._client) {
      this._client.unsubscribe(topic)
    }
  }

  /** Force-closes the current client, if any, and resets the tracked status. */
  private closeConnection() {
    if (this._client) {
      this._client.end(true)
      this._client = null
    }

    // a force-closed client is not relied on to report its own shutdown
    this._connectionStatus = "Disconnected"
  }

  /**
   * Replaces the current connection with a new one for the current context.
   *
   * Does nothing on the server. Any existing client is closed first; a new
   * one is only created when both access token and profile id are set.
   *
   * @param reconnectPeriod - milliseconds between the client's automatic
   *   reconnect attempts
   * @returns the new client, or `undefined` when none was created
   * @throws (as a rejection) when `appConfig.mqtt` is missing or the `mqtt`
   *   module fails to load
   */
  private async createConnection(
    reconnectPeriod = 1000
  ): Promise<MqttClient | undefined> {
    // no action server-side
    if (!isClient) {
      return
    }

    // claim the connection; any call still waiting on the import is now stale
    const attemptId = ++this._connectionAttemptId

    // cleanup, if we had a previous one
    this.closeConnection()

    // no client here
    if (!this._context.accessToken || !this._context.profileId) {
      return
    }

    debug(`WebsocketService: createConnection`)

    const mqttConfig = this._appConfig.mqtt
    invariant(mqttConfig, "appConfig.mqtt is not defined")

    const mqtt = (await import("mqtt")).default

    // another createConnection call started while the module was loading;
    // creating a client here would leak it, because only the newest one is kept
    if (attemptId !== this._connectionAttemptId) {
      debug(`WebsocketService: createConnection superseded`)
      return
    }

    const { accessToken, profileId } = this._context
    if (!accessToken || !profileId) {
      return
    }

    // first space-delimited token of the user agent ("" when there is none)
    const userAgent = window.navigator.userAgent
    const firstSpaceIndex = userAgent.indexOf(" ")
    const browserToken =
      firstSpaceIndex > 0 ? userAgent.slice(0, firstSpaceIndex) : ""

    const clientId =
      browserToken.replace(/[.()]/g, "-").toLowerCase() +
      "-" +
      profileId +
      Date.now()

    // NOTE(review): `clean: false` asks the broker to keep the session, but the
    // client id above is new for every connection, so a kept session can only
    // be resumed by this client's own automatic reconnects - confirm intended.
    const client = mqtt.connect(mqttConfig.host, {
      username: accessToken,
      password: profileId,
      clientId: clientId,
      protocolId: "MQTT",
      protocolVersion: 4,
      clean: false,
      keepalive: 15,
      reconnectPeriod: reconnectPeriod,
    })

    this._client = client

    // events of a client that has been replaced must not overwrite the status
    const updateStatus = (status: ConnectionStatus) => {
      if (this._client === client) {
        this._connectionStatus = status
      }
    }

    client.on("connect", () => {
      debug("WebsocketService: connected")
      updateStatus("Connected")
    })

    client.on("disconnect", () => {
      debug("WebsocketService: disconnected")
      updateStatus("Disconnected")
    })

    client.on("offline", () => {
      debug("WebsocketService: offline")
      updateStatus("Disconnected")
    })

    let connectionErrorCounter = 0
    let isRestarting = false
    client.on("error", (error: Error & { code?: number | string }) => {
      // the client is already being shut down for a restart
      if (isRestarting) {
        return
      }

      debug(`WebsocketService: error ${connectionErrorCounter}`)
      connectionErrorCounter++

      // presumably CONNACK return code 4 (bad user name or password) - TODO confirm
      if (error.code === 4) {
        updateStatus("Disconnected")
      }

      // log error only once in this session
      if (reconnectPeriod === 1000 && connectionErrorCounter === 5) {
        console.error(error)
      }

      // after 5 retries, let's increase the reconnect period
      if (connectionErrorCounter > 5) {
        isRestarting = true
        client.end(true, () => {
          // a newer connection (or attempt) took over in the meantime
          if (this._client !== client) {
            return
          }

          this._client = null
          this.createConnection(
            Math.min(
              reconnectPeriod * 4,
              WebsocketService.MAX_RECONNECT_PERIOD_MS
            )
          ).catch(console.error)
        })
      }
    })

    client.on("message", (topic: string, msg: Buffer) => {
      this.handleMessage(topic, msg.toString())
    })

    // subscribe to everything the registered listeners need; this covers
    // listeners added before any client existed as well as listeners that
    // were subscribed on a previous client
    // NOTE(review): topics were resolved with the profile id current when the
    // listener was added - confirm listeners are re-added on profile switch.
    const topics: string[] = uniq(
      this._messageListeners.map((listener) => listener.topic)
    )
    topics.forEach((topic: string) => {
      client.subscribe(topic, { qos: 1 })
    })

    return client
  }

  /**
   * Parses one raw message and hands it to every listener registered for
   * exactly this topic. Messages whose `pubsub_uuid` was seen before are
   * skipped. Errors (invalid JSON, throwing listeners) are reported to the
   * logger; a throwing listener does not stop the remaining listeners.
   */
  private handleMessage(topic: string, rawMessage: string) {
    try {
      const messageData = JSON.parse(rawMessage)

      debug(`WebsocketService: message received on: ${topic}`)

      const msgId = messageData.pubsub_uuid

      if (msgId && this._receivedMessageIds.has(msgId)) {
        // duplicate, do not process
        debug(`WebsocketService: duplicate ignored: ${msgId}`)
        return
      }

      // process via message listeners
      this._messageListeners.forEach((listener) => {
        if (!this.checkTopicMatch(topic, listener.topic)) {
          return
        }

        try {
          listener.callback.current(messageData, this._context)
        } catch (error) {
          this._loggerService.logExceptionWithSentry(
            WebsocketService.toError(error),
            this._loggerService.createLoggingContext({
              service: "WebsocketService",
              method: "client.on.message",
              listener: listener.name,
            })
          )
        }
      })

      if (msgId) {
        this.rememberMessageId(msgId)
      }
    } catch (error) {
      this._loggerService.logExceptionWithSentry(
        WebsocketService.toError(error),
        this._loggerService.createLoggingContext({
          service: "WebsocketService",
          method: "createConnection",
        })
      )
    }
  }

  /**
   * Records a processed message id, evicting the oldest one once the cap is
   * exceeded so the set cannot grow for the whole lifetime of the page.
   */
  private rememberMessageId(msgId: string) {
    this._receivedMessageIds.add(msgId)

    if (
      this._receivedMessageIds.size >
      WebsocketService.MAX_REMEMBERED_MESSAGE_IDS
    ) {
      // a Set iterates in insertion order, so the first value is the oldest
      const oldest = this._receivedMessageIds.values().next().value
      if (oldest !== undefined) {
        this._receivedMessageIds.delete(oldest)
      }
    }
  }

  /** Wraps non-`Error` throwables so they are reported instead of dropped. */
  private static toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error))
  }
}

export default WebsocketService
