indexer_worker.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package engine
  2. import (
  3. "github.com/huichen/wukong/types"
  4. "sync/atomic"
  5. )
  6. type indexerAddDocumentRequest struct {
  7. document *types.DocumentIndex
  8. }
  9. type indexerLookupRequest struct {
  10. countDocsOnly bool
  11. tokens []string
  12. labels []string
  13. docIds map[uint64]bool
  14. options types.RankOptions
  15. rankerReturnChannel chan rankerReturnRequest
  16. orderless bool
  17. }
  18. func (engine *Engine) indexerAddDocumentWorker(shard int) {
  19. for {
  20. request := <-engine.indexerAddDocumentChannels[shard]
  21. engine.indexers[shard].AddDocument(request.document)
  22. atomic.AddUint64(&engine.numTokenIndexAdded,
  23. uint64(len(request.document.Keywords)))
  24. atomic.AddUint64(&engine.numDocumentsIndexed, 1)
  25. }
  26. }
  27. func (engine *Engine) indexerLookupWorker(shard int) {
  28. for {
  29. request := <-engine.indexerLookupChannels[shard]
  30. var docs []types.IndexedDocument
  31. if request.docIds == nil {
  32. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, nil)
  33. } else {
  34. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, request.docIds)
  35. }
  36. if len(docs) == 0 {
  37. request.rankerReturnChannel <- rankerReturnRequest{}
  38. continue
  39. }
  40. if request.orderless {
  41. var outputDocs []types.ScoredDocument
  42. for _, d := range docs {
  43. outputDocs = append(outputDocs, types.ScoredDocument{
  44. DocId: d.DocId,
  45. TokenSnippetLocations: d.TokenSnippetLocations,
  46. TokenLocations: d.TokenLocations})
  47. }
  48. request.rankerReturnChannel <- rankerReturnRequest{
  49. docs: outputDocs,
  50. numDocs: len(outputDocs),
  51. }
  52. continue
  53. }
  54. rankerRequest := rankerRankRequest{
  55. countDocsOnly: request.countDocsOnly,
  56. docs: docs,
  57. options: request.options,
  58. rankerReturnChannel: request.rankerReturnChannel,
  59. }
  60. engine.rankerRankChannels[shard] <- rankerRequest
  61. }
  62. }