提交 c7a8410a 编写于 作者: D dangyuedong 提交者: Jiangtao Hu

Framework: optimize cyber_monitor code

上级 becc5389
......@@ -5,8 +5,6 @@ package(default_visibility = ["//visibility:public"])
cc_binary(
name = "cyber_monitor",
srcs = [
"channel_msg_factory.cc",
"cyber_channel_message.cc",
"cyber_topology_message.cc",
"general_channel_message.cc",
"general_message.cc",
......@@ -16,8 +14,6 @@ cc_binary(
"screen.cc",
],
deps = [
"channel_msg_factory",
"cyber_channel_message",
"cyber_topology_message",
"general_channel_message",
"screen",
......@@ -30,23 +26,6 @@ cc_binary(
],
)
cc_library(
name = "channel_msg_factory",
hdrs = [ "channel_msg_factory.h", ],
)
cc_library(
name = "cyber_channel_message",
hdrs = [ "cyber_channel_message.h", ],
deps = [
"general_message",
"renderable_message",
"//cyber:cyber",
"//cyber/time:time",
"//cyber/time:duration",
],
)
cc_library(
name = "cyber_topology_message",
hdrs = [ "cyber_topology_message.h", ],
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "./channel_msg_factory.h"
#include "./general_channel_message.h"
#include <unistd.h>
#include <string>
ChannelMsgFactory* ChannelMsgFactory::Instance() {
static ChannelMsgFactory factory;
return &factory;
}
ChannelMsgFactory::ChannelMsgFactory(void) : pid_(getpid()) {}
bool ChannelMsgFactory::isFromHere(const std::string& nodeName) {
std::ostringstream outStr;
outStr << "MonitorReader" << pid_;
std::string templateName = outStr.str();
const std::string baseName = nodeName.substr(0, templateName.size());
return (templateName.compare(baseName) == 0);
}
bool ChannelMsgFactory::SetDefaultChildFactory(
const std::string& defautlMsgType) {
auto iter = general_factory_.find(defautlMsgType);
if (iter == general_factory_.cend()) {
return false;
}
default_child_factory_ = iter;
return true;
}
ChannelMessage* ChannelMsgFactory::CreateChannelMessage(
const std::string& msgTypeName, const std::string& channelName) {
static int index = 0;
std::ostringstream outStr;
outStr << "MonitorReader" << pid_ << '-' << index++;
ChannelMessage* ret = ChannelMessage::castErrorCode2Ptr(
ChannelMessage::ErrorCode::MessageTypeIsEmptr);
if (!msgTypeName.empty()) {
auto iter = general_factory_.find(msgTypeName);
if (iter != general_factory_.cend()) {
ret = iter->second(channelName, outStr.str());
} else {
if (default_child_factory_ != general_factory_.cend()) {
ret = default_child_factory_->second(channelName, outStr.str());
}
}
}
return ret;
}
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef TOOLS_CVT_MONITOR_CHANNEL_MSG_FACTORY_H_
#define TOOLS_CVT_MONITOR_CHANNEL_MSG_FACTORY_H_
#include <functional>
#include <iostream>
#include <map>
#include <string>
class ChannelMessage;
using CreatorFunction =
std::function<ChannelMessage*(const std::string&, const std::string&)>;
class ChannelMsgFactory final {
public:
static ChannelMsgFactory* Instance(void);
~ChannelMsgFactory() {}
bool RegisterChildFactory(const std::string& msgTypeName, CreatorFunction f) {
if (msgTypeName.empty() || f == nullptr) {
return false;
}
auto iter = general_factory_.find(msgTypeName);
if (iter != general_factory_.cend()) {
return false;
}
general_factory_[msgTypeName] = f;
return true;
}
void RemoveChildFactory(const std::string& msgTypeName) {
if (msgTypeName.empty()) {
return;
}
auto iter = general_factory_.find(msgTypeName);
if (iter != general_factory_.cend()) {
general_factory_.erase(iter);
}
}
bool SetDefaultChildFactory(const std::string& defautlMsgType);
ChannelMessage* CreateChannelMessage(const std::string& msgTypeName,
const std::string& channelName);
bool isFromHere(const std::string& nodeName);
private:
ChannelMsgFactory(void);
ChannelMsgFactory(const ChannelMsgFactory& other) = delete;
ChannelMsgFactory& operator=(const ChannelMsgFactory&) = delete;
int pid_;
std::map<std::string, CreatorFunction>::const_iterator default_child_factory_;
std::map<std::string, CreatorFunction> general_factory_;
};
#endif // TOOLS_CVT_MONITOR_CHANNEL_MSG_FACTORY_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "./cyber_channel_message.h"
double ChannelMessage::max_frmae_ratio_ = 1.0;
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef TOOLS_CVT_MONITOR_CYBER_CHANNEL_MESSAGE_H_
#define TOOLS_CVT_MONITOR_CYBER_CHANNEL_MESSAGE_H_
#include <fstream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "./general_message_base.h"
#include "./renderable_message.h"
#include "cyber/cyber.h"
#include "cyber/time/duration.h"
#include "cyber/time/time.h"
class Screen;
class ChannelMessage : public GeneralMessageBase {
static double max_frmae_ratio_;
public:
static double max_frame_ratio(void) { return max_frmae_ratio_; }
enum class ErrorCode {
NewSubClassFailed = -1,
CreateNodeFailed = -2,
CreateReaderFailed = -3,
MessageTypeIsEmptr = -4
};
static const char* errCode2Str(ErrorCode errCode) {
const char* ret = "Unknown Error Code";
switch (errCode) {
case ChannelMessage::ErrorCode::NewSubClassFailed:
ret = "Cannot create Parser Object";
break;
case ChannelMessage::ErrorCode::CreateNodeFailed:
ret = "Cannot create Cyber Node";
break;
case ChannelMessage::ErrorCode::CreateReaderFailed:
ret = "Cannot create Cyber Reader";
break;
case ChannelMessage::ErrorCode::MessageTypeIsEmptr:
ret = "Message Type is Empty";
break;
}
return ret;
}
static bool isErrorCode(void* ptr) {
ErrorCode err = (ErrorCode)(reinterpret_cast<intptr_t>(ptr));
switch (err) {
case ErrorCode::NewSubClassFailed:
case ErrorCode::CreateNodeFailed:
case ErrorCode::CreateReaderFailed:
case ErrorCode::MessageTypeIsEmptr:
return true;
default: {}
}
return false;
}
static ErrorCode castPtr2ErrorCode(void* ptr) {
assert(isErrorCode(ptr));
return static_cast<ErrorCode>(reinterpret_cast<intptr_t>(ptr));
}
static ChannelMessage* castErrorCode2Ptr(ErrorCode errCode) {
return reinterpret_cast<ChannelMessage*>(static_cast<intptr_t>(errCode));
}
explicit ChannelMessage(RenderableMessage* parent = nullptr)
: GeneralMessageBase(parent),
is_enabled_(true),
has_message_come_(false),
message_type_(),
frame_counter_(0),
frame_ratio_(0.0),
last_time_(apollo::cyber::Time::Now()) {}
~ChannelMessage() { channel_node_.reset(); }
void set_message_type(const std::string& msgTypeName) {
message_type_ = msgTypeName;
}
const std::string& message_type(void) const { return message_type_; }
bool is_enabled(void) const { return is_enabled_; }
void set_enabled(bool b) {
is_enabled_ = b;
// if (!b) set_has_message_come(false);
}
bool has_message_come(void) const { return has_message_come_; }
double frame_ratio(void) {
if (!is_enabled_ || !has_message_come()) return 0.0;
apollo::cyber::Time curTime = apollo::cyber::Time::Now();
auto deltaTime = curTime - last_time_;
if (deltaTime.ToNanosecond() > 1000000000) {
last_time_ = curTime;
frame_ratio_ = frame_counter_ / deltaTime.ToSecond();
frame_counter_ = 0;
}
if (frame_ratio_ > max_frmae_ratio_) max_frmae_ratio_ = frame_ratio_;
return (frame_ratio_);
}
const std::string& NodeName(void) const { return channel_node_->Name(); }
void add_reader(const std::string& reader) { DoAdd(readers_, reader); }
void del_reader(const std::string& reader) { DoDelete(readers_, reader); }
void add_writer(const std::string& writer) { DoAdd(writers_, writer); }
void del_writer(const std::string& writer) {
DoDelete(writers_, writer);
if (!writers_.size()) {
set_has_message_come(false);
}
}
protected:
static void DoDelete(std::vector<std::string>& vec, const std::string& str) {
for (auto iter = vec.begin(); iter != vec.end(); ++iter) {
if (*iter == str) {
vec.erase(iter);
break;
}
}
}
static void DoAdd(std::vector<std::string>& vec, const std::string& str) {
for (auto iter = vec.begin(); iter != vec.end(); ++iter) {
if (*iter == str) {
return;
}
}
vec.emplace_back(str);
}
void set_has_message_come(bool b) { has_message_come_ = b; }
bool is_enabled_;
bool has_message_come_;
std::string message_type_;
int frame_counter_;
double frame_ratio_;
apollo::cyber::Time last_time_;
std::unique_ptr<apollo::cyber::Node> channel_node_;
std::vector<std::string> readers_;
std::vector<std::string> writers_;
};
template <typename MessageType>
class CyberChannelMessage : public ChannelMessage {
public:
explicit CyberChannelMessage(RenderableMessage* parent = nullptr)
: ChannelMessage(parent) {}
~CyberChannelMessage() {
channel_reader_.reset();
channel_message_.reset();
}
std::string GetChannelName(void) const {
return channel_reader_->GetChannelName();
}
protected:
void updateRawMessage(const std::shared_ptr<MessageType>& rawMsg) {
set_has_message_come(true);
if (!is_enabled_) {
return;
}
++frame_counter_;
std::lock_guard<std::mutex> _g(inner_lock_);
channel_message_.reset();
channel_message_ = rawMsg;
}
std::shared_ptr<MessageType> CopyMsgPtr(void) const {
decltype(channel_message_) channelMsg;
{
std::lock_guard<std::mutex> g(inner_lock_);
channelMsg = channel_message_;
}
return channelMsg;
}
std::shared_ptr<MessageType> channel_message_;
std::shared_ptr<apollo::cyber::Reader<MessageType>> channel_reader_;
mutable std::mutex inner_lock_;
};
#define ChannelMsgSubFactory(ChannelMsgSubClass, MessageType) \
static ChannelMessage* Instance(const std::string& channelName, \
const std::string& nodeName) { \
ChannelMessage* ret = castErrorCode2Ptr(ErrorCode::NewSubClassFailed); \
ChannelMsgSubClass* subClass = new ChannelMsgSubClass(); \
if (subClass) { \
ret = subClass; \
subClass->channel_node_ = apollo::cyber::CreateNode(nodeName); \
if (subClass->channel_node_ == nullptr) { \
delete subClass; \
subClass = nullptr; \
ret = castErrorCode2Ptr(ErrorCode::CreateNodeFailed); \
} else { \
auto callBack = \
[subClass](const std::shared_ptr<MessageType>& rawMsg) { \
subClass->updateRawMessage(rawMsg); \
}; \
apollo::cyber::ReaderConfig reader_cfg; \
reader_cfg.channel_name = channelName; \
reader_cfg.pending_queue_size = 20; \
subClass->channel_reader_ = \
subClass->channel_node_->CreateReader<MessageType>(reader_cfg, \
callBack); \
if (subClass->channel_reader_ == nullptr) { \
subClass->channel_node_.reset(); \
delete subClass; \
subClass = nullptr; \
ret = castErrorCode2Ptr(ErrorCode::CreateReaderFailed); \
} \
} \
} \
return ret; \
}
#endif // TOOLS_CVT_MONITOR_CYBER_CHANNEL_MESSAGE_H_
......@@ -15,7 +15,6 @@
*****************************************************************************/
#include "./cyber_topology_message.h"
#include "./channel_msg_factory.h"
#include "./general_channel_message.h"
#include "./screen.h"
......@@ -32,23 +31,44 @@ constexpr int SecondColumnOffset = 4;
CyberTopologyMessage::CyberTopologyMessage(const std::string& channel)
: RenderableMessage(nullptr, 1),
second_column_(SecondColumnType::MessageFrameRatio),
pid_(getpid()),
col1_width_(8),
specified_channel_(channel),
all_channels_map_() {}
CyberTopologyMessage::~CyberTopologyMessage(void) {
apollo::cyber::Shutdown();
for (auto item : all_channels_map_) {
if (!ChannelMessage::isErrorCode(item.second)) {
if (!GeneralChannelMessage::isErrorCode(item.second)) {
delete item.second;
}
}
}
bool CyberTopologyMessage::isFromHere(const std::string& nodeName) {
std::ostringstream outStr;
outStr << "MonitorReader" << pid_;
std::string templateName = outStr.str();
const std::string baseName = nodeName.substr(0, templateName.size());
return (templateName.compare(baseName) == 0);
}
RenderableMessage* CyberTopologyMessage::Child(int lineNo) const {
RenderableMessage* ret = nullptr;
auto iter = findChild(lineNo);
if (!GeneralChannelMessage::isErrorCode(iter->second) &&
iter->second->is_enabled()) {
ret = iter->second;
}
return ret;
}
std::map<std::string, GeneralChannelMessage*>::const_iterator CyberTopologyMessage::findChild(int lineNo) const{
--lineNo;
std::map<std::string, GeneralChannelMessage*>::const_iterator ret = all_channels_map_.cend();
if (lineNo > -1 && lineNo < page_item_count_) {
int i = 0;
......@@ -60,9 +80,7 @@ RenderableMessage* CyberTopologyMessage::Child(int lineNo) const {
for (i = 0; iter != all_channels_map_.cend(); ++iter) {
if (i == lineNo) {
if (!ChannelMessage::isErrorCode(iter->second)) {
ret = iter->second;
}
ret = iter;
break;
}
++i;
......@@ -84,11 +102,10 @@ void CyberTopologyMessage::TopologyChanged(
auto iter = all_channels_map_.find(changeMsg.role_attr().channel_name());
if (iter != all_channels_map_.cend() &&
!ChannelMessage::isErrorCode(iter->second)) {
!GeneralChannelMessage::isErrorCode(iter->second)) {
const std::string& nodeName = changeMsg.role_attr().node_name();
if (::apollo::cyber::proto::RoleType::ROLE_WRITER ==
changeMsg.role_type()) {
changeMsg.role_attr();
iter->second->del_writer(nodeName);
} else {
iter->second->del_reader(nodeName);
......@@ -110,29 +127,35 @@ void CyberTopologyMessage::AddReaderWriter(
}
const std::string& nodeName = role.node_name();
if (ChannelMsgFactory::Instance()->isFromHere(nodeName)) {
if (isFromHere(nodeName)) {
return;
}
ChannelMessage* channelMsg = nullptr;
GeneralChannelMessage* channelMsg = nullptr;
const std::string& msgTypeName = role.message_type();
auto iter = all_channels_map_.find(channelName);
if (iter == all_channels_map_.cend()) {
channelMsg = ChannelMsgFactory::Instance()->CreateChannelMessage(
msgTypeName, channelName);
static int index = 0;
if (!ChannelMessage::isErrorCode(channelMsg)) {
channelMsg->set_parent(this);
channelMsg->set_message_type(msgTypeName);
channelMsg->add_reader(channelMsg->NodeName());
}
std::ostringstream outStr;
outStr << "MonitorReader" << pid_ << '-' << index++;
channelMsg = new GeneralChannelMessage(outStr.str(), this);
if(channelMsg != nullptr){
if (!GeneralChannelMessage::isErrorCode(channelMsg->OpenChannel(channelName))) {
channelMsg->set_message_type(msgTypeName);
channelMsg->add_reader(channelMsg->NodeName());
}
} else {
channelMsg = GeneralChannelMessage::castErrorCode2Ptr(GeneralChannelMessage::ErrorCode::NewSubClassFailed);
}
all_channels_map_[channelName] = channelMsg;
} else {
channelMsg = iter->second;
}
if (!ChannelMessage::isErrorCode(channelMsg)) {
if (!GeneralChannelMessage::isErrorCode(channelMsg)) {
if (isWriter) {
if (msgTypeName != apollo::cyber::message::MessageType<
apollo::cyber::message::RawMessage>()) {
......@@ -159,9 +182,20 @@ void CyberTopologyMessage::ChangeState(const Screen* s, int key) {
break;
case ' ': {
ChannelMessage* child = static_cast<ChannelMessage*>(Child(*line_no()));
if (child) {
child->set_enabled(!child->is_enabled());
auto iter = findChild(*line_no());
if (!GeneralChannelMessage::isErrorCode(iter->second)) {
GeneralChannelMessage* child = iter->second;
if(child->is_enabled()){
child->CloseChannel();
} else {
GeneralChannelMessage* ret = child->OpenChannel(iter->first);
if(GeneralChannelMessage::isErrorCode(ret)){
delete child;
all_channels_map_[iter->first] = ret;
} else {
child->add_reader(child->NodeName());
}
}
}
}
......@@ -203,7 +237,7 @@ void CyberTopologyMessage::Render(const Screen* s, int key) {
++iter, ++line) {
color = Screen::RED_BLACK;
if (!ChannelMessage::isErrorCode(iter->second)) {
if (!GeneralChannelMessage::isErrorCode(iter->second)) {
if (iter->second->has_message_come()) {
if (iter->second->is_enabled()) {
color = Screen::GREEN_BLACK;
......@@ -216,7 +250,7 @@ void CyberTopologyMessage::Render(const Screen* s, int key) {
s->SetCurrentColor(color);
s->AddStr(0, line, iter->first.c_str());
if (!ChannelMessage::isErrorCode(iter->second)) {
if (!GeneralChannelMessage::isErrorCode(iter->second)) {
switch (second_column_) {
case SecondColumnType::MessageType:
s->AddStr(col1_width_ + SecondColumnOffset, line,
......@@ -231,10 +265,10 @@ void CyberTopologyMessage::Render(const Screen* s, int key) {
} break;
}
} else {
ChannelMessage::ErrorCode errcode =
ChannelMessage::castPtr2ErrorCode(iter->second);
GeneralChannelMessage::ErrorCode errcode =
GeneralChannelMessage::castPtr2ErrorCode(iter->second);
s->AddStr(col1_width_ + SecondColumnOffset, line,
ChannelMessage::errCode2Str(errcode));
GeneralChannelMessage::errCode2Str(errcode));
}
s->ClearCurrentColor();
}
......
......@@ -30,7 +30,7 @@ class RoleAttributes;
} // cyber
} // apollo
class ChannelMessage;
class GeneralChannelMessage;
// class GeneralMessage;
class CyberTopologyMessage : public RenderableMessage {
......@@ -49,13 +49,17 @@ class CyberTopologyMessage : public RenderableMessage {
CyberTopologyMessage& operator=(const CyberTopologyMessage&) = delete;
void ChangeState(const Screen* s, int key);
bool isFromHere(const std::string& nodeName);
std::map<std::string, GeneralChannelMessage*>::const_iterator findChild(int index) const;
enum class SecondColumnType { MessageType, MessageFrameRatio };
SecondColumnType second_column_;
int col1_width_;
int pid_;
int col1_width_;
const std::string& specified_channel_;
std::map<std::string, ChannelMessage*> all_channels_map_;
std::map<std::string, GeneralChannelMessage*> all_channels_map_;
};
#endif // TOOLS_CVT_MONITOR_CYBER_TOPOLOGY_MESSAGE_H_
......@@ -27,6 +27,99 @@ namespace {
constexpr int ReaderWriterOffset = 4;
} // namespace
double GeneralChannelMessage::max_frmae_ratio_ = 1.0;
const char* GeneralChannelMessage::errCode2Str(
GeneralChannelMessage::ErrorCode errCode) {
const char* ret = "Unknown Error Code";
switch (errCode) {
case GeneralChannelMessage::ErrorCode::NewSubClassFailed:
ret = "Cannot create Parser Object";
break;
case GeneralChannelMessage::ErrorCode::CreateNodeFailed:
ret = "Cannot create Cyber Node";
break;
case GeneralChannelMessage::ErrorCode::CreateReaderFailed:
ret = "Cannot create Cyber Reader";
break;
case GeneralChannelMessage::ErrorCode::MessageTypeIsEmpty:
ret = "Message Type is Empty";
break;
case GeneralChannelMessage::ErrorCode::ChannelNameOrNodeNameIsEmpty:
ret = "Channel Name or Node Name is Empty";
break;
case GeneralChannelMessage::ErrorCode::NoCloseChannel:
ret = "No close Channel";
break;
}
return ret;
}
bool GeneralChannelMessage::isErrorCode(void* ptr) {
GeneralChannelMessage::ErrorCode err =
(GeneralChannelMessage::ErrorCode)(reinterpret_cast<intptr_t>(ptr));
switch (err) {
case GeneralChannelMessage::ErrorCode::NewSubClassFailed:
case GeneralChannelMessage::ErrorCode::CreateNodeFailed:
case GeneralChannelMessage::ErrorCode::CreateReaderFailed:
case GeneralChannelMessage::ErrorCode::MessageTypeIsEmpty:
case GeneralChannelMessage::ErrorCode::ChannelNameOrNodeNameIsEmpty:
return true;
default: {}
}
return false;
}
double GeneralChannelMessage::frame_ratio(void) {
if (!is_enabled() || !has_message_come()) return 0.0;
apollo::cyber::Time curTime = apollo::cyber::Time::Now();
auto deltaTime = curTime - last_time_;
if (deltaTime.ToNanosecond() > 1000000000) {
last_time_ = curTime;
frame_ratio_ = frame_counter_ / deltaTime.ToSecond();
frame_counter_ = 0;
}
if (frame_ratio_ > max_frmae_ratio_) max_frmae_ratio_ = frame_ratio_;
return (frame_ratio_);
}
GeneralChannelMessage* GeneralChannelMessage::OpenChannel(
const std::string& channelName) {
if (channelName.empty() || node_name_.empty()) {
return castErrorCode2Ptr(ErrorCode::ChannelNameOrNodeNameIsEmpty);
}
if (channel_node_ != nullptr || channel_reader_ != nullptr) {
return castErrorCode2Ptr(ErrorCode::NoCloseChannel);
}
channel_node_ = apollo::cyber::CreateNode(node_name_);
if (channel_node_ == nullptr) {
return castErrorCode2Ptr(ErrorCode::CreateNodeFailed);
}
auto callBack =
[this](const std::shared_ptr<apollo::cyber::message::RawMessage>& rawMsg) {
updateRawMessage(rawMsg);
};
channel_reader_ =
channel_node_->CreateReader<apollo::cyber::message::RawMessage>(channelName, callBack);
if (channel_reader_ == nullptr) {
channel_node_.reset();
return castErrorCode2Ptr(ErrorCode::CreateReaderFailed);
}
return this;
}
RenderableMessage* GeneralChannelMessage::Child(int lineNo) const {
return GeneralMessageBase::Child(lineNo);
}
......
......@@ -18,43 +18,178 @@
#define TOOLS_CVT_MONITOR_GENERAL_CHANNEL_MESSAGE_H_
#include "cyber/message/raw_message.h"
#include "cyber_channel_message.h"
#include "general_message_base.h"
class RepeatedItemsMessage;
class CyberTopologyMessage;
class GeneralChannelMessage : public GeneralMessageBase {
static double max_frmae_ratio_;
class GeneralChannelMessage
: public CyberChannelMessage<apollo::cyber::message::RawMessage> {
public:
ChannelMsgSubFactory(GeneralChannelMessage,
apollo::cyber::message::RawMessage);
void Render(const Screen* s, int key) override;
static double max_frame_ratio(void) { return max_frmae_ratio_; }
enum class ErrorCode {
NewSubClassFailed = -1,
CreateNodeFailed = -2,
CreateReaderFailed = -3,
MessageTypeIsEmpty = -4,
ChannelNameOrNodeNameIsEmpty = -5,
NoCloseChannel = -6
};
static const char* errCode2Str(ErrorCode errCode);
static bool isErrorCode(void* ptr);
static ErrorCode castPtr2ErrorCode(void* ptr) {
assert(isErrorCode(ptr));
return static_cast<ErrorCode>(reinterpret_cast<intptr_t>(ptr));
}
static GeneralChannelMessage* castErrorCode2Ptr(ErrorCode errCode) {
return reinterpret_cast<GeneralChannelMessage*>(
static_cast<intptr_t>(errCode));
}
~GeneralChannelMessage() {
channel_node_.reset();
channel_reader_.reset();
channel_message_.reset();
if (raw_msg_class_) {
delete raw_msg_class_;
raw_msg_class_ = nullptr;
}
}
std::string GetChannelName(void) const {
return channel_reader_->GetChannelName();
}
void set_message_type(const std::string& msgTypeName) {
message_type_ = msgTypeName;
}
const std::string& message_type(void) const { return message_type_; }
bool is_enabled(void) const { return channel_reader_ != nullptr; }
bool has_message_come(void) const { return has_message_come_; }
double frame_ratio(void);
const std::string& NodeName(void) const { return node_name_; }
void add_reader(const std::string& reader) { DoAdd(readers_, reader); }
void del_reader(const std::string& reader) { DoDelete(readers_, reader); }
void add_writer(const std::string& writer) { DoAdd(writers_, writer); }
void del_writer(const std::string& writer) {
DoDelete(writers_, writer);
if (!writers_.size()) {
set_has_message_come(false);
}
}
void Render(const Screen* s, int key) override;
RenderableMessage* Child(int lineNo) const override;
void CloseChannel(void) {
if (channel_reader_ != nullptr) {
channel_reader_.reset();
}
if (channel_node_ != nullptr) {
channel_node_.reset();
}
}
private:
explicit GeneralChannelMessage(RenderableMessage* parent = nullptr)
: CyberChannelMessage<apollo::cyber::message::RawMessage>(parent),
explicit GeneralChannelMessage(const std::string& nodeName,
RenderableMessage* parent = nullptr)
: GeneralMessageBase(parent),
current_state_(State::ShowDebugString),
has_message_come_(false),
message_type_(),
frame_counter_(0),
frame_ratio_(0.0),
last_time_(apollo::cyber::Time::Now()),
channel_node_(nullptr),
node_name_(nodeName),
readers_(), writers_(),
channel_message_(nullptr),
channel_reader_(nullptr),
inner_lock_(),
raw_msg_class_(nullptr) {}
GeneralChannelMessage(const GeneralChannelMessage&) = delete;
GeneralChannelMessage& operator=(const GeneralChannelMessage&) = delete;
static void DoDelete(std::vector<std::string>& vec, const std::string& str) {
for (auto iter = vec.begin(); iter != vec.end(); ++iter) {
if (*iter == str) {
vec.erase(iter);
break;
}
}
}
static void DoAdd(std::vector<std::string>& vec, const std::string& str) {
for (auto iter = vec.begin(); iter != vec.end(); ++iter) {
if (*iter == str) {
return;
}
}
vec.emplace_back(str);
}
void updateRawMessage(
const std::shared_ptr<apollo::cyber::message::RawMessage>& rawMsg) {
set_has_message_come(true);
++frame_counter_;
std::lock_guard<std::mutex> _g(inner_lock_);
channel_message_.reset();
channel_message_ = rawMsg;
}
std::shared_ptr<apollo::cyber::message::RawMessage> CopyMsgPtr(
void) const {
decltype(channel_message_) channelMsg;
{
std::lock_guard<std::mutex> g(inner_lock_);
channelMsg = channel_message_;
}
return channelMsg;
}
GeneralChannelMessage* OpenChannel(const std::string& channelName);
void RenderDebugString(const Screen* s, int key, unsigned lineNo);
void RenderInfo(const Screen* s, int key, unsigned lineNo);
void set_has_message_come(bool b) { has_message_come_ = b; }
enum class State { ShowDebugString, ShowInfo } current_state_;
bool has_message_come_;
std::string message_type_;
int frame_counter_;
double frame_ratio_;
apollo::cyber::Time last_time_;
std::unique_ptr<apollo::cyber::Node> channel_node_;
std::string node_name_;
std::vector<std::string> readers_;
std::vector<std::string> writers_;
std::shared_ptr<apollo::cyber::message::RawMessage> channel_message_;
std::shared_ptr<apollo::cyber::Reader<apollo::cyber::message::RawMessage>>
channel_reader_;
mutable std::mutex inner_lock_;
google::protobuf::Message* raw_msg_class_;
friend class RepeatedItemsMessage;
friend class GeneralMessageBase;
friend class CyberTopologyMessage;
}; // GeneralChannelMessage
#endif // TOOLS_CVT_MONITOR_GENERAL_CHANNEL_MESSAGE_H_
......@@ -44,9 +44,6 @@ class GeneralMessage : public GeneralMessageBase {
GeneralMessage(const GeneralMessage&) = delete;
GeneralMessage& operator=(const GeneralMessage&) = delete;
// void PrintRepeatedField(const Screen* s, unsigned& lineNo, int indent,
// int index, int& jumpLines);
int itemIndex_;
const google::protobuf::FieldDescriptor* field_;
......
......@@ -65,7 +65,7 @@ class GeneralMessageBase : public RenderableMessage {
GeneralMessageBase(const GeneralMessageBase&) = delete;
GeneralMessageBase& operator=(const GeneralMessageBase&) = delete;
std::map<const int /* lineNo */, GeneralMessageBase*> children_map_;
std::map<const int, GeneralMessageBase*> children_map_;
};
#endif // TOOLS_CVT_MONITOR_GENERAL_MESSAGE_BASE_H_
......@@ -16,8 +16,6 @@
#include "cyber_topology_message.h"
#include "general_channel_message.h"
#include "channel_msg_factory.h"
#include "screen.h"
#include "cyber/init.h"
......@@ -87,11 +85,6 @@ int main(int argc, char *argv[]) {
FLAGS_alsologtostderr = 0;
FLAGS_colorlogtostderr = 0;
ChannelMsgFactory *f = ChannelMsgFactory::Instance();
f->RegisterChildFactory("apollo::cyber::message::RawMessage",
GeneralChannelMessage::Instance);
f->SetDefaultChildFactory("apollo::cyber::message::RawMessage");
CyberTopologyMessage topologyMsg(val);
auto topologyCallback =
......@@ -120,11 +113,13 @@ int main(int argc, char *argv[]) {
signal(SIGWINCH, SigResizeHandle);
signal(SIGINT, SigCtrlCHandle);
s->SetCurrentRenderMessage(&topologyMsg);
s->Init();
s->Run();
apollo::cyber::Shutdown();
return 0;
}
......@@ -15,7 +15,7 @@
*****************************************************************************/
#include "screen.h"
#include "cyber_channel_message.h"
#include "general_channel_message.h"
#include "cyber_topology_message.h"
#include "renderable_message.h"
......@@ -216,7 +216,7 @@ void Screen::Run() {
(this->*showFuncs[static_cast<int>(current_state_)])(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(
static_cast<int>(1000.0 / (ChannelMessage::max_frame_ratio() + 10.0))));
static_cast<int>(1000.0 / (GeneralChannelMessage::max_frame_ratio() + 10.0))));
} while (canRun_);
}
......@@ -278,6 +278,7 @@ void Screen::ShowRenderMessage(int ch) {
case 'D':
case KEY_RIGHT: {
RenderableMessage* child = current_render_obj_->Child(*y);
if (child) {
child->reset_line_page();
current_render_obj_ = child;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册