Unverified commit 605242a8 authored by meteor135, committed by GitHub

Fluid API cleanup (#50547)

Parent 4d12c12d
......@@ -18,6 +18,7 @@ find_package(OpenSSL REQUIRED)
message(STATUS "ssl:" ${OPENSSL_SSL_LIBRARY})
message(STATUS "crypto:" ${OPENSSL_CRYPTO_LIBRARY})
message(STATUS "WITH_SNAPPY:" ${WITH_SNAPPRY})
add_library(ssl SHARED IMPORTED GLOBAL)
set_property(TARGET ssl PROPERTY IMPORTED_LOCATION ${OPENSSL_SSL_LIBRARY})
......
......@@ -470,9 +470,11 @@ def recompute_sequential(ctx, functions, *args, **kwargs):
Examples:
.. code-block:: python
model = paddle.nn.Sequential(...)
input = recompute_sequential({'segments' : 1}, model, input)
import paddle
from paddle.incubate.distributed.fleet import recompute_sequential
input = paddle.ones(shape=[8, 10])
model = paddle.nn.Sequential(paddle.nn.Linear(10, 10), paddle.nn.Linear(10, 2))
output = recompute_sequential({'segments' : 1}, model, input)
"""
segments = ctx.get('segments', 1)
preserve_rng_state = ctx.get('preserve_rng_state', True)
......
......@@ -1027,6 +1027,13 @@ class Executor:
# of fleet executor with standalone executor is ready.
self._fleet_executor_with_standalone = False
self.op_role_key = core.op_proto_and_checker_maker.kOpRoleAttrName()
def _is_optimizer_op(self, op):
return self.op_role_key in op.attr_names and int(
op.all_attrs()[self.op_role_key]
) & int(core.op_proto_and_checker_maker.OpRole.Optimize)
def __del__(self):
# NOTE(Ruibiao): The manually call of clear is required. Because in Python, executor_cache
# may not immediately destructed after Executor instance deleted (so does not the _StandaloneExecutor),
......@@ -2097,6 +2104,137 @@ class Executor:
dataset.set_thread(pipeline_opt["concurrency_list"][0] * pipeline_num)
return pipeline_num
def split_program_by_device(self, program):
ops_list = []
type_list = []
pre = None
type_cpu = "cpu"
for op in program.global_block().ops:
if self._is_optimizer_op(op):
break
if op.has_attr("op_device"):
cur_attr = (
op.attr("op_device")
if op.attr("op_device") != ""
else type_cpu
)
if pre is None or pre != cur_attr:
ops_list.append([])
type_list.append(cur_attr)
ops_list[-1].append(op)
pre = cur_attr
l = len(type_list)
i = 0
type_heter = None
while i < l:
while i < l and type_list[i] == type_cpu:
i += 1
if i == l:
break
type_heter = type_list[i]
i += 1
start = i
valid = True
while i < l and type_list[i] != type_heter:
if type_list[i] != type_cpu:
valid = False
break
i += 1
if i == l:
break
elif not valid:
continue
for j in range(start, i):
for op in ops_list[j]:
op._set_attr("op_device", type_heter)
type_list[j] = type_heter
j += 1
pre = None
merged_ops_list = []
merged_type_list = []
for i in range(l):
if pre is None or pre != type_list[i]:
merged_ops_list.append([])
merged_type_list.append(type_list[i])
merged_ops_list[-1].extend(ops_list[i])
pre = type_list[i]
data_vars = set()
for k in program.global_block().vars:
var = program.global_block().var(k)
if not var.persistable:
data_vars.add(var.name)
l = len(merged_ops_list)
inputs_pre = set()
outputs_pre = set()
in_from_pre = [[] for i in range(l)]
for i in range(l):
inputs = set()
outputs = set()
for op in merged_ops_list[i]:
for input in op.input_names:
for tmp in op.input(input):
if tmp not in outputs:
inputs.add(tmp)
for output in op.output_names:
for tmp in op.output(output):
outputs.add(tmp)
if i == 0:
in_from_pre[i] = []
elif i == 1:
in_from_pre[i] = (outputs_pre | data_vars) & inputs
else:
in_from_pre[i] = outputs_pre & inputs
inputs_pre = copy.deepcopy(inputs)
outputs_pre = copy.deepcopy(outputs)
l = len(in_from_pre)
start_list = []
end_list = []
send_list = [[] for i in range(l)]
sum = 0
program_list = []
for i in range(l):
start_list.append(sum)
end_list.append(sum + len(merged_ops_list[i]) - 1)
sum += len(merged_ops_list[i])
if i < l - 1:
send_list[i].extend(list(in_from_pre[i + 1]))
prog = program.clone()
if merged_type_list[i] != type_cpu:
prog = prog._prune_with_input(
list(in_from_pre[i]), list(send_list[i])
)
program_list.append(prog)
else:
program_list.append(prog)
recv_list = [list(i) for i in in_from_pre]
found = False
heter_index = None
for i in range(len(merged_type_list)):
t = merged_type_list[i]
if t != type_cpu:
if found:
print("only one region of program can be heter")
found = True
heter_index = i
if heter_index is None:
print("warning: non heter program")
return None
else:
return [
start_list[heter_index],
end_list[heter_index],
send_list[heter_index],
recv_list[heter_index],
program_list[heter_index],
]
def _prepare_trainer(
self,
program=None,
......@@ -2126,11 +2264,7 @@ class Executor:
assert len(fetch_list) == len(fetch_info)
compiled = isinstance(program, compiler.CompiledProgram)
if is_heter:
from paddle.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fu = FleetUtil()
ret = fu.split_program_by_device(program)
ret = self.split_program_by_device(program)
if not compiled:
# TODO: Need a better way to distinguish and specify different execution mode
if program._pipeline_opt:
......
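The new Executor.split_program_by_device above walks the ops of the global block in order, groups consecutive ops by their "op_device" attribute (an empty attribute counts as "cpu"), relabels cpu stretches that are enclosed by the same heterogeneous device, merges adjacent regions of the same type, and returns the start/end op indices, send/recv variable lists, and pruned program for the single heterogeneous region (or None with a warning if there is none). A minimal sketch of just the grouping-and-merging step, applied to plain device labels instead of ops; the helper name and sample data below are hypothetical, not Paddle code:

def merge_device_regions(device_labels, default="cpu"):
    """Group consecutive equal device labels into (device, op_count) regions."""
    regions = []
    for label in device_labels:
        label = label or default            # empty op_device falls back to "cpu"
        if not regions or regions[-1][0] != label:
            regions.append([label, 0])      # start a new contiguous region
        regions[-1][1] += 1
    return [tuple(r) for r in regions]

if __name__ == "__main__":
    labels = ["cpu", "cpu", "gpu", "gpu", "", "gpu", "cpu"]
    print(merge_device_regions(labels))
    # [('cpu', 2), ('gpu', 2), ('cpu', 1), ('gpu', 1), ('cpu', 1)]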
#!/bin/bash
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# start pserver0
python fleet_deep_ctr.py \
--role pserver \
--endpoints 127.0.0.1:7000,127.0.0.1:7001 \
--current_endpoint 127.0.0.1:7000 \
--trainers 2 \
> pserver0.log 2>&1 &
# start pserver1
python fleet_deep_ctr.py \
--role pserver \
--endpoints 127.0.0.1:7000,127.0.0.1:7001 \
--current_endpoint 127.0.0.1:7001 \
--trainers 2 \
> pserver1.log 2>&1 &
# start trainer0
python fleet_deep_ctr.py \
--role trainer \
--endpoints 127.0.0.1:7000,127.0.0.1:7001 \
--trainers 2 \
--trainer_id 0 \
> trainer0.log 2>&1 &
# start trainer1
python fleet_deep_ctr.py \
--role trainer \
--endpoints 127.0.0.1:7000,127.0.0.1:7001 \
--trainers 2 \
--trainer_id 1 \
> trainer1.log 2>&1 &
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import tarfile
import os
import paddle
import paddle.distributed.fleet as fleet
from paddle.fluid.log_helper import get_logger
logger = get_logger(
"paddle", logging.INFO, fmt='%(asctime)s - %(levelname)s - %(message)s'
)
DATA_URL = "http://paddle-ctr-data.bj.bcebos.com/avazu_ctr_data.tgz"
DATA_MD5 = "c11df99fbd14e53cd4bfa6567344b26e"
"""
avazu_ctr_data/train.txt
avazu_ctr_data/infer.txt
avazu_ctr_data/test.txt
avazu_ctr_data/data.meta.txt
"""
def download_file():
file_name = "avazu_ctr_data"
path = paddle.dataset.common.download(DATA_URL, file_name, DATA_MD5)
dir_name = os.path.dirname(path)
text_file_dir_name = os.path.join(dir_name, file_name)
if not os.path.exists(text_file_dir_name):
tar = tarfile.open(path, "r:gz")
tar.extractall(dir_name)
return text_file_dir_name
def load_dnn_input_record(sent):
return list(map(int, sent.split()))
def load_lr_input_record(sent):
res = []
for _ in [x.split(':') for x in sent.split()]:
res.append(int(_[0]))
return res
class DatasetCtrReader(fleet.MultiSlotDataGenerator):
def generate_sample(self, line):
def iter():
fs = line.strip().split('\t')
dnn_input = load_dnn_input_record(fs[0])
lr_input = load_lr_input_record(fs[1])
click = [int(fs[2])]
yield ("dnn_data", dnn_input), ("lr_data", lr_input), (
"click",
click,
)
return iter
def prepare_data():
"""
load data meta info from path, return (dnn_input_dim, lr_input_dim)
"""
file_dir_name = download_file()
meta_file_path = os.path.join(file_dir_name, 'data.meta.txt')
train_file_path = os.path.join(file_dir_name, 'train.txt')
with open(meta_file_path, "r") as f:
lines = f.readlines()
err_info = "wrong meta format"
assert len(lines) == 2, err_info
assert (
'dnn_input_dim:' in lines[0] and 'lr_input_dim:' in lines[1]
), err_info
res = map(int, [_.split(':')[1] for _ in lines])
res = list(res)
dnn_input_dim = res[0]
lr_input_dim = res[1]
logger.info('dnn input dim: %d' % dnn_input_dim)
logger.info('lr input dim: %d' % lr_input_dim)
return dnn_input_dim, lr_input_dim, train_file_path
if __name__ == "__main__":
pairwise_reader = DatasetCtrReader()
pairwise_reader.run_from_stdin()
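This reader is driven as a dataset pipe command ('python ctr_dataset_reader.py', see the trainer script below); each input line carries three tab-separated fields: space-separated dnn feature ids, 'id:value' pairs for the lr features, and the click label. A small self-contained sketch of how one such line is parsed; the sample line itself is made up:

# Hypothetical sample line, only to illustrate the format consumed by DatasetCtrReader.
line = "12 7 345\t3:1 9:1\t1"
dnn_field, lr_field, label_field = line.strip().split('\t')
dnn_input = [int(x) for x in dnn_field.split()]              # -> [12, 7, 345]
lr_input = [int(x.split(':')[0]) for x in lr_field.split()]  # -> [3, 9]
click = [int(label_field)]                                   # -> [1]
print(dnn_input, lr_input, click)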
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import time
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
from paddle.fluid.log_helper import get_logger
import ctr_dataset_reader
logger = get_logger(
"fluid", logging.INFO, fmt='%(asctime)s - %(levelname)s - %(message)s'
)
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle Fleet ctr")
# The following arguments are used for distributed training; if is_local == false, you should set them
parser.add_argument(
'--role',
type=str,
default='pserver', # trainer or pserver
help='The role of this node, trainer or pserver (default: pserver)',
)
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001',
)
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The current pserver endpoint (default: 127.0.0.1:6000)',
)
parser.add_argument(
'--trainer_id',
type=int,
default=0,
help='The index of this trainer (default: 0)',
)
parser.add_argument(
'--trainers',
type=int,
default=1,
help='The num of trainers, (default: 1)',
)
return parser.parse_args()
def model():
(
dnn_input_dim,
lr_input_dim,
train_file_path,
) = ctr_dataset_reader.prepare_data()
""" network definition """
dnn_data = paddle.static.data(
name="dnn_data",
shape=[-1, 1],
dtype="int64",
lod_level=1,
)
lr_data = paddle.static.data(
name="lr_data",
shape=[-1, 1],
dtype="int64",
lod_level=1,
)
label = paddle.static.data(
name="click",
shape=[-1, 1],
dtype="int64",
lod_level=0,
)
datas = [dnn_data, lr_data, label]
# build dnn model
dnn_layer_dims = [128, 64, 32, 1]
dnn_embedding = fluid.layers.embedding(
is_distributed=False,
input=dnn_data,
size=[dnn_input_dim, dnn_layer_dims[0]],
param_attr=fluid.ParamAttr(
name="deep_embedding",
initializer=paddle.nn.initializer.Constant(value=0.01),
),
is_sparse=True,
)
dnn_pool = paddle.static.nn.sequence_lod.sequence_pool(
input=dnn_embedding, pool_type="sum"
)
dnn_out = dnn_pool
for i, dim in enumerate(dnn_layer_dims[1:]):
fc = paddle.static.nn.fc(
x=dnn_out,
size=dim,
activation="relu",
weight_attr=fluid.ParamAttr(
initializer=paddle.nn.initializer.Constant(value=0.01)
),
name='dnn-fc-%d' % i,
)
dnn_out = fc
# build lr model
lr_embedding = fluid.layers.embedding(
is_distributed=False,
input=lr_data,
size=[lr_input_dim, 1],
param_attr=fluid.ParamAttr(
name="wide_embedding",
initializer=paddle.nn.initializer.Constant(value=0.01),
),
is_sparse=True,
)
lr_pool = paddle.static.nn.sequence_lod.sequence_pool(
input=lr_embedding, pool_type="sum"
)
merge_layer = paddle.concat([dnn_out, lr_pool], axis=1)
predict = paddle.static.nn.fc(x=merge_layer, size=2, activation='softmax')
acc = paddle.static.accuracy(input=predict, label=label)
auc_var, batch_auc_var, auc_states = paddle.static.auc(
input=predict, label=label
)
cost = paddle.nn.functional.cross_entropy(
input=predict, label=label, reduction='none', use_softmax=False
)
avg_cost = paddle.mean(x=cost)
return datas, avg_cost, predict, train_file_path
def train(args):
datas, avg_cost, predict, train_file_path = model()
endpoints = args.endpoints.split(",")
if args.role.upper() == "PSERVER":
current_id = endpoints.index(args.current_endpoint)
else:
current_id = 0
role = role_maker.UserDefinedRoleMaker(
current_id=current_id,
role=role_maker.Role.WORKER
if args.role.upper() == "TRAINER"
else role_maker.Role.SERVER,
worker_num=args.trainers,
server_endpoints=endpoints,
)
exe = fluid.Executor(fluid.CPUPlace())
fleet.init(role)
strategy = StrategyFactory.create_half_async_strategy()
optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
if fleet.is_server():
logger.info("run pserver")
fleet.init_server()
fleet.run_server()
elif fleet.is_worker():
logger.info("run trainer")
exe.run(fleet.startup_program)
fleet.init_worker()
thread_num = 2
filelist = []
for _ in range(thread_num):
filelist.append(train_file_path)
# config dataset
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(128)
dataset.set_use_var(datas)
pipe_command = 'python ctr_dataset_reader.py'
dataset.set_pipe_command(pipe_command)
dataset.set_filelist(filelist)
dataset.set_thread(thread_num)
for epoch_id in range(10):
logger.info("epoch {} start".format(epoch_id))
pass_start = time.time()
dataset.set_filelist(filelist)
exe.train_from_dataset(
program=fleet.main_program,
dataset=dataset,
fetch_list=[avg_cost],
fetch_info=["cost"],
print_period=100,
debug=False,
)
pass_time = time.time() - pass_start
logger.info(
"epoch {} finished, pass_time {}".format(epoch_id, pass_time)
)
fleet.stop_worker()
if __name__ == "__main__":
args = parse_args()
train(args)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HDFS Utils."""
import os
import sys
import subprocess
import multiprocessing
from datetime import datetime
import re
import copy
import errno
import time
import logging
# from . import fs
from paddle.distributed.fleet.utils.fs import (
FS,
LocalFS,
FSFileExistsError,
FSFileNotExistsError,
ExecuteError,
FSTimeOut,
FSShellCmdAborted,
)
from paddle.fluid import core
import functools
import shutil
__all__ = ["HDFSClient"]
def _handle_errors(max_time_out=None):
def decorator(f):
@functools.wraps(f)
def handler(*args, **kwargs):
o = args[0]
time_out = max_time_out
if time_out is None:
time_out = float(o._time_out) / 1000.0
else:
time_out /= 1000.0
inter = float(o._sleep_inter) / 1000.0
start = time.time()
last_print_time = start
while True:
try:
return f(*args, **kwargs)
# important: only ExecuteError need to retry
except ExecuteError as e:
if time.time() - start >= time_out:
raise FSTimeOut(
"args:{} timeout:{}".format(
args, time.time() - start
)
)
time.sleep(inter)
if time.time() - last_print_time > 30:
print(
"hadoop operator timeout:args:{} timeout:{}".format(
args, time.time() - start
)
)
last_print_time = time.time()
return handler
return decorator
class HDFSClient(FS):
def __init__(
self,
hadoop_home,
configs,
time_out=5 * 60 * 1000, # ms
sleep_inter=1000,
): # ms
# Raise exception if JAVA_HOME not exists.
self.pre_commands = []
hadoop_bin = '%s/bin/hadoop' % hadoop_home
self.pre_commands.append(hadoop_bin)
dfs = 'fs'
self.pre_commands.append(dfs)
if configs:
for k, v in configs.items():
config_command = '-D%s=%s' % (k, v)
self.pre_commands.append(config_command)
self._time_out = time_out
self._sleep_inter = sleep_inter
self._base_cmd = " ".join(self.pre_commands)
self._bd_err_re = re.compile(
r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:'
)
def _run_cmd(self, cmd, redirect_stderr=False):
exe_cmd = "{} -{}".format(self._base_cmd, cmd)
ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
ret = int(ret)
if ret == 134:
raise FSShellCmdAborted(cmd)
return ret, output.splitlines()
@_handle_errors()
def list_dirs(self, fs_path):
if not self.is_exist(fs_path):
return []
dirs, files = self._ls_dir(fs_path)
return dirs
@_handle_errors()
def ls_dir(self, fs_path):
"""
list the directory under fs_path, returning only the base names without the fs_path prefix
"""
if not self.is_exist(fs_path):
return [], []
return self._ls_dir(fs_path)
def _ls_dir(self, fs_path):
cmd = "ls {}".format(fs_path)
ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
dirs = []
files = []
for line in lines:
arr = line.split()
if len(arr) != 8:
continue
p = os.path.basename(arr[7])
if arr[0][0] == 'd':
dirs.append(p)
else:
files.append(p)
return dirs, files
def _test_match(self, lines):
for l in lines:
m = self._bd_err_re.match(l)
if m is not None:
return m
return None
@_handle_errors()
def is_dir(self, fs_path):
if not self.is_exist(fs_path):
return False
return self._is_dir(fs_path)
def _is_dir(self, fs_path):
cmd = "test -d {}".format(fs_path, redirect_stderr=True)
ret, lines = self._run_cmd(cmd)
if ret:
# other error
if self._test_match(lines):
raise ExecuteError(cmd)
return False
return True
def is_file(self, fs_path):
if not self.is_exist(fs_path):
return False
return not self._is_dir(fs_path)
@_handle_errors()
def is_exist(self, fs_path):
cmd = "ls {} ".format(fs_path)
ret, out = self._run_cmd(cmd, redirect_stderr=True)
if ret != 0:
for l in out:
if "No such file or directory" in l:
return False
raise ExecuteError(cmd)
return True
# can't retry
def upload(self, local_path, fs_path):
if self.is_exist(fs_path):
raise FSFileExistsError("{} exists".format(fs_path))
local = LocalFS()
if not local.is_exist(local_path):
raise FSFileNotExistsError("{} not exists".format(local_path))
return self._try_upload(local_path, fs_path)
@_handle_errors()
def _try_upload(self, local_path, fs_path):
cmd = "put {} {}".format(local_path, fs_path)
ret = 0
try:
ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
self.delete(fs_path)
raise e
# can't retry
def download(self, fs_path, local_path):
if self.is_exist(local_path):
raise FSFileExistsError("{} exists".format(local_path))
if not self.is_exist(fs_path):
raise FSFileNotExistsError("{} not exits".format(fs_path))
return self._try_download(fs_path, local_path)
@_handle_errors()
def _try_download(self, fs_path, local_path):
cmd = "get {} {}".format(fs_path, local_path)
ret = 0
try:
ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
local_fs = LocalFS()
local_fs.delete(local_path)
raise e
@_handle_errors()
def mkdirs(self, fs_path):
if self.is_exist(fs_path):
return
out_hdfs = False
cmd = "mkdir {} ".format(fs_path)
ret, out = self._run_cmd(cmd, redirect_stderr=True)
if ret != 0:
for l in out:
if "No such file or directory" in l:
out_hdfs = True
break
if not out_hdfs:
raise ExecuteError(cmd)
if out_hdfs and not self.is_exist(fs_path):
cmd = "mkdir -p {}".format(fs_path)
ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
if overwrite and self.is_exist(fs_dst_path):
self.delete(fs_dst_path)
if test_exists:
if not self.is_exist(fs_src_path):
raise FSFileNotExistsError(
"{} is not exists".format(fs_src_path)
)
if self.is_exist(fs_dst_path):
raise FSFileExistsError("{} exists already".format(fs_dst_path))
return self._try_mv(fs_src_path, fs_dst_path)
@_handle_errors()
def _try_mv(self, fs_src_path, fs_dst_path):
cmd = "mv {} {}".format(fs_src_path, fs_dst_path)
ret = 0
try:
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
if not self.is_exist(fs_src_path) and self.is_exist(fs_dst_path):
return
raise e
def _rmr(self, fs_path):
cmd = "rmr {}".format(fs_path)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
def _rm(self, fs_path):
cmd = "rm {}".format(fs_path)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
@_handle_errors()
def delete(self, fs_path):
if not self.is_exist(fs_path):
return
is_dir = self._is_dir(fs_path)
if is_dir:
return self._rmr(fs_path)
return self._rm(fs_path)
def touch(self, fs_path, exist_ok=True):
if self.is_exist(fs_path):
if exist_ok:
return
raise FSFileExistsError
return self._touchz(fs_path)
@_handle_errors()
def _touchz(self, fs_path):
cmd = "touchz {}".format(fs_path)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError
def need_upload_download(self):
return True
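For reference, a minimal usage sketch of the HDFSClient above: every subcommand is assembled as '<hadoop_home>/bin/hadoop fs -D<key>=<value> <subcommand> ...'. The import path matches the one the updated tests in this patch use; the hadoop_home path, config values and HDFS paths below are placeholders:

from paddle.distributed.fleet.utils.fs import HDFSClient  # same path as in the tests above

configs = {
    "fs.default.name": "hdfs://nameservice:8020",  # placeholder endpoint
    "hadoop.job.ugi": "user,passwd",               # placeholder credentials
}
client = HDFSClient(hadoop_home="/usr/local/hadoop", configs=configs)

if not client.is_exist("/remote/dir"):
    client.mkdirs("/remote/dir")
client.upload("local_model.tar", "/remote/dir/model.tar")
dirs, files = client.ls_dir("/remote/dir")
print(dirs, files)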
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Http Server."""
import logging
from http.server import HTTPServer
import http.server as SimpleHTTPServer
import time
import threading
import socket
def get_logger(name, level, fmt):
logger = logging.getLogger(name)
logger.setLevel(level)
handler = logging.FileHandler('http.log', mode='w')
formatter = logging.Formatter(fmt=fmt)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
_http_server_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s'
)
class KVHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
"""
KV handler class for the KV HTTP server;
it defines how to get/set key-value pairs on the server.
"""
def do_GET(self):
"""
get method for kv handler, get value according to key.
"""
log_str = "GET " + self.address_string() + self.path
paths = self.path.split('/')
if len(paths) < 3:
print('len of request path must be 3: ' + self.path)
self.send_status_code(400)
return
_, scope, key = paths
with self.server.kv_lock:
value = self.server.kv.get(scope, {}).get(key)
if value is None:
log_str += ' , key not found: ' + key
self.send_status_code(404)
else:
log_str += ' , key found: ' + key
self.send_response(200)
self.send_header("Content-Length", str(len(value)))
self.end_headers()
self.wfile.write(value)
_http_server_logger.info(log_str)
def do_PUT(self):
"""
put method for kv handler, set value according to key.
"""
log_str = "PUT " + self.address_string() + self.path
paths = self.path.split('/')
if len(paths) < 3:
print('len of request path must be 3: ' + self.path)
self.send_status_code(400)
return
_, scope, key = paths
content_length = int(self.headers['Content-Length'])
try:
value = self.rfile.read(content_length)
except:
print("receive error invalid request")
self.send_status_code(404)
return
with self.server.kv_lock:
if self.server.kv.get(scope) is None:
self.server.kv[scope] = {}
self.server.kv[scope][key] = value
self.send_status_code(200)
_http_server_logger.info(log_str)
def do_DELETE(self):
"""
delete method for kv handler, delete value according to key.
"""
log_str = "DELETE " + self.address_string() + self.path
paths = self.path.split('/')
if len(paths) < 3:
print('len of request path must be 3: ' + self.path)
self.send_status_code(400)
return
_, scope, key = paths
with self.server.delete_kv_lock:
if self.server.delete_kv.get(scope) is None:
self.server.delete_kv[scope] = []
self.server.delete_kv[scope].append(key)
self.send_status_code(200)
_http_server_logger.info(log_str)
def log_message(self, format, *args):
"""
ignore all logging messages in kv handler.
"""
pass
def send_status_code(self, code):
"""
send status code back to client.
"""
self.send_response(code)
self.send_header("Content-Length", 0)
self.end_headers()
class KVHTTPServer(HTTPServer):
"""
It is an HTTP server storing kv pairs.
"""
def __init__(self, port, handler):
"""Init."""
super().__init__(('', port), handler)
self.delete_kv_lock = threading.Lock()
self.delete_kv = {}
self.kv_lock = threading.Lock()
self.kv = {}
def get_deleted_size(self, key):
"""
get deleted size in key.
"""
ret = 0
with self.delete_kv_lock:
ret = self.delete_kv.get(key, 0)
return ret
class KVServer:
"""
It is a server storing kv pairs, with an HTTP server inside.
"""
def __init__(self, port, size={}):
"""Init."""
self.http_server = KVHTTPServer(port, KVHandler)
self.listen_thread = None
self.size = {}
def start(self):
"""
start server until user calls stop to let it quit.
"""
self.listen_thread = threading.Thread(
target=lambda: self.http_server.serve_forever()
)
self.listen_thread.start()
def stop(self):
"""
stop server and clear its resources.
"""
self.http_server.shutdown()
self.listen_thread.join()
self.http_server.server_close()
def should_stop(self):
"""
return whether the server should stop.
Returns:
ret(bool): whether the server should stop
"""
for key in self.size:
s = self.http_server.get_deleted_size(key)
if s != self.size.get(key, 0):
return False
return True
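A short usage sketch for the KV server above: start it on a free local port and exercise the '/scope/key' protocol with http.client. The port and the scope/key names are arbitrary, and the snippet assumes the KVServer class above is importable in the current module:

import http.client
import time

server = KVServer(8123)      # arbitrary local port
server.start()
time.sleep(0.5)              # give the listener thread a moment to come up

conn = http.client.HTTPConnection("127.0.0.1", 8123)
conn.request("PUT", "/trainer/rank0", body=b"ready")
conn.getresponse().read()

conn.request("GET", "/trainer/rank0")
print(conn.getresponse().read())   # b'ready'

conn.close()
server.stop()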
......@@ -20,10 +20,10 @@ from test_auto_checkpoint import AutoCheckPointACLBase
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.distributed.fleet.utils.fs import HDFSClient, LocalFS
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.tests.unittests.auto_checkpoint_utils import get_logger
from paddle.incubate.distributed.fleet.collective import fleet
paddle.enable_static()
logger = get_logger()
......
......@@ -17,7 +17,7 @@ import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
ServerRuntimeConfig,
......
......@@ -17,11 +17,11 @@ import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.distributed.fleet.utils.fs import HDFSClient, LocalFS
from paddle.fluid.incubate.checkpoint.auto_checkpoint import ExeTrainStatus
from paddle.fluid.incubate.checkpoint.checkpoint_saver import CheckpointSaver
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.incubate.distributed.fleet.collective import fleet
class FleetTest(unittest.TestCase):
......@@ -134,4 +134,5 @@ class FleetTest(unittest.TestCase):
if __name__ == '__main__':
paddle.enable_static()
unittest.main()
......@@ -20,9 +20,8 @@ import unittest
import numpy as np
import paddle.fluid.incubate.fleet.utils.utils as utils
from paddle.dataset.common import download
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
class TestFleetUtils(unittest.TestCase):
......@@ -199,11 +198,11 @@ class TestFleetUtils(unittest.TestCase):
os.path.join(self.train_dir, "join_main_program.pbtxt"),
)
is_text = True
program = utils.load_program(program_path, is_text)
fleet_util = FleetUtil()
program = fleet_util.load_program(program_path, is_text)
output_dir = os.path.join(data_dir, self.train_dir)
output_filename_1 = "draw_prog_1"
output_filename_2 = "draw_prog_2"
fleet_util = FleetUtil()
fleet_util.draw_from_program_file(
program_path, is_text, output_dir, output_filename_1
)
......
......@@ -28,7 +28,7 @@ import unittest
import paddle
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import (
from paddle.incubate.distributed.fleet.collective import (
CollectiveOptimizer,
DistributedStrategy,
)
......@@ -72,4 +72,5 @@ class CollectiveOptimizerTest(unittest.TestCase):
if __name__ == '__main__':
paddle.enable_static()
unittest.main()
......@@ -14,7 +14,7 @@
import logging
# import paddle.fluid.incubate.fleet.base.role_maker as role_maker
# import paddle.incubate.distributed.fleet.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
......
......@@ -18,7 +18,7 @@ from test_dist_base import TestDistRunnerBase, runtime_main
import paddle
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.incubate.distributed.fleet.collective import fleet
paddle.enable_static()
......
......@@ -89,7 +89,7 @@ def runtime_main():
if __name__ == "__main__":
# NOTE(liangjianzhong): dist unittest should be implemented using runtime_main in test_dist_base.py
# but the runtime_main in test_dist_base.py use the fleet, DistributedStrategy from
# paddle.fluid.incubate.fleet.collective which is not support by sharding (paddle.distributed.fleet).
# paddle.incubate.distributed.fleet.collective which is not support by sharding (paddle.distributed.fleet).
# this should be update in future.
# runtime_main(TestDistMnist2x2)
runtime_main()
......@@ -17,7 +17,7 @@ from utils import gen_data
import paddle
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base import role_maker
from paddle.incubate.distributed.fleet import role_maker
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
......
......@@ -21,9 +21,6 @@ import time
import paddle
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
__dir__ = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))
......
......@@ -139,7 +139,7 @@ import paddle
import paddle.fluid as fluid
from paddle.distributed.communicator import Communicator
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.incubate.fleet.parameter_server.mode import DistributedMode
import paddle.distributed.fleet as fleet
......
......@@ -28,9 +28,12 @@ import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.dygraph as dygraph
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.fluid import compiler
from paddle.fluid.incubate.fleet.collective import DistributedStrategy, fleet
from paddle.incubate.distributed.fleet.collective import (
DistributedStrategy,
fleet,
)
RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2
......
......@@ -19,7 +19,7 @@ import tempfile
import time
import unittest
# import paddle.fluid.incubate.fleet.base.role_maker as role_maker
# import paddle.incubate.distributed.fleet.role_maker as role_maker
from test_dist_fleet_base import TestFleetBase
# from dist_simnet_bow import train_network
......
......@@ -45,8 +45,8 @@ class TestDistMnistNCCL2FleetApi(TestDistBase):
class FleetCollectiveTest(unittest.TestCase):
def test_open_sync_batch_norm(self):
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import (
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.incubate.distributed.fleet.collective import (
DistributedStrategy,
fleet,
)
......
......@@ -34,7 +34,9 @@ class TestFleet1(unittest.TestCase):
"""Test cases for pslib."""
import paddle
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.incubate.distributed.fleet.role_maker import (
GeneralRoleMaker,
)
from paddle.incubate.fleet.parameter_server.pslib import fleet
os.environ["POD_IP"] = "127.0.0.1"
......
......@@ -14,21 +14,22 @@
import unittest
from dist_simnet_bow import train_network
from dist_fleet_simnet_bow import train_network
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.base.role_maker import (
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
)
from paddle.incubate.distributed.fleet.collective import CollectiveOptimizer
from paddle.incubate.distributed.fleet.role_maker import (
Role,
UserDefinedCollectiveRoleMaker,
UserDefinedRoleMaker,
)
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
)
from paddle.incubate.fleet.parameter_server import TranspilerOptimizer
# from paddle.incubate.fleet.parameter_server import TranspilerOptimizer
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
......@@ -111,7 +112,7 @@ class FleetTest(unittest.TestCase):
dirname='/tmp/',
main_program=compiled_prog,
)
self.assertRaises(Exception, fleet._transpile, "config")
# self.assertRaises(Exception, fleet._transpile, "config")
def set_program(self, avg_cost, strategy):
with fluid.scope_guard(fluid.Scope()):
......@@ -135,7 +136,7 @@ class FleetTest(unittest.TestCase):
strategy.sync_mode = False
strategy.geo_sgd_mode = True
strategy.geo_sgd_need_push_nums = 5
avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)
self.assertRaises(Exception, self.set_program, avg_cost, strategy)
......@@ -155,13 +156,14 @@ class FleetTest(unittest.TestCase):
strategy = DistributeTranspilerConfig()
strategy.sync_mode = False
strategy.runtime_split_send_recv = True
avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)
self.set_program(avg_cost, strategy)
strategy.runtime_split_send_recv = False
self.set_program(avg_cost, strategy)
"""
class TranspilerOptimizerTest(unittest.TestCase):
def testInvalidInputs(self):
self.assertRaises(Exception, TranspilerOptimizer, "Adam", None)
......@@ -180,6 +182,7 @@ class TranspilerOptimizerTest(unittest.TestCase):
self.assertRaises(
Exception, transpiler.minimize, loss=loss.name, startup_program=[]
)
"""
class UserDefinedRoleMakerTest(unittest.TestCase):
......
......@@ -34,7 +34,9 @@ class TestFleet1(unittest.TestCase):
def test_pslib_1(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.incubate.distributed.fleet.role_maker import (
GeneralRoleMaker,
)
from paddle.incubate.fleet.parameter_server.pslib import fleet
os.environ["POD_IP"] = "127.0.0.1"
......
......@@ -16,7 +16,7 @@ import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
......@@ -76,4 +76,5 @@ class TestPyramidHashOpApi(unittest.TestCase):
if __name__ == "__main__":
paddle.enable_static()
unittest.main()
......@@ -17,7 +17,7 @@ import os
import unittest
import paddle
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.incubate.distributed.fleet.role_maker as role_maker
class TestCloudRoleMaker(unittest.TestCase):
......@@ -63,7 +63,9 @@ class TestCloudRoleMaker(unittest.TestCase):
def test_pslib_1(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.incubate.distributed.fleet.role_maker import (
GeneralRoleMaker,
)
from paddle.incubate.fleet.parameter_server.pslib import fleet
os.environ["POD_IP"] = "127.0.0.1"
......@@ -100,7 +102,7 @@ class TestCloudRoleMaker(unittest.TestCase):
print("do not support pslib test, skip")
return
fleet.clear_one_table(0)
from paddle.fluid.incubate.fleet.base.role_maker import (
from paddle.incubate.distributed.fleet.role_maker import (
MPISymetricRoleMaker,
)
......
......@@ -35,7 +35,7 @@ class TestCloudRoleMaker2(unittest.TestCase):
def test_pslib_2(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import (
from paddle.incubate.distributed.fleet.role_maker import (
GeneralRoleMaker,
RoleMakerBase,
)
......@@ -204,7 +204,7 @@ class TestCloudRoleMaker2(unittest.TestCase):
"""
pass
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.incubate.distributed.fleet.base import Fleet
class TmpFleet(Fleet):
"""
......@@ -269,14 +269,16 @@ class TestCloudRoleMaker2(unittest.TestCase):
tmp._role_maker = TmpClass()
tmp.all_reduce_worker([], [])
tmp.barrier_worker()
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.incubate.distributed.fleet.role_maker import (
GeneralRoleMaker,
)
tmp = RoleMakerBase()
tmp.all_gather(1)
tmp.all_reduce_worker([], [])
tmp.barrier_worker()
tmp.barrier_all()
from paddle.fluid.incubate.fleet.base.role_maker import (
from paddle.incubate.distributed.fleet.role_maker import (
MPISymetricRoleMaker,
)
......
......@@ -34,7 +34,9 @@ class TestCloudRoleMaker(unittest.TestCase):
def test_pslib_1(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.incubate.distributed.fleet.role_maker import (
GeneralRoleMaker,
)
from paddle.incubate.fleet.parameter_server.pslib import fleet
os.environ["POD_IP"] = "127.0.0.1"
......@@ -78,7 +80,7 @@ class TestCloudRoleMaker(unittest.TestCase):
print("do not support pslib test, skip")
return
from paddle.fluid.incubate.fleet.base.role_maker import MockBarrier
from paddle.incubate.distributed.fleet.role_maker import MockBarrier
mb = MockBarrier()
mb.barrier()
......
......@@ -34,7 +34,9 @@ class TestFleet1(unittest.TestCase):
def test_pslib_1(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.incubate.distributed.fleet.role_maker import (
GeneralRoleMaker,
)
from paddle.incubate.fleet.parameter_server.pslib import fleet
os.environ["POD_IP"] = "127.0.0.1"
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......
......@@ -15,19 +15,23 @@
import abc
import paddle.fluid as fluid
from paddle.distributed.fleet.base.role_maker import RoleMakerBase
from paddle.fluid.executor import Executor
from paddle.fluid.optimizer import SGD
from paddle.optimizer import SGD as SGD_v2
from paddle.static.amp.decorator import OptimizerWithMixedPrecision
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.distributed.fleet.base.role_maker import RoleMakerBase
from paddle.static.amp.decorator import (
OptimizerWithMixedPrecision,
)
from . import mode
__all__ = ['Mode', 'Fleet', 'DistributedOptimizer']
class Mode:
"""
There are various modes for fleet; each of them is designed for a different model.
"""
__all__ = ['Fleet', 'DistributedOptimizer']
__all__ += mode.__all__
TRANSPILER = 1
PSLIB = 2
COLLECTIVE = 3
class Fleet(metaclass=abc.ABCMeta):
......@@ -200,7 +204,7 @@ class Fleet(metaclass=abc.ABCMeta):
self._executor = Executor(fluid.CPUPlace())
if role_maker and not isinstance(role_maker, RoleMakerBase):
from paddle.fluid.incubate.fleet.base.role_maker import (
from paddle.incubate.distributed.fleet.role_maker import (
RoleMakerBase as RoleMakerBaseIncubate,
)
......
......@@ -12,42 +12,24 @@
# See the License for the specific language governing permissions and
import logging
import os
import paddle
import paddle.fluid as fluid
import paddle.fluid.io as io
import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler
from paddle.fluid.executor import Executor
from paddle.fluid.parallel_executor import ParallelExecutor
from paddle.fluid.compiler import CompiledProgram
from paddle.fluid.executor import Executor
from paddle.fluid.framework import Program
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.fleet_base import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid import compiler
from paddle.fluid.incubate.checkpoint.checkpoint_saver import (
PaddleModel,
CheckpointSaver,
PaddleModel,
)
from paddle.incubate.distributed.fleet.base import (
DistributedOptimizer,
Fleet,
Mode,
)
import paddle
import os
import sys
import json
import re
import shutil
class LambConfig:
def __init__(self):
pass
class DistFCConfig:
def __init__(self):
pass
class Collective(Fleet):
......@@ -490,7 +472,7 @@ class CollectiveOptimizer(DistributedOptimizer):
self._strategy.trainers_endpoints = fleet.worker_endpoints()
self._strategy.enable_backward_optimizer_op_deps = True
self._compiled_program = compiler.CompiledProgram(main_program)
self._compiled_program = CompiledProgram(main_program)
self._compiled_program.with_data_parallel(
loss_name=self._loss.name,
......
......@@ -13,10 +13,11 @@
# limitations under the License.
"""Defination of Role Makers."""
from multiprocessing import Process, Manager
import paddle.fluid as fluid
import os
import time
from multiprocessing import Manager, Process
import paddle.fluid as fluid
__all__ = [
'Role',
......@@ -1036,7 +1037,7 @@ class GeneralRoleMaker(RoleMakerBase):
return "lo"
def __start_kv_server(self, http_server_d, size_d):
from paddle.fluid.incubate.fleet.utils.http_server import KVServer
from paddle.distributed.launch.utils.kv_server import KVServer
http_server = KVServer(int(self._http_ip_port[1]), size_d)
http_server.start()
......
......@@ -12,19 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import logging
import os
import subprocess
import numpy as np
import paddle
from collections import OrderedDict
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid.log_helper import get_logger
import numpy as np
from google.protobuf import text_format
from paddle.fluid import debugger
import paddle
import paddle.fluid as fluid
from paddle.fluid import core, debugger
from paddle.fluid.framework import Program
from paddle.fluid.proto import framework_pb2
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -10,3 +10,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -11,15 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = ['Mode']
class Mode:
"""
There are various modes for fleet; each of them is designed for a different model.
"""
TRANSPILER = 1
PSLIB = 2
COLLECTIVE = 3
......@@ -35,9 +35,9 @@ from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
)
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.incubate.distributed.fleet.base import Fleet
from paddle.incubate.distributed.fleet.base import Mode
from paddle.incubate.distributed.fleet.role_maker import MPISymetricRoleMaker
from paddle.incubate.fleet.parameter_server import version
from paddle.incubate.fleet.parameter_server.ir.public import (
......@@ -60,8 +60,8 @@ from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_st
from paddle.distributed.fleet.base.private_helper_function import (
wait_server_ready,
)
from paddle.incubate.distributed.fleet.base import DistributedOptimizer
from paddle.incubate.fleet.parameter_server.mode import PSMode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.incubate.fleet.parameter_server.ir import (
trainer_pass as worker,
......
......@@ -18,11 +18,11 @@ from .optimizer_factory import * # noqa: F403
from google.protobuf import text_format
from paddle.framework import core
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import HeterRoleMaker
from paddle.incubate.distributed.fleet.base import Fleet
from paddle.incubate.distributed.fleet.base import Mode
from paddle.incubate.distributed.fleet.base import DistributedOptimizer
from paddle.incubate.distributed.fleet.role_maker import MPISymetricRoleMaker
from paddle.incubate.distributed.fleet.role_maker import HeterRoleMaker
class PSLib(Fleet):
......
......@@ -298,7 +298,7 @@ def write_distributed_training_mode_py(filename='paddle/incubate/fleet/parameter
# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.incubate.distributed.fleet.base import Mode
BUILD_MODE=Mode.%(mode)s
......@@ -402,11 +402,8 @@ packages=['paddle',
'paddle.fluid.contrib.layers',
'paddle.fluid.transpiler',
'paddle.fluid.incubate',
'paddle.fluid.incubate.fleet',
'paddle.incubate.distributed.fleet',
'paddle.fluid.incubate.checkpoint',
'paddle.fluid.incubate.fleet.base',
'paddle.fluid.incubate.fleet.collective',
'paddle.fluid.incubate.fleet.utils',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.amp',
'paddle.cost_model',
......
......@@ -578,7 +578,7 @@ def write_parameter_server_version_py(
# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.incubate.distributed.fleet.base import Mode
BUILD_MODE=Mode.%(mode)s
......@@ -1295,11 +1295,8 @@ def get_setup_parameters():
'paddle.fluid.contrib.layers',
'paddle.fluid.transpiler',
'paddle.fluid.incubate',
'paddle.fluid.incubate.fleet',
'paddle.incubate.distributed.fleet',
'paddle.fluid.incubate.checkpoint',
'paddle.fluid.incubate.fleet.base',
'paddle.fluid.incubate.fleet.collective',
'paddle.fluid.incubate.fleet.utils',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.amp',
'paddle.cost_model',
......