persistent_storage_worker.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package engine
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "sync/atomic"
  6. "github.com/huichen/wukong/types"
  7. )
  8. type persistentStorageIndexDocumentRequest struct {
  9. docId string
  10. data types.DocumentIndexData
  11. }
  12. func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
  13. for {
  14. request := <-engine.persistentStorageIndexDocumentChannels[shard]
  15. // 得到key
  16. // 得到value
  17. var buf bytes.Buffer
  18. enc := gob.NewEncoder(&buf)
  19. err := enc.Encode(request.data)
  20. if err != nil {
  21. atomic.AddUint64(&engine.numDocumentsStored, 1)
  22. continue
  23. }
  24. // 将key-value写入数据库
  25. engine.dbs[shard].Set([]byte(request.docId), buf.Bytes())
  26. atomic.AddUint64(&engine.numDocumentsStored, 1)
  27. }
  28. }
  29. func (engine *Engine) persistentStorageRemoveDocumentWorker(docId string, shard uint32) {
  30. // 得到key
  31. // 从数据库删除该key
  32. engine.dbs[shard].Delete([]byte(docId))
  33. }
  34. func (engine *Engine) persistentStorageInitWorker(shard int) {
  35. engine.dbs[shard].ForEach(func(k, v []byte) error {
  36. // 得到docID
  37. docId, value := string(k), v
  38. // 得到data
  39. buf := bytes.NewReader(value)
  40. dec := gob.NewDecoder(buf)
  41. var data types.DocumentIndexData
  42. err := dec.Decode(&data)
  43. if err == nil {
  44. // 添加索引
  45. engine.internalIndexDocument(docId, data, false)
  46. }
  47. return nil
  48. })
  49. engine.persistentStorageInitChannel <- true
  50. }