mirror of
https://github.com/harness/drone.git
synced 2025-05-03 06:01:14 +08:00

* Formatting. * fix: [CDE-150]: In logstream, adding locking and panic recovery around a subscriber's publish method. Adding check in the stream's publish method to not publish if the sub is closed. Closing the err channel in the log stream API handler.
93 lines
2.0 KiB
Go
93 lines
2.0 KiB
Go
// Copyright 2023 Harness, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package livelog
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
)
|
|
|
|
// this is the amount of items that are stored in memory
|
|
// in the buffer. This should result in approximately 10kb
|
|
// of memory allocated per-stream and per-subscriber, not
|
|
// including any logdata stored in these structures.
|
|
const bufferSize = 5000
|
|
|
|
type stream struct {
|
|
sync.Mutex
|
|
|
|
hist []*Line
|
|
list map[*subscriber]struct{}
|
|
}
|
|
|
|
func newStream() *stream {
|
|
return &stream{
|
|
list: map[*subscriber]struct{}{},
|
|
}
|
|
}
|
|
|
|
func (s *stream) write(line *Line) error {
|
|
s.Lock()
|
|
s.hist = append(s.hist, line)
|
|
for l := range s.list {
|
|
if !l.closed {
|
|
l.publish(line)
|
|
}
|
|
}
|
|
// the history should not be unbounded. The history
|
|
// slice is capped and items are removed in a FIFO
|
|
// ordering when capacity is reached.
|
|
if size := len(s.hist); size >= bufferSize {
|
|
s.hist = s.hist[size-bufferSize:]
|
|
}
|
|
s.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (s *stream) subscribe(ctx context.Context) (<-chan *Line, <-chan error) {
|
|
sub := &subscriber{
|
|
handler: make(chan *Line, bufferSize),
|
|
closec: make(chan struct{}),
|
|
}
|
|
err := make(chan error)
|
|
|
|
s.Lock()
|
|
for _, line := range s.hist {
|
|
sub.publish(line)
|
|
}
|
|
s.list[sub] = struct{}{}
|
|
s.Unlock()
|
|
|
|
go func() {
|
|
defer close(err)
|
|
select {
|
|
case <-sub.closec:
|
|
case <-ctx.Done():
|
|
sub.close()
|
|
}
|
|
}()
|
|
return sub.handler, err
|
|
}
|
|
|
|
func (s *stream) close() error {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
for sub := range s.list {
|
|
delete(s.list, sub)
|
|
sub.close()
|
|
}
|
|
return nil
|
|
}
|