// indexer.go — inverted-index implementation for the wukong search engine.
  1. package core
  2. import (
  3. "log"
  4. "math"
  5. "sort"
  6. "sync"
  7. "github.com/huichen/wukong/types"
  8. "github.com/huichen/wukong/utils"
  9. )
// Indexer maintains the inverted index: a mapping from search keys (tokens
// and labels) to the documents that contain them, plus the add/remove caches
// that batch index mutations.
type Indexer struct {
	// Inverted index from search key to its posting list.
	// Guarded by the embedded read-write lock.
	tableLock struct {
		sync.RWMutex
		table     map[string]*KeywordIndices
		docsState map[uint64]int // absent: no state recorded; 0: present in index; 1: pending removal; 2: pending insertion
	}
	addCacheLock struct {
		sync.RWMutex
		addCachePointer int
		addCache        types.DocumentsIndex
	}
	removeCacheLock struct {
		sync.RWMutex
		removeCachePointer int
		removeCache        types.DocumentsId
	}

	initOptions types.IndexerInitOptions
	initialized bool

	// Approximation of the total number of indexed documents
	// (adjusted as cache entries are queued and flushed).
	numDocuments uint64

	// Total keyword count over all indexed texts; used for BM25.
	totalTokenLength float32

	// Keyword length of each document, keyed by DocId; used for BM25.
	docTokenLengths map[uint64]float32
}
// KeywordIndices is one row of the inverted index: every document in which a
// given search key appears, sorted by DocId in ascending order. The slices
// are parallel — entry i of each describes the same document.
type KeywordIndices struct {
	// Which of the slices below are populated depends on the IndexType
	// chosen at initialization.
	docIds      []uint64  // present for every index type
	frequencies []float32 // populated when IndexType == FrequenciesIndex
	locations   [][]int   // populated when IndexType == LocationsIndex
}
  45. // 初始化索引器
  46. func (indexer *Indexer) Init(options types.IndexerInitOptions) {
  47. if indexer.initialized == true {
  48. log.Fatal("索引器不能初始化两次")
  49. }
  50. options.Init()
  51. indexer.initOptions = options
  52. indexer.initialized = true
  53. indexer.tableLock.table = make(map[string]*KeywordIndices)
  54. indexer.tableLock.docsState = make(map[uint64]int)
  55. indexer.addCacheLock.addCache = make([]*types.DocumentIndex, indexer.initOptions.DocCacheSize)
  56. indexer.removeCacheLock.removeCache = make([]uint64, indexer.initOptions.DocCacheSize*2)
  57. indexer.docTokenLengths = make(map[uint64]float32)
  58. }
  59. // 从KeywordIndices中得到第i个文档的DocId
  60. func (indexer *Indexer) getDocId(ti *KeywordIndices, i int) uint64 {
  61. return ti.docIds[i]
  62. }
  63. // 得到KeywordIndices中文档总数
  64. func (indexer *Indexer) getIndexLength(ti *KeywordIndices) int {
  65. return len(ti.docIds)
  66. }
