Compare commits

..

16 Commits

@ -1,19 +0,0 @@
kind: pipeline
type: docker
name: default
steps:
- name: Unit Test and lint
image: golang:1.23-bookworm
commands:
- go mod download
- go test -v ./...
- go vet ./...
group: test-lint
trigger:
event:
- pull_request
- push
branch:
- main

@ -23,7 +23,7 @@
allow_anonymous false
# allow_zero_length_clientid
# auto_id_prefix
# password_file
password_file /etc/mosquitto/passwd
# plugin
# plugin_opt_*
# psk_file
@ -233,6 +233,9 @@ allow_anonymous false
# listener port-number [ip address/host name/unix socket path]
listener 1883 0.0.0.0
# TLS Listener
listener 8883 0.0.0.0
# By default, a listener will attempt to listen on all supported IP protocol
# versions. If you do not have an IPv4 or IPv6 interface you may wish to
# disable support for either of those protocol versions. In particular, note
@ -315,10 +318,10 @@ listener 1883 0.0.0.0
# TLS encryption.
# Path to the PEM encoded server certificate.
#certfile
certfile /mosquitto/certs/server.crt
# Path to the PEM encoded keyfile.
#keyfile
keyfile /mosquitto/certs/server.key
# If you wish to control which encryption ciphers are used, use the ciphers
# option. The list of available ciphers can be optained using the "openssl
@ -359,7 +362,7 @@ listener 1883 0.0.0.0
# containing the CA certificates. For capath to work correctly, the
# certificate files must have ".crt" as the file ending and you must run
# "openssl rehash <path to capath>" each time you add/remove a certificate.
#cafile
cafile /mosquitto/certs/ca.crt
#capath

@ -0,0 +1,32 @@
# Design Considerations
- I would like one process `C` to be able to control `D`n devices remotely. To determine a partial ordering we will use a Vector Clock **per device**. So a Vector of 2 values.
- Devices never talk to each other they only ever talk with a Controller.
- Events on device takes precedence when comparing remote events. This means tie's are broken by Dn(e) winning over C(e) event when events occur concurrently.
- Example At time T1 a remote user creates an event in the controlling process to changes D1's temperature (Controller: [1,0], Device: [0,0]). A message is sent at T2 by the controller to a subscribing device (Controller: [2,0], Device: [0,0]). Before receipt of this message a second user changes D1's temperature at device time T'1 (Controller: [2,0], Device: [0,1]). The device creates a message at T'2 and sends this temperature update notification to the controller (Controller: [2,0], Device: [0,2]). On receipt of the message send by the controller we will detect an in comparable event where the message vector would be of the form T || T' because Message Vector [2,0] and local Device vector [0,2] where 2 > 0 but 0 !> 2 and the reverse also is in conflict. This means we have detected concurrent events. In this case the message command sent from the controller at T2 is discarded and D1's temp set event takes precedence. D1's time will be advanced to [2,3] and when the controller receives D1's event message it's clock will advance to [3,2].
- If the remote user sends the temperature command again. The time of message sent on C(T3) will be [4,2]. On receipt at D1 it's own clock will advance to [4,4], and the temperature will be set.
## Enabling TLS for Mosquitto
It is recommended to expose Mosquitto using TLS on port 8883 and port 8084 for Secure websockets. This implementation will only enable TLS security over TCP 8883 port. To achieve this declare a new listener for port 8883. All configuration that appears after the new listener declaration applies specifically to that listener.
So the following can be added to the end of the mosquitto.conf
```
listener 8883 0.0.0.0
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key
# By default an TLS enabled listener will operate in a similar fashion to a
# https enabled web server, in that the server has a certificate signed by a CA
# and the client will verify that it is a trusted certificate. The overall aim
# is encryption of the network traffic. By setting require_certificate to true,
# the client must provide a valid certificate in order for the network
# connection to proceed. This allows access to the broker to be controlled
# outside of the mechanisms provided by MQTT.
#require_certificate false
```
Note at this point clients are not required to provide their own certificate since we have not set `require_certificate true`. This is what we want to start with, but eventually we will want to issue certs to clients that will server not only as their client id but also as their authentication. No passkey is required to use the servery.key in this setup. If there was you would need to provide it in `keyfile_password your_passphrase_here` and the mosquitto.conf file should be limited with `chmod 600`.

@ -29,3 +29,10 @@ run-pub:
run-sub:
go run ./cmd/subscriber/main.go
.PHONEY: run-sub
dev-logs:
docker logs mosquitto -f
.PHONEY: dev-logs
test:
go test -v ./...

