import mqtt from "mqtt";
import * as mqttTopics from "@common/constants/mqttTopics";
import * as mqttQos from "@common/constants/mqttQos";
import NOTIFICATION_TYPES from "@common/constants/notificationTypes";
import notification_pb from "../util/protobuf/notification_pb";
import log_pb from "../util/protobuf/log_pb";
import { BehaviorSubject, Subject } from "rxjs";
import { filter } from "rxjs/operators";

/**
 * Service containing all business logic to communicate with the MQTT broker.
 * It exposes rxjs streams that components can subscribe to.
 */
class MqttService {
  constructor() {
    // mqtt client instance; stays null until connect() has been called.
    this.channel = null;
    // Topics subscribed through subscribe(), used to avoid double
    // subscriptions and to re-subscribe after a forced reconnect.
    this.subscriptions = [];
    this.connected = false;
    this.connected$ = new BehaviorSubject();
    this.notification$ = new Subject();
    this.frequentLog$ = new Subject();
    this.frequentLogResponse$ = new BehaviorSubject();
    // Set by reconnect(); tells _handleConnect() to restore subscriptions.
    this.forceReconnect = false;

    this.clientID = null;
    this.username = null;
    this.password = null;
  }

  /**
   * Whether the underlying mqtt client reports being connected.
   * Always a boolean-like value; `false` when no client exists yet.
   */
  get connectedMQTT() {
    return this.channel ? this.channel.connected : false;
  }

  /**
   * Exposes notifications stream filtered by machine uuid
   * @param {string} machineUUID
   */
  notificationsByMachine$(machineUUID) {
    return this.notification$.pipe(
      // Loose equality is kept on purpose so existing callers that pass a
      // differently typed identifier keep matching.
      filter(notification => notification.device == machineUUID)
    );
  }

  /**
   * Exposes stream that emits values when a notification is of type
   * COMMAND_EXECUTED
   */
  notificationsExecuted() {
    return this.notification$.pipe(
      filter(
        notification =>
          notification.type === NOTIFICATION_TYPES.NOT_COMMAND_EXECUTED
      )
    );
  }

  /**
   * Exposes stream that emits values when a notification is of type
   * COMMAND_EXECUTED and the provided sequence number matches the
   * notifications sequence number
   * @param {Number} sequence
   */
  notificationsExecutedWithSequence(sequence) {
    return this.notification$.pipe(
      filter(
        notification =>
          notification.type === NOTIFICATION_TYPES.NOT_COMMAND_EXECUTED
      ),
      filter(notification => notification.commandSeq === sequence)
    );
  }

  /**
   * Exposes frequent log stream filtered by machine uuid
   * TODO: NOT WORKING ATM
   * @param {string} machineUUID
   */
  frequentLogByMachine$(machineUUID) {
    return this.frequentLog$.pipe(
      filter(frequentlog => frequentlog.processSummaryDevice == machineUUID)
    );
  }

  /**
   * Connect to the mqtt broker and register the client event handlers.
   * The credentials are stored on the service so reconnect() can reuse them.
   * @param {string} clientID a v4 UUID
   * @param {string} username
   * @param {string} password
   */
  connect(clientID, username, password) {
    this.clientID = clientID;
    this.username = username;
    this.password = password;

    console.dev(`connecting with:`);
    console.dev(`client id: ${this.clientID}`);
    console.dev(`username: ${this.username}`);
    // Never write the secret itself to the log; only report its presence.
    console.dev(`password: ${this.password ? "<redacted>" : "<none>"}`);
    const options = {
      username: this.username,
      password: this.password,
      clientId: this.clientID,
      reconnectPeriod: 5000,
      connectTimeout: 30 * 5000,
      // Last-will message registered with the broker for this client.
      will: {
        topic: mqttTopics.lastWillTopic + this.clientID,
        payload: "offline",
        qos: mqttQos.atMostOnce,
        retain: false
      }
    };

    this.channel = mqtt.connect(process.env.MQTT_URL, options);
    this.channel.on("connect", () => this._handleConnect());
    this.channel.on("close", () => this._handleClose());
    this.channel.on("error", error => this._handleError(error));
    this.channel.on("reconnect", () => this._handleReconnect());
    this.channel.on("message", (topic, message) =>
      this.handleMessage({ topic, message })
    );
  }

