| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- package engine
- import (
- "bytes"
- "encoding/binary"
- "encoding/gob"
- "github.com/huichen/wukong/types"
- "sync/atomic"
- )
- type persistentStorageIndexDocumentRequest struct {
- docId uint64
- data types.DocumentIndexData
- }
- func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
- for {
- request := <-engine.persistentStorageIndexDocumentChannels[shard]
- // 得到key
- b := make([]byte, 10)
- length := binary.PutUvarint(b, request.docId)
- // 得到value
- var buf bytes.Buffer
- enc := gob.NewEncoder(&buf)
- err := enc.Encode(request.data)
- if err != nil {
- atomic.AddUint64(&engine.numDocumentsStored, 1)
- continue
- }
- // 将key-value写入数据库
- engine.dbs[shard].Set(b[0:length], buf.Bytes())
- atomic.AddUint64(&engine.numDocumentsStored, 1)
- }
- }
- func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard uint32) {
- // 得到key
- b := make([]byte, 10)
- length := binary.PutUvarint(b, docId)
- // 从数据库删除该key
- engine.dbs[shard].Delete(b[0:length])
- }
- func (engine *Engine) persistentStorageInitWorker(shard int) {
- engine.dbs[shard].ForEach(func(k, v []byte) error {
- key, value := k, v
- // 得到docID
- docId, _ := binary.Uvarint(key)
- // 得到data
- buf := bytes.NewReader(value)
- dec := gob.NewDecoder(buf)
- var data types.DocumentIndexData
- err := dec.Decode(&data)
- if err == nil {
- // 添加索引
- engine.internalIndexDocument(docId, data)
- }
- return nil
- })
- engine.persistentStorageInitChannel <- true
- }
|