From 4e493483f82645d5cac1aa3a8be0e31ef91d4b27 Mon Sep 17 00:00:00 2001 From: Daniel <845765@qq.com> Date: Sun, 11 Jun 2023 10:13:39 +0800 Subject: [PATCH] :art: Add sync websocket --- kernel/model/sync.go | 90 ++++++++++++++++++++++++++++++++++++++++++++ kernel/util/path.go | 11 +++--- 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/kernel/model/sync.go b/kernel/model/sync.go index c394f7e13..408138b93 100644 --- a/kernel/model/sync.go +++ b/kernel/model/sync.go @@ -19,6 +19,7 @@ package model import ( "errors" "fmt" + "net/http" "os" "path" "path/filepath" @@ -29,6 +30,7 @@ import ( "github.com/88250/gulu" "github.com/88250/lute/html" "github.com/dustin/go-humanize" + "github.com/gorilla/websocket" "github.com/siyuan-note/dejavu" "github.com/siyuan-note/dejavu/cloud" "github.com/siyuan-note/logging" @@ -133,6 +135,8 @@ func SyncDataJob() { func BootSyncData() { defer logging.Recover() + connectSyncWebSocket() + if !checkSync(true, false, false) { return } @@ -595,3 +599,89 @@ func isProviderOnline(byHand bool) (ret bool) { } return } + +func connectSyncWebSocket() { + defer logging.Recover() + + if !Conf.Sync.Enabled || !IsSubscriber() || conf.ProviderSiYuan != Conf.Sync.Provider { + return + } + + if "1602224134353" != Conf.User.UserId { + return + } + + logging.LogInfof("connecting sync websocket...") + c, dialErr := dialSyncWebSocket() + if nil != dialErr { + logging.LogWarnf("connect sync websocket failed: %s", dialErr) + return + } + + logging.LogInfof("sync websocket connected") + c.SetCloseHandler(func(code int, text string) error { + logging.LogWarnf("sync websocket closed: %d, %s", code, text) + return nil + }) + + go func() { + defer logging.Recover() + + for { + result := map[string]interface{}{} + if readErr := c.ReadJSON(&result); nil != readErr { + reconnected := false + for retries := 0; retries < 7; retries++ { + time.Sleep(7 * time.Second) + logging.LogWarnf("reconnecting sync websocket...") + c, dialErr = dialSyncWebSocket() + if nil != dialErr { + logging.LogWarnf("reconnect sync websocket failed: %s", dialErr) + continue + } else { + logging.LogInfof("sync websocket reconnected") + reconnected = true + break + } + } + if !reconnected { + logging.LogWarnf("reconnect sync websocket failed, do not retry") + return + } + + continue + } + + logging.LogInfof("sync websocket message: %v", result) + } + }() + + go func() { + defer logging.Recover() + + for { + time.Sleep(10 * time.Second) + //request := map[string]interface{}{ + // "cmd": "ping", + //} + // + //if writeErr := c.WriteJSON(request); nil != writeErr { + // logging.LogErrorf("write sync websocket message failed: %s", writeErr) + // return + //} + } + }() +} + +func dialSyncWebSocket() (c *websocket.Conn, err error) { + path := "/apis/siyuan/dejavu/ws" + endpoint := util.AliyunWebSocketServer + path + //endpoint := "ws://127.0.0.1:64388" + path + header := http.Header{ + "x-siyuan-uid": []string{Conf.User.UserId}, + "x-siyuan-kernel": []string{gulu.Rand.String(7)}, + "x-siyuan-ver": []string{util.Ver}, + } + c, _, err = websocket.DefaultDialer.Dial(endpoint, header) + return +} diff --git a/kernel/util/path.go b/kernel/util/path.go index f6dd83e1e..3012bfc66 100644 --- a/kernel/util/path.go +++ b/kernel/util/path.go @@ -36,11 +36,12 @@ var ( ) const ( - AliyunServer = "https://siyuan-sync.b3logfile.com" // 云端服务地址,阿里云负载均衡,用于接口,数据同步文件上传、下载会走七牛云 OSS SiYuanSyncServer - SiYuanSyncServer = "https://siyuan-data.b3logfile.com/" // 云端数据同步服务地址,七牛云 OSS,用于数据同步文件上传、下载 - BazaarStatServer = "http://bazaar.b3logfile.com" // 集市包统计服务地址,直接对接 Bucket 没有 CDN 缓存 - BazaarOSSServer = "https://oss.b3logfile.com" // 云端对象存储地址,七牛云,仅用于读取集市包 - LiandiServer = "https://ld246.com" // 链滴服务地址,用于分享发布帖子 + AliyunServer = "https://siyuan-sync.b3logfile.com" // 云端服务地址,阿里云负载均衡,用于接口,数据同步文件上传、下载会走七牛云 OSS SiYuanSyncServer + AliyunWebSocketServer = "wss://siyuan-sync.b3logfile.com" // 云端服务地址,阿里云负载均衡,用于接口,数据同步文件上传、下载会走七牛云 OSS SiYuanSyncServer + SiYuanSyncServer = "https://siyuan-data.b3logfile.com/" // 云端数据同步服务地址,七牛云 OSS,用于数据同步文件上传、下载 + BazaarStatServer = "http://bazaar.b3logfile.com" // 集市包统计服务地址,直接对接 Bucket 没有 CDN 缓存 + BazaarOSSServer = "https://oss.b3logfile.com" // 云端对象存储地址,七牛云,仅用于读取集市包 + LiandiServer = "https://ld246.com" // 链滴服务地址,用于分享发布帖子 ) func ShortPathForBootingDisplay(p string) string {