From 031904a3e5d183505ba5ab4ada1496a3e9f52440 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann Date: Wed, 1 Dec 2021 20:40:01 +0100 Subject: [PATCH] Add videosink display driver The videosink package provides a display driver implementing an HTTP request handler. Client requests get an initial snapshot of the graphics buffer and are updated further on every change. Signed-off-by: Michael Hanselmann --- videosink/display.go | 102 ++++++++++++++ videosink/display_test.go | 15 +++ videosink/encoder.go | 46 +++++++ videosink/format.go | 53 ++++++++ videosink/format_test.go | 53 ++++++++ videosink/handler.go | 194 +++++++++++++++++++++++++++ videosink/handler_test.go | 275 ++++++++++++++++++++++++++++++++++++++ videosink/writer.go | 74 ++++++++++ videosink/writer_test.go | 20 +++ 9 files changed, 832 insertions(+) create mode 100644 videosink/display.go create mode 100644 videosink/display_test.go create mode 100644 videosink/encoder.go create mode 100644 videosink/format.go create mode 100644 videosink/format_test.go create mode 100644 videosink/handler.go create mode 100644 videosink/handler_test.go create mode 100644 videosink/writer.go create mode 100644 videosink/writer_test.go diff --git a/videosink/display.go b/videosink/display.go new file mode 100644 index 0000000..05d67d8 --- /dev/null +++ b/videosink/display.go @@ -0,0 +1,102 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +// Package videosink provides a display driver implementing an HTTP request +// handler. Client requests get an initial snapshot of the graphics buffer and +// are updated further on every change. +// +// The primary use case is the development of display outputs on a host +// machine. Additionally devices with network connectivity can use this driver +// to provide a copy of their local display via a web interface. +// +// The protocol used is "MJPEG" (https://en.wikipedia.org/wiki/Motion_JPEG) +// which is often used by IP cameras. Because of its better suitability for +// computer-drawn graphics the PNG image format is used by default. JPEG as +// a format can be selected via Options.Format or using the "format" URL +// parameter. +package videosink + +import ( + "image" + "image/color" + "image/draw" + "net/http" + "sync" + + "periph.io/x/conn/v3/display" +) + +// Options for videosink devices. +type Options struct { + // Width and height of the image buffer. + Width, Height int + + // Format specifies the image format to send to clients. + Format ImageFormat + + // TODO: Add options for JPEG and PNG encoder settings +} + +type Display struct { + defaultFormat ImageFormat + + mu sync.Mutex + buffer *image.RGBA + clients map[*client]struct{} + snapshot map[imageConfig][]byte +} + +var _ display.Drawer = (*Display)(nil) +var _ http.Handler = (*Display)(nil) + +// New creates a new videosink device instance. +func New(opt *Options) *Display { + buffer := image.NewRGBA(image.Rect(0, 0, opt.Width, opt.Height)) + + // By default the alpha channel is set to full transparency. The following + // draw operation makes it opaque. + draw.Draw(buffer, buffer.Bounds(), image.Black, image.Point{}, draw.Src) + + return &Display{ + buffer: buffer, + clients: map[*client]struct{}{}, + snapshot: map[imageConfig][]byte{}, + defaultFormat: opt.Format, + } +} + +// String returns the name of the device. +func (d *Display) String() string { + return "VideoSink" +} + +// Halt implements conn.Resource and terminates all running client requests +// asynchronously. +func (d *Display) Halt() error { + d.mu.Lock() + d.terminateClientsLocked() + d.mu.Unlock() + + return nil +} + +// ColorModel implements display.Drawer. +func (d *Display) ColorModel() color.Model { + return d.buffer.ColorModel() +} + +// Bounds implements display.Drawer. +func (d *Display) Bounds() image.Rectangle { + return d.buffer.Bounds() +} + +// Draw implements display.Drawer. +func (d *Display) Draw(dstRect image.Rectangle, src image.Image, srcPts image.Point) error { + d.mu.Lock() + draw.Draw(d.buffer, dstRect, src, srcPts, draw.Src) + d.bufferChangedLocked() + d.mu.Unlock() + + return nil +} diff --git a/videosink/display_test.go b/videosink/display_test.go new file mode 100644 index 0000000..d5004bc --- /dev/null +++ b/videosink/display_test.go @@ -0,0 +1,15 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import "testing" + +func TestNewHalt(t *testing.T) { + d := New(&Options{Width: 100, Height: 100}) + + if err := d.Halt(); err != nil { + t.Errorf("Halt() failed: %v", err) + } +} diff --git a/videosink/encoder.go b/videosink/encoder.go new file mode 100644 index 0000000..ec6c66c --- /dev/null +++ b/videosink/encoder.go @@ -0,0 +1,46 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import ( + "image/jpeg" + "image/png" + "sync" +) + +var jpegOptions = jpeg.Options{ + Quality: 95, +} + +type pngEncoderBufferPool sync.Pool + +func (p *pngEncoderBufferPool) Get() *png.EncoderBuffer { + buf, _ := (*sync.Pool)(p).Get().(*png.EncoderBuffer) + return buf +} + +func (p *pngEncoderBufferPool) Put(buf *png.EncoderBuffer) { + (*sync.Pool)(p).Put(buf) +} + +type pngEncoderManager struct { + once sync.Once + pool pngEncoderBufferPool + enc *png.Encoder +} + +var pngEncoder pngEncoderManager + +// get returns a PNG encoder with a globally shared buffer pool. +func (m *pngEncoderManager) get() *png.Encoder { + m.once.Do(func() { + m.enc = &png.Encoder{ + CompressionLevel: png.BestSpeed, + BufferPool: &m.pool, + } + }) + + return m.enc +} diff --git a/videosink/format.go b/videosink/format.go new file mode 100644 index 0000000..defc83c --- /dev/null +++ b/videosink/format.go @@ -0,0 +1,53 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import "fmt" + +type ImageFormat int + +const ( + PNG ImageFormat = iota + JPEG + + // DefaultFormat is the format used when not set explicitly in options or + // as a URL parameter. + DefaultFormat = PNG +) + +func (f ImageFormat) String() string { + switch f { + case PNG: + return "PNG" + case JPEG: + return "JPEG" + default: + return fmt.Sprint(int(f)) + } +} + +func (f ImageFormat) mimeType() string { + switch f { + case PNG: + return "image/png" + case JPEG: + return "image/jpeg" + } + + return "application/octet-stream" +} + +// ImageFormatFromString returns the ImageFormat value for the given format +// abbreviation. +func ImageFormatFromString(value string) (ImageFormat, error) { + switch value { + case "png": + return PNG, nil + case "jpg", "jpeg": + return JPEG, nil + } + + return DefaultFormat, fmt.Errorf("unrecognized image format %q", value) +} diff --git a/videosink/format_test.go b/videosink/format_test.go new file mode 100644 index 0000000..de2c34d --- /dev/null +++ b/videosink/format_test.go @@ -0,0 +1,53 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import ( + "fmt" + "testing" +) + +func TestImageFormat(t *testing.T) { + for _, tc := range []struct { + format ImageFormat + wantString string + wantMimeType string + }{ + { + format: ImageFormat(-1), + wantString: "-1", + wantMimeType: "application/octet-stream", + }, + { + wantString: "PNG", + wantMimeType: "image/png", + }, + { + format: DefaultFormat, + wantString: "PNG", + wantMimeType: "image/png", + }, + { + format: PNG, + wantString: "PNG", + wantMimeType: "image/png", + }, + { + format: JPEG, + wantString: "JPEG", + wantMimeType: "image/jpeg", + }, + } { + t.Run(fmt.Sprint(tc), func(t *testing.T) { + if got := tc.format.String(); got != tc.wantString { + t.Errorf("String() returned %q, want %q", got, tc.wantString) + } + + if got := tc.format.mimeType(); got != tc.wantMimeType { + t.Errorf("mimeType() returned %q, want %q", got, tc.wantMimeType) + } + }) + } +} diff --git a/videosink/handler.go b/videosink/handler.go new file mode 100644 index 0000000..9189b64 --- /dev/null +++ b/videosink/handler.go @@ -0,0 +1,194 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import ( + "bytes" + "fmt" + "image/jpeg" + "log" + "mime" + "net/http" + "net/textproto" + "net/url" + "sync" +) + +// bufferPool stores reusable []byte instances. +var bufferPool = sync.Pool{ + New: func() interface{} { + return []byte(nil) + }, +} + +type imageConfig struct { + format ImageFormat +} + +func (d *Display) configFromQuery(values url.Values) (imageConfig, error) { + cfg := imageConfig{ + format: d.defaultFormat, + } + + if value := values.Get("format"); value != "" { + if format, err := ImageFormatFromString(value); err != nil { + return imageConfig{}, err + } else { + cfg.format = format + } + } + + return cfg, nil +} + +type client struct { + refresh chan struct{} + terminate chan struct{} +} + +func (d *Display) bufferChangedLocked() { + for cfg, buffer := range d.snapshot { + if buffer != nil { + //lint:ignore SA6002 buffer is []byte and thus pointer-like + bufferPool.Put(buffer) + } + + delete(d.snapshot, cfg) + } + + for c := range d.clients { + select { + case c.refresh <- struct{}{}: + default: + } + } +} + +func (d *Display) terminateClientsLocked() { + for c := range d.clients { + select { + case c.terminate <- struct{}{}: + default: + } + } +} + +func (d *Display) encodeBufferLocked(format ImageFormat) ([]byte, error) { + buf := bytes.NewBuffer(bufferPool.Get().([]byte)[:0]) + + switch format { + case PNG: + if err := pngEncoder.get().Encode(buf, d.buffer); err != nil { + return nil, err + } + + case JPEG: + if err := jpeg.Encode(buf, d.buffer, &jpegOptions); err != nil { + return nil, err + } + + default: + return nil, fmt.Errorf("unhandled image format %s", format) + } + + return buf.Bytes(), nil +} + +func (d *Display) grabSnapshot(cfg imageConfig) []byte { + d.mu.Lock() + defer d.mu.Unlock() + + encoded, ok := d.snapshot[cfg] + if !ok { + var err error + + encoded, err = d.encodeBufferLocked(cfg.format) + if err != nil { + panic(fmt.Sprintf("encoding image failed: %v", err)) + } + d.snapshot[cfg] = encoded + } + + return append(bufferPool.Get().([]byte)[:0], encoded...) +} + +// ServeHTTP handles HTTP GET requests and sends a stream of images +// representing the display buffer in response. The display options control the +// default format and clients can explicitly request PNG or JPEG images using +// the "format" parameter ("?format=png", "?format=jpeg"). +func (d *Display) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if err := r.Body.Close(); err != nil { + log.Printf("Closing request body failed: %v", err) + } + + if r.Method != http.MethodGet { + http.Error(w, "", http.StatusMethodNotAllowed) + return + } + + cfg, err := d.configFromQuery(r.URL.Query()) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + pw := makePartWriter(w) + + w.Header().Set("Content-Type", + mime.FormatMediaType("multipart/x-mixed-replace", map[string]string{ + "boundary": pw.boundary, + })) + + c := &client{ + refresh: make(chan struct{}, 1), + terminate: make(chan struct{}, 1), + } + + d.mu.Lock() + d.clients[c] = struct{}{} + d.mu.Unlock() + + defer func() { + d.mu.Lock() + delete(d.clients, c) + d.mu.Unlock() + }() + + partHeaders := make(textproto.MIMEHeader) + partHeaders.Set("Content-Type", mime.FormatMediaType(cfg.format.mimeType(), nil)) + partHeaders.Set("Content-Transfer-Encoding", "binary") + + for { + payload := d.grabSnapshot(cfg) + err := pw.writeFrame(partHeaders, payload) + + if payload != nil { + //lint:ignore SA6002 buffer is []byte and thus pointer-like + bufferPool.Put(payload) + } + + if err != nil { + // Errors cause the request to be silently terminated. There's no + // good way to deliver an error message to the client within an + // image stream. + return + } + + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + + // TODO: Keepalive (send an image every N seconds) + // TODO: Rate-limiting (don't send more than N per time unit) + + select { + case <-c.refresh: + case <-c.terminate: + return + case <-r.Context().Done(): + return + } + } +} diff --git a/videosink/handler_test.go b/videosink/handler_test.go new file mode 100644 index 0000000..12a9678 --- /dev/null +++ b/videosink/handler_test.go @@ -0,0 +1,275 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import ( + "bytes" + "context" + "errors" + "fmt" + "image" + "image/jpeg" + "image/png" + "io" + "io/ioutil" + "mime" + "mime/multipart" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" + "testing" + "time" +) + +type testCase struct { + name string + opt Options + target string + wantMediaType string + + onImage func(*testing.T, image.Image) +} + +func (tc *testCase) validatePart(t *testing.T, part *multipart.Part) { + t.Helper() + + contentLength, err := strconv.ParseInt(part.Header.Get("Content-Length"), 10, 32) + if err != nil { + t.Errorf("Parsing Content-Length header failed: %v", err) + } + + decodeFunc := func(io.Reader) (image.Image, error) { + return nil, errors.New("unknown image format") + } + + if mediaType, _, err := mime.ParseMediaType(part.Header.Get("Content-Type")); err != nil { + t.Errorf("ParseMediaType() failed: %v", err) + } else if mediaType != tc.wantMediaType { + t.Errorf("Got content-type %q, want %q", mediaType, tc.wantMediaType) + } else { + switch mediaType { + case "image/png": + decodeFunc = png.Decode + case "image/jpeg": + decodeFunc = jpeg.Decode + } + } + + if content, err := ioutil.ReadAll(part); err != nil { + t.Errorf("ReadAll() failed: %v", err) + } else if got, want := len(content), int(contentLength); got != want { + t.Errorf("Read %d bytes, Content-Length header is %d", got, want) + } else if img, err := decodeFunc(bytes.NewReader(content)); err != nil { + t.Errorf("Decoding image failed: %v", err) + } else if got, want := img.Bounds().Size(), (image.Point{tc.opt.Width, tc.opt.Height}); got != want { + t.Errorf("Got image size %v, want %v", got, want) + } else if tc.onImage != nil { + tc.onImage(t, img) + } + + if err := part.Close(); err != nil { + t.Errorf("Close() failed: %v", err) + } +} + +func (tc *testCase) validateResponse(t *testing.T, resp *http.Response) { + t.Helper() + + if got, want := resp.StatusCode, http.StatusOK; got != want { + t.Errorf("ServeHTTP() status %d, want %d", got, want) + } + + if mediaType, mediaParams, err := mime.ParseMediaType(resp.Header.Get("Content-Type")); err != nil { + t.Errorf("ParseMediaType() failed: %v", err) + } else if got, want := mediaType, "multipart/x-mixed-replace"; got != want { + t.Errorf("Content-Type is %q, want %q", got, want) + } else if boundary, ok := mediaParams["boundary"]; !(ok && len(boundary) > 50) { + t.Errorf("Insufficient boundary: %s", boundary) + } else { + mr := multipart.NewReader(resp.Body, boundary) + + for { + if part, err := mr.NextPart(); errors.Is(err, io.EOF) { + break + } else if err != nil { + t.Errorf("NextPart() failed: %v", err) + } else { + tc.validatePart(t, part) + } + } + + if _, err := mr.NextPart(); !(errors.Is(err, io.EOF) || strings.HasSuffix(err.Error(), " EOF")) { + t.Errorf("Reading beyond last part didn't fail with EOF: %v", err) + } + } +} + +func TestMultipartResponse(t *testing.T) { + for _, tc := range []testCase{ + { + name: "defaults", + opt: Options{ + Width: 120, + Height: 200, + Format: DefaultFormat, + }, + target: "/", + wantMediaType: "image/png", + }, + { + name: "default PNG", + opt: Options{ + Width: 4, + Height: 4, + Format: PNG, + }, + target: "/", + wantMediaType: "image/png", + }, + { + name: "default JPEG", + opt: Options{ + Width: 200, + Height: 100, + Format: JPEG, + }, + target: "/", + wantMediaType: "image/jpeg", + }, + { + name: "format param PNG", + opt: Options{ + Width: 234, + Height: 123, + Format: JPEG, + }, + target: "/?format=png", + wantMediaType: "image/png", + }, + { + name: "format param JPEG", + opt: Options{ + Width: 123, + Height: 456, + Format: PNG, + }, + target: "/?format=jpeg", + wantMediaType: "image/jpeg", + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + t.Cleanup(cancel) + + d := New(&tc.opt) + + srv := httptest.NewServer(d) + t.Cleanup(srv.Close) + t.Cleanup(srv.CloseClientConnections) + + quit := make(chan struct{}) + remaining := 10 + + tc.onImage = func(*testing.T, image.Image) { + if remaining == 0 { + tc.onImage = nil + + defer close(quit) + + if err := d.Halt(); err != nil { + t.Errorf("Halt() failed: %v", err) + } + } else { + remaining-- + } + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + for { + if err := d.Draw(d.Bounds(), image.Black, image.Point{}); err != nil { + t.Errorf("Draw() failed: %v", err) + } + + select { + case <-quit: + return + case <-ctx.Done(): + return + default: + } + + time.Sleep(10 * time.Millisecond) + } + }() + + if resp, err := srv.Client().Get(srv.URL + tc.target); err != nil { + t.Errorf("Get() failed: %v", err) + } else { + tc.validateResponse(t, resp) + } + + if t.Failed() { + cancel() + } + + wg.Wait() + }) + } +} + +func TestRequestStatus(t *testing.T) { + for _, tc := range []struct { + method string + target string + wantStatus int + }{ + { + target: "/?format=", + wantStatus: http.StatusOK, + }, + { + target: "/?format=bmp", + wantStatus: http.StatusBadRequest, + }, + { + method: http.MethodPost, + target: "/", + wantStatus: http.StatusMethodNotAllowed, + }, + } { + t.Run(fmt.Sprint(tc), func(t *testing.T) { + d := New(&Options{ + Width: 16, + Height: 16, + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + t.Cleanup(cancel) + + srv := httptest.NewServer(d) + t.Cleanup(srv.Close) + t.Cleanup(srv.CloseClientConnections) + + req, err := http.NewRequestWithContext(ctx, tc.method, srv.URL+tc.target, nil) + if err != nil { + t.Errorf("NewRequest() failed: %v", err) + } + + if resp, err := srv.Client().Do(req); err != nil { + t.Errorf("Get() failed: %v", err) + } else if got, want := resp.StatusCode, tc.wantStatus; got != want { + t.Errorf("Request for %s %s returned status %d (%s), want %d", + req.Method, req.URL.String(), got, resp.Status, want) + } + }) + } +} diff --git a/videosink/writer.go b/videosink/writer.go new file mode 100644 index 0000000..db05ba4 --- /dev/null +++ b/videosink/writer.go @@ -0,0 +1,74 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "net/textproto" + "strconv" +) + +// randomBoundary generates a MIME multipart boundary compatible with RFC 2046 +// (section 5.1.1). +func randomBoundary() string { + var buf [34]byte + if _, err := io.ReadFull(rand.Reader, buf[:]); err != nil { + panic(err) + } + return fmt.Sprintf("%x", buf[:]) +} + +type partWriter struct { + u io.Writer + boundary string + started bool +} + +func makePartWriter(u io.Writer) partWriter { + return partWriter{ + u: u, + boundary: randomBoundary(), + } +} + +// writeFrame sends a single part of a MIME multipart entity, ensuring it's +// fully written by the time the function returns. +// +// The caller-owned headers are modified to set a Content-Length header. +// +// Go has a writer for MIME multipart messages in "mime/multipart".Writer. As +// of Go 1.17 it's not suitable for writing a neverending stream of parts where +// each must be flushed to the client with the part-ending boundary line. +func (w *partWriter) writeFrame(header textproto.MIMEHeader, body []byte) error { + header.Set("Content-Length", strconv.FormatInt(int64(len(body)), 10)) + + var buf bytes.Buffer + + if !w.started { + fmt.Fprintf(&buf, "--%s\r\n", w.boundary) + w.started = true + } + + for name := range header { + for _, value := range header[name] { + fmt.Fprintf(&buf, "%s: %s\r\n", name, value) + } + } + + buf.WriteString("\r\n") + + _, err := buf.WriteTo(w.u) + if err == nil { + _, err = io.Copy(w.u, bytes.NewReader(body)) + if err == nil { + _, err = fmt.Fprintf(w.u, "\r\n--%s\r\n", w.boundary) + } + } + + return err +} diff --git a/videosink/writer_test.go b/videosink/writer_test.go new file mode 100644 index 0000000..67cc081 --- /dev/null +++ b/videosink/writer_test.go @@ -0,0 +1,20 @@ +// Copyright 2021 The Periph Authors. All rights reserved. +// Use of this source code is governed under the Apache License, Version 2.0 +// that can be found in the LICENSE file. + +package videosink + +import ( + "regexp" + "testing" +) + +var boundaryRe = regexp.MustCompile(`^[a-f0-9]{60,70}$`) + +func TestRandomBoundary(t *testing.T) { + for i := 0; i < 100; i++ { + if got := randomBoundary(); !boundaryRe.MatchString(got) { + t.Errorf("Boundary must match the expression %q: %s", boundaryRe.String(), got) + } + } +}