Browse Source

refactor storage 代码

geili 10 years ago
parent
commit
1325da904c

+ 5 - 8
engine/engine.go

@@ -2,11 +2,10 @@ package engine
 
 import (
 	"fmt"
-	//"github.com/cznic/kv"
 	"github.com/huichen/murmur"
 	"github.com/huichen/sego"
 	"github.com/huichen/wukong/core"
-	kv "github.com/huichen/wukong/storage"
+	"github.com/huichen/wukong/storage"
 	"github.com/huichen/wukong/types"
 	"github.com/huichen/wukong/utils"
 	"log"
@@ -38,8 +37,7 @@ type Engine struct {
 	rankers    []core.Ranker
 	segmenter  sego.Segmenter
 	stopTokens StopTokens
-	//dbs        []*kv.DB
-	dbs []kv.Storage
+	dbs []storage.Storage
 
 	// 建立索引器使用的通信通道
 	segmenterChannel               chan segmenterRequest
@@ -160,11 +158,10 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
 		}
 
 		// 打开或者创建数据库
-		//engine.dbs = make([]*kv.DB, engine.initOptions.PersistentStorageShards)
-		engine.dbs = make([]kv.Storage, engine.initOptions.PersistentStorageShards)
+		engine.dbs = make([]storage.Storage, engine.initOptions.PersistentStorageShards)
 		for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
 			dbPath := engine.initOptions.PersistentStorageFolder + "/" + PersistentStorageFilePrefix + "." + strconv.Itoa(shard)
-			db, err := utils.OpenOrCreateKv(dbPath, &kv.Options{})
+			db, err := storage.OpenStorage(dbPath)
 			if db == nil || err != nil {
 				log.Fatal("无法打开数据库", dbPath, ": ", err)
 			}
@@ -191,7 +188,7 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
 		for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
 			engine.dbs[shard].Close()
 			dbPath := engine.initOptions.PersistentStorageFolder + "/" + PersistentStorageFilePrefix + "." + strconv.Itoa(shard)
-			db, err := utils.OpenOrCreateKv(dbPath, &kv.Options{})
+			db, err := storage.OpenStorage(dbPath)
 			if db == nil || err != nil {
 				log.Fatal("无法打开数据库", dbPath, ": ", err)
 			}

+ 0 - 38
engine/persistent_storage_worker.go

@@ -5,8 +5,6 @@ import (
 	"encoding/binary"
 	"encoding/gob"
 	"github.com/huichen/wukong/types"
-	//"io"
-	//"log"
 	"sync/atomic"
 )
 
@@ -47,42 +45,6 @@ func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard
 	engine.dbs[shard].Delete(b[0:length])
 }
 
