persistent_storage_worker.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package engine
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/gob"
  6. "github.com/huichen/wukong/types"
  7. //"io"
  8. //"log"
  9. "sync/atomic"
  10. )
  11. type persistentStorageIndexDocumentRequest struct {
  12. docId uint64
  13. data types.DocumentIndexData
  14. }
  15. func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
  16. for {
  17. request := <-engine.persistentStorageIndexDocumentChannels[shard]
  18. // 得到key
  19. b := make([]byte, 10)
  20. length := binary.PutUvarint(b, request.docId)
  21. // 得到value
  22. var buf bytes.Buffer
  23. enc := gob.NewEncoder(&buf)
  24. err := enc.Encode(request.data)
  25. if err != nil {
  26. atomic.AddUint64(&engine.numDocumentsStored, 1)
  27. continue
  28. }
  29. // 将key-value写入数据库
  30. engine.dbs[shard].Set(b[0:length], buf.Bytes())
  31. atomic.AddUint64(&engine.numDocumentsStored, 1)
  32. }
  33. }
  34. func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard uint32) {
  35. // 得到key
  36. b := make([]byte, 10)
  37. length := binary.PutUvarint(b, docId)
  38. // 从数据库删除该key
  39. engine.dbs[shard].Delete(b[0:length])
  40. }
  // Commented-out iterator-based variant of persistentStorageInitWorker.
  //
  // func (engine *Engine) persistentStorageInitWorker(shard int) {
  // iter, err := engine.dbs[shard].SeekFirst()
  // if err == io.EOF {
  // engine.persistentStorageInitChannel <- true
  // return
  // } else if err != nil {
  // engine.persistentStorageInitChannel <- true
  // log.Fatal("无法遍历数据库")
  // }
  // for {
  // key, value, err := iter.Next()
  // if err == io.EOF {
  // break
  // } else if err != nil {
  // continue
  // }
  // // Extract the docID.
  // docId, _ := binary.Uvarint(key)
  // // Extract the data.
  // buf := bytes.NewReader(value)
  // dec := gob.NewDecoder(buf)
  // var data types.DocumentIndexData
  // err = dec.Decode(&data)
  // if err != nil {
  // continue
  // }
  // // Add the document to the index.
  // engine.internalIndexDocument(docId, data)
  // }
  // engine.persistentStorageInitChannel <- true
  // }
  72. func (engine *Engine) persistentStorageInitWorker(shard int) {
  73. engine.dbs[shard].ForEach(func(k, v []byte) error {
  74. key, value := k, v
  75. // 得到docID
  76. docId, _ := binary.Uvarint(key)
  77. // 得到data
  78. buf := bytes.NewReader(value)
  79. dec := gob.NewDecoder(buf)
  80. var data types.DocumentIndexData
  81. err := dec.Decode(&data)
  82. if err == nil {
  83. // 添加索引
  84. engine.internalIndexDocument(docId, data)
  85. }
  86. return nil
  87. })
  88. engine.persistentStorageInitChannel <- true
  89. }