提交 815925c5 编写于 作者: C chenxuyi 提交者: Meiyim

upgrade propeller

上级 5e23ec05
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Propeller"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......
......@@ -11,3 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
doc
"""
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Basic Dataset API"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -39,7 +40,7 @@ __all__ = ['Dataset']
@contextmanager
def open_file(filename, format=None):
def _open_file(filename, format=None):
if format is None:
fd = open(filename, 'rb')
elif format == 'GZIP':
......@@ -50,9 +51,9 @@ def open_file(filename, format=None):
fd.close()
def open_record(filename):
def gen():
with open_file(filename, format='GZIP') as f:
def _open_record(filename):
def _gen():
with _open_file(filename, format='GZIP') as f:
while True:
data = f.read(struct.calcsize('i'))
if not len(data):
......@@ -61,11 +62,11 @@ def open_record(filename):
data = f.read(l)
yield data
return gen
return _gen
def shuffle_func(dataset, buffer_size):
def gen():
def _shuffle_func(dataset, buffer_size):
def _gen():
buf = []
iterable = dataset()
try:
......@@ -82,11 +83,11 @@ def shuffle_func(dataset, buffer_size):
for i in buf:
yield i
return gen
return _gen
def interleave_func(iterable, map_fn, cycle_length, block_length):
def gen():
def _interleave_func(iterable, map_fn, cycle_length, block_length):
def _gen():
ls = itertools.tee(iterable(), cycle_length)
buf = []
for i, j in enumerate(ls):
......@@ -99,11 +100,11 @@ def interleave_func(iterable, map_fn, cycle_length, block_length):
for ii in (i for i in tup if i is not None):
yield ii
return gen
return _gen
def repeat_func(dataset, n):
def gen():
def _repeat_func(dataset, n):
def _gen():
iterable = dataset()
if n >= 0:
ret = itertools.chain(*itertools.tee(iterable, n))
......@@ -113,11 +114,11 @@ def repeat_func(dataset, n):
for i in ret:
yield i
return gen
return _gen
def filter_func(dataset, fn):
def gen():
def _filter_func(dataset, fn):
def _gen():
for i in dataset():
if isinstance(i, tuple) or isinstance(i, list):
if fn(*i) is True:
......@@ -126,41 +127,41 @@ def filter_func(dataset, fn):
if fn(i) is True:
yield i
return gen
return _gen
def map_func(dataset, fn):
def gen():
def _map_func(dataset, fn):
def _gen():
for i in dataset():
if isinstance(i, tuple) or isinstance(i, list):
yield fn(*i)
else:
yield fn(i)
return gen
return _gen
def shard_func(dataset, num_shards, index):
def gen():
def _shard_func(dataset, num_shards, index):
def _gen():
iterable = dataset()
ret = itertools.islice(iterable, index, None, num_shards)
for i in ret:
yield i
return gen
return _gen
def take_func(dataset, count):
def gen():
def _take_func(dataset, count):
def _gen():
iterable = dataset()
ret = itertools.islice(iterable, count)
for i in ret:
yield i
return gen
return _gen
def buffered_func(dataset, size):
def _buffered_func(dataset, size):
"""
Creates a buffered data reader.
......@@ -176,21 +177,21 @@ def buffered_func(dataset, size):
:returns: the buffered data reader.
"""
class EndSignal():
class _EndSignal(object):
pass
end = EndSignal()
end = _EndSignal()
def read_worker(r, q):
def _read_worker(r, q):
for d in r:
q.put(d)
q.put(end)
def data_reader():
def _data_reader():
r = dataset()
q = multiprocessing.Queue(maxsize=size)
t = multiprocessing.Process(
target=read_worker, args=(
target=_read_worker, args=(
r,
q, ))
t.daemon = True
......@@ -200,14 +201,14 @@ def buffered_func(dataset, size):
yield e
e = q.get()
return data_reader
return _data_reader
def padded_batch_func(dataset, batch_size, pad_value=0, max_seqlen=None):
def _padded_batch_func(dataset, batch_size, pad_value=0, max_seqlen=None):
if not isinstance(batch_size, int):
raise ValueError('unknown batch_size: %s' % repr(batch_size))
def gen():
def _gen():
iterable = dataset()
pad_value_t = pad_value
while True:
......@@ -226,71 +227,86 @@ def padded_batch_func(dataset, batch_size, pad_value=0, max_seqlen=None):
if (not np.isscalar(elem)) and elem.shape != ():
max_len = max(map(len,
e)) if max_seqlen is None else max_seqlen
e = map(lambda i: np.pad(i, [0, max_len - len(i)], 'constant', constant_values=pv) if max_len >= len(i) else i[: max_len], e)
def _fn(i):
if max_len >= len(i):
return np.pad(i, [0, max_len - len(i)],
'constant',
constant_values=pv)
else:
return i[:max_len]
e = map(_fn, e)
padded.append(np.stack(list(e)))
yield padded
return gen
return _gen
class Dataset(object):
"""Python Wrapper for PyReader"""
@classmethod
def from_generator_func(cls, gen, data_shapes=None, data_types=None):
if not inspect.isgeneratorfunction(gen):
raise ValueError('expect generator function, got %s' % repr(gen))
def from_generator_func(cls, _gen, data_shapes=None, data_types=None):
"""doc"""
if not inspect.isgeneratorfunction(_gen):
raise ValueError('expect generator function, got %s' % repr(_gen))
def wrapper(): #compat to py3.7
def _wrapper(): #compat to py3.7
try:
for item in gen():
for item in _gen():
yield item
except RuntimeError as e:
if str(e) != 'generator raised StopIteration':
raise e
ret = cls()
ret.generator = wrapper
ret.generator = _wrapper
ret.data_shapes = data_shapes
ret.data_types = data_types
return ret
@classmethod
def from_file(cls, filename, format=None):
"""doc"""
if os.path.getsize(filename) == 0:
raise RuntimeError('%s is empty' % filename)
def gen():
with open_file(filename, format) as f:
def _gen():
with _open_file(filename, format) as f:
for line in f:
yield line
ret = cls()
ret.generator = gen
ret.generator = _gen
ret.data_shapes = []
ret.data_types = str
return ret
@classmethod
def from_record_file(cls, filename):
"""doc"""
if os.path.getsize(filename) == 0:
raise RuntimeError('%s is empty' % filename)
gen = open_record(filename)
_gen = _open_record(filename)
ret = cls()
ret.generator = gen
ret.generator = _gen
ret.data_shapes = []
ret.data_types = str
return ret
@classmethod
def from_list(cls, ls):
"""doc"""
if not isinstance(ls, list):
raise ValueError('expect list, got %s' % repr(ls))
def gen():
def _gen():
for i in ls:
yield i
ret = cls()
ret.generator = gen
ret.generator = _gen
ret.data_shapes = []
ret.data_types = str
return ret
......@@ -339,6 +355,7 @@ class Dataset(object):
@property
def data_shapes(self):
"""doc"""
if self._data_shapes is None:
self._infer_shapes_and_types()
return self._data_shapes
......@@ -347,10 +364,12 @@ class Dataset(object):
@data_shapes.setter
def data_shapes(self, val):
"""doc"""
self._data_shapes = val
@property
def data_types(self):
"""doc"""
if self._data_types is None:
self._infer_shapes_and_types()
return self._data_types
......@@ -359,9 +378,11 @@ class Dataset(object):
@data_types.setter
def data_types(self, val):
"""doc"""
self._data_types = val
def apply(self, transform_func):
"""apply transform func to datasets"""
#input_shapes = transform_func.input_shapes
#input_types = transform_func.input_types
#data_shapes = transform_func.data_shapes
......@@ -377,46 +398,55 @@ class Dataset(object):
return ret
def shuffle(self, buffer_size):
func = functools.partial(shuffle_func, buffer_size=buffer_size)
"""doc"""
func = functools.partial(_shuffle_func, buffer_size=buffer_size)
return self.apply(func)
def repeat(self, n=-1):
func = functools.partial(repeat_func, n=n)
"""doc"""
func = functools.partial(_repeat_func, n=n)
return self.apply(func)
def map(self, fn):
func = functools.partial(map_func, fn=fn)
"""doc"""
func = functools.partial(_map_func, fn=fn)
return self.apply(func)
def filter(self, fn):
func = functools.partial(filter_func, fn=fn)
"""doc"""
func = functools.partial(_filter_func, fn=fn)
return self.apply(func)
def shard(self, num_shards, index):
"""doc"""
func = functools.partial(
shard_func, num_shards=num_shards, index=index)
_shard_func, num_shards=num_shards, index=index)
return self.apply(func)
def interleave(self, map_fn, cycle_length, block_length):
"""doc"""
func = functools.partial(
interleave_func,
_interleave_func,
map_fn=map_fn,
cycle_length=cycle_length,
block_length=block_length)
return self.apply(func)
def padded_batch(self, batch_size, pad_value=0, max_seqlen=None):
"""doc"""
func = functools.partial(
padded_batch_func,
_padded_batch_func,
batch_size=batch_size,
pad_value=pad_value,
max_seqlen=max_seqlen)
return self.apply(func)
def take(self, count=1):
func = functools.partial(take_func, count=count)
"""doc"""
func = functools.partial(_take_func, count=count)
return self.apply(func)
def buffered(self, size=10):
func = functools.partial(buffered_func, size=size)
"""doc"""
func = functools.partial(_buffered_func, size=size)
return self.apply(func)
......@@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
doc
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......
......@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""global collections"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -43,13 +45,16 @@ class Collections(object):
_global_collection = None
def add(self, key, val):
"""doc"""
self.col.setdefault(key, []).append(val)
def get(self, key):
"""doc"""
return self.col.get(key, None)
def default_collection():
"""return global collection"""
global _global_collection
if _global_collection is None:
_global_collection = Collections()
......
......@@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
doc
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""FeatureColumns and many Column"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -30,7 +31,7 @@ import numpy as np
from glob import glob
from propeller.paddle.train import distribution
from propeller.data.functional import interleave_func
from propeller.data.functional import _interleave_func
from propeller.paddle.data.functional import Dataset
from propeller.paddle.data import example_pb2, feature_pb2
......@@ -43,35 +44,47 @@ __all__ = [
def basic_tokenizer(sen):
"""doc"""
seg = sen.split(b' ')
seg = filter(lambda i: i != b' ', seg)
return seg
class Column():
class Column(object):
"""doc"""
def __init__(self, name):
"""doc"""
pass
def raw_to_proto(self, raw):
"""doc"""
return feature_pb2.Feature()
@property
def output_shapes(self):
"""doc"""
pass
@property
def output_types(self):
"""doc"""
pass
def proto_to_instance(self, proto):
"""doc"""
raise NotImplementedError()
def raw_to_instance(self, raw):
"""doc"""
raise NotImplementedError()
class LabelColumn(Column):
"""doc"""
def __init__(self, name, vocab_dict=None, vocab_file=None):
"""doc"""
self.name = name
self.vocab = None
if vocab_file:
......@@ -84,13 +97,16 @@ class LabelColumn(Column):
@property
def output_shapes(self):
"""doc"""
return [1]
@property
def output_types(self):
"""doc"""
return 'int64'
def raw_to_proto(self, raw):
"""doc"""
if self.vocab is None:
ids = [int(raw)]
else:
......@@ -99,10 +115,12 @@ class LabelColumn(Column):
return fe
def proto_to_instance(self, feature):
"""doc"""
ret = np.array(feature.int64_list.value[0], dtype=np.int64)
return ret
def raw_to_instance(self, raw):
"""doc"""
if self.vocab is None:
ids = int(raw)
else:
......@@ -111,6 +129,8 @@ class LabelColumn(Column):
class TextColumn(Column):
"""doc"""
def __init__(self,
name,
unk_id,
......@@ -132,63 +152,75 @@ class TextColumn(Column):
@property
def output_shapes(self):
"""doc"""
return [-1]
@property
def output_types(self):
"""doc"""
return 'int64'
def raw_to_proto(self, raw):
"""doc"""
ids = [self.vocab.get(s, self.unk_id) for s in self.tokenizer(raw)]
fe = feature_pb2.Feature(int64_list=feature_pb2.Int64List(value=ids))
return fe
def proto_to_instance(self, feature):
"""doc"""
ret = np.array(feature.int64_list.value, dtype=np.int64)
return ret
def raw_to_instance(self, raw):
"""doc"""
ids = [self.vocab.get(s, self.unk_id) for s in self.tokenizer(raw)]
return np.array(ids, dtype=np.int64)
class TextIDColumn(Column):
"""doc"""
def __init__(self, name):
"""doc"""
self.name = name
@property
def output_shapes(self):
"""doc"""
return [-1]
@property
def output_types(self):
"""doc"""
return 'int64'
def raw_to_proto(self, raw):
"""doc"""
ids = [int(s) for s in raw.split(b' ')]
fe = feature_pb2.Feature(int64_list=feature_pb2.Int64List(value=ids))
return fe
def proto_to_instance(self, feature):
"""doc"""
ret = np.array(feature.int64_list.value, dtype=np.int64)
return ret
def raw_to_instance(self, raw):
"""doc"""
ret = np.array([int(i) for i in raw.split(b' ')], dtype=np.int64)
return ret
class FeatureColumns(object):
def __init__(self, columns, pad_id=0):
self._columns = columns
def raw_files(self, raw_dir):
def _list_files(raw_dir):
return [os.path.join(raw_dir, p) for p in os.listdir(raw_dir)]
def gz_files(self, gz_dir):
return None if gz_dir is None else [
os.path.join(gz_dir, p) for p in os.listdir(gz_dir)
]
class FeatureColumns(object):
"""A Dataset Factory object"""
def __init__(self, columns):
"""doc"""
self._columns = columns
def _make_gz_dataset(self, raw_dir, gz_dir):
assert raw_dir or gz_dir, 'data_dir not specified when using gz mode'
......@@ -237,7 +269,7 @@ class FeatureColumns(object):
if shuffle:
dataset = dataset.shuffle(buffer_size=len(gz_files))
fn = partial(
interleave_func,
_interleave_func,
map_fn=lambda filename: Dataset.from_record_file(filename),
cycle_length=len(gz_files),
block_length=1)
......@@ -271,7 +303,7 @@ class FeatureColumns(object):
dataset = dataset.shuffle(buffer_size=len(data_files))
fn = partial(
interleave_func,
_interleave_func,
map_fn=lambda filename: Dataset.from_file(filename),
cycle_length=len(data_files),
block_length=1)
......@@ -294,7 +326,7 @@ class FeatureColumns(object):
def _read_stdin_dataset(self, encoding='utf8', shuffle=False, **kwargs):
log.info('reading raw files stdin')
def gen():
def _gen():
if six.PY3:
source = sys.stdin.buffer
else:
......@@ -305,12 +337,12 @@ class FeatureColumns(object):
break
yield line,
dataset = Dataset.from_generator_func(gen)
dataset = Dataset.from_generator_func(_gen)
if shuffle:
dataset = dataset.shuffle(buffer_size=1000)
def _parse_stdin(record_str):
'''function that takes python_str as input'''
"""function that takes python_str as input"""
features = record_str.strip(b'\n').split(b'\t')
ret = [
column.raw_to_instance(feature)
......@@ -346,13 +378,17 @@ class FeatureColumns(object):
gz_dir=None,
data_file=None,
**kwargs):
"""
build `Dataset` from `data_dir` or `data_file`
if `use_gz`, will try to convert data_files to gz format and save to `gz_dir`, if `gz_dir` not given, will create one.
"""
if use_gz:
gz_dir = self._make_gz_dataset(data_dir, gz_dir)
gz_files = self.gz_files(gz_dir)
gz_files = _list_files(gz_dir) if gz_dir is not None else gz_dir
ds = self._read_gz_dataset(gz_files, **kwargs)
else:
if data_dir is not None:
data_files = self.raw_files(data_dir)
data_files = _list_files(data_dir)
elif data_file is not None:
data_files = [data_file]
else:
......@@ -362,6 +398,7 @@ class FeatureColumns(object):
return ds
def build_dataset_from_stdin(self, name, **kwargs):
"""doc"""
ds = self._read_stdin_dataset(**kwargs)
ds.name = name
return ds
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pyreader based Dataset"""
import sys
import numpy as np
......@@ -25,7 +26,10 @@ log = logging.getLogger(__name__)
class Dataset(DatasetBase):
"""Pyreader based Dataset"""
def placeholders(self):
"""doc"""
if self.name is None:
raise ValueError('can not get feature from unnamed Dataset')
......@@ -41,7 +45,7 @@ class Dataset(DatasetBase):
return ret
def features(self):
'''start point of net building. call this in a program scope'''
"""start point of net building. call this in a program scope"""
if self.name is None:
raise ValueError('can not get feature from unnamed Dataset')
......@@ -51,9 +55,13 @@ class Dataset(DatasetBase):
(repr(self._data_shapes), repr(self._data_types)))
return self.placeholders()
def start(self, places=F.cuda_places()):
def start(self, places=None):
"""start Pyreader"""
if places is None:
places = F.cuda_places() if F.core.is_compiled_with_cuda(
) else F.cpu_places()
#assert self.pyreader is not None, 'use Dataset.features to build net first, then start dataset'
def gen():
def _gen():
try:
for idx, i in enumerate(self.generator()):
yield i
......@@ -63,5 +71,5 @@ class Dataset(DatasetBase):
r = F.io.PyReader(
feed_list=self.placeholders(), capacity=50, iterable=True)
r.decorate_batch_generator(gen, places=places)
r.decorate_batch_generator(_gen, places=places)
return r()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import logging
import six
import asyncio
import threading
import grpc
from propeller.service import interface_pb2
from propeller.service import interface_pb2_grpc
import propeller.paddle.service.utils as serv_utils
from concurrent.futures import ThreadPoolExecutor
import paddle.fluid as F
from time import sleep, time
log = logging.getLogger(__name__)
def profile(msg):
    """Decorator factory that logs the wall-clock duration of each call.

    Args:
        msg: label written into the DEBUG log line for every invocation.

    Returns:
        A decorator; the wrapped function behaves identically but logs
        ``'<msg> timecost: <seconds>'`` at DEBUG level after each call.
    """
    import functools  # local import keeps this block self-contained

    def decfn(fn):
        @functools.wraps(fn)  # preserve name/docstring of the wrapped fn
        def retfn(*args, **kwargs):
            start = time()
            ret = fn(*args, **kwargs)
            end = time()
            log.debug('%s timecost: %.5f' % (msg, end - start))
            return ret

        return retfn

    return decfn
