Commit 5618c090 authored by guru4elephant

Simplify the BERT reader, add benchmark scripts for BERT, and refine some basic APIs.

Parent: 2d54930d
#coding:utf-8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mask, padding and batching."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
def prepare_batch_data(insts,
                       total_token_num,
                       max_seq_len=128,
                       pad_id=None,
                       cls_id=None,
                       sep_id=None,
                       mask_id=None,
                       return_input_mask=True,
                       return_max_len=True,
                       return_num_token=False):
    """
    1. generate Tensor of token ids
    2. generate Tensor of position ids
    3. generate self attention input mask, shape: [batch_size, max_seq_len, 1]
    """
    batch_src_ids = [inst[0] for inst in insts]
    batch_sent_ids = [inst[1] for inst in insts]
    batch_pos_ids = [inst[2] for inst in insts]
    labels_list = []
    # compatible with squad, whose example includes start/end positions,
    # or unique id
    for i in range(3, len(insts[0]), 1):
        labels = [inst[i] for inst in insts]
        labels = np.array(labels).astype("int64").reshape([-1, 1])
        labels_list.append(labels)

    out = batch_src_ids
    # padding
    src_id, self_input_mask = pad_batch_data(
        out, pad_idx=pad_id, max_seq_len=max_seq_len, return_input_mask=True)
    pos_id = pad_batch_data(
        batch_pos_ids,
        pad_idx=pad_id,
        max_seq_len=max_seq_len,
        return_pos=False,
        return_input_mask=False)
    sent_id = pad_batch_data(
        batch_sent_ids,
        pad_idx=pad_id,
        max_seq_len=max_seq_len,
        return_pos=False,
        return_input_mask=False)

    return_list = [src_id, pos_id, sent_id, self_input_mask] + labels_list
    return return_list if len(return_list) > 1 else return_list[0]


def pad_batch_data(insts,
                   pad_idx=0,
                   max_seq_len=128,
                   return_pos=False,
                   return_input_mask=False,
                   return_max_len=False,
                   return_num_token=False,
                   return_seq_lens=False):
    """
    Pad the instances to the fixed max_seq_len, and generate the
    corresponding position data and input mask.
    """
    return_list = []
    # Pad to the fixed max_seq_len rather than the longest instance in the
    # batch, so every request produces tensors of the same shape:
    #max_len = max(len(inst) for inst in insts)
    max_len = max_seq_len
    # Any token included in dict can be used to pad, since the paddings' loss
    # will be masked out by weights and make no effect on parameter gradients.
    inst_data = np.array([
        list(inst) + list([pad_idx] * (max_len - len(inst))) for inst in insts
    ])
    return_list += [inst_data.astype("int64").reshape([-1, max_len, 1])]

    # position data
    if return_pos:
        inst_pos = np.array([
            list(range(0, len(inst))) + [pad_idx] * (max_len - len(inst))
            for inst in insts
        ])
        return_list += [inst_pos.astype("int64").reshape([-1, max_len, 1])]

    if return_input_mask:
        # This is used to avoid attention on paddings.
        input_mask_data = np.array(
            [[1] * len(inst) + [0] * (max_len - len(inst)) for inst in insts])
        input_mask_data = np.expand_dims(input_mask_data, axis=-1)
        return_list += [input_mask_data.astype("float32")]

    if return_max_len:
        return_list += [max_len]

    if return_num_token:
        num_token = 0
        for inst in insts:
            num_token += len(inst)
        return_list += [num_token]

    if return_seq_lens:
        seq_lens = np.array([len(inst) for inst in insts])
        return_list += [seq_lens.astype("int64").reshape([-1, 1])]

    return return_list if len(return_list) > 1 else return_list[0]
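To make the padding contract concrete, here is a minimal sketch of calling pad_batch_data (the instance values are arbitrary illustration, not from the commit):

# Sketch: pad two variable-length id sequences to the fixed max_seq_len=6
# and also request the attention mask.
insts = [[101, 2023, 102], [101, 2003, 1037, 3231, 102]]
padded, mask = pad_batch_data(
    insts, pad_idx=0, max_seq_len=6, return_input_mask=True)
# padded: int64 array of shape (2, 6, 1), ids right-padded with 0
# mask:   float32 array of shape (2, 6, 1), 1.0 on tokens, 0.0 on padding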
+# -*- coding: utf-8 -*-
+#
 # Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,54 +14,53 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from __future__ import unicode_literals, absolute_import
-import os
 import sys
+import time
 from paddle_serving_client import Client
-from paddle_serving_client.metric import auc
 from paddle_serving_client.utils import MultiThreadRunner
-import time
-from bert_client import BertService
+from paddle_serving_client.utils import benchmark_args
+from batching import pad_batch_data
+import tokenization
+import requests
+import json
+from bert_reader import BertReader
+
+args = benchmark_args()
 
-def predict(thr_id, resource):
-    bc = BertService(
-        model_name="bert_chinese_L-12_H-768_A-12",
-        max_seq_len=20,
-        do_lower_case=True)
-    bc.load_client(resource["conf_file"], resource["server_endpoint"])
-    thread_num = resource["thread_num"]
-    file_list = resource["filelist"]
-    line_id = 0
-    result = []
-    label_list = []
-    dataset = []
-    for fn in file_list:
-        fin = open(fn)
-        for line in fin:
-            if line_id % thread_num == thr_id - 1:
-                dataset.append(line.strip())
-            line_id += 1
-        fin.close()
-
-    start = time.time()
-    fetch = ["pooled_output"]
-    for inst in dataset:
-        fetch_map = bc.run_general([[inst]], fetch)
-        result.append(fetch_map["pooled_output"])
-    end = time.time()
-    return [result, label_list, [end - start]]
+def single_func(idx, resource):
+    fin = open("data-c.txt")
+    if args.request == "rpc":
+        reader = BertReader(vocab_file="vocab.txt", max_seq_len=20)
+        config_file = './serving_client_conf/serving_client_conf.prototxt'
+        fetch = ["pooled_output"]
+        client = Client()
+        client.load_client_config(args.model)
+        client.connect([resource["endpoint"][idx % 4]])
+
+        start = time.time()
+        for line in fin:
+            feed_dict = reader.process(line)
+            result = client.predict(feed=feed_dict, fetch=fetch)
+        end = time.time()
+    elif args.request == "http":
+        start = time.time()
+        header = {"Content-Type": "application/json"}
+        for line in fin:
+            dict_data = {"words": line, "fetch": ["pooled_output"]}
+            r = requests.post(
+                'http://{}/bert/prediction'.format(resource["endpoint"][0]),
+                data=json.dumps(dict_data),
+                headers=header)
+        end = time.time()
+    return [[end - start]]
 
 if __name__ == '__main__':
-    conf_file = sys.argv[1]
-    data_file = sys.argv[2]
-    thread_num = sys.argv[3]
-    resource = {}
-    resource["conf_file"] = conf_file
-    resource["server_endpoint"] = ["127.0.0.1:9292"]
-    resource["filelist"] = [data_file]
-    resource["thread_num"] = int(thread_num)
-    thread_runner = MultiThreadRunner()
-    result = thread_runner.run(predict, int(sys.argv[3]), resource)
-    print("total time {} s".format(sum(result[-1]) / len(result[-1])))
+    multi_thread_runner = MultiThreadRunner()
+    endpoint_list = [
+        "127.0.0.1:9494", "127.0.0.1:9495", "127.0.0.1:9496", "127.0.0.1:9497"
+    ]
+    #result = multi_thread_runner.run(single_func, args.thread, {"endpoint": endpoint_list})
+    result = single_func(0, {"endpoint": endpoint_list})
+    print(result)
from batching import pad_batch_data
import tokenization


class BertReader():
    def __init__(self, vocab_file="", max_seq_len=128):
        self.vocab_file = vocab_file
        self.tokenizer = tokenization.FullTokenizer(vocab_file=vocab_file)
        self.max_seq_len = max_seq_len
        self.vocab = self.tokenizer.vocab
        self.pad_id = self.vocab["[PAD]"]
        self.cls_id = self.vocab["[CLS]"]
        self.sep_id = self.vocab["[SEP]"]
        self.mask_id = self.vocab["[MASK]"]

    def pad_batch(self, token_ids, text_type_ids, position_ids):
        # Wrap the single example as a batch of one, then pad everything
        # to the fixed max_seq_len.
        batch_token_ids = [token_ids]
        batch_text_type_ids = [text_type_ids]
        batch_position_ids = [position_ids]

        padded_token_ids, input_mask = pad_batch_data(
            batch_token_ids,
            max_seq_len=self.max_seq_len,
            pad_idx=self.pad_id,
            return_input_mask=True)
        padded_text_type_ids = pad_batch_data(
            batch_text_type_ids,
            max_seq_len=self.max_seq_len,
            pad_idx=self.pad_id)
        padded_position_ids = pad_batch_data(
            batch_position_ids,
            max_seq_len=self.max_seq_len,
            pad_idx=self.pad_id)
        return padded_token_ids, padded_position_ids, padded_text_type_ids, \
            input_mask

    def process(self, sent):
        text_a = tokenization.convert_to_unicode(sent)
        tokens_a = self.tokenizer.tokenize(text_a)
        # Reserve two positions for the [CLS] and [SEP] markers.
        if len(tokens_a) > self.max_seq_len - 2:
            tokens_a = tokens_a[0:(self.max_seq_len - 2)]
        tokens = []
        text_type_ids = []
        tokens.append("[CLS]")
        text_type_ids.append(0)
        for token in tokens_a:
            tokens.append(token)
            text_type_ids.append(0)
        # Close the sequence with [SEP]; the truncation above reserved
        # room for it.
        tokens.append("[SEP]")
        text_type_ids.append(0)

        token_ids = self.tokenizer.convert_tokens_to_ids(tokens)
        position_ids = list(range(len(token_ids)))
        p_token_ids, p_pos_ids, p_text_type_ids, input_mask = \
            self.pad_batch(token_ids, text_type_ids, position_ids)
        feed_result = {
            "input_ids": p_token_ids.reshape(-1).tolist(),
            "position_ids": p_pos_ids.reshape(-1).tolist(),
            "segment_ids": p_text_type_ids.reshape(-1).tolist(),
            "input_mask": input_mask.reshape(-1).tolist()
        }
        return feed_result
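For reference, a minimal sketch of the reader's contract, assuming a BERT vocab.txt alongside the script:

reader = BertReader(vocab_file="vocab.txt", max_seq_len=20)
feed_dict = reader.process("hello serving")
# All values are flat Python lists of length max_seq_len, keyed by the
# model's feed names:
sorted(feed_dict.keys())        # ['input_ids', 'input_mask',
                                #  'position_ids', 'segment_ids']
len(feed_dict["input_ids"])     # 20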
# coding=utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.web_service import WebService
from bert_reader import BertReader
import sys
import os
class BertService(WebService):
    def load(self):
        self.reader = BertReader(vocab_file="vocab.txt", max_seq_len=20)

    def preprocess(self, feed={}, fetch=[]):
        feed_res = self.reader.process(feed["words"].encode("utf-8"))
        return feed_res, fetch


bert_service = BertService(name="bert")
bert_service.load()
bert_service.load_model_config(sys.argv[1])
gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"]
gpus = [int(x) for x in gpu_ids.split(",")]
bert_service.set_gpus(gpus)
bert_service.prepare_server(workdir="workdir", port=9494, device="gpu")
bert_service.run_server()
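Once the service is up, it can be queried over HTTP the same way the benchmark's "http" branch does; a sketch, assuming the server above listens on port 9494:

import json
import requests

r = requests.post(
    "http://127.0.0.1:9494/bert/prediction",
    data=json.dumps({"words": "hello serving", "fetch": ["pooled_output"]}),
    headers={"Content-Type": "application/json"})
print(r.json())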
(This diff is collapsed.)
@@ -48,7 +48,7 @@ def single_func(idx, resource):
     for line in fin:
         word_ids, label = imdb_dataset.get_words_and_label(line)
         r = requests.post("http://{}/imdb/prediction".format(args.endpoint),
-                          data={"words": word_ids})
+                          data={"words": word_ids, "fetch": ["prediction"]})
     end = time.time()
     return [[end - start]]
...
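This brings the IMDB benchmark in line with the web service contract: the HTTP endpoint now aborts with a 400 when the JSON body lacks a "fetch" field, as the web_service.py hunk below shows.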
@@ -22,6 +22,26 @@ import paddle_serving_server_gpu as paddle_serving_server
 from version import serving_server_version
 from contextlib import closing
 
+def serve_args():
+    parser = argparse.ArgumentParser("serve")
+    parser.add_argument(
+        "--thread", type=int, default=10, help="Concurrency of server")
+    parser.add_argument(
+        "--model", type=str, default="", help="Model for serving")
+    parser.add_argument(
+        "--port", type=int, default=9292, help="Port of the starting gpu")
+    parser.add_argument(
+        "--workdir",
+        type=str,
+        default="workdir",
+        help="Working dir of current service")
+    parser.add_argument(
+        "--device", type=str, default="gpu", help="Type of device")
+    parser.add_argument(
+        "--gpu_ids", type=str, default="", help="gpu ids")
+    parser.add_argument(
+        "--name", type=str, default="default", help="Default service name")
+    return parser.parse_args()
+
 class OpMaker(object):
     def __init__(self):
@@ -126,7 +146,8 @@ class Server(object):
         self.model_config_path = model_config_path
         self.engine.name = "general_model"
-        self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
+        #self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
+        self.engine.reloadable_meta = self.workdir + "/fluid_time_file"
         os.system("touch {}".format(self.engine.reloadable_meta))
         self.engine.reloadable_type = "timestamp_ne"
         self.engine.runtime_thread_num = 0
@@ -154,6 +175,7 @@ class Server(object):
         self.infer_service_conf.services.extend([infer_service])
 
     def _prepare_resource(self, workdir):
+        self.workdir = workdir
         if self.resource_conf == None:
             with open("{}/{}".format(workdir, self.general_model_config_fn),
                       "w") as fout:
...
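serve_args now centralizes the CLI shared by the GPU entry points. A hypothetical launch using these flags, mirroring the Usage examples elsewhere in this commit:

    python -m paddle_serving_server_gpu.serve --model ./serving_server_model --port 9292 --gpu_ids 0,1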
@@ -19,30 +19,10 @@ Usage:
 """
 import argparse
 from multiprocessing import Pool, Process
+import os
+from paddle_serving_server_gpu import serve_args
 
-def parse_args():
-    parser = argparse.ArgumentParser("serve")
-    parser.add_argument(
-        "--thread", type=int, default=10, help="Concurrency of server")
-    parser.add_argument(
-        "--model", type=str, default="", help="Model for serving")
-    parser.add_argument(
-        "--port", type=int, default=9292, help="Port of the starting gpu")
-    parser.add_argument(
-        "--workdir",
-        type=str,
-        default="workdir",
-        help="Working dir of current service")
-    parser.add_argument(
-        "--device", type=str, default="gpu", help="Type of device")
-    parser.add_argument(
-        "--gpu_ids", type=str, default="", help="gpu ids")
-    return parser.parse_args()
-
-args = parse_args()
 
-def start_gpu_card_model(gpuid):
+def start_gpu_card_model(gpuid, args):
     gpuid = int(gpuid)
     device = "gpu"
     port = args.port
@@ -79,17 +59,24 @@ def start_gpu_card_model(gpuid):
     server.set_gpuid(gpuid)
     server.run_server()
 
-if __name__ == "__main__":
-    gpus = args.gpu_ids.split(",")
+def start_multi_card(args):
+    # Fall back to CUDA_VISIBLE_DEVICES when --gpu_ids is not given.
+    if args.gpu_ids == "":
+        gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
+    else:
+        gpus = args.gpu_ids.split(",")
     if len(gpus) <= 0:
-        start_gpu_card_model(-1)
+        start_gpu_card_model(-1, args)
     else:
         gpu_processes = []
         for i, gpu_id in enumerate(gpus):
-            p = Process(target=start_gpu_card_model, args=(i, ))
+            p = Process(target=start_gpu_card_model, args=(i, args, ))
             gpu_processes.append(p)
         for p in gpu_processes:
             p.start()
         for p in gpu_processes:
             p.join()
+
+if __name__ == "__main__":
+    args = serve_args()
+    start_multi_card(args)
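With the fallback above, one start_gpu_card_model process is forked per visible card even when --gpu_ids is omitted; for example (hypothetical):

    CUDA_VISIBLE_DEVICES=0,1 python -m paddle_serving_server_gpu.serve --model ./serving_server_model --port 9292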
@@ -17,39 +17,20 @@ Usage:
 Example:
     python -m paddle_serving_server.web_serve --model ./serving_server_model --port 9292
 """
-import argparse
+import os
 from multiprocessing import Pool, Process
 from .web_service import WebService
+import paddle_serving_server_gpu as serving
+from paddle_serving_server_gpu import serve_args
 
-def parse_args():
-    parser = argparse.ArgumentParser("web_serve")
-    parser.add_argument(
-        "--thread", type=int, default=10, help="Concurrency of server")
-    parser.add_argument(
-        "--model", type=str, default="", help="Model for serving")
-    parser.add_argument(
-        "--port", type=int, default=9292, help="Port the server")
-    parser.add_argument(
-        "--workdir",
-        type=str,
-        default="workdir",
-        help="Working dir of current service")
-    parser.add_argument(
-        "--device", type=str, default="cpu", help="Type of device")
-    parser.add_argument(
-        "--gpu_ids", type=str, default="", help="GPU ids of current service")
-    parser.add_argument(
-        "--name", type=str, default="default", help="Default service name")
-    return parser.parse_args()
 
 if __name__ == "__main__":
-    args = parse_args()
-    service = WebService(name=args.name)
-    service.load_model_config(args.model)
-    service.prepare_server(
+    args = serve_args()
+    web_service = WebService(name=args.name)
+    web_service.load_model_config(args.model)
+    if args.gpu_ids == "":
+        gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"]
+    else:
+        gpu_ids = args.gpu_ids
+    gpus = [int(x) for x in gpu_ids.split(",")]
+    web_service.set_gpus(gpus)
+    web_service.prepare_server(
         workdir=args.workdir, port=args.port, device=args.device)
-    service.run_server(args.gpu_ids)
+    web_service.run_server()
@@ -15,53 +15,82 @@
 from flask import Flask, request, abort
 from multiprocessing import Pool, Process
 from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
+import paddle_serving_server_gpu as serving
 from paddle_serving_client import Client
+from .serve import start_multi_card
+import time
+import random
 
 class WebService(object):
     def __init__(self, name="default_service"):
         self.name = name
+        self.gpus = []
+        self.rpc_service_list = []
 
     def load_model_config(self, model_config):
         self.model_config = model_config
 
-    def _launch_rpc_service(self, gpuid):
-        if gpuid < 0:
-            device = "cpu"
-        else:
-            device = "gpu"
-        op_maker = OpMaker()
+    def set_gpus(self, gpus):
+        self.gpus = gpus
+
+    def default_rpc_service(self, workdir="conf", port=9292,
+                            gpuid=0, thread_num=10):
+        device = "gpu"
+        if gpuid == -1:
+            device = "cpu"
+        op_maker = serving.OpMaker()
         read_op = op_maker.create('general_reader')
         general_infer_op = op_maker.create('general_infer')
         general_response_op = op_maker.create('general_response')
-        op_seq_maker = OpSeqMaker()
+
+        op_seq_maker = serving.OpSeqMaker()
         op_seq_maker.add_op(read_op)
         op_seq_maker.add_op(general_infer_op)
         op_seq_maker.add_op(general_response_op)
-        server = Server()
+
+        server = serving.Server()
         server.set_op_sequence(op_seq_maker.get_op_sequence())
-        server.set_num_threads(10)
+        server.set_num_threads(thread_num)
+        server.load_model_config(self.model_config)
         if gpuid >= 0:
             server.set_gpuid(gpuid)
-        server.load_model_config(self.model_config)
-        server.prepare_server(
-            workdir="{}_{}".format(self.workdir, gpuid),
-            port=self.port + gpuid + 1, device=device)
-        server.run_server()
+        server.prepare_server(workdir=workdir, port=port, device=device)
+        return server
+
+    def _launch_rpc_service(self, service_idx):
+        self.rpc_service_list[service_idx].run_server()
 
     def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
         self.workdir = workdir
         self.port = port
         self.device = device
         self.gpuid = gpuid
+        if len(self.gpus) == 0:
+            # init cpu service
+            self.rpc_service_list.append(
+                self.default_rpc_service(self.workdir, self.port + 1,
+                                         -1, thread_num=10))
+        else:
+            for i, gpuid in enumerate(self.gpus):
+                self.rpc_service_list.append(
+                    self.default_rpc_service("{}_{}".format(self.workdir, i),
                                             self.port + 1 + i,
+                                             gpuid, thread_num=10))
 
-    def _launch_web_service(self):
+    def _launch_web_service(self, gpu_num):
         app_instance = Flask(__name__)
-        client_service = Client()
-        client_service.load_client_config(
-            "{}/serving_server_conf.prototxt".format(self.model_config))
-        client_service.connect(["127.0.0.1:{}".format(self.port + 1)])
+        client_list = []
+        if gpu_num > 1:
+            gpu_num = 0
+        for i in range(gpu_num):
+            client_service = Client()
+            client_service.load_client_config(
+                "{}/serving_server_conf.prototxt".format(self.model_config))
+            client_service.connect(["127.0.0.1:{}".format(self.port + i + 1)])
+            client_list.append(client_service)
+            time.sleep(1)
         service_name = "/" + self.name + "/prediction"
 
         @app_instance.route(service_name, methods=['POST'])
@@ -71,7 +100,8 @@ class WebService(object):
             if "fetch" not in request.json:
                 abort(400)
             feed, fetch = self.preprocess(request.json, request.json["fetch"])
-            fetch_map = client_service.predict(feed=feed, fetch=fetch)
+            fetch_map = client_list[0].predict(
+                feed=feed, fetch=fetch)
             fetch_map = self.postprocess(
                 feed=request.json, fetch=fetch, fetch_map=fetch_map)
             return fetch_map
@@ -81,27 +111,26 @@ class WebService(object):
             threaded=False,
             processes=1)
 
-    def run_server(self, gpu_ids):
+    def run_server(self):
         import socket
         localIP = socket.gethostbyname(socket.gethostname())
         print("web service address:")
         print("http://{}:{}/{}/prediction".format(localIP, self.port,
                                                   self.name))
-        gpus = gpu_ids.split(",")
-        if len(gpus) <= 0:
-            self._launch_rpc_service(-1)
-        else:
-            gpu_processes = []
-            for i, gpu_id in gpus:
-                p = Process(target=self._launch_rpc_service, (i,))
-                gpu_processes.append(p)
-            for p in gpu_processes:
-                p.start()
-            p_web = Process(target=self._launch_web_service)
-            for p in gpu_processes:
-                p.join()
-            p_web.join()
+        rpc_processes = []
+        for idx in range(len(self.rpc_service_list)):
+            p_rpc = Process(target=self._launch_rpc_service, args=(idx,))
+            rpc_processes.append(p_rpc)
+
+        for p in rpc_processes:
+            p.start()
+
+        p_web = Process(target=self._launch_web_service,
+                        args=(len(self.gpus),))
+        p_web.start()
+        for p in rpc_processes:
+            p.join()
+        p_web.join()
 
     def preprocess(self, feed={}, fetch=[]):
         return feed, fetch
...
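Putting the new pieces together, a multi-card deployment under the reworked API looks like this sketch (model path and card ids are placeholders; bert_web_service.py above follows the same pattern):

web_service = WebService(name="bert")
web_service.load_model_config("./serving_server_model")
web_service.set_gpus([0, 1])    # one RPC worker per card on port+1, port+2
web_service.prepare_server(workdir="workdir", port=9393, device="gpu")
web_service.run_server()        # forks RPC workers plus the Flask front end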