提交 8e9dd92c 编写于 作者: A azural 提交者: Liangliang Zhang

framework: Update task interface

Conflicts:
	modules/planning/reference_line/reference_line_provider.cc
	modules/planning/reference_line/reference_line_provider.h
上级 5407293e
......@@ -39,6 +39,7 @@ file(GLOB CYBERTRON_SRCS
"cybertron/data/*.cpp"
"cybertron/component/timer_component.cpp"
"cybertron/class_loader/*.cpp"
"cybertron/task/*.cpp"
"cybertron/time/*.cpp"
"cybertron/timer/*.cpp"
"cybertron/tf2_cybertron/*.cpp"
......
......@@ -30,6 +30,7 @@ namespace croutine {
using routine_t = uint32_t;
using CRoutineFunc = std::function<void()>;
using Duration = std::chrono::microseconds;
enum class RoutineState {
READY,
......@@ -90,12 +91,11 @@ class CRoutine {
CRoutine::Yield();
}
// TODO(hewei03): use cybertron::Time instead
inline void Sleep(const useconds_t &usec) {
inline void Sleep(const Duration& sleep_duration) {
state_ = RoutineState::SLEEP;
wake_time_ =
std::chrono::steady_clock::now() + std::chrono::microseconds(usec);
statistic_info_.sleep_time += usec / 1000;
std::chrono::steady_clock::now() + sleep_duration;
statistic_info_.sleep_time += sleep_duration.count() / 1000;
CRoutine::Yield();
}
......
......@@ -25,7 +25,7 @@
#include "cybertron/component/component.h"
#include "cybertron/init.h"
#include "cybertron/node/node.h"
#include "cybertron/scheduler/task.h"
#include "cybertron/task/task.h"
#include "cybertron/time/time.h"
#include "cybertron/timer/timer.h"
......@@ -35,55 +35,6 @@ namespace cybertron {
std::unique_ptr<Node> CreateNode(const std::string& node_name,
const std::string& name_space = "");
template <typename Function>
std::unique_ptr<Task<>> CreateTask(const std::string& name, Function&& f,
const uint8_t num_threads = 1) {
if (!OK()) {
return nullptr;
}
std::unique_ptr<Task<>> task(
new Task<>(name, std::forward<Function>(f), num_threads));
return std::move(task);
}
template <typename T, typename Function>
std::unique_ptr<Task<T>> CreateTask(const std::string& name, Function&& f,
const uint8_t& num_threads = 1) {
if (!OK()) {
return nullptr;
}
std::unique_ptr<Task<T>> task(
new Task<T>(name, std::forward<Function>(f), num_threads));
return std::move(task);
}
template <typename T, typename ReturnType, typename Function>
std::unique_ptr<Task<T, ReturnType>> CreateTask(
const std::string& name, Function&& f, const uint8_t& num_threads = 1) {
if (!OK()) {
return nullptr;
}
std::unique_ptr<Task<T, ReturnType>> task(
new Task<T, ReturnType>(name, std::forward<Function>(f), num_threads));
return std::move(task);
}
inline static void Yield() {
if (croutine::CRoutine::GetCurrentRoutine()) {
croutine::CRoutine::Yield();
} else {
std::this_thread::yield();
}
}
inline static void USleep(useconds_t usec) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
if (routine == nullptr) {
std::this_thread::sleep_for(std::chrono::microseconds{usec});
} else {
routine->Sleep(usec);
}
}
} // namespace cybertron
} // namespace apollo
......
......@@ -34,6 +34,7 @@
#include "cybertron/node/node.h"
#include "cybertron/scheduler/scheduler.h"
#include "cybertron/service_discovery/topology_manager.h"
#include "cybertron/task/task.h"
namespace apollo {
namespace cybertron {
......@@ -85,6 +86,7 @@ bool Init() {
common::GlobalData::Instance();
service_discovery::TopologyManager::Instance();
scheduler::Scheduler::Instance();
TaskManager::Instance();
PerfEventCache::Instance();
// Register exit handlers
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_SCHEDULER_TASK_H_
#define CYBERTRON_SCHEDULER_TASK_H_
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "cybertron/common/global_data.h"
#include "cybertron/croutine/croutine.h"
#include "cybertron/croutine/routine_factory.h"
#include "cybertron/data/data_notifier.h"
#include "cybertron/init.h"
#include "cybertron/scheduler/scheduler.h"
namespace apollo {
namespace cybertron {
static const char* task_prefix = "/internal/task/";
template <typename Type, typename Ret>
struct TaskData {
std::shared_ptr<Type> raw_data;
std::promise<Ret> prom;
};
template <typename T, typename R>
class SharedData {
public:
std::shared_ptr<TaskData<T, R>> GetTaskData() {
std::lock_guard<std::mutex> lg(mutex_);
if (data_list_.empty()) {
return nullptr;
}
auto task = data_list_.front();
data_list_.pop_front();
return task;
}
void ClearData() {
std::lock_guard<std::mutex> lg(mutex_);
data_list_.clear();
}
void Enqueue(const std::shared_ptr<TaskData<T, R>>& data) {
std::lock_guard<std::mutex> lg(mutex_);
data_list_.push_back(data);
}
private:
std::mutex mutex_;
std::list<std::shared_ptr<TaskData<T,R>>> data_list_;
};
template <typename T, typename R, typename Derived>
class TaskBase {
public:
virtual ~TaskBase();
void Stop();
std::future<R> Execute(const std::shared_ptr<T>& val);
protected:
void RegisterCallback(
std::function<void(const std::shared_ptr<TaskData<T, R>>&)>&& func);
void RegisterCallback(std::function<void()>&& func);
private:
TaskBase(const std::string& name, const uint8_t num_threads = 1);
friend Derived;
std::string name_;
uint8_t num_threads_;
uint64_t task_id_;
std::shared_ptr<SharedData<T, R>> shared_data_;
};
template <typename T, typename R, typename Derived>
TaskBase<T, R, Derived>::TaskBase(const std::string& name,
const uint8_t num_threads)
: num_threads_(num_threads), shared_data_(new SharedData<T,R>()) {
if (num_threads_ > scheduler::Scheduler::ProcessorNum()) {
num_threads_ = scheduler::Scheduler::ProcessorNum();
}
name_ = task_prefix + name;
task_id_ = common::GlobalData::RegisterChannel(name_);
}
template <typename T, typename R, typename Derived>
void TaskBase<T, R, Derived>::RegisterCallback(std::function<void()>&& func) {
croutine::RoutineFactory factory = croutine::CreateRoutineFactory(func);
for (int i = 0; i < num_threads_; ++i) {
scheduler::Scheduler::Instance()->CreateTask(factory,
name_ + std::to_string(i));
}
}
template <typename T = void, typename R = void>
class Task : public TaskBase<T, R, Task<T, R>> {
public:
template <typename Function>
Task(const std::string& name, Function&& f, const uint8_t num_threads = 1);
};
template <typename T, typename R>
template <typename Function>
Task<T, R>::Task(const std::string& name, Function&& f,
const uint8_t num_threads)
: TaskBase<T, R, Task<T, R>>(name, num_threads) {
auto func = [ f = std::forward<Function&&>(f), data = this->shared_data_]() {
while (true) {
auto msg = data->GetTaskData();
if (msg == nullptr) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
routine->Sleep(1000);
continue;
}
msg->prom.set_value(f(msg->raw_data));
}
};
this->RegisterCallback(std::move(func));
}
template <typename T>
class Task<T, void> : public TaskBase<T, void, Task<T, void>> {
public:
template <typename Function>
Task(const std::string& name, Function&& f, const uint8_t num_threads = 1);
};
template <typename T>
template <typename Function>
Task<T, void>::Task(const std::string& name, Function&& f,
const uint8_t num_threads)
: TaskBase<T, void, Task<T, void>>(name, num_threads) {
auto func = [ f = std::forward<Function&&>(f), data = this->shared_data_ ]() {
while (true) {
auto msg = data->GetTaskData();
if (msg == nullptr) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
routine->Sleep(1000);
continue;
}
f(msg->raw_data);
msg->prom.set_value();
}
};
this->RegisterCallback(std::move(func));
}
template <typename T, typename R, typename Derived>
void TaskBase<T, R, Derived>::Stop() {
shared_data_->ClearData();
for (int i = 0; i < num_threads_; ++i) {
scheduler::Scheduler::Instance()->RemoveTask(name_ + std::to_string(i));
}
}
template <typename T, typename R, typename Derived>
TaskBase<T, R, Derived>::~TaskBase() {
Stop();
}
template <typename T, typename R, typename Derived>
std::future<R> TaskBase<T, R, Derived>::Execute(const std::shared_ptr<T>& val) {
auto task = std::make_shared<TaskData<T, R>>();
task->raw_data = val;
shared_data_->Enqueue(task);
return task->prom.get_future();
}
template <>
template <typename Function>
Task<void, void>::Task(const std::string& name, Function&& f,
const uint8_t num_threads)
: TaskBase<void, void, Task<void, void>>(name, num_threads) {
auto data = std::make_shared<TaskData<void, void>>();
auto func = [ f = std::forward<Function&&>(f), data ]() {
f();
data->prom.set_value();
};
RegisterCallback(std::move(func));
}
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_SCHEDULER_TASK_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_TASK_TASK_H_
#define CYBERTRON_TASK_TASK_H_
#include <future>
#include "cybertron/task/task_manager.h"
namespace apollo {
namespace cybertron {
template <typename F, typename... Args>
static auto Async(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
return TaskManager::Instance()->Enqueue(std::forward<F>(f),
std::forward<Args>(args)...);
}
static inline void Yield() {
if (croutine::CRoutine::GetCurrentRoutine()) {
croutine::CRoutine::Yield();
} else {
std::this_thread::yield();
}
}
template <typename Rep, typename Period>
static void SleepFor(const std::chrono::duration<Rep, Period>& sleep_duration) {
if (croutine::CRoutine::GetCurrentRoutine()) {
croutine::CRoutine::Sleep(sleep_duration);
} else {
std::this_thread::sleep_for(sleep_duration);
}
}
static inline void USleep(useconds_t usec) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
if (routine == nullptr) {
std::this_thread::sleep_for(std::chrono::microseconds{usec});
} else {
routine->Sleep(croutine::Duration(usec));
}
}
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_TASK_TASK_H_
/**************Scheduler::****************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Vesched_infoon 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "cybertron/task/task_manager.h"
#include "cybertron/task/task.h"
#include "cybertron/croutine/croutine.h"
#include "cybertron/croutine/routine_factory.h"
#include "cybertron/scheduler/scheduler.h"
namespace apollo {
namespace cybertron {
static const char* const task_prefix = "/internal/task";
TaskManager::TaskManager()
: task_queue_size_(1000),
task_queue_(new base::BoundedQueue<std::function<void()>>()) {
task_queue_->Init(task_queue_size_, new base::BlockWaitStrategy());
auto func = [task_queue = this->task_queue_]() {
while (!cybertron::IsShutdown()) {
std::function<void()> task;
task_queue->Dequeue(&task);
if (task == nullptr) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
routine->HangUp();
continue;
}
task();
}
};
auto factory = croutine::CreateRoutineFactory(std::move(func));
num_threads_ = scheduler::Scheduler::ProcessorNum();
tasks_.reserve(num_threads_);
for (int i = 0; i < num_threads_; i++) {
auto task_name = task_prefix + std::to_string(i);
tasks_.push_back(GlobalData::RegisterTaskName(task_name));
scheduler::Scheduler::Instance()->CreateTask(factory, task_name);
}
}
TaskManager::~TaskManager() {
for (int i = 0; i < num_threads_; i++) {
scheduler::Scheduler::Instance()->RemoveTask(task_prefix +
std::to_string(i));
}
}
} // namespace cybertron
} // namespace apollo
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_TASK_TASK_MANAGER_H_
#define CYBERTRON_TASK_TASK_MANAGER_H_
#include <memory>
#include <string>
#include "cybertron/base/bounded_queue.h"
#include "cybertron/scheduler/proc_balancer.h"
namespace apollo {
namespace cybertron {
class TaskManager {
public:
virtual ~TaskManager();
template <typename F, typename... Args>
auto Enqueue(F&& func, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...));
task_queue_->Enqueue([task]() { (*task)(); });
for (auto& task : tasks_) {
scheduler::ProcBalancer::Instance()->NotifyProcessor(task);
}
std::future<return_type> res(task->get_future());
return res;
}
private:
uint32_t num_threads_;
uint32_t task_queue_size_ = 1000;
std::vector<uint64_t> tasks_;
std::shared_ptr<base::BoundedQueue<std::function<void()>>> task_queue_;
DECLARE_SINGLETON(TaskManager);
};
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_TASK_TASK_MANAGER_H_
......@@ -67,11 +67,6 @@ TEST(InitCybertronTest, all_in_one) {
Shutdown();
}
TEST(InitCybertronTest, create_task) {
auto task_ = apollo::cybertron::CreateTask<CarStatus, int>("task", &UserTask);
auto void_task_ = apollo::cybertron::CreateTask<>("void_task", &VoidTask);
}
} // namespace cybertron
} // namespace apollo
......
......@@ -34,10 +34,3 @@ target_link_libraries(scheduler_policy_test
gtest
)
install(TARGETS scheduler_policy_test DESTINATION ${UNIT_TEST_INSTALL_PREFIX})
add_executable(task_test task_test.cpp)
target_link_libraries(task_test
cybertron
gtest
)
install(TARGETS task_test DESTINATION ${UNIT_TEST_INSTALL_PREFIX})
\ No newline at end of file
#include "cybertron/scheduler/task.h"
#include "cybertron/common/log.h"
#include "cybertron/component/component.h"
#include "cybertron/component/timer_component.h"
#include "cybertron/cybertron.h"
#include "cybertron/message/raw_message.h"
#include "cybertron/proto/driver.pb.h"
#include "gtest/gtest.h"
using apollo::cybertron::proto::CarStatus;
using apollo::cybertron::Task;
namespace apollo {
namespace cybertron {
namespace scheduler {
struct Message {
uint64_t id;
};
void Task1() { ADEBUG << "Task1 running"; }
void Task2(const std::shared_ptr<Message>& input) {
usleep(10000);
ADEBUG << "Task2 running";
}
uint64_t Task3(const std::shared_ptr<Message>& input) {
ADEBUG << "Task3 running";
return input->id;
}
uint64_t Task4(const std::shared_ptr<Message>& input) {
ADEBUG << "Task4 running";
usleep(10000);
return input->id;
}
/*
Message UserTask(const std::shared_ptr<CarStatus>& msg) {
AINFO << "receive msg";
return 0;
}
*/
TEST(TaskTest, create_task) {
auto task_1 = std::make_shared<Task<>>("task1", &Task1);
task_1.reset();
auto task_2 = std::make_shared<Task<void>>("task2", &Task1);
task_2.reset();
auto task_3 = std::make_shared<Task<void, void>>("task3", &Task1);
task_3.reset();
}
TEST(TaskTest, return_value) {
auto msg = std::make_shared<Message>();
msg->id = 1;
auto task_1 = std::make_shared<Task<>>("task1", &Task1);
usleep(100000);
task_1.reset();
auto task_2 = std::make_shared<Task<Message, void>>("task2", &Task2, 10);
std::vector<std::future<void>> results;
for (int i = 0; i < 2000; ++i) {
results.push_back(task_2->Execute(msg));
}
for (auto& result: results) {
result.get();
}
auto msg3 = std::make_shared<Message>();
msg3->id = 1;
auto task_3 = std::make_shared<Task<Message, int>>("task3", &Task3);
auto ret_3 = task_3->Execute(msg3);
EXPECT_EQ(ret_3.get(), 1);
ret_3 = task_3->Execute(msg3);
usleep(100000);
EXPECT_EQ(ret_3.get(), 1);
auto task_4 = std::make_shared<Task<Message, uint64_t>>("task4", &Task4, 20);
std::vector<std::future<uint64_t>> results_4;
for (int i = 0; i < 1000; i++) {
results_4.emplace_back(task_4->Execute(msg));
}
for (auto& result : results_4) {
result.get();
}
task_4.reset();
}
} // namespace scheduler
} // namespace cybertron
} // namespace apollo
int main(int argc, char** argv) {
apollo::cybertron::Init(argv[0]);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
project(scheduler_test)
add_executable(task_test task_test.cpp)
target_link_libraries(task_test
cybertron
gtest
)
install(TARGETS task_test DESTINATION ${UNIT_TEST_INSTALL_PREFIX})
#include "cybertron/task/task.h"
#include "cybertron/common/log.h"
#include "cybertron/cybertron.h"
#include "gtest/gtest.h"
namespace apollo {
namespace cybertron {
namespace scheduler {
class Foo {
public:
void RunOnce() {
auto res = Async(&Foo::Task, this, 10);
EXPECT_EQ(res.get(), 10);
}
uint32_t Task(const uint32_t& input) {
return input;
}
};
struct Message {
uint64_t id;
};
void Task1() { ADEBUG << "Task1 running"; }
void Task2(const Message& input) {
usleep(10000);
ADEBUG << "Task2 running";
}
uint64_t Task3(const std::shared_ptr<Message>& input) {
ADEBUG << "Task3 running";
return input->id;
}
TEST(AsyncTest, create_task) {
auto task_1 = Async(&Task1);
task_1.get();
Message msg;
auto task_2 = Async(&Task2, msg);
task_2.get();
auto shared_msg = std::make_shared<Message>();
shared_msg->id = 1;
auto task_3 = Async(&Task3, shared_msg);
EXPECT_EQ(task_3.get(), 1);
}
TEST(AsyncTest, batch_run) {
std::vector<std::future<void>> void_results;
for (int i = 0; i < 10; i++) {
void_results.push_back(Async(&Task1));
}
for (auto& result: void_results) {
result.get();
}
uint32_t loop = 10;
std::vector<std::future<uint64_t>> int_results;
for (int i = 0; i < loop; i++) {
auto shared_msg = std::make_shared<Message>();
shared_msg->id = i;
int_results.push_back(Async(&Task3, shared_msg));
}
for (int i = 0; i < loop; i++) {
EXPECT_EQ(int_results[i].get(), i);
}
}
TEST(AsyncTest, run_member_function) {
Foo foo;
foo.RunOnce();
}
} // namespace scheduler
} // namespace cybertron
} // namespace apollo
int main(int argc, char** argv) {
apollo::cybertron::Init(argv[0]);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -20,7 +20,7 @@
#include "cybertron/component/timer_component.h"
#include "cybertron/message/raw_message.h"
#include "cybertron/proto/driver.pb.h"
#include "cybertron/scheduler/task.h"
#include "cybertron/task/task.h"
namespace apollo {
namespace cybertron {
......
......@@ -18,11 +18,10 @@
#include "cybertron/common/log.h"
#include "cybertron/cybertron.h"
#include "cybertron/proto/driver.pb.h"
#include "cybertron/scheduler/task.h"
#include "cybertron/task/task.h"
static const uint8_t num_threads = 3;
using apollo::cybertron::Task;
using apollo::cybertron::proto::Driver;
struct Message {
......@@ -31,16 +30,24 @@ struct Message {
std::string content;
};
int TaskProcessor(const Message& msg) {
//apollo::cybertron::USleep(100000);
usleep(100000);
AINFO << "ID is: " << msg.msg_id;
return msg.msg_id;
}
void AsyncDataProcessor() {
for (;;) {
AINFO << "AsyncDataProcesor is running.";
apollo::cybertron::USleep(5000000);
std::vector<std::future<int>> results;
for (int i = 0; i < 1000; i++) {
Message msg;
msg.msg_id = i;
results.push_back(apollo::cybertron::Async(&TaskProcessor, msg));
}
}
int TaskProcessor(const std::shared_ptr<Message>& msg) {
apollo::cybertron::USleep(100000);
return msg->msg_id;
for (auto& result: results) {
AINFO << "Result is: " << result.get();
}
}
void VoidTaskProcessor(const std::shared_ptr<Message>& msg) {
......@@ -49,37 +56,8 @@ void VoidTaskProcessor(const std::shared_ptr<Message>& msg) {
}
int main(int argc, char* argv[]) {
// Init
apollo::cybertron::Init(argv[0]);
Task<> task0("async_data_processor", &AsyncDataProcessor, 1);
Task<Message, int> task1(
"task_processor", &TaskProcessor, num_threads);
Task<Message, void> task2(
"void_task_processor", &VoidTaskProcessor, num_threads);
// Run
uint64_t i = 0;
while (!apollo::cybertron::IsShutdown()) {
std::vector<std::future<int>> futures;
for (int j = 0; j < 1000; ++j) {
auto msg = std::make_shared<Message>();
msg->msg_id = i++;
msg->task_id = j;
futures.emplace_back(task1.Execute(msg));
task2.Execute(msg);
}
apollo::cybertron::USleep(500000);
if (apollo::cybertron::IsShutdown()) {
break;
}
for (auto& future: futures) {
//AINFO << "Finish task:" << future.get();
}
AINFO << "All task are finished.";
}
AINFO << "Return";
auto ret = apollo::cybertron::Async(&AsyncDataProcessor);
ret.get();
return 0;
}
......@@ -105,7 +105,6 @@ class CanReceiver {
MessageManager<SensorType> *pt_manager_ = nullptr;
bool enable_log_ = false;
bool is_init_ = false;
std::unique_ptr<cybertron::Task<>> task_;
DISALLOW_COPY_AND_ASSIGN(CanReceiver);
};
......@@ -194,7 +193,7 @@ template <typename SensorType>
}
is_running_ = true;
task_ = cybertron::CreateTask("CanReceiver", [this] { RecvThreadFunc(); });
cybertron::Async(&CanReceiver<SensorType>::RecvThreadFunc, this);
return ::apollo::common::ErrorCode::OK;
}
......
......@@ -60,9 +60,7 @@ bool UsbCamComponent::Init() {
pb_image_->mutable_data()->reserve(raw_image_->image_size);
writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());
task_.reset(new apollo::cybertron::Task<>(
"usb_cam_task", std::bind(&UsbCamComponent::run, this), 1));
cybertron::Async(&UsbCamComponent::run, this);
return true;
}
......
......@@ -47,7 +47,6 @@ class UsbCamComponent : public Component<> {
std::shared_ptr<Config> camera_config_;
CameraImagePtr raw_image_ = nullptr;
std::shared_ptr<Image> pb_image_ = nullptr;
std::unique_ptr<apollo::cybertron::Task<>> task_;
float spin_rate_ = 0.005;
float device_wait_ = 2.0;
};
......
......@@ -256,9 +256,9 @@ void LocalizationIntegProcess::MeasureDataProcess(
void LocalizationIntegProcess::StartThreadLoop() {
keep_running_ = true;
measure_data_queue_size_ = 150;
const auto &loop_func = [this] { MeasureDataThreadLoop(); };
// const auto &loop_func = [this] { MeasureDataThreadLoop(); };
// measure_data_thread_ = std::thread(loop_func);
measure_data_task_ = cybertron::CreateTask("SinsMeasureReceiver", loop_func);
cybertron::Async(&LocalizationIntegProcess::MeasureDataThreadLoop, this);
}
void LocalizationIntegProcess::StopThreadLoop() {
......
......@@ -97,7 +97,6 @@ class LocalizationIntegProcess {
std::atomic<bool> keep_running_;
// std::thread measure_data_thread_;
std::unique_ptr<cybertron::Task<>> measure_data_task_;
// std::condition_variable new_measure_data_signal_;
std::queue<MeasureData> measure_data_queue_;
int measure_data_queue_size_;
......
......@@ -24,11 +24,9 @@
#include <functional>
#include <utility>
#include "cybertron/cybertron.h"
#include "cybertron/scheduler/task.h"
#include "cybertron/task/task.h"
#include "modules/planning/proto/sl_boundary.pb.h"
#include "modules/common/configs/vehicle_config_helper.h"
#include "modules/common/util/string_util.h"
#include "modules/common/util/util.h"
......@@ -289,16 +287,10 @@ PathObstacle* ReferenceLineInfo::AddObstacle(const Obstacle* obstacle) {
bool ReferenceLineInfo::AddObstacles(
const std::vector<const Obstacle*>& obstacles) {
if (FLAGS_use_multi_thread_to_add_obstacles) {
auto task = std::make_shared<apollo::cybertron::Task<Obstacle, bool>>(
"planning_add_obstacles",
[this](const std::shared_ptr<Obstacle>& msg) -> bool {
return this->AddObstacleHelper(msg);
},
FLAGS_max_planning_thread_pool_size);
std::vector<std::future<bool>> results;
std::vector<std::future<PathObstacle*>> results;
for (const auto* obstacle : obstacles) {
results.push_back(task->Execute(std::make_shared<Obstacle>(*obstacle)));
results.push_back(
cybertron::Async(&ReferenceLineInfo::AddObstacle, this, obstacle));
}
for (auto& result : results) {
if (!result.get()) {
......
......@@ -106,10 +106,8 @@ bool ReferenceLineProvider::Start() {
}
if (FLAGS_enable_reference_line_provider_thread) {
task_.reset(new apollo::cybertron::Task<int>(
"async_reference_line_provider",
[this](const std::shared_ptr<int> &) { this->GenerateThread(); }));
task_future_ = task_->Execute(std::make_shared<int>());
task_future_ =
cybertron::Async(&ReferenceLineProvider::GenerateThread, this);
}
return true;
}
......
......@@ -180,7 +180,6 @@ class ReferenceLineProvider {
std::queue<std::list<ReferenceLine>> reference_line_history_;
std::queue<std::list<hdmap::RouteSegments>> route_segments_history_;
std::unique_ptr<cybertron::Task<int>> task_;
std::future<void> task_future_;
};
......
......@@ -25,7 +25,7 @@
#include <string>
#include <utility>
#include "cybertron/scheduler/task.h"
#include "cybertron/task/task.h"
#include "modules/common/proto/pnc_point.pb.h"
......@@ -176,21 +176,12 @@ Status DpStGraph::CalculateTotalCost() {
int count = next_highest_row - next_lowest_row + 1;
if (count > 0) {
std::unique_ptr<cybertron::Task<StGraphMessage>> task;
if (FLAGS_enable_multi_thread_in_dp_st_graph) {
task = apollo::cybertron::CreateTask<StGraphMessage>(
"dp_process",
[this](const std::shared_ptr<StGraphMessage>& msg) {
this->CalculateCostAt(msg);
},
FLAGS_max_planning_thread_pool_size);
}
std::vector<std::future<void>> results;
for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) {
auto msg = std::make_shared<StGraphMessage>(c, r);
if (FLAGS_enable_multi_thread_in_dp_st_graph) {
results.push_back(task->Execute(msg));
results.push_back(
cybertron::Async(&DpStGraph::CalculateCostAt, this, msg));
} else {
CalculateCostAt(msg);
}
......
......@@ -23,8 +23,7 @@
#include <algorithm>
#include <utility>
#include "cybertron/cybertron.h"
#include "cybertron/scheduler/task.h"
#include "cybertron/task/task.h"
#include "modules/common/proto/error_code.pb.h"
#include "modules/planning/proto/planning_internal.pb.h"
......@@ -149,16 +148,6 @@ bool DpRoadGraph::GenerateMinCostPath(
auto &front = graph_nodes.front().front();
size_t total_level = path_waypoints.size();
std::unique_ptr<cybertron::Task<RoadGraphMessage>> task;
if (FLAGS_enable_multi_thread_in_dp_poly_path) {
task = cybertron::CreateTask<RoadGraphMessage>(
"planning_graph_node_process",
[this](const std::shared_ptr<RoadGraphMessage> &msg) {
this->UpdateNode(msg);
},
FLAGS_max_planning_thread_pool_size);
}
for (std::size_t level = 1; level < path_waypoints.size(); ++level) {
const auto &prev_dp_nodes = graph_nodes.back();
const auto &level_points = path_waypoints[level];
......@@ -176,7 +165,8 @@ bool DpRoadGraph::GenerateMinCostPath(
&(graph_nodes.back().back()));
if (FLAGS_enable_multi_thread_in_dp_poly_path) {
results.emplace_back(task->Execute(msg));
results.emplace_back(
cybertron::Async(&DpRoadGraph::UpdateNode, this, msg));
} else {
UpdateNode(msg);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册