indexer_worker.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package engine
  2. import (
  3. "github.com/huichen/wukong/types"
  4. "sync/atomic"
  5. )
  6. type indexerAddDocumentRequest struct {
  7. document *types.DocumentIndex
  8. }
  9. type indexerLookupRequest struct {
  10. countDocsOnly bool
  11. tokens []string
  12. labels []string
  13. docIds map[uint64]bool
  14. options types.RankOptions
  15. rankerReturnChannel chan rankerReturnRequest
  16. }
  17. func (engine *Engine) indexerAddDocumentWorker(shard int) {
  18. for {
  19. request := <-engine.indexerAddDocumentChannels[shard]
  20. engine.indexers[shard].AddDocument(request.document)
  21. atomic.AddUint64(&engine.numTokenIndexAdded,
  22. uint64(len(request.document.Keywords)))
  23. atomic.AddUint64(&engine.numDocumentsIndexed, 1)
  24. }
  25. }
  26. func (engine *Engine) indexerLookupWorker(shard int) {
  27. for {
  28. request := <-engine.indexerLookupChannels[shard]
  29. var docs []types.IndexedDocument
  30. if request.docIds == nil {
  31. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, nil)
  32. } else {
  33. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, request.docIds)
  34. }
  35. if len(docs) == 0 {
  36. request.rankerReturnChannel <- rankerReturnRequest{}
  37. continue
  38. }
  39. rankerRequest := rankerRankRequest{
  40. countDocsOnly: request.countDocsOnly,
  41. docs: docs,
  42. options: request.options,
  43. rankerReturnChannel: request.rankerReturnChannel,
  44. }
  45. engine.rankerRankChannels[shard] <- rankerRequest
  46. }
  47. }