Compare commits
16 Commits
main
...
drew/mqtt-
Author | SHA1 | Date |
---|---|---|
|
42ca31273c | 12 months ago |
|
612f507af8 | 12 months ago |
|
5481f39c2b | 12 months ago |
|
11839e6c7c | 12 months ago |
|
519d5f2a95 | 1 year ago |
|
cf0a8d3049 | 1 year ago |
|
1ddac8b8bf | 1 year ago |
|
872213900a | 1 year ago |
|
c2cc2e6a81 | 1 year ago |
|
e05c1fd788 | 1 year ago |
|
e04ca2f136 | 1 year ago |
![]() |
7affc696f1 | 1 year ago |
|
d9506f6ba5 | 1 year ago |
|
44810ce030 | 1 year ago |
|
245f99d333 | 1 year ago |
|
217f05223d | 1 year ago |
@ -1,19 +0,0 @@
|
||||
kind: pipeline
|
||||
type: docker
|
||||
name: default
|
||||
|
||||
steps:
|
||||
- name: Unit Test and lint
|
||||
image: golang:1.23-bookworm
|
||||
commands:
|
||||
- go mod download
|
||||
- go test -v ./...
|
||||
- go vet ./...
|
||||
group: test-lint
|
||||
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
- push
|
||||
branch:
|
||||
- main
|
@ -0,0 +1,32 @@
|
||||
# Design Considerations
|
||||
|
||||
|
||||
- I would like one process `C` to be able to control `D`n devices remotely. To determine a partial ordering we will use a Vector Clock **per device**. So a Vector of 2 values.
|
||||
- Devices never talk to each other they only ever talk with a Controller.
|
||||
- Events on device takes precedence when comparing remote events. This means tie's are broken by Dn(e) winning over C(e) event when events occur concurrently.
|
||||
- Example At time T1 a remote user creates an event in the controlling process to changes D1's temperature (Controller: [1,0], Device: [0,0]). A message is sent at T2 by the controller to a subscribing device (Controller: [2,0], Device: [0,0]). Before receipt of this message a second user changes D1's temperature at device time T'1 (Controller: [2,0], Device: [0,1]). The device creates a message at T'2 and sends this temperature update notification to the controller (Controller: [2,0], Device: [0,2]). On receipt of the message send by the controller we will detect an in comparable event where the message vector would be of the form T || T' because Message Vector [2,0] and local Device vector [0,2] where 2 > 0 but 0 !> 2 and the reverse also is in conflict. This means we have detected concurrent events. In this case the message command sent from the controller at T2 is discarded and D1's temp set event takes precedence. D1's time will be advanced to [2,3] and when the controller receives D1's event message it's clock will advance to [3,2].
|
||||
- If the remote user sends the temperature command again. The time of message sent on C(T3) will be [4,2]. On receipt at D1 it's own clock will advance to [4,4], and the temperature will be set.
|
||||
|
||||
|
||||
## Enabling TLS for Mosquitto
|
||||
|
||||
It is recommended to expose Mosquitto using TLS on port 8883 and port 8084 for Secure websockets. This implementation will only enable TLS security over TCP 8883 port. To achieve this declare a new listener for port 8883. All configuration that appears after the new listener declaration applies specifically to that listener.
|
||||
|
||||
So the following can be added to the end of the mosquitto.conf
|
||||
```
|
||||
listener 8883 0.0.0.0
|
||||
cafile /mosquitto/certs/ca.crt
|
||||
certfile /mosquitto/certs/server.crt
|
||||
keyfile /mosquitto/certs/server.key
|
||||
|
||||
# By default an TLS enabled listener will operate in a similar fashion to a
|
||||
# https enabled web server, in that the server has a certificate signed by a CA
|
||||
# and the client will verify that it is a trusted certificate. The overall aim
|
||||
# is encryption of the network traffic. By setting require_certificate to true,
|
||||
# the client must provide a valid certificate in order for the network
|
||||
# connection to proceed. This allows access to the broker to be controlled
|
||||
# outside of the mechanisms provided by MQTT.
|
||||
#require_certificate false
|
||||
```
|
||||
|
||||
Note at this point clients are not required to provide their own certificate since we have not set `require_certificate true`. This is what we want to start with, but eventually we will want to issue certs to clients that will server not only as their client id but also as their authentication. No passkey is required to use the servery.key in this setup. If there was you would need to provide it in `keyfile_password your_passphrase_here` and the mosquitto.conf file should be limited with `chmod 600`.
|
@ -0,0 +1,151 @@
|
||||
package clock
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type Uint interface {
|
||||
uint32 | uint64
|
||||
}
|
||||
|
||||
type VectorClock[T Uint] struct {
|
||||
clock []T
|
||||
}
|
||||
|
||||
func max[T Uint](x, y T) T {
|
||||
if x >= y {
|
||||
return x
|
||||
} else {
|
||||
return y
|
||||
}
|
||||
}
|
||||
|
||||
// GetClock returns a copy of the internal vector clock.
|
||||
//
|
||||
// This method provides a snapshot of the current state of the vector clock
|
||||
// without exposing the internal slice directly. By returning a copy, it ensures
|
||||
// that the original vector clock remains immutable and prevents unintended
|
||||
// modifications to the internal state.
|
||||
//
|
||||
// Returns:
|
||||
//
|
||||
// []T: A copy of the internal vector clock slice, where each element
|
||||
// represents the logical time for a corresponding process. The type T
|
||||
// is constrained by the Uint interface and can be either uint32 or uint64.
|
||||
func (vc *VectorClock[T]) GetClock() []T {
|
||||
clock := make([]T, len(vc.clock))
|
||||
copy(clock, vc.clock)
|
||||
return clock
|
||||
}
|
||||
|
||||
// Sync synchronizes the current VectorClock with another VectorClock of the same type.
|
||||
//
|
||||
// This method takes another VectorClock instance, compares the logical times for each
|
||||
// process, and updates the current VectorClock to hold the maximum logical time for
|
||||
// each process. The synchronization ensures that the resulting vector clock reflects
|
||||
// the latest logical times for both clocks.
|
||||
//
|
||||
// If the lengths of the two VectorClocks differ, an error is returned.
|
||||
//
|
||||
// Parameters:
|
||||
//
|
||||
// v VectorClock[T]: The other VectorClock instance to synchronize with. It must be
|
||||
// of the same type T (either uint32 or uint64), as constrained by the Uint interface.
|
||||
//
|
||||
// Returns:
|
||||
//
|
||||
// []T: A copy of the synchronized vector clock, where each element represents the
|
||||
// updated logical time for the corresponding process.
|
||||
//
|
||||
// error: An error is returned if the two VectorClocks have different lengths.
|
||||
func (vc *VectorClock[T]) Sync(v VectorClock[T]) ([]T, error) {
|
||||
compClock := v.GetClock()
|
||||
|
||||
if len(vc.clock) != len(compClock) {
|
||||
return nil, errors.New("VectorClocks are of different lengths.")
|
||||
}
|
||||
|
||||
for i := range vc.clock {
|
||||
vc.clock[i] = max(vc.clock[i], compClock[i])
|
||||
}
|
||||
|
||||
return vc.GetClock(), nil
|
||||
}
|
||||
|
||||
// Increment increments the logical time at the specified index of the vector clock.
|
||||
//
|
||||
// This method updates the logical time for a given process (specified by the index) by
|
||||
// incrementing the corresponding value in the vector clock. It ensures that the index
|
||||
// is within the bounds of the vector clock. If the index is out of bounds or the vector
|
||||
// clock is uninitialized, an error is returned.
|
||||
//
|
||||
// Parameters:
|
||||
//
|
||||
// index (int): The index of the process whose logical time is to be incremented.
|
||||
// It must be within the range of the vector clock's length.
|
||||
//
|
||||
// Returns:
|
||||
//
|
||||
// []T: A copy of the updated vector clock after the logical time at the given index
|
||||
// has been incremented.
|
||||
//
|
||||
// error: An error is returned if the index is out of bounds or the vector clock is uninitialized.
|
||||
//
|
||||
// Note: Handling of potential overflow for the underlying type T (uint32 or uint64) is currently
|
||||
// not implemented and should be handled accordingly if required.
|
||||
func (vc *VectorClock[T]) Increment(index int) ([]T, error) {
|
||||
if index > len(vc.clock) || vc.clock == nil {
|
||||
return nil, errors.New(fmt.Sprintf("Cannot access index: %d, clock is of length %d", index, len(vc.clock)))
|
||||
}
|
||||
|
||||
// TODO handle Uint overflow?
|
||||
vc.clock[index] = vc.clock[index] + 1
|
||||
return vc.GetClock(), nil
|
||||
|
||||
}
|
||||
|
||||
// NewVectorClock creates a new generic VectorClock initialized to zero.
|
||||
//
|
||||
// This function creates a new instance of VectorClock for the specified type T,
|
||||
// which can be either uint32 or uint64, depending on the needs of the application.
|
||||
//
|
||||
// Parameters:
|
||||
//
|
||||
// size (T): The number of entries in the vector clock, typically corresponding
|
||||
// to the number of processes or nodes. The size is of type T, which can be uint32
|
||||
// or uint64, as defined by the Uint constraint.
|
||||
//
|
||||
// Returns:
|
||||
//
|
||||
// *VectorClock[T]: A pointer to the newly created VectorClock instance, where all
|
||||
// elements of the clock are initialized to zero.
|
||||
func NewVectorClock[T Uint](size int) *VectorClock[T] {
|
||||
return &VectorClock[T]{
|
||||
clock: make([]T, size),
|
||||
}
|
||||
}
|
||||
|
||||
// ArrayToVectorClock converts an array of type T into a VectorClock instance.
|
||||
//
|
||||
// This function creates a new VectorClock where the internal clock is initialized
|
||||
// by copying the values from the provided array. The input array's values are
|
||||
// treated as the logical times for each process in the vector clock.
|
||||
//
|
||||
// Parameters:
|
||||
//
|
||||
// a []T: An array of type T (either uint32 or uint64) representing logical
|
||||
// times for each process. The type T is constrained by the Uint interface, which
|
||||
// ensures it is either uint32 or uint64.
|
||||
//
|
||||
// Returns:
|
||||
//
|
||||
// *VectorClock[T]: A pointer to a newly created VectorClock instance where the
|
||||
// internal clock is a copy of the provided array.
|
||||
func ArrayToVectorClock[T Uint](a []T) *VectorClock[T] {
|
||||
vc := &VectorClock[T]{
|
||||
clock: make([]T, len(a)),
|
||||
}
|
||||
copy(vc.clock, a)
|
||||
return vc
|
||||
}
|
@ -0,0 +1,211 @@
|
||||
package clock
|
||||
|
||||
import (
|
||||
"math"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func assertClocksEqual[T Uint](t testing.TB, got, want []T) {
|
||||
t.Helper()
|
||||
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("Error: got %v want %v", got, want)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestVectorClock(t *testing.T) {
|
||||
|
||||
t.Run("test cases for uint32 type", func(t *testing.T) {
|
||||
testCases := []struct {
|
||||
a []uint32
|
||||
b []uint32
|
||||
expected []uint32
|
||||
}{
|
||||
{[]uint32{0, 0}, []uint32{0, 0}, []uint32{0, 0}},
|
||||
{[]uint32{2, 0}, []uint32{0, 2}, []uint32{2, 2}},
|
||||
{[]uint32{4, 11}, []uint32{3, 10}, []uint32{4, 11}},
|
||||
{[]uint32{5, 9}, []uint32{8, 12}, []uint32{8, 12}},
|
||||
{[]uint32{1, 1}, []uint32{1, 1}, []uint32{1, 1}},
|
||||
{[]uint32{math.MaxUint32, 1}, []uint32{2, 1}, []uint32{4294967295, 1}},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
vc := VectorClock[uint32]{clock: tc.a}
|
||||
|
||||
clock, err := vc.Sync(VectorClock[uint32]{clock: tc.b})
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Sync should not have errored")
|
||||
}
|
||||
|
||||
assertClocksEqual(t, clock, tc.expected)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("test cases for uint64 type", func(t *testing.T) {
|
||||
testCases := []struct {
|
||||
a []uint64
|
||||
b []uint64
|
||||
expected []uint64
|
||||
}{
|
||||
{[]uint64{0, 0}, []uint64{0, 0}, []uint64{0, 0}},
|
||||
{[]uint64{2, 0}, []uint64{0, 2}, []uint64{2, 2}},
|
||||
{[]uint64{4, 11}, []uint64{3, 10}, []uint64{4, 11}},
|
||||
{[]uint64{5, 9}, []uint64{8, 12}, []uint64{8, 12}},
|
||||
{[]uint64{1, 1}, []uint64{1, 1}, []uint64{1, 1}},
|
||||
{[]uint64{math.MaxUint64, 1}, []uint64{2, 1}, []uint64{18446744073709551615, 1}},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
vc := VectorClock[uint64]{clock: tc.a}
|
||||
|
||||
clock, err := vc.Sync(VectorClock[uint64]{clock: tc.b})
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Sync should not have errored")
|
||||
}
|
||||
|
||||
assertClocksEqual(t, clock, tc.expected)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("test empty new", func(t *testing.T) {
|
||||
|
||||
got := NewVectorClock[uint32](2)
|
||||
want := VectorClock[uint32]{clock: []uint32{0, 0}}
|
||||
|
||||
assertClocksEqual(t, got.GetClock(), want.GetClock())
|
||||
})
|
||||
|
||||
t.Run("test new vc for uint32", func(t *testing.T) {
|
||||
vc := VectorClock[uint32]{clock: []uint32{4, 11}}
|
||||
|
||||
clock, err := vc.Increment(0)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Increment should not have errored")
|
||||
}
|
||||
|
||||
assertClocksEqual(t, clock, []uint32{5, 11})
|
||||
|
||||
clock, err = vc.Increment(1)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Increment should not have errored")
|
||||
}
|
||||
|
||||
assertClocksEqual(t, clock, []uint32{5, 12})
|
||||
|
||||
})
|
||||
|
||||
t.Run("test overflow condition", func(t *testing.T) {
|
||||
vc := VectorClock[uint32]{clock: []uint32{math.MaxUint32, 11}}
|
||||
|
||||
clock, err := vc.Increment(0)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Increment should not have errored")
|
||||
}
|
||||
|
||||
assertClocksEqual(t, clock, []uint32{0, 11})
|
||||
|
||||
})
|
||||
|
||||
t.Run("test index cannot be incremented", func(t *testing.T) {
|
||||
vc := VectorClock[uint64]{}
|
||||
|
||||
clock, err := vc.Increment(0)
|
||||
|
||||
if clock != nil {
|
||||
t.Errorf("Clock should be nil")
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
t.Errorf("There should have been an error")
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewVectorClock(t *testing.T) {
|
||||
t.Run("test new vc for uint32", func(t *testing.T) {
|
||||
|
||||
got := NewVectorClock[uint32](2)
|
||||
want := VectorClock[uint32]{clock: []uint32{0, 0}}
|
||||
|
||||
assertClocksEqual(t, got.GetClock(), want.GetClock())
|
||||
})
|
||||
|
||||
t.Run("test new vc for uint64", func(t *testing.T) {
|
||||
|
||||
got := NewVectorClock[uint64](2)
|
||||
want := VectorClock[uint64]{clock: []uint64{0, 0}}
|
||||
|
||||
assertClocksEqual(t, got.GetClock(), want.GetClock())
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestArrayToVectorClock(t *testing.T) {
|
||||
|
||||
t.Run("from empty slice", func(t *testing.T) {
|
||||
got := ArrayToVectorClock([]uint32{})
|
||||
want := VectorClock[uint32]{
|
||||
clock: []uint32{},
|
||||
}
|
||||
|
||||
if len(got.GetClock()) != 0 {
|
||||
t.Errorf("Expected VectorClock.clock to be of length zero.")
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(got.GetClock(), want.GetClock()) {
|
||||
t.Errorf("Error: got %v want %v", got.GetClock(), want.GetClock())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("from zero slice", func(t *testing.T) {
|
||||
got := ArrayToVectorClock(make([]uint64, 6))
|
||||
want := VectorClock[uint64]{
|
||||
clock: []uint64{0, 0, 0, 0, 0, 0},
|
||||
}
|
||||
|
||||
assertClocksEqual(t, got.GetClock(), want.GetClock())
|
||||
})
|
||||
|
||||
t.Run("from start clock", func(t *testing.T) {
|
||||
want := NewVectorClock[uint64](3)
|
||||
got := ArrayToVectorClock(want.GetClock())
|
||||
|
||||
assertClocksEqual(t, got.GetClock(), want.GetClock())
|
||||
})
|
||||
|
||||
t.Run("from a populated clock", func(t *testing.T) {
|
||||
want := VectorClock[uint32]{
|
||||
clock: []uint32{2, 3},
|
||||
}
|
||||
got := ArrayToVectorClock(want.GetClock())
|
||||
assertClocksEqual(t, got.GetClock(), want.GetClock())
|
||||
})
|
||||
|
||||
t.Run("from a slice that is then modified", func(t *testing.T) {
|
||||
initial_a := []uint64{1, 2, 3, 4}
|
||||
got := ArrayToVectorClock(initial_a)
|
||||
|
||||
want := make([]uint64, len(initial_a))
|
||||
copy(want, initial_a)
|
||||
|
||||
initial_a[0] = 12
|
||||
initial_a[2] = 4
|
||||
|
||||
assertClocksEqual(t, got.GetClock(), want)
|
||||
|
||||
if reflect.DeepEqual(initial_a, got.GetClock()) {
|
||||
t.Errorf("Initial array %v should not be equal to clock %v", initial_a, got.GetClock())
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
}
|
File diff suppressed because one or more lines are too long
After Width: | Height: | Size: 85 KiB |
@ -1,15 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
func printMessage(w io.Writer, msg string) {
|
||||
fmt.Fprint(w, msg)
|
||||
}
|
||||
|
||||
func main() {
|
||||
printMessage(os.Stdout, "Hello MQTT\n")
|
||||
}
|
@ -1,17 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPrintMessage(t *testing.T) {
|
||||
buffer := bytes.Buffer{}
|
||||
printMessage(&buffer, "Hello MQTT\n")
|
||||
got := buffer.String()
|
||||
want := "Hello MQTT\n"
|
||||
|
||||
if got != want {
|
||||
t.Errorf("Error. Got %s but wanted %s", got, want)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue