Signed-off-by: Michael Smithhisler <smithhisler.mike@gmail.com>
| ... | ... |
@@ -45,12 +45,15 @@ func (r stream) Close() error {
|
| 45 | 45 |
// JSONMessages decodes the response stream as a sequence of JSONMessages. |
| 46 | 46 |
// if stream ends or context is cancelled, the underlying [io.Reader] is closed. |
| 47 | 47 |
func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] {
|
| 48 |
- context.AfterFunc(ctx, func() {
|
|
| 48 |
+ stop := context.AfterFunc(ctx, func() {
|
|
| 49 | 49 |
_ = r.Close() |
| 50 | 50 |
}) |
| 51 | 51 |
dec := json.NewDecoder(r) |
| 52 | 52 |
return func(yield func(jsonstream.Message, error) bool) {
|
| 53 |
- defer r.Close() |
|
| 53 |
+ defer func() {
|
|
| 54 |
+ stop() // unregister AfterFunc |
|
| 55 |
+ r.Close() |
|
| 56 |
+ }() |
|
| 54 | 57 |
for {
|
| 55 | 58 |
var jm jsonstream.Message |
| 56 | 59 |
err := dec.Decode(&jm) |
| ... | ... |
@@ -136,14 +136,19 @@ func newCancelReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser {
|
| 136 | 136 |
rc: rc, |
| 137 | 137 |
close: sync.OnceValue(rc.Close), |
| 138 | 138 |
} |
| 139 |
- context.AfterFunc(ctx, func() { _ = crc.Close() })
|
|
| 139 |
+ crc.stop = context.AfterFunc(ctx, func() { _ = crc.Close() })
|
|
| 140 | 140 |
return crc |
| 141 | 141 |
} |
| 142 | 142 |
|
| 143 | 143 |
type cancelReadCloser struct {
|
| 144 | 144 |
rc io.ReadCloser |
| 145 | 145 |
close func() error |
| 146 |
+ stop func() bool |
|
| 146 | 147 |
} |
| 147 | 148 |
|
| 148 | 149 |
func (c *cancelReadCloser) Read(p []byte) (int, error) { return c.rc.Read(p) }
|
| 149 |
-func (c *cancelReadCloser) Close() error { return c.close() }
|
|
| 150 |
+ |
|
| 151 |
+func (c *cancelReadCloser) Close() error {
|
|
| 152 |
+ c.stop() // unregister AfterFunc |
|
| 153 |
+ return c.close() |
|
| 154 |
+} |
| ... | ... |
@@ -45,12 +45,15 @@ func (r stream) Close() error {
|
| 45 | 45 |
// JSONMessages decodes the response stream as a sequence of JSONMessages. |
| 46 | 46 |
// if stream ends or context is cancelled, the underlying [io.Reader] is closed. |
| 47 | 47 |
func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] {
|
| 48 |
- context.AfterFunc(ctx, func() {
|
|
| 48 |
+ stop := context.AfterFunc(ctx, func() {
|
|
| 49 | 49 |
_ = r.Close() |
| 50 | 50 |
}) |
| 51 | 51 |
dec := json.NewDecoder(r) |
| 52 | 52 |
return func(yield func(jsonstream.Message, error) bool) {
|
| 53 |
- defer r.Close() |
|
| 53 |
+ defer func() {
|
|
| 54 |
+ stop() // unregister AfterFunc |
|
| 55 |
+ r.Close() |
|
| 56 |
+ }() |
|
| 54 | 57 |
for {
|
| 55 | 58 |
var jm jsonstream.Message |
| 56 | 59 |
err := dec.Decode(&jm) |
| ... | ... |
@@ -136,14 +136,19 @@ func newCancelReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser {
|
| 136 | 136 |
rc: rc, |
| 137 | 137 |
close: sync.OnceValue(rc.Close), |
| 138 | 138 |
} |
| 139 |
- context.AfterFunc(ctx, func() { _ = crc.Close() })
|
|
| 139 |
+ crc.stop = context.AfterFunc(ctx, func() { _ = crc.Close() })
|
|
| 140 | 140 |
return crc |
| 141 | 141 |
} |
| 142 | 142 |
|
| 143 | 143 |
type cancelReadCloser struct {
|
| 144 | 144 |
rc io.ReadCloser |
| 145 | 145 |
close func() error |
| 146 |
+ stop func() bool |
|
| 146 | 147 |
} |
| 147 | 148 |
|
| 148 | 149 |
func (c *cancelReadCloser) Read(p []byte) (int, error) { return c.rc.Read(p) }
|
| 149 |
-func (c *cancelReadCloser) Close() error { return c.close() }
|
|
| 150 |
+ |
|
| 151 |
+func (c *cancelReadCloser) Close() error {
|
|
| 152 |
+ c.stop() // unregister AfterFunc |
|
| 153 |
+ return c.close() |
|
| 154 |
+} |