diff --git a/kernel/api/broadcast.go b/kernel/api/broadcast.go index 0574ebac0..d12d40590 100644 --- a/kernel/api/broadcast.go +++ b/kernel/api/broadcast.go @@ -23,6 +23,7 @@ import ( "time" "github.com/88250/gulu" + "github.com/asaskevich/EventBus" "github.com/gin-contrib/sse" "github.com/gin-gonic/gin" "github.com/olahol/melody" @@ -34,12 +35,21 @@ const ( MessageTypeString MessageType = "string" MessageTypeBinary MessageType = "binary" MessageTypeClose MessageType = "close" + + EvtBroadcastMessage = "broadcast.message" ) var ( BroadcastChannels = sync.Map{} // [string (channel-name)] -> *BroadcastChannel - UnifiedSSE = NewEventSourceServer() - messageID = &MessageID{ + UnifiedSSE = &EventSourceServer{ + EventBus: EventBus.New(), + WaitGroup: &sync.WaitGroup{}, + Subscriber: &EventSourceSubscriber{ + lock: &sync.Mutex{}, + count: 0, + }, + } + messageID = &MessageID{ lock: &sync.Mutex{}, id: 0, } @@ -124,7 +134,7 @@ func (b *BroadcastChannel) Subscribed() bool { } func (b *BroadcastChannel) Destroy(force bool) bool { - if force || b.Subscribed() { + if force || !b.Subscribed() { b.WebSocket.Close() UnifiedSSE.SendEvent(&MessageEvent{ Type: MessageTypeClose, @@ -156,38 +166,11 @@ func (s *EventSourceSubscriber) Count() int { } type EventSourceServer struct { - Channel MessageEventChannel // message broadcast channel - Open chan MessageEventChannel // SSE connection open channel - Close chan MessageEventChannel // SSE connection close channel - Connections map[MessageEventChannel]bool // SSE connections - + EventBus EventBus.Bus WaitGroup *sync.WaitGroup Subscriber *EventSourceSubscriber } -// Start starts the SSE server -func (s *EventSourceServer) Start() { - // REF: https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go - for { - select { - // Add new available client - case channel := <-s.Open: - s.Connections[channel] = true - - // Remove closed client - case channel := <-s.Close: - delete(s.Connections, channel) - close(channel) - - // Broadcast message to client - case event := <-s.Channel: - for connection := range s.Connections { - connection <- event - } - } - } -} - // SendEvent sends a message to all subscribers func (s *EventSourceServer) SendEvent(event *MessageEvent) bool { if event.ID == "" { @@ -198,23 +181,12 @@ func (s *EventSourceServer) SendEvent(event *MessageEvent) bool { } } - s.Channel <- event - return true - - // select { - // case s.Channel <- event: - // return true - // default: - // logging.LogErrorf("send event failed: %v", event) - // return false - // } + s.EventBus.Publish(EvtBroadcastMessage, event) + return s.EventBus.HasCallback(EvtBroadcastMessage) } // Subscribe subscribes to specified broadcast channels -func (s *EventSourceServer) Subscribe(c *gin.Context, messageEventChannel MessageEventChannel, channels ...string) { - defer s.WaitGroup.Done() - s.WaitGroup.Add(1) - +func (s *EventSourceServer) Subscribe(c *gin.Context, retry uint, channels ...string) { wg := sync.WaitGroup{} wg.Add(len(channels)) for _, channel := range channels { @@ -239,8 +211,7 @@ func (s *EventSourceServer) Subscribe(c *gin.Context, messageEventChannel Messag } c.Writer.Flush() - retry := s.GetRetry(c) - s.Stream(c, messageEventChannel, func(event *MessageEvent, ok bool) bool { + s.Stream(c, func(event *MessageEvent, ok bool) bool { if ok { if _, exists := channelSet[event.Name]; exists { switch event.Type { @@ -288,15 +259,11 @@ func (s *EventSourceServer) Subscribe(c *gin.Context, messageEventChannel Messag } // SubscribeAll subscribes to all broadcast channels -func (s *EventSourceServer) SubscribeAll(c *gin.Context, messageEventChannel MessageEventChannel) { - defer s.WaitGroup.Done() - s.WaitGroup.Add(1) - +func (s *EventSourceServer) SubscribeAll(c *gin.Context, retry uint) { s.Subscriber.updateCount(1) c.Writer.Flush() - retry := s.GetRetry(c) - s.Stream(c, messageEventChannel, func(event *MessageEvent, ok bool) bool { + s.Stream(c, func(event *MessageEvent, ok bool) bool { if ok { switch event.Type { case MessageTypeClose: @@ -324,34 +291,44 @@ func (s *EventSourceServer) SubscribeAll(c *gin.Context, messageEventChannel Mes s.Subscriber.updateCount(-1) PruneBroadcastChannels() - } // GetRetry gets the retry interval // // If the retry interval is not specified, it will return 0 func (s *EventSourceServer) GetRetry(c *gin.Context) uint { - value, err := c.GetQuery("retry") - if !err { - retry, err := strconv.ParseUint(value, 10, 0) - if err == nil { - return uint(retry) - } + value := c.DefaultQuery("retry", "") + retry, err := strconv.ParseUint(value, 10, 0) + if err == nil { + return uint(retry) } return 0 } // Stream streams message to client -func (s *EventSourceServer) Stream(c *gin.Context, channel MessageEventChannel, step func(event *MessageEvent, ok bool) bool) bool { +// +// If the client is gone, it will return true +func (s *EventSourceServer) Stream(c *gin.Context, step func(event *MessageEvent, ok bool) bool) bool { + channel := make(MessageEventChannel) + defer close(channel) + + subscriber := func(event *MessageEvent) { + channel <- event + } + s.EventBus.Subscribe(EvtBroadcastMessage, subscriber) + defer s.EventBus.Unsubscribe(EvtBroadcastMessage, subscriber) + clientGone := c.Writer.CloseNotify() for { select { case <-clientGone: + logging.LogInfof("event source connection is closed by client") return true case event, ok := <-channel: if step(event, ok) { continue } + logging.LogInfof("event source connection is closed by server") return false } } @@ -362,27 +339,11 @@ func (s *EventSourceServer) SSEvent(c *gin.Context, event *sse.Event) { c.Render(-1, event) } +// Subscribed checks whether the SSE server is subscribed func (s *EventSourceServer) Subscribed() bool { return s.Subscriber.Count() > 0 } -func NewEventSourceServer() (server *EventSourceServer) { - server = &EventSourceServer{ - Channel: make(MessageEventChannel, 1024), - Open: make(chan MessageEventChannel, 32), - Close: make(chan MessageEventChannel, 32), - Connections: make(map[MessageEventChannel]bool), - - WaitGroup: &sync.WaitGroup{}, - Subscriber: &EventSourceSubscriber{ - lock: &sync.Mutex{}, - count: 0, - }, - } - go server.Start() - return -} - type ChannelInfo struct { Name string `json:"name"` Count int `json:"count"` @@ -571,17 +532,16 @@ func broadcastSubscribe(c *gin.Context) { c.Writer.Header().Set("Connection", "keep-alive") c.Writer.Header().Set("Transfer-Encoding", "chunked") - messageEventChannel := make(MessageEventChannel) - UnifiedSSE.Open <- messageEventChannel + defer UnifiedSSE.WaitGroup.Done() + UnifiedSSE.WaitGroup.Add(1) + retry := UnifiedSSE.GetRetry(c) channels, ok := c.GetQueryArray("channel") if ok { // subscribe specified broadcast channels - UnifiedSSE.Subscribe(c, messageEventChannel, channels...) + UnifiedSSE.Subscribe(c, retry, channels...) } else { // subscribe all broadcast channels - UnifiedSSE.SubscribeAll(c, messageEventChannel) + UnifiedSSE.SubscribeAll(c, retry) } - - UnifiedSSE.Close <- messageEventChannel } // broadcastPublish push multiple binary messages to multiple broadcast channels diff --git a/kernel/go.mod b/kernel/go.mod index 8877fbb31..05b4da667 100644 --- a/kernel/go.mod +++ b/kernel/go.mod @@ -18,6 +18,7 @@ require ( github.com/PuerkitoBio/goquery v1.10.0 github.com/Xuanwo/go-locale v1.1.2 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de + github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be github.com/denisbrodbeck/machineid v1.0.1 github.com/dgraph-io/ristretto v1.0.0 @@ -91,7 +92,6 @@ require ( github.com/alecthomas/chroma v0.10.0 // indirect github.com/andybalholm/brotli v1.1.1 // indirect github.com/andybalholm/cascadia v1.3.2 // indirect - github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect github.com/aws/aws-sdk-go-v2 v1.32.7 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect github.com/aws/aws-sdk-go-v2/config v1.28.7 // indirect