提交 72b29e27 编写于 作者: A azural 提交者: fengqikai1414

framework: 1.Fix reader without callback can not observe issue 2. Fix

unit test.
上级 d180865e
......@@ -107,6 +107,9 @@ template <typename MessageT>
void Reader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
std::lock_guard<std::mutex> lg(mutex_);
history_queue_.push_front(msg);
while (history_queue_.size() > history_depth_) {
history_queue_.pop_back();
}
}
template <typename MessageT>
......@@ -120,23 +123,29 @@ bool Reader<MessageT>::Init() {
if (init_.exchange(true)) {
return true;
}
std::function<void(const std::shared_ptr<MessageT>&)> func;
if (reader_func_ != nullptr) {
auto sched = scheduler::Scheduler::Instance();
croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
auto func = [this](const std::shared_ptr<MessageT>& msg) {
func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
this->reader_func_(msg);
};
auto dv = std::make_shared<data::DataVisitor<MessageT>>(
role_attr_.channel_id(), role_attr_.qos_profile().depth());
// Using factory to wrap templates.
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
if (!sched->CreateTask(factory, croutine_name_)) {
init_.exchange(false);
return false;
}
} else {
func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
};
}
auto sched = scheduler::Scheduler::Instance();
croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
auto dv = std::make_shared<data::DataVisitor<MessageT>>(
role_attr_.channel_id(), role_attr_.qos_profile().depth());
// Using factory to wrap templates.
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
if (!sched->CreateTask(factory, croutine_name_)) {
AERROR << "Create Failed!";
init_.exchange(false);
return false;
}
lower_reach_ = ReaderManager<MessageT>::Instance()->GetReader(role_attr_);
......
......@@ -83,17 +83,17 @@ TEST(TimerComponent, init) {
compcfg.set_name("perception2");
apollo::cybertron::proto::ReaderOption *read_opt2 = compcfg.add_readers();
read_opt2->set_channel("/driver/channel");
read_opt2->set_channel("/driver/channel1");
auto comB = std::make_shared<Component_B<RawMessage, RawMessage>>();
EXPECT_EQ(true, comB->Initialize(compcfg));
EXPECT_EQ(true, comB->Process(msg_str1, msg_str2));
compcfg.set_name("perception3");
apollo::cybertron::proto::ReaderOption *read_opt3 = compcfg.add_readers();
read_opt3->set_channel("/driver/channel");
read_opt3->set_channel("/driver/channel2");
compcfg.set_name("perception4");
apollo::cybertron::proto::ReaderOption *read_opt4 = compcfg.add_readers();
read_opt4->set_channel("/driver/channel");
read_opt4->set_channel("/driver/channel3");
auto comA = std::make_shared<
Component_A<RawMessage, RawMessage, RawMessage, RawMessage>>();
EXPECT_EQ(true, comA->Initialize(compcfg));
......
......@@ -78,18 +78,22 @@ TEST(NodeTest, create_reader_with_attr) {
Node node("create_reader_with_attr");
proto::RoleAttributes attr;
attr.set_channel_name("channel");
attr.set_channel_name("channel_a");
auto reader_a = node.CreateReader<proto::UnitTest>(attr, nullptr);
ASSERT_TRUE(reader_a != nullptr);
EXPECT_EQ(reader_a->GetChannelName(), "channel");
EXPECT_EQ(reader_a->GetChannelName(), "channel_a");
EXPECT_TRUE(reader_a->inited());
reader_a->Shutdown();
attr.set_channel_name("channel_b");
auto reader_b = node.CreateReader<proto::UnitTest>(attr);
ASSERT_TRUE(reader_b != nullptr);
EXPECT_EQ(reader_b->GetChannelName(), "channel");
EXPECT_EQ(reader_b->GetChannelName(), "channel_b");
EXPECT_TRUE(reader_b->inited());
reader_b->Shutdown();
auto reader_c = node.CreateReader<proto::UnitTest>(attr);
ASSERT_EQ(reader_c, nullptr);
}
TEST(NodeTest, create_service) {
......
......@@ -94,8 +94,8 @@ TEST(WriterReaderTest, init_and_shutdown) {
EXPECT_TRUE(reader_b.inited());
Reader<proto::UnitTest> reader_c(attr);
EXPECT_TRUE(reader_c.Init());
EXPECT_TRUE(reader_c.inited());
EXPECT_FALSE(reader_c.Init());
EXPECT_FALSE(reader_c.inited());
writer_a.Shutdown();
// repeated call
......@@ -158,8 +158,13 @@ TEST(WriterReaderTest, messaging) {
TEST(WriterReaderTest, observe) {
proto::RoleAttributes attr;
attr.set_channel_name("test_reader");
attr.set_node_name("node");
attr.set_channel_name("channel");
auto channel_id = common::GlobalData::RegisterChannel(attr.channel_name());
attr.set_channel_id(channel_id);
Reader<proto::UnitTest> reader(attr);
reader.Init();
EXPECT_TRUE(reader.Empty());
EXPECT_FALSE(reader.HasReceived());
......@@ -203,6 +208,16 @@ TEST(WriterReaderTest, observe) {
reader.ClearData();
EXPECT_TRUE(reader.Empty());
EXPECT_FALSE(reader.HasReceived());
Writer<proto::UnitTest> writer(attr);
writer.Init();
writer.Write(msg1);
usleep(10000);
ASSERT_TRUE(reader.HasReceived());
reader.Observe();
ASSERT_FALSE(reader.Empty());
latest = reader.GetLatestObserved();
EXPECT_EQ(latest->case_name(), "message_1");
}
} // namespace cybertron
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册