提交 1a313516 编写于 作者: D dongdaxiang

refactor pipeline server

上级 65fc3e00
...@@ -12,3 +12,40 @@ ...@@ -12,3 +12,40 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
import threading
import multiprocessing
import multiprocessing.queues
import sys
if sys.version_info.major == 2:
import Queue
elif sys.version_info.major == 3:
import queue as Queue
else:
raise Exception("Error Python version")
class ChannelDataEcode(enum.Enum):
OK = 0
TIMEOUT = 1
NOT_IMPLEMENTED = 2
TYPE_ERROR = 3
RPC_PACKAGE_ERROR = 4
UNKNOW = 5
class ChannelDataType(enum.Enum):
DICT = 0
CHANNEL_NPDATA = 1
ERROR = 2
class ChannelData(object):
pass
class ThreadChannel(Queue.Queue):
pass
class ProcessChannel(multiprocessing.queues.Queue):
pass
...@@ -12,3 +12,38 @@ ...@@ -12,3 +12,38 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
class Op(object):
def __init__(self,
name="",
input_ops=[],
server_endpoints=[],
concurrency=1,
timeout=-1,
retry=1):
pass
def _get_input_channel(self):
pass
def _get_output_channel(self):
pass
def preprocess(self, input_dict):
pass
def process(self, feed_dict):
pass
def postprocess(self, fetch_dict):
pass
def stop(self):
pass
def start_with_process(self):
pass
def start_with_thread(self):
pass
...@@ -12,3 +12,23 @@ ...@@ -12,3 +12,23 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer):
def __init__(self, in_channel, out_channel, retry=2):
super(PipelineService, self).__init__()
pass
class PipelineServer(object):
def __init__(self):
pass
def set_response_op(self, response_op):
pass
def prepare_server(self, yml_file):
pass
def run_server(self):
pass
...@@ -12,3 +12,41 @@ ...@@ -12,3 +12,41 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
class TimeProfiler(object):
def __init__(self):
self._pid = os.getpid()
self._print_head = 'PROFILE\tpid:{}\t'.format(self._pid)
self._time_record = Queue.Queue()
self._enable = False
def enable(self, enable):
self._enable = enable
def record(self, name_with_tag):
if self._enable is False:
return
name_with_tag = name_with_tag.split("_")
tag = name_with_tag[-1]
name = '_'.join(name_with_tag[:-1])
self._time_record.put((name, tag, int(round(time.time() * 1000000))))
def print_profile(self):
if self._enable is False:
return
print_str = self._print_head
tmp = {}
while not self._time_record.empty():
name, tag, timestamp = self._time_record.get()
if name in tmp:
ptag, ptimestamp = tmp.pop(name)
print_str += "{}_{}:{} ".format(name, ptag, ptimestamp)
print_str += "{}_{}:{} ".format(name, tag, timestamp)
else:
tmp[name] = (tag, timestamp)
print_str += "\n"
sys.stderr.write(print_str)
for name, item in tmp.items():
tag, timestamp = item
self._time_record.put((name, tag, timestamp))
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
message Request {
repeated string key = 1;
repeated string value = 2;
};
message Response {
repeated string key = 1;
repeated string value = 2;
};
service PipelineService {
rpc inference(Request) returns (Response) {}
};
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs protoc with the gRPC plugin to generate messages and gRPC stubs."""
from grpc_tools import protoc
protoc.main((
'',
'-I.',
'--python_out=.',
'--grpc_python_out=.',
'pipeline_service.proto', ))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册