| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package engine
- import (
- "bytes"
- "encoding/binary"
- "encoding/gob"
- "github.com/huichen/wukong/types"
- "io"
- "log"
- "sync/atomic"
- )
- type persistentStorageIndexDocumentRequest struct {
- docId uint64
- data types.DocumentIndexData
- }
- func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
- for {
- request := <-engine.persistentStorageIndexDocumentChannel
- // 得到key
- b := make([]byte, 8)
- length := binary.PutUvarint(b, request.docId)
- // 得到value
- var buf bytes.Buffer
- enc := gob.NewEncoder(&buf)
- err := enc.Encode(request.data)
- if err != nil {
- atomic.AddUint64(&engine.numDocumentsStored, 1)
- continue
- }
- // 将key-value写入数据库
- engine.dbs[shard].Set(b[0:length], buf.Bytes())
- atomic.AddUint64(&engine.numDocumentsStored, 1)
- }
- }
- func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard int) {
- // 得到key
- b := make([]byte, 8)
- length := binary.PutUvarint(b, docId)
- // 从数据库删除该key
- engine.dbs[shard].Delete(b[0:length])
- }
- func (engine *Engine) persistentStorageInitWorker(shard int) {
- iter, err := engine.dbs[shard].SeekFirst()
- if err == io.EOF {
- engine.persistentStorageInitChannel <- true
- return
- } else if err != nil {
- engine.persistentStorageInitChannel <- true
- log.Fatal("无法遍历数据库")
- }
- for {
- key, value, err := iter.Next()
- if err == io.EOF {
- break
- } else if err != nil {
- continue
- }
- // 得到docID
- docId, _ := binary.Uvarint(key)
- // 得到data
- buf := bytes.NewReader(value)
- dec := gob.NewDecoder(buf)
- var data types.DocumentIndexData
- err = dec.Decode(&data)
- if err != nil {
- continue
- }
- // 添加索引
- engine.internalIndexDocument(docId, data)
- }
- engine.persistentStorageInitChannel <- true
- }
|