Initial vector clock implementation

drew/mqtt-clients
Drew Bednar 4 months ago
parent 7affc696f1
commit e04ca2f136

@ -0,0 +1,8 @@
# Design Considerations
- I would like one process `C` to be able to control `D`n devices remotely. To determine a partial ordering we will use a Vector Clock **per device**. So a Vector of 2 values.
- Devices never talk to each other they only ever talk with a Controller.
- Events on device takes precedence when comparing remote events. This means tie's are broken by Dn(e) winning over C(e) event when events occur concurrently.
- Example At time T1 a remote user creates an event in the controlling process to changes D1's temperature (Controller: [1,0], Device: [0,0]). A message is sent at T2 by the controller to a subscribing device (Controller: [2,0], Device: [0,0]). Before receipt of this message a second user changes D1's temperature at device time T'1 (Controller: [2,0], Device: [0,1]). The device creates a message at T'2 and sends this temperature update notification to the controller (Controller: [2,0], Device: [0,2]). On receipt of the message send by the controller we will detect an in comparable event where the message vector would be of the form T || T' because Message Vector [2,0] and local Device vector [0,2] where 2 > 0 but 0 !> 2 and the reverse also is in conflict. This means we have detected concurrent events. In this case the message command sent from the controller at T2 is discarded and D1's temp set event takes precedence. D1's time will be advanced to [2,3] and when the controller receives D1's event message it's clock will advance to [3,2].
- If the remote user sends the temperature command again. The time of message sent on C(T3) will be [4,2]. On receipt at D1 it's own clock will advance to [4,4], and the temperature will be set.

@ -8,6 +8,11 @@ Learning MQTT with Golang by doing. This repo is a simple example of using a Gol
For local development we use [Mosquitto](https://mosquitto.org/) as our MQTT broker, with TLS enabled.
```
sudo apt install mosquitto-clients
```
First generate local development certs using:
```
@ -33,6 +38,10 @@ make stop-dev
*Instructions TBD*
## Design Considerations
See [DESIGN_CONCIDERATIONS.md](./DESIGN_CONCIDERATIONS.md)
## Resources:
- [Mosquitto Docs](https://mosquitto.org/man/mosquitto-8.html)
@ -44,4 +53,6 @@ make stop-dev
- [Using Python paho MQTT client with TLS](https://cedalo.com/blog/configuring-paho-mqtt-python-client-with-examples/)
- [ESP32 Micropython MQTT with TLS](https://dev.to/bassparanoya/esp32-micropython-mqtt-tls-28fd): Note cert file format for uPy MQTT client needs to be in .der format which is a binary format.
- [TLS refresher](http://www.steves-internet-guide.com/ssl-certificates-explained/)
- [HA Mosquitto in K8s](https://sko.ai/blog/how-to-run-ha-mosquitto/)
- [HA Mosquitto in K8s](https://sko.ai/blog/how-to-run-ha-mosquitto/)
- [Lamport Timestamps Tutorial](https://towardsdatascience.com/understanding-lamport-timestamps-with-pythons-multiprocessing-library-12a6427881c6)
-[Vector Clocks Lecture](https://www.youtube.com/watch?v=x-D8iFU1d-o)

@ -0,0 +1,62 @@
package clock
// VectorClock type is used for determining the partial ordering of events in a distributed system.
type VectorClock struct {
clock []int
}
// GetClock returns a copy of the internal vector clock.
//
// This method provides a snapshot of the current state of the vector clock
// without exposing the internal slice directly. By returning a copy, it ensures
// that the original vector clock remains immutable and prevents unintended
// modifications to the internal state.
//
// Returns:
//
// []int: A copy of the internal vector clock slice, where each element
// represents the logical time for a corresponding process.
func (vc *VectorClock) GetClock() []int {
clock := make([]int, len(vc.clock))
copy(clock, vc.clock)
return clock
}
// NewVectorClock creates a new VectorClock initialized to zero.
//
// Parameters:
//
// size (int): The number of entries in the vector clock, typically
// corresponding to the number of processes or nodes.
//
// Returns:
//
// *VectorClock: A pointer to the newly created VectorClock instance, with all
// elements of the clock initialized to zero.
func NewVectorClock(size int) *VectorClock {
return &VectorClock{
clock: make([]int, size),
}
}
// ArrayToVectorClock converts an integer array into a VectorClock instance.
//
// This function creates a new VectorClock where the internal clock is initialized
// by copying the values from the provided array. The input array's values are
// treated as the logical times for each process in the vector clock.
//
// Parameters:
//
// a []int: An array of integers representing logical times for each process.
//
// Returns:
//
// *VectorClock: A pointer to a newly created VectorClock instance where the
// internal clock is a copy of the provided array.
func ArrayToVectorClock(a []int) *VectorClock {
vc := &VectorClock{
clock: make([]int, len(a)),
}
copy(vc.clock, a)
return vc
}

@ -0,0 +1,84 @@
package clock
import (
"reflect"
"testing"
)
func assertClocksEqual(t testing.TB, got, want []int) {
t.Helper()
if !reflect.DeepEqual(got, want) {
t.Errorf("Error: got %v want %v", got, want)
}
}
func TestVectorClock(t *testing.T) {
got := NewVectorClock(2)
want := VectorClock{clock: []int{0, 0}}
assertClocksEqual(t, got.GetClock(), want.GetClock())
}
func TestArrayToVectorClock(t *testing.T) {
t.Run("from empty slice", func(t *testing.T) {
got := ArrayToVectorClock([]int{})
want := VectorClock{
clock: []int{},
}
if len(got.GetClock()) != 0 {
t.Errorf("Expected VectorClock.clock to be of length zero.")
}
if !reflect.DeepEqual(got.GetClock(), want.GetClock()) {
t.Errorf("Error: got %v want %v", got.GetClock(), want.GetClock())
}
})
t.Run("from zero slice", func(t *testing.T) {
got := ArrayToVectorClock(make([]int, 6))
want := VectorClock{
clock: []int{0, 0, 0, 0, 0, 0},
}
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("from start clock", func(t *testing.T) {
want := NewVectorClock(3)
got := ArrayToVectorClock(want.GetClock())
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("from a populated clock", func(t *testing.T) {
want := VectorClock{
clock: []int{2, 3},
}
got := ArrayToVectorClock(want.GetClock())
assertClocksEqual(t, got.GetClock(), want.GetClock())
})
t.Run("from a slice that is then modified", func(t *testing.T) {
initial_a := []int{1, 2, 3, 4}
got := ArrayToVectorClock(initial_a)
want := make([]int, len(initial_a))
copy(want, initial_a)
initial_a[0] = 12
initial_a[2] = 4
assertClocksEqual(t, got.GetClock(), want)
if reflect.DeepEqual(initial_a, got.GetClock()) {
t.Errorf("Initial array %v should not be equal to clock %v", initial_a, got.GetClock())
}
})
}
Loading…
Cancel
Save