diff --git a/core/cube/cube-api/go-api/conf/cube.conf b/core/cube/cube-api/go-api/conf/cube.conf new file mode 100644 index 0000000000000000000000000000000000000000..0a21e83926c722b92a1b6efd9202b5d2c9c29418 --- /dev/null +++ b/core/cube/cube-api/go-api/conf/cube.conf @@ -0,0 +1,11 @@ +[{ + "dict_name": "test", + "shard": 2, + "nodes": [{ + "ip": "127.0.0.1", + "port": 8731 + },{ + "ip": "127.0.0.1", + "port": 8730 + }] +}] diff --git a/core/cube/cube-api/go-api/demo.go b/core/cube/cube-api/go-api/demo.go new file mode 100644 index 0000000000000000000000000000000000000000..bd82040db74890f2b4dd1c24780d1a8bff9e91b8 --- /dev/null +++ b/core/cube/cube-api/go-api/demo.go @@ -0,0 +1,33 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io/ioutil" +) + +func main() { + dict_name := flag.String("n", "test", "cube name") + conf_path := flag.String("c", "./conf/cube.conf", "cube conf path") + input_path := flag.String("i", "./input.json", "keys to seek") + output_path := flag.String("o", "./output.json", "result to save") + flag.Parse() + bytes, err := ioutil.ReadFile(*conf_path) + if err != nil { + fmt.Println("读取配置文件失败", err) + return + } + var meta Meta + err = json.Unmarshal(bytes, &meta.Servers) + if err != nil { + fmt.Println("解析数据失败", err) + return + } + + err = meta.Seek(*dict_name, *input_path, *output_path) + if err != nil { + fmt.Println(err) + } + return +} diff --git a/core/cube/cube-api/go-api/input.json b/core/cube/cube-api/go-api/input.json new file mode 100644 index 0000000000000000000000000000000000000000..1eb1f62b7d1cbf0ee9a2a9421683ae84c9f3cdce --- /dev/null +++ b/core/cube/cube-api/go-api/input.json @@ -0,0 +1,2 @@ +{"keys": [0,1,2,3,4,5,6,7]} +{"keys": [1]} diff --git a/core/cube/cube-api/go-api/meta.go b/core/cube/cube-api/go-api/meta.go new file mode 100644 index 0000000000000000000000000000000000000000..a7757b4d91f76de23632a3abaed69e4665ca7619 --- /dev/null +++ b/core/cube/cube-api/go-api/meta.go @@ -0,0 +1,24 @@ +package main + +import "fmt" + +type Meta struct { + Servers []CubeServer `json:"servers,omitempty"` +} + +func (meta *Meta) Seek(dict_name string, input string, output string) (err error) { + var server CubeServer + + for _, s := range meta.Servers { + if s.Name == dict_name { + server = s + break + } + } + if server.Name != dict_name { + err = fmt.Errorf("%s server not exist", dict_name) + return err + } + err = server.Seek(input, output) + return err +} diff --git a/core/cube/cube-api/go-api/server.go b/core/cube/cube-api/go-api/server.go new file mode 100644 index 0000000000000000000000000000000000000000..2c6c81d3c0009de6fcfcd57c73feff134d7b3b73 --- /dev/null +++ b/core/cube/cube-api/go-api/server.go @@ -0,0 +1,117 @@ +package main + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" +) + +type Input struct { + Keys []uint64 `json:"keys"` +} + +type SingleValue struct { + Status uint32 `json:"status"` + Value string `json:"value"` +} +type Output struct { + Values []SingleValue `json:"values"` +} + +type ServerNode struct { + Ip string `json:"ip"` + Port uint64 `json:"port"` +} + +type CubeServer struct { + Name string `json:"dict_name"` + Shard uint64 `json:"shard"` + Nodes []ServerNode `json:"nodes"` +} + +func (server *CubeServer) SplitKeys(keys []uint64) (splited_keys map[uint64]Input, offset map[uint64][]uint64) { + splited_keys = make(map[uint64]Input) + + offset = make(map[uint64][]uint64) + for i, key := range keys { + shard_id := key % server.Shard + temp_split, _ := splited_keys[shard_id] + temp_split.Keys = append(temp_split.Keys, key) + splited_keys[shard_id] = temp_split + + temp_offset, _ := offset[shard_id] + temp_offset = append(temp_offset, uint64(i)) + offset[shard_id] = temp_offset + } + + return splited_keys, offset +} + +func (server *CubeServer) Seek(input string, output_path string) (err error) { + file, err := os.Open(input) + if err != nil { + return err + } + defer file.Close() + + buf := bufio.NewReader(file) + + for { + line, err := buf.ReadBytes('\n') + //line = strings.TrimSpace(line) + if err != nil || io.EOF == err { + break + } + var temp_input Input + json.Unmarshal(line, &temp_input) + key_nums := len(temp_input.Keys) + var output Output + output.Values = make([]SingleValue, key_nums+1) + splited_keys, offset := server.SplitKeys(temp_input.Keys) + for shard_id, keys := range splited_keys { + cur_output, _ := server.Post(shard_id, keys) + for index, single_value := range cur_output.Values { + output.Values[offset[shard_id][index]] = single_value + } + } + json_str, _ := json.Marshal(output) + fp, err := os.OpenFile(output_path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755) + if err != nil { + log.Fatal(err) + } + defer fp.Close() + _, err = fp.Write(json_str) + } + return err +} + +func (server *CubeServer) Post(shard_id uint64, input Input) (output Output, err error) { + if shard_id >= uint64(len(server.Nodes)) { + err = fmt.Errorf("have no shard:%v", shard_id) + return output, err + } + json_str, _ := json.Marshal(input) + URL := fmt.Sprintf("http://%s:%v/DictService/seek", server.Nodes[shard_id].Ip, server.Nodes[shard_id].Port) + req, err := http.NewRequest("POST", URL, bytes.NewBuffer(json_str)) + if err != nil { + return output, err + } + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return output, err + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return output, err + } + err = json.Unmarshal(body, &output) + return output, err +} diff --git a/core/cube/cube-api/python-api/conf/cube.conf b/core/cube/cube-api/python-api/conf/cube.conf new file mode 100644 index 0000000000000000000000000000000000000000..0a21e83926c722b92a1b6efd9202b5d2c9c29418 --- /dev/null +++ b/core/cube/cube-api/python-api/conf/cube.conf @@ -0,0 +1,11 @@ +[{ + "dict_name": "test", + "shard": 2, + "nodes": [{ + "ip": "127.0.0.1", + "port": 8731 + },{ + "ip": "127.0.0.1", + "port": 8730 + }] +}] diff --git a/core/cube/cube-api/python-api/demo.py b/core/cube/cube-api/python-api/demo.py new file mode 100644 index 0000000000000000000000000000000000000000..c0d63e6bce58e04c9395d4448645203fd138d23c --- /dev/null +++ b/core/cube/cube-api/python-api/demo.py @@ -0,0 +1,76 @@ +#coding=utf-8 + +import requests +import sys +import json + +class Meta(object): + """记录cube分片server路由""" + def __init__(self, conf_path): + """根据配置文件初始化路由""" + self.server_api = "/DictService/seek" + self.server_meta = {} + with open(conf_path, "r", encoding="utf8") as fp: + cube_servcers = json.load(fp) + for server in cube_servcers: + self.server_meta[server["dict_name"]] = server + fp.close() + + def seek(self, dict_name, keys_path, save_path): + """查询""" + save_file = open(save_path, 'w') + with open(keys_path, "r", encoding="utf8") as fp: + lines = fp.readlines() + for line in lines: + json_line = json.loads(line) + values = [{} for i in range(len(json_line["keys"]))] + splited_keys, offset = self.split_keys(json_line) + for shard_id, keys in splited_keys.items(): + results = self.post(dict_name, shard_id, keys) + for i, result in enumerate(results["values"]): + values[offset[shard_id][i]] = result + cur_line_results = {} + cur_line_results["values"] = values + + json.dump(cur_line_results, save_file) + save_file.write("\n") + + fp.close() + save_file.close() + + def split_keys(self, json_line): + """根据key值及分片数判断去哪一个分片上查询""" + keys_split = {} + offset = {} + i = 0 + for key in json_line["keys"]: + shard_id = key % self.server_meta[dict_name]["shard"] + if shard_id not in keys_split: + keys_split[shard_id] = [] + keys_split[shard_id].append(key) + if shard_id not in offset: + offset[shard_id] = [] + offset[shard_id].append(i) + i += 1 + return keys_split, offset + + def post(self, dict_name, shard_id, keys): + """向分片server发送post请求""" + api = "http://%s:%s%s" % (self.server_meta[dict_name]["nodes"][shard_id]["ip"], + self.server_meta[dict_name]["nodes"][shard_id]["port"], + self.server_api) + data = {"keys": keys} + response = requests.post(api, json.dumps(data)) + return response.json() + + +if __name__ == '__main__': + if len(sys.argv) != 5: + print('please usage: python demo.py conf_path dict_name keys_path save_path') + exit(0) + conf_path = sys.argv[1] + dict_name = sys.argv[2] + keys_path = sys.argv[3] + save_path = sys.argv[4] + meta = Meta(conf_path) + meta.seek(dict_name, keys_path, save_path) diff --git a/core/cube/cube-api/python-api/input.json b/core/cube/cube-api/python-api/input.json new file mode 100644 index 0000000000000000000000000000000000000000..74db7d86a8f64047cdf3c3b9b119574b6a96d286 --- /dev/null +++ b/core/cube/cube-api/python-api/input.json @@ -0,0 +1,2 @@ +{"keys": [0,1,2,3,4,5,6,7]} +{"keys": [1]} \ No newline at end of file diff --git a/core/cube/cube-api/python-api/ptyhon_api.md b/core/cube/cube-api/python-api/ptyhon_api.md new file mode 100644 index 0000000000000000000000000000000000000000..1b321d9153f938e0ad94ddb321e3aae19da755b1 --- /dev/null +++ b/core/cube/cube-api/python-api/ptyhon_api.md @@ -0,0 +1,32 @@ +# cube python api说明文档 +参考[大规模稀疏参数服务Cube的部署和使用](https://github.com/PaddlePaddle/Serving/blob/master/doc/DEPLOY.md#2-大规模稀疏参数服务cube的部署和使用)文档进行cube的部署。 +使用python api,可替代上述文档中第3节预测服务的部署、使用 + +## 配置说明 +conf/cube.conf 以json格式,设置各个分片cube server的ip以及port,shard与分片数一致,示例: +```bash +[{ + "dict_name": "test", + "shard": 2, + "nodes": [{ + "ip": "127.0.0.1", + "port": 8731 + },{ + "ip": "127.0.0.1", + "port": 8730 + }] +}] +``` + +## 数据格式 +```bash +{"keys": [0,1,2,3,4,5,6,7]} +{"keys": [1]} +``` +支持批量查询,每次查询一行 + +## 使用 +```bash +cd ./python-api +python3 demo.py conf/cube.conf test input.json result.json +``` \ No newline at end of file diff --git a/core/cube/cube-api/python-api/result.json b/core/cube/cube-api/python-api/result.json new file mode 100644 index 0000000000000000000000000000000000000000..dbe282debae1f906d99f25b4e7ea0169c874b7ae --- /dev/null +++ b/core/cube/cube-api/python-api/result.json @@ -0,0 +1,2 @@ +{"values": [{"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}]} +{"values": [{"status": 4294967295, "value": ""}]}