Browse Source

add: wukongd

yll 4 years ago
parent
commit
cc7bd5c8aa
8 changed files with 369 additions and 0 deletions
  1. 1 0
      .gitignore
  2. 10 0
      wukongd/build.sh
  3. 98 0
      wukongd/config.go
  4. 136 0
      wukongd/http.go
  5. 1 0
      wukongd/logger.go
  6. 87 0
      wukongd/main.go
  7. 17 0
      wukongd/version.go
  8. 19 0
      wukongd/wukongd.conf.sample

+ 1 - 0
.gitignore

@@ -12,6 +12,7 @@ data/db
 *.log
 *.toml
 *.prof
+*.conf
 
 # editor
 .vscode

+ 10 - 0
wukongd/build.sh

@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+# please use git bash on windows
+
+COMMITID=$(git show -s --format=%H)
+BRANCH=$(git branch --show-current)
+GOVERSION=$(go version)
+BUILDTIME=$(date +"%Y-%m-%d %H:%M:%S %z")
+
+go build -tags purego -ldflags "-X 'main.gitCommit=$COMMITID' -X 'main.gitBranch=$BRANCH' -X 'main.buildTime=$BUILDTIME' -X 'main.goVersion=$GOVERSION'"

+ 98 - 0
wukongd/config.go