-// func (engine *Engine) persistentStorageInitWorker(shard int) {
-// 	iter, err := engine.dbs[shard].SeekFirst()
-// 	if err == io.EOF {
-// 		engine.persistentStorageInitChannel <- true
-// 		return
-// 	} else if err != nil {
-// 		engine.persistentStorageInitChannel <- true
-// 		log.Fatal("无法遍历数据库")
-// 	}
-
-// 	for {
-// 		key, value, err := iter.Next()
-// 		if err == io.EOF {
-// 			break
-// 		} else if err != nil {
-// 			continue
-// 		}
-
-// 		// 得到docID
-// 		docId, _ := binary.Uvarint(key)
-
-// 		// 得到data
-// 		buf := bytes.NewReader(value)
-// 		dec := gob.NewDecoder(buf)
-// 		var data types.DocumentIndexData
-// 		err = dec.Decode(&data)
-// 		if err != nil {
-// 			continue
-// 		}
-
-// 		// 添加索引
-// 		engine.internalIndexDocument(docId, data)
-// 	}
-// 	engine.persistentStorageInitChannel <- true
-// }
-
 func (engine *Engine) persistentStorageInitWorker(shard int) {
 	engine.dbs[shard].ForEach(func(k, v []byte) error {
 		key, value := k, v

+ 4 - 1
storage/bolt_storage.go

@@ -27,14 +27,16 @@ func openBoltStorage(path string) (Storage, error) {
 	return &boltStorage{db}, nil
 }
 
-func (s *boltStorage) WAlName() string {
+func (s *boltStorage) WALName() string {
 	return s.db.Path()
 }
+
 func (s *boltStorage) Set(k []byte, v []byte) error {
 	return s.db.Update(func(tx *bolt.Tx) error {
 		return tx.Bucket(wukong_documents).Put(k, v)
 	})
 }
+
 func (s *boltStorage) Get(k []byte) (b []byte, err error) {
 	err = s.db.View(func(tx *bolt.Tx) error {
 		b = tx.Bucket(wukong_documents).Get(k)
@@ -42,6 +44,7 @@ func (s *boltStorage) Get(k []byte) (b []byte, err error) {
 	})
 	return
 }
+
 func (s *boltStorage) Delete(k []byte) error {
 	return s.db.Update(func(tx *bolt.Tx) error {
 		return tx.Bucket(wukong_documents).Delete(k)

+ 1 - 1
storage/kv_storage.go

@@ -21,7 +21,7 @@ func openKVStorage(path string) (Storage, error) {
 	return db, nil
 }
 
-func (s *kvStorage) WAlName() string {
+func (s *kvStorage) WALName() string {
 	return s.db.WALName()
 }
 

+ 29 - 0
storage/kv_storage_test.go

@@ -0,0 +1,29 @@
+package storage
+
+import (
+	"github.com/huichen/wukong/storage"
+	"os"
+	"testing"
+)
+
+func TestOpenOrCreateKv(t *testing.T) {
+	db, err := storage.OpenStorage("test")
+	Expect(t, "<nil>", err)
+	db.Close()
+
+	db, err := storage.OpenStorage("test")
+	Expect(t, "<nil>", err)
+	db.Buc
+	err = db.Set([]byte("key1"), []byte("value1"))
+	Expect(t, "<nil>", err)
+
+	buffer := make([]byte, 100)
+	buffer, err = db.Get(nil, []byte("key1"))
+	Expect(t, "<nil>", err)
+	Expect(t, "value1", string(buffer))
+
+	walFile := db.WALName()
+	db.Close()
+	os.Remove(walFile)
+	os.Remove("test")
+}

+ 6 - 10
storage/storage.go

@@ -6,19 +6,15 @@ import (
 	"time"
 )
 
-const DEFAULT_STORAGE_ENGIND = "bolt"
+const DEFAULT_STORAGE_ENGINE = "bolt"
 
-var _supported_storage = map[string]func(path string) (Storage, error){
+var supportedStorage = map[string]func(path string) (Storage, error){
 	"kv":   openKVStorage,
 	"bolt": openBoltStorage,
 }
 
 func RegisterStorageEngine(name string, fn func(path string) (Storage, error)) {
-	_supported_storage[name] = fn
-}
-
-type Options struct {
-	Timeout time.Duration
+	supportedStorage[name] = fn
 }
 
 type Storage interface {
@@ -27,15 +23,15 @@ type Storage interface {
 	Delete(k []byte) error
 	ForEach(fn func(k, v []byte) error) error
 	Close() error
-	WAlName() string
+	WALName() string
 }
 
 func OpenStorage(path string) (Storage, error) {
 	wse := os.Getenv("WUKONG_STORAGE_ENGINE")
 	if wse == "" {
-		wse = DEFAULT_STORAGE_ENGIND
+		wse = DEFAULT_STORAGE_ENGINE
 	}
-	if has, fn := _supported_storage[wse]; has {
+	if has, fn := supportedStorage[wse]; has {
 		return fn(path)
 	}
 	return nil, fmt.Errorf("unsupported storage engine %v", wse)

+ 0 - 25
utils/kv_utills.go

@@ -1,25 +0,0 @@
-package utils
-
-import (
-	//"github.com/cznic/kv"
-	"github.com/huichen/wukong/storage"
-)
-
-// 打开或者创建KV数据库
-// 当path指向的数据库存在时打开该数据库,否则尝试在该路径处创建新数据库
-// func OpenOrCreateKv(path string, options *kv.Options) (*kv.DB, error) {
-// 	db, errOpen := kv.Open(path, options)
-// 	if errOpen != nil {
-// 		var errCreate error
-// 		db, errCreate = kv.Create(path, options)
-// 		if errCreate != nil {
-// 			return db, errCreate
-// 		}
-// 	}
-
-// 	return db, nil
-// }
-
-func OpenOrCreateKv(path string, options *storage.Options) (storage.Storage, error) {
-	return storage.OpenStorage(path)
-}

+ 0 - 51
utils/kv_utills_test.go

@@ -1,51 +0,0 @@
-package utils
-
-import (
-	//"github.com/cznic/kv"
-	"github.com/huichen/wukong/storage"
-	"os"
-	"testing"
-)
-
-// func TestOpenOrCreateKv(t *testing.T) {
-// 	db, err := OpenOrCreateKv("test.kv", &kv.Options{})
-// 	Expect(t, "<nil>", err)
-// 	db.Close()
-
-// 	db, err = OpenOrCreateKv("test.kv", &kv.Options{})
-// 	Expect(t, "<nil>", err)
-// 	err = db.Set([]byte("key1"), []byte("value1"))
-// 	Expect(t, "<nil>", err)
-
-// 	buffer := make([]byte, 100)
-// 	buffer, err = db.Get(nil, []byte("key1"))
-// 	Expect(t, "<nil>", err)
-// 	Expect(t, "value1", string(buffer))
-
-// 	walFile := db.WALName()
-// 	db.Close()
-// 	os.Remove(walFile)
-// 	os.Remove("test.kv")
-// }
-
-func TestOpenOrCreateKv(t *testing.T) {
-	db, err := OpenOrCreateKv("test.kv", &storage.Options{})
-	Expect(t, "<nil>", err)
-	db.Close()
-
-	db, err = OpenOrCreateKv("test.kv", &storage.Options{})
-	Expect(t, "<nil>", err)
-	db.Buc
-	err = db.Set([]byte("key1"), []byte("value1"))
-	Expect(t, "<nil>", err)
-
-	buffer := make([]byte, 100)
-	buffer, err = db.Get(nil, []byte("key1"))
-	Expect(t, "<nil>", err)
-	Expect(t, "value1", string(buffer))
-
-	walFile := db.WALName()
-	db.Close()
-	os.Remove(walFile)
-	os.Remove("test.kv")
-}