persistent_storage_worker.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package engine
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/gob"
  6. "github.com/huichen/wukong/types"
  7. "sync/atomic"
  8. )
  9. type persistentStorageIndexDocumentRequest struct {
  10. docId uint64
  11. data types.DocumentIndexData
  12. }
  13. func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
  14. for {
  15. request := <-engine.persistentStorageIndexDocumentChannels[shard]
  16. // 得到key
  17. b := make([]byte, 10)
  18. length := binary.PutUvarint(b, request.docId)
  19. // 得到value
  20. var buf bytes.Buffer
  21. enc := gob.NewEncoder(&buf)
  22. err := enc.Encode(request.data)
  23. if err != nil {
  24. atomic.AddUint64(&engine.numDocumentsStored, 1)
  25. continue
  26. }
  27. // 将key-value写入数据库
  28. engine.dbs[shard].Set(b[0:length], buf.Bytes())
  29. atomic.AddUint64(&engine.numDocumentsStored, 1)
  30. }
  31. }
  32. func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard uint32) {
  33. // 得到key
  34. b := make([]byte, 10)
  35. length := binary.PutUvarint(b, docId)
  36. // 从数据库删除该key
  37. engine.dbs[shard].Delete(b[0:length])
  38. }
  39. func (engine *Engine) persistentStorageInitWorker(shard int) {
  40. engine.dbs[shard].ForEach(func(k, v []byte) error {
  41. key, value := k, v
  42. // 得到docID
  43. docId, _ := binary.Uvarint(key)
  44. // 得到data
  45. buf := bytes.NewReader(value)
  46. dec := gob.NewDecoder(buf)
  47. var data types.DocumentIndexData
  48. err := dec.Decode(&data)
  49. if err == nil {
  50. // 添加索引
  51. engine.internalIndexDocument(docId, data)
  52. }
  53. return nil
  54. })
  55. engine.persistentStorageInitChannel <- true
  56. }