def serve(model_dir, host, num_concurrent=None):
    """Start a blocking gRPC inference service for a saved Paddle model.

    One predictor per worker thread is created lazily on first request and
    pinned to a GPU card whose id is the thread's index in the pool.

    Args:
        model_dir: directory containing the saved inference model.
        host: address to bind, e.g. ``'0.0.0.0:8334'``.
        num_concurrent: number of worker threads; defaults to the number of
            visible CUDA devices.

    Raises:
        RuntimeError: when running under python2 (python3 only).
    """
    if six.PY2:
        raise RuntimeError('propeller service work in python3 only')
    num_worker = len(F.cuda_places(
    )) if num_concurrent is None else num_concurrent
    pool = ThreadPoolExecutor(num_worker)

    class Predictor(object):
        """Paddle AnalysisConfig predictor bound to a single GPU card."""

        def __init__(self, did):
            log.debug('create predictor on card %d' % did)
            config = F.core.AnalysisConfig(model_dir)
            config.enable_use_gpu(5000, did)
            self._predictor = F.core.create_paddle_predictor(config)

        @profile('paddle')
        def __call__(self, args):
            # inputs are positional; give each tensor a deterministic name
            for i, a in enumerate(args):
                a.name = 'placeholder_%d' % i
            res = self._predictor.run(args)
            return res

    # maps worker thread -> its lazily-built Predictor
    predictor_context = {}

    class InferenceService(interface_pb2_grpc.InferenceServicer):
        @profile('service')
        def Infer(self, request, context):
            try:
                slots = request.slots
                current_thread = threading.current_thread()
                log.debug('%d slots received dispatch to thread %s' %
                          (len(slots), current_thread))
                if current_thread not in predictor_context:
                    # NOTE(review): relies on the private `_threads` attribute
                    # of ThreadPoolExecutor to derive a stable per-thread
                    # device id -- confirm if the executor implementation
                    # changes
                    did = list(pool._threads).index(current_thread)
                    log.debug('spawning worker thread %d' % did)
                    predictor = Predictor(did)
                    predictor_context[current_thread] = predictor
                else:
                    predictor = predictor_context[current_thread]
                slots = [serv_utils.slot_to_paddlearray(s) for s in slots]
                ret = predictor(slots)
                response = [serv_utils.paddlearray_to_slot(r) for r in ret]
            except Exception as e:
                log.exception(e)
                raise e
            return interface_pb2.Slots(slots=response)

    server = grpc.server(pool)
    interface_pb2_grpc.add_InferenceServicer_to_server(InferenceService(),
                                                       server)
    server.add_insecure_port(host)
    server.start()
    log.info('server started on %s...' % host)
    try:
        # park the main thread; workers serve requests until Ctrl-C
        while True:
            sleep(100000)
    except KeyboardInterrupt:
        pass
    log.info('server stopped...')
