package engine

import (
	"bytes"
	"encoding/binary"
	"encoding/gob"
	"sync/atomic"

	"github.com/huichen/wukong/types"
)

// persistentStorageIndexDocumentRequest is one unit of work for
// persistentStorageIndexDocumentWorker: a document ID together with the index
// data to be stored under that ID.
type persistentStorageIndexDocumentRequest struct {
	docId uint64
	data  types.DocumentIndexData
}

// persistentStorageIndexDocumentWorker consumes requests from the channel of
// the given shard and writes each document to that shard's database. The key
// is the uvarint encoding of the document ID and the value is the gob encoding
// of the document's index data.
//
// The loop ends when the shard's channel is closed. While the channel stays
// open the worker blocks waiting for the next request.
func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
	// Ranging over the channel (instead of a bare receive in an endless loop)
	// prevents zero-value requests from being written under key 0 over and
	// over if the channel is ever closed.
	for request := range engine.persistentStorageIndexDocumentChannels[shard] {
		// Build the key.
		key := make([]byte, binary.MaxVarintLen64)
		n := binary.PutUvarint(key, request.docId)

		// Build the value.
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(request.data); err == nil {
			// Write the key-value pair to the database.
			engine.dbs[shard].Set(key[:n], buf.Bytes())
		}

		// The counter is bumped for every request, including those whose data
		// could not be encoded and was therefore not written.
		// NOTE(review): presumably so that code waiting on this counter does
		// not stall on a bad document; confirm against the reader.
		atomic.AddUint64(&engine.numDocumentsStored, 1)
	}
}

// persistentStorageRemoveDocumentWorker deletes the entry for docId from the
// database of the given shard. The key is the uvarint encoding of docId, the
// same encoding persistentStorageIndexDocumentWorker uses when storing.
func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard uint32) {
	// Build the key.
	key := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(key, docId)

	// Delete the key from the database.
	engine.dbs[shard].Delete(key[:n])
}

// persistentStorageInitWorker walks every key-value pair in the database of
// the given shard, decodes it back into a document ID and its index data, and
// passes the result to internalIndexDocument. Entries whose key or value
// cannot be decoded are skipped. When the walk is finished it sends true on
// persistentStorageInitChannel.
func (engine *Engine) persistentStorageInitWorker(shard int) {
	engine.dbs[shard].ForEach(func(k, v []byte) error {
		// Recover the document ID. Uvarint reports n <= 0 for an empty or
		// malformed key; without this check such a key would silently be
		// treated as document 0.
		docID, n := binary.Uvarint(k)
		if n <= 0 {
			return nil
		}

		// Recover the index data.
		var data types.DocumentIndexData
		if err := gob.NewDecoder(bytes.NewReader(v)).Decode(&data); err != nil {
			return nil
		}

		// Add the document to the index.
		engine.internalIndexDocument(docID, data, false)
		return nil
	})
	engine.persistentStorageInitChannel <- true
}