Merge remote-tracking branch 'origin/dev' into dev

This commit is contained in:
Vanessa 2025-01-01 23:01:55 +08:00
commit ba838cc4a5
4 changed files with 548 additions and 110 deletions

View File

@ -18,44 +18,390 @@ package api
import ( import (
"net/http" "net/http"
"strconv"
"sync" "sync"
"time" "time"
"github.com/88250/gulu" "github.com/88250/gulu"
"github.com/gin-contrib/sse"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/olahol/melody" "github.com/olahol/melody"
"github.com/siyuan-note/logging" "github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/util" "github.com/siyuan-note/siyuan/kernel/util"
) )
type Channel struct { const (
MessageTypeString MessageType = "string"
MessageTypeBinary MessageType = "binary"
MessageTypeClose MessageType = "close"
)
var (
BroadcastChannels = sync.Map{} // [string (channel-name)] -> *BroadcastChannel
UnifiedSSE = NewEventSourceServer()
messageID = &MessageID{
lock: &sync.Mutex{},
id: 0,
}
)
type MessageType string
type MessageEventChannel chan *MessageEvent
type MessageID struct {
lock *sync.Mutex
id uint64
}
func (m *MessageID) Next() uint64 {
m.lock.Lock()
defer m.lock.Unlock()
m.id++
return m.id
}
type MessageEvent struct {
ID string // event ID
Type MessageType
Name string // channel name
Data []byte
}
type BroadcastSubscriber struct {
Count int // SEE subscriber count
}
type BroadcastChannel struct {
Name string // channel name
WebSocket *melody.Melody
Subscriber *BroadcastSubscriber // SEE subscriber
}
// SubscriberCount gets the total number of subscribers
func (b *BroadcastChannel) SubscriberCount() int {
return b.WebSocket.Len() + b.Subscriber.Count + UnifiedSSE.Subscriber.Count()
}
// BroadcastString broadcast string message to all subscribers
func (b *BroadcastChannel) BroadcastString(message string) (sent bool, err error) {
data := []byte(message)
sent = UnifiedSSE.SendEvent(&MessageEvent{
Type: MessageTypeString,
Name: b.Name,
Data: data,
})
err = b.WebSocket.Broadcast(data)
return
}
// BroadcastBinary broadcast binary message to all subscribers
func (b *BroadcastChannel) BroadcastBinary(data []byte) (sent bool, err error) {
sent = UnifiedSSE.SendEvent(&MessageEvent{
Type: MessageTypeBinary,
Name: b.Name,
Data: data,
})
err = b.WebSocket.BroadcastBinary(data)
return
}
func (b *BroadcastChannel) HandleRequest(c *gin.Context) {
if err := b.WebSocket.HandleRequestWithKeys(
c.Writer,
c.Request,
map[string]interface{}{
"channel": b.Name,
},
); err != nil {
logging.LogErrorf("create broadcast channel failed: %s", err)
return
}
}
func (b *BroadcastChannel) Subscribed() bool {
return b.SubscriberCount() > 0
}
func (b *BroadcastChannel) Destroy(force bool) bool {
if force || b.Subscribed() {
b.WebSocket.Close()
UnifiedSSE.SendEvent(&MessageEvent{
Type: MessageTypeClose,
Name: b.Name,
})
logging.LogInfof("destroy broadcast channel [%s]", b.Name)
return true
}
return false
}
type EventSourceSubscriber struct {
lock *sync.Mutex
count int
}
func (s *EventSourceSubscriber) updateCount(delta int) {
s.lock.Lock()
defer s.lock.Unlock()
s.count += delta
}
func (s *EventSourceSubscriber) Count() int {
s.lock.Lock()
defer s.lock.Unlock()
return s.count
}
type EventSourceServer struct {
Channel MessageEventChannel // message broadcast channel
Open chan MessageEventChannel // SSE connection open channel
Close chan MessageEventChannel // SSE connection close channel
Connections map[MessageEventChannel]bool // SSE connections
WaitGroup *sync.WaitGroup
Subscriber *EventSourceSubscriber
}
// Start starts the SSE server
func (s *EventSourceServer) Start() {
// REF: https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go
for {
select {
// Add new available client
case channel := <-s.Open:
s.Connections[channel] = true
// Remove closed client
case channel := <-s.Close:
delete(s.Connections, channel)
close(channel)
// Broadcast message to client
case event := <-s.Channel:
for connection := range s.Connections {
connection <- event
}
}
}
}
// SendEvent sends a message to all subscribers
func (s *EventSourceServer) SendEvent(event *MessageEvent) bool {
if event.ID == "" {
switch event.Type {
case MessageTypeClose:
default:
event.ID = strconv.FormatUint(messageID.Next(), 10)
}
}
s.Channel <- event
return true
// select {
// case s.Channel <- event:
// return true
// default:
// logging.LogErrorf("send event failed: %v", event)
// return false
// }
}
// Subscribe subscribes to specified broadcast channels
func (s *EventSourceServer) Subscribe(c *gin.Context, messageEventChannel MessageEventChannel, channels ...string) {
defer s.WaitGroup.Done()
s.WaitGroup.Add(1)
wg := sync.WaitGroup{}
wg.Add(len(channels))
for _, channel := range channels {
go func() {
defer wg.Done()
var broadcastChannel *BroadcastChannel
_broadcastChannel, exist := BroadcastChannels.Load(channel)
if exist { // channel exists, use it
broadcastChannel = _broadcastChannel.(*BroadcastChannel)
} else {
broadcastChannel = ConstructBroadcastChannel(channel)
}
broadcastChannel.Subscriber.Count++
}()
}
wg.Wait()
channelSet := make(map[string]bool)
for _, channel := range channels {
channelSet[channel] = true
}
c.Writer.Flush()
retry := s.GetRetry(c)
s.Stream(c, messageEventChannel, func(event *MessageEvent, ok bool) bool {
if ok {
if _, exists := channelSet[event.Name]; exists {
switch event.Type {
case MessageTypeClose:
return false
case MessageTypeString:
s.SSEvent(c, &sse.Event{
Id: event.ID,
Event: event.Name,
Retry: retry,
Data: string(event.Data),
})
default:
s.SSEvent(c, &sse.Event{
Id: event.ID,
Event: event.Name,
Retry: retry,
Data: event.Data,
})
}
c.Writer.Flush()
return true
}
return true
}
return false
})
wg.Add(len(channels))
for _, channel := range channels {
go func() {
defer wg.Done()
_broadcastChannel, exist := BroadcastChannels.Load(channel)
if exist {
broadcastChannel := _broadcastChannel.(*BroadcastChannel)
broadcastChannel.Subscriber.Count--
if !broadcastChannel.Subscribed() {
BroadcastChannels.Delete(channel)
broadcastChannel.Destroy(true)
}
}
}()
}
wg.Wait()
}
// SubscribeAll subscribes to all broadcast channels
func (s *EventSourceServer) SubscribeAll(c *gin.Context, messageEventChannel MessageEventChannel) {
defer s.WaitGroup.Done()
s.WaitGroup.Add(1)
s.Subscriber.updateCount(1)
c.Writer.Flush()
retry := s.GetRetry(c)
s.Stream(c, messageEventChannel, func(event *MessageEvent, ok bool) bool {
if ok {
switch event.Type {
case MessageTypeClose:
return true
case MessageTypeString:
s.SSEvent(c, &sse.Event{
Id: event.ID,
Event: event.Name,
Retry: retry,
Data: string(event.Data),
})
default:
s.SSEvent(c, &sse.Event{
Id: event.ID,
Event: event.Name,
Retry: retry,
Data: event.Data,
})
}
c.Writer.Flush()
return true
}
return false
})
s.Subscriber.updateCount(-1)
PruneBroadcastChannels()
}
// GetRetry gets the retry interval
//
// If the retry interval is not specified, it will return 0
func (s *EventSourceServer) GetRetry(c *gin.Context) uint {
value, err := c.GetQuery("retry")
if !err {
retry, err := strconv.ParseUint(value, 10, 0)
if err == nil {
return uint(retry)
}
}
return 0
}
// Stream streams message to client
func (s *EventSourceServer) Stream(c *gin.Context, channel MessageEventChannel, step func(event *MessageEvent, ok bool) bool) bool {
clientGone := c.Writer.CloseNotify()
for {
select {
case <-clientGone:
return true
case event, ok := <-channel:
if step(event, ok) {
continue
}
return false
}
}
}
// SSEvent writes a Server-Sent Event into the body stream.
func (s *EventSourceServer) SSEvent(c *gin.Context, event *sse.Event) {
c.Render(-1, event)
}
func (s *EventSourceServer) Subscribed() bool {
return s.Subscriber.Count() > 0
}
func NewEventSourceServer() (server *EventSourceServer) {
server = &EventSourceServer{
Channel: make(MessageEventChannel, 1024),
Open: make(chan MessageEventChannel, 32),
Close: make(chan MessageEventChannel, 32),
Connections: make(map[MessageEventChannel]bool),
WaitGroup: &sync.WaitGroup{},
Subscriber: &EventSourceSubscriber{
lock: &sync.Mutex{},
count: 0,
},
}
go server.Start()
return
}
type ChannelInfo struct {
Name string `json:"name"` Name string `json:"name"`
Count int `json:"count"` Count int `json:"count"`
} }
type PublishMessage struct { type PublishMessage struct {
Type string `json:"type"` // "string" | "binary" Type MessageType `json:"type"` // "string" | "binary"
Size int `json:"size"` // message size Size int `json:"size"` // message size
Filename string `json:"filename"` // empty string for string-message Filename string `json:"filename"` // empty string for string-message
} }
type PublishResult struct { type PublishResult struct {
Code int `json:"code"` // 0: success Code int `json:"code"` // 0: success
Msg string `json:"msg"` // error message Msg string `json:"msg"` // error message
Channel Channel `json:"channel"` Channel ChannelInfo `json:"channel"`
Message PublishMessage `json:"message"` Message PublishMessage `json:"message"`
} }
var (
BroadcastChannels = sync.Map{}
)
const (
StringMessageType = "string"
BinaryMessageType = "binary"
)
// broadcast create a broadcast channel WebSocket connection // broadcast create a broadcast channel WebSocket connection
// //
// @param // @param
@ -70,89 +416,172 @@ const (
func broadcast(c *gin.Context) { func broadcast(c *gin.Context) {
var ( var (
channel string = c.Query("channel") channel string = c.Query("channel")
broadcastChannel *melody.Melody broadcastChannel *BroadcastChannel
) )
_broadcastChannel, exist := BroadcastChannels.Load(channel) _broadcastChannel, exist := BroadcastChannels.Load(channel)
if exist { if exist { // channel exists, use it
// channel exists, use it broadcastChannel = _broadcastChannel.(*BroadcastChannel)
broadcastChannel = _broadcastChannel.(*melody.Melody) if broadcastChannel.WebSocket.IsClosed() { // channel is closed
if broadcastChannel.IsClosed() { // delete channel before creating a new one
BroadcastChannels.Delete(channel) DestroyBroadcastChannel(channel, true)
} else { } else { // channel is open
subscribe(c, broadcastChannel, channel) // connect to the existing channel
broadcastChannel.HandleRequest(c)
return return
} }
} }
initialize(c, channel)
// create a new channel
broadcastChannel = ConstructBroadcastChannel(channel)
broadcastChannel.HandleRequest(c)
} }
// initialize initializes an broadcast session set // GetBroadcastChannel gets a broadcast channel
func initialize(c *gin.Context, channel string) { //
// channel not found, create a new one // If the channel does not exist but the SSE server is subscribed, it will create a new broadcast channel.
broadcastChannel := melody.New() // If the SSE server is not subscribed, it will return nil.
broadcastChannel.Config.MaxMessageSize = 1024 * 1024 * 128 // 128 MiB func GetBroadcastChannel(channel string) *BroadcastChannel {
_broadcastChannel, exist := BroadcastChannels.Load(channel)
if exist {
return _broadcastChannel.(*BroadcastChannel)
}
if UnifiedSSE.Subscribed() {
return ConstructBroadcastChannel(channel)
}
return nil
}
// ConstructBroadcastChannel creates a broadcast channel
func ConstructBroadcastChannel(channel string) *BroadcastChannel {
websocket := melody.New()
websocket.Config.MaxMessageSize = 1024 * 1024 * 128 // 128 MiB
// broadcast string message to other session // broadcast string message to other session
broadcastChannel.HandleMessage(func(s *melody.Session, msg []byte) { websocket.HandleMessage(func(s *melody.Session, msg []byte) {
broadcastChannel.BroadcastOthers(msg, s) UnifiedSSE.SendEvent(&MessageEvent{
Type: MessageTypeString,
Name: channel,
Data: msg,
})
websocket.BroadcastOthers(msg, s)
}) })
// broadcast binary message to other session // broadcast binary message to other session
broadcastChannel.HandleMessageBinary(func(s *melody.Session, msg []byte) { websocket.HandleMessageBinary(func(s *melody.Session, msg []byte) {
broadcastChannel.BroadcastBinaryOthers(msg, s) UnifiedSSE.SendEvent(&MessageEvent{
Type: MessageTypeBinary,
Name: channel,
Data: msg,
})
websocket.BroadcastBinaryOthers(msg, s)
}) })
// recycling // client close the connection
broadcastChannel.HandleClose(func(s *melody.Session, status int, reason string) error { websocket.HandleClose(func(s *melody.Session, status int, reason string) error {
channel := s.Keys["channel"].(string) channel := s.Keys["channel"].(string)
logging.LogInfof("close broadcast session in channel [%s] with status code %d: %s", channel, status, reason) logging.LogInfof("close broadcast session in channel [%s] with status code %d: %s", channel, status, reason)
count := broadcastChannel.Len() DestroyBroadcastChannel(channel, false)
if count == 0 {
BroadcastChannels.Delete(channel)
broadcastChannel.Close()
logging.LogInfof("dispose broadcast channel [%s]", channel)
}
return nil return nil
}) })
var broadcastChannel *BroadcastChannel
for { for {
// Melody Initialization is an asynchronous process, so we need to wait for it to complete // Melody Initialization is an asynchronous process, so we need to wait for it to complete
if broadcastChannel.IsClosed() { if websocket.IsClosed() {
time.Sleep(1 * time.Nanosecond) time.Sleep(1 * time.Nanosecond)
} else { } else {
_broadcastChannel, loaded := BroadcastChannels.LoadOrStore(channel, broadcastChannel) newBroadcastChannel := &BroadcastChannel{
__broadcastChannel := _broadcastChannel.(*melody.Melody) Name: channel,
if loaded { WebSocket: websocket,
// channel exists Subscriber: &BroadcastSubscriber{
if __broadcastChannel.IsClosed() { Count: 0,
// channel is closed, replace it },
BroadcastChannels.Store(channel, broadcastChannel) }
__broadcastChannel = broadcastChannel _broadcastChannel, loaded := BroadcastChannels.LoadOrStore(channel, newBroadcastChannel)
} else { broadcastChannel = _broadcastChannel.(*BroadcastChannel)
// channel is open, close the new one if loaded { // channel exists
broadcastChannel.Close() if broadcastChannel.WebSocket.IsClosed() { // channel is closed, replace it
BroadcastChannels.Store(channel, newBroadcastChannel)
broadcastChannel = newBroadcastChannel
} else { // channel is open, destroy the new one
newBroadcastChannel.Destroy(true)
} }
} }
subscribe(c, __broadcastChannel, channel)
break break
} }
} }
return broadcastChannel
} }
// subscribe creates a new websocket session to a channel // DestroyBroadcastChannel tries to destroy a broadcast channel
func subscribe(c *gin.Context, broadcastChannel *melody.Melody, channel string) { //
if err := broadcastChannel.HandleRequestWithKeys( // Return true if the channel destroy successfully, otherwise false
c.Writer, func DestroyBroadcastChannel(channel string, force bool) bool {
c.Request, _broadcastChannel, exist := BroadcastChannels.Load(channel)
map[string]interface{}{ if !exist {
"channel": channel, return true
},
); err != nil {
logging.LogErrorf("create broadcast channel failed: %s", err)
return
} }
broadcastChannel := _broadcastChannel.(*BroadcastChannel)
if force || !broadcastChannel.Subscribed() {
BroadcastChannels.Delete(channel)
broadcastChannel.Destroy(true)
return true
}
return false
}
// PruneBroadcastChannels prunes all broadcast channels without subscribers
func PruneBroadcastChannels() []string {
channels := []string{}
BroadcastChannels.Range(func(key, value any) bool {
channel := key.(string)
broadcastChannel := value.(*BroadcastChannel)
if !broadcastChannel.Subscribed() {
BroadcastChannels.Delete(channel)
broadcastChannel.Destroy(true)
channels = append(channels, channel)
}
return true
})
return channels
}
// broadcastSubscribe subscribe to a broadcast channel by SSE
//
// If the channel-name does not specified, the client will subscribe to all broadcast channels.
//
// @param
//
// {
// retry: string, // retry interval (ms) (optional)
// channel: string, // channel name (optional, multiple)
// }
//
// @example
//
// "http://localhost:6806/es/broadcast/subscribe?retry=1000&channel=test1&channel=test2"
func broadcastSubscribe(c *gin.Context) {
// REF: https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
messageEventChannel := make(MessageEventChannel)
UnifiedSSE.Open <- messageEventChannel
channels, ok := c.GetQueryArray("channel")
if ok { // subscribe specified broadcast channels
UnifiedSSE.Subscribe(c, messageEventChannel, channels...)
} else { // subscribe all broadcast channels
UnifiedSSE.SubscribeAll(c, messageEventChannel)
}
UnifiedSSE.Close <- messageEventChannel
} }
// broadcastPublish push multiple binary messages to multiple broadcast channels // broadcastPublish push multiple binary messages to multiple broadcast channels
@ -203,42 +632,38 @@ func broadcastPublish(c *gin.Context) {
// Broadcast string messages // Broadcast string messages
for name, values := range form.Value { for name, values := range form.Value {
channel := Channel{ channel := ChannelInfo{
Name: name, Name: name,
Count: 0, Count: 0,
} }
// Get broadcast channel // Get broadcast channel
_broadcastChannel, exist := BroadcastChannels.Load(name) broadcastChannel := GetBroadcastChannel(channel.Name)
var broadcastChannel *melody.Melody if broadcastChannel == nil {
if exist {
broadcastChannel = _broadcastChannel.(*melody.Melody)
channel.Count = broadcastChannel.Len()
} else {
broadcastChannel = nil
channel.Count = 0 channel.Count = 0
} else {
channel.Count = broadcastChannel.SubscriberCount()
} }
// Broadcast each string message to the same channel // Broadcast each string message to the same channel
for _, value := range values { for _, value := range values {
content := []byte(value)
result := &PublishResult{ result := &PublishResult{
Code: 0, Code: 0,
Msg: "", Msg: "",
Channel: channel, Channel: channel,
Message: PublishMessage{ Message: PublishMessage{
Type: StringMessageType, Type: MessageTypeString,
Size: len(content), Size: len(value),
Filename: "", Filename: "",
}, },
} }
results = append(results, result) results = append(results, result)
if broadcastChannel != nil { if broadcastChannel != nil {
err := broadcastChannel.Broadcast(content) _, err := broadcastChannel.BroadcastString(value)
if err != nil { if err != nil {
logging.LogErrorf("broadcast message failed: %s", err) logging.LogErrorf("broadcast message failed: %s", err)
result.Code = -1 result.Code = -2
result.Msg = err.Error() result.Msg = err.Error()
continue continue
} }
@ -248,20 +673,17 @@ func broadcastPublish(c *gin.Context) {
// Broadcast binary message // Broadcast binary message
for name, files := range form.File { for name, files := range form.File {
channel := Channel{ channel := ChannelInfo{
Name: name, Name: name,
Count: 0, Count: 0,
} }
// Get broadcast channel // Get broadcast channel
_broadcastChannel, exist := BroadcastChannels.Load(name) broadcastChannel := GetBroadcastChannel(channel.Name)
var broadcastChannel *melody.Melody if broadcastChannel == nil {
if exist {
broadcastChannel = _broadcastChannel.(*melody.Melody)
channel.Count = broadcastChannel.Len()
} else {
broadcastChannel = nil
channel.Count = 0 channel.Count = 0
} else {
channel.Count = broadcastChannel.SubscriberCount()
} }
// Broadcast each binary message to the same channel // Broadcast each binary message to the same channel
@ -271,7 +693,7 @@ func broadcastPublish(c *gin.Context) {
Msg: "", Msg: "",
Channel: channel, Channel: channel,
Message: PublishMessage{ Message: PublishMessage{
Type: BinaryMessageType, Type: MessageTypeBinary,
Size: int(file.Size), Size: int(file.Size),
Filename: file.Filename, Filename: file.Filename,
}, },
@ -282,7 +704,7 @@ func broadcastPublish(c *gin.Context) {
value, err := file.Open() value, err := file.Open()
if err != nil { if err != nil {
logging.LogErrorf("open multipart form file [%s] failed: %s", file.Filename, err) logging.LogErrorf("open multipart form file [%s] failed: %s", file.Filename, err)
result.Code = -2 result.Code = -4
result.Msg = err.Error() result.Msg = err.Error()
continue continue
} }
@ -295,9 +717,9 @@ func broadcastPublish(c *gin.Context) {
continue continue
} }
if err := broadcastChannel.BroadcastBinary(content); err != nil { if _, err := broadcastChannel.BroadcastBinary(content); err != nil {
logging.LogErrorf("broadcast binary message failed: %s", err) logging.LogErrorf("broadcast binary message failed: %s", err)
result.Code = -1 result.Code = -2
result.Msg = err.Error() result.Msg = err.Error()
continue continue
} }
@ -341,24 +763,23 @@ func postMessage(c *gin.Context) {
} }
message := arg["message"].(string) message := arg["message"].(string)
channel := &Channel{ channel := &ChannelInfo{
Name: arg["channel"].(string), Name: arg["channel"].(string),
Count: 0, Count: 0,
} }
if _broadcastChannel, ok := BroadcastChannels.Load(channel.Name); !ok { broadcastChannel := GetBroadcastChannel(channel.Name)
if broadcastChannel == nil {
channel.Count = 0 channel.Count = 0
} else { } else {
var broadcastChannel = _broadcastChannel.(*melody.Melody) channel.Count = broadcastChannel.SubscriberCount()
if err := broadcastChannel.Broadcast([]byte(message)); err != nil { if _, err := broadcastChannel.BroadcastString(message); err != nil {
logging.LogErrorf("broadcast message failed: %s", err) logging.LogErrorf("broadcast message failed: %s", err)
ret.Code = -2 ret.Code = -2
ret.Msg = err.Error() ret.Msg = err.Error()
return return
} }
channel.Count = broadcastChannel.Len()
} }
ret.Data = map[string]interface{}{ ret.Data = map[string]interface{}{
"channel": channel, "channel": channel,
@ -394,7 +815,7 @@ func getChannelInfo(c *gin.Context) {
return return
} }
channel := &Channel{ channel := &ChannelInfo{
Name: arg["name"].(string), Name: arg["name"].(string),
Count: 0, Count: 0,
} }
@ -402,8 +823,8 @@ func getChannelInfo(c *gin.Context) {
if _broadcastChannel, ok := BroadcastChannels.Load(channel.Name); !ok { if _broadcastChannel, ok := BroadcastChannels.Load(channel.Name); !ok {
channel.Count = 0 channel.Count = 0
} else { } else {
var broadcastChannel = _broadcastChannel.(*melody.Melody) var broadcastChannel = _broadcastChannel.(*BroadcastChannel)
channel.Count = broadcastChannel.Len() channel.Count = broadcastChannel.SubscriberCount()
} }
ret.Data = map[string]interface{}{ ret.Data = map[string]interface{}{
@ -429,12 +850,12 @@ func getChannels(c *gin.Context) {
ret := gulu.Ret.NewResult() ret := gulu.Ret.NewResult()
defer c.JSON(http.StatusOK, ret) defer c.JSON(http.StatusOK, ret)
channels := []*Channel{} channels := []*ChannelInfo{}
BroadcastChannels.Range(func(key, value any) bool { BroadcastChannels.Range(func(key, value any) bool {
broadcastChannel := value.(*melody.Melody) broadcastChannel := value.(*BroadcastChannel)
channels = append(channels, &Channel{ channels = append(channels, &ChannelInfo{
Name: key.(string), Name: key.(string),
Count: broadcastChannel.Len(), Count: broadcastChannel.SubscriberCount(),
}) })
return true return true
}) })

View File

@ -459,6 +459,8 @@ func ServeAPI(ginServer *gin.Engine) {
ginServer.Handle("POST", "/api/network/forwardProxy", model.CheckAuth, model.CheckAdminRole, forwardProxy) ginServer.Handle("POST", "/api/network/forwardProxy", model.CheckAuth, model.CheckAdminRole, forwardProxy)
ginServer.Handle("GET", "/ws/broadcast", model.CheckAuth, model.CheckAdminRole, broadcast) ginServer.Handle("GET", "/ws/broadcast", model.CheckAuth, model.CheckAdminRole, broadcast)
ginServer.Handle("GET", "/es/broadcast/subscribe", model.CheckAuth, model.CheckAdminRole, broadcastSubscribe)
ginServer.Handle("POST", "/api/broadcast/publish", model.CheckAuth, model.CheckAdminRole, broadcastPublish) ginServer.Handle("POST", "/api/broadcast/publish", model.CheckAuth, model.CheckAdminRole, broadcastPublish)
ginServer.Handle("POST", "/api/broadcast/postMessage", model.CheckAuth, model.CheckAdminRole, postMessage) ginServer.Handle("POST", "/api/broadcast/postMessage", model.CheckAuth, model.CheckAdminRole, postMessage)
ginServer.Handle("POST", "/api/broadcast/getChannels", model.CheckAuth, model.CheckAdminRole, getChannels) ginServer.Handle("POST", "/api/broadcast/getChannels", model.CheckAuth, model.CheckAdminRole, getChannels)

View File

@ -32,6 +32,7 @@ require (
github.com/gabriel-vasile/mimetype v1.4.5 github.com/gabriel-vasile/mimetype v1.4.5
github.com/gin-contrib/gzip v1.0.1 github.com/gin-contrib/gzip v1.0.1
github.com/gin-contrib/sessions v1.0.1 github.com/gin-contrib/sessions v1.0.1
github.com/gin-contrib/sse v0.1.0
github.com/gin-gonic/gin v1.10.0 github.com/gin-gonic/gin v1.10.0
github.com/go-ole/go-ole v1.3.0 github.com/go-ole/go-ole v1.3.0
github.com/goccy/go-json v0.10.3 github.com/goccy/go-json v0.10.3
@ -121,7 +122,6 @@ require (
github.com/ebitengine/purego v0.8.1 // indirect github.com/ebitengine/purego v0.8.1 // indirect
github.com/fatih/set v0.2.1 // indirect github.com/fatih/set v0.2.1 // indirect
github.com/gigawattio/window v0.0.0-20180317192513-0f5467e35573 // indirect github.com/gigawattio/window v0.0.0-20180317192513-0f5467e35573 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect github.com/go-playground/validator/v10 v10.22.1 // indirect

View File

@ -426,19 +426,34 @@ func ControlConcurrency(c *gin.Context) {
reqPath := c.Request.URL.Path reqPath := c.Request.URL.Path
// Improve the concurrency of the kernel data reading interfaces https://github.com/siyuan-note/siyuan/issues/10149 // Improve the concurrency of the kernel data reading interfaces https://github.com/siyuan-note/siyuan/issues/10149
if strings.HasPrefix(reqPath, "/stage/") || strings.HasPrefix(reqPath, "/assets/") || strings.HasPrefix(reqPath, "/appearance/") { if strings.HasPrefix(reqPath, "/stage/") ||
strings.HasPrefix(reqPath, "/assets/") ||
strings.HasPrefix(reqPath, "/emojis/") ||
strings.HasPrefix(reqPath, "/plugins/") ||
strings.HasPrefix(reqPath, "/public/") ||
strings.HasPrefix(reqPath, "/snippets/") ||
strings.HasPrefix(reqPath, "/templates/") ||
strings.HasPrefix(reqPath, "/widgets/") ||
strings.HasPrefix(reqPath, "/appearance/") ||
strings.HasPrefix(reqPath, "/export/") ||
strings.HasPrefix(reqPath, "/history/") ||
strings.HasPrefix(reqPath, "/api/query/") ||
strings.HasPrefix(reqPath, "/api/search/") ||
strings.HasPrefix(reqPath, "/api/network/") ||
strings.HasPrefix(reqPath, "/api/broadcast/") ||
strings.HasPrefix(reqPath, "/es/") {
c.Next() c.Next()
return return
} }
parts := strings.Split(reqPath, "/") parts := strings.Split(reqPath, "/")
function := parts[len(parts)-1] function := parts[len(parts)-1]
if strings.HasPrefix(function, "get") || strings.HasPrefix(function, "list") || if strings.HasPrefix(function, "get") ||
strings.HasPrefix(function, "search") || strings.HasPrefix(function, "render") || strings.HasPrefix(function, "ls") { strings.HasPrefix(function, "list") ||
c.Next() strings.HasPrefix(function, "search") ||
return strings.HasPrefix(function, "render") ||
} strings.HasPrefix(function, "ls") {
if strings.HasPrefix(function, "/api/query/") || strings.HasPrefix(function, "/api/search/") {
c.Next() c.Next()
return return
} }