Skip to content

Commit aa58806

Browse files
fix(client): use scanner for streaming
1 parent d59453c commit aa58806

File tree

1 file changed

+10
-50
lines changed

1 file changed

+10
-50
lines changed

packages/ssestream/ssestream.go

Lines changed: 10 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ func NewDecoder(res *http.Response) Decoder {
3131
if t, ok := decoderTypes[contentType]; ok {
3232
decoder = t(res.Body)
3333
} else {
34-
decoder = &eventStreamDecoder{rc: res.Body, rdr: bufio.NewReader(res.Body)}
34+
scn := bufio.NewScanner(res.Body)
35+
scn.Buffer(nil, bufio.MaxScanTokenSize<<2)
36+
decoder = &eventStreamDecoder{rc: res.Body, scn: scn}
3537
}
3638
return decoder
3739
}
@@ -51,48 +53,10 @@ type Event struct {
5153
type eventStreamDecoder struct {
5254
evt Event
5355
rc io.ReadCloser
54-
rdr *bufio.Reader
56+
scn *bufio.Scanner
5557
err error
5658
}
5759

58-
func line(r *bufio.Reader) ([]byte, error) {
59-
var overflow bytes.Buffer
60-
61-
// To prevent infinite loops, the failsafe stops when a line is
62-
// 100 times longer than the [io.Reader] default buffer size,
63-
// or after 20 failed attempts to find an end of line.
64-
for f := 0; f < 100; f++ {
65-
part, isPrefix, err := r.ReadLine()
66-
if err != nil {
67-
return nil, err
68-
}
69-
70-
// Happy case, the line fits in the default buffer.
71-
if !isPrefix && overflow.Len() == 0 {
72-
return part, nil
73-
}
74-
75-
// Overflow case, append to the buffer.
76-
if isPrefix || overflow.Len() > 0 {
77-
n, err := overflow.Write(part)
78-
if err != nil {
79-
return nil, err
80-
}
81-
82-
// Didn't find an end of line, heavily increment the failsafe.
83-
if n != r.Size() {
84-
f += 5
85-
}
86-
}
87-
88-
if !isPrefix {
89-
return overflow.Bytes(), nil
90-
}
91-
}
92-
93-
return nil, fmt.Errorf("ssestream: too many attempts to read a line")
94-
}
95-
9660
func (s *eventStreamDecoder) Next() bool {
9761
if s.err != nil {
9862
return false
@@ -101,16 +65,8 @@ func (s *eventStreamDecoder) Next() bool {
10165
event := ""
10266
data := bytes.NewBuffer(nil)
10367

104-
for {
105-
txt, err := line(s.rdr)
106-
if err == io.EOF {
107-
return false
108-
}
109-
110-
if err != nil {
111-
s.err = err
112-
break
113-
}
68+
for s.scn.Scan() {
69+
txt := s.scn.Bytes()
11470

11571
// Dispatch event on an empty line
11672
if len(txt) == 0 {
@@ -147,6 +103,10 @@ func (s *eventStreamDecoder) Next() bool {
147103
}
148104
}
149105

106+
if s.scn.Err() != nil {
107+
s.err = s.scn.Err()
108+
}
109+
150110
return false
151111
}
152112

0 commit comments

Comments
 (0)