dhcgn/hm2mqtt

View on GitHub
mqtthandler/mqtthandler.go

Summary

Maintainability
A
0 mins
Test Coverage
package mqtthandler

import (
    "log"
    "time"

    "github.com/denisbrodbeck/machineid"
    "github.com/dhcgn/hm2mqtt/shared"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)

// TOOO Change Impl to https://github.com/eclipse/paho.mqtt.golang/blob/43c9c445a89e7dca549a9bd445e3553dade9a2bc/cmd/docker/subscriber/main.go#L80

// Handle for send data to a broker
type Handle interface {
    SendToBroker(e shared.Event)
    Disconnect()
}

type handle struct {
    options *mqtt.ClientOptions
    client  mqtt.Client
    config  *shared.Configuration
}

const (
    autoReconnect    = true
    subscribeChannel = "hm/set/#"
)

var (
    // TODO Set Last Will
    id, err  = machineid.ProtectedID("HomeMaticMqttPlugin")
    clientID = "HomeMaticMqttPlugin_" + id[0:16]
)

// New creates a new mqtthandler to creates a client for subscription and publishing
func New(config *shared.Configuration, handler mqtt.MessageHandler) Handle {
    log.Println("Connect to Broker", config.BrokerURL, "as", clientID)
    opts := mqtt.NewClientOptions().
        AddBroker(config.BrokerURL).
        SetClientID(clientID).
        SetAutoReconnect(autoReconnect).
        SetWill("hm/bridge/state", "lost connection", 1, true)

    c := mqtt.NewClient(opts)

    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    onlinetoken := c.Publish("hm/bridge/state", 1, true, "online")
    <-onlinetoken.Done()

    t := c.Subscribe(subscribeChannel, 1, handler)
    go func() {
        <-t.Done()
        if t.Error() != nil {
            log.Println("ERROR SUBSCRIBING:", t.Error())
        } else {
            log.Println("subscribed to", subscribeChannel)
        }
    }()

    return &handle{
        options: opts,
        client:  c,
        config:  config,
    }
}

func (h handle) Disconnect() {
    log.Println("Disconnect from Broker")

    onlinetoken := h.client.Publish("hm/bridge/state", 1, true, "offline")
    <-onlinetoken.Done()

    h.client.Disconnect(100)
}

func (h handle) SendToBroker(e shared.Event) {
    start := time.Now()

    topic := "hm/" + e.SerialNumber + "/" + e.Type
    token := h.client.Publish(topic, 1, h.config.Retain, e.DataValue)
    wait := token.WaitTimeout(2 * time.Second)
    err := token.Error()

    elapsed := time.Since(start)

    // Meta
    h.client.Publish(topic+"/time", 1, h.config.Retain, time.Now().Format(time.RFC3339))

    if wait && err == nil {
        log.Println("OK:    topic:", topic, "with value: ", e.DataValue, elapsed)
    } else {
        log.Println("ERROR: topic:", topic, "with value: ", e.DataValue, "Wait", wait, "Error:", err, elapsed)
        time.Sleep(500 * time.Millisecond)
    }
}