提交 8c5f03a4 编写于 作者: G guru4elephant

simplify bert reader and add benchmark scripts for bert, refine some basic api

上级 7bc54c46
#coding:utf-8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mask, padding and batching."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
def prepare_batch_data(insts,
total_token_num,
max_seq_len=128,
pad_id=None,
cls_id=None,
sep_id=None,
mask_id=None,
return_input_mask=True,
return_max_len=True,
return_num_token=False):
"""
1. generate Tensor of data
2. generate Tensor of position
3. generate self attention mask, [shape: batch_size * max_len * max_len]
"""
batch_src_ids = [inst[0] for inst in insts]
batch_sent_ids = [inst[1] for inst in insts]
batch_pos_ids = [inst[2] for inst in insts]
labels_list = []
# compatible with squad, whose example includes start/end positions,
# or unique id
for i in range(3, len(insts[0]), 1):
labels = [inst[i] for inst in insts]
labels = np.array(labels).astype("int64").reshape([-1, 1])
labels_list.append(labels)
out = batch_src_ids
# Second step: padding
src_id, self_input_mask = pad_batch_data(
out, pad_idx=pad_id, max_seq_len=max_seq_len, return_input_mask=True)
pos_id = pad_batch_data(
batch_pos_ids,
pad_idx=pad_id,
max_seq_len=max_seq_len,
return_pos=False,
return_input_mask=False)
sent_id = pad_batch_data(
batch_sent_ids,
pad_idx=pad_id,
max_seq_len=max_seq_len,
return_pos=False,
return_input_mask=False)
return_list = [src_id, pos_id, sent_id, self_input_mask] + labels_list
return return_list if len(return_list) > 1 else return_list[0]
def pad_batch_data(insts,
pad_idx=0,
max_seq_len=128,
return_pos=False,
return_input_mask=False,
return_max_len=False,
return_num_token=False,
return_seq_lens=False):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and input mask.
"""
return_list = []
#max_len = max(len(inst) for inst in insts)
max_len = max_seq_len
# Any token included in dict can be used to pad, since the paddings' loss
# will be masked out by weights and make no effect on parameter gradients.
inst_data = np.array([
list(inst) + list([pad_idx] * (max_len - len(inst))) for inst in insts
])
return_list += [inst_data.astype("int64").reshape([-1, max_len, 1])]
# position data
if return_pos:
inst_pos = np.array([
list(range(0, len(inst))) + [pad_idx] * (max_len - len(inst))
for inst in insts
])
return_list += [inst_pos.astype("int64").reshape([-1, max_len, 1])]
if return_input_mask:
# This is used to avoid attention on paddings.
input_mask_data = np.array(
[[1] * len(inst) + [0] * (max_len - len(inst)) for inst in insts])
input_mask_data = np.expand_dims(input_mask_data, axis=-1)
return_list += [input_mask_data.astype("float32")]
if return_max_len:
return_list += [max_len]
if return_num_token:
num_token = 0
for inst in insts:
num_token += len(inst)
return_list += [num_token]
if return_seq_lens:
seq_lens = np.array([len(inst) for inst in insts])
return_list += [seq_lens.astype("int64").reshape([-1, 1])]
return return_list if len(return_list) > 1 else return_list[0]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -12,54 +14,53 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals, absolute_import
import os
import sys
import time
from paddle_serving_client import Client
from paddle_serving_client.metric import auc
from paddle_serving_client.utils import MultiThreadRunner
import time
from bert_client import BertService
from paddle_serving_client.utils import benchmark_args
from batching import pad_batch_data
import tokenization
import requests
import json
from bert_reader import BertReader
args = benchmark_args()
def predict(thr_id, resource):
bc = BertService(
model_name="bert_chinese_L-12_H-768_A-12",
max_seq_len=20,
do_lower_case=True)
bc.load_client(resource["conf_file"], resource["server_endpoint"])
thread_num = resource["thread_num"]
file_list = resource["filelist"]
line_id = 0
result = []
label_list = []
dataset = []
for fn in file_list:
fin = open(fn)
def single_func(idx, resource):
fin = open("data-c.txt")
if args.request == "rpc":
reader = BertReader(vocab_file="vocab.txt", max_seq_len=20)
config_file = './serving_client_conf/serving_client_conf.prototxt'
fetch = ["pooled_output"]
client = Client()
client.load_client_config(args.model)
client.connect([resource["endpoint"][idx % 4]])
start = time.time()
for line in fin:
if line_id % thread_num == thr_id - 1:
dataset.append(line.strip())
line_id += 1
fin.close()
start = time.time()
fetch = ["pooled_output"]
for inst in dataset:
fetch_map = bc.run_general([[inst]], fetch)
result.append(fetch_map["pooled_output"])
end = time.time()
return [result, label_list, [end - start]]
feed_dict = reader.process(line)
result = client.predict(feed=feed_dict,
fetch=fetch)
end = time.time()
elif args.request == "http":
start = time.time()
header = {"Content-Type":"application/json"}
for line in fin:
#dict_data = {"words": "this is for output ", "fetch": ["pooled_output"]}
dict_data = {"words": line, "fetch": ["pooled_output"]}
r = requests.post('http://{}/bert/prediction'.format(resource["endpoint"][0]),
data=json.dumps(dict_data), headers=header)
end = time.time()
return [[end - start]]
if __name__ == '__main__':
conf_file = sys.argv[1]
data_file = sys.argv[2]
thread_num = sys.argv[3]
resource = {}
resource["conf_file"] = conf_file
resource["server_endpoint"] = ["127.0.0.1:9292"]
resource["filelist"] = [data_file]
resource["thread_num"] = int(thread_num)
thread_runner = MultiThreadRunner()
result = thread_runner.run(predict, int(sys.argv[3]), resource)
multi_thread_runner = MultiThreadRunner()
endpoint_list = ["127.0.0.1:9494", "127.0.0.1:9495", "127.0.0.1:9496", "127.0.0.1:9497"]
#endpoint_list = endpoint_list + endpoint_list + endpoint_list
#result = multi_thread_runner.run(single_func, args.thread, {"endpoint":endpoint_list})
result = single_func(0, {"endpoint":endpoint_list})
print(result)
print("total time {} s".format(sum(result[-1]) / len(result[-1])))
from batching import pad_batch_data
import tokenization
class BertReader():
def __init__(self, vocab_file="", max_seq_len=128):
self.vocab_file = vocab_file
self.tokenizer = tokenization.FullTokenizer(vocab_file=vocab_file)
self.max_seq_len = max_seq_len
self.vocab = self.tokenizer.vocab
self.pad_id = self.vocab["[PAD]"]
self.cls_id = self.vocab["[CLS]"]
self.sep_id = self.vocab["[SEP]"]
self.mask_id = self.vocab["[MASK]"]
def pad_batch(self, token_ids, text_type_ids, position_ids):
batch_token_ids = [token_ids]
batch_text_type_ids = [text_type_ids]
batch_position_ids = [position_ids]
padded_token_ids, input_mask = pad_batch_data(
batch_token_ids,
max_seq_len=self.max_seq_len,
pad_idx=self.pad_id,
return_input_mask=True)
padded_text_type_ids = pad_batch_data(
batch_text_type_ids,
max_seq_len=self.max_seq_len,
pad_idx=self.pad_id)
padded_position_ids = pad_batch_data(
batch_position_ids,
max_seq_len=self.max_seq_len,
pad_idx=self.pad_id)
return padded_token_ids, padded_position_ids, padded_text_type_ids, input_mask
def process(self, sent):
text_a = tokenization.convert_to_unicode(sent)
tokens_a = self.tokenizer.tokenize(text_a)
if len(tokens_a) > self.max_seq_len - 2:
tokens_a = tokens_a[0:(self.max_seq_len - 2)]
tokens = []
text_type_ids = []
tokens.append("[CLS]")
text_type_ids.append(0)
for token in tokens_a:
tokens.append(token)
text_type_ids.append(0)
token_ids = self.tokenizer.convert_tokens_to_ids(tokens)
position_ids = list(range(len(token_ids)))
p_token_ids, p_pos_ids, p_text_type_ids, input_mask = \
self.pad_batch(token_ids, text_type_ids, position_ids)
feed_result = {"input_ids": p_token_ids.reshape(-1).tolist(),
"position_ids": p_pos_ids.reshape(-1).tolist(),
"segment_ids": p_text_type_ids.reshape(-1).tolist(),
"input_mask": input_mask.reshape(-1).tolist()}
return feed_result
# coding=utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.web_service import WebService
from bert_reader import BertReader
import sys
import os
class BertService(WebService):
def load(self):
self.reader = BertReader(vocab_file="vocab.txt", max_seq_len=20)
def preprocess(self, feed={}, fetch=[]):
feed_res = self.reader.process(feed["words"].encode("utf-8"))
return feed_res, fetch
bert_service = BertService(name="bert")
bert_service.load()
bert_service.load_model_config(sys.argv[1])
gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"]
gpus = [int(x) for x in gpu_ids.split(",")]
bert_service.set_gpus(gpus)
bert_service.prepare_server(
workdir="workdir", port=9494, device="gpu")
bert_service.run_server()
此差异已折叠。
......@@ -48,7 +48,7 @@ def single_func(idx, resource):
for line in fin:
word_ids, label = imdb_dataset.get_words_and_label(line)
r = requests.post("http://{}/imdb/prediction".format(args.endpoint),
data={"words": word_ids})
data={"words": word_ids, "fetch": ["prediction"]})
end = time.time()
return [[end - start]]
......
......@@ -22,6 +22,26 @@ import paddle_serving_server_gpu as paddle_serving_server
from version import serving_server_version
from contextlib import closing
def serve_args():
parser = argparse.ArgumentParser("serve")
parser.add_argument(
"--thread", type=int, default=10, help="Concurrency of server")
parser.add_argument(
"--model", type=str, default="", help="Model for serving")
parser.add_argument(
"--port", type=int, default=9292, help="Port of the starting gpu")
parser.add_argument(
"--workdir",
type=str,
default="workdir",
help="Working dir of current service")
parser.add_argument(
"--device", type=str, default="gpu", help="Type of device")
parser.add_argument(
"--gpu_ids", type=str, default="", help="gpu ids")
parser.add_argument(
"--name", type=str, default="default", help="Default service name")
return parser.parse_args()
class OpMaker(object):
def __init__(self):
......@@ -126,7 +146,8 @@ class Server(object):
self.model_config_path = model_config_path
self.engine.name = "general_model"
self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
#self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
self.engine.reloadable_meta = self.workdir + "/fluid_time_file"
os.system("touch {}".format(self.engine.reloadable_meta))
self.engine.reloadable_type = "timestamp_ne"
self.engine.runtime_thread_num = 0
......@@ -154,6 +175,7 @@ class Server(object):
self.infer_service_conf.services.extend([infer_service])
def _prepare_resource(self, workdir):
self.workdir = workdir
if self.resource_conf == None:
with open("{}/{}".format(workdir, self.general_model_config_fn),
"w") as fout:
......
......@@ -19,30 +19,10 @@ Usage:
"""
import argparse
from multiprocessing import Pool, Process
from paddle_serving_server_gpu import serve_args
def parse_args():
parser = argparse.ArgumentParser("serve")
parser.add_argument(
"--thread", type=int, default=10, help="Concurrency of server")
parser.add_argument(
"--model", type=str, default="", help="Model for serving")
parser.add_argument(
"--port", type=int, default=9292, help="Port of the starting gpu")
parser.add_argument(
"--workdir",
type=str,
default="workdir",
help="Working dir of current service")
parser.add_argument(
"--device", type=str, default="gpu", help="Type of device")
parser.add_argument(
"--gpu_ids", type=str, default="", help="gpu ids")
return parser.parse_args()
args = parse_args()
def start_gpu_card_model(gpuid):
def start_gpu_card_model(gpuid, args):
gpuid = int(gpuid)
device = "gpu"
port = args.port
......@@ -79,17 +59,24 @@ def start_gpu_card_model(gpuid):
server.set_gpuid(gpuid)
server.run_server()
if __name__ == "__main__":
gpus = args.gpu_ids.split(",")
def start_multi_card(args):
gpus = ""
if args.gpu_ids == "":
gpus = os.environ["CUDA_VISIBLE_DEVICES"]
else:
gpus = args.gpu_ids.split(",")
if len(gpus) <= 0:
start_gpu_card_model(-1)
else:
gpu_processes = []
for i, gpu_id in enumerate(gpus):
p = Process(target=start_gpu_card_model, args=(i,))
p = Process(target=start_gpu_card_model, args=(i, args, ))
gpu_processes.append(p)
for p in gpu_processes:
p.start()
for p in gpu_processes:
p.join()
if __name__ == "__main__":
args = serve_args()
start_multi_card(args)
......@@ -17,39 +17,20 @@ Usage:
Example:
python -m paddle_serving_server.web_serve --model ./serving_server_model --port 9292
"""
import argparse
import os
from multiprocessing import Pool, Process
from .web_service import WebService
def parse_args():
parser = argparse.ArgumentParser("web_serve")
parser.add_argument(
"--thread", type=int, default=10, help="Concurrency of server")
parser.add_argument(
"--model", type=str, default="", help="Model for serving")
parser.add_argument(
"--port", type=int, default=9292, help="Port the server")
parser.add_argument(
"--workdir",
type=str,
default="workdir",
help="Working dir of current service")
parser.add_argument(
"--device", type=str, default="cpu", help="Type of device")
parser.add_argument(
"--gpu_ids", type=str, default="", help="GPU ids of current service")
parser.add_argument(
"--name", type=str, default="default", help="Default service name")
return parser.parse_args()
import paddle_serving_server_gpu as serving
from paddle_serving_server_gpu import serve_args
if __name__ == "__main__":
args = parse_args()
service = WebService(name=args.name)
service.load_model_config(args.model)
service.prepare_server(
args = serve_args()
web_service = WebService(name=args.name)
web_service.load_model_config(args.model)
if args.gpu_ids == "":
gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"]
gpus = [int(x) for x in gpu_ids.split(",")]
web_service.set_gpus(gpus)
web_service.prepare_server(
workdir=args.workdir, port=args.port, device=args.device)
service.run_server(args.gpu_ids)
service.run_server()
......@@ -15,53 +15,82 @@
from flask import Flask, request, abort
from multiprocessing import Pool, Process
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
import paddle_serving_server_gpu as serving
from paddle_serving_client import Client
from .serve import start_multi_card
import time
import random
class WebService(object):
def __init__(self, name="default_service"):
self.name = name
self.gpus = []
self.rpc_service_list = []
def load_model_config(self, model_config):
self.model_config = model_config
def _launch_rpc_service(self, gpuid):
if gpuid < 0:
def set_gpus(self, gpus):
self.gpus = gpus
def default_rpc_service(self, workdir="conf", port=9292,
gpuid=0, thread_num=10):
device = "gpu"
if gpuid == -1:
device = "cpu"
else:
device = "gpu"
op_maker = OpMaker()
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker = serving.OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server = serving.Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(10)
server.set_num_threads(thread_num)
server.load_model_config(self.model_config)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.load_model_config(self.model_config)
server.prepare_server(
workdir="{}_{}".format(self.workdir, gpuid),
port=self.port + gpuid + 1, device=device)
server.run_server()
server.prepare_server(workdir=workdir, port=port, device=device)
return server
def _launch_rpc_service(self, service_idx):
self.rpc_service_list[service_idx].run_server()
def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
self.workdir = workdir
self.port = port
self.device = device
self.gpuid = gpuid
if len(self.gpus) == 0:
# init cpu service
self.rpc_service_list.append(
self.default_rpc_service(self.workdir, self.port+1,
-1, thread_num=10))
else:
for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append(
self.default_rpc_service("{}_{}".format(self.workdir, i),
self.port+1+i,
gpuid, thread_num=10))
def _launch_web_service(self):
def _launch_web_service(self, gpu_num):
app_instance = Flask(__name__)
client_service = Client()
client_service.load_client_config(
"{}/serving_server_conf.prototxt".format(self.model_config))
client_service.connect(["127.0.0.1:{}".format(self.port + 1)])
client_list = []
if gpu_num > 1:
gpu_num = 0
for i in range(gpu_num):
client_service = Client()
client_service.load_client_config(
"{}/serving_server_conf.prototxt".format(self.model_config))
client_service.connect(["127.0.0.1:{}".format(self.port + i + 1)])
client_list.append(client_service)
time.sleep(1)
service_name = "/" + self.name + "/prediction"
@app_instance.route(service_name, methods=['POST'])
......@@ -71,7 +100,8 @@ class WebService(object):
if "fetch" not in request.json:
abort(400)
feed, fetch = self.preprocess(request.json, request.json["fetch"])
fetch_map = client_service.predict(feed=feed, fetch=fetch)
fetch_map = client_list[0].predict(
feed=feed, fetch=fetch)
fetch_map = self.postprocess(
feed=request.json, fetch=fetch, fetch_map=fetch_map)
return fetch_map
......@@ -81,27 +111,26 @@ class WebService(object):
threaded=False,
processes=1)
def run_server(self, gpu_ids):
def run_server(self):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name))
gpus = gpu_ids.split(",")
if len(gpus) <= 0:
self._launch_rpc_service(-1)
else:
gpu_processes = []
for i, gpu_id in gpus:
p = Process(target=self._launch_rpc_service, (i,))
gpu_processes.append(p)
for p in gpu_processes:
p.start()
p_web = Process(target=self._launch_web_service)
for p in gpu_processes:
p.join()
p_web.join()
rpc_processes = []
for idx in range(len(self.rpc_service_list)):
p_rpc = Process(target=self._launch_rpc_service, args=(idx,))
rpc_processes.append(p_rpc)
for p in rpc_processes:
p.start()
p_web = Process(target=self._launch_web_service, args=(len(self.gpus),))
p_web.start()
for p in rpc_processes:
p.join()
p_web.join()
def preprocess(self, feed={}, fetch=[]):
return feed, fetch
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册