Преглед изворног кода

Merge pull request #25 from gqf2008/master

增加bolt存储引擎支持
Hui Chen пре 10 година
родитељ
комит
618bee365d
7 измењених фајлова са 264 додато и 42 уклоњено
  1. 6 3
      engine/engine.go
  2. 45 25
      engine/persistent_storage_worker.go
  3. 66 0
      storage/bolt_storage.go
  4. 63 0
      storage/kv_storage.go
  5. 42 0
      storage/storage.go
  6. 16 11
      utils/kv_utills.go
  7. 26 3
      utils/kv_utills_test.go

+ 6 - 3
engine/engine.go

@@ -2,10 +2,11 @@ package engine
 
 import (
 	"fmt"
-	"github.com/cznic/kv"
+	//"github.com/cznic/kv"
 	"github.com/huichen/murmur"
 	"github.com/huichen/sego"
 	"github.com/huichen/wukong/core"
+	kv "github.com/huichen/wukong/storage"
 	"github.com/huichen/wukong/types"
 	"github.com/huichen/wukong/utils"
 	"log"
@@ -37,7 +38,8 @@ type Engine struct {
 	rankers    []core.Ranker
 	segmenter  sego.Segmenter
 	stopTokens StopTokens
-	dbs        []*kv.DB
+	//dbs        []*kv.DB
+	dbs []kv.Storage
 
 	// 建立索引器使用的通信通道
 	segmenterChannel               chan segmenterRequest
@@ -158,7 +160,8 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
 		}
 
 		// 打开或者创建数据库
-		engine.dbs = make([]*kv.DB, engine.initOptions.PersistentStorageShards)
+		//engine.dbs = make([]*kv.DB, engine.initOptions.PersistentStorageShards)
+		engine.dbs = make([]kv.Storage, engine.initOptions.PersistentStorageShards)
 		for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
 			dbPath := engine.initOptions.PersistentStorageFolder + "/" + PersistentStorageFilePrefix + "." + strconv.Itoa(shard)
 			db, err := utils.OpenOrCreateKv(dbPath, &kv.Options{})

+ 45 - 25
engine/persistent_storage_worker.go

@@ -5,8 +5,8 @@ import (
 	"encoding/binary"
 	"encoding/gob"
 	"github.com/huichen/wukong/types"
-	"io"
-	"log"
+	//"io"
+	//"log"
 	"sync/atomic"
 )
 
@@ -47,24 +47,45 @@ func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard
 	engine.dbs[shard].Delete(b[0:length])
 }
 
