videosink: Implement rate-limiting, keep-alive

Resolve the remaining two TODOs by adding an upper limit on the frame
rate (defaults to 15 per second) and sending an image every once in
a while even when there were no changes (defaults to once a minute).

Signed-off-by: Michael Hanselmann <public@hansmi.ch>
pull/45/head
Michael Hanselmann 4 years ago committed by M-A
parent 1b1970bf7c
commit 8f521b945e

@ -25,11 +25,14 @@ import (
"image/png"
"net/http"
"sync"
"time"
"periph.io/x/conn/v3/display"
)
const defaultJPEGQuality = 95
const defaultKeepAliveInterval = time.Minute
const defaultMinFrameInterval = time.Second / 15
// Options for videosink devices.
type Options struct {
@ -48,6 +51,16 @@ type Options struct {
// encoder. Defaults to png.DefaultCompression.
CompressionLevel png.CompressionLevel
}
// MinFrameInterval is the amount of time which needs to pass before
// sending a new image, thus implementing rate-limiting. Defaults to 15
// frames per second.
MinFrameInterval time.Duration
// KeepAliveInterval is the amount of time after which to send a new
// image regardless of whether any changes have been made. Defaults to
// once per minute.
KeepAliveInterval time.Duration
}
// Display is a virtual device receiving drawing operations and sending
@ -56,6 +69,8 @@ type Display struct {
defaultFormat ImageFormat
jpegOptions jpeg.Options
pngCompressionLevel png.CompressionLevel
keepAliveInterval time.Duration
minFrameInterval time.Duration
mu sync.Mutex
buffer *image.RGBA
@ -77,6 +92,8 @@ func New(opt *Options) *Display {
d := &Display{
jpegOptions: opt.JPEG,
pngCompressionLevel: opt.PNG.CompressionLevel,
keepAliveInterval: opt.KeepAliveInterval,
minFrameInterval: opt.MinFrameInterval,
buffer: buffer,
clients: map[*client]struct{}{},
@ -88,6 +105,14 @@ func New(opt *Options) *Display {
d.jpegOptions.Quality = defaultJPEGQuality
}
if d.keepAliveInterval == 0 {
d.keepAliveInterval = defaultKeepAliveInterval
}
if d.minFrameInterval == 0 {
d.minFrameInterval = defaultMinFrameInterval
}
return d
}

@ -6,6 +6,7 @@ package videosink
import (
"bytes"
"context"
"fmt"
"image/jpeg"
"log"
@ -14,6 +15,7 @@ import (
"net/textproto"
"net/url"
"sync"
"time"
)
// bufferPool stores reusable []byte instances.
@ -23,6 +25,17 @@ var bufferPool = sync.Pool{
},
}
// stopAndDrain ensures that the given timer is stopped and has no pending
// event.
func stopAndDrain(timer *time.Timer) {
if timer != nil && !timer.Stop() {
select {
case <-timer.C:
default:
}
}
}
type imageConfig struct {
format ImageFormat
}
@ -44,8 +57,58 @@ func (d *Display) configFromQuery(values url.Values) (imageConfig, error) {
}
type client struct {
disp *Display
refresh chan struct{}
terminate chan struct{}
mostRecent time.Time
}
func newClient(d *Display) *client {
return &client{
disp: d,
refresh: make(chan struct{}, 1),
terminate: make(chan struct{}, 1),
}
}
// Wait until the next frame should be sent, either because the keep-alive
// interval has passed or a change has been made to the buffer and the rate
// limit isn't violated.
func (c *client) waitNext(ctx context.Context) bool {
earliestFrameAt := c.mostRecent.Add(c.disp.minFrameInterval)
latestFrameAt := c.mostRecent.Add(c.disp.keepAliveInterval)
var rateLimit <-chan time.Time
keepAliveTimer := time.NewTimer(time.Until(latestFrameAt))
defer stopAndDrain(keepAliveTimer)
for {
select {
case <-rateLimit:
return true
case <-c.refresh:
if remaining := time.Until(earliestFrameAt); remaining <= 0 {
return true
} else if rateLimit == nil {
rateLimitTimer := time.NewTimer(remaining)
defer stopAndDrain(rateLimitTimer)
rateLimit = rateLimitTimer.C
}
case <-keepAliveTimer.C:
return true
case <-c.terminate:
return false
case <-ctx.Done():
return false
}
}
}
func (d *Display) bufferChangedLocked() {
@ -141,10 +204,7 @@ func (d *Display) ServeHTTP(w http.ResponseWriter, r *http.Request) {
"boundary": pw.boundary,
}))
c := &client{
refresh: make(chan struct{}, 1),
terminate: make(chan struct{}, 1),
}
c := newClient(d)
d.mu.Lock()
d.clients[c] = struct{}{}
@ -176,18 +236,13 @@ func (d *Display) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
c.mostRecent = time.Now()
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
// TODO: Keepalive (send an image every N seconds)
// TODO: Rate-limiting (don't send more than N per time unit)
select {
case <-c.refresh:
case <-c.terminate:
return
case <-r.Context().Done():
if !c.waitNext(r.Context()) {
return
}
}

@ -14,6 +14,7 @@ import (
"image/png"
"io"
"io/ioutil"
"math/rand"
"mime"
"mime/multipart"
"net/http"
@ -165,6 +166,9 @@ func TestMultipartResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
tc.opt.MinFrameInterval = time.Millisecond
tc.opt.KeepAliveInterval = time.Minute
d := New(&tc.opt)
srv := httptest.NewServer(d)
@ -273,3 +277,131 @@ func TestRequestStatus(t *testing.T) {
})
}
}
func TestWaitNextRateLimit(t *testing.T) {
for _, tc := range []struct {
name string
interval time.Duration
}{
{
name: "1ms",
interval: time.Millisecond,
},
{
name: "10ms",
interval: 10 * time.Millisecond,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
opt := &Options{
MinFrameInterval: tc.interval,
KeepAliveInterval: time.Minute,
}
c := newClient(New(opt))
quit := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
time.Sleep(time.Duration(rand.Intn(int(tc.interval / 10))))
select {
case c.refresh <- struct{}{}:
case <-quit:
return
}
}
}()
count := int(time.Second / 10 / tc.interval)
if count < 10 {
count = 10
}
for i := 0; i < count; i++ {
begin := time.Now()
c.mostRecent = begin
if got := c.waitNext(ctx); !got {
t.Errorf("waitNext() returned %v, want %v", got, true)
}
if duration := time.Since(begin); duration < opt.MinFrameInterval {
t.Errorf("waitNext() returned after %v, want at least %v", duration, opt.MinFrameInterval)
}
}
close(quit)
wg.Wait()
})
}
}
func TestWaitNextKeepAlive(t *testing.T) {
for _, tc := range []struct {
name string
interval time.Duration
}{
{
name: "1ms",
interval: time.Millisecond,
},
{
name: "100ms",
interval: 100 * time.Millisecond,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
opt := &Options{
MinFrameInterval: time.Minute,
KeepAliveInterval: tc.interval,
}
c := newClient(New(opt))
begin := time.Now()
c.mostRecent = begin
if got := c.waitNext(ctx); !got {
t.Errorf("waitNext() returned %v, want %v", got, true)
}
if duration := time.Since(begin); duration < opt.KeepAliveInterval {
t.Errorf("waitNext() returned after %v, want at least %v", duration, opt.KeepAliveInterval)
}
})
}
}
func TestWaitNextTerminate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
c := newClient(New(&Options{
MinFrameInterval: time.Minute,
KeepAliveInterval: time.Minute,
}))
c.mostRecent = time.Now()
go func() {
c.terminate <- struct{}{}
}()
if got := c.waitNext(ctx); got {
t.Errorf("waitNext() returned %v, want %v", got, false)
}
}

Loading…
Cancel
Save