indexer_worker.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package engine
  2. import (
  3. "github.com/huichen/wukong/types"
  4. "sync/atomic"
  5. )
  6. type indexerAddDocumentRequest struct {
  7. document *types.DocumentIndex
  8. }
  9. type indexerLookupRequest struct {
  10. countDocsOnly bool
  11. tokens []string
  12. labels []string
  13. docIds map[uint64]bool
  14. options types.RankOptions
  15. rankerReturnChannel chan rankerReturnRequest
  16. orderless bool
  17. }
  // indexerRemoveDocRequest is the message consumed by
  // indexerRemoveDocWorker: the id of one document to remove from a shard.
  type indexerRemoveDocRequest struct {
  	// docId is passed to the shard indexer's RemoveDoc.
  	docId uint64
  }
  21. func (engine *Engine) indexerAddDocumentWorker(shard int) {
  22. for {
  23. request := <-engine.indexerAddDocumentChannels[shard]
  24. engine.indexers[shard].AddDocument(request.document)
  25. atomic.AddUint64(&engine.numTokenIndexAdded,
  26. uint64(len(request.document.Keywords)))
  27. atomic.AddUint64(&engine.numDocumentsIndexed, 1)
  28. }
  29. }
  30. func (engine *Engine) indexerLookupWorker(shard int) {
  31. for {
  32. request := <-engine.indexerLookupChannels[shard]
  33. var docs []types.IndexedDocument
  34. var numDocs int
  35. if request.docIds == nil {
  36. docs, numDocs = engine.indexers[shard].Lookup(request.tokens, request.labels, nil, request.countDocsOnly)
  37. } else {
  38. docs, numDocs = engine.indexers[shard].Lookup(request.tokens, request.labels, request.docIds, request.countDocsOnly)
  39. }
  40. if request.countDocsOnly {
  41. request.rankerReturnChannel <- rankerReturnRequest{numDocs: numDocs}
  42. continue
  43. }
  44. if len(docs) == 0 {
  45. request.rankerReturnChannel <- rankerReturnRequest{}
  46. continue
  47. }
  48. if request.orderless {
  49. var outputDocs []types.ScoredDocument
  50. for _, d := range docs {
  51. outputDocs = append(outputDocs, types.ScoredDocument{
  52. DocId: d.DocId,
  53. TokenSnippetLocations: d.TokenSnippetLocations,
  54. TokenLocations: d.TokenLocations})
  55. }
  56. request.rankerReturnChannel <- rankerReturnRequest{
  57. docs: outputDocs,
  58. numDocs: len(outputDocs),
  59. }
  60. continue
  61. }
  62. rankerRequest := rankerRankRequest{
  63. countDocsOnly: request.countDocsOnly,
  64. docs: docs,
  65. options: request.options,
  66. rankerReturnChannel: request.rankerReturnChannel,
  67. }
  68. engine.rankerRankChannels[shard] <- rankerRequest
  69. }
  70. }
  71. func (engine *Engine) indexerRemoveDocWorker(shard int) {
  72. for {
  73. request := <-engine.indexerRemoveDocChannels[shard]
  74. engine.indexers[shard].RemoveDoc(request.docId)
  75. }
  76. }