persistent_storage_worker.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package engine
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/gob"
  6. "github.com/huichen/wukong/types"
  7. "io"
  8. "log"
  9. "sync/atomic"
  10. )
  11. type persistentStorageIndexDocumentRequest struct {
  12. docId uint64
  13. data types.DocumentIndexData
  14. }
  15. func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
  16. for {
  17. request := <-engine.persistentStorageIndexDocumentChannels[shard]
  18. // 得到key
  19. b := make([]byte, 8)
  20. length := binary.PutUvarint(b, request.docId)
  21. // 得到value
  22. var buf bytes.Buffer
  23. enc := gob.NewEncoder(&buf)
  24. err := enc.Encode(request.data)
  25. if err != nil {
  26. atomic.AddUint64(&engine.numDocumentsStored, 1)
  27. continue
  28. }
  29. // 将key-value写入数据库
  30. engine.dbs[shard].Set(b[0:length], buf.Bytes())
  31. atomic.AddUint64(&engine.numDocumentsStored, 1)
  32. }
  33. }
  34. func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard uint32) {
  35. // 得到key
  36. b := make([]byte, 8)
  37. length := binary.PutUvarint(b, docId)
  38. // 从数据库删除该key
  39. engine.dbs[shard].Delete(b[0:length])
  40. }
  41. func (engine *Engine) persistentStorageInitWorker(shard int) {
  42. iter, err := engine.dbs[shard].SeekFirst()
  43. if err == io.EOF {
  44. engine.persistentStorageInitChannel <- true
  45. return
  46. } else if err != nil {
  47. engine.persistentStorageInitChannel <- true
  48. log.Fatal("无法遍历数据库")
  49. }
  50. for {
  51. key, value, err := iter.Next()
  52. if err == io.EOF {
  53. break
  54. } else if err != nil {
  55. continue
  56. }
  57. // 得到docID
  58. docId, _ := binary.Uvarint(key)
  59. // 得到data
  60. buf := bytes.NewReader(value)
  61. dec := gob.NewDecoder(buf)
  62. var data types.DocumentIndexData
  63. err = dec.Decode(&data)
  64. if err != nil {
  65. continue
  66. }
  67. // 添加索引
  68. engine.internalIndexDocument(docId, data)
  69. }
  70. engine.persistentStorageInitChannel <- true
  71. }