Ver código fonte

修正持久存储写入shard不确定的错误

Hui Chen 12 anos atrás
pai
commit
c876966fbf

+ 1 - 1
docs/persistent_storage.md

@@ -21,7 +21,7 @@ type EngineInitOptions struct {
 或者词典有变化,这些变化会体现在启动后的引擎索引表中。
 2. 在调用engine.IndexDocument时,引擎将索引数据写入到PersistentStorageFolder指定
 的目录中。
-3. PersistentStorageShards定义了数据库裂分数目,默认为CPU数目。为了得到最好的性能,请调整这个参数使得每个裂分文件小于100M。
+3. PersistentStorageShards定义了数据库裂分数目,默认为8。为了得到最好的性能,请调整这个参数使得每个裂分文件小于100M。
 4. 在调用engine.RemoveDocument删除一个文档后,该文档会从持久存储中剔除,下次启动
 引擎时不会载入该文档。
 

+ 14 - 9
engine/engine.go

@@ -50,8 +50,8 @@ type Engine struct {
 	rankerRemoveScoringFieldsChannels []chan rankerRemoveScoringFieldsRequest
 
 	// 建立持久存储使用的通信通道
-	persistentStorageIndexDocumentChannel chan persistentStorageIndexDocumentRequest
-	persistentStorageInitChannel          chan bool
+	persistentStorageIndexDocumentChannels []chan persistentStorageIndexDocumentRequest
+	persistentStorageInitChannel           chan bool
 }
 
 func (engine *Engine) Init(options types.EngineInitOptions) {
@@ -120,9 +120,13 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
 
 	// 初始化持久化存储通道
 	if engine.initOptions.UsePersistentStorage {
-		engine.persistentStorageIndexDocumentChannel = make(
-			chan persistentStorageIndexDocumentRequest,
-			engine.initOptions.PersistentStorageShards)
+		engine.persistentStorageIndexDocumentChannels =
+			make([]chan persistentStorageIndexDocumentRequest,
+				engine.initOptions.PersistentStorageShards)
+		for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
+			engine.persistentStorageIndexDocumentChannels[shard] = make(
+				chan persistentStorageIndexDocumentRequest)
+		}
 		engine.persistentStorageInitChannel = make(
 			chan bool, engine.initOptions.PersistentStorageShards)
 	}
@@ -211,8 +215,10 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
 //         如果立刻调用Search可能无法查询到这个文档。强制刷新索引请调用FlushIndex函数。
 func (engine *Engine) IndexDocument(docId uint64, data types.DocumentIndexData) {
 	engine.internalIndexDocument(docId, data)
+
+	hash := murmur.Murmur3([]byte(fmt.Sprint("%d", docId))) % uint32(engine.initOptions.PersistentStorageShards)
 	if engine.initOptions.UsePersistentStorage {
-		engine.persistentStorageIndexDocumentChannel <- persistentStorageIndexDocumentRequest{docId: docId, data: data}
+		engine.persistentStorageIndexDocumentChannels[hash] <- persistentStorageIndexDocumentRequest{docId: docId, data: data}
 	}
 }
 
@@ -245,9 +251,8 @@ func (engine *Engine) RemoveDocument(docId uint64) {
 
 	if engine.initOptions.UsePersistentStorage {
 		// 从数据库中删除
-		for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
-			go engine.persistentStorageRemoveDocumentWorker(docId, shard)
-		}
+		hash := murmur.Murmur3([]byte(fmt.Sprint("%d", docId))) % uint32(engine.initOptions.PersistentStorageShards)
+		go engine.persistentStorageRemoveDocumentWorker(docId, hash)
 	}
 }
 

+ 2 - 2
engine/persistent_storage_worker.go

@@ -17,7 +17,7 @@ type persistentStorageIndexDocumentRequest struct {
 
 func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
 	for {
-		request := <-engine.persistentStorageIndexDocumentChannel
+		request := <-engine.persistentStorageIndexDocumentChannels[shard]
 
 		// 得到key
 		b := make([]byte, 8)
@@ -38,7 +38,7 @@ func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
 	}
 }
 
-func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard int) {
+func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard uint32) {
 	// 得到key
 	b := make([]byte, 8)
 	length := binary.PutUvarint(b, docId)

+ 4 - 1
examples/codelab/search_server.go

@@ -152,6 +152,8 @@ func main() {
 		IndexerInitOptions: &types.IndexerInitOptions{
 			IndexType: types.LocationsIndex,
 		},
+		UsePersistentStorage: true,
+		PersistentStorageFolder: "db",
 	})
 	wbs = make(map[uint64]Weibo)
 
@@ -163,8 +165,9 @@ func main() {
 	signal.Notify(c, os.Interrupt)
 	go func(){
 		for _ = range c {
+			log.Print("捕获Ctrl-c,退出服务器")
 			searcher.Close()
-			os.Exit(1)
+			os.Exit(0)
 		}
 	}()
 

+ 1 - 1
types/engine_init_options.go

@@ -24,7 +24,7 @@ var (
 		K1: 2.0,
 		B:  0.75,
 	}
-	defaultPersistentStorageShards = runtime.NumCPU()
+	defaultPersistentStorageShards = 8
 )
 
 type EngineInitOptions struct {