Skip to content

Commit 804cb60

Browse files
authored
Prepared Messages (#211)
1 parent 9bc973a commit 804cb60

File tree

6 files changed

+357
-7
lines changed

6 files changed

+357
-7
lines changed

conn.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,12 +659,33 @@ func (w *messageWriter) Close() error {
659659
return nil
660660
}
661661

662+
// WritePreparedMessage writes prepared message into connection.
663+
func (c *Conn) WritePreparedMessage(pm *PreparedMessage) error {
664+
frameType, frameData, err := pm.frame(prepareKey{
665+
isServer: c.isServer,
666+
compress: c.newCompressionWriter != nil && c.enableWriteCompression && isData(pm.messageType),
667+
compressionLevel: c.compressionLevel,
668+
})
669+
if err != nil {
670+
return err
671+
}
672+
if c.isWriting {
673+
panic("concurrent write to websocket connection")
674+
}
675+
c.isWriting = true
676+
err = c.write(frameType, c.writeDeadline, frameData, nil)
677+
if !c.isWriting {
678+
panic("concurrent write to websocket connection")
679+
}
680+
c.isWriting = false
681+
return err
682+
}
683+
662684
// WriteMessage is a helper method for getting a writer using NextWriter,
663685
// writing the message and closing the writer.
664686
func (c *Conn) WriteMessage(messageType int, data []byte) error {
665687

666688
if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
667-
668689
// Fast path with no allocations and single frame.
669690

670691
if err := c.prepWrite(messageType); err != nil {

conn_broadcast_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2017 The Gorilla WebSocket Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// +build go1.7
6+
7+
package websocket
8+
9+
import (
10+
"io"
11+
"io/ioutil"
12+
"sync/atomic"
13+
"testing"
14+
)
15+
16+
// broadcastBench allows to run broadcast benchmarks.
17+
// In every broadcast benchmark we create many connections, then send the same
18+
// message into every connection and wait for all writes complete. This emulates
19+
// an application where many connections listen to the same data - i.e. PUB/SUB
20+
// scenarios with many subscribers in one channel.
21+
type broadcastBench struct {
22+
w io.Writer
23+
message *broadcastMessage
24+
closeCh chan struct{}
25+
doneCh chan struct{}
26+
count int32
27+
conns []*broadcastConn
28+
compression bool
29+
usePrepared bool
30+
}
31+
32+
type broadcastMessage struct {
33+
payload []byte
34+
prepared *PreparedMessage
35+
}
36+
37+
type broadcastConn struct {
38+
conn *Conn
39+
msgCh chan *broadcastMessage
40+
}
41+
42+
func newBroadcastConn(c *Conn) *broadcastConn {
43+
return &broadcastConn{
44+
conn: c,
45+
msgCh: make(chan *broadcastMessage, 1),
46+
}
47+
}
48+
49+
func newBroadcastBench(usePrepared, compression bool) *broadcastBench {
50+
bench := &broadcastBench{
51+
w: ioutil.Discard,
52+
doneCh: make(chan struct{}),
53+
closeCh: make(chan struct{}),
54+
usePrepared: usePrepared,
55+
compression: compression,
56+
}
57+
msg := &broadcastMessage{
58+
payload: textMessages(1)[0],
59+
}
60+
if usePrepared {
61+
pm, _ := NewPreparedMessage(TextMessage, msg.payload)
62+
msg.prepared = pm
63+
}
64+
bench.message = msg
65+
bench.makeConns(10000)
66+
return bench
67+
}
68+
69+
func (b *broadcastBench) makeConns(numConns int) {
70+
conns := make([]*broadcastConn, numConns)
71+
72+
for i := 0; i < numConns; i++ {
73+
c := newConn(fakeNetConn{Reader: nil, Writer: b.w}, true, 1024, 1024)
74+
if b.compression {
75+
c.enableWriteCompression = true
76+
c.newCompressionWriter = compressNoContextTakeover
77+
}
78+
conns[i] = newBroadcastConn(c)
79+
go func(c *broadcastConn) {
80+
for {
81+
select {
82+
case msg := <-c.msgCh:
83+
if b.usePrepared {
84+
c.conn.WritePreparedMessage(msg.prepared)
85+
} else {
86+
c.conn.WriteMessage(TextMessage, msg.payload)
87+
}
88+
val := atomic.AddInt32(&b.count, 1)
89+
if val%int32(numConns) == 0 {
90+
b.doneCh <- struct{}{}
91+
}
92+
case <-b.closeCh:
93+
return
94+
}
95+
}
96+
}(conns[i])
97+
}
98+
b.conns = conns
99+
}
100+
101+
func (b *broadcastBench) close() {
102+
close(b.closeCh)
103+
}
104+
105+
func (b *broadcastBench) runOnce() {
106+
for _, c := range b.conns {
107+
c.msgCh <- b.message
108+
}
109+
<-b.doneCh
110+
}
111+
112+
func BenchmarkBroadcast(b *testing.B) {
113+
benchmarks := []struct {
114+
name string
115+
usePrepared bool
116+
compression bool
117+
}{
118+
{"NoCompression", false, false},
119+
{"WithCompression", false, true},
120+
{"NoCompressionPrepared", true, false},
121+
{"WithCompressionPrepared", true, true},
122+
}
123+
for _, bm := range benchmarks {
124+
b.Run(bm.name, func(b *testing.B) {
125+
bench := newBroadcastBench(bm.usePrepared, bm.compression)
126+
defer bench.close()
127+
b.ResetTimer()
128+
for i := 0; i < b.N; i++ {
129+
bench.runOnce()
130+
}
131+
b.ReportAllocs()
132+
})
133+
}
134+
}

examples/autobahn/fuzzingclient.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"outdir": "./reports/clients",
55
"servers": [
66
{"agent": "ReadAllWriteMessage", "url": "ws://localhost:9000/m", "options": {"version": 18}},
7+
{"agent": "ReadAllWritePreparedMessage", "url": "ws://localhost:9000/p", "options": {"version": 18}},
78
{"agent": "ReadAllWrite", "url": "ws://localhost:9000/r", "options": {"version": 18}},
89
{"agent": "CopyFull", "url": "ws://localhost:9000/f", "options": {"version": 18}},
910
{"agent": "CopyWriterOnly", "url": "ws://localhost:9000/c", "options": {"version": 18}}

examples/autobahn/server.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func echoCopyFull(w http.ResponseWriter, r *http.Request) {
8585

8686
// echoReadAll echoes messages from the client by reading the entire message
8787
// with ioutil.ReadAll.
88-
func echoReadAll(w http.ResponseWriter, r *http.Request, writeMessage bool) {
88+
func echoReadAll(w http.ResponseWriter, r *http.Request, writeMessage, writePrepared bool) {
8989
conn, err := upgrader.Upgrade(w, r, nil)
9090
if err != nil {
9191
log.Println("Upgrade:", err)
@@ -109,9 +109,21 @@ func echoReadAll(w http.ResponseWriter, r *http.Request, writeMessage bool) {
109109
}
110110
}
111111
if writeMessage {
112-
err = conn.WriteMessage(mt, b)
113-
if err != nil {
114-
log.Println("WriteMessage:", err)
112+
if !writePrepared {
113+
err = conn.WriteMessage(mt, b)
114+
if err != nil {
115+
log.Println("WriteMessage:", err)
116+
}
117+
} else {
118+
pm, err := websocket.NewPreparedMessage(mt, b)
119+
if err != nil {
120+
log.Println("NewPreparedMessage:", err)
121+
return
122+
}
123+
err = conn.WritePreparedMessage(pm)
124+
if err != nil {
125+
log.Println("WritePreparedMessage:", err)
126+
}
115127
}
116128
} else {
117129
w, err := conn.NextWriter(mt)
@@ -132,11 +144,15 @@ func echoReadAll(w http.ResponseWriter, r *http.Request, writeMessage bool) {
132144
}
133145

134146
func echoReadAllWriter(w http.ResponseWriter, r *http.Request) {
135-
echoReadAll(w, r, false)
147+
echoReadAll(w, r, false, false)
136148
}
137149

138150
func echoReadAllWriteMessage(w http.ResponseWriter, r *http.Request) {
139-
echoReadAll(w, r, true)
151+
echoReadAll(w, r, true, false)
152+
}
153+
154+
func echoReadAllWritePreparedMessage(w http.ResponseWriter, r *http.Request) {
155+
echoReadAll(w, r, true, true)
140156
}
141157

142158
func serveHome(w http.ResponseWriter, r *http.Request) {
@@ -161,6 +177,7 @@ func main() {
161177
http.HandleFunc("/f", echoCopyFull)
162178
http.HandleFunc("/r", echoReadAllWriter)
163179
http.HandleFunc("/m", echoReadAllWriteMessage)
180+
http.HandleFunc("/p", echoReadAllWritePreparedMessage)
164181
err := http.ListenAndServe(*addr, nil)
165182
if err != nil {
166183
log.Fatal("ListenAndServe: ", err)

prepared.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2017 The Gorilla WebSocket Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package websocket
6+
7+
import (
8+
"bytes"
9+
"net"
10+
"sync"
11+
"time"
12+
)
13+
14+
// PreparedMessage caches on the wire representations of a message payload.
15+
// Use PreparedMessage to efficiently send a message payload to multiple
16+
// connections. PreparedMessage is especially useful when compression is used
17+
// because the CPU and memory expensive compression operation can be executed
18+
// once for a given set of compression options.
19+
type PreparedMessage struct {
20+
messageType int
21+
data []byte
22+
err error
23+
mu sync.Mutex
24+
frames map[prepareKey]*preparedFrame
25+
}
26+
27+
// prepareKey defines a unique set of options to cache prepared frames in PreparedMessage.
28+
type prepareKey struct {
29+
isServer bool
30+
compress bool
31+
compressionLevel int
32+
}
33+
34+
// preparedFrame contains data in wire representation.
35+
type preparedFrame struct {
36+
once sync.Once
37+
data []byte
38+
}
39+
40+
// NewPreparedMessage returns an initialized PreparedMessage. You can then send
41+
// it to connection using WritePreparedMessage method. Valid wire
42+
// representation will be calculated lazily only once for a set of current
43+
// connection options.
44+
func NewPreparedMessage(messageType int, data []byte) (*PreparedMessage, error) {
45+
pm := &PreparedMessage{
46+
messageType: messageType,
47+
frames: make(map[prepareKey]*preparedFrame),
48+
data: data,
49+
}
50+
51+
// Prepare a plain server frame.
52+
_, frameData, err := pm.frame(prepareKey{isServer: true, compress: false})
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
// To protect against caller modifying the data argument, remember the data
58+
// copied to the plain server frame.
59+
pm.data = frameData[len(frameData)-len(data):]
60+
return pm, nil
61+
}
62+
63+
func (pm *PreparedMessage) frame(key prepareKey) (int, []byte, error) {
64+
pm.mu.Lock()
65+
frame, ok := pm.frames[key]
66+
if !ok {
67+
frame = &preparedFrame{}
68+
pm.frames[key] = frame
69+
}
70+
pm.mu.Unlock()
71+
72+
var err error
73+
frame.once.Do(func() {
74+
// Prepare a frame using a 'fake' connection.
75+
// TODO: Refactor code in conn.go to allow more direct construction of
76+
// the frame.
77+
mu := make(chan bool, 1)
78+
mu <- true
79+
var nc prepareConn
80+
c := &Conn{
81+
conn: &nc,
82+
mu: mu,
83+
isServer: key.isServer,
84+
compressionLevel: key.compressionLevel,
85+
enableWriteCompression: true,
86+
writeBuf: make([]byte, defaultWriteBufferSize+maxFrameHeaderSize),
87+
}
88+
if key.compress {
89+
c.newCompressionWriter = compressNoContextTakeover
90+
}
91+
err = c.WriteMessage(pm.messageType, pm.data)
92+
frame.data = nc.buf.Bytes()
93+
})
94+
return pm.messageType, frame.data, err
95+
}
96+
97+
type prepareConn struct {
98+
buf bytes.Buffer
99+
net.Conn
100+
}
101+
102+
func (pc *prepareConn) Write(p []byte) (int, error) { return pc.buf.Write(p) }
103+
func (pc *prepareConn) SetWriteDeadline(t time.Time) error { return nil }

0 commit comments

Comments
 (0)