未验证 提交 32ada7aa 编写于 作者: J Jiawei Wang 提交者: GitHub

Merge pull request #1399 from loveululu/develop

增加python http方式请求cube数据样例
[{
"dict_name": "test",
"shard": 2,
"nodes": [{
"ip": "127.0.0.1",
"port": 8731
},{
"ip": "127.0.0.1",
"port": 8730
}]
}]
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
)
func main() {
dict_name := flag.String("n", "test", "cube name")
conf_path := flag.String("c", "./conf/cube.conf", "cube conf path")
input_path := flag.String("i", "./input.json", "keys to seek")
output_path := flag.String("o", "./output.json", "result to save")
flag.Parse()
bytes, err := ioutil.ReadFile(*conf_path)
if err != nil {
fmt.Println("读取配置文件失败", err)
return
}
var meta Meta
err = json.Unmarshal(bytes, &meta.Servers)
if err != nil {
fmt.Println("解析数据失败", err)
return
}
err = meta.Seek(*dict_name, *input_path, *output_path)
if err != nil {
fmt.Println(err)
}
return
}
{"keys": [0,1,2,3,4,5,6,7]}
{"keys": [1]}
package main
import "fmt"
type Meta struct {
Servers []CubeServer `json:"servers,omitempty"`
}
func (meta *Meta) Seek(dict_name string, input string, output string) (err error) {
var server CubeServer
for _, s := range meta.Servers {
if s.Name == dict_name {
server = s
break
}
}
if server.Name != dict_name {
err = fmt.Errorf("%s server not exist", dict_name)
return err
}
err = server.Seek(input, output)
return err
}
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
)
type Input struct {
Keys []uint64 `json:"keys"`
}
type SingleValue struct {
Status uint32 `json:"status"`
Value string `json:"value"`
}
type Output struct {
Values []SingleValue `json:"values"`
}
type ServerNode struct {
Ip string `json:"ip"`
Port uint64 `json:"port"`
}
type CubeServer struct {
Name string `json:"dict_name"`
Shard uint64 `json:"shard"`
Nodes []ServerNode `json:"nodes"`
}
func (server *CubeServer) SplitKeys(keys []uint64) (splited_keys map[uint64]Input, offset map[uint64][]uint64) {
splited_keys = make(map[uint64]Input)
offset = make(map[uint64][]uint64)
for i, key := range keys {
shard_id := key % server.Shard
temp_split, _ := splited_keys[shard_id]
temp_split.Keys = append(temp_split.Keys, key)
splited_keys[shard_id] = temp_split
temp_offset, _ := offset[shard_id]
temp_offset = append(temp_offset, uint64(i))
offset[shard_id] = temp_offset
}
return splited_keys, offset
}
func (server *CubeServer) Seek(input string, output_path string) (err error) {
file, err := os.Open(input)
if err != nil {
return err
}
defer file.Close()
buf := bufio.NewReader(file)
for {
line, err := buf.ReadBytes('\n')
//line = strings.TrimSpace(line)
if err != nil || io.EOF == err {
break
}
var temp_input Input
json.Unmarshal(line, &temp_input)
key_nums := len(temp_input.Keys)
var output Output
output.Values = make([]SingleValue, key_nums+1)
splited_keys, offset := server.SplitKeys(temp_input.Keys)
for shard_id, keys := range splited_keys {
cur_output, _ := server.Post(shard_id, keys)
for index, single_value := range cur_output.Values {
output.Values[offset[shard_id][index]] = single_value
}
}
json_str, _ := json.Marshal(output)
fp, err := os.OpenFile(output_path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
log.Fatal(err)
}
defer fp.Close()
_, err = fp.Write(json_str)
}
return err
}
func (server *CubeServer) Post(shard_id uint64, input Input) (output Output, err error) {
if shard_id >= uint64(len(server.Nodes)) {
err = fmt.Errorf("have no shard:%v", shard_id)
return output, err
}
json_str, _ := json.Marshal(input)
URL := fmt.Sprintf("http://%s:%v/DictService/seek", server.Nodes[shard_id].Ip, server.Nodes[shard_id].Port)
req, err := http.NewRequest("POST", URL, bytes.NewBuffer(json_str))
if err != nil {
return output, err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return output, err
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return output, err
}
err = json.Unmarshal(body, &output)
return output, err
}
[{
"dict_name": "test",
"shard": 2,
"nodes": [{
"ip": "127.0.0.1",
"port": 8731
},{
"ip": "127.0.0.1",
"port": 8730
}]
}]
#coding=utf-8
import requests
import sys
import json
class Meta(object):
"""记录cube分片server路由"""
def __init__(self, conf_path):
"""根据配置文件初始化路由"""
self.server_api = "/DictService/seek"
self.server_meta = {}
with open(conf_path, "r", encoding="utf8") as fp:
cube_servcers = json.load(fp)
for server in cube_servcers:
self.server_meta[server["dict_name"]] = server
fp.close()
def seek(self, dict_name, keys_path, save_path):
"""查询"""
save_file = open(save_path, 'w')
with open(keys_path, "r", encoding="utf8") as fp:
lines = fp.readlines()
for line in lines:
json_line = json.loads(line)
values = [{} for i in range(len(json_line["keys"]))]
splited_keys, offset = self.split_keys(json_line)
for shard_id, keys in splited_keys.items():
results = self.post(dict_name, shard_id, keys)
for i, result in enumerate(results["values"]):
values[offset[shard_id][i]] = result
cur_line_results = {}
cur_line_results["values"] = values
json.dump(cur_line_results, save_file)
save_file.write("\n")
fp.close()
save_file.close()
def split_keys(self, json_line):
"""根据key值及分片数判断去哪一个分片上查询"""
keys_split = {}
offset = {}
i = 0
for key in json_line["keys"]:
shard_id = key % self.server_meta[dict_name]["shard"]
if shard_id not in keys_split:
keys_split[shard_id] = []
keys_split[shard_id].append(key)
if shard_id not in offset:
offset[shard_id] = []
offset[shard_id].append(i)
i += 1
return keys_split, offset
def post(self, dict_name, shard_id, keys):
"""向分片server发送post请求"""
api = "http://%s:%s%s" % (self.server_meta[dict_name]["nodes"][shard_id]["ip"],
self.server_meta[dict_name]["nodes"][shard_id]["port"],
self.server_api)
data = {"keys": keys}
response = requests.post(api, json.dumps(data))
return response.json()
if __name__ == '__main__':
if len(sys.argv) != 5:
print('please usage: python demo.py conf_path dict_name keys_path save_path')
exit(0)
conf_path = sys.argv[1]
dict_name = sys.argv[2]
keys_path = sys.argv[3]
save_path = sys.argv[4]
meta = Meta(conf_path)
meta.seek(dict_name, keys_path, save_path)
{"keys": [0,1,2,3,4,5,6,7]}
{"keys": [1]}
\ No newline at end of file
# cube python api说明文档
参考[大规模稀疏参数服务Cube的部署和使用](https://github.com/PaddlePaddle/Serving/blob/master/doc/DEPLOY.md#2-大规模稀疏参数服务cube的部署和使用)文档进行cube的部署。
使用python api,可替代上述文档中第3节预测服务的部署、使用
## 配置说明
conf/cube.conf 以json格式,设置各个分片cube server的ip以及port,shard与分片数一致,示例:
```bash
[{
"dict_name": "test",
"shard": 2,
"nodes": [{
"ip": "127.0.0.1",
"port": 8731
},{
"ip": "127.0.0.1",
"port": 8730
}]
}]
```
## 数据格式
```bash
{"keys": [0,1,2,3,4,5,6,7]}
{"keys": [1]}
```
支持批量查询,每次查询一行
## 使用
```bash
cd ./python-api
python3 demo.py conf/cube.conf test input.json result.json
```
\ No newline at end of file
{"values": [{"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}, {"status": 4294967295, "value": ""}]}
{"values": [{"status": 4294967295, "value": ""}]}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册