diff --git a/lite/src/mge/network_impl.cpp b/lite/src/mge/network_impl.cpp index 03c5e5a4a14f4c677ad63a2d8559f5a47c3045f8..247c486afb132ab492fca1a67ea94ef38ad2e1fa 100644 --- a/lite/src/mge/network_impl.cpp +++ b/lite/src/mge/network_impl.cpp @@ -207,6 +207,9 @@ void NetworkImplDft::use_tensorrt() { //! set the callback in async model void NetworkImplDft::set_async_callback(const AsyncCallback& callback) { LITE_ASSERT(!m_is_cpu_inplace_mode, "cpu inplace mode not support async mode"); + LITE_ASSERT( + m_user_config->options.comp_node_seq_record_level == 0, + "record mode not support async mode"); LITE_ASSERT( m_user_config->device_type == LiteDeviceType::LITE_CPU || m_user_config->device_type == LiteDeviceType::LITE_CUDA, diff --git a/src/core/impl/comp_node/comp_node.cpp b/src/core/impl/comp_node/comp_node.cpp index c37c6247290c6b74605f4ceb3005b076e1a99525..6854e4087416684bae67a5c6b6672fad4ea4342a 100644 --- a/src/core/impl/comp_node/comp_node.cpp +++ b/src/core/impl/comp_node/comp_node.cpp @@ -659,4 +659,21 @@ void CompNode::ImplBase::add_callback(megdnn::thin_function&&) { locator().to_string().c_str()); } +void CompNode::ImplBase::enable_dispatch() { + mgb_throw( + MegBrainError, + "Unsupported add callback to " + "comp node %s", + locator().to_string().c_str()); +} + +void CompNode::ImplBase::disable_dispatch(bool* flag) { + MGB_MARK_USED_VAR(flag); + mgb_throw( + MegBrainError, + "Unsupported add callback to " + "comp node %s", + locator().to_string().c_str()); +} + // vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}} diff --git a/src/core/impl/comp_node/cpu/comp_node.cpp b/src/core/impl/comp_node/cpu/comp_node.cpp index 6ec87c6cbdc516a9f7c37209ea285c5bb2374f0d..5dbb9c596b439376958d1a40a585caaa607e94d5 100644 --- a/src/core/impl/comp_node/cpu/comp_node.cpp +++ b/src/core/impl/comp_node/cpu/comp_node.cpp @@ -810,6 +810,12 @@ public: task(); } } + + void enable_dispatch() override { m_env.cpu_env().enable_dispatch(); } + + void disable_dispatch(bool* flag) override { + m_env.cpu_env().disable_dispatch(flag); + } }; MGB_DYN_TYPE_OBJ_FINAL_IMPL(CompNodeRecorderImpl); #if MGB_HAVE_THREAD diff --git a/src/core/impl/comp_node_env.cpp b/src/core/impl/comp_node_env.cpp index 9a28f6415e74b089f16e2645d9b1e3b0bc789c32..b4421ed127ce289416e711143492dc17316f4e67 100644 --- a/src/core/impl/comp_node_env.cpp +++ b/src/core/impl/comp_node_env.cpp @@ -474,4 +474,35 @@ void CompNodeEnv::on_bad_device_type(DeviceType expected) const { MGB_VERSION_SYMBOL3(MEGDNN, MEGDNN_MAJOR, MEGDNN_MINOR, MEGDNN_PATCH); +void CompNodeEnv::CpuEnv::enable_dispatch() { + do_task_inplace = nullptr; +} + +void CompNodeEnv::CpuEnv::disable_dispatch(bool* flag) { + do_task_inplace = flag; +} + +void CompNodeEnv::CpuEnv::dispatch(Task&& task) const { + if (do_task_inplace && *do_task_inplace) { + task(); + } else { + dispatcher->dispatch(std::move(task)); + } +} + +void CompNodeEnv::CpuEnv::dispatch( + MultiThreadingTask&& task, size_t parallelism) const { + if (do_task_inplace && *do_task_inplace) { + for (size_t i = 0; i < parallelism; ++i) { + task(i, 0); + } + } else { + dispatcher->dispatch(std::move(task), parallelism); + } +} + +#if MGB_HAVE_THREAD +MGB_THREAD_LOCAL_PTR(bool) CompNodeEnv::CpuEnv::do_task_inplace = nullptr; +#endif + // vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}} diff --git a/src/core/impl/graph/cg_impl.cpp b/src/core/impl/graph/cg_impl.cpp index 8d7c816f2220aa2a16ea209e393f3c1d0e07fd90..f897c7552030188041adec37eaf5c059991ee73d 100644 --- a/src/core/impl/graph/cg_impl.cpp +++ b/src/core/impl/graph/cg_impl.cpp @@ -168,12 +168,32 @@ MGB_DEFINE_OPR_CLASS( ComputingGraphImpl::CallbackCaller, SingleCNOperatorNodeBase) // { std::vector> m_cb; + //! CallbackCaller supports change memory address in output tensor record mode(only + //! on CPU). The whole callback will be dispatched(like dispatching tensor copy + //! instead of dispatching memcpy). + //! Side effect: sync() is not supported in callback anymore. Users should call + //! func->wait() instead out of callback to sync data from Device to Host. + //! Note : only record level 1 supports change memory address in output tensor. + //! HostTensor captured in callback should not on cpu default. void scn_do_execute() override { for (size_t i = 0; i < input().size(); ++i) { auto&& in = input(i)->dev_tensor(); for (auto&& callback : m_cb[i]) { - // const cast for backward API compatibility - callback(const_cast(in)); + if (this->owner_graph()->options().comp_node_seq_record_level == 1 && + in.comp_node().device_type() == CompNode::DeviceType::CPU && + in.comp_node() != CompNode::default_cpu()) { + auto record_cb = [&in, &callback]() { + auto comp_node = in.comp_node(); + bool do_task_inplace = true; + comp_node.disable_dispatch(&do_task_inplace); + callback(const_cast(in)); + comp_node.enable_dispatch(); + }; + in.comp_node().add_callback(record_cb); + } else { + // const cast for backward API compatibility + callback(const_cast(in)); + } } } } diff --git a/src/core/include/megbrain/comp_node.h b/src/core/include/megbrain/comp_node.h index 3223d173ed962bd9b03aaedb16ea7bdc707d1f0c..9bb7eea08afdfebdfaf459c26878dfe8e22f3f10 100644 --- a/src/core/include/megbrain/comp_node.h +++ b/src/core/include/megbrain/comp_node.h @@ -412,6 +412,16 @@ public: return m_impl->add_callback(std::move(cb)); } + /*! + * enable dispatcher + */ + void enable_dispatch() { m_impl->enable_dispatch(); } + + /*! + * disable dispatcher so that task will be done inplace + */ + void disable_dispatch(bool* flag) { m_impl->disable_dispatch(flag); } + enum class Flag : uint32_t { //! Whether computing recorder is supported on this comp node (i.e. //! whether non-zero comp_node_seq_record_level is allowed) @@ -532,6 +542,10 @@ protected: virtual void add_callback(megdnn::thin_function&&); + virtual void enable_dispatch(); + + virtual void disable_dispatch(bool* flag); + virtual uint64_t get_uid() { mgb_throw(MegBrainError, "get_uid is not impl yet"); }; diff --git a/src/core/include/megbrain/comp_node_env.h b/src/core/include/megbrain/comp_node_env.h index fac2ba0962929b360483bb6f7e3a59354ab945f5..759c1e34196efcd8e38b165472849a11faedc514 100644 --- a/src/core/include/megbrain/comp_node_env.h +++ b/src/core/include/megbrain/comp_node_env.h @@ -503,12 +503,19 @@ public: using AffinityCallBack = thin_function; std::shared_ptr dispatcher; +#if MGB_HAVE_THREAD + static MGB_THREAD_LOCAL_PTR(bool) do_task_inplace; +#else + bool* do_task_inplace = nullptr; +#endif - void dispatch(Task&& task) const { dispatcher->dispatch(std::move(task)); } + void enable_dispatch(); - void dispatch(MultiThreadingTask&& task, size_t parallelism) const { - dispatcher->dispatch(std::move(task), parallelism); - } + void disable_dispatch(bool* flag); + + void dispatch(Task&& task) const; + + void dispatch(MultiThreadingTask&& task, size_t parallelism) const; void set_affinity(AffinityCallBack&& cb) const { dispatcher->set_affinity(std::move(cb)); @@ -521,6 +528,12 @@ public: return m_cpu_env; } + CpuEnv& cpu_env() { + if (mgb_unlikely(m_property.type != DeviceType::CPU)) + on_bad_device_type(DeviceType::CPU); + return m_cpu_env; + } + //! init this as a cpu env void init_cpu(const CpuEnv& env, CompNode comp_node); diff --git a/src/core/test/comp_node_helper.cpp b/src/core/test/comp_node_helper.cpp index a3d84cfa48a458e61b5286d63a9af64df72e26c9..43c0fcca3cc01fa8eea6f3a4299a91a21c906db9 100644 --- a/src/core/test/comp_node_helper.cpp +++ b/src/core/test/comp_node_helper.cpp @@ -44,7 +44,7 @@ void run_comp_seq_rec_basic(CompNode cn, bool fake_first) { graph->options().fake_next_exec = true; graph->options().var_sanity_check_first_run = false; } - auto func = graph->compile({make_callback_copy(z, host_z)}); + auto func = graph->compile({make_callback_copy(z, host_z, false)}); if (fake_first) { func->execute(); // first exec } @@ -55,6 +55,8 @@ void run_comp_seq_rec_basic(CompNode cn, bool fake_first) { } host_x->copy_from_fixlayout(*gen(host_x->shape(), cn)); func->execute(); + func->wait(); + host_z.sync(); auto expect = eval_conv_cpu(*host_x, *host_y, param); MGB_ASSERT_TENSOR_NEAR(expect, host_z, 1e-3) << "iter " << iter; } @@ -70,6 +72,28 @@ void run_comp_seq_rec_basic(CompNode cn, bool fake_first) { ASSERT_EQ(executed[2], change); // create new recorder, exec with recorder ASSERT_EQ(executed[3], change + 1); + + //! then we change host_z's ptr each time and check result + HostTensorND host_iter; + host_iter.copy_from(host_z); + std::vector> m_hosts(10); + for (size_t i = 0; i < 10; i++) { + m_hosts[i] = gen(host_z.shape(), host_z.comp_node()); + } + iter = 0; + for (; iter < 10; ++iter) { + auto host_tmp = m_hosts[iter]; + auto host_z_storage = host_z.storage(); + auto origin_ptr = host_z_storage.raw_storage(); + host_z_storage.reset( + host_z.comp_node(), host_z_storage.size(), + host_tmp->storage().raw_storage()); + auto changed_ptr = host_z_storage.raw_storage(); + ASSERT_TRUE(origin_ptr != changed_ptr); + func->execute(); + func->wait(); + MGB_ASSERT_TENSOR_NEAR(host_iter, host_z, 1e-3) << "iter " << iter; + } } void run_comp_seq_rec_basic_level2(CompNode cn) { @@ -154,7 +178,7 @@ void run_comp_seq_rec_dyn_elemwise(CompNode cn, bool fake_first) { w = opr::Elemwise::make({x, y, z}, opr::Elemwise::Mode::FUSE_MUL_ADD3); HostTensorND host_w; - auto func = graph->compile({make_callback_copy(w, host_w)}); + auto func = graph->compile({make_callback_copy(w, host_w, false)}); if (fake_first) { func->execute(); } @@ -166,9 +190,30 @@ void run_comp_seq_rec_dyn_elemwise(CompNode cn, bool fake_first) { } host_x->copy_from(*gen(host_x->shape(), cn)); func->execute(); + func->wait(); auto expect = check(); MGB_ASSERT_TENSOR_EQ(expect, host_w) << "iter " << i; } + //! then we change host_z's ptr each time and check result + HostTensorND host_iter; + host_iter.copy_from(host_w); + std::vector> m_hosts(10); + for (size_t i = 0; i < 10; i++) { + m_hosts[i] = gen(host_w.shape(), host_w.comp_node()); + } + for (size_t iter = 0; iter < 10; ++iter) { + auto host_tmp = m_hosts[iter]; + auto host_w_storage = host_w.storage(); + auto origin_ptr = host_w_storage.raw_storage(); + host_w_storage.reset( + host_w.comp_node(), host_w_storage.size(), + host_tmp->storage().raw_storage()); + auto changed_ptr = host_w_storage.raw_storage(); + ASSERT_TRUE(origin_ptr != changed_ptr); + func->execute(); + func->wait(); + MGB_ASSERT_TENSOR_EQ(host_iter, host_w) << "iter " << iter; + } } void run_level2(CompNode cn, bool use_multi_holder) { @@ -381,6 +426,9 @@ void run(CompNode cn) { HostTensorND host_y; graph->options().var_sanity_check_first_run = false; graph->options().comp_node_seq_record_level = level; + if (level == 1) { + sync = false; + } auto cb = [&](const DeviceTensorND& dv) { host_y.copy_from(dv); if (sync) { @@ -418,6 +466,9 @@ void run(CompNode cn) { HostTensorND host_y; graph->options().var_sanity_check_first_run = false; graph->options().comp_node_seq_record_level = level; + if (level == 1) { + sync = false; + } auto cb = [&](const DeviceTensorND& dv) { host_y.copy_from(dv); if (sync) { @@ -428,8 +479,8 @@ void run(CompNode cn) { if (level == 2) { ComputingGraph::assert_destroy(graph); } - for (int i = 0; i < 3; ++i) { - host_x->copy_from(*gen(host_x->shape())); + for (int k = 0; k < 3; ++k) { + host_x->copy_from(*gen(host_x->shape(), cn)); HostTensorND expect{host_x->comp_node(), {5, 4}}; auto px = host_x->ptr(), py = expect.ptr(); for (int i = 0; i < 5; ++i) { @@ -504,14 +555,16 @@ void run(CompNode cn) { y = opr::Host2DeviceCopy::make(*graph, host_y), z = opr::Convolution::make(x, y, param); graph->options().comp_node_seq_record_level = 1; - return graph->compile({make_callback_copy(z, host_z_v[graph_id])}); + return graph->compile({make_callback_copy(z, host_z_v[graph_id], false)}); }; funcs.push_back(gen_graph(0)); funcs.push_back(gen_graph(1)); for (int iter = 0; iter < 10; ++iter) { host_x->copy_from_fixlayout(*gen(host_x->shape(), cn)); funcs[0]->execute(); + funcs[0]->wait(); funcs[1]->execute(); + funcs[1]->wait(); auto expect = eval_conv_cpu(*host_x, *host_y, param); MGB_ASSERT_TENSOR_NEAR(expect, host_z_v[0], 1e-3) << "iter " << iter; MGB_ASSERT_TENSOR_NEAR(expect, host_z_v[1], 1e-3) << "iter " << iter; diff --git a/src/core/test/graph/misc.cpp b/src/core/test/graph/misc.cpp index aa0d7c808a089aae41d05960770b11aada64ccaa..1378f03996fceee403333711ef1c92da059a812d 100644 --- a/src/core/test/graph/misc.cpp +++ b/src/core/test/graph/misc.cpp @@ -1375,13 +1375,17 @@ TEST(TestGraph, CompNodeFinalize) { graph->options().var_sanity_check_first_run = false; graph->options().comp_node_seq_record_level = rec; } - auto func = graph->compile({make_callback_copy(z, host_z)}); + auto sync = (rec != 1); + auto func = graph->compile({make_callback_copy(z, host_z, sync)}); if (rec == 2) { ComputingGraph::assert_destroy(graph); } for (int i = 0; i < 5; ++i) { host_x->copy_from(*gen({1}, cn)); func->execute(); + if (!sync) { + func->wait(); + } MGB_ASSERT_FLOAT_EQ( host_x->ptr()[0] + host_y->ptr()[0], host_z.ptr()[0]); @@ -1933,6 +1937,7 @@ void test_free_memory_in_weight_preprocess(int record_level, CompNode cn) { #endif graph->options().graph_opt.weight_preprocess = true; graph->options().comp_node_seq_record_level = record_level; + auto sync = (record_level != 1); auto mkvar = [&](const char* name, const TensorShape& shp) { return opr::Host2DeviceCopy::make(*graph, gen(shp, cn)).rename(name); }; @@ -1970,11 +1975,17 @@ void test_free_memory_in_weight_preprocess(int record_level, CompNode cn) { }); HostTensorND host_y; - auto func = graph->compile({make_callback_copy(y, host_y)}); + auto func = graph->compile({make_callback_copy(y, host_y, sync)}); //! flag the no need memory of var func->execute(); + if (!sync) { + func->wait(); + } //! free the no need memory of var func->execute(); + if (!sync) { + func->wait(); + } auto check = [&](SymbolVar v) { ASSERT_TRUE(v.node()->contain_flag(VarNode::Flag::MEMORY_NO_NEED)); ASSERT_TRUE(v.node()->dev_tensor().empty()); diff --git a/src/core/test/graph/multi_thread.cpp b/src/core/test/graph/multi_thread.cpp index 5f52650db3f357e2d37c7d3715b0f9edb349a46f..5cf48a4f13bfb9d27780466b99c2d47d96df274d 100644 --- a/src/core/test/graph/multi_thread.cpp +++ b/src/core/test/graph/multi_thread.cpp @@ -213,9 +213,13 @@ TEST(TestGraph, MultiThreadRecorder) { z = opr::Convolution::make(x, y, param); graph->options().comp_node_seq_record_level = record_level; graph->options().var_sanity_check_first_run = false; - auto func = graph->compile({make_callback_copy(z, host_z)}); + auto sync = (record_level != 1); + auto func = graph->compile({make_callback_copy(z, host_z, sync)}); for (int i = 0; i < 5; i++) { func->execute(); + if (!sync) { + func->wait(); + } } auto expect = eval_conv_cpu(*host_x, *host_y, param); MGB_ASSERT_TENSOR_NEAR(expect, host_z, 1e-3); diff --git a/src/plugin/test/opr_io_dump.cpp b/src/plugin/test/opr_io_dump.cpp index 7ca987ef3eaba655f7e3222e8597cf240aaeebe1..c9466973e4ab9f6bb73a09805d032a0855abc4de 100644 --- a/src/plugin/test/opr_io_dump.cpp +++ b/src/plugin/test/opr_io_dump.cpp @@ -50,6 +50,7 @@ void run_test(CompNode cn, const PluginMaker& plugin_maker) { graph->options().var_sanity_check_first_run = false; graph->options().comp_node_seq_record_level = record; graph->options().graph_opt_level = 0; + auto sync = (record != 1); auto plug = plugin_maker(graph.get(), record); // make a non-contiguous value, also introduce some shape dependencies @@ -76,11 +77,14 @@ void run_test(CompNode cn, const PluginMaker& plugin_maker) { cg::DepOprIter{cb_rename}.add(y); HostTensorND host_y; - auto func = graph->compile({make_callback_copy(y, host_y)}); + auto func = graph->compile({make_callback_copy(y, host_y, sync)}); if (record == 2) { ComputingGraph::assert_destroy(graph); } func->execute(); + if (!sync) { + func->wait(); + } plug->flush_lazy(); MGB_ASSERT_TENSOR_EQ(make_expect(), host_y); @@ -91,10 +95,16 @@ void run_test(CompNode cn, const PluginMaker& plugin_maker) { *host_x = *gen(host_x->shape(), cn); } func->execute(); + if (!sync) { + func->wait(); + } MGB_ASSERT_TENSOR_EQ(make_expect(), host_y); for (int i = 0; i < 2; ++i) { host_x->copy_from(*gen(host_x->shape(), cn)); func->execute(); + if (!sync) { + func->wait(); + } MGB_ASSERT_TENSOR_EQ(make_expect(), host_y); }