converted stream reader to io.ReadCloser

This commit is contained in:
Brad Rydzewski 2016-04-22 18:15:07 -07:00
parent f8a2a32383
commit b9037b9d7c
5 changed files with 19 additions and 9 deletions

View File

@ -3,11 +3,13 @@ package stream
import ( import (
"bytes" "bytes"
"io" "io"
"sync/atomic"
) )
type reader struct { type reader struct {
w *writer w *writer
off int off int
closed uint32
} }
// Read reads from the Buffer // Read reads from the Buffer
@ -31,6 +33,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
err = io.EOF err = io.EOF
break break
} }
if r.Closed() {
err = io.EOF
break
}
r.w.Wait() r.w.Wait()
} }
@ -39,6 +45,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
} }
func (r *reader) Close() error { func (r *reader) Close() error {
// TODO close should remove reader from the parent! atomic.StoreUint32(&r.closed, 1)
return nil return nil
} }
func (r *reader) Closed() bool {
return atomic.LoadUint32(&r.closed) != 0
}

View File

@ -12,7 +12,7 @@ import (
type Stream interface { type Stream interface {
Create(string) error Create(string) error
Delete(string) error Delete(string) error
Reader(string) (io.Reader, error) Reader(string) (io.ReadCloser, error)
Writer(string) (io.WriteCloser, error) Writer(string) (io.WriteCloser, error)
} }
@ -22,7 +22,7 @@ func Create(c context.Context, key string) error {
} }
// Reader opens the stream for reading. // Reader opens the stream for reading.
func Reader(c context.Context, key string) (io.Reader, error) { func Reader(c context.Context, key string) (io.ReadCloser, error) {
return FromContext(c).Reader(key) return FromContext(c).Reader(key)
} }

View File

@ -19,7 +19,7 @@ func New() Stream {
} }
// Reader returns an io.Reader for reading from to the stream. // Reader returns an io.Reader for reading from to the stream.
func (s *stream) Reader(name string) (io.Reader, error) { func (s *stream) Reader(name string) (io.ReadCloser, error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()

View File

@ -31,7 +31,7 @@ func (w *writer) Write(p []byte) (n int, err error) {
return w.buffer.Write(p) return w.buffer.Write(p)
} }
func (w *writer) Reader() (io.Reader, error) { func (w *writer) Reader() (io.ReadCloser, error) {
return &reader{w: w}, nil return &reader{w: w}, nil
} }
@ -48,5 +48,5 @@ func (w *writer) Close() error {
} }
func (w *writer) Closed() bool { func (w *writer) Closed() bool {
return atomic.LoadUint32(&w.closed) == 1 return atomic.LoadUint32(&w.closed) != 0
} }

View File

@ -99,7 +99,7 @@ func GetStream2(c *gin.Context) {
go func() { go func() {
<-c.Writer.CloseNotify() <-c.Writer.CloseNotify()
// rc.Close() rc.Close()
}() }()
var line int var line int