indexer_worker.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package engine
  2. import (
  3. "github.com/huichen/wukong/types"
  4. "sync/atomic"
  5. )
  6. type indexerAddDocumentRequest struct {
  7. document *types.DocumentIndex
  8. }
  9. type indexerLookupRequest struct {
  10. countDocsOnly bool
  11. tokens []string
  12. labels []string
  13. docIds []uint64
  14. options types.RankOptions
  15. rankerReturnChannel chan rankerReturnRequest
  16. }
  17. func (engine *Engine) indexerAddDocumentWorker(shard int) {
  18. for {
  19. request := <-engine.indexerAddDocumentChannels[shard]
  20. engine.indexers[shard].AddDocument(request.document)
  21. atomic.AddUint64(&engine.numTokenIndexAdded,
  22. uint64(len(request.document.Keywords)))
  23. atomic.AddUint64(&engine.numDocumentsIndexed, 1)
  24. }
  25. }
  26. func (engine *Engine) indexerLookupWorker(shard int) {
  27. for {
  28. request := <-engine.indexerLookupChannels[shard]
  29. var docs []types.IndexedDocument
  30. if len(request.docIds) == 0 {
  31. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, nil)
  32. } else {
  33. docIds := make(map[uint64]bool)
  34. for _, ids := range request.docIds {
  35. docIds[ids] = true
  36. }
  37. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, &docIds)
  38. }
  39. if len(docs) == 0 {
  40. request.rankerReturnChannel <- rankerReturnRequest{}
  41. continue
  42. }
  43. rankerRequest := rankerRankRequest{
  44. countDocsOnly: request.countDocsOnly,
  45. docs: docs,
  46. options: request.options,
  47. rankerReturnChannel: request.rankerReturnChannel,
  48. }
  49. engine.rankerRankChannels[shard] <- rankerRequest
  50. }
  51. }