indexer_worker.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package engine
  2. import (
  3. "github.com/huichen/wukong/types"
  4. "sync/atomic"
  5. )
  6. type indexerAddDocumentRequest struct {
  7. document *types.DocumentIndex
  8. forceUpdate bool
  9. }
  10. type indexerLookupRequest struct {
  11. countDocsOnly bool
  12. tokens []string
  13. labels []string
  14. docIds map[uint64]bool
  15. options types.RankOptions
  16. rankerReturnChannel chan rankerReturnRequest
  17. orderless bool
  18. }
  19. type indexerRemoveDocRequest struct {
  20. docId uint64
  21. forceUpdate bool
  22. }
  23. func (engine *Engine) indexerAddDocumentWorker(shard int) {
  24. for {
  25. request := <-engine.indexerAddDocChannels[shard]
  26. engine.indexers[shard].AddDocumentToCache(request.document, request.forceUpdate)
  27. if request.document != nil {
  28. atomic.AddUint64(&engine.numTokenIndexAdded,
  29. uint64(len(request.document.Keywords)))
  30. atomic.AddUint64(&engine.numDocumentsIndexed, 1)
  31. }
  32. if request.forceUpdate {
  33. atomic.AddUint64(&engine.numDocumentsForceUpdated, 1)
  34. }
  35. }
  36. }
  37. func (engine *Engine) indexerRemoveDocWorker(shard int) {
  38. for {
  39. request := <-engine.indexerRemoveDocChannels[shard]
  40. engine.indexers[shard].RemoveDocumentToCache(request.docId, request.forceUpdate)
  41. if request.docId != 0 {
  42. atomic.AddUint64(&engine.numDocumentsRemoved, 1)
  43. }
  44. if request.forceUpdate {
  45. atomic.AddUint64(&engine.numDocumentsForceUpdated, 1)
  46. }
  47. }
  48. }
  49. func (engine *Engine) indexerLookupWorker(shard int) {
  50. for {
  51. request := <-engine.indexerLookupChannels[shard]
  52. var docs []types.IndexedDocument
  53. var numDocs int
  54. if request.docIds == nil {
  55. docs, numDocs = engine.indexers[shard].Lookup(request.tokens, request.labels, nil, request.countDocsOnly)
  56. } else {
  57. docs, numDocs = engine.indexers[shard].Lookup(request.tokens, request.labels, request.docIds, request.countDocsOnly)
  58. }
  59. if request.countDocsOnly {
  60. request.rankerReturnChannel <- rankerReturnRequest{numDocs: numDocs}
  61. continue
  62. }
  63. if len(docs) == 0 {
  64. request.rankerReturnChannel <- rankerReturnRequest{}
  65. continue
  66. }
  67. if request.orderless {
  68. var outputDocs []types.ScoredDocument
  69. for _, d := range docs {
  70. outputDocs = append(outputDocs, types.ScoredDocument{
  71. DocId: d.DocId,
  72. TokenSnippetLocations: d.TokenSnippetLocations,
  73. TokenLocations: d.TokenLocations})
  74. }
  75. request.rankerReturnChannel <- rankerReturnRequest{
  76. docs: outputDocs,
  77. numDocs: len(outputDocs),
  78. }
  79. continue
  80. }
  81. rankerRequest := rankerRankRequest{
  82. countDocsOnly: request.countDocsOnly,
  83. docs: docs,
  84. options: request.options,
  85. rankerReturnChannel: request.rankerReturnChannel,
  86. }
  87. engine.rankerRankChannels[shard] <- rankerRequest
  88. }
  89. }