// AddDocumentToCache appends one document to the add cache; when the cache is
// full, or forceUpdate is set, the cache is flushed into the inverted index.
// Passing a nil document with forceUpdate == true just forces a flush.
func (indexer *Indexer) AddDocumentToCache(document *types.DocumentIndex, forceUpdate bool) {
	if indexer.initialized == false {
		log.Fatal("索引器尚未初始化")
	}

	indexer.addCacheLock.Lock()
	if document != nil {
		indexer.addCacheLock.addCache[indexer.addCacheLock.addCachePointer] = document
		indexer.addCacheLock.addCachePointer++
	}
	if indexer.addCacheLock.addCachePointer >= indexer.initOptions.DocCacheSize || forceUpdate {
		indexer.tableLock.Lock()
		// position partitions the cache: [0, position) holds documents that
		// must wait for a pending removal before they can be re-added.
		position := 0
		for i := 0; i < indexer.addCacheLock.addCachePointer; i++ {
			docIndex := indexer.addCacheLock.addCache[i]
			if docState, ok := indexer.tableLock.docsState[docIndex.DocId]; ok && docState <= 1 {
				// ok && docState == 0: document is in the index — it must be
				// removed first and then re-added.
				// ok && docState == 1: document may be in the index, already
				// queued for removal — it must still be removed before re-adding.
				if position != i {
					indexer.addCacheLock.addCache[position], indexer.addCacheLock.addCache[i] =
						indexer.addCacheLock.addCache[i], indexer.addCacheLock.addCache[position]
				}
				if docState == 0 {
					// NOTE(review): removeCacheLock is acquired here while holding
					// tableLock, but RemoveDocumentToCache acquires removeCacheLock
					// before tableLock — looks like a lock-order inversion; confirm
					// it cannot deadlock under concurrent use.
					indexer.removeCacheLock.Lock()
					indexer.removeCacheLock.removeCache[indexer.removeCacheLock.removeCachePointer] =
						docIndex.DocId
					indexer.removeCacheLock.removeCachePointer++
					indexer.removeCacheLock.Unlock()
					indexer.tableLock.docsState[docIndex.DocId] = 1
					indexer.numDocuments--
				}
				position++
			} else if !ok {
				// Unknown document: mark it pending insertion.
				indexer.tableLock.docsState[docIndex.DocId] = 2
			}
		}
		indexer.tableLock.Unlock()
		if indexer.RemoveDocumentToCache(0, forceUpdate) {
			// Documents still present in the index may only be re-added once
			// their removals have been flushed, so keep the entire cache.
			position = 0
		}
		addCachedDocuments := indexer.addCacheLock.addCache[position:indexer.addCacheLock.addCachePointer]
		indexer.addCacheLock.addCachePointer = position
		indexer.addCacheLock.Unlock()
		sort.Sort(addCachedDocuments)
		indexer.AddDocuments(&addCachedDocuments)
	} else {
		indexer.addCacheLock.Unlock()
	}
}
// AddDocuments inserts every document of an add-cache batch into the inverted
// index. The batch must be sorted by DocId in ascending order so that each
// posting list's insertion positions only move forward within the batch.
func (indexer *Indexer) AddDocuments(documents *types.DocumentsIndex) {
	if indexer.initialized == false {
		log.Fatal("索引器尚未初始化")
	}

	indexer.tableLock.Lock()
	defer indexer.tableLock.Unlock()

	// Per-keyword lower bound for the next insertion search (valid because
	// the batch arrives in ascending DocId order).
	indexPointers := make(map[string]int, len(indexer.tableLock.table))
	// Inserting in ascending DocId order minimizes element moves in the lists.
	for i, document := range *documents {
		if i < len(*documents)-1 && (*documents)[i].DocId == (*documents)[i+1].DocId {
			// Duplicate DocIds in the batch: the sort is stable, so only the
			// last occurrence is inserted.
			continue
		}
		if docState, ok := indexer.tableLock.docsState[document.DocId]; ok && docState == 1 {
			// State 1 here means the document still needs to be removed, so it
			// must not be (re-)inserted now. Legal states at this point are
			// "absent" and 2, which guarantees no document already present in
			// the index table is inserted again.
			continue
		}
		// Track the document's keyword length for BM25.
		if document.TokenLength != 0 {
			indexer.docTokenLengths[document.DocId] = float32(document.TokenLength)
			indexer.totalTokenLength += document.TokenLength
		}

		docIdIsNew := true
		for _, keyword := range document.Keywords {
			indices, foundKeyword := indexer.tableLock.table[keyword.Text]
			if !foundKeyword {
				// First document for this search key: create its posting list.
				ti := KeywordIndices{}
				switch indexer.initOptions.IndexType {
				case types.LocationsIndex:
					ti.locations = [][]int{keyword.Starts}
				case types.FrequenciesIndex:
					ti.frequencies = []float32{keyword.Frequency}
				}
				ti.docIds = []uint64{document.DocId}
				indexer.tableLock.table[keyword.Text] = &ti
				continue
			}

			// Find the insertion position; the DocId is guaranteed absent from
			// the list (see state check above), so the found flag is ignored.
			position, _ := indexer.searchIndex(
				indices, indexPointers[keyword.Text], indexer.getIndexLength(indices)-1, document.DocId)
			indexPointers[keyword.Text] = position
			// Shift the parallel slices right by one and write the new entry.
			switch indexer.initOptions.IndexType {
			case types.LocationsIndex:
				indices.locations = append(indices.locations, []int{})
				copy(indices.locations[position+1:], indices.locations[position:])
				indices.locations[position] = keyword.Starts
			case types.FrequenciesIndex:
				indices.frequencies = append(indices.frequencies, float32(0))
				copy(indices.frequencies[position+1:], indices.frequencies[position:])
				indices.frequencies[position] = keyword.Frequency
			}
			indices.docIds = append(indices.docIds, 0)
			copy(indices.docIds[position+1:], indices.docIds[position:])
			indices.docIds[position] = document.DocId
		}

		// Update the document's state and the (approximate) document count.
		// NOTE(review): docIdIsNew is never set to false, so this branch always
		// runs — presumably intended given the state filtering above; confirm.
		if docIdIsNew {
			indexer.tableLock.docsState[document.DocId] = 0
			indexer.numDocuments++
		}
	}
}
// RemoveDocumentToCache queues docId for removal from the index; a docId of 0
// only (possibly) triggers a flush. The remove cache is flushed into the
// inverted index when it is full or when forceUpdate is set.
// The return value reports whether a flush actually removed documents from
// the index table.
func (indexer *Indexer) RemoveDocumentToCache(docId uint64, forceUpdate bool) bool {
	if indexer.initialized == false {
		log.Fatal("索引器尚未初始化")
	}

	indexer.removeCacheLock.Lock()
	if docId != 0 {
		indexer.tableLock.Lock()
		if docState, ok := indexer.tableLock.docsState[docId]; ok && docState == 0 {
			// Present in the index: queue the removal and mark it pending.
			indexer.removeCacheLock.removeCache[indexer.removeCacheLock.removeCachePointer] = docId
			indexer.removeCacheLock.removeCachePointer++
			indexer.tableLock.docsState[docId] = 1
			indexer.numDocuments--
		} else if ok && docState == 2 {
			// Removing a document that is still waiting to be inserted:
			// flipping the state to 1 makes AddDocuments skip it.
			indexer.tableLock.docsState[docId] = 1
		} else if !ok {
			// Unknown document: it is impossible to tell whether it sits in
			// the addCache, so the request is deliberately ignored.
		}
		indexer.tableLock.Unlock()
	}

	if indexer.removeCacheLock.removeCachePointer > 0 &&
		(indexer.removeCacheLock.removeCachePointer >= indexer.initOptions.DocCacheSize ||
			forceUpdate) {
		removeCachedDocuments := indexer.removeCacheLock.removeCache[:indexer.removeCacheLock.removeCachePointer]
		indexer.removeCacheLock.removeCachePointer = 0
		indexer.removeCacheLock.Unlock()
		sort.Sort(removeCachedDocuments)
		indexer.RemoveDocuments(&removeCachedDocuments)
		return true
	}
	indexer.removeCacheLock.Unlock()
	return false
}
// RemoveDocuments deletes every document of a remove-cache batch from the
// inverted index. The batch must be sorted by DocId in ascending order so
// that each posting list can be cleaned in a single linear merge pass.
func (indexer *Indexer) RemoveDocuments(documents *types.DocumentsId) {
	if indexer.initialized == false {
		log.Fatal("索引器尚未初始化")
	}

	indexer.tableLock.Lock()
	defer indexer.tableLock.Unlock()

	// Drop each document's keyword-length contribution and state entry.
	for _, docId := range *documents {
		indexer.totalTokenLength -= indexer.docTokenLengths[docId]
		delete(indexer.docTokenLengths, docId)
		delete(indexer.tableLock.docsState, docId)
	}

	for keyword, indices := range indexer.tableLock.table {
		indicesTop, indicesPointer := 0, 0
		// Skip batch entries smaller than the list's first DocId.
		documentsPointer := sort.Search(
			len(*documents), func(i int) bool { return (*documents)[i] >= indices.docIds[0] })
		// Two-pointer merge: compact surviving entries toward the front,
		// dropping any entry whose DocId appears in the batch.
		for documentsPointer < len(*documents) && indicesPointer < indexer.getIndexLength(indices) {
			if indices.docIds[indicesPointer] < (*documents)[documentsPointer] {
				// Survivor: move it down if earlier deletions opened a gap.
				if indicesTop != indicesPointer {
					switch indexer.initOptions.IndexType {
					case types.LocationsIndex:
						indices.locations[indicesTop] = indices.locations[indicesPointer]
					case types.FrequenciesIndex:
						indices.frequencies[indicesTop] = indices.frequencies[indicesPointer]
					}
					indices.docIds[indicesTop] = indices.docIds[indicesPointer]
				}
				indicesTop++
				indicesPointer++
			} else if indices.docIds[indicesPointer] == (*documents)[documentsPointer] {
				// Match: skip the entry (indicesTop does not advance).
				indicesPointer++
				documentsPointer++
			} else {
				documentsPointer++
			}
		}
		// Append the untouched tail after the compacted prefix.
		if indicesTop != indicesPointer {
			switch indexer.initOptions.IndexType {
			case types.LocationsIndex:
				indices.locations = append(
					indices.locations[:indicesTop], indices.locations[indicesPointer:]...)
			case types.FrequenciesIndex:
				indices.frequencies = append(
					indices.frequencies[:indicesTop], indices.frequencies[indicesPointer:]...)
			}
			indices.docIds = append(
				indices.docIds[:indicesTop], indices.docIds[indicesPointer:]...)
		}
		// Drop search keys whose posting list became empty.
		if len(indices.docIds) == 0 {
			delete(indexer.tableLock.table, keyword)
		}
	}
}
// Lookup finds all documents containing every search key (AND query) built
// from tokens and labels. When docIds is non-nil, only documents listed in it
// are considered. With countDocsOnly set, only numDocs is computed and docs
// stays empty.
func (indexer *Indexer) Lookup(
	tokens []string, labels []string, docIds map[uint64]bool, countDocsOnly bool) (docs []types.IndexedDocument, numDocs int) {
	if indexer.initialized == false {
		log.Fatal("索引器尚未初始化")
	}
	if indexer.numDocuments == 0 {
		return
	}
	numDocs = 0

	// Merge tokens and labels into a single list of search keys.
	keywords := make([]string, len(tokens)+len(labels))
	copy(keywords, tokens)
	copy(keywords[len(tokens):], labels)

	indexer.tableLock.RLock()
	defer indexer.tableLock.RUnlock()
	table := make([]*KeywordIndices, len(keywords))
	for i, keyword := range keywords {
		indices, found := indexer.tableLock.table[keyword]
		if !found {
			// A key absent from the inverted index makes the AND result empty.
			return
		} else {
			// Otherwise collect its posting list.
			table[i] = indices
		}
	}

	// Nothing to intersect.
	if len(table) == 0 {
		return
	}

	// Merge-intersect the posting lists of all search keys.
	// Scanning from the back emits documents with larger DocIds first.
	indexPointers := make([]int, len(table))
	for iTable := 0; iTable < len(table); iTable++ {
		indexPointers[iTable] = indexer.getIndexLength(table[iTable]) - 1
	}
	// Average document keyword length, used for BM25.
	avgDocLength := indexer.totalTokenLength / float32(indexer.numDocuments)
	for ; indexPointers[0] >= 0; indexPointers[0]-- {
		// Use the first key's documents as the baseline and search the other
		// keys' posting lists for the same document.
		baseDocId := indexer.getDocId(table[0], indexPointers[0])
		if docIds != nil {
			if _, found := docIds[baseDocId]; !found {
				continue
			}
		}
		iTable := 1
		found := true
		for ; iTable < len(table); iTable++ {
			// Binary search is faster than plain sequential merging; even
			// faster intersection algorithms exist, but sequential merging may
			// become preferable if the lists are reimplemented as linked lists
			// to avoid the write lock when adding documents.
			// TODO: study the speed and scalability of other intersection
			// algorithms.
			position, foundBaseDocId := indexer.searchIndex(table[iTable],
				0, indexPointers[iTable], baseDocId)
			if foundBaseDocId {
				indexPointers[iTable] = position
			} else {
				if position == 0 {
					// Every DocId of this key is larger than baseDocId, so
					// further searching is pointless.
					return
				} else {
					// Move on to the next indexPointers[0] candidate.
					indexPointers[iTable] = position - 1
					found = false
					break
				}
			}
		}

		if found {
			// Only documents firmly present in the index (state 0) are emitted.
			if docState, ok := indexer.tableLock.docsState[baseDocId]; !ok || docState != 0 {
				continue
			}
			indexedDoc := types.IndexedDocument{}

			// For LocationsIndex, compute keyword proximity.
			if indexer.initOptions.IndexType == types.LocationsIndex {
				// Count how many tokens carry location information.
				numTokensWithLocations := 0
				for i, t := range table[:len(tokens)] {
					if len(t.locations[indexPointers[i]]) > 0 {
						numTokensWithLocations++
					}
				}
				if numTokensWithLocations != len(tokens) {
					if !countDocsOnly {
						docs = append(docs, types.IndexedDocument{
							DocId: baseDocId,
						})
					}
					numDocs++
					// When one key matches several documents and label keys are
					// present, breaking here would drop a sizable part of the
					// search results — hence continue.
					continue
				}

				// Compute the proximity of the search keys in this document.
				tokenProximity, tokenLocations := computeTokenProximity(table[:len(tokens)], indexPointers, tokens)
				indexedDoc.TokenProximity = int32(tokenProximity)
				indexedDoc.TokenSnippetLocations = tokenLocations

				// Attach the per-token location lists.
				indexedDoc.TokenLocations = make([][]int, len(tokens))
				for i, t := range table[:len(tokens)] {
					indexedDoc.TokenLocations[i] = t.locations[indexPointers[i]]
				}
			}

			// For LocationsIndex and FrequenciesIndex, compute BM25.
			if indexer.initOptions.IndexType == types.LocationsIndex ||
				indexer.initOptions.IndexType == types.FrequenciesIndex {
				bm25 := float32(0)
				d := indexer.docTokenLengths[baseDocId]
				for i, t := range table[:len(tokens)] {
					var frequency float32
					if indexer.initOptions.IndexType == types.LocationsIndex {
						frequency = float32(len(t.locations[indexPointers[i]]))
					} else {
						frequency = t.frequencies[indexPointers[i]]
					}

					// Accumulate this term's BM25 contribution.
					if len(t.docIds) > 0 && frequency > 0 && indexer.initOptions.BM25Parameters != nil && avgDocLength != 0 {
						// Smoothed idf.
						idf := float32(math.Log2(float64(indexer.numDocuments)/float64(len(t.docIds)) + 1))
						k1 := indexer.initOptions.BM25Parameters.K1
						b := indexer.initOptions.BM25Parameters.B
						bm25 += idf * frequency * (k1 + 1) / (frequency + k1*(1-b+b*d/avgDocLength))
					}
				}
				indexedDoc.BM25 = float32(bm25)
			}

			indexedDoc.DocId = baseDocId
			if !countDocsOnly {
				docs = append(docs, indexedDoc)
			}
			numDocs++
		}
	}
	return
}
  410. // 二分法查找indices中某文档的索引项
  411. // 第一个返回参数为找到的位置或需要插入的位置
  412. // 第二个返回参数标明是否找到
  413. func (indexer *Indexer) searchIndex(
  414. indices *KeywordIndices, start int, end int, docId uint64) (int, bool) {
  415. // 特殊情况
  416. if indexer.getIndexLength(indices) == start {
  417. return start, false
  418. }
  419. if docId < indexer.getDocId(indices, start) {
  420. return start, false
  421. } else if docId == indexer.getDocId(indices, start) {
  422. return start, true
  423. }
  424. if docId > indexer.getDocId(indices, end) {
  425. return end + 1, false
  426. } else if docId == indexer.getDocId(indices, end) {
  427. return end, true
  428. }
  429. // 二分
  430. var middle int
  431. for end-start > 1 {
  432. middle = (start + end) / 2
  433. if docId == indexer.getDocId(indices, middle) {
  434. return middle, true
  435. } else if docId > indexer.getDocId(indices, middle) {
  436. start = middle
  437. } else {
  438. end = middle
  439. }
  440. }
  441. return end, false
  442. }