@@ -0,0 +1,98 @@
+package main
+
+import (
+	"errors"
+	"fmt"
+	"github.com/BurntSushi/toml"
+	"net"
+	"os"
+	"strings"
+)
+
+var (
+	config *Config
+)
+
+type server struct {
+	Addr, Mode string
+}
+
+type Loggerger struct {
+	File, Level string
+}
+
+type engine struct {
+	Dictionary          string
+	Stoptoken           string
+	IndexType           int
+	ShardsNum           int
+	OutputOffset        int
+	MaxOutput           int
+	Persistent          bool
+	PersistentEngine    string
+	PersistentFolder    string
+	PersistentShardsNum int
+}
+
+type Config struct {
+	Server server
+	Logger Loggerger
+	Engine engine
+}
+
+func loadConfig(from string) (*Config, error) {
+	var config Config
+	if _, err := toml.DecodeFile(from, &config); err != nil {
+		return nil, err
+	}
+	err := checkConfig(&config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &config, nil
+}
+
+func checkConfig(c *Config) error {
+	// server
+	_, err := net.ResolveTCPAddr("tcp", c.Server.Addr)
+	if err != nil {
+		return err
+	}
+	c.Server.Mode = strings.ToLower(c.Server.Mode)
+	fmt.Println(c)
+	if c.Server.Mode != "debug" && c.Server.Mode != "release" {
+		return errors.New("invalid server.Mode config")
+	}
+	//Loggerger
+	c.Logger.Level = strings.ToLower(c.Logger.Level)
+	if c.Logger.Level != "debug" && c.Logger.Level != "info" && c.Logger.Level != "warn" && c.Logger.Level != "error" {
+		return errors.New("invalid Loggerger.Level config")
+	}
+
+	// engine
+	if !exists(c.Engine.Dictionary) {
+		return errors.New("dictionary not exists")
+	}
+	if !exists(c.Engine.Stoptoken) {
+		return errors.New("stoptoken not exists")
+	}
+	if !exists(c.Engine.PersistentFolder) {
+		return errors.New("PersistentFolder not exists")
+	}
+	if c.Engine.IndexType != 0 && c.Engine.IndexType != 1 && c.Engine.IndexType != 2 {
+		return errors.New("invalid engine.index_type")
+	}
+	if c.Engine.PersistentEngine != "kv" && c.Engine.PersistentEngine != "bolt" {
+		return errors.New("invalid engine.persistent_engine")
+	}
+	return nil
+}
+
+func exists(fileOrDir string) bool {
+	_, err := os.Stat(fileOrDir)
+	if err != nil {
+		fmt.Println(err)
+	}
+	return err == nil
+}

+ 136 - 0
wukongd/http.go

@@ -0,0 +1,136 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/mux"
+	"github.com/huichen/wukong/types"
+	"net/http"
+	"time"
+)
+
+var (
+	_httpServer *http.Server
+)
+
+type ParamIndex struct {
+	Key         string `json:"key"`
+	Content     string `json:"content"`
+	ForceUpdate bool   `json:"forceUpdate"`
+}
+
+type ParamDelete struct {
+	Key         string `json:"key"`
+	ForceUpdate bool   `json:"forceUpdate"`
+}
+
+type ParamShutdown struct {
+}
+
+type ParamSearch struct {
+	Keywords string `json:"keywords"`
+}
+
+type ReturnSearch types.SearchResponse
+
+func startHttp(addr string) error {
+	router := mux.NewRouter()
+	v1 := router.PathPrefix("/v1").Subrouter()
+	v1.HandleFunc("/index", index_v1).Methods("POST")
+	v1.HandleFunc("/index", delete_index_v1).Methods("DELETE")
+	v1.HandleFunc("/delete_index", delete_index_v1).Methods("POST")
+	v1.HandleFunc("/search", search_v1).Methods("GET").Queries("keywords", "{keywords}")
+	v1.HandleFunc("/shutdown", shutdown).Methods("POST")
+	router.Use(mux.CORSMethodMiddleware(router))
+
+	_httpServer = &http.Server{
+		Handler:      router,
+		Addr:         addr,
+		WriteTimeout: 15 * time.Second,
+		ReadTimeout:  15 * time.Second,
+	}
+
+	go func() {
+		if err := _httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			fmt.Println("http server error: ", err)
+		}
+	}()
+
+	return nil
+}
+
+func stopHttp() {
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+
+	_httpServer.Shutdown(ctx)
+}
+
+func jsonBody(w *http.ResponseWriter, r *http.Request, v interface{}) error {
+	//reader, err := r.GetBody()
+	//if err != nil {
+	//	//log
+	//	return err
+	//}
+	//defer reader.Close()
+	//
+	//b, err := ioutil.ReadAll(reader)
+	//if err != nil {
+	//	//log
+	//	return err
+	//}
+	return json.NewDecoder(r.Body).Decode(v)
+}
+
+// new index
+func index_v1(w http.ResponseWriter, r *http.Request) {
+	var param ParamIndex
+	err := jsonBody(&w, r, &param)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	fmt.Println("indexing:", param)
+	searcher.IndexDocumentS(param.Key, types.DocumentIndexData{
+		Content: param.Content,
+	}, param.ForceUpdate)
+
+	w.WriteHeader(http.StatusOK)
+	_, err = w.Write([]byte(`{"ec":0,"result":{}}`))
+}
+
+// delete index
+func delete_index_v1(w http.ResponseWriter, r *http.Request) {
+	var param ParamDelete
+	err := jsonBody(&w, r, &param)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	fmt.Println("removing index:", param)
+
+	searcher.RemoveDocumentS(param.Key, param.ForceUpdate)
+
+	w.WriteHeader(http.StatusOK)
+	_, err = w.Write([]byte(`{"ec":0,"result":{}}`))
+}
+
+// search
+func search_v1(w http.ResponseWriter, r *http.Request) {
+	params := mux.Vars(r)
+	keywords := params["keywords"]
+	fmt.Println("searching:", params, keywords)
+	resp := searcher.Search(types.SearchRequest{Text: keywords})
+	sresp, err := json.Marshal(resp)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, `{"ec":0,"result":%s}`, string(sresp))
+}
+
+func shutdown(w http.ResponseWriter, r *http.Request) {
+
+}

+ 1 - 0
wukongd/logger.go

@@ -0,0 +1 @@
+package main

+ 87 - 0
wukongd/main.go