@ -1,16 +1,24 @@
# Learn MQTT with Go
![build-status](https://drone.runcible.io/api/badges/learning/learn_mqtt_go/status.svg)
Learning MQTT with Golang by doing. This repo is a simple example of using a Golang application as a client (pub & sub) of an MQTT broker.
1883, Eclipse Mosquitto unsecured (opening the port on the internet is not recommended).
## Architecture
![mqtt-arch](./img/mqtt-architecture.svg)
Ideally I would like this application to resemble the above design. Conceptually, there
## Development
For local development we use [Mosquitto](https://mosquitto.org/) as our MQTT broker, with TLS enabled.
```
sudo apt install mosquitto-clients
```
First generate local development certs using:
```
@ -31,10 +39,38 @@ Stop the local development environment with:
make stop-dev
```
### Testing
Unit tests can be run with:
```
make test
```
Integration tests are run via:
```
make test-integration
```
## Usage
*Instructions TBD*
To run the application end to end first start the development server:
```
make start-dev
```
Next we can start our subscriber. The following command connects to the TLS listener port.
```
go run cmd/subscriber/main.go --host 0.0.0.0 --port 8883
```
## Design Considerations
See [DESIGN_CONCIDERATIONS.md](./DESIGN_CONCIDERATIONS.md)
## Resources:
@ -48,3 +84,5 @@ make stop-dev
- [ESP32 Micropython MQTT with TLS](https://dev.to/bassparanoya/esp32-micropython-mqtt-tls-28fd): Note cert file format for uPy MQTT client needs to be in .der format which is a binary format.
- [TLS refresher](http://www.steves-internet-guide.com/ssl-certificates-explained/)
- [HA Mosquitto in K8s](https://sko.ai/blog/how-to-run-ha-mosquitto/)
- [Lamport Timestamps Tutorial](https://towardsdatascience.com/understanding-lamport-timestamps-with-pythons-multiprocessing-library-12a6427881c6)
- [Vector Clocks Lecture](https://www.youtube.com/watch?v=x-D8iFU1d-o)

@ -0,0 +1,151 @@
package clock
import (
"errors"
"fmt"
)
type Uint interface {
uint32 | uint64
}
type VectorClock[T Uint] struct {
clock []T
}
func max[T Uint](x, y T) T {
if x >= y {
return x
} else {
return y
}
}
// GetClock returns a copy of the internal vector clock.
//
// This method provides a snapshot of the current state of the vector clock
// without exposing the internal slice directly. By returning a copy, it ensures
// that the original vector clock remains immutable and prevents unintended
// modifications to the internal state.
//
// Returns:
//
// []T: A copy of the internal vector clock slice, where each element
// represents the logical time for a corresponding process. The type T
// is constrained by the Uint interface and can be either uint32 or uint64.
func (vc *VectorClock[T]) GetClock() []T {
clock := make([]T, len(vc.clock))
copy(clock, vc.clock)
return clock
}
// Sync synchronizes the current VectorClock with another VectorClock of the same type.
//
// This method takes another VectorClock instance, compares the logical times for each
// process, and updates the current VectorClock to hold the maximum logical time for
// each process. The synchronization ensures that the resulting vector clock reflects
// the latest logical times for both clocks.
//
// If the lengths of the two VectorClocks differ, an error is returned.
//
// Parameters:
//
// v VectorClock[T]: The other VectorClock instance to synchronize with. It must be
// of the same type T (either uint32 or uint64), as constrained by the Uint interface.
//
// Returns:
//
// []T: A copy of the synchronized vector clock, where each element represents the
// updated logical time for the corresponding process.
//
// error: An error is returned if the two VectorClocks have different lengths.
func (vc *VectorClock[T]) Sync(v VectorClock[T]) ([]T, error) {
compClock := v.GetClock()
if len(vc.clock) != len(compClock) {
return nil, errors.New("VectorClocks are of different lengths.")
}
for i := range vc.clock {
vc.clock[i] = max(vc.clock[i], compClock[i])
}
return vc.GetClock(), nil
}
// Increment increments the logical time at the specified index of the vector clock.
//
// This method updates the logical time for a given process (specified by the index) by
// incrementing the corresponding value in the vector clock. It ensures that the index
// is within the bounds of the vector clock. If the index is out of bounds or the vector
// clock is uninitialized, an error is returned.
//
// Parameters:
//
// index (int): The index of the process whose logical time is to be incremented.
// It must be within the range of the vector clock's length.
//
// Returns:
//
// []T: A copy of the updated vector clock after the logical time at the given index
// has been incremented.
//
// error: An error is returned if the index is out of bounds or the vector clock is uninitialized.
//
// Note: Handling of potential overflow for the underlying type T (uint32 or uint64) is currently
// not implemented and should be handled accordingly if required.
func (vc *VectorClock[T]) Increment(index int) ([]T, error) {
if index > len(vc.clock) || vc.clock == nil {
return nil, errors.New(fmt.Sprintf("Cannot access index: %d, clock is of length %d", index, len(vc.clock)))
}
// TODO handle Uint overflow?
vc.clock[index] = vc.clock[index] + 1
return vc.GetClock(), nil
}
// NewVectorClock creates a new generic VectorClock initialized to zero.
//
// This function creates a new instance of VectorClock for the specified type T,
// which can be either uint32 or uint64, depending on the needs of the application.
//
// Parameters:
//
// size (T): The number of entries in the vector clock, typically corresponding
// to the number of processes or nodes. The size is of type T, which can be uint32
// or uint64, as defined by the Uint constraint.
//
// Returns:
//
// *VectorClock[T]: A pointer to the newly created VectorClock instance, where all
// elements of the clock are initialized to zero.
func NewVectorClock[T Uint](size int) *VectorClock[T] {
return &VectorClock[T]{
clock: make([]T, size),
}
}
// ArrayToVectorClock converts an array of type T into a VectorClock instance.
//
// This function creates a new VectorClock where the internal clock is initialized
// by copying the values from the provided array. The input array's values are
// treated as the logical times for each process in the vector clock.
//
// Parameters:
//
// a []T: An array of type T (either uint32 or uint64) representing logical
// times for each process. The type T is constrained by the Uint interface, which
// ensures it is either uint32 or uint64.
//
// Returns:
//
// *VectorClock[T]: A pointer to a newly created VectorClock instance where the
// internal clock is a copy of the provided array.
func ArrayToVectorClock[T Uint](a []T) *VectorClock[T] {
vc := &VectorClock[T]{
clock: make([]T, len(a)),
}
copy(vc.clock, a)
return vc
}

@ -0,0 +1,211 @@
package clock
import (
"math"
"reflect"
"testing"
)
func assertClocksEqual[T Uint](t testing.TB, got, want []T) {
t.Helper()
if !reflect.DeepEqual(got, want) {
t.Errorf("Error: got %v want %v", got, want)
}
}
func TestVectorClock(t *testing.T) {
t.Run("test cases for uint32 type", func(t *testing.T) {
testCases := []struct {
a []uint32
b []uint32
expected []uint32
}{
{[]uint32{0, 0}, []uint32{0, 0}, []uint32{0, 0}},
{[]uint32{2, 0}, []uint32{0, 2}, []uint32{2, 2}},
{[]uint32{4, 11}, []uint32{3, 10}, []uint32{4, 11}},
{[]uint32{5, 9}, []uint32{8, 12}, []uint32{8, 12}},
{[]uint32{1, 1}, []uint32{1, 1}, []uint32{1, 1}},
{[]uint32{math.MaxUint32, 1}, []uint32{2, 1}, []uint32{4294967295, 1}},
}
for _, tc := range testCases {
vc := VectorClock[uint32]{clock: tc.a}
clock, err := vc.Sync(VectorClock[uint32]{clock: tc.b})
if err != nil {
t.Errorf("Sync should not have errored")
}
assertClocksEqual(t, clock, tc.expected)
}
})
t.Run("test cases for uint64 type", func(t *testing.T) {
testCases := []struct {
a []uint64
b []uint64
expected []uint64
}{
{[]uint64{0, 0}, []uint64{0, 0}, []uint64{0, 0}},
{[]uint64{2, 0}, []uint64{0, 2}, []uint64{2, 2}},
{[]uint64{4, 11}, []uint64{3, 10}, []uint64{4, 11}},
{[]uint64{5, 9}, []uint64{8, 12}, []uint64{8, 12}},
{[]uint64{1, 1}, []uint64{1, 1}, []uint64{1, 1}},
{[]uint64{math.MaxUint64, 1}, []uint64{2, 1}, []uint64{18446744073709551615, 1}},
}
for _, tc := range testCases {
vc := VectorClock[uint64]{clock: tc.a}
clock, err := vc.Sync(VectorClock[uint64]{clock: tc.b})
if err != nil {
t.Errorf("Sync should not have errored")
}
assertClocksEqual(t, clock, tc.expected)
}
})
t.Run("test empty new", func(t *testing.T) {
got := NewVectorClock[uint32](2)
want := VectorClock[uint32]{clock: []uint32{0, 0}}
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("test new vc for uint32", func(t *testing.T) {
vc := VectorClock[uint32]{clock: []uint32{4, 11}}
clock, err := vc.Increment(0)
if err != nil {
t.Errorf("Increment should not have errored")
}
assertClocksEqual(t, clock, []uint32{5, 11})
clock, err = vc.Increment(1)
if err != nil {
t.Errorf("Increment should not have errored")
}
assertClocksEqual(t, clock, []uint32{5, 12})
})
t.Run("test overflow condition", func(t *testing.T) {
vc := VectorClock[uint32]{clock: []uint32{math.MaxUint32, 11}}
clock, err := vc.Increment(0)
if err != nil {
t.Errorf("Increment should not have errored")
}
assertClocksEqual(t, clock, []uint32{0, 11})
})
t.Run("test index cannot be incremented", func(t *testing.T) {
vc := VectorClock[uint64]{}
clock, err := vc.Increment(0)
if clock != nil {
t.Errorf("Clock should be nil")
}
if err == nil {
t.Errorf("There should have been an error")
}
})
}
func TestNewVectorClock(t *testing.T) {
t.Run("test new vc for uint32", func(t *testing.T) {
got := NewVectorClock[uint32](2)
want := VectorClock[uint32]{clock: []uint32{0, 0}}
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("test new vc for uint64", func(t *testing.T) {
got := NewVectorClock[uint64](2)
want := VectorClock[uint64]{clock: []uint64{0, 0}}
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
}
func TestArrayToVectorClock(t *testing.T) {
t.Run("from empty slice", func(t *testing.T) {
got := ArrayToVectorClock([]uint32{})
want := VectorClock[uint32]{
clock: []uint32{},
}
if len(got.GetClock()) != 0 {
t.Errorf("Expected VectorClock.clock to be of length zero.")
}
if !reflect.DeepEqual(got.GetClock(), want.GetClock()) {
t.Errorf("Error: got %v want %v", got.GetClock(), want.GetClock())
}
})
t.Run("from zero slice", func(t *testing.T) {
got := ArrayToVectorClock(make([]uint64, 6))
want := VectorClock[uint64]{
clock: []uint64{0, 0, 0, 0, 0, 0},
}
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("from start clock", func(t *testing.T) {
want := NewVectorClock[uint64](3)
got := ArrayToVectorClock(want.GetClock())
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("from a populated clock", func(t *testing.T) {
want := VectorClock[uint32]{
clock: []uint32{2, 3},
}
got := ArrayToVectorClock(want.GetClock())
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("from a slice that is then modified", func(t *testing.T) {
initial_a := []uint64{1, 2, 3, 4}
got := ArrayToVectorClock(initial_a)
want := make([]uint64, len(initial_a))
copy(want, initial_a)
initial_a[0] = 12
initial_a[2] = 4
assertClocksEqual(t, got.GetClock(), want)
if reflect.DeepEqual(initial_a, got.GetClock()) {
t.Errorf("Initial array %v should not be equal to clock %v", initial_a, got.GetClock())
}
})
}

@ -15,14 +15,14 @@ func main() {
options := mqtt.NewClientOptions()
options.AddBroker(fmt.Sprintf("tcp://%s:%s", *common.BrokerHost, *common.BrokerPort))
// options.SetClientID("go_mqtt_client")
options.SetClientID("go_mqtt_pub_client")
options.SetUsername("dirp")
options.SetPassword("dirp")
options.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received Message: %s from topic %s\n", msg.Payload(), msg.Topic())
})
options.OnConnect = func(c mqtt.Client) { fmt.Println("Connected to Broker") }
options.OnConnectionLost = func(c mqtt.Client, err error) { fmt.Printf("Connection Lost: %v", err) }
options.OnConnectionLost = func(c mqtt.Client, err error) { fmt.Printf("Connection Lost: %v\n", err) }
client := mqtt.NewClient(options)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())

@ -37,7 +37,8 @@ func main() {
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
// opts.SetClientID("go_mqtt_client")
// Client ID's must be unique.
opts.SetClientID("go_mqtt_sub_client")
opts.SetUsername("dirp")
opts.SetPassword("dirp")
opts.SetDefaultPublishHandler(messagePubHandler)

@ -1,5 +1,3 @@
version: "3.7"
services:
mosquitto:
image: eclipse-mosquitto:latest
@ -8,6 +6,7 @@ services:
restart: unless-stopped
ports:
- "1883:1883"
- "8883:8883"
- "9001:9001"
volumes:
- ./.mosquitto:/etc/mosquitto

@ -2,8 +2,9 @@ module git.runcible.io/learning/learn_mqtt_go
go 1.23.1
require github.com/eclipse/paho.mqtt.golang v1.5.0
require (
github.com/eclipse/paho.mqtt.golang v1.5.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 85 KiB

@ -1,15 +0,0 @@
package main
import (
"fmt"
"io"
"os"
)
func printMessage(w io.Writer, msg string) {
fmt.Fprint(w, msg)
}
func main() {
printMessage(os.Stdout, "Hello MQTT\n")
}

@ -1,17 +0,0 @@
package main
import (
"bytes"
"testing"
)
func TestPrintMessage(t *testing.T) {
buffer := bytes.Buffer{}
printMessage(&buffer, "Hello MQTT\n")
got := buffer.String()
want := "Hello MQTT\n"
if got != want {
t.Errorf("Error. Got %s but wanted %s", got, want)
}
}
Loading…
Cancel
Save