Explorar o código

1. 修改 benchmark, 打乱 docId 顺序模拟普通情况测试

2. 修 testcase
Zacky Su %!s(int64=9) %!d(string=hai) anos
pai
achega
2cf3d2f095
Modificáronse 7 ficheiros con 60 adicións e 42 borrados
  1. 3 3
      README.md
  2. 15 17
      core/indexer.go
  3. 1 1
      docs/codelab.md
  4. 2 2
      docs/realtime_indexing.md
  5. 0 3
      engine/engine.go
  6. 2 0
      engine/engine_test.go
  7. 37 16
      examples/benchmark.go

+ 3 - 3
README.md

@@ -45,9 +45,9 @@ func main() {
 	defer searcher.Close()
 
 	// 将文档加入索引
-	searcher.IndexDocument(0, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"}, true)
-	searcher.IndexDocument(1, types.DocumentIndexData{Content: "百度宣布拟全资收购91无线业务"}, true)
-	searcher.IndexDocument(2, types.DocumentIndexData{Content: "百度是中国最大的搜索引擎"}, true)
+	searcher.IndexDocument(1, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"}, true)
+	searcher.IndexDocument(2, types.DocumentIndexData{Content: "百度宣布拟全资收购91无线业务"}, true)
+	searcher.IndexDocument(3, types.DocumentIndexData{Content: "百度是中国最大的搜索引擎"}, true)
 
 	// 等待索引刷新完毕
 	searcher.FlushIndex()

+ 15 - 17
core/indexer.go

@@ -17,7 +17,7 @@ type Indexer struct {
 	tableLock struct {
 		sync.RWMutex
 		table     map[string]*KeywordIndices
-		docsState map[uint64]int // 0: 存在于索引中,1: 等待删除,2: 等待加入
+		docsState map[uint64]int // nil: 表示无状态记录,0: 存在于索引中,1: 等待删除,2: 等待加入
 	}
 	addCacheLock struct {
 		sync.RWMutex
@@ -45,9 +45,6 @@ type Indexer struct {
 
 // 反向索引表的一行,收集了一个搜索键出现的所有文档,按照DocId从小到大排序。
 type KeywordIndices struct {
-	// 用于标记在 docIds[] 进行批量加入索引时二分查找的下界
-	lowerBound int
-
 	// 下面的切片是否为空,取决于初始化时IndexType的值
 	docIds      []uint64  // 全部类型都有
 	frequencies []float32 // IndexType == FrequenciesIndex
@@ -97,6 +94,7 @@ func (indexer *Indexer) AddDocumentToCache(document *types.DocumentIndex, forceU
 		for i := 0; i < indexer.addCacheLock.addCachePointer; i++ {
 			docIndex := indexer.addCacheLock.addCache[i]
 			if docState, ok := indexer.tableLock.docsState[docIndex.DocId]; ok && docState == 0 {
+				// ok && docState == 0 表示存在于索引中,需先删除再添加
 				if position != i {
 					indexer.addCacheLock.addCache[position], indexer.addCacheLock.addCache[i] =
 						indexer.addCacheLock.addCache[i], indexer.addCacheLock.addCache[position]
@@ -110,13 +108,14 @@ func (indexer *Indexer) AddDocumentToCache(document *types.DocumentIndex, forceU
 				indexer.numDocuments--
 				position++
 			} else if !(ok && docState == 1) {
-				// ok && docState == 1 表示等待删除或者删除当前 doc
+				// ok && docState == 1 表示等待删除
 				indexer.tableLock.docsState[docIndex.DocId] = 2
 			}
 		}
 
 		indexer.tableLock.Unlock()
 		if indexer.RemoveDocumentToCache(0, forceUpdate) {
+			// 只有当存在于索引表中的文档已被删除,其才可以重新加入到索引表中
 			position = 0
 		}
 
@@ -138,9 +137,7 @@ func (indexer *Indexer) AddDocuments(documents *types.DocumentsIndex) {
 
 	indexer.tableLock.Lock()
 	defer indexer.tableLock.Unlock()
-	for _, indices := range indexer.tableLock.table {
-		indices.lowerBound = 0
-	}
+	indexPointers := make(map[string]int, len(indexer.tableLock.table))
 
 	// DocId 递增顺序遍历插入文档保证索引移动次数最少
 	for i, document := range *documents {
@@ -178,8 +175,8 @@ func (indexer *Indexer) AddDocuments(documents *types.DocumentsIndex) {
 
 			// 查找应该插入的位置,且索引一定不存在
 			position, _ := indexer.searchIndex(
-				indices, indices.lowerBound, indexer.getIndexLength(indices)-1, document.DocId)
-			indices.lowerBound = position
+				indices, indexPointers[keyword.Text], indexer.getIndexLength(indices)-1, document.DocId)
+			indexPointers[keyword.Text] = position
 			switch indexer.initOptions.IndexType {
 			case types.LocationsIndex:
 				indices.locations = append(indices.locations, []int{})
@@ -204,6 +201,7 @@ func (indexer *Indexer) AddDocuments(documents *types.DocumentsIndex) {
 }
 
 // 向 REMOVECACHE 中加入一个待删除文档
+// 返回值表示文档是否在索引表中被删除
 func (indexer *Indexer) RemoveDocumentToCache(docId uint64, forceUpdate bool) bool {
 	if indexer.initialized == false {
 		log.Fatal("索引器尚未初始化")
@@ -218,7 +216,7 @@ func (indexer *Indexer) RemoveDocumentToCache(docId uint64, forceUpdate bool) bo
 			indexer.tableLock.docsState[docId] = 1
 			indexer.numDocuments--
 		} else if !ok {
-			// 删除一个等待加入的文档
+			// 删除一个不存在或者等待加入的文档
 			indexer.tableLock.docsState[docId] = 1
 		}
 		indexer.tableLock.Unlock()
@@ -399,7 +397,7 @@ func (indexer *Indexer) Lookup(
 				}
 
 				// 计算搜索键在文档中的紧邻距离
-				tokenProximity, tokenLocations := computeTokenProximity(table[:len(tokens)], &indexPointers, tokens)
+				tokenProximity, tokenLocations := computeTokenProximity(table[:len(tokens)], indexPointers, tokens)
 				indexedDoc.TokenProximity = int32(tokenProximity)
 				indexedDoc.TokenSnippetLocations = tokenLocations
 
@@ -489,7 +487,7 @@ func (indexer *Indexer) searchIndex(
 //
 // 具体由动态规划实现,依次计算前 i 个 token 在每个出现位置的最优值。
 // 选定的 P_i 通过 tokenLocations 参数传回。
-func computeTokenProximity(table []*KeywordIndices, indexPointers *[]int, tokens []string) (
+func computeTokenProximity(table []*KeywordIndices, indexPointers []int, tokens []string) (
 	minTokenProximity int, tokenLocations []int) {
 	minTokenProximity = -1
 	tokenLocations = make([]int, len(tokens))
@@ -503,14 +501,14 @@ func computeTokenProximity(table []*KeywordIndices, indexPointers *[]int, tokens
 	// 初始化路径数组
 	path = make([][]int, len(tokens))
 	for i := 1; i < len(path); i++ {
-		path[i] = make([]int, len(table[i].locations[(*indexPointers)[i]]))
+		path[i] = make([]int, len(table[i].locations[indexPointers[i]]))
 	}
 
 	// 动态规划
-	currentLocations = table[0].locations[(*indexPointers)[0]]
+	currentLocations = table[0].locations[indexPointers[0]]
 	currentMinValues = make([]int, len(currentLocations))
 	for i := 1; i < len(tokens); i++ {
-		nextLocations = table[i].locations[(*indexPointers)[i]]
+		nextLocations = table[i].locations[indexPointers[i]]
 		nextMinValues = make([]int, len(nextLocations))
 		for j, _ := range nextMinValues {
 			nextMinValues[j] = -1
@@ -562,7 +560,7 @@ func computeTokenProximity(table []*KeywordIndices, indexPointers *[]int, tokens
 		if i != len(tokens)-1 {
 			cursor = path[i+1][cursor]
 		}
-		tokenLocations[i] = table[i].locations[(*indexPointers)[i]][cursor]
+		tokenLocations[i] = table[i].locations[indexPointers[i]][cursor]
 	}
 	return
 }

+ 1 - 1
docs/codelab.md

@@ -94,7 +94,7 @@ searcher.IndexDocument(docId, types.DocumentIndexData{
 })
 ```
 
-文档的docId必须唯一,对微博来说可以直接用微博的ID。悟空引擎允许你加入三种索引数据:
+文档的docId必须大于0且唯一,对微博来说可以直接用微博的ID。悟空引擎允许你加入三种索引数据:
 
 1. 文档的正文(content),会被分词为关键词(tokens)加入索引。
 2. 文档的关键词(tokens)。当正文为空的时候,允许用户绕过悟空内置的分词器直接输入文档关键词,这使得在引擎外部进行文档分词成为可能。

+ 2 - 2
docs/realtime_indexing.md

@@ -1,5 +1,5 @@
 ## 动态修改索引表
 
-悟空引擎支持搜索的同时添加索引(engine.IndexDocument函数),但由于添加索引时会对索引表进行写锁定,因此在添加索引的同时搜索性能会有所下降。请控制添加操作的频率,或者将大量添加操作转移到引擎比较空闲时进行。
+悟空引擎支持搜索的同时添加索引(engine.IndexDocument函数),但由于添加索引时会对索引表进行写锁定,因此在添加索引的同时搜索性能会有所下降。请控制添加操作的频率,或者将大量添加操作转移到引擎比较空闲时进行。删除一条文档(engine.RemoveDocument函数)也有同样的问题。
 
-删除一条文档(engine.RemoveDocument函数)也有同样的问题。但是删除操作不会对索引表进行修改,仅仅从排序器中删除该文档的自定义评分字段。因此,在悟空引擎上做大量的删除操作是内存低效的。当删除操作很频繁时,比如和添加操作的频率接近,建议周期性地重启引擎进行索引表重建。
+悟空引擎支持缓存插入和删除索引操作,实现批量插入和删除文档,以提高性能。同时删除操作支持从排序器中删除该文档的自定义评分字段。

+ 0 - 3
engine/engine.go

@@ -90,7 +90,6 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
 
 	// 初始化分词器通道
 	engine.segmenterChannel = make(
-		//chan segmenterRequest)
 		chan segmenterRequest, options.NumSegmenterThreads)
 
 	// 初始化索引器通道
@@ -102,11 +101,9 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
 		[]chan indexerLookupRequest, options.NumShards)
 	for shard := 0; shard < options.NumShards; shard++ {
 		engine.indexerAddDocChannels[shard] = make(
-			//chan indexerAddDocumentRequest)
 			chan indexerAddDocumentRequest,
 			options.IndexerBufferLength)
 		engine.indexerRemoveDocChannels[shard] = make(
-			//chan indexerRemoveDocRequest)
 			chan indexerRemoveDocRequest,
 			options.IndexerBufferLength)
 		engine.indexerLookupChannels[shard] = make(

+ 2 - 0
engine/engine_test.go

@@ -242,6 +242,7 @@ func TestRemoveDocument(t *testing.T) {
 
 	AddDocs(&engine)
 	engine.RemoveDocument(5, true)
+	engine.FlushIndex()
 
 	outputs := engine.Search(types.SearchRequest{Text: "中国人口"})
 	utils.Expect(t, "1", len(outputs.Docs))
@@ -382,6 +383,7 @@ func TestCountDocsOnly(t *testing.T) {
 
 	AddDocs(&engine)
 	engine.RemoveDocument(5, true)
+	engine.FlushIndex()
 
 	outputs := engine.Search(types.SearchRequest{Text: "中国人口", CountDocsOnly: true})
 	utils.Expect(t, "0", len(outputs.Docs))

+ 37 - 16
examples/benchmark.go

@@ -4,14 +4,17 @@ package main
 import (
 	"bufio"
 	"flag"
-	"github.com/huichen/wukong/engine"
-	"github.com/huichen/wukong/types"
 	"log"
+	"math/rand"
 	"os"
 	"runtime"
 	"runtime/pprof"
 	"strings"
+	"sync"
 	"time"
+
+	"github.com/huichen/wukong/engine"
+	"github.com/huichen/wukong/types"
 )
 
 const (
@@ -120,14 +123,16 @@ func main() {
 
 	// 建索引
 	log.Print("建索引 ... ")
-	docId := uint64(1)
+	// 打乱 docId 顺序进行测试,若 docId 最大值超 Int 则不能用 rand.Perm 方法
+	docIds := rand.Perm(*num_repeat_text * len(lines))
+	docIdx := 0
 	for i := 0; i < *num_repeat_text; i++ {
 		for _, line := range lines {
-			searcher.IndexDocument(docId, types.DocumentIndexData{
+			searcher.IndexDocument(uint64(docIds[docIdx]+1), types.DocumentIndexData{
 				Content: line}, false)
-			docId++
-			if docId-docId/1000000*1000000 == 0 {
-				log.Printf("已索引%d百万文档", docId/1000000)
+			docIdx++
+			if docIdx-docIdx/1000000*1000000 == 0 {
+				log.Printf("已索引%d百万文档", docIdx/1000000)
 				runtime.GC()
 			}
 		}
@@ -140,6 +145,7 @@ func main() {
 	log.Printf("建立索引花费时间 %v", t1.Sub(t0))
 	log.Printf("建立索引速度每秒添加 %f 百万个索引",
 		float64(searcher.NumTokenIndexAdded())/t1.Sub(t0).Seconds()/(1000000))
+	runtime.GC()
 
 	// 写入内存profile文件
 	if *memprofile != "" {
@@ -161,17 +167,14 @@ func main() {
 	log.Printf("删除 %d 条索引花费时间 %v", *num_delete_docs, t3.Sub(t2))
 
 	done := make(chan bool)
-	recordResponseLength := make(map[string]int)
+	recordResponse := recordResponseLock{}
+	recordResponse.count = make(map[string]int)
 	for iThread := 0; iThread < numQueryThreads; iThread++ {
-		go search(done, recordResponseLength)
+		go search(done, &recordResponse)
 	}
 	for iThread := 0; iThread < numQueryThreads; iThread++ {
 		<-done
 	}
-	// 测试搜索结果输出,因为不同 case 的 docId 对应不上,所以只测试总数
-	for keyword, count := range recordResponseLength {
-		log.Printf("关键词 [%s] 共搜索到 %d 个相关文档", keyword, count)
-	}
 
 	// 记录时间并计算分词速度
 	t4 := time.Now()
@@ -181,6 +184,13 @@ func main() {
 		float64(numRepeatQuery*numQueryThreads*len(searchQueries))/
 			t4.Sub(t3).Seconds())
 
+	// 测试搜索结果输出,因为不同 case 的 docId 对应不上,所以只测试总数
+	recordResponse.RLock()
+	for keyword, count := range recordResponse.count {
+		log.Printf("关键词 [%s] 共搜索到 %d 个相关文档", keyword, count)
+	}
+	recordResponse.RUnlock()
+
 	if *use_persistent {
 		searcher.Close()
 		t5 := time.Now()
@@ -209,12 +219,23 @@ func main() {
 	//os.RemoveAll(*persistent_storage_folder)
 }
 
-func search(ch chan bool, record map[string]int) {
+type recordResponseLock struct {
+	sync.RWMutex
+	count map[string]int
+}
+
+func search(ch chan bool, record *recordResponseLock) {
 	for i := 0; i < numRepeatQuery; i++ {
 		for _, query := range searchQueries {
 			output := searcher.Search(types.SearchRequest{Text: query})
-			if _, found := record[query]; !found {
-				record[query] = len(output.Docs)
+			record.RLock()
+			if _, found := record.count[query]; !found {
+				record.RUnlock()
+				record.Lock()
+				record.count[query] = len(output.Docs)
+				record.Unlock()
+			} else {
+				record.RUnlock()
 			}
 		}
 	}