# Ad-hoc manual entry point: starts the service with debug logging enabled.
if __name__ == '__main__':
    from propeller import log
    log.setLevel(logging.DEBUG)
    # NOTE(review): hard-coded developer-specific model path and bind
    # address -- for manual testing only, not a production launcher
    serve(
        '/home/work/chenxuyi/playground/grpc_play/ernie2.0/',
        '10.255.138.19:8334',
        num_concurrent=3)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import struct
from propeller.service import interface_pb2
from propeller.service import interface_pb2_grpc
import paddle.fluid.core as core
def slot_to_paddlearray(slot):
    """Convert a protobuf ``Slot`` message into a paddle ``PaddleTensor``.

    Unpacks the slot's raw byte payload according to its declared dtype
    (FP32/INT32/INT64) and copies shape, dtype and data onto a new tensor.

    Args:
        slot: an ``interface_pb2.Slot`` with ``type``, ``dims`` and packed
            binary ``data``.

    Returns:
        A ``core.PaddleTensor`` ready to feed into a paddle predictor.

    Raises:
        RuntimeError: if the slot carries an unsupported dtype.
    """
    if slot.type == interface_pb2.Slot.FP32:
        type_str = 'f'
        dtype = core.PaddleDType.FLOAT32
    elif slot.type == interface_pb2.Slot.INT32:
        type_str = 'i'
        dtype = core.PaddleDType.INT32
    elif slot.type == interface_pb2.Slot.INT64:
        type_str = 'q'
        dtype = core.PaddleDType.INT64
    else:
        # fix garbled error text: 'know type' -> 'unknown type'
        raise RuntimeError('unknown type %s' % slot.type)
    ret = core.PaddleTensor()
    ret.shape = slot.dims
    ret.dtype = dtype
    # payload length must be a whole multiple of the element size
    num = len(slot.data) // struct.calcsize(type_str)
    arr = struct.unpack('%d%s' % (num, type_str), slot.data)
    ret.data = core.PaddleBuf(arr)
    return ret
def paddlearray_to_slot(arr):
    """Convert a paddle ``PaddleTensor`` into a protobuf ``Slot`` message.

    Mirrors :func:`slot_to_paddlearray`: reads the tensor's typed buffer
    (FP32/INT32/INT64) and packs it into the slot's binary ``data`` field.

    Args:
        arr: a ``core.PaddleTensor`` produced by a paddle predictor.

    Returns:
        An ``interface_pb2.Slot`` carrying dtype, shape and packed data.

    Raises:
        RuntimeError: if the tensor has an unsupported dtype.
    """
    if arr.dtype == core.PaddleDType.FLOAT32:
        dtype = interface_pb2.Slot.FP32
        type_str = 'f'
        arr_data = arr.data.float_data()
    elif arr.dtype == core.PaddleDType.INT32:
        dtype = interface_pb2.Slot.INT32
        type_str = 'i'
        arr_data = arr.data.int32_data()
    elif arr.dtype == core.PaddleDType.INT64:
        dtype = interface_pb2.Slot.INT64
        type_str = 'q'
        arr_data = arr.data.int64_data()
    else:
        # fix garbled error text: 'know type' -> 'unknown type'
        raise RuntimeError('unknown type %s' % arr.dtype)
    data = struct.pack('%d%s' % (len(arr_data), type_str), *arr_data)
    pb = interface_pb2.Slot(type=dtype, dims=list(arr.shape), data=data)
    return pb
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""record summary tensor in a collection scope"""
from __future__ import print_function
from __future__ import absolute_import
......@@ -23,6 +24,7 @@ from propeller.paddle.collection import default_collection, Key
def scalar(name, tensor):
"""scalar summary"""
if not isinstance(tensor, F.framework.Variable):
raise ValueError('expect paddle Variable, got %s' % repr(tensor))
tensor.persistable = True
......@@ -30,6 +32,7 @@ def scalar(name, tensor):
def histogram(name, tensor):
"""histogram summary"""
if not isinstance(tensor, F.framework.Variable):
raise ValueError('expect paddle Variable, got %s' % repr(tensor))
tensor.persistable = True
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Propeller training"""
from __future__ import print_function
from __future__ import absolute_import
......
......@@ -128,6 +128,7 @@ def init_distribuition_env(program):
elif status.mode == DistributionMode.NCCL:
config = F.DistributeTranspilerConfig()
config.mode = "nccl2"
config.nccl_comm_num = 1
F.DistributeTranspiler(config=config).transpile(
status.replica_id,
trainers=','.join(status._env),
......
......@@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
exporters
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -19,6 +22,7 @@ import sys
import os
import itertools
import six
import inspect
import abc
import logging
......@@ -28,24 +32,36 @@ import paddle.fluid.layers as L
from propeller.paddle.train import Saver
from propeller.types import InferenceSpec
from propeller.train.model import Model
from propeller.paddle.train.trainer import _build_net
from propeller.paddle.train.trainer import _build_model_fn
from propeller.types import RunMode
from propeller.types import ProgramPair
log = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Exporter():
class Exporter(object):
"""base exporter"""
@abc.abstractmethod
def export(self, exe, program, eval_result, state):
"""export"""
raise NotImplementedError()
class BestExporter(Exporter):
"""export saved model accordingto `cmp_fn`"""
def __init__(self, export_dir, cmp_fn):
"""doc"""
self._export_dir = export_dir
self._best = None
self.cmp_fn = cmp_fn
def export(self, exe, program, eval_model_spec, eval_result, state):
"""doc"""
log.debug('New evaluate result: %s \nold: %s' %
(repr(eval_result), repr(self._best)))
if self._best is None or self.cmp_fn(old=self._best, new=eval_result):
......@@ -65,40 +81,85 @@ class BestExporter(Exporter):
class BestInferenceModelExporter(Exporter):
def __init__(self, export_dir, cmp_fn):
"""export inference model accordingto `cmp_fn`"""
def __init__(self,
export_dir,
cmp_fn,
model_class_or_model_fn=None,
hparams=None,
dataset=None):
"""doc"""
self._export_dir = export_dir
self._best = None
self.cmp_fn = cmp_fn
self.model_class_or_model_fn = model_class_or_model_fn
self.hparams = hparams
self.dataset = dataset
def export(self, exe, program, eval_model_spec, eval_result, state):
"""doc"""
if self.model_class_or_model_fn is not None and self.hparams is not None \
and self.dataset is not None:
log.info('Building program by user defined model function')
if issubclass(self.model_class_or_model_fn, Model):
_model_fn = _build_model_fn(self.model_class_or_model_fn)
elif inspect.isfunction(self.model_class_or_model_fn):
_model_fn = self.model_class_or_model_fn
else:
raise ValueError('unknown model %s' %
self.model_class_or_model_fn)
# build net
infer_program = F.Program()
startup_prog = F.Program()
with F.program_guard(infer_program, startup_prog):
#share var with Train net
with F.unique_name.guard():
log.info('Building Infer Graph')
infer_fea = self.dataset.features()
# run_config is None
self.model_spec = _build_net(_model_fn, infer_fea,
RunMode.PREDICT, self.hparams,
None)
log.info('Done')
infer_program = infer_program.clone(for_test=True)
self.program = ProgramPair(
train_program=infer_program, startup_program=startup_prog)
else:
self.program = program
self.model_spec = eval_model_spec
log.debug('New evaluate result: %s \nold: %s' %
(repr(eval_result), repr(self._best)))
if self._best is None or self.cmp_fn(old=self._best, new=eval_result):
log.debug('[Best Exporter]: export to %s' % self._export_dir)
if eval_model_spec.inference_spec is None:
if self.model_spec.inference_spec is None:
raise ValueError('model_fn didnt return InferenceSpec')
inf_sepc_dict = eval_model_spec.inference_spec
if not isinstance(inf_sepc_dict, dict):
inf_sepc_dict = {'inference': inf_sepc_dict}
for inf_sepc_name, inf_sepc in six.iteritems(inf_sepc_dict):
if not isinstance(inf_sepc, InferenceSpec):
raise ValueError('unknown inference spec type: %s' % inf_sepc)
inf_spec_dict = self.model_spec.inference_spec
if not isinstance(inf_spec_dict, dict):
inf_spec_dict = {'inference': inf_spec_dict}
for inf_spec_name, inf_spec in six.iteritems(inf_spec_dict):
if not isinstance(inf_spec, InferenceSpec):
raise ValueError('unknow inference spec type: %s' %
inf_spec)
save_dir = os.path.join(self._export_dir, inf_sepc_name)
save_dir = os.path.join(self._export_dir, inf_spec_name)
log.debug('[Best Exporter]: save inference model: "%s" to %s' %
(inf_sepc_name, save_dir))
feed_var = [i.name for i in inf_sepc.inputs]
fetch_var = inf_sepc.outputs
(inf_spec_name, save_dir))
feed_var = [i.name for i in inf_spec.inputs]
fetch_var = inf_spec.outputs
eval_program = program.train_program
infer_program = self.program.train_program
startup_prog = F.Program()
F.io.save_inference_model(
save_dir,
feed_var,
fetch_var,
exe,
main_program=eval_program)
main_program=infer_program)
self._best = eval_result
else:
log.debug('[Best Exporter]: skip step %s' % state.gstep)
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""train hooks"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -38,44 +39,56 @@ log = logging.getLogger(__name__)
class RunHook(object):
"""RunHook Base class"""
def __init__(self):
"""doc"""
pass
def before_train(self):
def before_train(self, program):
"""doc"""
pass
def before_run(self, state):
"""doc"""
return []
def after_run(self, res_list, state):
"""doc"""
pass
def should_stop(self, state):
"""doc"""
return False
def after_train(self):
"""doc"""
pass
class TqdmProgressBarHook(RunHook):
"""show a progress bar when training"""
def __init__(self, max_steps, desc=None):
"""doc"""
self.tqdm = None
import tqdm
from propeller import log as main_log
hdl = main_log.handlers[0]
class TqdmLogginHandler(logging.Handler):
class _TqdmLogginHandler(logging.Handler):
def emit(self, record):
"""doc"""
try:
msg = self.format(record)
tqdm.tqdm.write(msg, file=sys.stderr)
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except (KeyboardInterrupt, SystemExit) as e:
raise e
except:
self.handleError(record)
tqdm_hdl = TqdmLogginHandler()
tqdm_hdl = _TqdmLogginHandler()
tqdm_hdl.setFormatter(hdl.formatter)
main_log.removeHandler(hdl)
main_log.addHandler(tqdm_hdl)
......@@ -91,46 +104,55 @@ class TqdmProgressBarHook(RunHook):
class TqdmNotebookProgressBarHook(RunHook):
    """Show a Jupyter-notebook progress bar while training.

    (Fix: reconciled interleaved pre-/post-refactor diff lines — kept the
    renamed private handler `_TqdmLogginHandler` and a bare `raise`, which
    preserves the original traceback, and dropped the stale duplicates.)
    """

    def __init__(self, max_steps, desc=None):
        """Install a tqdm-aware logging handler and create the bar.

        Args:
            max_steps (int): total number of steps the bar counts up to.
            desc (str): optional bar description (currently unused).
        """
        self.tqdm = None
        import tqdm
        from propeller import log as main_log
        hdl = main_log.handlers[0]

        class _TqdmLogginHandler(logging.Handler):
            def emit(self, record):
                """Route log records through tqdm so the bar is not broken."""
                try:
                    msg = self.format(record)
                    tqdm.tqdm.write(msg, file=sys.stderr)
                    self.flush()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:  # noqa: E722 -- mirror logging.Handler's swallow-all
                    self.handleError(record)

        tqdm_hdl = _TqdmLogginHandler()
        tqdm_hdl.setFormatter(hdl.formatter)
        main_log.removeHandler(hdl)
        main_log.addHandler(tqdm_hdl)
        self.tqdm = tqdm.tqdm_notebook(total=max_steps, desc=None)

    def before_run(self, state):
        """Advance the bar to the current global step."""
        self.tqdm.n = state.gstep
        self.tqdm.refresh()
        return []

    def __del__(self):
        """Close the bar when the hook is garbage-collected."""
        if self.tqdm:
            self.tqdm.close()
