Unverified · Commit a8078bbd authored by LiYuRio, committed by GitHub

add multi fetch (#50070)

Parent db83b53a
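
This commit adds multi-scope fetching to the fleet executor: `FleetExecutor::Init` and `Carrier::Init` gain an optional `micro_scope_list` parameter, and when `fleet_opt` enables `inference_generation`, the Python `Executor` pre-creates one scope per micro batch and fetches every `fetch_var` from each of them. Below is a minimal sketch of the resulting user-facing flow, assuming a `main_program` and task-node wiring as in the unit test at the end of this diff (the fetch var name `x` is illustrative):

```python
import paddle

paddle.enable_static()

# Assumed: `tasks`, `task_id_to_rank` and `main_program` are built as in
# the unit test at the end of this diff.
fleet_opt = {
    'num_micro_batches': 3,
    'inference_generation': True,  # pre-create one scope per micro batch
    'fetch_var': ['x'],            # fetched from every micro scope
    # 'tasks': tasks, 'task_id_to_rank': task_id_to_rank, ...
}
main_program._pipeline_opt = {
    'fleet_opt': fleet_opt,
    'section_program': main_program,
}

exe = paddle.static.Executor(paddle.CUDAPlace(0))
# With 'fetch_var' set, run() now returns one numpy array per
# (micro scope, fetch var) pair instead of the usual fetch list.
for tensor in exe.run(main_program):
    print(tensor)
```
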
......@@ -15,6 +15,7 @@
#include "paddle/fluid/distributed/fleet_executor/carrier.h"
#include <algorithm>
#include <vector>
#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......@@ -24,6 +25,7 @@
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/framework/variable_helper.h"
namespace paddle {
......@@ -55,23 +57,34 @@ void Carrier::Init(
framework::Scope* scope,
int64_t num_micro_batches,
const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars) {
const std::vector<std::string>& inference_root_scope_vars,
const std::vector<framework::Scope*>& micro_scope_list) {
rank_ = rank;
interceptor_id_to_rank_ = interceptor_id_to_rank;
interceptor_id_to_node_ = interceptor_id_to_node;
place_ = place;
root_scope_ = scope;
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
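// If the caller supplies pre-created micro scopes (as inference generation
// now does), reuse them; otherwise create one scope per micro batch below.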
bool need_create_scope = micro_scope_list.empty();
PADDLE_ENFORCE_NOT_NULL(
root_scope_,
platform::errors::InvalidArgument("root_scope can not be nullptr"));
minibatch_scope_ = &root_scope_->NewScope();
microbatch_scopes_.resize(num_micro_batches);
for (int i = 0; i < num_micro_batches; ++i) {
microbatch_scopes_[i] = &minibatch_scope_->NewScope();
CopyParameters(i, program, inference_root_scope_vars);
if (need_create_scope) {
minibatch_scope_ = &root_scope_->NewScope();
microbatch_scopes_.resize(num_micro_batches);
for (int i = 0; i < num_micro_batches; ++i) {
microbatch_scopes_[i] = &minibatch_scope_->NewScope();
CopyParameters(i, program, inference_root_scope_vars);
}
} else {
microbatch_scopes_ = micro_scope_list;
for (int i = 0; i < num_micro_batches; ++i) {
CopyParameters(i, program, inference_root_scope_vars);
}
}
// Add source and sink interceptor id to rank
interceptor_id_to_rank_.emplace(SOURCE_ID, rank);
interceptor_id_to_rank_.emplace(SINK_ID, rank);
......
......@@ -25,6 +25,7 @@
#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
#include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h"
#include "paddle/fluid/distributed/fleet_executor/task_loop_thread_pool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/errors.h"
......@@ -60,7 +61,8 @@ class Carrier final {
framework::Scope* scope,
int64_t num_micro_batches,
const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars = {});
const std::vector<std::string>& inference_root_scope_vars = {},
const std::vector<framework::Scope*>& micro_scope_list = {});
void CopyParameters(
int microbatch_id,
......
......@@ -14,6 +14,7 @@
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include <algorithm>
#include <vector>
#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
......@@ -24,6 +25,7 @@
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/variable.h"
namespace paddle {
namespace distributed {
......@@ -59,7 +61,8 @@ void FleetExecutor::Init(
int64_t num_micro_batches,
const std::vector<TaskNode*>& task_nodes,
const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
const std::vector<std::string>& inference_root_scope_vars) {
const std::vector<std::string>& inference_root_scope_vars,
const std::vector<framework::Scope*>& micro_scope_list) {
PADDLE_ENFORCE_GT(task_nodes.size(),
0,
platform::errors::InvalidArgument(
......@@ -144,7 +147,8 @@ void FleetExecutor::Init(
place,
num_micro_batches,
program_desc,
inference_root_scope_vars);
inference_root_scope_vars,
micro_scope_list);
GlobalVal<MessageBus>::Get()->Barrier();
}
......@@ -154,7 +158,8 @@ void FleetExecutor::InitCarrier(
const platform::Place& place,
int64_t num_micro_batches,
const framework::ProgramDesc& program_desc,
const std::vector<std::string>& inference_root_scope_vars) {
const std::vector<std::string>& inference_root_scope_vars,
const std::vector<framework::Scope*>& micro_scope_list) {
carrier->Init(exe_desc_.cur_rank(),
runtime_graph_->interceptor_id_to_rank(),
runtime_graph_->interceptor_id_to_node(),
......@@ -162,7 +167,8 @@ void FleetExecutor::InitCarrier(
scope,
num_micro_batches,
place,
inference_root_scope_vars);
inference_root_scope_vars,
micro_scope_list);
}
void FleetExecutor::InitMessageBus() {
......
......@@ -18,6 +18,7 @@
#include "paddle/fluid/distributed/fleet_executor/carrier.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/macros.h"
#include "paddle/fluid/platform/place.h"
......@@ -45,7 +46,8 @@ class FleetExecutor final {
int64_t num_micro_batches,
const std::vector<TaskNode*>& task_nodes,
const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
const std::vector<std::string>& inference_root_scope_vars = {});
const std::vector<std::string>& inference_root_scope_vars = {},
const std::vector<framework::Scope*>& micro_scope_list = {});
void Run(const std::string& carrier_id);
private:
......@@ -57,7 +59,8 @@ class FleetExecutor final {
const platform::Place& place,
int64_t num_micro_batches,
const framework::ProgramDesc& program_desc,
const std::vector<std::string>& inference_root_scope_vars = {});
const std::vector<std::string>& inference_root_scope_vars = {},
const std::vector<framework::Scope*>& micro_scope_list = {});
FleetExecutorDesc exe_desc_;
std::shared_ptr<RuntimeGraph> runtime_graph_;
std::unordered_set<std::string> carrier_ids_;
......
......@@ -2464,6 +2464,7 @@ class Executor:
program=None,
scope=None,
fleet_opt=None,
micro_scope_list=[],
with_standalone_executor=False,
):
num_micro_batches = (
......@@ -2532,6 +2533,7 @@ class Executor:
fleet_opt['task_id_to_rank'] = task_id_to_rank
place = core.Place()
place.set_place(self.place)
# NOTE: `inference_root_scope_vars` (the empty list below) is used to
# force-create some vars in the root scope; they are not used during training.
self._fleet_executor.init(
......@@ -2543,6 +2545,7 @@ class Executor:
tasks,
task_id_to_rank,
[],
micro_scope_list,
)
def _run_using_fleet_executor(
......@@ -2624,11 +2627,20 @@ class Executor:
)
fetch_task.set_program(fetch_program)
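# For inference generation, pre-create one scope per micro batch so each
# generated query keeps its variables isolated from the others.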
micro_scope_list = []
if (
"inference_generation" in fleet_opt
and fleet_opt["inference_generation"]
):
for i in range(int(fleet_opt["num_micro_batches"])):
micro_scope_list.append(cached_scope.new_scope())
self._prepare_fleet_executor_carrier(
cache_key,
program=cached_program,
scope=cached_scope,
fleet_opt=fleet_opt,
micro_scope_list=micro_scope_list,
with_standalone_executor=with_standalone_executor,
)
......@@ -2653,6 +2665,18 @@ class Executor:
self._fleet_executor.run(cache_key)
if "fetch_var" in fleet_opt:
# When speeding up generation during evaluation, multiple queries are
# generated at the same time. Each query runs in its own micro scope so
# that results do not mix, which means the final results live in multiple
# scopes and must be fetched from each one (grouped by scope, then by
# fetch var, matching the nested loops below).
result_list = []
for scope in micro_scope_list:
for var in fleet_opt["fetch_var"]:
tensor = core.get_variable_tensor(scope, var)
result_list.append(as_numpy(tensor))
return result_list
if fetch_list:
arr = cached_scope.find_var(fetch_var_name).get_fetch_list()
tensors = arr._move_to_list()
......
......@@ -14,6 +14,8 @@
import unittest
import numpy as np
import paddle
import paddle.fluid.core as core
from paddle.distributed.fleet.fleet_executor_utils import TaskNode
......@@ -21,13 +23,26 @@ from paddle.distributed.fleet.fleet_executor_utils import TaskNode
paddle.enable_static()
def cond(i, ten):
def cond(i, ten, data):
return i < ten
def body(i, ten):
def body(i, ten, data):
i = i + 1
return [i, ten]
data = data + 1
return [i, ten, data]
num_micro_batches = 3
def batch_generator_creator():
def __reader__():
for i in range(num_micro_batches):
data = np.full(shape=[1, 1], fill_value=i, dtype=np.float32)
yield data
return __reader__
class TestFleetExecutor(unittest.TestCase):
......@@ -41,7 +56,16 @@ class TestFleetExecutor(unittest.TestCase):
ten = paddle.full(
shape=[1], fill_value=10, dtype='int64'
) # loop length
i, ten = paddle.static.nn.while_loop(cond, body, [i, ten])
data = paddle.static.data(name='x', shape=[1])
loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=[data], capacity=num_micro_batches * 4, iterable=False
)
loader.set_batch_generator(
batch_generator_creator(), paddle.CUDAPlace(0)
)
paddle.static.nn.while_loop(cond, body, [i, ten, data])
program_a = paddle.static.Program()
program_b = paddle.static.Program()
......@@ -49,18 +73,27 @@ class TestFleetExecutor(unittest.TestCase):
for var_name in main_program.block(0).vars:
if var_name != "_generated_var_0":
var = main_program.block(0).var(var_name)
program_a.block(0).create_var(
name=var_name,
shape=var.shape,
dtype=var.dtype,
stop_gradient=var.stop_gradient,
)
program_b.block(0).create_var(
name=var_name,
shape=var.shape,
dtype=var.dtype,
stop_gradient=var.stop_gradient,
)
if (
var_name == "create_py_reader_0"
or var_name == "double_buffer_0"
):
program_a.block(0).create_var(
name=var_name,
persistable=var.persistable,
)
else:
program_a.block(0).create_var(
name=var_name,
shape=var.shape,
dtype=var.dtype,
stop_gradient=var.stop_gradient,
)
program_b.block(0).create_var(
name=var_name,
shape=var.shape,
dtype=var.dtype,
stop_gradient=var.stop_gradient,
)
for op in main_program.block(0).ops:
if op.type != "while":
......@@ -89,7 +122,6 @@ class TestFleetExecutor(unittest.TestCase):
)
cond_var_name = "tmp_0"
num_micro_batches = 3
task_a = TaskNode(
0,
......@@ -159,12 +191,19 @@ class TestFleetExecutor(unittest.TestCase):
task_e.task_id(): 0,
},
'num_micro_batches': num_micro_batches,
'inference_generation': True,
'fetch_var': ['x'],
},
}
place = paddle.fluid.CUDAPlace(0)
exe = paddle.fluid.Executor(place)
exe.run(main_program)
place = paddle.CUDAPlace(0)
exe = paddle.static.Executor(place)
loader.start()
res = exe.run(main_program)
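# Micro batch i feeds data=i; the while loop adds 1 on each of its 10
# iterations, so the fetched results are expected to be 10, 11 and 12
# in scope order.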
ref_res = np.full([1], 10, dtype="float32")
for data in res:
np.testing.assert_allclose(data, ref_res, rtol=1e-05)
ref_res = ref_res + 1
if __name__ == "__main__":
......