  /**
   * Ends the current connection and connects again with a new password,
   * reusing the stored client id and username. Does nothing besides storing
   * the password when no client exists yet.
   * @param {string} password
   */
  reconnect(password) {
    console.dev(`MQTT: reconnect`);
    this.password = password;
    if (this.channel) {
      this.channel.end(false, () => {
        this.forceReconnect = true;
        this.connect(this.clientID, this.username, this.password);
      });
    }
  }

  /**
   * Ends the connection, forgets all tracked subscriptions and then invokes
   * the callback. When no client exists the cleanup and callback still run.
   * @param {Function} [callback] invoked once the connection has ended
   */
  end(callback) {
    const finish = () => {
      this.subscriptions = [];
      if (typeof callback === "function") {
        callback();
      }
    };

    // connect() may never have been called; there is nothing to close then.
    if (!this.channel) {
      finish();
      return;
    }
    this.channel.end(false, finish);
  }

  /**
   * Publishes raw data on a topic.
   * @param {string} topic
   * @param {*} data payload handed to the mqtt client as-is
   */
  publish(topic, data) {
    console.dev(`publishing to ${topic}`);
    this.channel.publish(topic, data);
  }

  /**
   * Asks a device to start sending frequent logs.
   * @param {string} deviceMACAddress appended to the log request topic
   * @param {string} clientUUID set as the device id of the request
   */
  publishRequestFrequentLogs(deviceMACAddress, clientUUID) {
    this._publishLogRequest(deviceMACAddress, clientUUID, true);
  }

  /**
   * Asks a device to stop sending frequent logs.
   * @param {string} deviceMACAddress appended to the log request topic
   * @param {string} clientUUID set as the device id of the request
   */
  publishStopFrequentLogs(deviceMACAddress, clientUUID) {
    this._publishLogRequest(deviceMACAddress, clientUUID, false);
  }

  /**
   * Builds and publishes a LogRequest message.
   * @param {string} deviceMACAddress appended to the log request topic
   * @param {string} clientUUID set as the device id of the request
   * @param {boolean} start true = start sending logs, false = stop sending logs
   */
  _publishLogRequest(deviceMACAddress, clientUUID, start) {
    const logRequest = new log_pb.LogRequest();
    logRequest.setDeviceId(clientUUID);
    logRequest.setStartOrStop(start);
    // setDumpHistory is not called here; extended logging is switched off.
    logRequest.setExtendedLog(false);

    this.publish(
      mqttTopics.logRequestTopic + deviceMACAddress,
      logRequest.serializeBinary()
    );
  }

  /**
   * Subscribes to a topic unless it is already tracked as subscribed.
   * @param {string} topic
   */
  subscribe(topic) {
    console.dev(`subscribing to ${topic}`);
    if (!this.subscriptions.includes(topic)) {
      this.subscriptions.push(topic);
      this.channel.subscribe(topic);
    } else {
      console.dev("already subscribed to topic");
    }
  }

  subscribeToNotifications(UserUuid) {
    this.subscribe(mqttTopics.notificationTopic + UserUuid);
  }

  subscribeFrequentLog(deviceUuid) {
    this.subscribe(mqttTopics.logsTopic + deviceUuid);
  }

  unsubscribeFrequentLog(deviceUuid) {
    this.unsubscribe(mqttTopics.logsTopic + deviceUuid);
  }

  /**
   * Unsubscribes from every tracked topic that contains the logs topic.
   */
  unsubscribeAllFrequentLog() {
    // unsubscribe() splices this.subscriptions, so collect the matching
    // topics first; iterating the live array would skip neighbouring entries.
    const logTopics = this.subscriptions.filter(subscription =>
      subscription.includes(mqttTopics.logsTopic)
    );
    logTopics.forEach(topic => this.unsubscribe(topic));
  }

  subscribeFrequentLogResp(deviceUuid) {
    this.subscribe(mqttTopics.logRequestResponseTopic + deviceUuid);
  }

  unsubscribeFrequentLogResp(deviceUuid) {
    this.unsubscribe(mqttTopics.logRequestResponseTopic + deviceUuid);
  }

  /**
   * Unsubscribes from a topic if it is tracked as subscribed.
   * @param {string} topic
   */
  unsubscribe(topic) {
    const index = this.subscriptions.indexOf(topic);
    if (index === -1) {
      console.dev("can't unsubscribe because no subscription is present");
      return;
    }

    console.dev(`unsubscribing from ${topic}`);
    this.channel.unsubscribe(topic);
    this.subscriptions.splice(index, 1);
  }

