Unverified · Commit 59dd97af authored by Ruibiao Chen, committed by GitHub

Share workqueue cross-interpretercores (#54780)

* Share workqueue cross-interpretercores

* Fix UT
Parent 05bd4a89
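What the diff below appears to do: StandaloneExecutor builds one InterpreterCore per job (one per micro-batch program), and on the first Run() every core after the first adopts core 0's work queue via ShareWorkQueueFrom, presumably so that the per-job cores stop each maintaining their own host threads. A minimal, compilable sketch of that sharing pattern follows; WorkQueue and every name except InterpreterCore and ShareWorkQueueFrom are illustrative stand-ins, not Paddle's internals.

```cpp
// Minimal sketch of the work-queue sharing pattern -- NOT Paddle's actual
// implementation. Only InterpreterCore and ShareWorkQueueFrom come from the
// diff below; WorkQueue and all other names are hypothetical.
#include <iostream>
#include <memory>
#include <vector>

struct WorkQueue {  // hypothetical stand-in for a thread-pool-backed queue
  explicit WorkQueue(int num_threads) : num_threads(num_threads) {}
  int num_threads;
};

class InterpreterCore {
 public:
  InterpreterCore() : queue_(std::make_shared<WorkQueue>(/*num_threads=*/4)) {}

  // Drop this core's own queue and adopt src's, so both cores dispatch work
  // onto the same threads instead of each owning a pool.
  void ShareWorkQueueFrom(const std::shared_ptr<InterpreterCore>& src) {
    queue_ = src->queue_;
  }

  const WorkQueue* queue() const { return queue_.get(); }

 private:
  std::shared_ptr<WorkQueue> queue_;
};

int main() {
  std::vector<std::shared_ptr<InterpreterCore>> cores;
  for (int i = 0; i < 3; ++i) {
    cores.push_back(std::make_shared<InterpreterCore>());
  }
  // Mirror the loop added in StandaloneExecutor::Run: core 0 keeps its queue,
  // every later core reuses it.
  for (size_t i = 1; i < cores.size(); ++i) {
    cores[i]->ShareWorkQueueFrom(cores[0]);
  }
  std::cout << (cores[1]->queue() == cores[0]->queue()) << std::endl;  // 1
  return 0;
}
```

Once two cores hold the same queue, destroying either core leaves the queue alive for the other, which is exactly the ownership question the unique_ptr-to-shared_ptr change in the first hunk answers.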
@@ -72,12 +72,11 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
       auto kernel_program =
           paddle::dialect::PdOpLowerToKernelPass(base_program.get());
-      interpretercores_.emplace_back(std::make_unique<InterpreterCore>(
+      interpretercores_.emplace_back(std::make_shared<InterpreterCore>(
           place_, std::move(kernel_program), scope_, execution_config));
     } else {
       interpretercores_.emplace_back(
-          std::make_unique<InterpreterCore>(place_,
+          std::make_shared<InterpreterCore>(place_,
                                             program->Block(0),
                                             micro_batch_scopes_[micro_batch_id],
                                             execution_config));
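The switch from std::make_unique to std::make_shared above is what makes the sharing possible: ShareWorkQueueFrom(interpretercores_[0]) hands core 0 itself to every other core, and a std::unique_ptr cannot be passed around that way without giving up ownership. A tiny sketch of the distinction, where Core is a placeholder type:

```cpp
// Why the switch to std::shared_ptr: a unique_ptr's owner cannot also hand
// the object to another holder, as ShareWorkQueueFrom(interpretercores_[0])
// requires. "Core" is a placeholder type for illustration.
#include <memory>

struct Core {};

int main() {
  std::unique_ptr<Core> only_owner = std::make_unique<Core>();
  // std::unique_ptr<Core> second = only_owner;  // error: unique_ptr is move-only

  std::shared_ptr<Core> owner_a = std::make_shared<Core>();
  std::shared_ptr<Core> owner_b = owner_a;  // OK: both handles co-own the Core
  return 0;
}
```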
@@ -100,6 +99,17 @@ paddle::framework::FetchList StandaloneExecutor::Run(
   }

   const auto& jobs = plan_.JobList();
+
+  if (!is_interpretercore_build_result_shared_) {
+    for (size_t job_idx = 1; job_idx < jobs.size(); ++job_idx) {
+      interpretercores_[job_idx]->ShareWorkQueueFrom(interpretercores_[0]);
+      // TODO(Ruibiao): Share other build result, e.g., kernel choosing, data
+      // transfer, op dependency, thread scheduling, GC, event analyzer, and so
+      // on.
+    }
+    is_interpretercore_build_result_shared_ = true;
+  }
+
   for (size_t job_idx = 0; job_idx < jobs.size(); ++job_idx) {
     const auto& job = jobs[job_idx];
     const std::string& job_type = job->Type();
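The new block in Run() is a lazy, run-once initialization: sharing happens on the first call only, guarded by the is_interpretercore_build_result_shared_ flag, and the loop starts at job_idx 1 because core 0 is the donor that keeps its own queue. A compilable sketch of just that guard pattern, with everything except the flag name invented for illustration:

```cpp
// Sketch of the run-once guard added to Run(): sharing happens on the first
// call only. Everything here except the flag's name is hypothetical.
#include <cstdio>

class Executor {
 public:
  void Run() {
    if (!is_interpretercore_build_result_shared_) {
      std::puts("first Run: share work queue from core 0 to the others");
      is_interpretercore_build_result_shared_ = true;
    }
    std::puts("execute jobs");
  }

 private:
  bool is_interpretercore_build_result_shared_{false};
};

int main() {
  Executor exec;
  exec.Run();  // shares, then executes
  exec.Run();  // executes only
  return 0;
}
```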
...
@@ -42,11 +42,12 @@ class StandaloneExecutor {
   paddle::framework::FetchList Run(const std::vector<std::string>& feed_names);

  private:
+  bool is_interpretercore_build_result_shared_{false};
   const platform::Place place_;
   const interpreter::Plan plan_;
   std::vector<framework::Scope*> micro_batch_scopes_;
-  std::vector<std::unique_ptr<InterpreterCore>> interpretercores_;
+  std::vector<std::shared_ptr<InterpreterCore>> interpretercores_;
   Scope* scope_;
 };
...
@@ -13,6 +13,7 @@
 # limitations under the License.

+import platform
 import unittest

 import numpy as np
@@ -214,6 +215,15 @@ class TestEncorderMulitMicroBatchRun(unittest.TestCase):
         return res

+    def check_result(self, expected_result, actual_result):
+        # FIXME(Ruibiao): The output result of Encorder layers is unstable in some case.
+        if self.place.is_cpu_place() or platform.system().lower() == "windows":
+            np.testing.assert_allclose(
+                expected_result, actual_result, atol=1e-6, rtol=1e-6
+            )
+        else:
+            np.testing.assert_equal(expected_result, actual_result)
+
     def test_multi_micro_batch_run(self):
         last_res = None
@@ -222,9 +232,7 @@ class TestEncorderMulitMicroBatchRun(unittest.TestCase):
                 res = self.run_train(split, micro_batch_num)
                 if last_res:
                     for i in range(len(res)):
-                        np.testing.assert_allclose(
-                            last_res[i], res[i], atol=1e-6, rtol=1e-6
-                        )
+                        self.check_result(last_res[i], res[i])
                 last_res = res
...