diff --git a/videosink/display.go b/videosink/display.go
index cf51149..e20e99e 100644
--- a/videosink/display.go
+++ b/videosink/display.go
@@ -25,11 +25,14 @@ import (
 	"image/png"
 	"net/http"
 	"sync"
+	"time"
 
 	"periph.io/x/conn/v3/display"
 )
 
 const defaultJPEGQuality = 95
+const defaultKeepAliveInterval = time.Minute
+const defaultMinFrameInterval = time.Second / 15
 
 // Options for videosink devices.
 type Options struct {
@@ -48,6 +51,16 @@ type Options struct {
 		// encoder. Defaults to png.DefaultCompression.
 		CompressionLevel png.CompressionLevel
 	}
+
+	// MinFrameInterval is the amount of time which needs to pass before
+	// sending a new image, thus implementing rate-limiting. Defaults to 15
+	// frames per second.
+	MinFrameInterval time.Duration
+
+	// KeepAliveInterval is the amount of time after which to send a new
+	// image regardless of whether any changes have been made. Defaults to
+	// once per minute.
+	KeepAliveInterval time.Duration
 }
 
 // Display is a virtual device receiving drawing operations and sending
@@ -56,6 +69,8 @@ type Display struct {
 	defaultFormat       ImageFormat
 	jpegOptions         jpeg.Options
 	pngCompressionLevel png.CompressionLevel
+	keepAliveInterval   time.Duration
+	minFrameInterval    time.Duration
 
 	mu      sync.Mutex
 	buffer  *image.RGBA
@@ -77,6 +92,8 @@ func New(opt *Options) *Display {
 	d := &Display{
 		jpegOptions:         opt.JPEG,
 		pngCompressionLevel: opt.PNG.CompressionLevel,
+		keepAliveInterval:   opt.KeepAliveInterval,
+		minFrameInterval:    opt.MinFrameInterval,
 
 		buffer:  buffer,
 		clients: map[*client]struct{}{},
@@ -88,6 +105,14 @@ func New(opt *Options) *Display {
 		d.jpegOptions.Quality = defaultJPEGQuality
 	}
 
+	if d.keepAliveInterval == 0 {
+		d.keepAliveInterval = defaultKeepAliveInterval
+	}
+
+	if d.minFrameInterval == 0 {
+		d.minFrameInterval = defaultMinFrameInterval
+	}
+
 	return d
 }
 
diff --git a/videosink/handler.go b/videosink/handler.go
index d24f476..c45b014 100644
--- a/videosink/handler.go
+++ b/videosink/handler.go
@@ -6,6 +6,7 @@ package videosink
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"image/jpeg"
 	"log"
@@ -14,6 +15,7 @@ import (
 	"net/textproto"
 	"net/url"
 	"sync"
+	"time"
 )
 
 // bufferPool stores reusable []byte instances.
@@ -23,6 +25,17 @@ var bufferPool = sync.Pool{
 	},
 }
 
+// stopAndDrain ensures that the given timer is stopped and has no pending
+// event.
+func stopAndDrain(timer *time.Timer) {
+	if timer != nil && !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
+}
+
 type imageConfig struct {
 	format ImageFormat
 }
@@ -44,8 +57,58 @@ func (d *Display) configFromQuery(values url.Values) (imageConfig, error) {
 }
 
 type client struct {
+	disp *Display
+
 	refresh   chan struct{}
 	terminate chan struct{}
+
+	mostRecent time.Time
+}
+
+func newClient(d *Display) *client {
+	return &client{
+		disp:      d,
+		refresh:   make(chan struct{}, 1),
+		terminate: make(chan struct{}, 1),
+	}
+}
+
+// Wait until the next frame should be sent, either because the keep-alive
+// interval has passed or a change has been made to the buffer and the rate
+// limit isn't violated.
+func (c *client) waitNext(ctx context.Context) bool {
+	earliestFrameAt := c.mostRecent.Add(c.disp.minFrameInterval)
+	latestFrameAt := c.mostRecent.Add(c.disp.keepAliveInterval)
+
+	var rateLimit <-chan time.Time
+	keepAliveTimer := time.NewTimer(time.Until(latestFrameAt))
+
+	defer stopAndDrain(keepAliveTimer)
+
+	for {
+		select {
+		case <-rateLimit:
+			return true
+
+		case <-c.refresh:
+			if remaining := time.Until(earliestFrameAt); remaining <= 0 {
+				return true
+			} else if rateLimit == nil {
+				rateLimitTimer := time.NewTimer(remaining)
+				defer stopAndDrain(rateLimitTimer)
+				rateLimit = rateLimitTimer.C
+			}
+
+		case <-keepAliveTimer.C:
+			return true
+
+		case <-c.terminate:
+			return false
+
+		case <-ctx.Done():
+			return false
+		}
+	}
 }
 
 func (d *Display) bufferChangedLocked() {
@@ -141,10 +204,7 @@ func (d *Display) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		"boundary": pw.boundary,
 	}))
 
