From bea32e96d51b5490bf74401a84dba9e347ded282 Mon Sep 17 00:00:00 2001 From: Daniel <845765@qq.com> Date: Fri, 8 Dec 2023 13:05:50 +0800 Subject: [PATCH] :art: Improve kernel stability by eliminating some data races https://github.com/siyuan-note/siyuan/issues/9842 --- kernel/go.mod | 11 +++---- kernel/go.sum | 20 ++++++------- kernel/model/conf.go | 2 +- kernel/model/import.go | 11 ++++--- kernel/model/index.go | 20 ++++++------- kernel/model/index_fix.go | 14 ++++----- kernel/model/repository.go | 18 ++++++------ kernel/model/sync.go | 49 ++++++++++++++++++++++--------- kernel/model/transaction.go | 2 +- kernel/model/updater.go | 4 +-- kernel/sql/queue.go | 42 ++++++++++++-------------- kernel/sql/queue_asset_content.go | 2 +- kernel/sql/queue_history.go | 2 +- kernel/task/queue.go | 28 ++++++++++++++---- kernel/treenode/blocktree.go | 12 +++++--- kernel/util/runtime.go | 7 ++--- kernel/util/working.go | 31 +++++++++++-------- 17 files changed, 158 insertions(+), 117 deletions(-) diff --git a/kernel/go.mod b/kernel/go.mod index 8998d9fff..a66b5d8d6 100644 --- a/kernel/go.mod +++ b/kernel/go.mod @@ -8,7 +8,7 @@ require ( code.sajari.com/docconv v1.3.8 github.com/88250/clipboard v0.1.5 github.com/88250/epub v0.0.0-20230830085737-c19055cd1f48 - github.com/88250/gulu v1.2.3-0.20231205093500-bcc43ee27cba + github.com/88250/gulu v1.2.3-0.20231208041529-dafc64f2bd1d github.com/88250/lute v1.7.6-0.20231205144411-36eb02d113a6 github.com/88250/pdfcpu v0.3.14-0.20230401044135-c7369a99720c github.com/88250/vitess-sqlparser v0.0.0-20210205111146-56a2ded2aba1 @@ -50,12 +50,12 @@ require ( github.com/rqlite/sql v0.0.0-20221103124402-8f9ff0ceb8f0 github.com/sashabaranov/go-openai v1.17.9 github.com/shirou/gopsutil/v3 v3.23.11 - github.com/siyuan-note/dejavu v0.0.0-20231206081926-c2bda200c8ab + github.com/siyuan-note/dejavu v0.0.0-20231208043525-6211184a4438 github.com/siyuan-note/encryption v0.0.0-20220713091850-5ecd92177b75 github.com/siyuan-note/eventbus v0.0.0-20230804030110-cf250f838c80 github.com/siyuan-note/filelock v0.0.0-20231206081043-b75b363ddb1b github.com/siyuan-note/httpclient v0.0.0-20231120083123-750db4d28b38 - github.com/siyuan-note/logging v0.0.0-20231030034701-8265764f00ff + github.com/siyuan-note/logging v0.0.0-20231208035918-61f884c854f0 github.com/siyuan-note/riff v0.0.0-20231128081053-0cd7a5fa0076 github.com/steambap/captcha v1.4.1 github.com/studio-b12/gowebdav v0.9.0 @@ -78,7 +78,7 @@ require ( github.com/andybalholm/brotli v1.0.6 // indirect github.com/andybalholm/cascadia v1.3.2 // indirect github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect - github.com/aws/aws-sdk-go v1.48.13 // indirect + github.com/aws/aws-sdk-go v1.48.15 // indirect github.com/bytedance/sonic v1.10.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect @@ -158,7 +158,7 @@ require ( go.uber.org/mock v0.3.0 // indirect golang.org/x/arch v0.6.0 // indirect golang.org/x/crypto v0.16.0 // indirect - golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect + golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.15.0 // indirect @@ -180,3 +180,4 @@ replace github.com/mattn/go-sqlite3 => github.com/88250/go-sqlite3 v1.14.13-0.20 //replace github.com/88250/gulu => D:\88250\gulu //replace github.com/mattn/go-sqlite3 => D:\88250\go-sqlite3 //replace github.com/88250/epub => D:\88250\epub +//replace github.com/siyuan-note/logging => D:\88250\logging diff --git a/kernel/go.sum b/kernel/go.sum index a77a3e2db..fbb4d6b0a 100644 --- a/kernel/go.sum +++ b/kernel/go.sum @@ -8,8 +8,8 @@ github.com/88250/epub v0.0.0-20230830085737-c19055cd1f48 h1:qiE88Pw/9GG8hvMfpfB4 github.com/88250/epub v0.0.0-20230830085737-c19055cd1f48/go.mod h1:UgVSq5iO9pOvqs3hIGNVk6WXDiAB0v3Dlg4nssQJ7W4= github.com/88250/go-sqlite3 v1.14.13-0.20220714142610-fbbda1ee84f5 h1:8HdZozCsXSiEXYAo8Zbi/r2Ld6Dd4MmGHgir3EaSuHQ= github.com/88250/go-sqlite3 v1.14.13-0.20220714142610-fbbda1ee84f5/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/88250/gulu v1.2.3-0.20231205093500-bcc43ee27cba h1:ypA1nYRPXm+oac3GnAziPOhMx49nTLtrajePha1CU0g= -github.com/88250/gulu v1.2.3-0.20231205093500-bcc43ee27cba/go.mod h1:pTWnjt+6qUqNnP9xltswsJxgCBVu3C7eW09u48LWX0k= +github.com/88250/gulu v1.2.3-0.20231208041529-dafc64f2bd1d h1:5v2fUykQ2LHVLjYH6hitmpGQ4q9NIFTzdncnI7TNkaY= +github.com/88250/gulu v1.2.3-0.20231208041529-dafc64f2bd1d/go.mod h1:pTWnjt+6qUqNnP9xltswsJxgCBVu3C7eW09u48LWX0k= github.com/88250/lute v1.7.6-0.20231205144411-36eb02d113a6 h1:nCZMMQB5XqhX2jJYegnmrlBAYKotE7Cwex17Kz24rko= github.com/88250/lute v1.7.6-0.20231205144411-36eb02d113a6/go.mod h1:+wUqx/1kdFDbWtxn9LYJlaCOAeol2pjSO6w+WJTVQsg= github.com/88250/pdfcpu v0.3.14-0.20230401044135-c7369a99720c h1:Dl/8S9iLyPMTElnWIBxmjaLiWrkI5P4a21ivwAn5pU0= @@ -49,8 +49,8 @@ github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhP github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= -github.com/aws/aws-sdk-go v1.48.13 h1:6N4GTme6MpxfCisWf5pql8k3TBORiKTmbeutZCDXlG8= -github.com/aws/aws-sdk-go v1.48.13/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.48.15 h1:Gad2C4pLzuZDd5CA0Rvkfko6qUDDTOYru145gkO7w/Y= +github.com/aws/aws-sdk-go v1.48.15/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE= @@ -356,8 +356,8 @@ github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR github.com/shurcooL/gofontwoff v0.0.0-20181114050219-180f79e6909d h1:lvCTyBbr36+tqMccdGMwuEU+hjux/zL6xSmf5S9ITaA= github.com/shurcooL/gofontwoff v0.0.0-20181114050219-180f79e6909d/go.mod h1:05UtEgK5zq39gLST6uB0cf3NEHjETfB4Fgr3Gx5R9Vw= github.com/simplereach/timeutils v1.2.0/go.mod h1:VVbQDfN/FHRZa1LSqcwo4kNZ62OOyqLLGQKYB3pB0Q8= -github.com/siyuan-note/dejavu v0.0.0-20231206081926-c2bda200c8ab h1:yrHnTwFWgjjWBxPiOjpS+l64IDgfL2wPMSo6BJbcpB0= -github.com/siyuan-note/dejavu v0.0.0-20231206081926-c2bda200c8ab/go.mod h1:tcmM806ypoV+j+LXHkeGvYcWMGzTg1h2qqrxjish5DQ= +github.com/siyuan-note/dejavu v0.0.0-20231208043525-6211184a4438 h1:MMQYT1Kbrr2kgmwSwJXseCM+CUg5QmWMgFz3xseQIBk= +github.com/siyuan-note/dejavu v0.0.0-20231208043525-6211184a4438/go.mod h1:JFtbncCYIJft4Krp7U8wdxfqmV7DzpVpfBwz/KGUjX4= github.com/siyuan-note/encryption v0.0.0-20220713091850-5ecd92177b75 h1:Bi7/7f29LW+Fm0cHc0J1NO1cZqyJwljSWVmfOqVZgaE= github.com/siyuan-note/encryption v0.0.0-20220713091850-5ecd92177b75/go.mod h1:H8fyqqAbp9XreANjeSbc72zEdFfKTXYN34tc1TjZwtw= github.com/siyuan-note/eventbus v0.0.0-20230804030110-cf250f838c80 h1:XghjHKJd+SiL0DkGYFVC+UGUDFtnR4v9gkAbPeh9Eq8= @@ -366,8 +366,8 @@ github.com/siyuan-note/filelock v0.0.0-20231206081043-b75b363ddb1b h1:zl/gZ/6emG github.com/siyuan-note/filelock v0.0.0-20231206081043-b75b363ddb1b/go.mod h1:4teUA+79ESPDbagztng2dJfyv66pGrri5ldGJQG/saI= github.com/siyuan-note/httpclient v0.0.0-20231120083123-750db4d28b38 h1:deUrbUOwmsNYRT0x12GHaJ8exxmDMwXFL/1J4dVnLMM= github.com/siyuan-note/httpclient v0.0.0-20231120083123-750db4d28b38/go.mod h1:QOTSBBSeKU90Kb4aeDVcQ0G+2zJcNuhkwAlsJ2cnmkQ= -github.com/siyuan-note/logging v0.0.0-20231030034701-8265764f00ff h1:5GcxrTOJTsusXOLhg4GuHWbEa4M5gu+CNfL0giwNjDM= -github.com/siyuan-note/logging v0.0.0-20231030034701-8265764f00ff/go.mod h1:6mRFtAAvYPn3cDzqvyv+t8BVPGqpONDMMb5ywOhY1D4= +github.com/siyuan-note/logging v0.0.0-20231208035918-61f884c854f0 h1:+XjUr9UMXsczdO2bGA72p/k9wa2ShPb8ybi7CDBJ7HQ= +github.com/siyuan-note/logging v0.0.0-20231208035918-61f884c854f0/go.mod h1:6mRFtAAvYPn3cDzqvyv+t8BVPGqpONDMMb5ywOhY1D4= github.com/siyuan-note/riff v0.0.0-20231128081053-0cd7a5fa0076 h1:mfXnjcnOyY+2Rhtn5VhWjeYBfdHln9MppR91Yory0M4= github.com/siyuan-note/riff v0.0.0-20231128081053-0cd7a5fa0076/go.mod h1:vizseu1SWCiP6lZM2Hx2dJD4t7nCIgChwkJR7B/hQ2A= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= @@ -447,8 +447,8 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= -golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8= +golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d/go.mod h1:023OzeP/+EPmXeapQh35lcL3II3LrY8Ic+EFFKVhULM= golang.org/x/image v0.11.0/go.mod h1:bglhjqbqVuEb9e9+eNR45Jfu7D+T4Qan+NhQk8Ck2P8= golang.org/x/image v0.14.0 h1:tNgSxAFe3jC4uYqvZdTr84SZoM1KfwdC9SKIFrLjFn4= diff --git a/kernel/model/conf.go b/kernel/model/conf.go index cbd9f2494..93272e79f 100644 --- a/kernel/model/conf.go +++ b/kernel/model/conf.go @@ -492,7 +492,7 @@ var exitLock = sync.Mutex{} func Close(force bool, execInstallPkg int) (exitCode int) { exitLock.Lock() defer exitLock.Unlock() - util.IsExiting = true + util.IsExiting.Store(true) logging.LogInfof("exiting kernel [force=%v, execInstallPkg=%d]", force, execInstallPkg) util.PushMsg(Conf.Language(95), 10000*60) diff --git a/kernel/model/import.go b/kernel/model/import.go index 2ee565d64..1ed0ce561 100644 --- a/kernel/model/import.go +++ b/kernel/model/import.go @@ -84,8 +84,8 @@ func ImportSY(zipPath, boxID, toPath string) (err error) { util.PushEndlessProgress(Conf.Language(73)) defer util.ClearPushProgress(100) - syncLock.Lock() - defer syncLock.Unlock() + lockSync() + defer unlockSync() baseName := filepath.Base(zipPath) ext := filepath.Ext(baseName) @@ -550,8 +550,8 @@ func ImportData(zipPath string) (err error) { util.PushEndlessProgress(Conf.Language(73)) defer util.ClearPushProgress(100) - syncLock.Lock() - defer syncLock.Unlock() + lockSync() + defer unlockSync() baseName := filepath.Base(zipPath) ext := filepath.Ext(baseName) @@ -605,6 +605,9 @@ func ImportFromLocalPath(boxID, localPath string, toPath string) (err error) { } }() + lockSync() + defer unlockSync() + WaitForWritingFiles() var baseHPath, baseTargetPath, boxLocalPath string diff --git a/kernel/model/index.go b/kernel/model/index.go index e6fd62b37..9f22fa552 100644 --- a/kernel/model/index.go +++ b/kernel/model/index.go @@ -24,6 +24,7 @@ import ( "runtime/debug" "strings" "sync" + "sync/atomic" "time" "github.com/88250/gulu" @@ -70,13 +71,12 @@ func index(boxID string) { if 1 > boxLen { boxLen = 1 } - bootProgressPart := 30.0 / float64(boxLen) / float64(len(files)) + bootProgressPart := int32(30.0 / float64(boxLen) / float64(len(files))) start := time.Now() luteEngine := util.NewLute() - var treeCount int - var treeSize int64 - i := 0 + treeCount := atomic.Int32{} + treeSize := atomic.Int64{} util.PushStatusBar(fmt.Sprintf("["+html.EscapeString(box.Name)+"] "+Conf.Language(64), len(files))) poolSize := runtime.NumCPU() @@ -109,12 +109,12 @@ func index(boxID string) { sql.IndexTreeQueue(box.ID, file.path) util.IncBootProgress(bootProgressPart, fmt.Sprintf(Conf.Language(92), util.ShortPathForBootingDisplay(tree.Path))) - treeSize += file.size - treeCount++ + treeSize.Add(file.size) + treeCount.Add(1) + i := treeCount.Load() if 1 < i && 0 == i%64 { - util.PushStatusBar(fmt.Sprintf(Conf.Language(88), i, len(files)-i)) + util.PushStatusBar(fmt.Sprintf(Conf.Language(88), i, int32(len(files))-i)) } - i++ }) for _, file := range files { if file.isdir || !strings.HasSuffix(file.name, ".sy") { @@ -130,7 +130,7 @@ func index(boxID string) { box.UpdateHistoryGenerated() // 初始化历史生成时间为当前时间 end := time.Now() elapsed := end.Sub(start).Seconds() - logging.LogInfof("rebuilt database for notebook [%s] in [%.2fs], tree [count=%d, size=%s]", box.ID, elapsed, treeCount, humanize.Bytes(uint64(treeSize))) + logging.LogInfof("rebuilt database for notebook [%s] in [%.2fs], tree [count=%d, size=%s]", box.ID, elapsed, treeCount.Load(), humanize.Bytes(uint64(treeSize.Load()))) debug.FreeOSMemory() return } @@ -187,7 +187,7 @@ func IndexRefs() { i := 0 size := len(defBlockIDs) if 0 < size { - bootProgressPart := 10.0 / float64(size) + bootProgressPart := int32(10.0 / float64(size)) for _, defBlockID := range defBlockIDs { defTree, loadErr := LoadTreeByID(defBlockID) diff --git a/kernel/model/index_fix.go b/kernel/model/index_fix.go index 2a63272b3..334684a90 100644 --- a/kernel/model/index_fix.go +++ b/kernel/model/index_fix.go @@ -116,7 +116,7 @@ func removeDuplicateDatabaseIndex() { } deletes++ toRemoveRootIDs = append(toRemoveRootIDs, rootID) - if util.IsExiting { + if util.IsExiting.Load() { break } } @@ -335,12 +335,12 @@ func fixBlockTreeByFileSys() { } reindexTreeByPath(box.ID, p, i, size, luteEngine) - if util.IsExiting { + if util.IsExiting.Load() { break } } - if util.IsExiting { + if util.IsExiting.Load() { break } } @@ -371,7 +371,7 @@ func reindexTreeByUpdated(rootUpdatedMap, dbRootUpdatedMap map[string]string) { for rootID, updated := range rootUpdatedMap { i++ - if util.IsExiting { + if util.IsExiting.Load() { break } @@ -396,7 +396,7 @@ func reindexTreeByUpdated(rootUpdatedMap, dbRootUpdatedMap map[string]string) { continue } - if util.IsExiting { + if util.IsExiting.Load() { break } } @@ -407,7 +407,7 @@ func reindexTreeByUpdated(rootUpdatedMap, dbRootUpdatedMap map[string]string) { rootIDs = append(rootIDs, rootID) } - if util.IsExiting { + if util.IsExiting.Load() { break } } @@ -424,7 +424,7 @@ func reindexTreeByUpdated(rootUpdatedMap, dbRootUpdatedMap map[string]string) { } toRemoveRootIDs = append(toRemoveRootIDs, id) - if util.IsExiting { + if util.IsExiting.Load() { break } } diff --git a/kernel/model/repository.go b/kernel/model/repository.go index ca59ff585..eadce362c 100644 --- a/kernel/model/repository.go +++ b/kernel/model/repository.go @@ -1294,12 +1294,12 @@ func processSyncMergeResult(exit, byHand bool, mergeResult *dejavu.MergeResult, } if 1 > len(mergeResult.Upserts) && 1 > len(mergeResult.Removes) && 1 > len(mergeResult.Conflicts) { // 没有数据变更 - syncSameCount++ - if 10 < syncSameCount { - syncSameCount = 5 + syncSameCount.Add(1) + if 10 < syncSameCount.Load() { + syncSameCount.Store(5) } if !byHand { - delay := time.Minute * time.Duration(int(math.Pow(2, float64(syncSameCount)))) + delay := time.Minute * time.Duration(int(math.Pow(2, float64(syncSameCount.Load())))) if fixSyncInterval.Minutes() > delay.Minutes() { delay = time.Minute * 8 } @@ -1618,11 +1618,11 @@ func subscribeRepoEvents() { } coWalkDataCount++ }) - var bootProgressPart float64 + var bootProgressPart int32 eventbus.Subscribe(eventbus.EvtCheckoutUpsertFiles, func(context map[string]interface{}, total int) { msg := fmt.Sprintf(Conf.Language(162), 0, total) util.SetBootDetails(msg) - bootProgressPart = 10 / float64(total) + bootProgressPart = int32(10 / float64(total)) util.ContextPushMsg(context, msg) }) coUpsertFileCount := 0 @@ -1637,7 +1637,7 @@ func subscribeRepoEvents() { eventbus.Subscribe(eventbus.EvtCheckoutRemoveFiles, func(context map[string]interface{}, total int) { msg := fmt.Sprintf(Conf.Language(163), 0, total) util.SetBootDetails(msg) - bootProgressPart = 10 / float64(total) + bootProgressPart = int32(10 / float64(total)) util.ContextPushMsg(context, msg) }) @@ -1658,7 +1658,7 @@ func subscribeRepoEvents() { eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadFiles, func(context map[string]interface{}, total int) { msg := fmt.Sprintf(Conf.Language(165), 0, total) util.SetBootDetails(msg) - bootProgressPart = 10 / float64(total) + bootProgressPart = int32(10 / float64(total)) util.ContextPushMsg(context, msg) }) @@ -1672,7 +1672,7 @@ func subscribeRepoEvents() { eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadChunks, func(context map[string]interface{}, total int) { msg := fmt.Sprintf(Conf.Language(166), 0, total) util.SetBootDetails(msg) - bootProgressPart = 10 / float64(total) + bootProgressPart = int32(10 / float64(total)) util.ContextPushMsg(context, msg) }) eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadChunk, func(context map[string]interface{}, count, total int) { diff --git a/kernel/model/sync.go b/kernel/model/sync.go index 619707854..e34c82de0 100644 --- a/kernel/model/sync.go +++ b/kernel/model/sync.go @@ -26,6 +26,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/88250/gulu" @@ -55,8 +56,8 @@ func SyncDataDownload() { return } - syncLock.Lock() - defer syncLock.Unlock() + lockSync() + defer unlockSync() now := util.CurrentTimeMillis() Conf.Sync.Synced = now @@ -82,8 +83,8 @@ func SyncDataUpload() { return } - syncLock.Lock() - defer syncLock.Unlock() + lockSync() + defer unlockSync() now := util.CurrentTimeMillis() Conf.Sync.Synced = now @@ -98,9 +99,11 @@ func SyncDataUpload() { } var ( - syncSameCount = 0 + syncSameCount = atomic.Int32{} autoSyncErrCount = 0 fixSyncInterval = 5 * time.Minute + + syncPlanTimeLock = sync.Mutex{} syncPlanTime = time.Now().Add(fixSyncInterval) BootSyncSucc = -1 // -1:未执行,0:执行成功,1:执行失败 @@ -108,9 +111,12 @@ var ( ) func SyncDataJob() { + syncPlanTimeLock.Lock() if time.Now().Before(syncPlanTime) { + syncPlanTimeLock.Unlock() return } + syncPlanTimeLock.Unlock() SyncData(false) } @@ -132,8 +138,8 @@ func BootSyncData() { return } - syncLock.Lock() - defer syncLock.Unlock() + lockSync() + defer unlockSync() util.IncBootProgress(3, "Syncing data from the cloud...") BootSyncSucc = 0 @@ -155,6 +161,16 @@ func SyncData(byHand bool) { syncData(false, byHand) } +func lockSync() { + syncLock.Lock() + isSyncing = true +} + +func unlockSync() { + isSyncing = false + syncLock.Unlock() +} + func syncData(exit, byHand bool) { defer logging.Recover() @@ -168,8 +184,8 @@ func syncData(exit, byHand bool) { return } - syncLock.Lock() - defer syncLock.Unlock() + lockSync() + defer unlockSync() if exit { ExitSyncSucc = 0 @@ -240,7 +256,7 @@ func checkSync(boot, exit, byHand bool) bool { } } - if gulu.IsMutexLocked(&syncLock) { + if isSyncing { logging.LogWarnf("sync is in progress") planSyncAfter(fixSyncInterval) return false @@ -266,7 +282,7 @@ func incReindex(upserts, removes []string) (upsertRootIDs, removeRootIDs []strin luteEngine := util.NewLute() // 先执行 remove,否则移动文档时 upsert 会被忽略,导致未被索引 - bootProgressPart := 10 / float64(len(removes)) + bootProgressPart := int32(10 / float64(len(removes))) for _, removeFile := range removes { if !strings.HasSuffix(removeFile, ".sy") { continue @@ -288,7 +304,7 @@ func incReindex(upserts, removes []string) (upsertRootIDs, removeRootIDs []strin msg = fmt.Sprintf(Conf.Language(35)) util.PushStatusBar(msg) - bootProgressPart = 10 / float64(len(upserts)) + bootProgressPart = int32(10 / float64(len(upserts))) for _, upsertFile := range upserts { if !strings.HasSuffix(upsertFile, ".sy") { continue @@ -413,7 +429,10 @@ func SetSyncProviderWebDAV(webdav *conf.WebDAV) (err error) { return } -var syncLock = sync.Mutex{} +var ( + syncLock = sync.Mutex{} + isSyncing bool +) func CreateCloudSyncDir(name string) (err error) { if conf.ProviderSiYuan != Conf.Sync.Provider { @@ -587,12 +606,14 @@ func getIgnoreLines() (ret []string) { } func IncSync() { - syncSameCount = 0 + syncSameCount.Store(0) planSyncAfter(30 * time.Second) } func planSyncAfter(d time.Duration) { + syncPlanTimeLock.Lock() syncPlanTime = time.Now().Add(d) + syncPlanTimeLock.Unlock() } func isProviderOnline(byHand bool) (ret bool) { diff --git a/kernel/model/transaction.go b/kernel/model/transaction.go index 9a00454ba..a6efdf2a4 100644 --- a/kernel/model/transaction.go +++ b/kernel/model/transaction.go @@ -87,7 +87,7 @@ var ( func isWritingFiles() bool { time.Sleep(time.Duration(20) * time.Millisecond) - return 0 < len(txQueue) || gulu.IsMutexLocked(&flushLock) + return 0 < len(txQueue) } func init() { diff --git a/kernel/model/updater.go b/kernel/model/updater.go index 8628d31ba..522de5685 100644 --- a/kernel/model/updater.go +++ b/kernel/model/updater.go @@ -82,11 +82,9 @@ func checkDownloadInstallPkg() { return } - if gulu.IsMutexLocked(&checkDownloadInstallPkgLock) { + if !checkDownloadInstallPkgLock.TryLock() { return } - - checkDownloadInstallPkgLock.Lock() defer checkDownloadInstallPkgLock.Unlock() downloadPkgURLs, checksum, err := getUpdatePkg() diff --git a/kernel/sql/queue.go b/kernel/sql/queue.go index edcd5f40d..490d2d0e6 100644 --- a/kernel/sql/queue.go +++ b/kernel/sql/queue.go @@ -20,7 +20,6 @@ import ( "database/sql" "errors" "fmt" - "github.com/88250/gulu" "path" "runtime/debug" "sync" @@ -36,8 +35,6 @@ import ( var ( operationQueue []*dbQueueOperation dbQueueLock = sync.Mutex{} - - txLock = sync.Mutex{} ) type dbQueueOperation struct { @@ -77,14 +74,18 @@ func WaitForWritingDatabase() { func isWritingDatabase() bool { time.Sleep(util.SQLFlushInterval + 50*time.Millisecond) - if 0 < len(operationQueue) || gulu.IsMutexLocked(&txLock) { + dbQueueLock.Lock() + defer dbQueueLock.Unlock() + if 0 < len(operationQueue) { return true } return false } func IsEmptyQueue() bool { - return 1 > len(operationQueue) && !gulu.IsMutexLocked(&txLock) + dbQueueLock.Lock() + defer dbQueueLock.Unlock() + return 1 > len(operationQueue) } func ClearQueue() { @@ -94,30 +95,30 @@ func ClearQueue() { } func FlushQueue() { - ops := mergeUpsertTrees() - if 1 > len(ops) { + dbQueueLock.Lock() + defer dbQueueLock.Unlock() + + total := len(operationQueue) + if 1 > total { return } - txLock.Lock() - defer txLock.Unlock() start := time.Now() context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} - total := len(ops) if 512 < total { disableCache() defer enableCache() } groupOpsTotal := map[string]int{} - for _, op := range ops { + for _, op := range operationQueue { groupOpsTotal[op.action]++ } groupOpsCurrent := map[string]int{} - for i, op := range ops { - if util.IsExiting { + for i, op := range operationQueue { + if util.IsExiting.Load() { return } @@ -137,7 +138,7 @@ func FlushQueue() { if err = commitTx(tx); nil != err { logging.LogErrorf("commit tx failed: %s", err) - return + continue } if 16 < i && 0 == i%128 { @@ -145,10 +146,12 @@ func FlushQueue() { } } - if 128 < len(ops) { + if 128 < total { debug.FreeOSMemory() } + operationQueue = nil + elapsed := time.Now().Sub(start).Milliseconds() if 7000 < elapsed { logging.LogInfof("database op tx [%dms]", elapsed) @@ -415,12 +418,3 @@ func RemoveTreePathQueue(treeBox, treePathPrefix string) { } operationQueue = append(operationQueue, newOp) } - -func mergeUpsertTrees() (ops []*dbQueueOperation) { - dbQueueLock.Lock() - defer dbQueueLock.Unlock() - - ops = operationQueue - operationQueue = nil - return -} diff --git a/kernel/sql/queue_asset_content.go b/kernel/sql/queue_asset_content.go index 0741b81fb..08b7c839e 100644 --- a/kernel/sql/queue_asset_content.go +++ b/kernel/sql/queue_asset_content.go @@ -67,7 +67,7 @@ func FlushAssetContentQueue() { context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} groupOpsCurrent := map[string]int{} for i, op := range ops { - if util.IsExiting { + if util.IsExiting.Load() { return } diff --git a/kernel/sql/queue_history.go b/kernel/sql/queue_history.go index 74fc2e160..a0338e0b9 100644 --- a/kernel/sql/queue_history.go +++ b/kernel/sql/queue_history.go @@ -67,7 +67,7 @@ func FlushHistoryQueue() { context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} groupOpsCurrent := map[string]int{} for i, op := range ops { - if util.IsExiting { + if util.IsExiting.Load() { return } diff --git a/kernel/task/queue.go b/kernel/task/queue.go index 06dc2c8d4..3d30f5ff2 100644 --- a/kernel/task/queue.go +++ b/kernel/task/queue.go @@ -45,7 +45,7 @@ func AppendTask(action string, handler interface{}, args ...interface{}) { } func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) { - if util.IsExiting { + if util.IsExiting.Load() { //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action) return } @@ -69,15 +69,18 @@ func AppendTaskWithTimeout(action string, timeout time.Duration, handler interfa func getCurrentActions() (ret []string) { queueLock.Lock() - defer queueLock.Unlock() + currentTaskActionLock.Lock() if "" != currentTaskAction { ret = append(ret, currentTaskAction) } + currentTaskActionLock.Unlock() for _, task := range taskQueue { ret = append(ret, task.Action) } + + queueLock.Unlock() return } @@ -117,20 +120,23 @@ func Contain(action string, moreActions ...string) bool { actions := append(moreActions, action) actions = gulu.Str.RemoveDuplicatedElem(actions) + queueLock.Lock() for _, task := range taskQueue { if gulu.Str.Contains(task.Action, actions) { return true } } + queueLock.Unlock() return false } func StatusJob() { - tasks := taskQueue var items []map[string]interface{} count := map[string]int{} actionLangs := util.TaskActionLangs[util.Lang] - for _, task := range tasks { + + queueLock.Lock() + for _, task := range taskQueue { action := task.Action if c := count[action]; 2 < c { logging.LogWarnf("too many tasks [%s], ignore show its status", action) @@ -147,7 +153,9 @@ func StatusJob() { item := map[string]interface{}{"action": action} items = append(items, item) } + defer queueLock.Unlock() + currentTaskActionLock.Lock() if "" != currentTaskAction { if nil != actionLangs { if label := actionLangs[currentTaskAction]; nil != label { @@ -155,6 +163,7 @@ func StatusJob() { } } } + currentTaskActionLock.Unlock() if 1 > len(items) { items = []map[string]interface{}{} @@ -170,7 +179,7 @@ func ExecTaskJob() { return } - if util.IsExiting { + if util.IsExiting.Load() { return } @@ -190,7 +199,10 @@ func popTask() (ret *Task) { return } -var currentTaskAction string +var ( + currentTaskAction string + currentTaskActionLock = sync.Mutex{} +) func execTask(task *Task) { defer logging.Recover() @@ -204,7 +216,9 @@ func execTask(task *Task) { } } + currentTaskActionLock.Lock() currentTaskAction = task.Action + currentTaskActionLock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), task.Timeout) defer cancel() @@ -221,5 +235,7 @@ func execTask(task *Task) { //logging.LogInfof("task [%s] done", task.Action) } + currentTaskActionLock.Lock() currentTaskAction = "" + currentTaskActionLock.Unlock() } diff --git a/kernel/treenode/blocktree.go b/kernel/treenode/blocktree.go index 3929c503a..08f7af5b5 100644 --- a/kernel/treenode/blocktree.go +++ b/kernel/treenode/blocktree.go @@ -21,6 +21,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/88250/gulu" @@ -436,7 +437,7 @@ func InitBlockTree(force bool) { return } - size := int64(0) + size := atomic.Int64{} waitGroup := &sync.WaitGroup{} p, _ := ants.NewPoolWithFunc(4, func(arg interface{}) { defer waitGroup.Done() @@ -457,7 +458,7 @@ func InitBlockTree(force bool) { os.Exit(logging.ExitCodeFileSysErr) return } - size += info.Size() + size.Add(info.Size()) sliceData := map[string]*BlockTree{} if err = msgpack.NewDecoder(f).Decode(&sliceData); nil != err { @@ -491,7 +492,7 @@ func InitBlockTree(force bool) { p.Release() elapsed := time.Since(start).Seconds() - logging.LogInfof("read block tree [%s] to [%s], elapsed [%.2fs]", humanize.Bytes(uint64(size)), util.BlockTreePath, elapsed) + logging.LogInfof("read block tree [%s] to [%s], elapsed [%.2fs]", humanize.Bytes(uint64(size.Load())), util.BlockTreePath, elapsed) return } @@ -514,11 +515,12 @@ func SaveBlockTree(force bool) { var count int blockTrees.Range(func(key, value interface{}) bool { slice := value.(*btSlice) + slice.m.Lock() if !force && slice.changed.IsZero() { + slice.m.Unlock() return true } - slice.m.Lock() data, err := msgpack.Marshal(slice.data) if nil != err { logging.LogErrorf("marshal block tree failed: %s", err) @@ -534,7 +536,9 @@ func SaveBlockTree(force bool) { return false } + slice.m.Lock() slice.changed = time.Time{} + slice.m.Unlock() size += uint64(len(data)) count++ return true diff --git a/kernel/util/runtime.go b/kernel/util/runtime.go index 49440fb3a..5227d7864 100644 --- a/kernel/util/runtime.go +++ b/kernel/util/runtime.go @@ -27,6 +27,7 @@ import ( "runtime/debug" "strings" "sync" + "sync/atomic" "time" "github.com/88250/gulu" @@ -60,7 +61,7 @@ func HookUILoaded() { } // IsExiting 是否正在退出程序。 -var IsExiting = false +var IsExiting = atomic.Bool{} // MobileOSVer 移动端操作系统版本。 var MobileOSVer string @@ -171,12 +172,10 @@ func CheckFileSysStatus() { func checkFileSysStatus() { defer logging.Recover() - if gulu.IsMutexLocked(&checkFileSysStatusLock) { + if !checkFileSysStatusLock.TryLock() { logging.LogWarnf("check file system status is locked, skip") return } - - checkFileSysStatusLock.Lock() defer checkFileSysStatusLock.Unlock() const fileSysStatusCheckFile = ".siyuan/filesys_status_check" diff --git a/kernel/util/working.go b/kernel/util/working.go index 2d083f75d..54e3e471c 100644 --- a/kernel/util/working.go +++ b/kernel/util/working.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/88250/gulu" @@ -60,9 +61,9 @@ func initEnvVars() { } var ( - bootProgress float64 // 启动进度,从 0 到 100 - bootDetails string // 启动细节描述 - HttpServing = false // 是否 HTTP 伺服已经可用 + bootProgress = atomic.Int32{} // 启动进度,从 0 到 100 + bootDetails string // 启动细节描述 + HttpServing = false // 是否 HTTP 伺服已经可用 ) func Boot() { @@ -146,40 +147,44 @@ func Boot() { logBootInfo() } +var bootDetailsLock = sync.Mutex{} + func setBootDetails(details string) { + bootDetailsLock.Lock() bootDetails = "v" + Ver + " " + details + bootDetailsLock.Unlock() } func SetBootDetails(details string) { - if 100 <= bootProgress { + if 100 <= bootProgress.Load() { return } setBootDetails(details) } -func IncBootProgress(progress float64, details string) { - if 100 <= bootProgress { +func IncBootProgress(progress int32, details string) { + if 100 <= bootProgress.Load() { return } - bootProgress += progress + bootProgress.Add(progress) setBootDetails(details) } func IsBooted() bool { - return 100 <= bootProgress + return 100 <= bootProgress.Load() } -func GetBootProgressDetails() (float64, string) { - return bootProgress, bootDetails +func GetBootProgressDetails() (int32, string) { + return bootProgress.Load(), bootDetails } -func GetBootProgress() float64 { - return bootProgress +func GetBootProgress() int32 { + return bootProgress.Load() } func SetBooted() { setBootDetails("Finishing boot...") - bootProgress = 100 + bootProgress.Store(100) logging.LogInfof("kernel booted") }