import { Injectable } from '@angular/core';
import { ActivatedRoute, Params } from '@angular/router';
import { forkJoin, ReplaySubject } from 'rxjs';
import { take } from 'rxjs/operators';
import { Client } from 'paho-mqtt';

import { DiligentApiService } from './diligent-api.service';

@Injectable({
  providedIn: 'root',
})
export class PQubeSocketServiceService {
  private accountId: number;
  private mpId: number;
  private mWebSocket$ = new ReplaySubject(1);
  private client;
  private disconnected = true;
  private disconnectRequested: boolean;

  public get pQubeSocket(): ReplaySubject<any> {
    return this.mWebSocket$;
  }

  public set _accountId(accountId) {
    this.accountId = accountId;
  }

  public set _mpId(mpId) {
    this.mpId = mpId;
  }

  constructor(private diligent: DiligentApiService, private route: ActivatedRoute) {
    this.route.queryParamMap.pipe(take(1)).subscribe((params: Params) => {
      this.accountId = parseInt(params.get('account'), 10);
      this.mpId = parseInt(params.get('mpId'), 10);
    });
  }

  public establishSocketConnection() {
    forkJoin([
      this.diligent.getSocketTopic(this.accountId, this.mpId),
      this.diligent.getSocketUrl(),
    ])
      .pipe(take(1))
      .subscribe(([mpTopic, socketUrl]) => {
        this.client = new Client(
          socketUrl,
          'cID' + Math.floor(Date.now().valueOf() / (Math.random() * 9))
        );
        this.client.onConnected = (reconnect, URI) => {
          this.disconnectRequested = false;
          this.disconnected = false;
          this.client.subscribe(mpTopic);
        };
        this.client.onMessageDelivered = () => {};
        this.client.onMessageArrived = (message) => {
          this.pQubeSocket.next(message);
        };
        this.client.onConnectionLost = () => {
          if (!this.disconnectRequested) {
            this.establishSocketConnection();
          }
          // TODO remove once stable
          console.log('Closing meter connections');
          this.disconnected = true;
        };
        this.client.connect({
          useSSL: true,
          mqttVersion: 4,
          onSuccess: this.connectionSuccessful,
          invocationContext: { socketUrl, mpTopic },
        });
      });
  }

  public requestPQubePublish(channelArray: number[], mpId: number, accountId: number) {
    this.diligent.initiatePQubePublish(channelArray, mpId, accountId);
  }

  public get isConnected(): boolean {
    return !this.disconnected;
  }

  connectionSuccessful(invocationObject) {
    // console.log(invocationObject);
  }

  public terminateConnection(source: string) {
    this.disconnectRequested = true;
    // TODO remove once stable
    console.log(source, this.disconnectRequested);
    this.client.disconnect();
  }

  convertToObservable(payload) {
    const jsonPayload = JSON.parse(payload.toString());
    // console.log(JSON.stringify(jsonPayload));
  }
}