@@ -0,0 +1,87 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	wukong "github.com/huichen/wukong/engine"
+	"github.com/huichen/wukong/types"
+	"os"
+	"os/signal"
+)
+
+/**
+  wukongd 基于 wukong 实现,内置一个 http server. 和一个启动配置文件
+
+  * 支持字符串索引
+
+  多实例部署可直接启动多个实例,http 客户端来实现索引的 hash-sharding 以及 查询汇总
+
+*/
+
+var (
+	searcher    = wukong.Engine{}
+	confile     = flag.String("c", "wukongd.conf", "specify the config file")
+	test_conf   = flag.String("t", "", "test specified config file")
+	showVersion = flag.Bool("v", false, "display version")
+)
+
+func main() {
+	flag.Parse()
+
+	var err error
+	// version
+	if *showVersion {
+		printVersion()
+		return
+	}
+	fmt.Println(*test_conf)
+	// check config
+	if len(*test_conf) > 0 {
+		_, err := loadConfig(*test_conf)
+		if err != nil {
+			fmt.Printf("check config file %s failed: %s.\n", *test_conf, err)
+			return
+		}
+		fmt.Printf("check config file %s succeed.\n", *test_conf)
+		return
+	}
+	// main
+	config, err = loadConfig(*confile)
+	if err != nil {
+		fmt.Printf("check config file %s failed: %v\n", *confile, err)
+		return
+	}
+	// init logger
+	// init engine
+	options := types.RankOptions{
+		OutputOffset: config.Engine.OutputOffset,
+		MaxOutputs:   config.Engine.MaxOutput,
+	}
+	searcher.Init(types.EngineInitOptions{
+		SegmenterDictionaries: config.Engine.Dictionary,
+		StopTokenFile:         config.Engine.Stoptoken,
+		IndexerInitOptions: &types.IndexerInitOptions{
+			IndexType: config.Engine.IndexType,
+		},
+		NumShards:               config.Engine.ShardsNum,
+		DefaultRankOptions:      &options,
+		UsePersistentStorage:    config.Engine.Persistent,
+		PersistentStorageFolder: config.Engine.PersistentFolder,
+		PersistentStorageShards: config.Engine.PersistentShardsNum,
+	})
+	searcher.FlushIndex()
+	fmt.Println("indexed:", searcher.NumDocumentsIndexed())
+	// init http
+	err = startHttp(config.Server.Addr)
+	if err != nil {
+		panic(err)
+	}
+
+	fmt.Println("server started.")
+	ch := make(chan os.Signal, 1)
+	signal.Notify(ch, os.Interrupt)
+	fmt.Println("received signal: ", <-ch, " stopping...")
+	stopHttp()
+	searcher.Close()
+	fmt.Println("stopped.")
+}

+ 17 - 0
wukongd/version.go

@@ -0,0 +1,17 @@
+package main
+
+import "fmt"
+
+var (
+	name      string = "wukongd"
+	gitCommit string
+	gitBranch string
+	buildTime string
+	goVersion string
+	version   = "v0.1.0"
+)
+
+func printVersion() {
+	fmt.Printf("%s %v \nbuild %v \n%v\nbranch: %v\ncommit:%v\n",
+		name, version, buildTime, goVersion, gitBranch, gitCommit)
+}

+ 19 - 0
wukongd/wukongd.conf.sample

@@ -0,0 +1,19 @@
+[server]
+addr = ':13400'
+mode = 'debug'
+
+[logger]
+file  ='/var/log/wukongd.log'
+level = 'debug'
+
+[engine]
+dictionary = 'd:/projects/golang/wukongd/data/dictionary.txt'
+stoptoken =  'd:/projects/golang/wukongd/data/stop_tokens.txt'
+indexType = 0  # 0 DocIdsIndex; 1 FrequenciesIndex; 2 LocationsIndex
+shardsNum = 8
+outputOffset = 0
+maxOutput = 100
+persistent = true
+persistentEngine = 'bolt'  # supported: bolt, kv
+persistentFolder = 'd:/projects/golang/wukongd/data/db'
+persistentShardsNum = 64