Browse Source

1. 修由 go routines 同步问题引起的 bugs

2. 加固 testcase

PS: 对同一文档进行的相关操作(如删除后添加),中间必须加入 flushIndex()
Zacky Su 9 năm trước cách đây
mục cha
commit
a1a7e8f875
7 tập tin đã thay đổi với 104 bổ sung60 xóa
  1. 22 14
      core/indexer.go
  2. 23 3
      core/indexer_test.go
  3. 10 6
      engine/engine.go
  4. 17 10
      engine/engine_test.go
  5. 20 15
      examples/benchmark.go
  6. 8 8
      examples/enjoy_wukong.go
  7. 4 4
      examples/simplest_example.go

+ 22 - 14
core/indexer.go

@@ -93,22 +93,24 @@ func (indexer *Indexer) AddDocumentToCache(document *types.DocumentIndex, forceU
 		position := 0
 		for i := 0; i < indexer.addCacheLock.addCachePointer; i++ {
 			docIndex := indexer.addCacheLock.addCache[i]
-			if docState, ok := indexer.tableLock.docsState[docIndex.DocId]; ok && docState == 0 {
+			if docState, ok := indexer.tableLock.docsState[docIndex.DocId]; ok && docState <= 1 {
 				// ok && docState == 0 表示存在于索引中,需先删除再添加
+				// ok && docState == 1 表示不一定存在于索引中,等待删除,需先删除再添加
 				if position != i {
 					indexer.addCacheLock.addCache[position], indexer.addCacheLock.addCache[i] =
 						indexer.addCacheLock.addCache[i], indexer.addCacheLock.addCache[position]
 				}
-				indexer.removeCacheLock.Lock()
-				indexer.removeCacheLock.removeCache[indexer.removeCacheLock.removeCachePointer] =
-					docIndex.DocId
-				indexer.removeCacheLock.removeCachePointer++
-				indexer.removeCacheLock.Unlock()
-				indexer.tableLock.docsState[docIndex.DocId] = 1
-				indexer.numDocuments--
+				if docState == 0 {
+					indexer.removeCacheLock.Lock()
+					indexer.removeCacheLock.removeCache[indexer.removeCacheLock.removeCachePointer] =
+						docIndex.DocId
+					indexer.removeCacheLock.removeCachePointer++
+					indexer.removeCacheLock.Unlock()
+					indexer.tableLock.docsState[docIndex.DocId] = 1
+					indexer.numDocuments--
+				}
 				position++
-			} else if !(ok && docState == 1) {
-				// ok && docState == 1 表示等待删除
+			} else if !ok {
 				indexer.tableLock.docsState[docIndex.DocId] = 2
 			}
 		}
@@ -147,6 +149,7 @@ func (indexer *Indexer) AddDocuments(documents *types.DocumentsIndex) {
 		}
 		if docState, ok := indexer.tableLock.docsState[document.DocId]; ok && docState == 1 {
 			// 如果此时 docState 仍为 1,说明该文档需被删除
+			// docState 合法状态为 nil & 2,保证一定不会插入已经在索引表中的文档
 			continue
 		}
 
@@ -215,9 +218,11 @@ func (indexer *Indexer) RemoveDocumentToCache(docId uint64, forceUpdate bool) bo
 			indexer.removeCacheLock.removeCachePointer++
 			indexer.tableLock.docsState[docId] = 1
 			indexer.numDocuments--
-		} else if !ok {
-			// 删除一个不存在或者等待加入的文档
+		} else if ok && docState == 2 {
+			// 删除一个等待加入的文档
 			indexer.tableLock.docsState[docId] = 1
+		} else if !ok {
+			// 若文档不存在,则无法判断其是否在 addCache 中,需避免这样的操作
 		}
 		indexer.tableLock.Unlock()
 	}
