Skip to content

Commit 1f18b56

Browse files
committed
Remove atomic integer loads and stores in favour of atomic.Value
- Also allows SetReadLimit to be called concurrently which is a nice touch
1 parent ff63b19 commit 1f18b56

File tree

4 files changed

+42
-17
lines changed

4 files changed

+42
-17
lines changed

ci/wasm.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,7 @@ GOOS=js GOARCH=wasm go test -exec=wasmbrowsertest ./... -args "$WS_ECHO_SERVER_U
2626

2727
if ! wait "$wsjstestPID"; then
2828
echo "wsjstest exited unsuccessfully"
29+
echo "output:"
30+
cat "$wsjstestOut"
2931
exit 1
3032
fi

conn.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import (
2020
)
2121

2222
// Conn represents a WebSocket connection.
23-
// All methods may be called concurrently except for Reader, Read
24-
// and SetReadLimit.
23+
// All methods may be called concurrently except for Reader and Read.
2524
//
2625
// You must always read from the connection. Otherwise control
2726
// frames will not be handled. See the docs on Reader and CloseRead.
@@ -55,8 +54,7 @@ type Conn struct {
5554
writeFrameLock chan struct{}
5655
writeHeaderBuf []byte
5756
writeHeader *header
58-
// read limit for a message in bytes.
59-
msgReadLimit int64
57+
msgReadLimit *atomicInt64
6058

6159
// Used to ensure a previous writer is not used after being closed.
6260
activeWriter atomic.Value
@@ -70,8 +68,7 @@ type Conn struct {
7068
activeReader *messageReader
7169
// readFrameLock is acquired to read from bw.
7270
readFrameLock chan struct{}
73-
// Not int32 because of https://github.com/nhooyr/websocket/issues/153
74-
readClosed int32
71+
readClosed *atomicInt64
7572
readHeaderBuf []byte
7673
controlPayloadBuf []byte
7774

@@ -91,7 +88,8 @@ type Conn struct {
9188
func (c *Conn) init() {
9289
c.closed = make(chan struct{})
9390

94-
c.msgReadLimit = 32768
91+
c.msgReadLimit = &atomicInt64{}
92+
c.msgReadLimit.Store(32768)
9593

9694
c.writeMsgLock = make(chan struct{}, 1)
9795
c.writeFrameLock = make(chan struct{}, 1)
@@ -106,6 +104,7 @@ func (c *Conn) init() {
106104
c.writeHeaderBuf = makeWriteHeaderBuf()
107105
c.writeHeader = &header{}
108106
c.readHeaderBuf = makeReadHeaderBuf()
107+
c.readClosed = &atomicInt64{}
109108
c.controlPayloadBuf = make([]byte, maxControlFramePayload)
110109

111110
runtime.SetFinalizer(c, func(c *Conn) {
@@ -342,7 +341,7 @@ func (c *Conn) handleControl(ctx context.Context, h header) error {
342341
// See https://github.com/nhooyr/websocket/issues/87#issue-451703332
343342
// Most users should not need this.
344343
func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) {
345-
if atomic.LoadInt32(&c.readClosed) == 1 {
344+
if c.readClosed.Load() == 1 {
346345
return 0, nil, fmt.Errorf("websocket connection read closed")
347346
}
348347

@@ -392,7 +391,7 @@ func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) {
392391
c.readerMsgHeader = h
393392
c.readerFrameEOF = false
394393
c.readerMaskPos = 0
395-
c.readMsgLeft = c.msgReadLimit
394+
c.readMsgLeft = c.msgReadLimit.Load()
396395

397396
r := &messageReader{
398397
c: c,

conn_common.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (c *netConn) SetReadDeadline(t time.Time) error {
178178
// Use this when you do not want to read data messages from the connection anymore but will
179179
// want to write messages to it.
180180
func (c *Conn) CloseRead(ctx context.Context) context.Context {
181-
atomic.StoreInt32(&c.readClosed, 1)
181+
c.readClosed.Store(1)
182182

183183
ctx, cancel := context.WithCancel(ctx)
184184
go func() {
@@ -200,11 +200,32 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context {
200200
//
201201
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
202202
func (c *Conn) SetReadLimit(n int64) {
203-
c.msgReadLimit = n
203+
c.msgReadLimit.Store(n)
204204
}
205205

206206
func (c *Conn) setCloseErr(err error) {
207207
c.closeErrOnce.Do(func() {
208208
c.closeErr = fmt.Errorf("websocket closed: %w", err)
209209
})
210210
}
211+
212+
// See https://github.com/nhooyr/websocket/issues/153
213+
type atomicInt64 struct {
214+
v atomic.Value
215+
}
216+
217+
func (v *atomicInt64) Load() int64 {
218+
i, ok := v.v.Load().(int64)
219+
if !ok {
220+
return 0
221+
}
222+
return i
223+
}
224+
225+
func (v *atomicInt64) Store(i int64) {
226+
v.v.Store(i)
227+
}
228+
229+
func (v *atomicInt64) String() string {
230+
return fmt.Sprint(v.v.Load())
231+
}

websocket_js.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"reflect"
1111
"runtime"
1212
"sync"
13-
"sync/atomic"
1413
"syscall/js"
1514

1615
"nhooyr.io/websocket/internal/bpool"
@@ -21,9 +20,9 @@ import (
2120
type Conn struct {
2221
ws wsjs.WebSocket
2322

24-
msgReadLimit int64
23+
msgReadLimit *atomicInt64
2524

26-
readClosed int64
25+
readClosed *atomicInt64
2726
closeOnce sync.Once
2827
closed chan struct{}
2928
closeErrOnce sync.Once
@@ -49,7 +48,11 @@ func (c *Conn) close(err error) {
4948
func (c *Conn) init() {
5049
c.closed = make(chan struct{})
5150
c.readSignal = make(chan struct{}, 1)
52-
c.msgReadLimit = 32768
51+
52+
c.msgReadLimit = &atomicInt64{}
53+
c.msgReadLimit.Store(32768)
54+
55+
c.readClosed = &atomicInt64{}
5356

5457
c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) {
5558
cerr := CloseError{
@@ -89,15 +92,15 @@ func (c *Conn) closeWithInternal() {
8992
// Read attempts to read a message from the connection.
9093
// The maximum time spent waiting is bounded by the context.
9194
func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
92-
if atomic.LoadInt32(&c.readClosed) == 1 {
95+
if c.readClosed.Load() == 1 {
9396
return 0, nil, fmt.Errorf("websocket connection read closed")
9497
}
9598

9699
typ, p, err := c.read(ctx)
97100
if err != nil {
98101
return 0, nil, fmt.Errorf("failed to read: %w", err)
99102
}
100-
if int64(len(p)) > c.msgReadLimit {
103+
if int64(len(p)) > c.msgReadLimit.Load() {
101104
c.Close(StatusMessageTooBig, fmt.Sprintf("read limited at %v bytes", c.msgReadLimit))
102105
return 0, nil, c.closeErr
103106
}

0 commit comments

Comments
 (0)