♻️ Refactor EventSource broadcast subscribe (#13708)

* 🎨 Refactor broadcast subscribe using event-bus

* 🐛 Fix invalid query param `retry`

* 🎨 Add event source status log

* 🐛 Fix EventSource destroy method
This commit is contained in:
Yingyi / 颖逸 2025-01-03 09:28:16 +08:00 committed by GitHub
parent 866189bcc0
commit ea1081db5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 45 additions and 85 deletions

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/88250/gulu"
"github.com/asaskevich/EventBus"
"github.com/gin-contrib/sse"
"github.com/gin-gonic/gin"
"github.com/olahol/melody"
@ -34,12 +35,21 @@ const (
MessageTypeString MessageType = "string"
MessageTypeBinary MessageType = "binary"
MessageTypeClose MessageType = "close"
EvtBroadcastMessage = "broadcast.message"
)
var (
BroadcastChannels = sync.Map{} // [string (channel-name)] -> *BroadcastChannel
UnifiedSSE = NewEventSourceServer()
messageID = &MessageID{
UnifiedSSE = &EventSourceServer{
EventBus: EventBus.New(),
WaitGroup: &sync.WaitGroup{},
Subscriber: &EventSourceSubscriber{
lock: &sync.Mutex{},
count: 0,
},
}
messageID = &MessageID{
lock: &sync.Mutex{},
id: 0,
}
@ -124,7 +134,7 @@ func (b *BroadcastChannel) Subscribed() bool {
}
func (b *BroadcastChannel) Destroy(force bool) bool {
if force || b.Subscribed() {
if force || !b.Subscribed() {
b.WebSocket.Close()
UnifiedSSE.SendEvent(&MessageEvent{
Type: MessageTypeClose,
@ -156,38 +166,11 @@ func (s *EventSourceSubscriber) Count() int {
}
type EventSourceServer struct {
Channel MessageEventChannel // message broadcast channel
Open chan MessageEventChannel // SSE connection open channel
Close chan MessageEventChannel // SSE connection close channel
Connections map[MessageEventChannel]bool // SSE connections
EventBus EventBus.Bus
WaitGroup *sync.WaitGroup
Subscriber *EventSourceSubscriber
}
// Start starts the SSE server
func (s *EventSourceServer) Start() {
// REF: https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go
for {
select {
// Add new available client
case channel := <-s.Open:
s.Connections[channel] = true
// Remove closed client
case channel := <-s.Close:
delete(s.Connections, channel)
close(channel)
// Broadcast message to client
case event := <-s.Channel:
for connection := range s.Connections {
connection <- event
}
}
}
}
// SendEvent sends a message to all subscribers
func (s *EventSourceServer) SendEvent(event *MessageEvent) bool {
if event.ID == "" {
@ -198,23 +181,12 @@ func (s *EventSourceServer) SendEvent(event *MessageEvent) bool {
}
}
s.Channel <- event
return true
// select {
// case s.Channel <- event:
// return true
// default:
// logging.LogErrorf("send event failed: %v", event)
// return false
// }
s.EventBus.Publish(EvtBroadcastMessage, event)
return s.EventBus.HasCallback(EvtBroadcastMessage)
}
// Subscribe subscribes to specified broadcast channels
func (s *EventSourceServer) Subscribe(c *gin.Context, messageEventChannel MessageEventChannel, channels ...string) {
defer s.WaitGroup.Done()
s.WaitGroup.Add(1)
func (s *EventSourceServer) Subscribe(c *gin.Context, retry uint, channels ...string) {
wg := sync.WaitGroup{}
wg.Add(len(channels))
for _, channel := range channels {
@ -239,8 +211,7 @@ func (s *EventSourceServer) Subscribe(c *gin.Context, messageEventChannel Messag
}
c.Writer.Flush()
retry := s.GetRetry(c)
s.Stream(c, messageEventChannel, func(event *MessageEvent, ok bool) bool {
s.Stream(c, func(event *MessageEvent, ok bool) bool {
if ok {
if _, exists := channelSet[event.Name]; exists {
switch event.Type {
@ -288,15 +259,11 @@ func (s *EventSourceServer) Subscribe(c *gin.Context, messageEventChannel Messag
}
// SubscribeAll subscribes to all broadcast channels
func (s *EventSourceServer) SubscribeAll(c *gin.Context, messageEventChannel MessageEventChannel) {
defer s.WaitGroup.Done()
s.WaitGroup.Add(1)
func (s *EventSourceServer) SubscribeAll(c *gin.Context, retry uint) {
s.Subscriber.updateCount(1)
c.Writer.Flush()
retry := s.GetRetry(c)
s.Stream(c, messageEventChannel, func(event *MessageEvent, ok bool) bool {
s.Stream(c, func(event *MessageEvent, ok bool) bool {
if ok {
switch event.Type {
case MessageTypeClose:
@ -324,34 +291,44 @@ func (s *EventSourceServer) SubscribeAll(c *gin.Context, messageEventChannel Mes
s.Subscriber.updateCount(-1)
PruneBroadcastChannels()
}
// GetRetry gets the retry interval
//
// If the retry interval is not specified, it will return 0
func (s *EventSourceServer) GetRetry(c *gin.Context) uint {
value, err := c.GetQuery("retry")
if !err {
retry, err := strconv.ParseUint(value, 10, 0)
if err == nil {
return uint(retry)
}
value := c.DefaultQuery("retry", "")
retry, err := strconv.ParseUint(value, 10, 0)
if err == nil {
return uint(retry)
}
return 0
}
// Stream streams message to client
func (s *EventSourceServer) Stream(c *gin.Context, channel MessageEventChannel, step func(event *MessageEvent, ok bool) bool) bool {
//
// If the client is gone, it will return true
func (s *EventSourceServer) Stream(c *gin.Context, step func(event *MessageEvent, ok bool) bool) bool {
channel := make(MessageEventChannel)
defer close(channel)
subscriber := func(event *MessageEvent) {
channel <- event
}
s.EventBus.Subscribe(EvtBroadcastMessage, subscriber)
defer s.EventBus.Unsubscribe(EvtBroadcastMessage, subscriber)
clientGone := c.Writer.CloseNotify()
for {
select {
case <-clientGone:
logging.LogInfof("event source connection is closed by client")
return true
case event, ok := <-channel:
if step(event, ok) {
continue
}
logging.LogInfof("event source connection is closed by server")
return false
}
}
@ -362,27 +339,11 @@ func (s *EventSourceServer) SSEvent(c *gin.Context, event *sse.Event) {
c.Render(-1, event)
}
// Subscribed checks whether the SSE server is subscribed
func (s *EventSourceServer) Subscribed() bool {
return s.Subscriber.Count() > 0
}
func NewEventSourceServer() (server *EventSourceServer) {
server = &EventSourceServer{
Channel: make(MessageEventChannel, 1024),
Open: make(chan MessageEventChannel, 32),
Close: make(chan MessageEventChannel, 32),
Connections: make(map[MessageEventChannel]bool),
WaitGroup: &sync.WaitGroup{},
Subscriber: &EventSourceSubscriber{
lock: &sync.Mutex{},
count: 0,
},
}
go server.Start()
return
}
type ChannelInfo struct {
Name string `json:"name"`
Count int `json:"count"`
@ -571,17 +532,16 @@ func broadcastSubscribe(c *gin.Context) {
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
messageEventChannel := make(MessageEventChannel)
UnifiedSSE.Open <- messageEventChannel
defer UnifiedSSE.WaitGroup.Done()
UnifiedSSE.WaitGroup.Add(1)
retry := UnifiedSSE.GetRetry(c)
channels, ok := c.GetQueryArray("channel")
if ok { // subscribe specified broadcast channels
UnifiedSSE.Subscribe(c, messageEventChannel, channels...)
UnifiedSSE.Subscribe(c, retry, channels...)
} else { // subscribe all broadcast channels
UnifiedSSE.SubscribeAll(c, messageEventChannel)
UnifiedSSE.SubscribeAll(c, retry)
}
UnifiedSSE.Close <- messageEventChannel
}
// broadcastPublish push multiple binary messages to multiple broadcast channels

View File

@ -18,6 +18,7 @@ require (
github.com/PuerkitoBio/goquery v1.10.0
github.com/Xuanwo/go-locale v1.1.2
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be
github.com/denisbrodbeck/machineid v1.0.1
github.com/dgraph-io/ristretto v1.0.0
@ -91,7 +92,6 @@ require (
github.com/alecthomas/chroma v0.10.0 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/aws/aws-sdk-go-v2 v1.32.7 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.7 // indirect