@@ -257,8 +262,7 @@ func (indexer *Indexer) RemoveDocuments(documents *types.DocumentsId) {
 		documentsPointer := sort.Search(
 			len(*documents), func(i int) bool { return (*documents)[i] >= indices.docIds[0] })
 		// 双指针扫描,进行批量删除操作
-		for ; documentsPointer < len(*documents) &&
-			indicesPointer < indexer.getIndexLength(indices); indicesPointer++ {
+		for documentsPointer < len(*documents) && indicesPointer < indexer.getIndexLength(indices) {
 			if indices.docIds[indicesPointer] < (*documents)[documentsPointer] {
 				if indicesTop != indicesPointer {
 					switch indexer.initOptions.IndexType {
@@ -270,6 +274,10 @@ func (indexer *Indexer) RemoveDocuments(documents *types.DocumentsId) {
 					indices.docIds[indicesTop] = indices.docIds[indicesPointer]
 				}
 				indicesTop++
+				indicesPointer++
+			} else if indices.docIds[indicesPointer] == (*documents)[documentsPointer] {
+				indicesPointer++
+				documentsPointer++
 			} else {
 				documentsPointer++
 			}

+ 23 - 3
core/indexer_test.go

@@ -45,6 +45,7 @@ func TestAddKeywords(t *testing.T) {
 func TestRemoveDocument(t *testing.T) {
 	var indexer Indexer
 	indexer.Init(types.IndexerInitOptions{IndexType: types.LocationsIndex})
+
 	// doc1 = "token2 token3"
 	indexer.AddDocumentToCache(&types.DocumentIndex{
 		DocId: 1,
@@ -98,12 +99,31 @@ func TestRemoveDocument(t *testing.T) {
 			{"token1", 0, []int{0}},
 			{"token2", 0, []int{7}},
 		},
-	}, false)
-	indexer.RemoveDocumentToCache(3, false)
-	indexer.AddDocumentToCache(nil, true)
+	}, true)
+	indexer.RemoveDocumentToCache(3, true)
 	utils.Expect(t, "1 2 ", indicesToString(&indexer, "token1"))
 	utils.Expect(t, "2 ", indicesToString(&indexer, "token2"))
 	utils.Expect(t, "1 2 ", indicesToString(&indexer, "token3"))
+
+	// doc2 = "token1 token2 token3"
+	indexer.AddDocumentToCache(&types.DocumentIndex{
+		DocId: 2,
+		Keywords: []types.KeywordIndex{
+			{"token2", 0, []int{0}},
+			{"token3", 0, []int{7}},
+		},
+	}, true)
+	// doc3 = "token1 token3"
+	indexer.AddDocumentToCache(&types.DocumentIndex{
+		DocId: 3,
+		Keywords: []types.KeywordIndex{
+			{"token1", 0, []int{0}},
+			{"token2", 0, []int{7}},
+		},
+	}, true)
+	utils.Expect(t, "1 3 ", indicesToString(&indexer, "token1"))
+	utils.Expect(t, "2 3 ", indicesToString(&indexer, "token2"))
+	utils.Expect(t, "1 2 ", indicesToString(&indexer, "token3"))
 }
 
 func TestLookupLocationsIndex(t *testing.T) {

+ 10 - 6
engine/engine.go

@@ -411,16 +411,20 @@ func (engine *Engine) Search(request types.SearchRequest) (output types.SearchRe
 
 // 阻塞等待直到所有索引添加完毕
 func (engine *Engine) FlushIndex() {
-	// 强制更新,CHANNEL 中 REQUESTS 的无序性可能会导致 CACHE 中有残留
-	engine.RemoveDocument(0, true)
-	engine.IndexDocument(0, types.DocumentIndexData{}, true)
 	for {
 		runtime.Gosched()
 		if engine.numIndexingRequests == engine.numDocumentsIndexed &&
 			engine.numRemovingRequests*uint64(engine.initOptions.NumShards) == engine.numDocumentsRemoved &&
-			engine.numForceUpdatingRequests*uint64(engine.initOptions.NumShards) ==
-				engine.numDocumentsForceUpdated && (!engine.initOptions.UsePersistentStorage ||
-			engine.numIndexingRequests == engine.numDocumentsStored) {
+			(!engine.initOptions.UsePersistentStorage || engine.numIndexingRequests == engine.numDocumentsStored) {
+			// 保证 CHANNEL 中 REQUESTS 全部被执行完
+			break
+		}
+	}
+	// 强制更新,保证其为最后的请求
+	engine.IndexDocument(0, types.DocumentIndexData{}, true)
+	for {
+		runtime.Gosched()
+		if engine.numForceUpdatingRequests*uint64(engine.initOptions.NumShards) == engine.numDocumentsForceUpdated {
 			return
 		}
 	}

+ 17 - 10
engine/engine_test.go

@@ -19,28 +19,27 @@ func AddDocs(engine *Engine) {
 	engine.IndexDocument(docId, types.DocumentIndexData{
 		Content: "中国有十三亿人口人口",
 		Fields:  ScoringFields{1, 2, 3},
-	}, true)
+	}, false)
 	docId++
 	engine.IndexDocument(docId, types.DocumentIndexData{
 		Content: "中国人口",
 		Fields:  nil,
-	}, true)
+	}, false)
 	docId++
 	engine.IndexDocument(docId, types.DocumentIndexData{
 		Content: "有人口",
 		Fields:  ScoringFields{2, 3, 1},
-	}, true)
+	}, false)
 	docId++
 	engine.IndexDocument(docId, types.DocumentIndexData{
 		Content: "有十三亿人口",
 		Fields:  ScoringFields{2, 3, 3},
-	}, true)
+	}, false)
 	docId++
 	engine.IndexDocument(docId, types.DocumentIndexData{
 		Content: "中国十三亿人口",
 		Fields:  ScoringFields{0, 9, 1},
-	}, true)
-
+	}, false)
 	engine.FlushIndex()
 }
 
