diff --git a/kernel/model/index_fix.go b/kernel/model/index_fix.go index e5b332886..bc5e450ee 100644 --- a/kernel/model/index_fix.go +++ b/kernel/model/index_fix.go @@ -69,20 +69,23 @@ func autoFixIndex() { for _, root := range roots { rootMap[root.ID] = root } + + var toRemoveRootIDs []string var deletes int for _, rootID := range duplicatedRootIDs { root := rootMap[rootID] if nil == root { continue } - - //logging.LogWarnf("exist more than one tree [%s], reindex it", rootID) - sql.RemoveTreeQueue(root.Box, rootID) deletes++ + toRemoveRootIDs = append(toRemoveRootIDs, rootID) if util.IsExiting { break } } + toRemoveRootIDs = gulu.Str.RemoveDuplicatedElem(toRemoveRootIDs) + sql.BatchRemoveTreeQueue(toRemoveRootIDs) + if 0 < deletes { logging.LogWarnf("exist more than one tree duplicated [%d], reindex it", deletes) } @@ -203,17 +206,20 @@ func reindexTreeByUpdated(rootUpdatedMap, dbRootUpdatedMap map[string]string) { for _, block := range blocks { roots[block.RootID] = block } + var toRemoveRootIDs []string for id, root := range roots { if nil == root { continue } - logging.LogWarnf("tree [%s] is not in block tree, remove it from [%s]", id, root.Box) - sql.RemoveTreeQueue(root.Box, root.ID) + toRemoveRootIDs = append(toRemoveRootIDs, id) if util.IsExiting { break } } + toRemoveRootIDs = gulu.Str.RemoveDuplicatedElem(toRemoveRootIDs) + //logging.LogWarnf("tree [%s] is not in block tree, remove it from [%s]", id, root.Box) + sql.BatchRemoveTreeQueue(toRemoveRootIDs) } func reindexTreeByPath(box, p string, i, size int) { diff --git a/kernel/sql/database.go b/kernel/sql/database.go index 7d0bcb975..47379284b 100644 --- a/kernel/sql/database.go +++ b/kernel/sql/database.go @@ -20,6 +20,7 @@ import ( "bytes" "database/sql" "errors" + "fmt" "os" "path/filepath" "regexp" @@ -982,6 +983,42 @@ func deleteByRootID(tx *sql.Tx, rootID string, context map[string]interface{}) ( return } +func batchDeleteByRootIDs(tx *sql.Tx, rootIDs []string, context map[string]interface{}) (err error) { + ids := strings.Join(rootIDs, "','") + ids = "('" + ids + "')" + stmt := "DELETE FROM blocks WHERE root_id IN " + ids + if err = execStmtTx(tx, stmt); nil != err { + return + } + stmt = "DELETE FROM blocks_fts WHERE root_id IN " + ids + if err = execStmtTx(tx, stmt); nil != err { + return + } + stmt = "DELETE FROM blocks_fts_case_insensitive WHERE root_id IN " + ids + if err = execStmtTx(tx, stmt); nil != err { + return + } + stmt = "DELETE FROM spans WHERE root_id IN " + ids + if err = execStmtTx(tx, stmt); nil != err { + return + } + stmt = "DELETE FROM assets WHERE root_id IN " + ids + if err = execStmtTx(tx, stmt); nil != err { + return + } + stmt = "DELETE FROM refs WHERE root_id IN " + ids + if err = execStmtTx(tx, stmt); nil != err { + return + } + stmt = "DELETE FROM file_annotation_refs WHERE root_id IN " + ids + if err = execStmtTx(tx, stmt); nil != err { + return + } + ClearBlockCache() + eventbus.Publish(eventbus.EvtSQLDeleteBlocks, context, fmt.Sprintf("%d", len(rootIDs))) + return +} + func batchDeleteByPathPrefix(tx *sql.Tx, boxID, pathPrefix string) (err error) { stmt := "DELETE FROM blocks WHERE box = ? AND path LIKE ?" if err = execStmtTx(tx, stmt, boxID, pathPrefix+"%"); nil != err { diff --git a/kernel/sql/queue.go b/kernel/sql/queue.go index 3ed95779f..df9cfe9f2 100644 --- a/kernel/sql/queue.go +++ b/kernel/sql/queue.go @@ -41,12 +41,13 @@ var ( type dbQueueOperation struct { inQueueTime time.Time - action string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs/index + action string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs/index/delete_ids indexPath string // index upsertTree *parse.Tree // upsert/insert_refs removeTreeBox, removeTreePath string // delete removeTreeIDBox, removeTreeID string // delete_id + removeTreeIDs []string // delete_ids box string // delete_box/delete_box_refs/index renameTree *parse.Tree // rename renameTreeOldHPath string // rename @@ -148,6 +149,8 @@ func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (e err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath) case "delete_id": err = deleteByRootID(tx, op.removeTreeID, context) + case "delete_ids": + err = batchDeleteByRootIDs(tx, op.removeTreeIDs, context) case "rename": err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath) if nil != err { @@ -295,6 +298,14 @@ func RemoveTreeQueue(box, rootID string) { operationQueue = append(operationQueue, newOp) } +func BatchRemoveTreeQueue(rootIDs []string) { + dbQueueLock.Lock() + defer dbQueueLock.Unlock() + + newOp := &dbQueueOperation{removeTreeIDs: rootIDs, inQueueTime: time.Now(), action: "delete_ids"} + operationQueue = append(operationQueue, newOp) +} + func RemoveTreePathQueue(treeBox, treePathPrefix string) { dbQueueLock.Lock() defer dbQueueLock.Unlock() diff --git a/kernel/treenode/blocktree.go b/kernel/treenode/blocktree.go index 01db7bad1..4cddf9217 100644 --- a/kernel/treenode/blocktree.go +++ b/kernel/treenode/blocktree.go @@ -29,6 +29,7 @@ import ( "github.com/88250/lute/ast" "github.com/88250/lute/parse" "github.com/dustin/go-humanize" + "github.com/panjf2000/ants/v2" util2 "github.com/siyuan-note/dejavu/util" "github.com/siyuan-note/logging" "github.com/siyuan-note/siyuan/kernel/util" @@ -425,11 +426,13 @@ func InitBlockTree(force bool) { } size := uint64(0) - for _, entry := range entries { - if !strings.HasSuffix(entry.Name(), ".msgpack") { - continue - } + poolSize := runtime.NumCPU() + waitGroup := &sync.WaitGroup{} + p, _ := ants.NewPoolWithFunc(poolSize, func(arg interface{}) { + defer waitGroup.Done() + + entry := arg.(os.DirEntry) p := filepath.Join(util.BlockTreePath, entry.Name()) var fh *os.File fh, err = os.OpenFile(p, os.O_RDWR, 0644) @@ -461,13 +464,22 @@ func InitBlockTree(force bool) { name := entry.Name()[0:strings.Index(entry.Name(), ".")] blockTrees.Store(name, &btSlice{data: sliceData, changed: time.Time{}, m: &sync.Mutex{}}) size += uint64(len(data)) + }) + for _, entry := range entries { + if !strings.HasSuffix(entry.Name(), ".msgpack") { + continue + } + + waitGroup.Add(1) + p.Invoke(entry) } + waitGroup.Wait() + p.Release() + runtime.GC() - - if elapsed := time.Since(start).Seconds(); 2 < elapsed { - logging.LogWarnf("read block tree [%s] to [%s], elapsed [%.2fs]", humanize.Bytes((size)), util.BlockTreePath, elapsed) - } + elapsed := time.Since(start).Seconds() + logging.LogInfof("read block tree [%s] to [%s], elapsed [%.2fs]", humanize.Bytes((size)), util.BlockTreePath, elapsed) return }