  /**
   * Routes an incoming MQTT message to the matching stream based on its topic.
   * Notifications take precedence over logs, which take precedence over
   * log request responses. Messages on other topics are ignored.
   * @param {{topic: string, message: *}} action topic and raw payload
   */
  handleMessage(action) {
    const { topic } = action;

    if (topic.includes("backend/notifications/")) {
      this._handleNotification(action);
    } else if (topic.includes(mqttTopics.logsTopic)) {
      this._handleLog(action);
    } else if (topic.includes(mqttTopics.logRequestResponseTopic)) {
      this._handleLogRequestResponse(action);
    }
  }

  /**
   * Decodes a notification, acknowledges it to the backend and emits it on
   * notification$. A payload that cannot be decoded is logged and dropped,
   * because without its sequence number no acknowledgement can be built.
   */
  _handleNotification(action) {
    // The user uuid is the last segment of the topic.
    const userUUID = action.topic.slice(action.topic.lastIndexOf("/") + 1);

    let message;
    try {
      message = notification_pb.Notification.deserializeBinary(
        new Uint8Array(action.message)
      ).toObject();
    } catch (err) {
      console.error("NOTIFICATION ERROR", err);
      return;
    }

    // Create notification response carrying the received sequence number
    const notResponse = new notification_pb.NotificationResp();
    notResponse.setSeq(message.seq);

    // Perhaps dispatch a new action instead of calling channel here
    this.channel.publish(
      mqttTopics.notificationResponseTopic + userUUID,
      notResponse.serializeBinary()
    );

    // Inform observers that a notification was received
    this.notification$.next(message);
  }

  /**
   * Decodes a frequent log payload and emits it on frequentLog$.
   * A payload that cannot be decoded is logged and dropped instead of
   * throwing out of the mqtt message handler.
   */
  _handleLog(action) {
    let message;
    try {
      message = log_pb.Logs.deserializeBinary(
        new Uint8Array(action.message)
      ).toObject();
    } catch (err) {
      console.error("LOG ERROR", err);
      return;
    }
    this.frequentLog$.next(message);
  }

  /**
   * Decodes a log request response and emits it on frequentLogResponse$.
   * On a decoding failure `undefined` is emitted, which is what this stream
   * has always done in that case.
   */
  _handleLogRequestResponse(action) {
    let message;
    try {
      message = log_pb.LogRequestResp.deserializeBinary(
        new Uint8Array(action.message)
      ).toObject();
    } catch (err) {
      console.error("LOG REQUEST RESPONSE ERROR", err);
    }

    this.frequentLogResponse$.next(message);
  }

  exit() {}

  /**
   * Generates a v4-shaped UUID string from the current time and
   * Math.random(). Not suitable for security-sensitive identifiers.
   * @returns {string}
   */
  _generateUUID = () => {
    let d = new Date().getTime();
    return "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(/[xy]/g, c => {
      const r = (d + Math.random() * 16) % 16 | 0;
      d = Math.floor(d / 16);
      // "y" positions are restricted to the values 8, 9, a or b.
      return (c === "x" ? r : (r & 0x3) | 0x8).toString(16);
    });
  };

  _handleConnect() {
    this.connected = true;
    this.connected$.next(true);

    // If it is a forced reconnect, this can happen when a token is expired
    // we should handle the reconnecting to the appropriate channels ourselves
    if (this.forceReconnect === true) {
      this.forceReconnect = false;
      // subscribe() skips topics that are already tracked, so the list has
      // to be emptied before replaying it against the new client.
      const subscriptionsCopy = [...this.subscriptions];
      this.subscriptions = [];
      subscriptionsCopy.forEach(subscription => {
        this.subscribe(subscription);
      });
    }
  }

  _handleReconnect() {
    console.dev("reconnecting");
  }

  _handleClose() {
    this.connected = false;
    this.connected$.next(false);
    console.dev("disconnect");
  }

  _handleError(error) {
    this.connected = false;
    this.connected$.next(false);
    console.error(error);
  }

  /**
   * Connection state of the mqtt client. Returns the falsy `channel` value
   * itself (null) when no client exists yet.
   */
  isConnected() {
    return this.channel && this.channel.connected;
  }
}

// Module-level singleton: every importer of this module's default export
// receives this one MqttService instance.
const sharedMqttService = new MqttService();
export { sharedMqttService as default };
