Unverified commit 51aa2129, authored by wangxiaoning, committed by GitHub

[Fluidclean]move fluid.transpiler to distributed.transpiler (#51025)

* remove transpiler

* Revert "remove transpiler"

This reverts commit 46044ccd52011d45d7026786d331f264a6a8f645.

* Revert "Revert "remove transpiler""

This reverts commit 80ad0945401b5b5efebac4baee0ec50a793d4405.

* codestyle

* fix setup

* fix

* fix
Parent 9c60c5ec
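The practical effect of the move, as a minimal sketch (assuming the new paddle.distributed.transpiler package introduced by this PR; the config field shown is taken from the docstring examples below):

# Old import path (removed by this PR):
#   from paddle.fluid.transpiler import DistributeTranspiler, DistributeTranspilerConfig
# New import path:
import paddle.distributed.transpiler as transpiler

config = transpiler.DistributeTranspilerConfig()
config.slice_var_up = True  # same config fields as before the move
t = transpiler.DistributeTranspiler(config=config)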
......@@ -48,7 +48,7 @@ def _save_distributed_persistables(executor, dirname, main_program):
paddle.enable_static()
exe = paddle.static.Executor(paddle.CPUPlace())
param_path = "./my_paddle_model"
t = distribute_transpiler.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(...)
train_program = t.get_trainer_program()
_save_distributed_persistables(executor=exe, dirname=param_path, main_program=train_program)
......
......@@ -11,3 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .distribute_transpiler import (
DistributeTranspiler,
DistributeTranspilerConfig,
)
from .memory_optimization_transpiler import memory_optimize, release_memory
......@@ -28,26 +28,29 @@ Steps to transpile pserver:
5. add listen_and_serv op
"""
import collections
import logging
import math
import os
import sys
import math
from functools import reduce
import collections
import logging
import numpy as np
from .ps_dispatcher import RoundRobin, PSDispatcher
from .. import core, framework, unique_name
from ..framework import (
Program,
from paddle import framework
from paddle.fluid.framework import grad_var_name
from paddle.framework import Block, Program, core
from paddle.incubate.distributed.fleet.parameter_server.ir.ps_dispatcher import (
PSDispatcher,
RoundRobin,
)
from paddle.nn.initializer import Constant
from paddle.static import (
Parameter,
default_main_program,
default_startup_program,
Block,
Parameter,
grad_var_name,
)
from paddle.utils import unique_name
LOOKUP_TABLE_TYPE = ["lookup_table", "lookup_table_v2"]
LOOKUP_TABLE_GRAD_TYPE = ["lookup_table_grad", "lookup_table_v2_grad"]
......@@ -172,10 +175,10 @@ class DistributeTranspilerConfig:
Examples:
.. code-block:: python
from paddle.fluid.transpiler.ps_dispatcher import RoundRobin
import paddle.fluid as fluid
from paddle.distributed.transpiler.ps_dispatcher import RoundRobin
import paddle.distributed.transpiler as transpiler
config = fluid.DistributeTranspilerConfig()
config = transpiler.DistributeTranspilerConfig()
config.slice_var_up = True
config.split_method = RoundRobin
config.min_block_size = 81920
......@@ -281,11 +284,12 @@ class DistributeTranspiler:
import paddle
import paddle.fluid as fluid
import paddle.distributed.transpiler as transpiler
paddle.enable_static()
x = fluid.data(name='x', shape=[1,13], dtype='float32')
y = fluid.data(name='y', shape=[1], dtype='float32')
x = paddle.static.data(name='x', shape=[1,13], dtype='float32')
y = paddle.static.data(name='y', shape=[1], dtype='float32')
y_predict = paddle.static.nn.fc(x, size=1, activation=None)
cost = paddle.nn.functional.square_error_cost(input=y_predict, label=y)
......@@ -301,7 +305,7 @@ class DistributeTranspiler:
trainer_id = 0
trainers = 4
role = "PSERVER"
t = fluid.DistributeTranspiler()
t = transpiler.DistributeTranspiler()
t.transpile(
trainer_id, pservers=pserver_endpoints, trainers=trainers)
if role == "PSERVER":
......@@ -314,12 +318,12 @@ class DistributeTranspiler:
# for nccl2 mode
trainer_num = 2
trainer_id = 0
config = fluid.DistributeTranspilerConfig()
config = transpiler.DistributeTranspilerConfig()
config.mode = "nccl2"
trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
t = fluid.DistributeTranspiler(config=config)
t = transpiler.DistributeTranspiler(config=config)
t.transpile(trainer_id=trainer_id, trainers=trainer_endpoints, current_endpoint="192.168.0.1:6174")
exe = fluid.ParallelExecutor(
exe = paddle.static.ParallelExecutor(
use_cuda=True,
loss_name=avg_loss.name,
num_trainers=trainer_num,
......@@ -588,9 +592,9 @@ class DistributeTranspiler:
trainer_id (int): id of the current trainer worker; if there are
n workers, the id ranges from 0 to n-1
program (Program|None): program to transpile,
default is fluid.default_main_program().
default is paddle.static.default_main_program().
startup_program (Program|None): startup_program to transpile,
default is fluid.default_startup_program().
default is paddle.static.default_startup_program().
pservers (str): comma separated ip:port string for the pserver
list.
trainers (int|str): in pserver mode this is the number of
......@@ -598,7 +602,7 @@ class DistributeTranspiler:
endpoints.
sync_mode (bool): Do sync training or not, default is True.
startup_program (Program|None): startup_program to transpile,
default is fluid.default_main_program().
default is paddle.static.default_startup_program().
current_endpoint (str): the current endpoint, which must be passed
when transpiling in nccl2 distributed mode. In pserver mode
this argument is not used.
......@@ -606,7 +610,7 @@ class DistributeTranspiler:
Examples:
.. code-block:: python
transpiler = fluid.DistributeTranspiler()
transpiler = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(
trainer_id=0,
pservers="127.0.0.1:7000,127.0.0.1:7001",
......@@ -1124,12 +1128,12 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
Examples:
.. code-block:: python
import paddle.fluid as fluid
import paddle.distributed.transpiler as transpiler
#this is an example, find available endpoints in your case
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
trainer_id = 0
trainers = 4
t = fluid.DistributeTranspiler()
t = transpiler.DistributeTranspiler()
t.transpile(trainer_id, trainers=trainers, pservers=pserver_endpoints)
trainer_program = t.get_trainer_program()
"""
......@@ -1270,13 +1274,13 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
Examples:
.. code-block:: python
import paddle.fluid as fluid
import paddle.distributed.transpiler as transpiler
#this is an example, find available endpoints in your case
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
current_endpoint = "192.168.0.1:6174"
trainer_id = 0
trainers = 4
t = fluid.DistributeTranspiler()
t = transpiler.DistributeTranspiler()
t.transpile(
trainer_id, pservers=pserver_endpoints, trainers=trainers)
pserver_program = t.get_pserver_program(current_endpoint)
......@@ -1349,8 +1353,8 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
opt_op_on_pserver.append(op)
# step 3.3
# prepare if dc asgd is enabled
if self.config.enable_dc_asgd == True:
assert self.sync_mode == False
if self.config.enable_dc_asgd is True:
assert self.sync_mode is False
self.param_bak_list = []
# add param_bak for each trainer
for p in self.param_grad_ep_mapping[endpoint]["params"]:
......@@ -1579,13 +1583,13 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
Examples:
.. code-block:: python
import paddle.fluid as fluid
import paddle.distributed.transpiler as transpiler
#this is an example, find available endpoints in your case
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
current_endpoint = "192.168.0.1:6174"
trainer_id = 0
trainers = 4
t = fluid.DistributeTranspiler()
t = transpiler.DistributeTranspiler()
t.transpile(
trainer_id, pservers=pserver_endpoints, trainers=trainers)
pserver_program, pserver_startup_program = t.get_pserver_programs(current_endpoint)
......@@ -1624,7 +1628,7 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
trainer_id = 0
trainers = 4
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
pserver_program = t.get_pserver_program(current_endpoint)
pserver_startup_program = t.get_startup_program(current_endpoint,
......@@ -1851,7 +1855,7 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
param_grad_set = set()
for p, g in self.params_grads:
# skip parameter marked not trainable
if type(p) == Parameter and p.trainable == False:
if type(p) == Parameter and p.trainable is False:
continue
if p.name not in param_grad_set:
param_list.append(p)
......@@ -2834,7 +2838,7 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
if role_id == int(LR_SCHED_OP_ROLE_ATTR_VALUE) or role_id == int(
LR_SCHED_OP_ROLE_ATTR_VALUE
) | int(OPT_OP_ROLE_ATTR_VALUE):
if self.sync_mode == False and op.type == 'increment':
if self.sync_mode is False and op.type == 'increment':
inputs = self._get_input_map_from_op(
self.origin_program.global_block().vars, op
)
......@@ -2879,7 +2883,7 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
dtype=var.dtype,
shape=var.shape,
persistable=var.persistable,
initializer=paddle.nn.initializer.Constant(1),
initializer=Constant(1),
)
op_role_attr_name = (
core.op_proto_and_checker_maker.kOpRoleAttrName()
......@@ -2978,7 +2982,7 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
if op.attr(OP_ROLE_VAR_ATTR_NAME):
param_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[0]
grad_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[1]
if not param_name in optimize_params:
if param_name not in optimize_params:
optimize_params.add(param_name)
log("adding param_grad pair: ", param_name, grad_name)
params_grads.append(
......
......@@ -24,36 +24,34 @@ Steps to transpile pserver:
4. append sum ops that should run on current server instance.
5. add listen_and_serv op
"""
import sys
import collections
import numpy as np
from .ps_dispatcher import RoundRobin, PSDispatcher
from .. import core, framework
from ..framework import (
Program,
default_main_program,
default_startup_program,
Block,
Parameter,
from paddle import framework
from paddle.distributed.distribute_lookup_table import (
find_distributed_lookup_table,
)
from paddle.distributed.transpiler.details import (
wait_server_ready,
VarsDistributed,
wait_server_ready,
)
from paddle.distributed.transpiler.details import delete_ops
from .distribute_transpiler import (
DistributeTranspiler,
DistributeTranspilerConfig,
slice_variable,
same_or_split_var,
ServerRuntimeConfig,
from paddle.framework import Program, core
from paddle.incubate.distributed.fleet.parameter_server.ir.ps_dispatcher import (
PSDispatcher,
RoundRobin,
)
from paddle.incubate.distributed.fleet.parameter_server.mode import (
DistributedMode,
)
from paddle.distributed.distribute_lookup_table import (
find_distributed_lookup_table,
from paddle.static import (
Parameter,
default_main_program,
default_startup_program,
)
from .distribute_transpiler import (
DistributeTranspiler,
DistributeTranspilerConfig,
slice_variable,
)
RPC_OP_ROLE_ATTR_NAME = (
......@@ -311,7 +309,7 @@ class GeoSgdTranspiler(DistributeTranspiler):
param_grad_set = set()
# step 1. create param_list
for p, g in self.params_grads:
if type(p) == Parameter and p.trainable == False:
if type(p) == Parameter and p.trainable is False:
continue
if p.name not in param_grad_set:
param_list.append(p)
......
......@@ -25,7 +25,7 @@ def memory_optimize(
memory optimization strategies are enabled by default.
"""
logging.warn(
'Caution! paddle.fluid.memory_optimize() is deprecated '
'Caution! paddle.distributed.transpiler.memory_optimize() is deprecated '
'and not maintained any more, since it is not stable!\n'
'This API would not take any memory optimizations on your Program '
'now, since we have provided default strategies for you.\n'
......@@ -48,6 +48,6 @@ def release_memory(input_program, skip_opt_set=None):
memory optimization strategies are enabled by default.
"""
logging.warn(
'paddle.fluid.release_memory() is deprecated, it would not'
'paddle.distributed.transpiler.release_memory() is deprecated, it would not'
' take any memory release on your program'
)
......@@ -64,7 +64,6 @@ from .backward import gradients
from . import regularizer
from . import average
from . import metrics
from . import transpiler
from . import incubate
from .param_attr import ParamAttr, WeightNormParamAttr
from .data_feeder import DataFeeder
......@@ -80,12 +79,6 @@ from .core import (
MLUPlace,
CustomPlace,
)
from .transpiler import (
DistributeTranspiler,
memory_optimize,
release_memory,
DistributeTranspilerConfig,
)
from .lod_tensor import create_lod_tensor, create_random_int_lodtensor
from . import profiler
from . import unique_name
......@@ -107,7 +100,6 @@ from .trainer_desc import (
MultiTrainer,
HeterXpuTrainer,
)
from .transpiler import HashName, RoundRobin
from .backward import append_backward
Tensor = LoDTensor
......@@ -118,7 +110,6 @@ __all__ = (
framework.__all__
+ executor.__all__
+ trainer_desc.__all__
+ transpiler.__all__
+ parallel_executor.__all__
+ lod_tensor.__all__
+ data_feed_desc.__all__
......@@ -135,7 +126,6 @@ __all__ = (
'disable_dygraph',
'enable_imperative',
'disable_imperative',
'transpiler',
'nets',
'optimizer',
'backward',
......
......@@ -235,7 +235,7 @@ def train(net_type, use_cuda, save_dirname, is_local):
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
......
......@@ -563,7 +563,7 @@ def _save_distributed_persistables(executor, dirname, main_program):
paddle.enable_static()
exe = fluid.Executor(fluid.CPUPlace())
param_path = "./my_paddle_model"
t = distribute_transpiler.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(...)
train_program = t.get_trainer_program()
_save_distributed_persistables(executor=exe, dirname=param_path, main_program=train_program)
......@@ -1179,7 +1179,7 @@ def _load_distributed_persistables(executor, dirname, main_program=None):
paddle.enable_static()
exe = fluid.Executor(fluid.CPUPlace())
param_path = "./my_paddle_model"
t = distribute_transpiler.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(...)
pserver_prog = t.get_pserver_program(...)
_load_distributed_persistables(executor=exe, dirname=param_path, main_program=pserver_prog)
......
......@@ -129,7 +129,7 @@ def train(
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
......
......@@ -144,7 +144,7 @@ def train(use_cuda, save_dirname, is_local, use_bf16, pure_bf16):
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
......
......@@ -202,7 +202,7 @@ def train(net_type, use_cuda, save_dirname, is_local):
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
......
......@@ -178,7 +178,7 @@ def train(
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
......
......@@ -279,7 +279,7 @@ def train(use_cuda, save_dirname, is_local=True):
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
......
......@@ -184,7 +184,7 @@ def train(
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t = paddle.distributed.transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
......
......@@ -18,7 +18,7 @@ import unittest
import paddle
import paddle.fluid as fluid
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.fluid.transpiler.distribute_transpiler import (
from paddle.distributed.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
ServerRuntimeConfig,
)
......
......@@ -25,10 +25,10 @@ class TestTranspile(unittest.TestCase):
"""TestCases for BoxPS Preload"""
def get_transpile(self, mode, trainers="127.0.0.1:6174"):
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.mode = 'collective'
config.collective_mode = mode
t = fluid.DistributeTranspiler(config=config)
t = paddle.distributed.transpiler.DistributeTranspiler(config=config)
return t
def test_transpile(self):
......
......@@ -16,12 +16,13 @@ import unittest
from simple_nets import simple_fc_net
import paddle.distributed.transpiler as transpiler
import paddle.fluid as fluid
class DeprecatedMemoryOptimizationInterfaceTest(unittest.TestCase):
def setUp(self):
self.method = fluid.memory_optimize
self.method = transpiler.memory_optimize
def build_network(self, call_interface):
startup_prog = fluid.Program()
......@@ -63,7 +64,7 @@ class DeprecatedMemoryOptimizationInterfaceTest(unittest.TestCase):
class ReleaseMemoryTest(DeprecatedMemoryOptimizationInterfaceTest):
def setUp(self):
self.method = fluid.release_memory
self.method = transpiler.release_memory
if __name__ == '__main__':
......
......@@ -80,7 +80,7 @@ class TestDistRunnerBase:
hogwild_mode=False,
):
# NOTE: import fluid until runtime, or else forking processes will cause error.
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.enable_dc_asgd = dc_asgd
config.sync_mode = sync_mode
config.runtime_split_send_recv = hogwild_mode
......@@ -88,7 +88,7 @@ class TestDistRunnerBase:
if nccl_comm_num > 1:
config.nccl_comm_num = nccl_comm_num
# config.runtime_split_send_recv = True
t = fluid.DistributeTranspiler(config=config)
t = paddle.distributed.transpiler.DistributeTranspiler(config=config)
t.transpile(
trainer_id=trainer_id,
program=main_program,
......@@ -454,7 +454,7 @@ class TestDistRunnerBase:
or args.update_method == "nccl2_reduce_layer"
):
# transpile for nccl2
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.mode = "nccl2"
config.nccl_comm_num = args.nccl_comm_num
if args.use_hallreduce:
......@@ -466,7 +466,9 @@ class TestDistRunnerBase:
type(self).__name__,
"begin to run transpile on trainer with nccl2 mode",
)
nccl2_t = fluid.DistributeTranspiler(config=config)
nccl2_t = paddle.distributed.transpiler.DistributeTranspiler(
config=config
)
nccl2_t.transpile(
args.trainer_id,
program=fluid.default_main_program(),
......
......@@ -81,7 +81,11 @@ class TranspilerTest(unittest.TestCase):
def _transpiler_instance(self, config=None, sync_mode=True):
if not self.transpiler:
main = self.get_main_program()
self.transpiler = fluid.DistributeTranspiler(config=config)
self.transpiler = (
paddle.distributed.transpiler.DistributeTranspiler(
config=config
)
)
self.transpiler.transpile(
self.trainer_id,
program=main,
......@@ -202,7 +206,7 @@ class TestBasicModel(TranspilerTest):
class TestBasicModelWithLargeBlockSize(TranspilerTest):
def transpiler_test_impl(self):
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.min_block_size = 1048576
pserver, startup = self.get_pserver(self.pserver1_ep, config)
......@@ -276,7 +280,7 @@ class TestNoSliceVar(TranspilerTest):
super().setUp()
def transpiler_test_impl(self):
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.slice_var_up = False
_, startup = self.get_pserver(self.pserver1_ep, config)
......@@ -692,7 +696,7 @@ class TestEmptyPserverOptimizeBlocks(TranspilerTest):
sgd_optimizer.minimize(avg_cost)
def transpiler_test_impl(self):
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.slice_var_up = False
pserver, startup = self.get_pserver(ep=self.pserver2_ep, config=config)
......@@ -924,7 +928,7 @@ class TestAsyncLocalLookupTable(TestDistLookupTableBase):
self.network_with_table(is_sparse=True, is_distributed=False)
def transpiler_test_impl(self):
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
pserver1, startup1 = self.get_pserver(self.pserver1_ep, config, False)
self.assertEqual(len(pserver1.blocks), 4)
......@@ -991,7 +995,7 @@ class TestAsyncDistLookupTable(TestDistLookupTableBase):
self.network_with_table(is_sparse=True, is_distributed=True)
def transpiler_test_impl(self):
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
pserver1, startup1 = self.get_pserver(self.pserver1_ep, config, False)
......@@ -1089,7 +1093,7 @@ class TestDistLookupTableSliceSize(TestDistLookupTableBase):
self.network_with_table(is_sparse=True, is_distributed=True)
def transpiler_test_impl(self):
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
pserver1, _ = self.get_pserver(self.pserver1_ep, config)
self.assertTrue(self.transpiler.has_distributed_lookup_table)
......@@ -1220,10 +1224,12 @@ class TestNCCL2Transpile(TranspilerTest):
with fluid.program_guard(main, startup):
self.net_conf()
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.mode = "nccl2"
config.wait_port = False
t = fluid.DistributeTranspiler(config=config)
t = paddle.distributed.transpiler.DistributeTranspiler(
config=config
)
t.transpile(
0,
trainers="127.0.0.1:6174,127.0.0.1:6175",
......
......@@ -19,7 +19,7 @@ from dist_fleet_simnet_bow import train_network
import paddle
import paddle.fluid as fluid
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.fluid.transpiler.distribute_transpiler import (
from paddle.distributed.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
)
from paddle.incubate.distributed.fleet.collective import CollectiveOptimizer
......
......@@ -49,9 +49,9 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
pserver_endpoints = ip + ":" + port
current_endpoint = ip + ":" + port
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.sync_mode = sync_mode
t = fluid.DistributeTranspiler(config=config)
t = paddle.distributed.transpiler.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
pservers=pserver_endpoints,
......@@ -87,11 +87,11 @@ def run_pserver_with_empty_block(
ps2 = ip + ":" + port
pserver_endpoints = ps1 + "," + ps2
config = fluid.DistributeTranspilerConfig()
config = paddle.distributed.transpiler.DistributeTranspilerConfig()
config.sync_mode = sync_mode
config.slice_var_up = False
t = fluid.DistributeTranspiler(config=config)
t = paddle.distributed.transpiler.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
pservers=pserver_endpoints,
......
......@@ -16,7 +16,7 @@ import random
import unittest
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import slice_variable
from paddle.distributed.transpiler.distribute_transpiler import slice_variable
class TestSliceVar(unittest.TestCase):
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .distribute_transpiler import (
DistributeTranspiler,
DistributeTranspilerConfig,
)
from .memory_optimization_transpiler import memory_optimize, release_memory
from .ps_dispatcher import HashName, RoundRobin
__all__ = [
"DistributeTranspiler",
"memory_optimize",
"release_memory",
"HashName",
"RoundRobin",
"DistributeTranspilerConfig",
]
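For context, a hedged end-to-end sketch using the API exported above, mirroring the class docstring examples shown earlier in this diff (the endpoints, role value, and toy network are illustrative placeholders, not part of the commit):

import paddle
import paddle.distributed.transpiler as transpiler

paddle.enable_static()

# Toy static-graph network, following the DistributeTranspiler docstring example.
x = paddle.static.data(name='x', shape=[1, 13], dtype='float32')
y = paddle.static.data(name='y', shape=[1], dtype='float32')
y_predict = paddle.static.nn.fc(x, size=1, activation=None)
cost = paddle.nn.functional.square_error_cost(input=y_predict, label=y)
avg_loss = paddle.mean(cost)
paddle.optimizer.SGD(learning_rate=0.001).minimize(avg_loss)

# Placeholder cluster description; in a real job these come from the environment.
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
current_endpoint = "192.168.0.1:6174"
trainer_id = 0
trainers = 4
role = "PSERVER"  # or "TRAINER"

t = transpiler.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if role == "PSERVER":
    # Build the program and startup program this parameter server should run.
    pserver_program = t.get_pserver_program(current_endpoint)
    pserver_startup = t.get_startup_program(current_endpoint, pserver_program)
else:
    # Build the program each trainer should run.
    trainer_program = t.get_trainer_program()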
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class PSDispatcher:
"""
PSDispatcher is the base class for dispatching vars
into different pserver instances.
You need to implement the `dispatch` interface.
"""
def __init__(self, pserver_endpoints):
self._eps = pserver_endpoints
self._step = 0
@property
def eps(self):
return self._eps
def reset(self):
"""
Reset the step counter to zero.
"""
self._step = 0
def dispatch(self, varlist):
"""
Args:
varlist(list): a list of Variables
Returns:
a list of pserver endpoints, one for each variable in varlist
"""
raise NotImplementedError("Interface has not been implemented.")
class HashName(PSDispatcher):
"""
:api_attr: Static Graph
Hash variable names to several endpoints using the Python
"hash()" function.
Args:
pserver_endpoints (list): list of endpoint(ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
hash_dispatcher = HashName(pserver_endpoints)
hash_dispatcher.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super().__init__(pserver_endpoints)
def _hash_block(self, block_str, total):
return hash(block_str) % total
def dispatch(self, varlist):
"""
Use the `HashName` method to dispatch variables to the parameter servers.
Args:
varlist (list): a list of Variables
"""
eplist = []
for var in varlist:
server_id = self._hash_block(var.name(), len(self._eps))
server_for_param = self._eps[server_id]
eplist.append(server_for_param)
return eplist
class RoundRobin(PSDispatcher):
"""
:api_attr: Static Graph
Distribute variables to several endpoints using the
round-robin <https://en.wikipedia.org/wiki/Round-robin_scheduling> method.
Args:
pserver_endpoints (list): list of endpoint(ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
rr = RoundRobin(pserver_endpoints)
rr.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super().__init__(pserver_endpoints)
def dispatch(self, varlist):
"""
Use the `RoundRobin` method to dispatch variables to the parameter servers.
Args:
varlist (list): a list of Variables
"""
eplist = []
for var in varlist:
server_for_param = self._eps[self._step]
eplist.append(server_for_param)
self._step += 1
if self._step >= len(self._eps):
self._step = 0
return eplist
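A short usage sketch of the two dispatchers above (the FakeVar stand-in is hypothetical, needed only because HashName.dispatch calls var.name(); endpoints are placeholders):

from paddle.distributed.transpiler.ps_dispatcher import HashName, RoundRobin

class FakeVar:
    # Hypothetical stand-in: dispatch() only needs an object exposing name().
    def __init__(self, name):
        self._name = name

    def name(self):
        return self._name

endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]  # placeholder endpoints
variables = [FakeVar(n) for n in ["var1", "var2", "var3", "var4", "var5"]]

rr = RoundRobin(endpoints)
print(rr.dispatch(variables))  # cycles through the endpoints in order
rr.reset()                     # step counter back to zero

hn = HashName(endpoints)
print(hn.dispatch(variables))  # endpoint chosen by hash(var.name()) % len(endpoints)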
......@@ -15,9 +15,9 @@ import logging
import os
import paddle
import paddle.distributed.transpiler.distribute_transpiler as dist_transpiler
import paddle.fluid as fluid
import paddle.fluid.io as io
import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler
from paddle.fluid.compiler import CompiledProgram
from paddle.fluid.executor import Executor
from paddle.fluid.framework import Program
......
......@@ -31,7 +31,7 @@ from paddle.fluid.compiler import CompiledProgram
from paddle.fluid.parallel_executor import ParallelExecutor
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.transpiler.distribute_transpiler import (
from paddle.distributed.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
)
......@@ -78,7 +78,7 @@ from paddle.incubate.distributed.fleet.parameter_server.ir import (
class FleetTranspiler(Fleet):
"""
A subclass for compatibility with fluid.transpiler.DistributeTranspiler.
A subclass for compatibility with distributed.transpiler.DistributeTranspiler.
"""
def __init__(self):
......
......@@ -25,7 +25,7 @@ __all__ = [
import os
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import (
from paddle.distributed.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
ServerRuntimeConfig,
)
......
......@@ -400,7 +400,6 @@ packages=['paddle',
'paddle.fluid.contrib',
'paddle.fluid.contrib.extend_optimizer',
'paddle.fluid.contrib.layers',
'paddle.fluid.transpiler',
'paddle.fluid.incubate',
'paddle.incubate.distributed.fleet',
'paddle.fluid.incubate.checkpoint',
......
......@@ -1300,7 +1300,6 @@ def get_setup_parameters():
'paddle.fluid.contrib',
'paddle.fluid.contrib.extend_optimizer',
'paddle.fluid.contrib.layers',
'paddle.fluid.transpiler',
'paddle.fluid.incubate',
'paddle.incubate.distributed.fleet',
'paddle.fluid.incubate.checkpoint',
......