-	c := &client{
-		refresh:   make(chan struct{}, 1),
-		terminate: make(chan struct{}, 1),
-	}
+	c := newClient(d)
 
 	d.mu.Lock()
 	d.clients[c] = struct{}{}
@@ -176,18 +236,13 @@
 			return
 		}
 
+		c.mostRecent = time.Now()
+
 		if flusher, ok := w.(http.Flusher); ok {
 			flusher.Flush()
 		}
 
-		// TODO: Keepalive (send an image every N seconds)
-		// TODO: Rate-limiting (don't send more than N per time unit)
-
-		select {
-		case <-c.refresh:
-		case <-c.terminate:
-			return
-		case <-r.Context().Done():
+		if !c.waitNext(r.Context()) {
 			return
 		}
 	}
diff --git a/videosink/handler_test.go b/videosink/handler_test.go
index 12a9678..0cfee22 100644
--- a/videosink/handler_test.go
+++ b/videosink/handler_test.go
@@ -14,6 +14,7 @@ import (
 	"image/png"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"mime"
 	"mime/multipart"
 	"net/http"
@@ -165,6 +166,9 @@ func TestMultipartResponse(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
 	t.Cleanup(cancel)
 
+	tc.opt.MinFrameInterval = time.Millisecond
+	tc.opt.KeepAliveInterval = time.Minute
+
 	d := New(&tc.opt)
 
 	srv := httptest.NewServer(d)
@@ -273,3 +277,131 @@ func TestRequestStatus(t *testing.T) {
 		})
 	}
 }
+
+func TestWaitNextRateLimit(t *testing.T) {
+	for _, tc := range []struct {
+		name     string
+		interval time.Duration
+	}{
+		{
+			name:     "1ms",
+			interval: time.Millisecond,
+		},
+		{
+			name:     "10ms",
+			interval: 10 * time.Millisecond,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+			t.Cleanup(cancel)
+
+			opt := &Options{
+				MinFrameInterval:  tc.interval,
+				KeepAliveInterval: time.Minute,
+			}
+
+			c := newClient(New(opt))
+
+			quit := make(chan struct{})
+
+			var wg sync.WaitGroup
+			wg.Add(1)
+
+			go func() {
+				defer wg.Done()
+
+				for {
+					time.Sleep(time.Duration(rand.Intn(int(tc.interval / 10))))
+
+					select {
+					case c.refresh <- struct{}{}:
+					case <-quit:
+						return
+					}
+				}
+			}()
+
+			count := int(time.Second / 10 / tc.interval)
+
+			if count < 10 {
+				count = 10
+			}
+
+			for i := 0; i < count; i++ {
+				begin := time.Now()
+				c.mostRecent = begin
+
+				if got := c.waitNext(ctx); !got {
+					t.Errorf("waitNext() returned %v, want %v", got, true)
+				}
+
+				if duration := time.Since(begin); duration < opt.MinFrameInterval {
+					t.Errorf("waitNext() returned after %v, want at least %v", duration, opt.MinFrameInterval)
+				}
+			}
+
+			close(quit)
+
+			wg.Wait()
+		})
+	}
+}
+
+func TestWaitNextKeepAlive(t *testing.T) {
+	for _, tc := range []struct {
+		name     string
+		interval time.Duration
+	}{
+		{
+			name:     "1ms",
+			interval: time.Millisecond,
+		},
+		{
+			name:     "100ms",
+			interval: 100 * time.Millisecond,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+			t.Cleanup(cancel)
+
+			opt := &Options{
+				MinFrameInterval:  time.Minute,
+				KeepAliveInterval: tc.interval,
+			}
+
+			c := newClient(New(opt))
+
+			begin := time.Now()
+			c.mostRecent = begin
+
+			if got := c.waitNext(ctx); !got {
+				t.Errorf("waitNext() returned %v, want %v", got, true)
+			}
+
+			if duration := time.Since(begin); duration < opt.KeepAliveInterval {
+				t.Errorf("waitNext() returned after %v, want at least %v", duration, opt.KeepAliveInterval)
+			}
+		})
+	}
+}
+
+func TestWaitNextTerminate(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	t.Cleanup(cancel)
+
+	c := newClient(New(&Options{
+		MinFrameInterval:  time.Minute,
+		KeepAliveInterval: time.Minute,
+	}))
+	c.mostRecent = time.Now()
+
+	go func() {
+		c.terminate <- struct{}{}
+	}()
+
+	if got := c.waitNext(ctx); got {
+		t.Errorf("waitNext() returned %v, want %v", got, false)
+	}
+}