// computeTokenProximity computes the minimal proximity of the search keys
// within one document (LocationsIndex only).
//
// With P_i the byte position of the i-th search key in the text and L_i its
// length, the proximity is
//
//	ArgMin(Sum(Abs(P_(i+1) - P_i - L_i)))
//
// i.e. zero when the keys are adjacent and in query order. It is computed by
// dynamic programming over each token's occurrence positions; the chosen P_i
// values are returned through tokenLocations.
func computeTokenProximity(table []*KeywordIndices, indexPointers []int, tokens []string) (
	minTokenProximity int, tokenLocations []int) {
	minTokenProximity = -1
	tokenLocations = make([]int, len(tokens))

	var (
		currentLocations, nextLocations []int
		currentMinValues, nextMinValues []int
		path                            [][]int
	)

	// path[i][j] records which occurrence of token i-1 yields the best value
	// ending at occurrence j of token i (used for backtracking below).
	path = make([][]int, len(tokens))
	for i := 1; i < len(path); i++ {
		path[i] = make([]int, len(table[i].locations[indexPointers[i]]))
	}

	// Dynamic programming: process tokens left to right, maintaining the best
	// accumulated distance for each occurrence of the current token.
	currentLocations = table[0].locations[indexPointers[0]]
	currentMinValues = make([]int, len(currentLocations))
	for i := 1; i < len(tokens); i++ {
		nextLocations = table[i].locations[indexPointers[i]]
		nextMinValues = make([]int, len(nextLocations))
		for j, _ := range nextMinValues {
			nextMinValues[j] = -1 // -1 marks "unreachable"
		}

		var iNext int
		for iCurrent, currentLocation := range currentLocations {
			if currentMinValues[iCurrent] == -1 {
				continue
			}
			// Advance iNext to the last next-location left of currentLocation.
			for iNext+1 < len(nextLocations) && nextLocations[iNext+1] < currentLocation {
				iNext++
			}

			// update relaxes the transition from occurrence `from` of the
			// previous token to occurrence `to` of the current one.
			update := func(from int, to int) {
				if to >= len(nextLocations) {
					return
				}
				value := currentMinValues[from] + utils.AbsInt(nextLocations[to]-currentLocations[from]-len(tokens[i-1]))
				if nextMinValues[to] == -1 || value < nextMinValues[to] {
					nextMinValues[to] = value
					path[i][to] = from
				}
			}

			// An optimal transition can only happen at the nearest position on
			// either side of currentLocation.
			update(iCurrent, iNext)
			update(iCurrent, iNext+1)
		}

		currentLocations = nextLocations
		currentMinValues = nextMinValues
	}

	// Pick the best final value over the last token's occurrences.
	var cursor int
	for i, value := range currentMinValues {
		if value == -1 {
			continue
		}
		if minTokenProximity == -1 || value < minTokenProximity {
			minTokenProximity = value
			cursor = i
		}
	}

	// Walk the path backwards to recover the chosen position of each token.
	for i := len(tokens) - 1; i >= 0; i-- {
		if i != len(tokens)-1 {
			cursor = path[i+1][cursor]
		}
		tokenLocations[i] = table[i].locations[indexPointers[i]][cursor]
	}
	return
}