indexer_worker.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package engine
  2. import (
  3. "github.com/huichen/wukong/types"
  4. "sync/atomic"
  5. )
  6. type indexerAddDocumentRequest struct {
  7. document *types.DocumentIndex
  8. }
  9. type indexerLookupRequest struct {
  10. tokens []string
  11. labels []string
  12. docIds []uint64
  13. options types.RankOptions
  14. rankerReturnChannel chan rankerReturnRequest
  15. }
  16. func (engine *Engine) indexerAddDocumentWorker(shard int) {
  17. for {
  18. request := <-engine.indexerAddDocumentChannels[shard]
  19. engine.indexers[shard].AddDocument(request.document)
  20. atomic.AddUint64(&engine.numTokenIndexAdded,
  21. uint64(len(request.document.Keywords)))
  22. atomic.AddUint64(&engine.numDocumentsIndexed, 1)
  23. }
  24. }
  25. func (engine *Engine) indexerLookupWorker(shard int) {
  26. for {
  27. request := <-engine.indexerLookupChannels[shard]
  28. var docs []types.IndexedDocument
  29. if len(request.docIds) == 0 {
  30. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, nil)
  31. } else {
  32. docIds := make(map[uint64]bool)
  33. for _, ids := range request.docIds {
  34. docIds[ids] = true
  35. }
  36. docs = engine.indexers[shard].Lookup(request.tokens, request.labels, &docIds)
  37. }
  38. if len(docs) == 0 {
  39. request.rankerReturnChannel <- rankerReturnRequest{}
  40. continue
  41. }
  42. rankerRequest := rankerRankRequest{
  43. docs: docs,
  44. options: request.options,
  45. rankerReturnChannel: request.rankerReturnChannel}
  46. engine.rankerRankChannels[shard] <- rankerRequest
  47. }
  48. }