@@ -241,14 +240,22 @@ func TestRemoveDocument(t *testing.T) {
 	})
 
 	AddDocs(&engine)
-	engine.RemoveDocument(5, true)
+	engine.RemoveDocument(5, false)
+	engine.RemoveDocument(6, true)
+	engine.FlushIndex()
+	engine.IndexDocument(6, types.DocumentIndexData{
+		Content: "中国人口有十三亿",
+		Fields:  ScoringFields{0, 9, 1},
+	}, false)
 	engine.FlushIndex()
 
 	outputs := engine.Search(types.SearchRequest{Text: "中国人口"})
-	utils.Expect(t, "1", len(outputs.Docs))
+	utils.Expect(t, "2", len(outputs.Docs))
 
-	utils.Expect(t, "1", outputs.Docs[0].DocId)
-	utils.Expect(t, "6000", int(outputs.Docs[0].Scores[0]*1000))
+	utils.Expect(t, "6", outputs.Docs[0].DocId)
+	utils.Expect(t, "9000", int(outputs.Docs[0].Scores[0]*1000))
+	utils.Expect(t, "1", outputs.Docs[1].DocId)
+	utils.Expect(t, "6000", int(outputs.Docs[1].Scores[0]*1000))
 }
 
 func TestEngineIndexDocumentWithTokens(t *testing.T) {

+ 20 - 15
examples/benchmark.go

@@ -145,6 +145,19 @@ func main() {
 	log.Printf("建立索引花费时间 %v", t1.Sub(t0))
 	log.Printf("建立索引速度每秒添加 %f 百万个索引",
 		float64(searcher.NumTokenIndexAdded())/t1.Sub(t0).Seconds()/(1000000))
+
+	// 记录时间并计算删除索引时间
+	t2 := time.Now()
+	for i := 1; i <= *num_delete_docs; i++ {
+		searcher.RemoveDocument(uint64(i), false)
+	}
+	searcher.FlushIndex()
+
+	t3 := time.Now()
+	log.Printf("删除 %d 条索引花费时间 %v", *num_delete_docs, t3.Sub(t2))
+
+	// 手动做 GC 防止影响性能测试
+	time.Sleep(time.Second)
 	runtime.GC()
 
 	// 写入内存profile文件
@@ -157,15 +170,7 @@ func main() {
 		defer f.Close()
 	}
 
-	// 记录时间并计算删除索引时间
-	t2 := time.Now()
-	for i := 1; i <= *num_delete_docs; i++ {
-		searcher.RemoveDocument(uint64(i), false)
-	}
-	searcher.FlushIndex()
-	t3 := time.Now()
-	log.Printf("删除 %d 条索引花费时间 %v", *num_delete_docs, t3.Sub(t2))
-
+	t4 := time.Now()
 	done := make(chan bool)
 	recordResponse := recordResponseLock{}
 	recordResponse.count = make(map[string]int)
@@ -177,12 +182,12 @@ func main() {
 	}
 
 	// 记录时间并计算分词速度
-	t4 := time.Now()
+	t5 := time.Now()
 	log.Printf("搜索平均响应时间 %v 毫秒",
-		t4.Sub(t3).Seconds()*1000/float64(numRepeatQuery*len(searchQueries)))
+		t5.Sub(t4).Seconds()*1000/float64(numRepeatQuery*len(searchQueries)))
 	log.Printf("搜索吞吐量每秒 %v 次查询",
 		float64(numRepeatQuery*numQueryThreads*len(searchQueries))/
-			t4.Sub(t3).Seconds())
+			t5.Sub(t4).Seconds())
 
 	// 测试搜索结果输出,因为不同 case 的 docId 对应不上,所以只测试总数
 	recordResponse.RLock()