class LoggingHook(RunHook):
"""log tensor in to screan and tensorboard"""
def __init__(self,
loss,
per_step=10,
skip_step=100,
summary_writer=None,
summary_record=None):
"""doc"""
if per_step is None or skip_step is None:
raise ValueError('wrong step argument, per step: %d skip_step %d' %
(per_step, skip_step))
......@@ -141,7 +163,8 @@ class LoggingHook(RunHook):
self.writer = summary_writer
self.last_state = None
def before_train(self):
def before_train(self, program):
"""doc"""
if self.summary_record:
if self.summary_record.scalar:
self.s_name, self.s_tolog = zip(*self.summary_record.scalar)
......@@ -154,6 +177,7 @@ class LoggingHook(RunHook):
self.h_name, self.h_tolog = [], []
def before_run(self, state):
"""doc"""
if state.gstep % self.per_step == 0 and state.step > self.skip_step:
ret = [self.loss]
if self.summary_record:
......@@ -164,6 +188,7 @@ class LoggingHook(RunHook):
return []
def after_run(self, res_list, state):
"""doc"""
if state.gstep % self.per_step == 0 and state.step > self.skip_step:
if not self.summary_record:
return
......@@ -209,11 +234,15 @@ class LoggingHook(RunHook):
class StopAtStepHook(RunHook):
"""stop training at some step"""
    def __init__(self, stop_global_step, stop_step):
        """Remember the stop thresholds.

        Args:
            stop_global_step: stop when state.gstep reaches this (falsy disables).
            stop_step: stop when state.step reaches this (falsy disables).
        """
        self._stop_gstep = stop_global_step
        self._stop_step = stop_step
def should_stop(self, state):
"""doc"""
if (self._stop_gstep and state.gstep >= self._stop_gstep) or \
(self._stop_step and state.step >= self._stop_step):
log.info('StopAtStepHook called stop')
......@@ -226,6 +255,7 @@ class EvalHook(RunHook):
"""hook this on a eval Executor"""
def __init__(self, metrics, summary_writer=None):
"""doc"""
self.writer = summary_writer
self._result = None
......@@ -244,11 +274,13 @@ class EvalHook(RunHook):
else:
self.names, self.metrics = [], []
def before_train(self):
def before_train(self, program):
"""doc"""
for m in self.metrics:
m.reset()
def before_run(self, state):
"""doc"""
ls = [m.tensor for m in self.metrics]
for i in ls:
if not (isinstance(i, list) or isinstance(i, tuple)):
......@@ -265,15 +297,18 @@ class EvalHook(RunHook):
return ls_flt
def after_run(self, res_list, state):
"""doc"""
res = util.unflatten(res_list, self.schema)
for r, m in zip(res, self.metrics):
m.update(r)
@property
def result(self):
"""doc"""
return self._result
def after_train(self):
"""doc"""
printable = []
self._result = {}
for n, m in zip(self.names, self.metrics):
......@@ -284,12 +319,16 @@ class EvalHook(RunHook):
class CheckpointSaverHook(RunHook):
    """Save a checkpoint every `per_step` steps, after a `skip_step` warmup."""

    def __init__(self, saver, per_step=10, skip_step=100):
        """Remember the saver and the save cadence.

        Args:
            saver: a Saver-like object exposing `save(state)`.
            per_step (int): save whenever gstep is a multiple of this.
            skip_step (int): no saves until state.step exceeds this.
        """
        self.saver = saver
        self.per_step = per_step
        self.skip_step = skip_step

    def after_run(self, res_list, state):
        """Save when the global step hits the cadence past the warmup."""
        if state.gstep % self.per_step == 0 and \
                state.step > self.skip_step:
            self.saver.save(state)
