Add videosink display driver (#27)

The videosink package provides a display driver implementing an HTTP
request handler. Client requests get an initial snapshot of the graphics
buffer and are updated further on every change.

Signed-off-by: Michael Hanselmann <public@hansmi.ch>
pull/30/head
hansmi 4 years ago committed by GitHub
parent 02831f4a67
commit 42ee8553ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,102 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
// Package videosink provides a display driver implementing an HTTP request
// handler. Client requests get an initial snapshot of the graphics buffer and
// are updated further on every change.
//
// The primary use case is the development of display outputs on a host
// machine. Additionally devices with network connectivity can use this driver
// to provide a copy of their local display via a web interface.
//
// The protocol used is "MJPEG" (https://en.wikipedia.org/wiki/Motion_JPEG)
// which is often used by IP cameras. Because of its better suitability for
// computer-drawn graphics the PNG image format is used by default. JPEG as
// a format can be selected via Options.Format or using the "format" URL
// parameter.
package videosink
import (
"image"
"image/color"
"image/draw"
"net/http"
"sync"
"periph.io/x/conn/v3/display"
)
// Options for videosink devices.
type Options struct {
// Width and height of the image buffer.
Width, Height int
// Format specifies the image format to send to clients.
Format ImageFormat
// TODO: Add options for JPEG and PNG encoder settings
}
type Display struct {
defaultFormat ImageFormat
mu sync.Mutex
buffer *image.RGBA
clients map[*client]struct{}
snapshot map[imageConfig][]byte
}
var _ display.Drawer = (*Display)(nil)
var _ http.Handler = (*Display)(nil)
// New creates a new videosink device instance.
func New(opt *Options) *Display {
buffer := image.NewRGBA(image.Rect(0, 0, opt.Width, opt.Height))
// By default the alpha channel is set to full transparency. The following
// draw operation makes it opaque.
draw.Draw(buffer, buffer.Bounds(), image.Black, image.Point{}, draw.Src)
return &Display{
buffer: buffer,
clients: map[*client]struct{}{},
snapshot: map[imageConfig][]byte{},
defaultFormat: opt.Format,
}
}
// String returns the name of the device.
func (d *Display) String() string {
return "VideoSink"
}
// Halt implements conn.Resource and terminates all running client requests
// asynchronously.
func (d *Display) Halt() error {
d.mu.Lock()
d.terminateClientsLocked()
d.mu.Unlock()
return nil
}
// ColorModel implements display.Drawer.
func (d *Display) ColorModel() color.Model {
return d.buffer.ColorModel()
}
// Bounds implements display.Drawer.
func (d *Display) Bounds() image.Rectangle {
return d.buffer.Bounds()
}
// Draw implements display.Drawer.
func (d *Display) Draw(dstRect image.Rectangle, src image.Image, srcPts image.Point) error {
d.mu.Lock()
draw.Draw(d.buffer, dstRect, src, srcPts, draw.Src)
d.bufferChangedLocked()
d.mu.Unlock()
return nil
}

@ -0,0 +1,15 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import "testing"
func TestNewHalt(t *testing.T) {
d := New(&Options{Width: 100, Height: 100})
if err := d.Halt(); err != nil {
t.Errorf("Halt() failed: %v", err)
}
}

@ -0,0 +1,46 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import (
"image/jpeg"
"image/png"
"sync"
)
var jpegOptions = jpeg.Options{
Quality: 95,
}
type pngEncoderBufferPool sync.Pool
func (p *pngEncoderBufferPool) Get() *png.EncoderBuffer {
buf, _ := (*sync.Pool)(p).Get().(*png.EncoderBuffer)
return buf
}
func (p *pngEncoderBufferPool) Put(buf *png.EncoderBuffer) {
(*sync.Pool)(p).Put(buf)
}
type pngEncoderManager struct {
once sync.Once
pool pngEncoderBufferPool
enc *png.Encoder
}
var pngEncoder pngEncoderManager
// get returns a PNG encoder with a globally shared buffer pool.
func (m *pngEncoderManager) get() *png.Encoder {
m.once.Do(func() {
m.enc = &png.Encoder{
CompressionLevel: png.BestSpeed,
BufferPool: &m.pool,
}
})
return m.enc
}

@ -0,0 +1,53 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import "fmt"
type ImageFormat int
const (
PNG ImageFormat = iota
JPEG
// DefaultFormat is the format used when not set explicitly in options or
// as a URL parameter.
DefaultFormat = PNG
)
func (f ImageFormat) String() string {
switch f {
case PNG:
return "PNG"
case JPEG:
return "JPEG"
default:
return fmt.Sprint(int(f))
}
}
func (f ImageFormat) mimeType() string {
switch f {
case PNG:
return "image/png"
case JPEG:
return "image/jpeg"
}
return "application/octet-stream"
}
// ImageFormatFromString returns the ImageFormat value for the given format
// abbreviation.
func ImageFormatFromString(value string) (ImageFormat, error) {
switch value {
case "png":
return PNG, nil
case "jpg", "jpeg":
return JPEG, nil
}
return DefaultFormat, fmt.Errorf("unrecognized image format %q", value)
}

@ -0,0 +1,53 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import (
"fmt"
"testing"
)
func TestImageFormat(t *testing.T) {
for _, tc := range []struct {
format ImageFormat
wantString string
wantMimeType string
}{
{
format: ImageFormat(-1),
wantString: "-1",
wantMimeType: "application/octet-stream",
},
{
wantString: "PNG",
wantMimeType: "image/png",
},
{
format: DefaultFormat,
wantString: "PNG",
wantMimeType: "image/png",
},
{
format: PNG,
wantString: "PNG",
wantMimeType: "image/png",
},
{
format: JPEG,
wantString: "JPEG",
wantMimeType: "image/jpeg",
},
} {
t.Run(fmt.Sprint(tc), func(t *testing.T) {
if got := tc.format.String(); got != tc.wantString {
t.Errorf("String() returned %q, want %q", got, tc.wantString)
}
if got := tc.format.mimeType(); got != tc.wantMimeType {
t.Errorf("mimeType() returned %q, want %q", got, tc.wantMimeType)
}
})
}
}

@ -0,0 +1,194 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import (
"bytes"
"fmt"
"image/jpeg"
"log"
"mime"
"net/http"
"net/textproto"
"net/url"
"sync"
)
// bufferPool stores reusable []byte instances.
var bufferPool = sync.Pool{
New: func() interface{} {
return []byte(nil)
},
}
type imageConfig struct {
format ImageFormat
}
func (d *Display) configFromQuery(values url.Values) (imageConfig, error) {
cfg := imageConfig{
format: d.defaultFormat,
}
if value := values.Get("format"); value != "" {
if format, err := ImageFormatFromString(value); err != nil {
return imageConfig{}, err
} else {
cfg.format = format
}
}
return cfg, nil
}
type client struct {
refresh chan struct{}
terminate chan struct{}
}
func (d *Display) bufferChangedLocked() {
for cfg, buffer := range d.snapshot {
if buffer != nil {
//lint:ignore SA6002 buffer is []byte and thus pointer-like
bufferPool.Put(buffer)
}
delete(d.snapshot, cfg)
}
for c := range d.clients {
select {
case c.refresh <- struct{}{}:
default:
}
}
}
func (d *Display) terminateClientsLocked() {
for c := range d.clients {
select {
case c.terminate <- struct{}{}:
default:
}
}
}
func (d *Display) encodeBufferLocked(format ImageFormat) ([]byte, error) {
buf := bytes.NewBuffer(bufferPool.Get().([]byte)[:0])
switch format {
case PNG:
if err := pngEncoder.get().Encode(buf, d.buffer); err != nil {
return nil, err
}
case JPEG:
if err := jpeg.Encode(buf, d.buffer, &jpegOptions); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unhandled image format %s", format)
}
return buf.Bytes(), nil
}
func (d *Display) grabSnapshot(cfg imageConfig) []byte {
d.mu.Lock()
defer d.mu.Unlock()
encoded, ok := d.snapshot[cfg]
if !ok {
var err error
encoded, err = d.encodeBufferLocked(cfg.format)
if err != nil {
panic(fmt.Sprintf("encoding image failed: %v", err))
}
d.snapshot[cfg] = encoded
}
return append(bufferPool.Get().([]byte)[:0], encoded...)
}
// ServeHTTP handles HTTP GET requests and sends a stream of images
// representing the display buffer in response. The display options control the
// default format and clients can explicitly request PNG or JPEG images using
// the "format" parameter ("?format=png", "?format=jpeg").
func (d *Display) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := r.Body.Close(); err != nil {
log.Printf("Closing request body failed: %v", err)
}
if r.Method != http.MethodGet {
http.Error(w, "", http.StatusMethodNotAllowed)
return
}
cfg, err := d.configFromQuery(r.URL.Query())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
pw := makePartWriter(w)
w.Header().Set("Content-Type",
mime.FormatMediaType("multipart/x-mixed-replace", map[string]string{
"boundary": pw.boundary,
}))
c := &client{
refresh: make(chan struct{}, 1),
terminate: make(chan struct{}, 1),
}
d.mu.Lock()
d.clients[c] = struct{}{}
d.mu.Unlock()
defer func() {
d.mu.Lock()
delete(d.clients, c)
d.mu.Unlock()
}()
partHeaders := make(textproto.MIMEHeader)
partHeaders.Set("Content-Type", mime.FormatMediaType(cfg.format.mimeType(), nil))
partHeaders.Set("Content-Transfer-Encoding", "binary")
for {
payload := d.grabSnapshot(cfg)
err := pw.writeFrame(partHeaders, payload)
if payload != nil {
//lint:ignore SA6002 buffer is []byte and thus pointer-like
bufferPool.Put(payload)
}
if err != nil {
// Errors cause the request to be silently terminated. There's no
// good way to deliver an error message to the client within an
// image stream.
return
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
// TODO: Keepalive (send an image every N seconds)
// TODO: Rate-limiting (don't send more than N per time unit)
select {
case <-c.refresh:
case <-c.terminate:
return
case <-r.Context().Done():
return
}
}
}

@ -0,0 +1,275 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import (
"bytes"
"context"
"errors"
"fmt"
"image"
"image/jpeg"
"image/png"
"io"
"io/ioutil"
"mime"
"mime/multipart"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
"testing"
"time"
)
type testCase struct {
name string
opt Options
target string
wantMediaType string
onImage func(*testing.T, image.Image)
}
func (tc *testCase) validatePart(t *testing.T, part *multipart.Part) {
t.Helper()
contentLength, err := strconv.ParseInt(part.Header.Get("Content-Length"), 10, 32)
if err != nil {
t.Errorf("Parsing Content-Length header failed: %v", err)
}
decodeFunc := func(io.Reader) (image.Image, error) {
return nil, errors.New("unknown image format")
}
if mediaType, _, err := mime.ParseMediaType(part.Header.Get("Content-Type")); err != nil {
t.Errorf("ParseMediaType() failed: %v", err)
} else if mediaType != tc.wantMediaType {
t.Errorf("Got content-type %q, want %q", mediaType, tc.wantMediaType)
} else {
switch mediaType {
case "image/png":
decodeFunc = png.Decode
case "image/jpeg":
decodeFunc = jpeg.Decode
}
}
if content, err := ioutil.ReadAll(part); err != nil {
t.Errorf("ReadAll() failed: %v", err)
} else if got, want := len(content), int(contentLength); got != want {
t.Errorf("Read %d bytes, Content-Length header is %d", got, want)
} else if img, err := decodeFunc(bytes.NewReader(content)); err != nil {
t.Errorf("Decoding image failed: %v", err)
} else if got, want := img.Bounds().Size(), (image.Point{tc.opt.Width, tc.opt.Height}); got != want {
t.Errorf("Got image size %v, want %v", got, want)
} else if tc.onImage != nil {
tc.onImage(t, img)
}
if err := part.Close(); err != nil {
t.Errorf("Close() failed: %v", err)
}
}
func (tc *testCase) validateResponse(t *testing.T, resp *http.Response) {
t.Helper()
if got, want := resp.StatusCode, http.StatusOK; got != want {
t.Errorf("ServeHTTP() status %d, want %d", got, want)
}
if mediaType, mediaParams, err := mime.ParseMediaType(resp.Header.Get("Content-Type")); err != nil {
t.Errorf("ParseMediaType() failed: %v", err)
} else if got, want := mediaType, "multipart/x-mixed-replace"; got != want {
t.Errorf("Content-Type is %q, want %q", got, want)
} else if boundary, ok := mediaParams["boundary"]; !(ok && len(boundary) > 50) {
t.Errorf("Insufficient boundary: %s", boundary)
} else {
mr := multipart.NewReader(resp.Body, boundary)
for {
if part, err := mr.NextPart(); errors.Is(err, io.EOF) {
break
} else if err != nil {
t.Errorf("NextPart() failed: %v", err)
} else {
tc.validatePart(t, part)
}
}
if _, err := mr.NextPart(); !(errors.Is(err, io.EOF) || strings.HasSuffix(err.Error(), " EOF")) {
t.Errorf("Reading beyond last part didn't fail with EOF: %v", err)
}
}
}
func TestMultipartResponse(t *testing.T) {
for _, tc := range []testCase{
{
name: "defaults",
opt: Options{
Width: 120,
Height: 200,
Format: DefaultFormat,
},
target: "/",
wantMediaType: "image/png",
},
{
name: "default PNG",
opt: Options{
Width: 4,
Height: 4,
Format: PNG,
},
target: "/",
wantMediaType: "image/png",
},
{
name: "default JPEG",
opt: Options{
Width: 200,
Height: 100,
Format: JPEG,
},
target: "/",
wantMediaType: "image/jpeg",
},
{
name: "format param PNG",
opt: Options{
Width: 234,
Height: 123,
Format: JPEG,
},
target: "/?format=png",
wantMediaType: "image/png",
},
{
name: "format param JPEG",
opt: Options{
Width: 123,
Height: 456,
Format: PNG,
},
target: "/?format=jpeg",
wantMediaType: "image/jpeg",
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
d := New(&tc.opt)
srv := httptest.NewServer(d)
t.Cleanup(srv.Close)
t.Cleanup(srv.CloseClientConnections)
quit := make(chan struct{})
remaining := 10
tc.onImage = func(*testing.T, image.Image) {
if remaining == 0 {
tc.onImage = nil
defer close(quit)
if err := d.Halt(); err != nil {
t.Errorf("Halt() failed: %v", err)
}
} else {
remaining--
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := d.Draw(d.Bounds(), image.Black, image.Point{}); err != nil {
t.Errorf("Draw() failed: %v", err)
}
select {
case <-quit:
return
case <-ctx.Done():
return
default:
}
time.Sleep(10 * time.Millisecond)
}
}()
if resp, err := srv.Client().Get(srv.URL + tc.target); err != nil {
t.Errorf("Get() failed: %v", err)
} else {
tc.validateResponse(t, resp)
}
if t.Failed() {
cancel()
}
wg.Wait()
})
}
}
func TestRequestStatus(t *testing.T) {
for _, tc := range []struct {
method string
target string
wantStatus int
}{
{
target: "/?format=",
wantStatus: http.StatusOK,
},
{
target: "/?format=bmp",
wantStatus: http.StatusBadRequest,
},
{
method: http.MethodPost,
target: "/",
wantStatus: http.StatusMethodNotAllowed,
},
} {
t.Run(fmt.Sprint(tc), func(t *testing.T) {
d := New(&Options{
Width: 16,
Height: 16,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
srv := httptest.NewServer(d)
t.Cleanup(srv.Close)
t.Cleanup(srv.CloseClientConnections)
req, err := http.NewRequestWithContext(ctx, tc.method, srv.URL+tc.target, nil)
if err != nil {
t.Errorf("NewRequest() failed: %v", err)
}
if resp, err := srv.Client().Do(req); err != nil {
t.Errorf("Get() failed: %v", err)
} else if got, want := resp.StatusCode, tc.wantStatus; got != want {
t.Errorf("Request for %s %s returned status %d (%s), want %d",
req.Method, req.URL.String(), got, resp.Status, want)
}
})
}
}

@ -0,0 +1,74 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"net/textproto"
"strconv"
)
// randomBoundary generates a MIME multipart boundary compatible with RFC 2046
// (section 5.1.1).
func randomBoundary() string {
var buf [34]byte
if _, err := io.ReadFull(rand.Reader, buf[:]); err != nil {
panic(err)
}
return fmt.Sprintf("%x", buf[:])
}
type partWriter struct {
u io.Writer
boundary string
started bool
}
func makePartWriter(u io.Writer) partWriter {
return partWriter{
u: u,
boundary: randomBoundary(),
}
}
// writeFrame sends a single part of a MIME multipart entity, ensuring it's
// fully written by the time the function returns.
//
// The caller-owned headers are modified to set a Content-Length header.
//
// Go has a writer for MIME multipart messages in "mime/multipart".Writer. As
// of Go 1.17 it's not suitable for writing a neverending stream of parts where
// each must be flushed to the client with the part-ending boundary line.
func (w *partWriter) writeFrame(header textproto.MIMEHeader, body []byte) error {
header.Set("Content-Length", strconv.FormatInt(int64(len(body)), 10))
var buf bytes.Buffer
if !w.started {
fmt.Fprintf(&buf, "--%s\r\n", w.boundary)
w.started = true
}
for name := range header {
for _, value := range header[name] {
fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
}
}
buf.WriteString("\r\n")
_, err := buf.WriteTo(w.u)
if err == nil {
_, err = io.Copy(w.u, bytes.NewReader(body))
if err == nil {
_, err = fmt.Fprintf(w.u, "\r\n--%s\r\n", w.boundary)
}
}
return err
}

@ -0,0 +1,20 @@
// Copyright 2021 The Periph Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package videosink
import (
"regexp"
"testing"
)
var boundaryRe = regexp.MustCompile(`^[a-f0-9]{60,70}$`)
func TestRandomBoundary(t *testing.T) {
for i := 0; i < 100; i++ {
if got := randomBoundary(); !boundaryRe.MatchString(got) {
t.Errorf("Boundary must match the expression %q: %s", boundaryRe.String(), got)
}
}
}
Loading…
Cancel
Save