提交 5ae338fe 编写于 作者: A azural 提交者: fengqikai1414

framework: Add Observe interface

上级 2030f20c
......@@ -43,5 +43,17 @@ Node::~Node() { node_manager_->Leave(attr_, RoleType::ROLE_NODE); }
const std::string& Node::Name() const { return node_name_; }
void Node::Observe() {
for (auto& reader : readers_) {
reader.second->Observe();
}
}
void Node::ClearData() {
for (auto& reader : readers_) {
reader.second->ClearData();
}
}
} // namespace cybertron
} // namespace apollo
......@@ -17,6 +17,7 @@
#ifndef CYBERTRON_NODE_NODE_H_
#define CYBERTRON_NODE_NODE_H_
#include <map>
#include <memory>
#include <string>
......@@ -37,9 +38,7 @@ class Node {
friend class TimerComponent;
friend std::unique_ptr<Node> CreateNode(const std::string&,
const std::string&);
virtual ~Node();
const std::string& Name() const;
template <typename MessageT>
......@@ -70,6 +69,13 @@ class Node {
auto CreateClient(const std::string& service_name)
-> std::shared_ptr<Client<Request, Response>>;
void Observe();
void ClearData();
template <typename MessageT>
auto GetReader(const std::string& channel_name)
-> std::shared_ptr<Reader<MessageT>>;
private:
explicit Node(const std::string& node_name,
const std::string& name_space = "");
......@@ -83,6 +89,8 @@ class Node {
std::string node_name_;
std::string name_space_;
proto::RoleAttributes attr_;
std::mutex readers_mutex_;
std::map<std::string, std::shared_ptr<ReaderBase>> readers_;
};
template <typename MessageT>
......@@ -100,23 +108,32 @@ auto Node::CreateWriter(const std::string& channel_name)
template <typename MessageT>
auto Node::CreateReader(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Reader<MessageT>> {
return node_channel_impl_->template CreateReader<MessageT>(role_attr);
auto reader = node_channel_impl_->template CreateReader<MessageT>(role_attr);
std::lock_guard<std::mutex> lg(readers_mutex_);
readers_.emplace(std::make_pair(role_attr.channel_name(), reader));
return reader;
}
template <typename MessageT>
auto Node::CreateReader(const proto::RoleAttributes& role_attr,
const CallbackFunc<MessageT>& reader_func)
-> std::shared_ptr<Reader<MessageT>> {
return node_channel_impl_->template CreateReader<MessageT>(role_attr,
reader_func);
auto reader = node_channel_impl_->template CreateReader<MessageT>(
role_attr, reader_func);
std::lock_guard<std::mutex> lg(readers_mutex_);
readers_.emplace(std::make_pair(role_attr.channel_name(), reader));
return reader;
}
template <typename MessageT>
auto Node::CreateReader(const std::string& channel_name,
const CallbackFunc<MessageT>& reader_func)
-> std::shared_ptr<Reader<MessageT>> {
return node_channel_impl_->template CreateReader<MessageT>(channel_name,
reader_func);
auto reader = node_channel_impl_->template CreateReader<MessageT>(
channel_name, reader_func);
std::lock_guard<std::mutex> lg(readers_mutex_);
readers_.emplace(std::make_pair(channel_name, reader));
return reader;
}
template <typename Request, typename Response>
......@@ -135,6 +152,17 @@ auto Node::CreateClient(const std::string& service_name)
service_name);
}
template <typename MessageT>
auto Node::GetReader(const std::string& name)
-> std::shared_ptr<Reader<MessageT>> {
std::lock_guard<std::mutex> lg(readers_mutex_);
auto it = readers_.find(name);
if (it != readers_.end()) {
return std::dynamic_pointer_cast<Reader<MessageT>>(it->second);
}
return nullptr;
}
} // namespace cybertron
} // namespace apollo
......
......@@ -18,6 +18,7 @@
#define CYBERTRON_NODE_READER_H_
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
......@@ -47,6 +48,8 @@ class Reader : public ReaderBase {
using LowerReachPtr = std::shared_ptr<transport::LowerReach<MessageT>>;
using ChangeConnection =
typename service_discovery::Manager::ChangeConnection;
using Iterator =
typename std::list<std::shared_ptr<MessageT>>::const_iterator;
Reader(const proto::RoleAttributes& role_attr,
const CallbackFunc<MessageT>& reader_func = nullptr);
......@@ -54,6 +57,18 @@ class Reader : public ReaderBase {
bool Init() override;
void Shutdown() override;
void Observe() override;
void ClearData() override;
bool HasReceived() const override;
bool Empty() const override;
void Enqueue(const std::shared_ptr<MessageT>& msg);
void SetHistoryDepth(const uint32_t& depth);
uint32_t GetHistoryDepth() const;
const std::shared_ptr<MessageT>& GetLatestObserved() const;
const std::shared_ptr<MessageT>& GetOldestObserved() const;
Iterator Begin() const { return observed_queue_.begin(); }
Iterator End() const { return observed_queue_.end(); }
protected:
void JoinTheTopology();
......@@ -66,6 +81,11 @@ class Reader : public ReaderBase {
ChangeConnection change_conn_;
service_discovery::ChannelManagerPtr channel_manager_;
mutable std::mutex mutex_;
uint32_t history_depth_ = 10;
std::list<std::shared_ptr<MessageT>> history_queue_;
std::list<std::shared_ptr<MessageT>> observed_queue_;
};
template <typename MessageT>
......@@ -82,6 +102,18 @@ Reader<MessageT>::~Reader() {
Shutdown();
}
template <typename MessageT>
void Reader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
std::lock_guard<std::mutex> lg(mutex_);
history_queue_.push_front(msg);
}
template <typename MessageT>
void Reader<MessageT>::Observe() {
std::lock_guard<std::mutex> lg(mutex_);
observed_queue_ = history_queue_;
}
template <typename MessageT>
bool Reader<MessageT>::Init() {
if (init_.exchange(true)) {
......@@ -90,12 +122,16 @@ bool Reader<MessageT>::Init() {
if (reader_func_ != nullptr) {
auto sched = scheduler::Scheduler::Instance();
croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
auto func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
this->reader_func_(msg);
};
auto dv = std::make_shared<data::DataVisitor<MessageT>>(
role_attr_.channel_id(), role_attr_.qos_profile().depth());
// Using factory to wrap templates.
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<MessageT>(reader_func_, dv);
croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
if (!sched->CreateTask(factory, croutine_name_)) {
init_.exchange(false);
return false;
......@@ -169,6 +205,57 @@ void Reader<MessageT>::OnChannelChange(const proto::ChangeMsg& change_msg) {
}
}
template <typename MessageT>
bool Reader<MessageT>::HasReceived() const {
std::lock_guard<std::mutex> lg(mutex_);
return !history_queue_.empty();
}
template <typename MessageT>
bool Reader<MessageT>::Empty() const {
std::lock_guard<std::mutex> lg(mutex_);
return observed_queue_.empty();
}
template <typename MessageT>
const std::shared_ptr<MessageT>& Reader<MessageT>::GetLatestObserved() const {
std::lock_guard<std::mutex> lg(mutex_);
if (observed_queue_.empty()) {
return nullptr;
}
return observed_queue_.front();
}
template <typename MessageT>
const std::shared_ptr<MessageT>& Reader<MessageT>::GetOldestObserved() const {
std::lock_guard<std::mutex> lg(mutex_);
if (observed_queue_.empty()) {
return nullptr;
}
return observed_queue_.back();
}
template <typename MessageT>
void Reader<MessageT>::ClearData() {
std::lock_guard<std::mutex> lg(mutex_);
history_queue_.clear();
observed_queue_.clear();
}
template <typename MessageT>
void Reader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
std::lock_guard<std::mutex> lg(mutex_);
history_depth_ = depth;
while (history_queue_.size() > history_depth_) {
history_queue_.pop_back();
}
}
template <typename MessageT>
uint32_t Reader<MessageT>::GetHistoryDepth() const {
return history_depth_;
}
} // namespace cybertron
} // namespace apollo
......
......@@ -50,6 +50,11 @@ class ReaderBase {
virtual bool Init() = 0;
virtual void Shutdown() = 0;
virtual void ClearData() = 0;
virtual void Observe() = 0;
virtual bool Empty() const = 0;
virtual bool HasReceived() const = 0;
const std::string& GetChannelName() const {
return role_attr_.channel_name();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册