......@@ -11,9 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""predefined metrics"""
import sys
import os
import six
import numpy as np
import itertools
import logging
......@@ -31,98 +34,132 @@ __all__ = [
class Metrics(object):
    """Metrics base class: accumulate fetched tensor values, reduce to a score.

    Subclasses implement `tensor` (what to fetch each step), `update`
    (accumulate one step's fetched values) and `eval` (final reduction).
    """

    def __init__(self):
        """Start with an empty accumulation buffer."""
        self.saver = []

    @property
    def tensor(self):
        """Tensors to fetch each step; subclasses return a tuple/list."""
        pass

    def update(self, *args):
        """Accumulate one step's fetched values."""
        pass

    def eval(self):
        """Reduce accumulated values to the final metric value."""
        pass
class Mean(Metrics):
    """Running mean of a scalar tensor, accumulated across steps."""

    def __init__(self, t):
        """Track tensor `t` and start with an empty buffer."""
        self.t = t
        self.reset()

    def reset(self):
        """Drop everything accumulated so far."""
        self.saver = np.array([])

    @property
    def tensor(self):
        """Fetch `t`; marked persistable so graph pruning keeps it."""
        self.t.persistable = True
        return self.t,

    def update(self, args):
        """Flatten the fetched value and append it to the buffer."""
        value, = args
        flat = value.reshape([-1])
        self.saver = np.concatenate([self.saver, flat])

    def eval(self):
        """Mean over all accumulated values."""
        return self.saver.mean()
class Ppl(Mean):
    """Perplexity: exponential of the mean accumulated (log-)loss."""

    def eval(self):
        """Exponentiate the running mean."""
        avg = self.saver.mean()
        return np.exp(avg)
class Acc(Mean):
    """Accuracy: mean of element-wise equality between pred and label."""

    def __init__(self, label, pred):
        """Build the equality tensor; accuracy is its running mean."""
        self.eq = L.equal(pred, label)
        self.reset()

    @property
    def tensor(self):
        """Fetch the equality tensor; persistable so it is not pruned."""
        self.eq.persistable = True
        return self.eq,
class MSE(Mean):
    """Mean squared error between pred and label."""

    def __init__(self, label, pred):
        """Build the element-wise squared-error tensor."""
        err = pred - label
        self.mse = err * err
        self.reset()

    @property
    def tensor(self):
        """Fetch the squared-error tensor; persistable so it is not pruned."""
        self.mse.persistable = True
        return self.mse,
class Cosine(Mean):
    """Mean cosine similarity between label and pred (via L.cos_sim)."""

    def __init__(self, label, pred):
        """Build the cosine-similarity tensor."""
        self.cos = L.cos_sim(label, pred)
        self.reset()

    @property
    def tensor(self):
        """Fetch the cosine tensor; persistable so it is not pruned."""
        self.cos.persistable = True
        return self.cos,
class Precision(Metrics):
"""doc"""
def __init__(self, label, pred):
"""doc"""
self.label = label
self.pred = pred
self.reset()
def reset(self):
"""doc"""
self.label_saver = np.array([], dtype=np.bool)
self.pred_saver = np.array([], dtype=np.bool)
@property
def tensor(self):
"""doc"""
self.label.persistable = True
self.pred.persistable = True
return self.label, self.pred
def update(self, args):
"""doc"""
label, pred = args
label = label.reshape([-1]).astype(np.bool)
pred = pred.reshape([-1]).astype(np.bool)
......@@ -134,20 +171,27 @@ class Precision(Metrics):
self.pred_saver = np.concatenate([self.pred_saver, pred])
def eval(self):
"""doc"""
tp = (self.label_saver & self.pred_saver).astype(np.int64).sum()
t = self.label_saver.astype(np.int64).sum()
return tp / t
class Recall(Precision):
    """Recall: true positives over all actual positives."""

    def eval(self):
        """Compute TP / P from the accumulated boolean buffers."""
        true_pos = (self.label_saver & self.pred_saver).astype(np.int64).sum()
        actual_pos = self.label_saver.astype(np.int64).sum()
        return true_pos / actual_pos
class F1(Precision):
"""doc"""
def eval(self):
"""doc"""
tp = (self.label_saver & self.pred_saver).astype(np.int64).sum()
t = self.label_saver.astype(np.int64).sum()
p = self.pred_saver.astype(np.int64).sum()
......@@ -157,22 +201,28 @@ class F1(Precision):
class Auc(Metrics):
"""doc"""
def __init__(self, label, pred):
"""doc"""
self.pred = pred
self.label = label
self.reset()
def reset(self):
"""doc"""
self.pred_saver = np.array([], dtype=np.float32)
self.label_saver = np.array([], dtype=np.bool)
@property
def tensor(self):
"""doc"""
self.pred.persistable = True
self.label.persistable = True
return [self.pred, self.label]
def update(self, args):
"""doc"""
pred, label = args
pred = pred.reshape([-1]).astype(np.float32)
label = label.reshape([-1]).astype(np.bool)
......@@ -180,6 +230,7 @@ class Auc(Metrics):
self.label_saver = np.concatenate([self.label_saver, label])
def eval(self):
"""doc"""
fpr, tpr, thresholds = sklearn.metrics.roc_curve(
self.label_saver.astype(np.int64), self.pred_saver)
auc = sklearn.metrics.auc(fpr, tpr)
......@@ -187,11 +238,15 @@ class Auc(Metrics):
class RecallAtPrecision(Auc):
"""doc"""
def __init__(self, label, pred, precision=0.9):
"""doc"""
super(RecallAtPrecision, self).__init__(label, pred)
self.precision = precision
def eval(self):
"""doc"""
self.pred_saver = self.pred_saver.reshape(
[self.label_saver.size, -1])[:, -1]
precision, recall, thresholds = sklearn.metrics.precision_recall_curve(
......@@ -202,11 +257,15 @@ class RecallAtPrecision(Auc):
class PrecisionAtThreshold(Auc):
"""doc"""
def __init__(self, label, pred, threshold=0.5):
"""doc"""
super().__init__(label, pred)
self.threshold = threshold
def eval(self):
"""doc"""
infered = self.pred_saver > self.threshold
correct_num = np.array(infered & self.label_saver).sum()
infer_num = infered.sum()
......@@ -214,25 +273,31 @@ class PrecisionAtThreshold(Auc):
class Mrr(Metrics):
"""doc"""
def __init__(self, qid, label, pred):
"""doc"""
self.qid = qid
self.label = label
self.pred = pred
self.reset()
def reset(self):
"""doc"""
self.qid_saver = np.array([], dtype=np.int64)
self.label_saver = np.array([], dtype=np.int64)
self.pred_saver = np.array([], dtype=np.float32)
@property
def tensor(self):
"""doc"""
self.qid.persistable = True
self.label.persistable = True
self.pred.persistable = True
return [self.qid, self.label, self.pred]
def update(self, args):
"""doc"""
qid, label, pred = args
if not (qid.shape[0] == label.shape[0] == pred.shape[0]):
raise ValueError(
......@@ -246,10 +311,12 @@ class Mrr(Metrics):
[self.pred_saver, pred.reshape([-1]).astype(np.float32)])
def eval(self):
def key_func(tup):
"""doc"""
def _key_func(tup):
return tup[0]
def calc_func(tup):
def _calc_func(tup):
ranks = [
1. / (rank + 1.)
for rank, (_, l, p) in enumerate(
......@@ -262,19 +329,22 @@ class Mrr(Metrics):
return 0.
mrr_for_qid = [
calc_func(tup)
_calc_func(tup)
for _, tup in itertools.groupby(
sorted(
zip(self.qid_saver, self.label_saver, self.pred_saver),
key=key_func),
key=key_func)
key=_key_func),
key=_key_func)
]
mrr = np.float32(sum(mrr_for_qid) / len(mrr_for_qid))
return mrr
class ChunkF1(Metrics):
"""doc"""
def __init__(self, label, pred, seqlen, num_label):
"""doc"""
self.label = label
self.pred = pred
self.seqlen = seqlen
......@@ -327,18 +397,21 @@ class ChunkF1(Metrics):
return chunks
def reset(self):
"""doc"""
self.label_cnt = 0
self.pred_cnt = 0
self.correct_cnt = 0
@property
def tensor(self):
"""doc"""
self.pred.persistable = True
self.label.persistable = True
self.seqlen.persistable = True
return [self.pred, self.label, self.seqlen]
def update(self, args):
"""doc"""
pred, label, seqlen = args
pred = pred.reshape([-1]).astype(np.int32).tolist()
label = label.reshape([-1]).astype(np.int32).tolist()
......@@ -374,6 +447,7 @@ class ChunkF1(Metrics):
label_index += 1
def eval(self):
"""doc"""
if self.pred_cnt == 0:
precision = 0.0
else:
......@@ -393,23 +467,29 @@ class ChunkF1(Metrics):
class PNRatio(Metrics):
"""doc"""
def __init__(self, qid, label, pred):
"""doc"""
self.qid = qid
self.label = label
self.pred = pred
self.saver = {}
def reset(self):
"""doc"""
self.saver = {}
@property
def tensor(self):
"""doc"""
self.qid.persistable = True
self.label.persistable = True
self.pred.persistable = True
return [self.qid, self.label, self.pred]
def update(self, args):
"""doc"""
qid, label, pred = args
if not (qid.shape[0] == label.shape[0] == pred.shape[0]):
raise ValueError('dimention not match: qid[%s] label[%s], pred[%s]'
......@@ -424,6 +504,7 @@ class PNRatio(Metrics):
self.saver[q].append((l, p))
def eval(self):
"""doc"""
p = 0
n = 0
for qid, outputs in self.saver.items():
......@@ -446,10 +527,14 @@ class PNRatio(Metrics):
class BinaryPNRatio(PNRatio):
"""doc"""
def __init__(self, qid, label, pred):
"""doc"""
super(BinaryPNRatio, self).__init__(qid, label, pred)
def eval(self):
"""doc"""
p = 0
n = 0
for qid, outputs in self.saver.items():
......@@ -474,7 +559,10 @@ class BinaryPNRatio(PNRatio):
class PrecisionAtK(Metrics):
"""doc"""
def __init__(self, qid, label, pred, k=1):
"""doc"""
self.qid = qid
self.label = label
self.pred = pred
......@@ -482,16 +570,19 @@ class PrecisionAtK(Metrics):
self.saver = {}
def reset(self):
"""doc"""
self.saver = {}
@property
def tensor(self):
"""doc"""
self.qid.persistable = True
self.label.persistable = True
self.pred.persistable = True
return [self.qid, self.label, self.pred]
def update(self, args):
"""doc"""
qid, label, pred = args
if not (qid.shape[0] == label.shape[0] == pred.shape[0]):
raise ValueError('dimention not match: qid[%s] label[%s], pred[%s]'
......@@ -507,6 +598,7 @@ class PrecisionAtK(Metrics):
self.saver[q].append((l, p))
def eval(self):
"""doc"""
right = 0
total = 0
for v in self.saver.values():
......
......@@ -11,6 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
doc
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -37,9 +41,17 @@ log = logging.getLogger(__name__)
__all__ = ['MonitoredExecutor', 'Saver']
def _get_one_place():
    """Return a single execution place: CUDA place 0 when available, else CPU 0."""
    if F.core.is_compiled_with_cuda():
        return F.cuda_places()[0]
    return F.cpu_places()[0]
class RunState(object):
"""serializable Run state object"""
@classmethod
def from_str(cls, s):
"""doc"""
j = json.loads(s)
ret = RunState()
ret._gstep = j['global_step']
......@@ -48,29 +60,36 @@ class RunState(object):
return ret
    def __init__(self):
        """Start a fresh state: step counters at 0, timestamp at 'now'."""
        self._gstep = 0   # global step, survives serialize()/from_str()
        self._step = 0    # step within the current run, NOT serialized
        self._time = time()
    @property
    def gstep(self):
        """int: global step counter (persisted via serialize/from_str)."""
        return self._gstep
    @property
    def step(self):
        """int: step counter of the current run; not included in serialize()."""
        return self._step
    @property
    def time(self):
        """float: wall-clock timestamp recorded for this state."""
        return self._time
    def __repr__(self):
        """Human-readable dict of global step and timestamp."""
        return repr({'global_step': self._gstep, 'time': self._time})
    def serialize(self):
        """JSON-encode the state; only global_step and time are persisted."""
        return json.dumps({'global_step': self._gstep, 'time': self._time})
def next(self):
"""doc"""
ret = RunState()
ret._gstep = self._gstep + 1
ret._step = self._step + 1
......@@ -79,12 +98,15 @@ class RunState(object):
class Saver(object):
"""checkpoint saver and manager"""
def __init__(self,
save_dir,
exe,
program,
save_prefix='model',
max_ckpt_to_keep=None):
"""doc"""
if exe is not None:
assert isinstance(
exe, F.Executor
......@@ -108,9 +130,11 @@ class Saver(object):
@property
def last_ckpt(self):
"""doc"""
return self.ckpt_list[-1] if len(self.ckpt_list) else None
def save(self, state):
"""doc"""
save_name = '%s_%d' % (self._save_prefix, state.gstep)
save_dir = os.path.join(self._save_dir, save_name)
tmp_dir = os.path.join(self._save_dir, 'tmp')
......@@ -139,28 +163,26 @@ class Saver(object):
open(self.ckpt_info_path, 'w').write('\n'.join(self.ckpt_list))
def restore(self, ckpt=-1):
if not isinstance(ckpt, (int, ) + six.string_types):
raise ValueError('ckpt type not understood %s' % repr(ckpt))
"""doc"""
if isinstance(ckpt, int):
try:
ckpt = self.ckpt_list[ckpt]
path = os.path.join(self._save_dir, self.ckpt_list[ckpt])
except IndexError:
raise ValueError('invalid restore ckpt number %d' % ckpt)
if isinstance(ckpt, six.string_types):
try:
ckpt = self.ckpt_list.index(ckpt)
except ValueError:
raise ValueError('ckpt: %s not in ckpt list: %s' %
(ckpt, self.ckpt_list))
elif isinstance(ckpt, six.string_types):
if not os.path.exists(ckpt):
raise ValueError('ckpt: %s not found' % ckpt)
path = ckpt
else:
raise ValueError('ckpt type not understood %s' % repr(ckpt))
path = os.path.join(self._save_dir, self.ckpt_list[ckpt])
meta_file = os.path.join(path, 'meta')
if not os.path.exists(meta_file):
raise RuntimeError('meta not found in restore dir: %s' % path)
state = RunState.from_str(open(meta_file).read())
log.info('restore from ckpt %s, ckpt-status: %s' % (path, repr(state)))
def fn(v):
def _fn(v):
vpath = os.path.join(path, v.name)
if F.io.is_persistable(v):
if os.path.exists(vpath):
......@@ -171,12 +193,12 @@ class Saver(object):
return False
F.io.load_vars(
self._exe, path, main_program=self._program, predicate=fn)
self._exe, path, main_program=self._program, predicate=_fn)
return state
class MonitoredExecutor(object):
"""A wrapper handling the train loop"""
"""An Executor wrapper handling the train loop"""
def __init__(
self,
......@@ -209,13 +231,18 @@ class MonitoredExecutor(object):
@property
def state(self):
"""doc"""
return self._state
def init_or_restore_variables(self):
def init_or_restore_variables(self, ckpt=-1):
"""
init vars or restore vars from model_dir
call before train
"""
# The order of this 2 steps really matters
# 1. init train
F.Executor(F.cuda_places()[0]).run(self._program.startup_program)
F.Executor(_get_one_place()).run(self._program.startup_program)
# 2. restore param
if self._warm_start_setting is not None:
if not os.path.exists(self._warm_start_setting.from_dir):
......@@ -224,29 +251,34 @@ class MonitoredExecutor(object):
log.info("warm start from %s" % self._warm_start_setting.from_dir)
if self._warm_start_setting.predicate_fn is not None:
def fn(v):
def _fn(v):
ret = self._warm_start_setting.predicate_fn(v)
if ret:
log.info('warm start: %s' % v.name)
return ret
F.io.load_vars(
F.Executor(F.cuda_places()[0]),
F.Executor(_get_one_place()),
self._warm_start_setting.from_dir,
main_program=self._program.train_program,
predicate=fn)
predicate=_fn)
else:
raise NotImplementedError()
self._saver = Saver(
self._model_dir,
F.Executor(F.cuda_places()[0]),
F.Executor(_get_one_place()),
program=self._program.train_program,
max_ckpt_to_keep=self._max_ckpt)
if self._saver.last_ckpt is not None:
self._state = self._saver.restore()
def freeze(self):
self._state = self._saver.restore(ckpt)
def _freeze(self):
"""
call before enter train loop
convert program to compiled program
will do nothing if loss is None i.e. not in train mode
"""
if self._loss is None:
log.debug('will not freeze a program without loss')
return
......@@ -278,8 +310,16 @@ class MonitoredExecutor(object):
startup_program=self._program.startup_program)
def __enter__(self):
"""
prepare before entering the train loop
"""
if F.core.is_compiled_with_cuda():
log.info('propeller runs in CUDA mode')
else:
log.info('propeller runs in CPU mode')
log.debug('freezing program')
self.freeze()
self._freeze()
log.debug('done freezing')
log.info('********** Start Loop ************')
# TODO init
......@@ -287,10 +327,13 @@ class MonitoredExecutor(object):
self.result = None
for h in self._hooks:
log.debug('train loop has hook %s' % h)
h.before_train()
h.before_train(self._program)
return self
def run(self, fetch_list=[], *args, **kwargs):
"""
wrapper for Executor.run
"""
#log.debug('Executor running step %d' % self._state.gstep)
if self._hooks:
fetch_list = [fetch_list]
......@@ -306,11 +349,12 @@ class MonitoredExecutor(object):
]
#if len(set(fetch_list)) != len(fetch_list):
# log.error('strange shit happend when fetch list has idetity tensors %s' % fetch_list)
#log.debug(fetch_list)
res = self._exe.run(self._program.train_program,
fetch_list=fetch_list,
*args,
**kwargs)
res = [self.merge_result(r) for r in res]
res = [self._merge_result(r) for r in res]
#log.debug(res)
res = util.unflatten(res, schema)
......@@ -330,6 +374,9 @@ class MonitoredExecutor(object):
return ret
def __exit__(self, err_type, err_value, trace):
"""
clean up things and report hook result when exit train loop
"""
if (err_type is None) or isinstance(err_value, (
F.core.EOFException, StopException, KeyboardInterrupt)):
try:
......@@ -344,7 +391,10 @@ class MonitoredExecutor(object):
log.exception('error occur during loop %s: %s' %
(err_type, err_value))
def merge_result(self, ls):
def _merge_result(self, ls):
"""
merge results from multi gpu cards
"""
dev_count = len(self._program.train_program._places) if isinstance(
self._program.train_program, F.compiler.CompiledProgram) else 1
if dev_count == 1:
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""common ML train and eval procedure"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -28,7 +29,8 @@ from time import time
import paddle.fluid as F
import paddle.fluid.layers as L
from propeller.types import RunMode, StopException, SummaryRecord, StopException, ModelSpec, InferenceSpec, ProgramPair, RunConfig
from propeller.types import RunMode, StopException, SummaryRecord, StopException
from propeller.types import ModelSpec, InferenceSpec, ProgramPair, RunConfig
from propeller.paddle import summary, collection
from propeller.paddle.data.functional import Dataset
from propeller.paddle.train import distribution
......@@ -43,7 +45,7 @@ log = logging.getLogger(__name__)
__all__ = ['train_and_eval', 'Learner']
def get_summary_writer(path):
def _get_summary_writer(path):
summary_writer = None
try:
from tensorboardX import SummaryWriter
......@@ -54,7 +56,12 @@ def get_summary_writer(path):
return summary_writer
def log_eval_result(name, eval_result, swriter, state):
def _get_one_place():
    """Pick one execution place: GPU 0 when paddle has CUDA, otherwise CPU 0."""
    places = F.cuda_places() if F.core.is_compiled_with_cuda() else F.cpu_places()
    return places[0]
def _log_eval_result(name, eval_result, swriter, state):
log.debug(eval_result)
printable = []
for n, val in six.iteritems(eval_result):
......@@ -71,7 +78,7 @@ def log_eval_result(name, eval_result, swriter, state):
log.info('******************************')
def build_net(model_fn, features, mode, params, run_config):
def _build_net(model_fn, features, mode, params, run_config):
model_spec = model_fn(
features=features, mode=mode, params=params, run_config=run_config)
......@@ -97,12 +104,14 @@ def build_net(model_fn, features, mode, params, run_config):
class Learner(object):
"""A Learner can train / eval / predict on a Dataset"""
def __init__(self,
model_class_or_model_fn,
run_config,
params=None,
warm_start_setting=None):
'''
"""
model_class_or_model_fn(callable|propeller.train.Model): `model_class_or_model_fn` be specified in 2 ways:
1. subclass of propeller.train.Model which implements:
1. \_\_init\_\_ (hyper_param, mode, run_config)
......@@ -121,58 +130,23 @@ class Learner(object):
params: any python object, will pass to your `model_fn` or `propeller.train.Model`
run_config (propeller.RunConfig): run_config.max_steps should not be None.
warm_start_setting (propeller.WarmStartSetting): Optional. warm start variable will overwrite model variable.
'''
"""
if run_config.model_dir is None:
raise ValueError('model_dir should specified in run_config')
if issubclass(model_class_or_model_fn, Model):
def model_fn(features, mode, params, run_config):
if mode != RunMode.PREDICT:
fea, label = features[:-1], features[-1]
else:
fea = features
model = model_class_or_model_fn(
params, mode, run_config=run_config)
pred = model.forward(fea)
if isinstance(pred, F.framework.Variable):
prediction = [pred]
else:
prediction = pred
if mode == RunMode.TRAIN:
loss = model.loss(pred, label)
model.backward(loss)
return ModelSpec(
loss=loss, predictions=prediction, mode=mode)
elif mode == RunMode.EVAL:
loss = model.loss(pred, label)
me = model.metrics(pred, label)
inf_spec = InferenceSpec(inputs=fea, outputs=prediction)
if 'loss' not in me:
me['loss'] = metrics.Mean(loss)
return ModelSpec(
loss=loss,
predictions=prediction,
metrics=me,
mode=mode,
inference_spec=inf_spec)
elif mode == RunMode.PREDICT:
return ModelSpec(predictions=prediction, mode=mode)
else:
raise RuntimeError('unknown run mode %s' % mode)
_model_fn = _build_model_fn(model_class_or_model_fn)
elif inspect.isfunction(model_class_or_model_fn):
model_fn = model_class_or_model_fn
_model_fn = model_class_or_model_fn
else:
raise ValueError('unknown model %s' % model_class_or_model_fn)
self.model_fn = model_fn
self.model_fn = _model_fn
self.params = params
self.run_config = run_config
self.warm_start_setting = warm_start_setting
def build_for_train(self, train_dataset):
def _build_for_train(self, train_dataset):
train_dataset.name = 'train'
train_program = F.Program()
startup_prog = F.Program()
......@@ -181,7 +155,7 @@ class Learner(object):
with collection.Collections() as collections:
log.info('Building Train Graph...')
fea = train_dataset.features()
model_spec = build_net(self.model_fn, fea, RunMode.TRAIN,
model_spec = _build_net(self.model_fn, fea, RunMode.TRAIN,
self.params, self.run_config)
log.info('Building Train Graph: Done')
......@@ -208,7 +182,7 @@ class Learner(object):
train_program=train_program,
startup_program=startup_prog), model_spec, summary_record
def build_for_eval(self, ds):
def _build_for_eval(self, ds):
ds.name = 'eval'
program = F.Program()
startup_prog = F.Program()
......@@ -217,7 +191,7 @@ class Learner(object):
with F.unique_name.guard():
log.info('Building Eval Graph')
fea = ds.features()
model_spec = build_net(self.model_fn, fea, RunMode.EVAL,
model_spec = _build_net(self.model_fn, fea, RunMode.EVAL,
self.params, self.run_config)
log.info('Done')
program = program.clone(for_test=True)
......@@ -227,7 +201,7 @@ class Learner(object):
return ProgramPair(
train_program=program, startup_program=startup_prog), model_spec
def build_for_predict(self, ds):
def _build_for_predict(self, ds):
ds.name = 'predict'
program = F.Program()
startup_prog = F.Program()
......@@ -236,7 +210,7 @@ class Learner(object):
with F.unique_name.guard():
log.info('Building Predict Graph')
fea = ds.features()
model_spec = build_net(self.model_fn, fea, RunMode.PREDICT,
model_spec = _build_net(self.model_fn, fea, RunMode.PREDICT,
self.params, self.run_config)
log.info('Done')
......@@ -249,11 +223,12 @@ class Learner(object):
train_program=program, startup_program=startup_prog), model_spec
def train(self, train_ds, train_hooks=[]):
"""train on a `Dataset`"""
if not isinstance(train_ds, Dataset):
raise ValueError('expect dataset to be instance of Dataset, got %s'
% repr(train_ds))
train_program, model_spec, summary_record = self.build_for_train(
train_program, model_spec, summary_record = self._build_for_train(
train_ds)
train_run_hooks = [
hooks.StopAtStepHook(self.run_config.max_steps,
......@@ -261,13 +236,16 @@ class Learner(object):
hooks.LoggingHook(
model_spec.loss,
summary_record=summary_record,
summary_writer=get_summary_writer(
summary_writer=_get_summary_writer(
os.path.join(self.run_config.model_dir, 'train_history')),
per_step=self.run_config.log_steps,
skip_step=self.run_config.skip_steps),
]
if model_spec.train_hooks is not None:
train_run_hooks.extend(model_spec.train_hooks)
train_run_hooks.extend(train_hooks)
train_executor = F.Executor(F.cuda_places()[0])
train_executor = F.Executor(_get_one_place())
mon_exe = MonitoredExecutor(
train_executor,
......@@ -297,24 +275,29 @@ class Learner(object):
return mon_exe.result
def evaluate(self, eval_dataset, eval_hooks=[]):
"""eval on a `Dataset`"""
if not isinstance(eval_dataset, Dataset):
raise ValueError('expect dataset to be instance of Dataset, got %s'
% repr(eval_dataset))
program, model_spec = self.build_for_eval(eval_dataset)
single_card_place = F.cuda_places()[0]
program, model_spec = self._build_for_eval(eval_dataset)
single_card_place = _get_one_place()
eval_executor = F.Executor(single_card_place)
eval_hooks = [
eval_run_hooks = [
hooks.StopAtStepHook(self.run_config.eval_max_steps,
self.run_config.eval_max_steps),
hooks.EvalHook(model_spec.metrics, )
]
if model_spec.eval_hooks is not None:
eval_run_hooks.extend(model_spec.eval_hooks)
eval_run_hooks.extend(eval_hooks)
mon_exe = MonitoredExecutor(
eval_executor,
program,
run_config=self.run_config,
run_hooks=eval_hooks)
run_hooks=eval_run_hooks)
mon_exe.init_or_restore_variables()
try:
......@@ -326,32 +309,43 @@ class Learner(object):
_, eval_result = mon_exe.result
summary_writer = get_summary_writer(
summary_writer = _get_summary_writer(
os.path.join(self.run_config.model_dir, 'eval_history'))
log_eval_result('eval', eval_result, summary_writer, mon_exe.state)
_log_eval_result('eval', eval_result, summary_writer, mon_exe.state)
return mon_exe.result
def predict(self, predict_dataset, ckpt=None, steps=-1, split_batch=True):
'''
def predict(self,
predict_dataset,
ckpt=-1,
ckpt_path=None,
steps=-1,
split_batch=True):
"""
Perform prediction
will call `model_fn` and instantiate the user-specified model in `propeller.RunMode.PREDICT` mode
Args:
infer_dataset (propeller.data.Dataset): should not `shuffle` or `repeat`
steps (int): steps to predict, if -1 is specifed, will stop when `StopException` is raised in `infer_dataset`
steps (int): steps to predict, if None is specified,
will stop when `StopException` is raised in `infer_dataset`
ckpt_path (None|str): Path of a specific checkpoint to predict.
If None, the latest checkpoint in model_dir is used.
If there are no checkpoints in model_dir,
prediction is run with newly initialized Variables instead of ones restored from checkpoint.
ckpt (int): deprecated args
split_batch (bool): if True, prediction of each example in a batch is returned.
Yields:
Evaluated values of predictions tensors.
'''
"""
if not isinstance(predict_dataset, Dataset):
raise ValueError('expect dataset to be instance of Dataset, got %s'
% repr(predict_dataset))
program, model_spec = self.build_for_predict(predict_dataset)
single_card_place = F.cuda_places()[0]
program, model_spec = self._build_for_predict(predict_dataset)
single_card_place = _get_one_place()
executor = F.Executor(single_card_place)
pred_run_config = RunConfig(
run_steps=steps if steps == -1 else None,
......@@ -360,11 +354,12 @@ class Learner(object):
executor,
program,
run_config=pred_run_config, )
mon_exe.init_or_restore_variables()
mon_exe.init_or_restore_variables(ckpt
if ckpt_path is None else ckpt_path)
try:
with mon_exe:
log.info('Runining predict from dir: %s' % repr(mon_exe.state))
single_card_place = F.cuda_places()[0]
single_card_place = _get_one_place()
for data in predict_dataset.start(places=[single_card_place]):
res = mon_exe.run(fetch_list=model_spec.predictions,
feed=data)
......@@ -379,7 +374,7 @@ class Learner(object):
pass
def train_and_eval(_shit=None,
def train_and_eval(_placeholder=None,
model_class_or_model_fn=None,
params=None,
run_config=None,
......@@ -389,36 +384,27 @@ def train_and_eval(_shit=None,
train_hooks=[],
eval_hooks=[],
exporters=[]):
'''
"""
Perform the train-and-evaluate procedure.
will call `model_fn` and instantiate the user-specified model in `propeller.RunMode.PREDICT` mode
Args:
model_class_or_model_fn(callable|propeller.train.Model): `model_class_or_model_fn` be specified in 2 ways:
1. subclass of propeller.train.Model which implements:
1. \_\_init\_\_ (hyper_param, mode, run_config)
2. forward (features) => (prediction)
3. backword (loss) => None
4. loss (predictoin) => (loss)
5. metrics (optional) (prediction) => (dict of propeller.Metrics)
2. a model_fn takes following args:
1. features
2. param
3. mode
4. run_config(optional)
1. subclass of propeller.train.Model
2. a model_fn takes following args: 1. features; 2. param; 3. mode; 4. run_config(optional)
and returns a `propeller.ModelSpec`
params: any python object, will pass to your `model_fn` or `propeller.train.Model`
run_config (propeller.RunConfig): run_config.max_steps should not be None.
train_dataset (propeller.paddle.data.Dataset): training will stop if global_step > run_config.max_steps.
eval_dataset (propeller.paddle.data.Dataset|dict): Optional, if Dict of propeller.data.Dataset were specified, will perform evluatation on every evaluation sets and report results.
eval_dataset (propeller.paddle.data.Dataset|dict): Optional, if Dict of propeller.data.Dataset were specified,
will perform evaluation on every evaluation set and report results.
warm_start_setting (propeller.WarmStartSetting): Optional. warm start variable will overwrite model variable.
train_hooks (list of propeller.paddle.train.RunHook): Optional.
eval_hooks (list of propeller.paddle.train.RunHook): Optional.
exporters (list of propeller.paddle.train.Exporter): Optional.
'''
if _shit is not None:
"""
if _placeholder is not None:
raise ValueError('specify keyword args to this function')
if model_class_or_model_fn is None or params is None or run_config is None or train_dataset is None:
raise ValueError(
......@@ -454,13 +440,13 @@ def train_and_eval(_shit=None,
params,
warm_start_setting=warm_start_setting)
class EvalHookOnTrainLoop(hooks.RunHook):
class _EvalHookOnTrainLoop(hooks.RunHook):
def __init__(self):
self.program, self.model_spec = est.build_for_eval(
self.program, self.model_spec = est._build_for_eval(
list(eval_dataset.values())[
0]) #eval_datasets must have same output shapes
self.summary_writers = {
ds_name: get_summary_writer(
ds_name: _get_summary_writer(
os.path.join(
os.path.join(run_config.model_dir, 'eval_history'),
ds_name))
......@@ -468,6 +454,7 @@ def train_and_eval(_shit=None,
}
def after_run(self, _, state):
"""doc"""
if state.step > run_config.skip_steps and state.gstep % run_config.eval_steps == 0:
eval_results = {}
for name, ds in six.iteritems(eval_dataset):
......@@ -478,7 +465,7 @@ def train_and_eval(_shit=None,
self.model_spec.metrics,
summary_writer=self.summary_writers[name], )
]
single_card_place = F.cuda_places()[0]
single_card_place = _get_one_place()
eval_executor = F.Executor(single_card_place)
mon_exe = MonitoredExecutor(
eval_executor,
......@@ -495,8 +482,8 @@ def train_and_eval(_shit=None,
eval_res = hook_results[
1] # hook_results: [StopAtStepHook, EvalHook, ...]
eval_results[name] = eval_res
log_eval_result(name, eval_res, self.summary_writers[name],
state)
_log_eval_result(name, eval_res,
self.summary_writers[name], state)
for exporter in exporters:
exporter.export(eval_executor, self.program,
self.model_spec, eval_results, state)
......@@ -505,6 +492,46 @@ def train_and_eval(_shit=None,
return eval_results
if distribution.status.is_master:
train_hooks.append(EvalHookOnTrainLoop())
train_hooks.append(_EvalHookOnTrainLoop())
res = est.train(train_dataset, train_hooks=train_hooks)
return res
def _build_model_fn(model_class):
def _model_fn(features, mode, params, run_config):
if mode != RunMode.PREDICT:
fea, label = features[:-1], features[-1]
else:
fea = features
model = model_class(params, mode, run_config=run_config)
pred = model.forward(fea)
if isinstance(pred, F.framework.Variable):
prediction = [pred]
else:
prediction = pred
if mode == RunMode.TRAIN:
loss = model.loss(pred, label)
model.backward(loss)
return ModelSpec(loss=loss, predictions=prediction, mode=mode)
elif mode == RunMode.EVAL:
loss = model.loss(pred, label)
me = model.metrics(pred, label)
inf_spec = InferenceSpec(inputs=fea, outputs=prediction)
if 'loss' not in me:
me['loss'] = metrics.Mean(loss)
return ModelSpec(
loss=loss,
predictions=prediction,
metrics=me,
mode=mode,
inference_spec=inf_spec)
elif mode == RunMode.PREDICT:
inf_spec = InferenceSpec(inputs=fea, outputs=prediction)
return ModelSpec(
predictions=prediction, mode=mode, inference_spec=inf_spec)
else:
raise RuntimeError('unknown run mode %s' % mode)
return _model_fn
......@@ -11,3 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""server"""
......@@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Never Never Never import paddle.fluid in main process, or any module would import fluid.
"""
from __future__ import division
from __future__ import absolute_import
......@@ -24,27 +27,27 @@ from time import sleep, time
import multiprocessing
import zmq
""" Never Never Never import paddle.fluid in main process, or any module would import fluid.
"""
log = logging.getLogger(__name__)
def profile(msg):
def decfn(fn):
def retfn(*args, **kwargs):
def _profile(msg):
def _decfn(fn):
def _retfn(*args, **kwargs):
start = time()
ret = fn(*args, **kwargs)
end = time()
log.debug('%s timecost: %.5f' % (msg, end - start))
return ret
return retfn
return _retfn
return decfn
return _decfn
class Predictor(object):
"""paddle predictor wrapper"""
def __init__(self, model_dir, device_idx=0):
import paddle.fluid as F
log.debug('create predictor on card %d' % device_idx)
......@@ -52,7 +55,7 @@ class Predictor(object):
config.enable_use_gpu(5000, device_idx)
self._predictor = F.core.create_paddle_predictor(config)
@profile('paddle')
@_profile('paddle')
def __call__(self, args):
for i, a in enumerate(args):
a.name = 'placeholder_%d' % i
......@@ -61,6 +64,7 @@ class Predictor(object):
def run_worker(model_dir, device_idx, endpoint="ipc://worker.ipc"):
"""worker process entrence"""
try:
log.debug("run_worker %s" % device_idx)
os.environ["CUDA_VISIBLE_DEVICES"] = os.getenv(
......@@ -97,6 +101,8 @@ def run_worker(model_dir, device_idx, endpoint="ipc://worker.ipc"):
class InferencePredictor(object):
"""control Predictor for multi gpu card"""
def __init__(self, backend_addr, model_dir, n_devices=1):
self.backend_addr = backend_addr
self.model_dir = model_dir
......@@ -104,6 +110,7 @@ class InferencePredictor(object):
self.children = []
def start(self):
"""doc"""
for device_idx in range(self.n_devices):
p = multiprocessing.Process(
target=run_worker,
......@@ -113,21 +120,27 @@ class InferencePredictor(object):
return self
def join(self):
"""doc"""
for p in self.children:
p.join()
def term(self):
    """Force-terminate every spawned worker process."""
    for worker in self.children:
        log.debug("terminating children %s" % repr(worker))
        worker.terminate()
class InferenceProxy(object):
"""zmq proxy"""
def __init__(self):
"""doc"""
self.backend = None
self.frontend = None
def listen(self, frontend_addr, backend_addr):
"""doc"""
log.info("InferenceProxy starting...")
try:
context = zmq.Context(1)
......@@ -152,11 +165,15 @@ class InferenceProxy(object):
class InferenceServer(object):
"""start InferencePredictor and InferenceProxy"""
def __init__(self, model_dir, n_devices):
"""doc"""
self.model_dir = model_dir
self.n_devices = n_devices
def listen(self, port):
"""doc"""
frontend_addr = "tcp://*:%s" % port
backend_addr = "ipc://backend.ipc"
predictor = InferencePredictor(backend_addr, self.model_dir,
......
......@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""utils for server"""
from __future__ import division
from __future__ import absolute_import
......@@ -26,6 +27,7 @@ from propeller.service import interface_pb2
def slot_to_numpy(slot):
"""doc"""
if slot.type == interface_pb2.Slot.FP32:
dtype = np.float32
type_str = 'f'
......@@ -45,6 +47,7 @@ def slot_to_numpy(slot):
def numpy_to_slot(arr):
"""doc"""
if arr.dtype == np.float32:
dtype = interface_pb2.Slot.FP32
elif arr.dtype == np.int32:
......@@ -59,25 +62,30 @@ def numpy_to_slot(arr):
def slot_to_paddlearray(slot):
"""doc"""
import paddle.fluid.core as core
if slot.type == interface_pb2.Slot.FP32:
dtype = np.float32
type_str = 'f'
dtype = core.PaddleDType.FLOAT32
elif slot.type == interface_pb2.Slot.INT32:
dtype = np.int32
type_str = 'i'
dtype = core.PaddleDType.INT32
elif slot.type == interface_pb2.Slot.INT64:
dtype = np.int64
type_str = 'q'
dtype = core.PaddleDType.INT64
else:
raise RuntimeError('know type %s' % slot.type)
ret = core.PaddleTensor()
ret.shape = slot.dims
ret.dtype = dtype
num = len(slot.data) // struct.calcsize(type_str)
arr = struct.unpack('%d%s' % (num, type_str), slot.data)
ret = core.PaddleTensor(np.array(arr, dtype=dtype).reshape(slot.dims))
ret.data = core.PaddleBuf(arr)
return ret
def paddlearray_to_slot(arr):
"""doc"""
import paddle.fluid.core as core
if arr.dtype == core.PaddleDType.FLOAT32:
dtype = interface_pb2.Slot.FP32
......@@ -99,12 +107,14 @@ def paddlearray_to_slot(arr):
def nparray_list_serialize(arr_list):
    """Serialize a list of numpy arrays into a `Slots` protobuf byte string."""
    packed = interface_pb2.Slots(
        slots=[numpy_to_slot(arr) for arr in arr_list])
    return packed.SerializeToString()
def nparray_list_deserialize(string):
    """Parse a serialized `Slots` protobuf back into a list of numpy arrays."""
    parsed = interface_pb2.Slots()
    parsed.ParseFromString(string)
    return list(map(slot_to_numpy, parsed.slots))
......@@ -72,6 +72,10 @@ def parse(filename):
elif proto.data_type == framework_pb2.VarType.INT8:
arr = np.array(
gen_arr(f.read(), 'B'), dtype=np.int8).reshape(proto.dims)
elif proto.data_type == framework_pb2.VarType.FP16:
arr = np.array(
gen_arr(f.read(), 'H'),
dtype=np.uint16).view(np.float16).reshape(proto.dims)
else:
raise RuntimeError('Unknown dtype %s' % proto.data_type)
......
......@@ -11,3 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
doc
"""
......@@ -11,6 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Model template
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -26,7 +30,11 @@ import numpy as np
@six.add_metaclass(abc.ABCMeta)
class Model():
class Model(object):
"""
template
"""
def __init__(self, config, mode):
"""
Args:
......@@ -39,9 +47,9 @@ class Model():
def forward(self, features):
"""
Args:
features (list of Tensor): depends on your Dataset.output_shapes
features (list of Tensor): inputs features that depends on your Dataset.output_shapes
Returns:
return (Tensor):
return (Tensor): prediction
"""
pass
......@@ -53,8 +61,6 @@ class Model():
label (Tensor): depends on your Dataset.output_shapes
Returns:
return (paddle scalar): loss
"""
pass
......
......@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Basic types"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -21,12 +23,15 @@ from collections import namedtuple
class RunMode(object):
    """model_fn will be called in 3 modes"""
    # Build the training program (loss + backward pass).
    TRAIN = 1
    # Build the inference program (predictions only, no label slot).
    PREDICT = 2
    # Build the evaluation program (loss + metrics, no backward pass).
    EVAL = 3
class HParams(object):
"""Hyper paramerter"""
def __init__(self, **kwargs):
for k, v in kwargs.items():
self.__dict__[k] = v
......@@ -45,30 +50,36 @@ class HParams(object):
def __setitem__(self, key, val):
self.__dict__[key] = val
@staticmethod
def from_json(self, json_str):
@classmethod
def from_json(cls, json_str):
"""doc"""
d = json.loads(json_str)
if type(d) != dict:
raise ValueError('json object must be dict.')
return HParams.from_dict(d)
def get(self, key, default=None):
"""doc"""
return self.__dict__.get(key, default)
@staticmethod
def from_dict(self, d):
@classmethod
def from_dict(cls, d):
"""doc"""
if type(d) != dict:
raise ValueError('input must be dict.')
hp = HParams(**d)
return hp
def to_json(self):
"""doc"""
return json.dumps(self.__dict__)
def to_dict(self):
"""doc"""
return self.__dict__
def join(self, other):
"""doc"""
if not isinstance(other, HParams):
raise ValueError('input must be HParams instance.')
self.__dict__.update(**other.__dict__)
......@@ -95,9 +106,12 @@ ModelSpec = namedtuple('ModelSpec', [
'metrics',
'mode',
'inference_spec',
'train_hooks',
'eval_hooks',
])
ModelSpec.__new__.__defaults__ = (None, ) * len(ModelSpec._fields)
class StopException(Exception):
    """Marker exception. NOTE(review): appears to signal early termination
    of a propeller run loop — confirm semantics at the raise sites."""
    pass
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""global utils"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
......@@ -31,6 +32,7 @@ log = logging.getLogger(__name__)
def ArgumentParser(name):
"""predefined argparser"""
parser = argparse.ArgumentParser('propeller model')
parser.add_argument('--run_config', type=str, default='')
parser.add_argument(
......@@ -59,6 +61,7 @@ def _get_dict_from_environ_or_json_or_file(args, env_name):
def parse_file(filename):
"""useless api"""
d = _get_dict_from_environ_or_json_or_file(filename, None)
if d is None:
raise ValueError('file(%s) not found' % filename)
......@@ -66,6 +69,7 @@ def parse_file(filename):
def parse_runconfig(args=None):
"""get run_config from env or file"""
d = _get_dict_from_environ_or_json_or_file(args.run_config,
'PROPELLER_RUNCONFIG')
if d is None:
......@@ -74,6 +78,7 @@ def parse_runconfig(args=None):
def parse_hparam(args=None):
"""get hparam from env or file"""
if args is not None:
hparam_strs = reduce(list.__add__, args.hparam)
else:
......@@ -91,6 +96,7 @@ def parse_hparam(args=None):
def flatten(s):
"""doc"""
assert is_struture(s)
schema = [len(ss) for ss in s]
flt = list(itertools.chain(*s))
......@@ -98,6 +104,7 @@ def flatten(s):
def unflatten(structure, schema):
"""doc"""
start = 0
res = []
for _range in schema:
......@@ -107,10 +114,12 @@ def unflatten(structure, schema):
def is_struture(s):
    """Return True if `s` is a flattenable structure (a list or a tuple).

    Args:
        s: any object.
    Returns:
        bool: True for list/tuple instances, False otherwise.
    """
    # Single isinstance with a type tuple replaces the chained `or`.
    return isinstance(s, (list, tuple))
def map_structure(func, s):
"""same sa tf.map_structure"""
if isinstance(s, list) or isinstance(s, tuple):
return [map_structure(func, ss) for ss in s]
elif isinstance(s, dict):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册