@@ -193,7 +198,7 @@ func main() {
 
 	if *use_persistent {
 		searcher.Close()
-		t5 := time.Now()
+		t6 := time.Now()
 		searcher1 := engine.Engine{}
 		searcher1.Init(types.EngineInitOptions{
 			SegmenterDictionaries: *dictionaries,
@@ -208,8 +213,8 @@ func main() {
 			PersistentStorageShards: *persistent_storage_shards,
 		})
 		defer searcher1.Close()
-		t6 := time.Now()
-		t := t6.Sub(t5).Seconds() - tEndInit.Sub(tBeginInit).Seconds()
+		t7 := time.Now()
+		t := t7.Sub(t6).Seconds() - tEndInit.Sub(tBeginInit).Seconds()
 		log.Print("从持久存储加入的索引总数", searcher1.NumTokenIndexAdded())
 		log.Printf("从持久存储建立索引花费时间 %v 秒", t)
 		log.Printf("从持久存储建立索引速度每秒添加 %f 百万个索引",

+ 8 - 8
examples/enjoy_wukong.go

@@ -22,21 +22,21 @@ func (d *Data) Print() {
 }
 
 func main() {
-	datas := make([]Data, 0)
+	datas := []Data{}
 
-	data0 := Data{Id: 0, Content: "此次百度收购将成中国互联网最大并购", Labels: []string{"百度", "中国"}}
+	data0 := Data{Id: 1, Content: "此次百度收购将成中国互联网最大并购", Labels: []string{"百度", "中国"}}
 	datas = append(datas, data0)
 
-	data1 := Data{Id: 1, Content: "百度宣布拟全资收购91无线业务", Labels: []string{"百度"}}
+	data1 := Data{Id: 2, Content: "百度宣布拟全资收购91无线业务", Labels: []string{"百度"}}
 	datas = append(datas, data1)
 
-	data2 := Data{Id: 2, Content: "百度是中国最大的搜索引擎", Labels: []string{"百度"}}
+	data2 := Data{Id: 3, Content: "百度是中国最大的搜索引擎", Labels: []string{"百度"}}
 	datas = append(datas, data2)
 
-	data3 := Data{Id: 3, Content: "百度在研制无人汽车", Labels: []string{"百度"}}
+	data3 := Data{Id: 4, Content: "百度在研制无人汽车", Labels: []string{"百度"}}
 	datas = append(datas, data3)
 
-	data4 := Data{Id: 4, Content: "BAT是中国互联网三巨头", Labels: []string{"百度"}}
+	data4 := Data{Id: 5, Content: "BAT是中国互联网三巨头", Labels: []string{"百度"}}
 	datas = append(datas, data4)
 
 	// 初始化
@@ -52,7 +52,7 @@ func main() {
 
 	// 将文档加入索引
 	for _, data := range datas {
-		searcher.IndexDocument(uint64(data.Id), types.DocumentIndexData{Content: data.Content, Labels: data.Labels})
+		searcher.IndexDocument(uint64(data.Id), types.DocumentIndexData{Content: data.Content, Labels: data.Labels}, false)
 	}
 
 	// 等待索引刷新完毕
@@ -62,6 +62,6 @@ func main() {
 	res := searcher.Search(types.SearchRequest{Text: "百度"})
 	log.Println("关键字", res.Tokens, "共有", res.NumDocs, "条搜索结果")
 	for i := range res.Docs {
-		datas[res.Docs[i].DocId].Print()
+		datas[res.Docs[i].DocId-1].Print()
 	}
 }

+ 4 - 4
examples/simplest_example.go

@@ -23,10 +23,10 @@ func main() {
 		SegmenterDictionaries: "../data/dictionary.txt"})
 	defer searcher.Close()
 
-	// 将文档加入索引
-	searcher.IndexDocument(0, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"})
-	searcher.IndexDocument(1, types.DocumentIndexData{Content: "百度宣布拟全资收购91无线业务"})
-	searcher.IndexDocument(2, types.DocumentIndexData{Content: "百度是中国最大的搜索引擎"})
+	// 将文档加入索引,docId 从1开始
+	searcher.IndexDocument(1, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"}, false)
+	searcher.IndexDocument(2, types.DocumentIndexData{Content: "百度宣布拟全资收购91无线业务"}, false)
+	searcher.IndexDocument(3, types.DocumentIndexData{Content: "百度是中国最大的搜索引擎"}, false)
 
 	// 强制索引刷新
 	searcher.FlushIndex()