-func (engine *Engine) persistentStorageInitWorker(shard int) {
-	iter, err := engine.dbs[shard].SeekFirst()
-	if err == io.EOF {
-		engine.persistentStorageInitChannel <- true
-		return
-	} else if err != nil {
-		engine.persistentStorageInitChannel <- true
-		log.Fatal("无法遍历数据库")
-	}
+// func (engine *Engine) persistentStorageInitWorker(shard int) {
+// 	iter, err := engine.dbs[shard].SeekFirst()
+// 	if err == io.EOF {
+// 		engine.persistentStorageInitChannel <- true
+// 		return
+// 	} else if err != nil {
+// 		engine.persistentStorageInitChannel <- true
+// 		log.Fatal("无法遍历数据库")
+// 	}
 
-	for {
-		key, value, err := iter.Next()
-		if err == io.EOF {
-			break
-		} else if err != nil {
-			continue
-		}
+// 	for {
+// 		key, value, err := iter.Next()
+// 		if err == io.EOF {
+// 			break
+// 		} else if err != nil {
+// 			continue
+// 		}
+
+// 		// 得到docID
+// 		docId, _ := binary.Uvarint(key)
+
+// 		// 得到data
+// 		buf := bytes.NewReader(value)
+// 		dec := gob.NewDecoder(buf)
+// 		var data types.DocumentIndexData
+// 		err = dec.Decode(&data)
+// 		if err != nil {
+// 			continue
+// 		}
 
+// 		// 添加索引
+// 		engine.internalIndexDocument(docId, data)
+// 	}
+// 	engine.persistentStorageInitChannel <- true
+// }
+
+func (engine *Engine) persistentStorageInitWorker(shard int) {
+	engine.dbs[shard].ForEach(func(k, v []byte) error {
+		key, value := k, v
 		// 得到docID
 		docId, _ := binary.Uvarint(key)
 
@@ -72,13 +93,12 @@ func (engine *Engine) persistentStorageInitWorker(shard int) {
 		buf := bytes.NewReader(value)
 		dec := gob.NewDecoder(buf)
 		var data types.DocumentIndexData
-		err = dec.Decode(&data)
-		if err != nil {
-			continue
+		err := dec.Decode(&data)
+		if err == nil {
+			// 添加索引
+			engine.internalIndexDocument(docId, data)
 		}
-
-		// 添加索引
-		engine.internalIndexDocument(docId, data)
-	}
+		return nil
+	})
 	engine.persistentStorageInitChannel <- true
 }

+ 66 - 0
storage/bolt_storage.go

@@ -0,0 +1,66 @@
+package storage
+
+import (
+	"github.com/boltdb/bolt"
+	"time"
+)
+
+// wukong_documents is the name of the bolt bucket that openBoltStorage
+// creates and that every boltStorage method reads from or writes to.
+var wukong_documents = []byte("wukong_documents")
+
+// boltStorage adapts a bolt database to the Storage interface.
+type boltStorage struct {
+	db *bolt.DB // underlying bolt handle used by every method
+}
+
+// openBoltStorage opens the bolt database file at path (file mode 0600,
+// bolt Timeout option of 3600 seconds) and makes sure the wukong_documents
+// bucket exists. If the bucket cannot be created the database is closed
+// again and the error is returned.
+func openBoltStorage(path string) (Storage, error) {
+	opts := &bolt.Options{Timeout: 3600 * time.Second}
+	handle, openErr := bolt.Open(path, 0600, opts)
+	if openErr != nil {
+		return nil, openErr
+	}
+
+	// Create the bucket up front so the other methods can assume it exists.
+	ensureBucket := func(tx *bolt.Tx) error {
+		_, createErr := tx.CreateBucketIfNotExists(wukong_documents)
+		return createErr
+	}
+	if updateErr := handle.Update(ensureBucket); updateErr != nil {
+		handle.Close()
+		return nil, updateErr
+	}
+	return &boltStorage{db: handle}, nil
+}
+
+// WAlName returns the path reported by the underlying bolt database
+// (s.db.Path()).
+func (s *boltStorage) WAlName() string {
+	return s.db.Path()
+}
+// Set stores v under k in the wukong_documents bucket inside one bolt
+// Update transaction and returns that transaction's error.
+func (s *boltStorage) Set(k []byte, v []byte) error {
+	put := func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(wukong_documents)
+		return bucket.Put(k, v)
+	}
+	return s.db.Update(put)
+}
+// Get looks up k in the wukong_documents bucket inside a bolt View
+// transaction. It returns a nil slice when bolt has no value for k.
+//
+// The slice bolt hands back is only valid while the transaction is open, so
+// the value is copied before the transaction ends; the caller owns the
+// returned slice.
+func (s *boltStorage) Get(k []byte) (b []byte, err error) {
+	err = s.db.View(func(tx *bolt.Tx) error {
+		if v := tx.Bucket(wukong_documents).Get(k); v != nil {
+			// Detach from bolt's transaction-scoped memory.
+			b = append([]byte{}, v...)
+		}
+		return nil
+	})
+	return
+}
+// Delete removes k from the wukong_documents bucket inside one bolt Update
+// transaction and returns that transaction's error.
+func (s *boltStorage) Delete(k []byte) error {
+	remove := func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(wukong_documents)
+		return bucket.Delete(k)
+	}
+	return s.db.Update(remove)
+}
+
+// ForEach calls fn for every key/value pair in the wukong_documents bucket,
+// in cursor order, inside a single bolt View transaction. Iteration stops at
+// the first non-nil error returned by fn, and that error is returned.
+func (s *boltStorage) ForEach(fn func(k, v []byte) error) error {
+	visitAll := func(tx *bolt.Tx) error {
+		// Bucket.ForEach walks the bucket with a cursor from First to the
+		// end and aborts on the first error from fn.
+		return tx.Bucket(wukong_documents).ForEach(fn)
+	}
+	return s.db.View(visitAll)
+}
+
+// Close closes the underlying bolt database and returns its error.
+func (s *boltStorage) Close() error {
+	return s.db.Close()
+}

+ 63 - 0
storage/kv_storage.go

@@ -0,0 +1,63 @@
+package storage
+
+import (
+	"io"
+
+	"github.com/cznic/kv"
+)
+
+// kvStorage adapts a cznic/kv database to the Storage interface.
+type kvStorage struct {
+	db *kv.DB // underlying kv handle used by every method
+}
+
+// openKVStorage opens the cznic/kv database at path, or creates it when
+// opening fails, and returns it wrapped in a kvStorage.
+//
+// The raw *kv.DB does not have the Storage method set, so it must be wrapped
+// rather than returned directly. On failure a literal nil Storage is
+// returned so callers never receive a non-nil interface holding a nil
+// pointer.
+func openKVStorage(path string) (Storage, error) {
+	options := &kv.Options{}
+	db, errOpen := kv.Open(path, options)
+	if errOpen != nil {
+		// Opening failed; fall back to creating the database at path.
+		var errCreate error
+		db, errCreate = kv.Create(path, options)
+		if errCreate != nil {
+			return nil, errCreate
+		}
+	}
+	return &kvStorage{db: db}, nil
+}
+
+// WAlName returns the write-ahead-log file name reported by the underlying
+// kv database (s.db.WALName()).
+func (s *kvStorage) WAlName() string {
+	return s.db.WALName()
+}
+
+// Set stores v under k by delegating to the underlying kv database.
+func (s *kvStorage) Set(k []byte, v []byte) error {
+	return s.db.Set(k, v)
+}
+
+// Get returns the value stored under k by delegating to the underlying kv
+// database. nil is passed as kv's buffer argument, so no caller-supplied
+// buffer is handed to kv.
+func (s *kvStorage) Get(k []byte) ([]byte, error) {
+	return s.db.Get(nil, k)
+}
+
+// Delete removes k by delegating to the underlying kv database.
+func (s *kvStorage) Delete(k []byte) error {
+	return s.db.Delete(k)
+}
+
+// ForEach calls fn for every key/value pair produced by an enumerator
+// started at the first item of the kv database. Iteration ends with a nil
+// result when the enumerator reports io.EOF; any other enumerator error, or
+// the first non-nil error returned by fn, stops the iteration and is
+// returned.
+func (s *kvStorage) ForEach(fn func(k, v []byte) error) error {
+	iter, err := s.db.SeekFirst()
+	if err == io.EOF {
+		// io.EOF from SeekFirst is treated as "nothing to iterate".
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	for {
+		key, value, err := iter.Next()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		if err := fn(key, value); err != nil {
+			return err
+		}
+	}
+}
+
+// Close closes the underlying kv database and returns its error.
+func (s *kvStorage) Close() error {
+	return s.db.Close()
+}

+ 42 - 0
storage/storage.go

@@ -0,0 +1,42 @@
+package storage
+
+import (
+	"fmt"
+	"os"
+	"time"
+)
+
+// DEFAULT_STORAGE_ENGIND is the engine name OpenStorage falls back to when
+// the WUKONG_STORAGE_ENGINE environment variable is empty.
+// NOTE(review): "ENGIND" looks like a typo for "ENGINE"; the exported name is
+// left unchanged so existing references keep compiling.
+const DEFAULT_STORAGE_ENGIND = "bolt"
+
+// _supported_storage maps a storage engine name to the function that opens a
+// Storage of that kind at a given path. It starts with the built-in "kv" and
+// "bolt" engines; RegisterStorageEngine adds or replaces entries.
+var _supported_storage = map[string]func(path string) (Storage, error){
+	"kv":   openKVStorage,
+	"bolt": openBoltStorage,
+}
+
+// RegisterStorageEngine makes fn the opener used for the engine called name,
+// replacing any opener already registered under that name.
+// NOTE(review): the map is written without any locking here; presumably this
+// is meant to be called during start-up, before OpenStorage — confirm.
+func RegisterStorageEngine(name string, fn func(path string) (Storage, error)) {
+	_supported_storage[name] = fn
+}
+
+// Options holds settings for opening a storage.
+// NOTE(review): OpenStorage takes only a path, so nothing visible in this
+// file reads Timeout — confirm whether it is meant to be wired through.
+type Options struct {
+	Timeout time.Duration
+}
+
+// Storage is the key/value store abstraction returned by OpenStorage and
+// implemented by the engines registered in _supported_storage.
+type Storage interface {
+	// Set stores v under k.
+	Set(k, v []byte) error
+	// Get returns the value stored under k.
+	Get(k []byte) ([]byte, error)
+	// Delete removes k.
+	Delete(k []byte) error
+	// ForEach calls fn for each stored key/value pair; the built-in engines
+	// stop at, and return, the first non-nil error from fn.
+	ForEach(fn func(k, v []byte) error) error
+	// Close releases the underlying database.
+	Close() error
+	// WAlName returns a file name associated with the store: the kv engine
+	// returns its WAL name, the bolt engine its database path.
+	// NOTE(review): the name is spelled "WAl", not "WAL".
+	WAlName() string
+}
+
+// OpenStorage opens the storage at path using the engine named by the
+// WUKONG_STORAGE_ENGINE environment variable, falling back to
+// DEFAULT_STORAGE_ENGIND when the variable is empty. It returns an error
+// when no opener is registered under that engine name.
+func OpenStorage(path string) (Storage, error) {
+	wse := os.Getenv("WUKONG_STORAGE_ENGINE")
+	if wse == "" {
+		wse = DEFAULT_STORAGE_ENGIND
+	}
+	// A map index expression yields (value, ok), in that order.
+	if fn, has := _supported_storage[wse]; has {
+		return fn(path)
+	}
+	return nil, fmt.Errorf("unsupported storage engine %v", wse)
+}

+ 16 - 11
utils/kv_utills.go

@@ -1,20 +1,25 @@
 package utils
 
 import (
-	"github.com/cznic/kv"
+	//"github.com/cznic/kv"
+	"github.com/huichen/wukong/storage"
 )
 
 // 打开或者创建KV数据库
 // 当path指向的数据库存在时打开该数据库,否则尝试在该路径处创建新数据库
-func OpenOrCreateKv(path string, options *kv.Options) (*kv.DB, error) {
-	db, errOpen := kv.Open(path, options)
-	if errOpen != nil {
-		var errCreate error
-		db, errCreate = kv.Create(path, options)
-		if errCreate != nil {
-			return db, errCreate
-		}
-	}
+// func OpenOrCreateKv(path string, options *kv.Options) (*kv.DB, error) {
+// 	db, errOpen := kv.Open(path, options)
+// 	if errOpen != nil {
+// 		var errCreate error
+// 		db, errCreate = kv.Create(path, options)
+// 		if errCreate != nil {
+// 			return db, errCreate
+// 		}
+// 	}
 
-	return db, nil
+// 	return db, nil
+// }
+
+// OpenOrCreateKv opens the storage at path by delegating to
+// storage.OpenStorage. The options argument is accepted but not used.
+func OpenOrCreateKv(path string, options *storage.Options) (storage.Storage, error) {
+	return storage.OpenStorage(path)
 }

+ 26 - 3
utils/kv_utills_test.go

@@ -1,18 +1,41 @@
 package utils
 
 import (
-	"github.com/cznic/kv"
+	//"github.com/cznic/kv"
+	"github.com/huichen/wukong/storage"
 	"os"
 	"testing"
 )
 
+// func TestOpenOrCreateKv(t *testing.T) {
+// 	db, err := OpenOrCreateKv("test.kv", &kv.Options{})
+// 	Expect(t, "<nil>", err)
+// 	db.Close()
+
+// 	db, err = OpenOrCreateKv("test.kv", &kv.Options{})
+// 	Expect(t, "<nil>", err)
+// 	err = db.Set([]byte("key1"), []byte("value1"))
+// 	Expect(t, "<nil>", err)
+
+// 	buffer := make([]byte, 100)
+// 	buffer, err = db.Get(nil, []byte("key1"))
+// 	Expect(t, "<nil>", err)
+// 	Expect(t, "value1", string(buffer))
+
+// 	walFile := db.WALName()
+// 	db.Close()
+// 	os.Remove(walFile)
+// 	os.Remove("test.kv")
+// }
+
 func TestOpenOrCreateKv(t *testing.T) {
-	db, err := OpenOrCreateKv("test.kv", &kv.Options{})
+	db, err := OpenOrCreateKv("test.kv", &storage.Options{})
 	Expect(t, "<nil>", err)
 	db.Close()
 
-	db, err = OpenOrCreateKv("test.kv", &kv.Options{})
+	db, err = OpenOrCreateKv("test.kv", &storage.Options{})
 	Expect(t, "<nil>", err)
+	db.Buc
 	err = db.Set([]byte("key1"), []byte("value1"))
 	Expect(t, "<nil>", err)