提交 aee70518 编写于 作者: xiaonuo911teamo's avatar xiaonuo911teamo

init...

上级 ff7208dd
output/
inner-depend/
code/.DS_Store
opt/linux-x86_64
build/
此差异已折叠。
# zcmf-Zero_coupling_module_framework
# zcmf-zero_coupling_module_framework
这是一个零耦合的模块工程架构。模块间的交互操作以string作为key,通过静态存储区进行交互,从而达到所有模块均不互相依赖的目的。该架构适用于多任务并行执行,并且多有交互的情景。
\ No newline at end of file
这是一个零耦合的模块工程架构。模块间的交互操作以string作为key,通过静态存储区进行交互,从而达到所有模块均不互相依赖的目的。该架构适用于多任务并行执行,并且多有交互的情景。
## 框架背景以及原理
有时候,我们会因为功能的划分,将一部分内容拆分成多个模块,但是由于其内部有具有一定的关联性,如相互之间的函数调用等。在单独编译各个模块的时候,仍然需要对方的头文件,以及运行时对so的依赖。这样的一种设计,我们称其为耦合设计。通常情况下,我们并不喜欢这种强制的耦合关系,它导致我们的系统依赖关系复杂,不够灵活。
所以,产生了很多低耦合,零耦合的设计,而我们的框架也是其中一种。
我们框架的最基础原理是巧妙地利用了,同一个线程具有相同的静态存储区的性质,在这块相同的区域内支持各个模块的请求与响应,从而做到各个模块间的通信。这样做的好处是,在编译时,其他模块间都不需要相互依赖了,仅需要依赖基础的头文件即可。在运行时,不会去检查对方的so是否存在,而是启动后,去静态存储区查找,如果查找无果,则通信失败。
在框架中,我们提供了一种服务机制,一种消息机制。
服务机制介绍:服务由一方提供,并将其函数地址存储在静态存储区,任意模块(包括自身)都可以作为客户从静态存储区中获得该地址,补充适当的参数即可运行。
消息机制介绍:消息由一方订阅,一方推送(可相同)。订阅方提供消息处理函数,推送方直接将消息推送出,并触发消息处理函数(可以是多个)。
不同点:处理函数的实现位置是不同的,可以引用的类成员变量也是不同的。
相同点:实际上都是在客户或者推送方去提供参数,调用处理函数执行的。
## 框架现有的功能
1. 服务机制
服务机制和消息机制在messager中实现, 具体介绍参见[点我](code/src/corelib/include/message/README.md)
2. 消息机制
同上.
3. 模块线程管理基类
4. 统一的log接口
5. 进程CPU 内存实时记录
6. 时序ulog存储
7. 日志落盘
8. 类似与电信号中断机制的信号诊断功能
## 框架后续的发展规划
1. 框架中仍然有部分逻辑过于复杂,后续考虑进行简明化
2. 框架中仍然有部分的效率没有做到最高,后续考虑更好的优化策略
3. 框架中缺少一种自定义的异常处理机制
4. 框架中缺少单元测试模块
5. 线程调度方面有待完善
6. 获取上一条已输出日志的接口,用于自测时
## 本文使用的术语
| 编号 | 术语和缩写 | 解释 |
| --- | --------- | -----|
| 1 | 模块 | 用于描述一个整体功能模块, 通常模块内都会有一个pulgin.cpp文件,去控制模块的启动和停止|
## 框架使用建议
1. 尽量不要自启线程, 框架内包含对线程的调度部分, 可以满足大部分需求.
## 特殊情况使用指导
### 1.单个模块需要阻塞调用
有时,我们会使用到someip ros这类的通讯机制, 其主要通过回调函数完成, 但是都需要调用一个阻塞线程, 用于监听. 这时, 建议不要在模块内自启线程, 而是直接让阻塞线程在 thread_func 函数内阻塞住. 同时, 重载基类函数stop, 实现阻塞线程的退出.
[111](code/src/corelib/include/pipe/README.md)
set(JSONCPP_FOUND TRUE)
set(JSONCPP_INCLUDE_DIR ${THIRD_PARTY_DIR}/jsoncpp/include)
file(GLOB_RECURSE JSONCPP_LIBRARIES ${THIRD_PARTY_DIR}/jsoncpp/lib/lib*.so*)
set(OPENCV_FOUND TRUE)
set(OPENCV_INCLUDE_DIR ${THIRD_PARTY_DIR}/opencv/include)
file(GLOB_RECURSE OPENCV_LIBRARIES ${THIRD_PARTY_DIR}/opencv/lib/libopencv_*)
set(PROTOBUF_FOUND TRUE)
if(NOT PROTOC_EXEC)
set(PROTOC_EXEC ${THIRD_PARTY_DIR}/protobuf/bin/protoc)
endif()
set(PROTOBUF_INCLUDE_DIR ${THIRD_PARTY_DIR}/protobuf/include)
set(PROTOBUF_LIBRARIES ${THIRD_PARTY_DIR}/protobuf/lib/libprotobuf.so)
MACRO(SUBDIRLIST result curdir)
FILE(GLOB children RELATIVE ${curdir} ${curdir}/*)
SET(dirlist "")
FOREACH(child ${children})
IF(IS_DIRECTORY ${curdir}/${child})
LIST(APPEND dirlist ${child})
ENDIF()
ENDFOREACH()
SET(${result} ${dirlist})
ENDMACRO()
MACRO(ADDEXE name dir)
file(GLOB_RECURSE SRC_LIST ${dir}/*.cpp ${dir}/*.cc)
if(SRC_LIST)
add_executable(${name}
${SRC_LIST}
)
target_link_libraries(${name}
${ARGN}
)
endif()
ENDMACRO()
MACRO(ADDLIB name dir)
file(GLOB_RECURSE SRC_LIST ${dir}/*.cpp ${dir}/*.cc)
if(SRC_LIST)
add_library(${name} SHARED
${SRC_LIST}
)
target_link_libraries(${name}
${ARGN}
)
endif()
ENDMACRO()
MACRO(ADDSLIB name dir)
file(GLOB_RECURSE SRC_LIST ${dir}/*.cpp ${dir}/*.cc)
if(SRC_LIST)
add_library(${name} STATIC
${SRC_LIST}
)
target_link_libraries(${name}
${ARGN}
)
endif()
ENDMACRO()
set(ZMQ_FOUND TRUE)
message(zmq found: ${THIRD_PARTY_DIR}/zeromq)
set(ZMQ_INCLUDE_DIR ${THIRD_PARTY_DIR}/zeromq/include)
file(GLOB_RECURSE ZMQ_LIBRARIES ${THIRD_PARTY_DIR}/zeromq/lib/lib*.so)
cmake_minimum_required(VERSION 3.5)
project(zcmf-zero_couping_moudle_framework)
set(CMAKE_CXX_STANDARD 11)
add_compile_options(-fPIC -O3)
set(CMAKE_CXX_FLAGS -rdynamic)
if(NOT BUILD_TYPE)
set(BUILD_TYPE "linux-x86_64" CACHE INTERNAL "BUILD_TYPE")
elseif(NOT CMAKE_TOOLCHAIN_FILE AND ${BUILD_TYPE} STREQUAL "qnx-aarch64")
MESSAGE(FATAL_ERROR "CMAKE_TOOLCHAIN_FILE is not define")
endif()
message(STATUS "++++++++++++++ ${BUILD_TYPE}")
set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/../cmake)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/../opt/${BUILD_TYPE}/bin)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/../opt/${BUILD_TYPE}/lib)
set(THIRD_PARTY_DIR ${CMAKE_CURRENT_LIST_DIR}/../inner-depend/Localization_3rdParty/${BUILD_TYPE} CACHE INTERNAL "THIRD_PARTY_DIR")
set(CORELIB_INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/src/corelib/include/ CACHE INTERNAL "CORELIB_INCLUDE_DIR")
set(DATA_INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/src/proto_data/ CACHE INTERNAL "DATA_INCLUDE_DIR")
set(UTILITIES_INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/src/utilities/include/ CACHE INTERNAL "UTILITIES_INCLUDE_DIR")
if(${BUILD_TYPE} STREQUAL "linux-x86_64")
set(DL dl CACHE INTERNAL "DL")
set(PTHREAD pthread CACHE INTERNAL "PTHREAD")
endif()
find_package(Tools REQUIRED)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/iv_task)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/corelib)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/proto_data)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/server_log)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/server_proc)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/utilities)
if(${BUILD_TYPE} STREQUAL "linux-x86_64")
endif()
cmake_minimum_required(VERSION 2.8)
project(corelib)
add_compile_options(-std=c++11)
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}/include
)
ADDLIB(${PROJECT_NAME} src)
## 自定义数据类型介绍
多线程并行是本框架的特点之一, 所以通常都会对变量的线程安全性有要求, 所以这里就实现了常用的几种线程安全的数据结构模板.使用时,直接传入模板类型即可.目前已经实现的结构有, 针对普通变量的DoubleBufferData, 针对队列的DoubleBufferedQueue, 针对数组的DoubleBufferedVector.
### 快速上手
所有的结构中为了方便使用都引用了下面的定义.
1. 线程锁 _data_mutex: 保证线程安全
2. 原子变量 _data_buffer_updated: 表示数据是否进行的更新, 相对于上一次使用get_data来说.
3. 原子变量 _has_data: 表示在构造出来之后,是否有进行过赋值, 即调用set_data
4. 赋值函数 set_data: 注入一个新的值, 更新_data_buffer_updated和_has_data
5. 取值函数 get_data: 取出当前值, 更新_data_buffer_updated为false
6. 窥视函数 peek_data: 区别于get_data, peek_data不会去更新内部状态, 只是偷看一眼数据.
7. 查看数据更新状态函数 is_updated
8. 查看是否已载入数据函数 has_data
### 仍需优化部分
1. DoubleBufferedQueue和DoubleBufferedVector, 对swap的实现部分, 存在争议, 具体参考定义部分, [double_buffered_queue](./double_buffered_queue.hpp), [double_buffered_vector](./double_buffered_vector.hpp)
#pragma once
typedef int int_t;
typedef long long_t;
typedef float float_t;
typedef double double_t;
typedef char char_t;
typedef short short_t;
#ifndef DEFINITIONS_H
#define DEFINITIONS_H
#include <math.h>
#ifdef M_PI
# undef M_PI
#endif
#define M_PI (3.141592653589793)
#ifdef M_PI_2
# undef M_PI_2
#endif
#define M_PI_2 (M_PI / 2)
#define M_GOLDEN (1.6180339)
#define M_2PI (M_PI * 2)
// MATH_CHECK_INDEXES modifies some objects (e.g. SoloGimbalEKF) to
// include more debug information. It is also used by some functions
// to add extra code for debugging purposes. If you wish to activate
// this, do it here or as part of the top-level Makefile -
// e.g. Tools/Replay/Makefile
#ifndef MATH_CHECK_INDEXES
#define MATH_CHECK_INDEXES (0)
#endif
#ifndef DEG_TO_RAD
#define DEG_TO_RAD (M_PI / 180.0f)
#endif
#ifndef RAD_TO_DEG
#define RAD_TO_DEG (180.0 / M_PI)
#endif
// Centi-degrees to radians
static const double DEGX100 = 5729.57795;
// GPS Specific double precision conversions
// The precision here does matter when using the wsg* functions for converting
// between LLH and ECEF coordinates.
static const double DEG_TO_RAD_DOUBLE = asin(1) / 90;
static const double RAD_TO_DEG_DOUBLE = 90 / asin(1);
// acceleration due to gravity in m/s/s
static const double GRAVITY_MSS = 9.80665;
// radius of earth in meters
static const double RADIUS_OF_EARTH = 6378100;
// convert a longitude or latitude point to meters or centimeters.
// Note: this does not include the longitude scaling which is dependent upon location
static const double LATLON_TO_M = 0.01113195;
static const double LATLON_TO_CM = 1.113195;
static const double G_ACCEL = (9.80151);
static const double RHO = (1.225);
/* WGS84 reference ellipsoid constants */
// Semi-major axis of the Earth, in meters.
static const double WGS84_A = 6356752.3142 / 0.996647189335;
//Inverse flattening of the Earth
static const double WGS84_IF = 2982572.23563;
// The flattening of the Earth
static const double WGS84_F = (1.0 / WGS84_IF);
// Semi-minor axis of the Earth in meters
static const double WGS84_B = 6356752.3142;
// Eccentricity of the Earth
static const double WGS84_E = (sqrt(2 * (1.0 / 298.257223563) - (1.0 / 298.257223563) *
(1.0 / 298.257223563)));
// Polar Radius of Curvature
static const double WGS84_C = 639.95936258;
static const double WGS84_E2(WGS84_E * WGS84_E);
static const double WGS84_A2(WGS84_A * WGS84_A);
static const double WGS84_B2(WGS84_B * WGS84_B);
static const double WGS84_AB2(WGS84_A2 * WGS84_B2);
// air density at 15C at sea level in kg/m^3
static const double AIR_DENSITY_SEA_LEVEL = 1.225;
static const double C_TO_KELVIN = 273.15;
// Gas Constant is from Aerodynamics for Engineering Students, Third Edition, E.L.Houghton and N.B.Carruthers
static const double ISA_GAS_CONSTANT = 287.26;
static const double ISA_LAPSE_RATE = 0.0065;
/*
use AP_ prefix to prevent conflict with OS headers, such as NuttX
clock.h
*/
#define AP_NSEC_PER_SEC 1000000000ULL
#define AP_NSEC_PER_USEC 1000ULL
#define AP_USEC_PER_SEC 1000000ULL
#define AP_USEC_PER_MSEC 1000ULL
#define AP_MSEC_PER_SEC 1000ULL
#define AP_SEC_PER_WEEK (7ULL * 86400ULL)
#define AP_MSEC_PER_WEEK (AP_SEC_PER_WEEK * AP_MSEC_PER_SEC)
#endif
#pragma once
#include <atomic>
#include <mutex>
// 自带有线程保护的模板结构
template <class T>
class DoubleBufferData
{
public:
DoubleBufferData():
_data_buffer_updated(false), _has_data(false) {}
DoubleBufferData(const T& data):
_data_buffer_updated(false){
set_data(data);
}
DoubleBufferData& operator=(const T& data){
set_data(data);
return *this;
}
void set_data(const T& data){
_data_mutex.lock();
_data_buffer = data;
_data_buffer_updated = true;
_has_data = true;
_data_mutex.unlock();
}
const T get_data(){
T data;
_data_mutex.lock();
data = _data_buffer;
_data_buffer_updated = false;
_data_mutex.unlock();
return data;
}
const T peek_data(){
T data;
_data_mutex.lock();
data = _data_buffer;
_data_mutex.unlock();
return data;
}
// 数据是否有更新
bool is_updated(){
return _data_buffer_updated;
}
// 是否有载入数据
bool has_data() {
return _has_data;
}
private:
T _data_buffer;
std::mutex _data_mutex;
std::atomic_bool _data_buffer_updated;
std::atomic_bool _has_data;
};
#pragma once
#include <deque>
#include <atomic>
#include <mutex>
// 带有线程保护的queue
template <class T>
class DoubleBufferedQueue {
public:
DoubleBufferedQueue(int max_size = 10):
_data_buffer_updated(false),
_max_size(max_size) {
}
void push_data(const T& data) {
std::lock_guard<std::mutex> lg(data_mutex);
_data_buffer.push_back(data);
_data_buffer_updated = true;
if (_data_buffer.size() > _max_size) {
_data_buffer.pop_front();
_overflowed = true;
}
}
void push_data(const std::deque<T>& data) {
std::lock_guard<std::mutex> lg(data_mutex);
_data_buffer.clear();
for (auto& t : data){
_data_buffer.push_back(t);
_data_buffer_updated = true;
if (_data_buffer.size() > _max_size) {
_data_buffer.pop_front();
_overflowed = true;
}
}
_data_mutex.unlock();
}
// 区别与peek_data, get_data会取出原数据, 而peek_data仅仅是复制出来
const std::deque<T> get_data() {
if (_data_buffer_updated) {
_data_mutex.lock();
_data.clear();
_data.swap(_data_buffer);
_data_buffer_updated = false;
_overflowed = false;
_data_mutex.unlock();
}
return _data;
}
// 区别与get_data, peek_data仅仅是复制出来, 而get_data会取出原数据
const std::deque<T> peek_data() {
std::deque<T> data;
if (_data_buffer_updated) {
_data_mutex.lock();
data = _data_buffer;
_data_mutex.unlock();
}
return data;
}
void clear() {
_data.clear();
_data_mutex.lock();
_data_buffer.clear();
_overflowed = false;
_data_buffer_updated = false;
_data_mutex.unlock();
}
// 与std中deque不同, 此swap只能做到单向交换, 可以通过swap的方式将数据取出, 但是外部的数据不会进入到_data_buffer
// 即连续使用两次 swap 并不会等于无操作.
// TODO:
// 1. 完成线程安全的双向交换
// 2. 同时, 此结构中有定义最大元素个数, 直接交换存储区, 不能保证内部数据安全. 故后续将引入主动抛出异常的情况, 防止这类错误发生
void swap(std::deque<T>& que_data) {
get_data();
_data.swap(que_data);
}
bool is_updated() {
return _data_buffer_updated;
}
bool is_overflowed() {
return _overflowed;
}
private:
std::deque<T> _data;
std::deque<T> _data_buffer;
std::mutex _data_mutex;
std::atomic<bool> _data_buffer_updated;
int _max_size;
bool _overflowed = false;
};
#pragma once
#include <vector>
#include <atomic>
#include <mutex>
// 带有线程保护的vector
// 与deque不同, vector中的数据满了以后, 如果不执行clear get_data, 将不会再更新数据
template <class T>
class DoubleBufferedVector {
public:
DoubleBufferedVector(int max_size = 10):
_data_buffer_updated(false),
_max_size(max_size) {
}
void push_data(const T& data) {
std::lock_guard<std::mutex> lg(_data_mutex);
_data_buffer.push_back(data);
if (_data_buffer.size() > _max_size) {
_data_buffer.pop_back();
_overflowed = true;
}
_data_buffer_updated = true;
}
void push_data(const std::vector<T>& data) {
std::lock_guard<std::mutex> lg(_data_mutex);
_data_buffer.clear();
for (auto& t : data){
_data_buffer.push_back(t);
if (_data_buffer.size() > _max_size) {
_data_buffer.pop_back();
_overflowed = true;
break;
}
}
_data_buffer_updated = true;
}
// 区别与peek_data, get_data会取出原数据, 而peek_data仅仅是复制出来
const std::vector<T> get_data() {
std::lock_guard<std::mutex> lg(_data_mutex);
_data.clear();
if (_data_buffer_updated) {
_data.swap(_data_buffer);
_data_buffer_updated = false;
_overflowed = false;
}
return _data;
}
// 区别与get_data, peek_data仅仅是复制出来, 而get_data会取出原数据
const std::vector<T> peek_data() {
std::vector<T> _data;
if (_data_buffer_updated) {
_data_mutex.lock();
_data = _data_buffer;
_data_mutex.unlock();
}
return _data;
}
void clear() {
std::lock_guard<std::mutex> lg(_data_mutex);
_data_buffer.clear();
_overflowed = false;
_data_buffer_updated = false;
}
// TODO:
// 1. 完成线程安全的双向交换
// 2. 同时, 此结构中有定义最大元素个数, 直接交换存储区, 不能保证内部数据安全. 故后续将引入主动抛出异常的情况, 防止这类错误发生
void swap(std::vector<T>& vec_data) {
std::lock_guard<std::mutex> lg(_data_mutex);
if (!_data_buffer_updated) {
_data_buffer.clear();
}
_data_buffer.swap(vec_data);
}
// 虽然是get操作, 但是为了保持多线程的一致性, 内部仍然加了线程锁
size_t size() {
std::lock_guard<std::mutex> lg(_data_mutex);
return _data_buffer.size();
}
bool is_updated() {
return _data_buffer_updated;
}
bool is_overflowed() {
return _overflowed;
}
private:
std::vector<T> _data;
std::vector<T> _data_buffer;
std::mutex _data_mutex;
std::atomic<bool> _data_buffer_updated;
int _max_size;
bool _overflowed = false;
};
#pragma once
/******************************************************************************
模拟中断触发方式
学电路的同学会比较熟悉,中断触发方式分为电平触发和边沿触发。百度一下可以知道,电平触发
是判断到电信号为0或1时,触发中断,处理中断函数。而边沿触发是接收到从0到1或从1到0的跳变,
触发中断,处理中断函数。
******************************************************************************/
#include <message/messager.hpp>
#include <regex>
#include <log/logging.h>
// 触发方式
// 这里将电平触发和边沿触发, 拆分为 高电平触发、低电平触发、上升沿触发、下降沿触发、跳变触发
enum TriggerType{
TRUE, // 高电平触发, 当信号为1时触发
FALSE, // 低电平触发, 当信号为0时触发
UP, // 上升沿触发, 当信号从0变为1时触发
DOWN, // 下降沿触发, 当信号从1变为0时触发
UPDOWN // 跳变触发, 上升沿和下降沿均可触发
};
class Diagnose{
public:
// 功能等同于Message中的add_server_func
// 此处register_server的含义是为了注册一个用于诊断的服务,所以内部自动在服务名称后面追加了 "_diag"
static void register_server(const std::string& name, std::function<std::string(const std::string&)> func){
if (Messager::has_server<std::string(const std::string&)>(name + "__diag")) {
WARNING() << " diag server named " << name << " is already exist!";
} else {
Messager::add_server_func(name + "__diag", func);
}
}
// 注册以name为名的action,以trigger_type方式触发中断函数func,为了方式触发过于频繁,可以设置触发间隔trigger_times
template<class T>
static void register_diag_action(const std::string& name,
std::function<void(const T&)> func,
TriggerType trigger_type = TRUE, int trigger_times = 1){
auto& trigger_times_map = get_trigger_times_map<T>();
auto& condition_map = get_condition_map<T>();
auto& last_condition_map = get_last_condition_map<T>();
static int trigger_times_id = 0;
trigger_times_id++;
int trigger_times_id_captured = trigger_times_id;
auto trigger_func = [=, &trigger_times_map](const T& data, bool is_change){
auto& func_trigger_times = trigger_times_map[trigger_times_id_captured];
if (is_change){
func_trigger_times = 0;
}
if (++func_trigger_times >= trigger_times){
func(data);
func_trigger_times = 0;
}
};
Messager::subcribe<bool, T>(name + "__diag", [=, &condition_map, &last_condition_map](bool condition, const T& data){
auto& last_condition = condition_map[name];
bool is_changed_true_false = last_condition ^ condition;
bool has_last_last_condition = last_condition_map.find(name) != last_condition_map.end();
bool is_changed_up_down = false;
if (has_last_last_condition){
is_changed_up_down = !last_condition_map[name] ^ last_condition;
}
switch (trigger_type) {
case TRUE:
if (condition){
trigger_func(data, is_changed_true_false);
}
break;
case FALSE:
if (!condition){
trigger_func(data, is_changed_true_false);
}
break;
case UP:
if (!last_condition && condition){
trigger_func(data, is_changed_up_down);
}
break;
case DOWN:
if (last_condition && !condition){
trigger_func(data, is_changed_up_down);
}
break;
case UPDOWN:
if (last_condition ^ condition){
trigger_func(data, is_changed_up_down);
}
break;
default:;
}
});
}
static void register_diag_action(const std::string& name,
std::function<void()> func,
TriggerType trigger_type = TRUE, int trigger_times = 1){
register_diag_action<bool>(name, [func](const bool&){
func();
}, trigger_type, trigger_times);
}
// 激活以name为名的action,信号量为condition,就是0或1,data对应处理函数中需要的参数
template<class T>
static void fire_diag_condition(const std::string& name, bool condition, const T& data){
auto& condition_map = get_condition_map<T>();
auto& last_condition_map = get_last_condition_map<T>();
if (condition_map.find(name) != condition_map.end()) {
Messager::publish(name + "__diag", condition, data);
}
last_condition_map[name] = condition_map[name];
condition_map[name] = condition;
}
static void fire_diag_condition(const std::string& name, bool condition){
fire_diag_condition(name, condition, true);
}
// 调用以name为名的服务,以value为参数,使用data为返回值
// 服务name使用register_server进行注册
// 此处只实现了string类型的特化版本,用于输出诊断监控的数据信息,详情可看test
static bool diag_call(const std::string& name, const std::string& value, std::string& data){
return diag_call("name:" + name + ";value:" + value, data);
}
// 从字串 'name:***;value:***',解析出name和value,调用diag(name, value, data)
static bool diag_call(const std::string& input, std::string& data){
static std::regex regex("\\s*name:\\s*(\\w*)\\s*;\\s*value:\\s*(\\w*)\\s*");
std::smatch result;
bool retval = false;
if (!input.empty() && std::regex_match(input, result, regex)) {
std::string name = result[1];
std::string value = result[2];
auto func = Messager::get_server_func<std::string(const std::string&)>(name + "__diag");
if (func){
data = func(value);
retval = true;
} else {
data = "diag name not found: ";
data += name;
}
} else {
data = input + "-->input format error! example: name: name1; value: [123]";
}
return retval;
}
public:
// 手动注册diag_call中调用服务的储存结构,不手动调用时,会自动注册
static void register_diag_server_map(){
Messager::register_server_map<std::string(const std::string&)>();
}
// 手动注册模板为T,中断触发系统,不手动调用时,会自动注册
template<class T>
static void register_condition_data_map(){
Messager::register_data_map<bool, T>();
get_condition_map<T>();
get_last_condition_map<T>();
get_trigger_times_map<T>();
}
private:
template<class T>
static std::map<std::string, bool> &get_condition_map(){
static std::map<std::string, bool> condition_map;
return condition_map;
}
template<class T>
static std::map<std::string, bool> &get_last_condition_map(){
static std::map<std::string, bool> condition_map;
return condition_map;
}
template<class T>
static std::map<int, int> &get_trigger_times_map(){
static std::map<int, int> trigger_times_map;
return trigger_times_map;
}
};
/*************************************************************************************************
* 频率计算模块
*
* 这是一个单例模块, 用于对各种周期信号的频率计算.
* Frequence 内部通过map结构保存对象的, 使用<name, frequeue>的方式进行保存, 所以每种信号都必须有独立的名字
*
* 使用方法:
*
* thread1: {
* Frequence::instance().trigger_once("signal");
* }
*
* thread2: {
* auto freq = Frequence::instance().get_frequence("signal");
* std::cout << "signal's frequence: " << freq << std::endl;
* }
*
/************************************************************************************************/
#pragma once
#include <pipe/timer_element.hpp>
#include <map>
#include <string>
#include <mutex>
//
class Frequence : public TimerElement
{
private:
struct FCounter
{
int counter;
double frequence; // Hz
};
std::map<std::string, FCounter> _counter_map;
std::mutex _mutex;
public:
~Frequence(){
stop();
wait();
}
public:
static void trigger_once(const std::string& name){
instance().trigger_inner(name);
}
static double get_frequence(const std::string &name)
{
return instance().frequence_inner(name);
}
static void initail_instance() {
instance();
}
private:
// 3秒累计一次频率
Frequence(): TimerElement(3000, "Frequence"){
start();
}
virtual void timer_func() override {
std::lock_guard<std::mutex> lg(_mutex);
for (auto &counter : _counter_map)
{
counter.second.frequence = 1000.0 * counter.second.counter / get_interval();
counter.second.counter = 0;
}
}
void trigger_inner(const std::string& name)
{
std::lock_guard<std::mutex> lg(_mutex);
_counter_map[name].counter++;
}
double frequence_inner(const std::string &name)
{
std::lock_guard<std::mutex> lg(_mutex);
double frequence = _counter_map[name].frequence;
return frequence;
}
static Frequence& instance(){
static Frequence frequence;
return frequence;
}
};
// TODO: 补充注释和说明
#pragma once
#include <log/logging.h>
#include <sys/socket.h>
#include <arpa/inet.h>
class UdpInterface {
public:
~UdpInterface() {
}
protected:
UdpInterface() {
_fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
}
public:
void set_send_timeout(int time_ms) {
int rc = 0;
struct timeval tv;
tv.tv_sec = time_ms / 1000;
tv.tv_usec = time_ms % 1000 * 1000;
rc = setsockopt(_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
FATAL_IF(rc != 0) << "setsockopt faild! rc = " << rc << "; errno = " << errno;
_send_time_out = time_ms;
}
void set_recv_timeout(int time_ms) {
int rc = 0;
struct timeval tv;
tv.tv_sec = time_ms / 1000;
tv.tv_usec = time_ms % 1000 * 1000;
rc = setsockopt(_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
FATAL_IF(rc != 0) << "setsockopt faild! rc = " << rc << "; errno = " << errno;
_recv_time_out = time_ms;
}
protected:
int _fd = -1;
int _recv_time_out = -1;
int _send_time_out = -1;
};
class UdpResponser : public UdpInterface {
public:
UdpResponser() {
_recv_buffer.resize(200 * 1024);
}
void set_reponse_func(const std::function<std::string(const std::string&)>& func) {
_rep_func = func;
}
void register_responser(int port) {
_port = port;
_address.sin_family = AF_INET;
_address.sin_port = htons(_port);
inet_aton("0.0.0.0", &_address.sin_addr);
_address_len = sizeof(struct sockaddr_in);
if (bind(this->_fd, (struct sockaddr*)&_address, _address_len) == -1){
FATAL() << "[listen_on_port] with [port=" << port << "] Cannot bind socket";
}
}
void receive_and_respose() {
struct sockaddr_in address;
int size = recvfrom(this->_fd, (char*)_recv_buffer.c_str(), _recv_buffer.size(),
0, (struct sockaddr*)&address, &_address_len);
if (size > 0 && _rep_func) {
auto rep_str = _rep_func(_recv_buffer.substr(0, size));
sendto(this->_fd, rep_str.data(), rep_str.length(), 0, (struct sockaddr*) &address, _address_len);
}
}
private:
std::function<std::string(const std::string&)> _rep_func;
std::string _recv_buffer;
int _port;
struct sockaddr_in _address;
socklen_t _address_len;
};
class UdpRequester : public UdpInterface {
public:
UdpRequester() {
_recv_buffer.resize(200 * 1024);
}
void register_requester(const std::string& ip, int port) {
_port = port;
_ip = ip;
}
bool request_and_receive(const std::string& req_data, std::string& rep_data) {
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(_port);
inet_aton(_ip.c_str(), &address.sin_addr);
socklen_t address_len = sizeof (address);
int size = sendto(this->_fd, req_data.data(), req_data.length(), 0, (struct sockaddr*) &address, address_len);
if (size > 0) {
size = recvfrom(this->_fd, (char*)_recv_buffer.c_str(), _recv_buffer.size(),
0, (struct sockaddr*)&address, &address_len);
if (size > 0) {
rep_data = _recv_buffer.substr(0, size);
}
}
return size > 0;
}
private:
std::string _recv_buffer;
std::string _ip;
int _port;
};
## log模块说明
log模块中定义了 DEBUG/INFO/WARNING/ERROR/FATAL/DIRECT 几种输出方式,同时还会publish每个消息,
log_debug/log_info/log_warning/log_error/log_fatal/log_direct,可以通过subscribe获得这些消息。
日志的输出格式如:[INFO tid:1988 05/15 17:51 file.cpp:348] Hello World!
如果定义USE_GLOG,则使用glog的实现方式,否则使用iv_log中的实现。
### 使用方式
0. 引入logging.hpp, 对应库目录
1. SET_LOG_LEVEL(level):设置日志的输出等级。Debug < INFO < WARNING < ERROR < FATAL < DIRECT
2. 使用你希望的输出方式,输出log。如INFO,ERROR_IF, DIRECT_NOT_IF
### 几种方式介绍
- DEBUG(): 用于非常细致的日志输出,可以控制调试的日志输出
- INFO(): 用于普通的日志输出, 可以是稍微重要一些的状态更新
- WARNING(): 用于警告级别的日志输出
- ERROR(): 用于一般错误的输出,即不影响程序运行的错误
- FATAL(): 用于严重错误的输出,即影响严重错误的错误
- DIRECT(): 最高级别的输出方式
- DATAINFO(name, value): 用于输出一个变量的值。
- **_IF(condition): 如果 condition==true,则输出内容
- **_IF_NOT(condition): 如果 condition==false,则输出内容
### 注意事项
1. 通常与utils模块中的TIME_LIMIT_EXEC(time_ms)连用。可以用于控制日志的输出频率。如:
```
// 每秒输出一条 Hello World!
TIME_LIMIT_EXEC(1000) {
INFO() << "Hello World!";
}
```
2. DEBUG/INFO/DIRECT 都是标准输出,ERROR/FATAL 是标准错误输出。
\ No newline at end of file
#pragma once
#include "log_interface.h"
#include <message/messager.hpp>
namespace iv_log {
class Debug : public LogInterface{
public:
Debug(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Messager::publish("log_debug", ss.str());
std::cout << ss.str();
}, file, line, DEBUG){
}
};
class Info : public LogInterface{
public:
Info(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Messager::publish("log_info", ss.str());
std::cout << ss.str();
}, file, line, INFO){
}
};
class Warning : public LogInterface{
public:
Warning(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Messager::publish("log_warning", ss.str());
std::cerr << ss.str();
}, file, line, WARNING){
}
};
class Error : public LogInterface{
public:
Error(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Messager::publish("log_error", ss.str());
std::cerr << ss.str();
}, file, line, ERROR){
}
};
class Fatal : public LogInterface{
public:
Fatal(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Messager::publish("log_fatal", ss.str());
std::cerr << ss.str();
throw std::bad_exception();
}, file, line, FATAL){
}
};
class Direct : public LogInterface{
public:
Direct(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Messager::publish("log_direct", ss.str());
std::cerr << ss.str();
}, file, line, DIRECT){
}
};
}
#pragma once
#include <sstream>
#include <iostream>
#include <functional>
#include <utils/app_util.hpp>
#include <pthread.h>
enum LogLevel{
DEBUG = 0,
INFO = 1,
WARNING = 2,
ERROR = 3,
FATAL = 4,
DIRECT = 9999
};
class LogInterface{
public:
LogInterface(const std::function<void(const std::stringstream&)> logging_func,
const std::string& file, int line, LogLevel level){
_logging_func = logging_func;
_level = level;
_ss << "[" << get_log_level_string() << " "
<< "tid:" << pthread_self() << " "
<< AppUtil::now_date() << " " << AppUtil::now_time()
<< " " << file.substr(file.find_last_of('/') + 1)
<< ":" << line << "] ";
}
template<class T>
LogInterface& operator << (const T& t){
_ss << t;
return *this;
}
LogInterface& operator<<(std::ostream& (*fun)(std::ostream&)) {
fun(_ss);
return *this;
}
~LogInterface(){
if (get_global_log_level() <= _level){
_ss << std::endl;
_logging_func(_ss);
}
}
static void set_log_level(LogLevel log_level){
get_global_log_level() = log_level;
}
static std::string log_level_to_string(LogLevel log_level){
switch (log_level) {
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
case DIRECT:
return "DIRECT";
default: return "UNKNOWN";
}
}
private:
static LogLevel& get_global_log_level(){
static LogLevel log_level = INFO;
return log_level;
}
std::string get_log_level_string(){
return log_level_to_string(_level);
}
private:
std::stringstream _ss;
std::string _file;
int _line;
LogLevel _level = INFO;
std::function<void(const std::stringstream&)> _logging_func;
LogInterface& operator = (const LogInterface&);
};
#pragma once
//ceres依赖glog,所以不能去除glog
#ifdef USE_GLOG
#include <glog/logging.h>
#define DEBUG() LOG(DEBUG)
#define INFO() LOG(INFO)
#define WARNING() LOG(WARNING)
#define ERROR() LOG(ERROR)
#define FATAL() LOG(FATAL)
#define DIRECT() VLOG(0)
#define DATAINFO(name, value) INFO() << #name": " << value
#define SET_LOG_LEVEL(level) FLAGS_minloglevel=google::GLOG_##level
#else
#include "iv_log.h"
#define GLOG_NO_ABBREVIATED_SEVERITIES
#define DEBUG() iv_log::Debug(__FILE__, __LINE__)
#define INFO() iv_log::Info(__FILE__, __LINE__)
#define WARNING() iv_log::Warning(__FILE__, __LINE__)
#define ERROR() iv_log::Error(__FILE__, __LINE__)
#define FATAL() iv_log::Fatal(__FILE__, __LINE__)
#define DIRECT() iv_log::Direct(__FILE__, __LINE__)
#define DATAINFO(name, value) INFO() << #name": " << value
#define DATAINFO1(v) DATAINFO(v, v)
#define SET_LOG_LEVEL(level) LogInterface::set_log_level(level)
#endif
#define DEBUG_IF(condition) if(condition) DEBUG() << #condition" is true: "
#define INFO_IF(condition) if(condition) INFO() << #condition" is true: "
#define WARNING_IF(condition) if(condition) WARNING() << #condition" is true: "
#define ERROR_IF(condition) if(condition) ERROR() << #condition" is true: "
#define FATAL_IF(condition) if(condition) FATAL() << #condition" is true: "
#define DIRECT_IF(condition) if(condition) DIRECT() << #condition" is true: "
#define DEBUG_IF_NOT(condition) if(!condition) DEBUG() << #condition" is false: "
#define INFO_IF_NOT(condition) if(!condition) INFO() << #condition" is false: "
#define WARNING_IF_NOT(condition) if(!condition) WARNING() << #condition" is false: "
#define ERROR_IF_NOT(condition) if(!condition) ERROR() << #condition" is false: "
#define FATAL_IF_NOT(condition) if(!condition) FATAL() << #condition" is false: "
#define DIRECT_IF_NOT(condition) if(!condition) DIRECT() << #condition" is false: "
## messager 模块说明
此模块中定义了一种消息交互方式,publish负责推送,subscribe负责订阅。其原理是通过subscribe在静态存储区定义了函数,通过publish指定其函数名称,即可调用此静态函数。
此模块中还定义了一种服务的实现方式。add_server_func用于注册服务,get_server_func取得服务,另外还有has_server和remove_server_func可以用于判断服务是否存在和删除服务。
> 消息交互和服务两种方式看起来确有相似之处,但是其含义有明显不同。消息交互目的是推送参数,然后由订阅的函数进行相应的处理,不同的地方,也可以对相同的消息做出不同的处理。而服务由一个模块提供,其实现也只能有一个,在其他模块中也只能是补充参数进行调用。
### 使用方式
1. 消息交互
在一处或多处使用subscribe订阅消息,并设置对应的处理方式。在另一处使用publish推送消息,此时会自动调用多处subscribe的处理方式。
2. 服务
在一处使用add_server_func注册服务,在另一处使用get_server_func,获取服务并给予适当的参数,获取函数的反馈或为服务中更新一些东西。
> 注:无论哪一种,单独调用一方面都是安全的。只使用subscribe没有publish,则不会触发调用。只有publish则不会做任何处理,也不会有问题。服务也是类似。
### 服务机制使用示例
设置场景: 模块A中提供生产<font color=red>香飘飘奶茶</font>(赞助商)的服务, 模块B中,需要使用生产奶茶的服务.
```
// 奶茶的定义
class Meco {
public:
// 提供大中小杯子, 和甜度要求. 生成相应的奶茶
Meco(int _size, float _sugar) : size(_size), sugar(_sugar) {}
private:
int size;
float sugar;
};
```
```
// 模块A
Messager::add_server_func<Meco(int, float)>("get_a_meco", [](int size, float sugar) -> Meco {
return Meco(size, sugar);
});
```
```
// 模块B,
if (Messager::has_server("get_a_meco")) {
auto get_a_meco = get_server_func("get_a_meco");
// 这个if是必须要加的, 上面的has_server是可以不加的. 在多线程中, 存在在其他线程调用remove_server_func函数的可能, 导致这里获得空函数, 直接调用出错.
// get_server_func调用是安全的, 如果服务不存在, 只是返回空指针而已, 所以has_server是可以省略的
if (get_a_meco) {
auto my_meco = get_a_meco(2, 0.8); // 中杯, 8分甜
// .....
}
}
```
### 消息机制使用示例
设置场景: 在马戏团中的一只海豚, 听驯兽师的指令, 做出规范动作. 模块A模拟驯兽师发出指令, 模块B模拟海豚响应指令.
```
// 模块A
// 设定一次 "play_ball" 指令持续ns, 听到"stop"指令或超过ns, 则停止.
Messager::publish("play_ball", 10);
sleep(7);
Messager::publish("stop");l
```
### message 模块接口详细说明
1. Messager构造函数, 已设置delete. Message为静态类结构, 不支持构造对象.
2.
\ No newline at end of file
#pragma once
#include <map>
#include <functional>
#include <string>
#include <vector>
class Messager {
public:
Messager() = delete;
static void subcribe(const std::string &key, std::function<void()> func) {
auto &messager_map = get_messager_map();
auto &funcs = messager_map[key];
funcs.push_back(func);
}
template<typename T>
static void subcribe(const std::string &key, std::function<void(const T &)> func) {
auto &messager_map = get_messager_map<T>();
auto &funcs = messager_map[key];
funcs.push_back(func);
}
template<typename T0, typename T1>
static void subcribe(const std::string &key, std::function<void(const T0 &, const T1 &)> func) {
auto &messager_map = get_messager_map<T0, T1>();
auto &funcs = messager_map[key];
funcs.push_back(func);
}
template<typename T0, typename T1, typename T2>
static void subcribe(const std::string &key, std::function<void(const T0 &, const T1 &, const T2 &)> func) {
auto &messager_map = get_messager_map<T0, T1, T2>();
auto &funcs = messager_map[key];
funcs.push_back(func);
}
template<typename T0, typename T1, typename T2, typename T3>
static void
subcribe(const std::string &key, std::function<void(const T0 &, const T1 &, const T2 &, const T3 &)> func) {
auto &messager_map = get_messager_map<T0, T1, T2, T3>();
auto &funcs = messager_map[key];
funcs.push_back(func);
}
template<typename T0, typename T1, typename T2, typename T3, typename T4>
static void subcribe(const std::string &key,
std::function<void(const T0 &, const T1 &, const T2 &, const T3 &, const T4 &)> func) {
auto &messager_map = get_messager_map<T0, T1, T2, T3, T4>();
auto &funcs = messager_map[key];
funcs.push_back(func);
}
static void publish(const std::string &key) {
auto &messager_map = get_messager_map();
auto &funcs = messager_map[key];
for (const auto &func : funcs) {
func();
}
}
template<typename T>
static void publish(const std::string &key, const T &value) {
auto &messager_map = get_messager_map<T>();
auto &funcs = messager_map[key];
for (const auto &func : funcs) {
func(value);
}
}
template<typename T0, typename T1>
static void publish(const std::string &key, const T0 &value0, const T1 &value1) {
auto &messager_map = get_messager_map<T0, T1>();
auto &funcs = messager_map[key];
for (const auto &func : funcs) {
func(value0, value1);
}
}
template<typename T0, typename T1, typename T2>
static void publish(const std::string &key, const T0 &value0, const T1 &value1, const T2 &value2) {
auto &messager_map = get_messager_map<T0, T1, T2>();
auto &funcs = messager_map[key];
for (const auto &func : funcs) {
func(value0, value1, value2);
}
}
template<typename T0, typename T1, typename T2, typename T3>
static void
publish(const std::string &key, const T0 &value0, const T1 &value1, const T2 &value2, const T3 &value3) {
auto &messager_map = get_messager_map<T0, T1, T2, T3>();
auto &funcs = messager_map[key];
for (const auto &func : funcs) {
func(value0, value1, value2, value3);
}
}
template<typename T0, typename T1, typename T2, typename T3, typename T4>
static void publish(const std::string &key, const T0 &value0, const T1 &value1, const T2 &value2, const T3 &value3,
const T4 &value4) {
auto &messager_map = get_messager_map<T0, T1, T2, T3, T4>();
auto &funcs = messager_map[key];
for (const auto &func : funcs) {
func(value0, value1, value2, value3, value4);
}
}
template<typename T>
static void add_server_func(const std::string &key, std::function<T> func) {
auto &server_func = get_server_func<T>(key);
if (server_func){
publish("log_fatal", "server_func is already exists, key: " + key);
throw std::bad_exception();
}
server_func = func;
}
template<typename T>
static bool has_server(const std::string &key) {
auto &server_func = get_server_func<T>(key);
if (server_func){
return true;
} else {
return false;
}
}
template<typename T>
static void remove_server_func(const std::string &key) {
auto &server_func = get_server_func<T>(key);
server_func = std::function<T>();
}
template<typename T>
static std::function<T> &get_server_func(const std::string &key) {
auto & server_func_map = get_server_map<T>();
return server_func_map[key];
}
public:
template<typename T>
static void register_server_map() {
get_server_map<T>();
}
static void register_data_map() {
get_messager_map();
}
template<typename T>
static void register_data_map() {
get_messager_map<T>();
}
template<typename T0, typename T1>
static void register_data_map() {
get_messager_map<T0, T1>();
}
template<typename T0, typename T1, typename T2>
static void register_data_map() {
get_messager_map<T0, T1, T2>();
}
template<typename T0, typename T1, typename T2, typename T3>
static void register_data_map() {
get_messager_map<T0, T1, T2, T3>();
}
template<typename T0, typename T1, typename T2, typename T3, typename T4>
static void register_data_map() {
get_messager_map<T0, T1, T2, T3, T4>();
}
template<typename T>
static std::vector<std::string> get_server_list() {
std::vector<std::string> keys;
auto& server_map = get_server_map<T>();
for (auto& server : server_map){
if (server.second){
keys.push_back(server.first);
}
}
return keys;
}
private:
template<typename T>
static std::map<std::string, std::function<T>> &get_server_map() {
static std::map<std::string, std::function<T>> server_func_map;
return server_func_map;
}
static std::map<std::string, std::vector<std::function<void()>>> &get_messager_map() {
static std::map<std::string, std::vector<std::function<void()>>> messager_map;
return messager_map;
}
template<typename T>
static std::map<std::string, std::vector<std::function<void(const T &)>>> &get_messager_map() {
static std::map<std::string, std::vector<std::function<void(const T &)>>> messager_map;
return messager_map;
}
template<typename T0, typename T1>
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &)>>> &get_messager_map() {
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &)>>> messager_map;
return messager_map;
}
template<typename T0, typename T1, typename T2>
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &, const T2 &)>>> &
get_messager_map() {
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &, const T2 &)>>> messager_map;
return messager_map;
}
template<typename T0, typename T1, typename T2, typename T3>
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &, const T2 &, const T3 &)>>> &
get_messager_map() {
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &, const T2 &,
const T3 &)>>> messager_map;
return messager_map;
}
template<typename T0, typename T1, typename T2, typename T3, typename T4>
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &, const T2 &, const T3 &,
const T4 &)>>> &
get_messager_map() {
static std::map<std::string, std::vector<std::function<void(const T0 &, const T1 &, const T2 &, const T3 &,
const T4 &)>>> messager_map;
return messager_map;
}
};
## pipe 模块说明
此部分中的模块是框架的基础模块,包括无间隔处理模块、需要手动submit触发的模块、定时处理的模块、处理单独任务的模块以及负责调度模块的控制器。
### 类说明
-- PipeController: 模块控制器,可同时控制多个模块启动,停止
-- PipeElement: 并发模块, 可配置无处理间隔的循环处理,也可配置需要手动submit触发的处理方式
-- TaskPoolElement: 任务池模块
-- TimerElement: 定时处理模块
-- TimerTrigger: 定时触发模块
### 使用方式
1. 基于PipeElement或TimerElement,创建可以处理你想完成的任务的模块
2. 将这好多模块加入PipeController中,可以进行统一调度
### 注意事项
1. 在使用基于TimerElement时,需要同时创建TimerTrigger模块,用于触发定时模块的检测处理。
\ No newline at end of file
#pragma once
#include <vector>
#include <pipe/pipe_element.hpp>
#include <memory>
// 模块控制器,可同时控制多个模块启动,停止
class PipeController {
public:
// 注册模块,到pipe_elements
void register_element(std::shared_ptr<basic::PipeElement> element) {
pipe_elements.push_back(element);
}
// 加入模块,到pipe_elements,与register_element是两种方式
template<class T>
void add_element() {
auto element = std::make_shared<T>();
pipe_elements.push_back(element);
}
template<class T>
void add_element(T *ele) {
std::shared_ptr<T> sele(ele);
pipe_elements.push_back(sele);
}
void start() {
for (auto iter : pipe_elements) {
iter->start();
}
}
void stop() {
for (auto iter : pipe_elements) {
iter->stop();
}
}
void wait() {
for (auto iter : pipe_elements) {
iter->wait();
}
}
// 创建全局的模块, 加入global()中
template<class T>
static std::shared_ptr<T> create_global_element() {
auto element = std::make_shared<T>();
global().register_element(element);
return element;
}
static PipeController& global() {
static PipeController controller;
return controller;
}
private:
// 保存多个element, 进行同步管理
std::vector<std::shared_ptr<basic::PipeElement>> pipe_elements;
};
#pragma once
#include <map>
#include <mutex>
#include<vector>
#include <thread>
#include <atomic>
#include <functional>
#include <log/logging.h>
#include <condition_variable>
#include <sched.h>
#include <utils/app_preference.hpp>
#include <utils/app_util.hpp>
namespace basic {
// 并发模块
class PipeElement {
public:
// 构造PipeElement,循环处理模块
// @wait_cond 是否等待submit处理
// @name 模块名称
explicit PipeElement(bool wait_cond = true, std::string name = "default") :
running(false), busy(false),
wait_cond(wait_cond),
submitted(false) , thr_name(name){
DIRECT() << "Task " << name << " loaded!";
}
virtual ~PipeElement() = default;
// 模块启动函数
virtual void start() {
if (running) {
ERROR() << "start pipe element failed, pipe element is already running!";
return;
}
thd = std::thread(
[&] {
thread_initial();
running = true;
if (appPref.has_string_key("affinity.CpuIndex")){
auto affinity_str = appPref.get_string_data("affinity.CpuIndex");
AppUtil::set_thread_affinity(std::stoi(affinity_str));
}
auto affinity_key = "affinity." + thr_name;
if (appPref.has_string_key(affinity_key)){
auto affinity_str = appPref.get_string_data(affinity_key);
AppUtil::set_thread_affinity(std::stoi(affinity_str));
}
while (running) {
if (wait_cond && !submitted) {
std::unique_lock<std::mutex> lck(cv_mutex);
func_cv.wait(lck);
if (!running) {
break;
}
}
busy = true;
submitted = false;
this->thread_func();
busy = false;
}
thread_closing();
});
pthread_setname_np(thd.native_handle(), thr_name.c_str());
auto prio_key = "priority." + thr_name;
if (appPref.has_string_key(prio_key)){
auto prio_str = appPref.get_string_data(prio_key);
set_pthd_schd_param(SCHED_RR, std::stoi(prio_str));
INFO() << "Task " << thr_name << " priority: " << prio_str;
} else {
WARNING() << "Task " << thr_name << " using default priority";
}
}
// 模块停止函数
virtual void stop() {
if (!running) {
ERROR() << "stop failed, pipe element is not running!";
return;
}
running = false;
submit();
}
// 等待模块退出函数
void wait() {
thd.join();
}
// 提交函数,在wait_cond=true时,主动调用处理函数
void submit() {
submitted = true;
func_cv.notify_one();
}
// 获取busy状态
bool is_busy() {
return busy;
}
// 直接设置 wait_cond 状态
void set_wait_cond(bool value) {
wait_cond = value;
}
// 返回模块的运行状态
bool is_running() const {
return running;
}
// 设置线程的优先级和优先策略
void set_pthd_schd_param(int in_policy, int in_priority ) {
sched_param param;
param.sched_priority = in_priority;
pthread_setschedparam(thd.native_handle(), in_policy, &param);
this->policy = in_policy;
this->priority = in_priority;
}
protected:
std::thread thd;
std::condition_variable func_cv;
std::mutex cv_mutex;
std::atomic_bool running; // 运行状态
std::atomic_bool busy;
std::atomic_bool wait_cond;
std::atomic_bool submitted;
std::string thr_name;
int priority; // 优先级
int policy; // 优先策略
private:
virtual void thread_func() {};
virtual void thread_initial() {}
virtual void thread_closing() {}
};
}
#pragma once
#include <map>
#include <mutex>
#include<vector>
#include <thread>
#include <atomic>
#include <functional>
#include <log/logging.h>
#include <condition_variable>
#include <sched.h>
#include "pipe_element.hpp"
#include <queue>
namespace basic {
// 任务池模块,将待处理的任务一一加入任务池,会自动进行处理
class TaskPoolElement : public PipeElement {
public:
explicit TaskPoolElement(const std::string& name) : PipeElement(false, name){
start();
}
~TaskPoolElement(){
stop();
wait();
}
// 将func加入任务池
void asysc(const std::function<void()>& func){
std::unique_lock<std::mutex> lk(_mutex);
_funcs.push(func);
lk.unlock();
cv.notify_one();
}
// 当前任务数量
size_t task_size(){
return _funcs.size();
}
private:
virtual void thread_func() override {
std::unique_lock<std::mutex> lk(_mutex);
cv.wait(lk, [this](){
return !_funcs.empty();
});
auto func = _funcs.front();
_funcs.pop();
lk.unlock();
func();
};
std::mutex _mutex;
std::condition_variable cv;
std::queue<std::function<void()>> _funcs;
};
}
#pragma once
#include <pipe/pipe_element.hpp>
#include <utils/app_util.hpp>
// 定时处理模块
class TimerElement : public basic::PipeElement {
public:
// 构造TimerElement
// @interval 定时模块处理间隔
// @name 模块名称
TimerElement(int interval_ms, std::string name = "default") :
PipeElement(true, name), _interval(interval_ms * 1000) {
}
virtual ~TimerElement() = default;
virtual void start() {
if (running) {
ERROR() << "start pipe element failed, pipe element is already running!";
return;
}
Messager::subcribe (
"timer_trigger",
[this]() {
uint64_t time = AppUtil::get_current_us();
bool do_exec = false;
if (time >= _last_time + _interval) {
_last_time = time;
do_exec = true;
}
if (do_exec) {
submit();
}
});
PipeElement::start();
}
// 获取处理间隔
int get_interval() const {
return _interval / 1000;
}
// 设置处理间隔
void set_interval(int value) {
_interval = value * 1000;
}
protected:
int _interval;
uint64_t _last_time = 0;
private:
virtual void thread_func() {
timer_func();
}
virtual void timer_func() {}
};
\ No newline at end of file
#pragma once
#include <utils/app_util.hpp>
#include <pipe/pipe_element.hpp>
#include <message/messager.hpp>
// TimerElement 模块的触发判断函数,设置其判断精度, 默认为2ms
class TimerTrigger : public basic::PipeElement {
public:
TimerTrigger() : PipeElement(false, "TimerTrigger") {
}
private:
virtual void thread_func() {
std::this_thread::sleep_for(std::chrono::microseconds(2000));
Messager::publish("timer_trigger");
}
};
\ No newline at end of file
# ULogger
## 简介
Ulogger是一种高效的时间序列数据记录格式
```
----------------------
| Header |
----------------------
| Definitions |
----------------------
| Data |
----------------------
```
数据以*.ulg格式保存
## 使用步骤
## 1.**包含头文件**
```
//home/nvidia/vehicle/l3-localization/src/localization/src/ulogger
#include "../../ulg/ulg.h"
//请按照相对路径修改
```
## 2.**注册数据类型**(请在各个模块的**类构造函数**或类构造函数里调用的**初始化类成员函数**中注册)
```
Ulogger::instance()->register_time_series_data("double", "roll");
Ulogger::instance()->register_time_series_data("double", "pitch");
Ulogger::instance()->register_time_series_data("double", "yaw");
Ulogger::instance()->register_time_series_data("double", "velocity_e");
Ulogger::instance()->register_time_series_data("double", "velocity_n");
Ulogger::instance()->register_time_series_data("double", "velocity_u");
Ulogger::instance()->register_time_series_data("double", "position_e");
Ulogger::instance()->register_time_series_data("double", "position_n");
Ulogger::instance()->register_time_series_data("double", "position_u");
Ulogger::instance()->register_time_series_data("double", "gyro_bias_x");
Ulogger::instance()->register_time_series_data("double", "gyro_bias_y");
Ulogger::instance()->register_time_series_data("double", "gyro_bias_z");
Ulogger::instance()->register_time_series_data("double", "accel_bias_x");
Ulogger::instance()->register_time_series_data("double", "accel_bias_y");
Ulogger::instance()->register_time_series_data("double", "accel_bias_z");
Ulogger::instance()->register_time_series_data("double", "odom_scale");
```
## 3.**周期写入数据**
### 写入时间序列数据:(数据需要带时间戳us单位)
```
Ulogger::instance()->write_data("roll", _state_struct.time_us, _state_struct.quat.toRotationMatrix().to_euler_rpy().x());
Ulogger::instance()->write_data("pitch", _state_struct.time_us, _state_struct.quat.toRotationMatrix().to_euler_rpy().y());
Ulogger::instance()->write_data("yaw", _state_struct.time_us, _state_struct.quat.toRotationMatrix().to_euler_rpy().z());
Ulogger::instance()->write_data("velocity_e", _state_struct.time_us, _state_struct.vel.x());
Ulogger::instance()->write_data("velocity_n", _state_struct.time_us, _state_struct.vel.y());
Ulogger::instance()->write_data("velocity_u", _state_struct.time_us, _state_struct.vel.z());
Ulogger::instance()->write_data("position_e", _state_struct.time_us, _state_struct.pos.x());
Ulogger::instance()->write_data("position_n", _state_struct.time_us, _state_struct.pos.y());
Ulogger::instance()->write_data("position_u", _state_struct.time_us, _state_struct.pos.z());
Ulogger::instance()->write_data("gyro_bias_x", _state_struct.time_us, _state_struct.gyro_bias.x());
Ulogger::instance()->write_data("gyro_bias_y", _state_struct.time_us, _state_struct.gyro_bias.y());
Ulogger::instance()->write_data("gyro_bias_z", _state_struct.time_us, _state_struct.gyro_bias.z());
Ulogger::instance()->write_data("accel_bias_x", _state_struct.time_us, _state_struct.accel_bias.x());
Ulogger::instance()->write_data("accel_bias_y", _state_struct.time_us, _state_struct.accel_bias.y());
Ulogger::instance()->write_data("accel_bias_z", _state_struct.time_us, _state_struct.accel_bias.z());
Ulogger::instance()->write_data("odom_scale", _state_struct.time_us, _state_struct.odom_scale);
```
#### 特别的:
```
写入整型或bool型数据请调用以下接口:
Ulogger::instance()->write_data_if_value_changed("velocity_e", _state_struct.time_us, _state_struct.vel.x());
```
> 数据变化时写入,会节省空间,但一定程度上会增加调用开销
### 写入时间序列文本:(支持格式化写入,也可仅写入普通文本打印log,数据需要带时间戳us单位)
```
Ulogger::instance()->log_printf(_state_struct.time_ns, "vehicle_speed=%lf", _odom_data_new.speed);
```
## 4.**使用LocalizationAnalysis**工具查看*.ulg数据
>使用分析工具打开```2019-10-23-13:46:18.ulg```
//
// Created by nvidia on 10/17/19.
//
#ifndef PROJECT_MESSAGES_H
#define PROJECT_MESSAGES_H
#include <stdint.h>
enum class ULogMessageType : uint8_t {
FORMAT = 'F',
DATA = 'D',
INFO = 'I',
INFO_MULTIPLE = 'M',
PARAMETER = 'P',
ADD_LOGGED_MSG = 'A',
REMOVE_LOGGED_MSG = 'R',
SYNC = 'S',
DROPOUT = 'O',
LOGGING = 'L',
LOGGING_TAGGED = 'C',
FLAG_BITS = 'B',
};
#pragma pack(push, 1)
/** first bytes of the file */
struct ulog_file_header_s {
uint8_t magic[8];
uint64_t timestamp;
};
#define ULOG_MSG_HEADER_LEN 3 //accounts for msg_size and msg_type
struct ulog_message_header_s {
uint16_t msg_size;
uint8_t msg_type;
};
struct ulog_message_format_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::FORMAT);
char format[1500];
};
struct ulog_message_add_logged_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::ADD_LOGGED_MSG);
uint8_t multi_id;
uint16_t msg_id;
char message_name[255];
};
struct ulog_message_remove_logged_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::REMOVE_LOGGED_MSG);
uint16_t msg_id;
};
struct ulog_message_sync_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::SYNC);
uint8_t sync_magic[8];
};
struct ulog_message_dropout_s {
uint16_t msg_size = sizeof(uint16_t); //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::DROPOUT);
uint16_t duration; //in ms
};
struct ulog_message_data_header_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::DATA);
uint16_t msg_id;
};
struct ulog_message_info_header_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::INFO);
uint8_t key_len;
char key[255];
};
struct ulog_message_info_multiple_header_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::INFO_MULTIPLE);
uint8_t is_continued; ///< can be used for arrays: set to 1, if this message is part of the previous with the same key
uint8_t key_len;
char key[255];
};
struct ulog_message_logging_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::LOGGING);
uint8_t log_level; //same levels as in the linux kernel
uint64_t timestamp;
char message[128]; //defines the maximum length of a logged message string
};
struct ulog_message_logging_tagged_s {
uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::LOGGING_TAGGED);
uint8_t log_level; //same levels as in the linux kernel
uint16_t tag;
uint64_t timestamp;
char message[128]; //defines the maximum length of a logged message string
};
struct ulog_message_parameter_header_s {
uint16_t msg_size;
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::PARAMETER);
uint8_t key_len;
char key[255];
};
#define ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK (1<<0)
struct ulog_message_flag_bits_s {
uint16_t msg_size;
uint8_t msg_type = static_cast<uint8_t>(ULogMessageType::FLAG_BITS);
uint8_t compat_flags[8];
uint8_t incompat_flags[8]; ///< @see ULOG_INCOMPAT_FLAG_*
uint64_t appended_offsets[3]; ///< file offset(s) for appended data if ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK is set
};
#pragma pack(pop)
#endif //PROJECT_MESSAGES_H
//
// Created by nvidia on 10/17/19.
//
#ifndef PROJECT_ULOGGER_H
#define PROJECT_ULOGGER_H
#include <stdint.h>
#include "stdlib.h"
#include "messages.h"
#include <stdio.h>
#include <vector>
#include <map>
#include <mutex>
#include <thread>
#include <string>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <cstring>
#include <tuple>
#include <pipe/pipe_element.hpp>
struct UlgConfig {
std::string folder = ""; //父目录
std::string header = ""; //day header
bool is_log_on = false;
bool create_day_folder = false;
bool is_file_ulimited = false;
int64_t FILE_SIZE_MAX = 4096 * 8 * 4e3;
int32_t ULG_NUM = 10;
int32_t BATCH_NUM = 10;
int32_t DAY_NUM = 10;
};
class MmapAsyncWrite : public basic::PipeElement{
public:
typedef std::tuple<int, uint8_t*, int32_t, int64_t> ParamTuple;
MmapAsyncWrite();
bool initialize(const UlgConfig& i_conf);
~MmapAsyncWrite();
virtual void thread_func() override;
std::string get_ulg_file_name(const std::string& dir_name);
bool MapRegion(int fd, uint64_t file_offset, uint8_t **base);
void UnMapRegion(uint8_t* base);
void write_data(uint8_t* data, int32_t len);
bool write_ulg_headr(uint8_t* data, int32_t len);
void sync_batch();
void sync_write();
bool is_newday_folder();
std::string create_day_folder();
std::string get_newday_folder();
void set_config(const UlgConfig& conf);
public:
static const int32_t MAP_SIZE = 4096 * 8;
static const int32_t WRITE_SIZE = MAP_SIZE / 2;
private:
UlgConfig _ulg_conf;
uint8_t * _base;
uint8_t * _base_buffer; //缓冲区,待写入的数据
uint8_t * _cursor;
uint8_t _ulg_header[WRITE_SIZE]; //记录ulg header
std::string _file_name;
std::string _batch_name;
std::string _dir_name;
std::string _up_header;
std::string _batch_folder;
ParamTuple _param_pwrite;
int32_t _mmap_offset;
int64_t _file_offset;
int32_t _header_offset;
int _data_fd;
int _batch_fd;
bool _is_init;
};
enum LogType {
NORMAL = 0,
LOC = 1,
SYS = 2,
SENSOR = 3
};
class Ulogger {
public:
Ulogger(LogType log_type = NORMAL);
~Ulogger();
static Ulogger *instance(LogType log_type = NORMAL) {
static Ulogger s_instance_normal(NORMAL); //
static Ulogger s_instance_loc(LOC); //
static Ulogger s_instance_sys(SYS); //
static Ulogger s_instance_sensor(SENSOR); //
Ulogger * ulg_ptr = NULL;
switch (log_type) {
case NORMAL:
ulg_ptr = &s_instance_normal;
break;
case LOC:
ulg_ptr = &s_instance_loc;
break;
case SYS:
ulg_ptr = &s_instance_sys;
break;
case SENSOR:
ulg_ptr = &s_instance_sensor;
break;
default:
ulg_ptr = &s_instance_normal;
break;
}
return ulg_ptr;
}
private:
MmapAsyncWrite _mmap_writer;
UlgConfig _ulg_conf;
FILE *_ulog_file;
bool _file_initialized = false;
bool _registered_topic_written = false;
uint16_t _topic_name_id = 0;
std::map<const char *, uint16_t> _topic_name_id_pair;
std::mutex _ulogger_mutex;
// store last value of integers
std::map<const char *, std::vector<bool>> _bool_name_value_pair;
std::map<const char *, std::vector<char>> _char_name_value_pair;
std::map<const char *, std::vector<int8_t>> _int8_name_value_pair;
std::map<const char *, std::vector<uint8_t>> _uint8_name_value_pair;
std::map<const char *, std::vector<int16_t>> _int16_name_value_pair;
std::map<const char *, std::vector<uint16_t>> _uint16_name_value_pair;
std::map<const char *, std::vector<int32_t>> _int32_name_value_pair;
std::map<const char *, std::vector<uint32_t>> _uint32_name_value_pair;
std::map<const char *, std::vector<int64_t>> _int64_name_value_pair;
std::map<const char *, std::vector<uint64_t>> _uint64_name_value_pair;
void initialize_uloger(const char *file_path);
/**
* write the file header with file magic and timestamp.
*/
void write_header(void);
void write_time_series_data_format(const char *topic_name);
void write_version(void);
void write_info(const char *name, const char *value);
void write_info_multiple(const char *name, const char *value, bool is_continued);
void write_info(const char *name, int32_t value);
void write_info(const char *name, uint32_t value);
template<typename T>
void write_info_template(const char *name, T value, const char *type_str);
void write_parameters(void);
bool write_message(void *ptr, size_t size, bool record_as_headr = false);
public:
bool register_time_series_data(const char *type_name, const char *topic_name);
bool write_all_registered_time_series_data_format(void);
bool write_data(const char *topic_name, uint64_t time_stamp_us, double variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, uint8_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, int8_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, int16_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, uint16_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, int32_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, uint32_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, int64_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, uint64_t variable_value);
bool write_data(const char *topic_name, uint64_t time_stamp_us, std::vector<std::vector<double>> lanes);
// ONLY FOR INT TYPE, EFFICIENT BUT EXPENSIVE
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, int8_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, uint8_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, int16_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, uint16_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, int32_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, uint32_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, int64_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, uint64_t variable_value);
bool write_data_if_value_changed(const char *topic_name, uint64_t time_stamp_us, bool variable_value);
void log_printf(uint64_t time_stamp_us, const char *format, ...);
void sync_batch();
void sync_write();
void set_log_ulimited();
void reset_log_config();
template<class T>
bool write_data_t(const char *topic_name, const T& i_data) {
if (!_ulg_conf.is_log_on) {
return false;
}
if (_topic_name_id_pair.empty()) {
return false;
}
if (!_registered_topic_written) {
_registered_topic_written = write_all_registered_time_series_data_format();
}
_ulogger_mutex.lock();
auto variable_count = sizeof(T);
auto msg_size = sizeof(ulog_message_data_header_s) + variable_count;
auto it = _topic_name_id_pair.find(topic_name);
if (it != _topic_name_id_pair.end()) {
ulog_message_data_header_s msg = {};
uint8_t buffer[msg_size];
msg.msg_size = (uint16_t) (msg_size - ULOG_MSG_HEADER_LEN);
msg.msg_id = it->second;
memcpy(buffer, (uint8_t*)&msg, sizeof(ulog_message_data_header_s));
memcpy(buffer + sizeof(ulog_message_data_header_s), (uint8_t*)&i_data, variable_count);
write_message(buffer, msg_size);
_ulogger_mutex.unlock();
return true;
} else {
_ulogger_mutex.unlock();
return false;
}
}
};
#endif //PROJECT_ULOGGER_H
## utils模块说明
这个模块中集成多种工具函数,详细见如下。
### 工具函数
-- app_preference: 单例AppPreference,用于保存整个程序使用的配置,按 std::map<group.key, value> 分类保存
-- app_config: APP配置相关的函数
-- app_util: 通用工具函数
-- timer_counter: 计时工具
### APP配置使用方式
1. 使用 load_ini_config 读取所有配置到 appPref 中
```
load_ini_config("/home/.../config/ini/*.ini");
```
2. 使用 get_int32 从 appPref 中获取指定参数
```
int32_t log_level = AppConfig::get_int32("Log.log_level");
```
### TimerCounter 使用方式
1. 定义 TimerCounter
```
{
TimerCounter counter("INFO", true, 1000);
INFO() << "Hello world!";
}
```
\ No newline at end of file
/*
此文件中定义与程序配置相关的内容。
使用方式:
1. load_ini_config 加载满足条件的配置文件。如:load_ini_config("/home/nvidia/.../config/ini/*.ini");
2. 使用get_int32等获取对应字段的值。
注意事项:
1. load_ini_config 加载的路径需要是绝对路径
*/
#pragma once
#include <utils/app_util.hpp>
#include <utils/app_preference.hpp>
#include <assert.h>
#include <glob.h>
class AppConfig {
public:
static bool has_key(const std::string& key) {
return appPref.has_string_key(key);
}
// 根据 组名.字段名 获取其对应的int32数据
// 如: [Log]
// log_level: 1
// @key=Log.log_level
// @rel=1
static int32_t get_int32(const std::string& key) {
std::string value = appPref.get_string_data(key);
return AppUtil::safe_stoi(value);
}
// 根据 组名.字段名 获取其对应的int64数据
// 如: [Log]
// log_level: 1
// @key=Log.log_level
// @rel=1
static int64_t get_int64(const std::string& key) {
std::string value = appPref.get_string_data(key);
return AppUtil::safe_stoll(value);
}
// 根据 组名.字段名 获取其对应的double 数组数据
// 如: [Log]
// log_file: file_2021.log
// @key=Log.log_file
// @rel=file_2021.log
static double get_double(const std::string& key) {
std::string value = appPref.get_string_data(key);
return AppUtil::safe_stod(value);
}
// 根据 组名.字段名 获取其对应的double 数组数据
// 如: [Log]
// log_file: file_2021.log
// @key=Log.log_file
// @rel=file_2021.log
static std::vector<double> get_double_vector(const std::string& key) {
std::string value = appPref.get_string_data(key);
return AppUtil::safe_stodv(value);
}
// 根据 组名.字段名 获取其对应的string数据
// 如: [Num]
// numbers: [1.0,2.0,3.0,4.0,5.0]
// @key=Num.numbers
// @rel=[1.0,2.0,3.0,4.0,5.0]
static std::string get_str(const std::string& key) {
return appPref.get_string_data(key);
}
// 读取满足pattern格式的路径,按ini格式加载其中文件, 将其中的字段值保存到全局变量appPref中
// @pattern 可带有通配符的路径,如 /home/nvidia/.../config/ini/*.ini
//
// ini文件的编写规则另见其他说明
static void load_ini_config(const std::string& config_pattern){
std::vector<std::string> config_files;
if (glob_files(config_pattern, config_files) == true) {
for (auto& config_file : config_files) {
parse_ini(config_file);
}
}
}
private:
// 获取所有满足pattern格式的路径
// @pattern 可带有通配符的路径,如 /home/nvidia/.../config/ini/*.ini
// @files 一一保存所有路径
static bool glob_files(const std::string& pattern, std::vector<std::string>& files) {
glob_t glob_result;
memset(&glob_result, 0, sizeof(glob_result));
int res = glob(pattern.c_str(), GLOB_TILDE, nullptr, &glob_result);
if (res != 0) {
globfree(&glob_result);
return false;
}
for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
files.emplace_back(std::string(glob_result.gl_pathv[i]));
}
globfree(&glob_result);
return true;
}
// 解析ini文件,并将其中的字段值保存到全局变量appPref中
// @file ini文件的路径
//
// ini文件的编写规则另见其他说明
static void parse_ini(const std::string& file) {
std::string text;
AppUtil::string_load(file, text);
std::stringstream ss(text);
std::string curr_section = "";
std::string line;
while (std::getline(ss, line, '\n')) {
line = AppUtil::string_trim(line);
if (line.empty()) {
continue;
}
// comment
if ((line[0] == '#') || (line[0] == ';')) {
continue;
}
// new section
if ((line[0] == '[') && (line[line.length() - 1] == ']')) {
line = AppUtil::string_trim(line.substr(1, line.length() - 2));
if (!line.empty()) {
curr_section = line + ".";
}
continue;
}
auto pos = line.find("=");
// unknown
if (pos == line.npos) {
continue;
}
// name=value
std::string name = AppUtil::string_trim(line.substr(0, pos));
std::string value = AppUtil::string_trim(line.substr(pos + 1));
pos = value.find("#");
value = AppUtil::string_trim(value.substr(0, pos));
if (name.empty() || value.empty()) {
// invalid
continue;
}
// section-name = value
appPref.set_string_data(curr_section + name, value);
}
}
// 从key中解析出组名和字段值
// @key 字符串如 Log.log_level
// @section 组名如 Log
// @name 字段值如 log_level
static void get_section_and_name_from_key(const std::string& key, std::string& section,
std::string& name) {
std::regex reg("(\\w+).(\\w+)");
std::smatch sm;
if (key.empty() || !std::regex_match(key, sm, reg)) {
FATAL() << "get_str format error, example is \"section.name\"; key: " << key;
}
section = sm[1];
name = sm[2];
}
};
/*
整个程序通用的工具函数集合
*/
#pragma once
#include <string>
#include <iostream>
#include <sstream>
#include <memory>
#include <vector>
#include <regex>
#include <chrono>
#include <iomanip>
#include <thread>
#include <fstream>
#include <strstream>
#include <sys/time.h>
#include <assert.h>
#include <cstdarg>
#include <dirent.h>
#include <fstream>
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#ifdef __QNXNTO__
//#include <nbutil.h> // for getprogname()
#include <sys/neutrino.h> // for ThreadCtl()
#include <sys/procmgr.h> // for procmgr_ability()
#else
#include <unistd.h>
#include <sys/syscall.h> // for syscall()
#include <cerrno> // for program_invocation_short_name
#endif
// 获取文件路径的文件夹目录
#define __FOLDER__ std::string(__FILE__).erase(std::string(__FILE__).find_last_of('/'))
// 在一个循环调用的模块中使用FREQUENCE(freq), 可以计算出这部分的执行频率,保存在freq中
// 注: freq不需要提前声明,其在内部声明,在此部分之后,可以直接使用
#define FREQUENCE(v) static int __count = 0;\
static uint64_t __last_time = 0;\
static double v = 0;\
uint64_t __time = AppUtil::get_current_ms();\
if (__time > __last_time + 3000){ \
v = 1000.0 * __count / (__time - __last_time);\
__last_time = __time;\
__count = 0;\
}\
__count++;
// 控制执行频率,间隔至少interval
#define TIME_LIMIT_EXEC(interval) \
static uint64_t __last_time = 0;\
uint64_t __time = AppUtil::get_current_ms();\
bool do_exec = false;\
if (__time > __last_time + interval){ \
__last_time = __time;\
do_exec = true;\
}\
if (do_exec)
#define TIMESTAMP_LIMIT_EXEC(interval, timpstamp) \
static int64_t __last_time = 0;\
int64_t __time = timpstamp;\
bool do_exec = false;\
if (abs(__time - __last_time) > interval){ \
__last_time = __time;\
do_exec = true;\
}\
if (do_exec)
// AppUtil 工具类函数的集合
class AppUtil {
public:
static void sleep_ms(int value) {
std::this_thread::sleep_for(std::chrono::milliseconds(value));
}
static void sleep_us(int value) {
std::this_thread::sleep_for(std::chrono::microseconds(value));
}
// 获取当前的微秒 毫秒 秒级时间戳
static int64_t get_current_us() {
auto now = std::chrono::system_clock::now();
auto duration = std::chrono::
duration_cast<std::chrono::microseconds>(now.time_since_epoch());
return duration.count();
}
static int64_t get_current_ms() {
return get_current_us() / 1000;
}
static int64_t get_current_sec() {
return get_current_us() / 1000000;
}
// 判断经纬度是否合理
static bool is_legal_lonlat(const double &longitude, const double &latitude) {
if (longitude >= 73 && longitude <= 135 && latitude >= 0 && latitude <= 50) {
return true;
}
return false;
}
// 读取文件path,以string输出,string格式限制大小<2G
static std::string get_file_text(const std::string& path){
std::ifstream ifs(path);
std::stringstream ss;
if (!ifs.is_open()) {
throw "config file open failed, path: ";
}
std::string line;
while (getline(ifs, line)) {
ss << line;
}
return ss.str();
}
// 去掉前后的空格和回车
static std::string string_trim(const std::string& str) {
std::string s(str);
s.erase(0, s.find_first_not_of(" \t\n\r\v\f"));
s.erase(s.find_last_not_of(" \t\n\r\v\f") + 1);
return s;
}
// 按delim分割字符串src
static std::vector<std::string> string_split(const std::string& src, const std::string &delim) {
assert(!delim.empty());
std::vector<std::string> list;
if (src.empty()) {
return list;
}
std::string s = src;
std::string token;
size_t pos = s.find(delim);
while (pos < s.length()) {
token = s.substr(0, pos);
if (token.empty() == false) {
list.emplace_back(token);
}
s = s.substr(pos + delim.length());
pos = s.find(delim);
}
if (s.empty() == false) {
list.emplace_back(s);
}
return list;
}
// 将char数组buff转成string
static std::string char_to_string(const char* buff, const size_t buff_size) {
std::string str(buff, buff_size);
return str;
}
// 解释格式化字符串到string
static std::string string_sprintf(const char* format, ...) {
va_list args;
va_start(args, format);
std::string buff = string_vsnprintf(format, args);
va_end(args);
return buff;
}
// 解释格式化字符串到string
static std::string string_vsnprintf(const std::string& format, va_list args) {
// try first
int32_t try_size = 512;
char try_buff[try_size];
memset(try_buff, 0, try_size);
std::string res;
int32_t size_needed = std::vsnprintf(try_buff, try_size, format.c_str(), args);
if (size_needed < try_size) {
res = try_buff;
} else {
// try again
char buff_needed[size_needed+1];
memset(buff_needed, 0, size_needed+1);
int32_t size = std::vsnprintf(buff_needed, size_needed+1, format.c_str(), args);
if (size >= 0) {
res = std::string(buff_needed);
} else {
res = std::string("");
}
}
return res;
}
// 解释格式化字符串后,追加到file中
static void _string_append(const std::string& file, const char* format, va_list ap) {
assert(!file.empty());
FILE* fp = nullptr;
if (file == "stdout") {
fp = stdout;
} else if (file == "stderr") {
fp = stderr;
} else {
fp = fopen(file.c_str(), "a");
}
if (fp != nullptr) {
std::vfprintf(fp, format, ap);
if ((file != "stdout") && (file != "stderr")){
fclose(fp);
fp = nullptr;
}
}
}
static void string_append(const std::string& file, const char* format, ...) {
assert(!file.empty());
va_list ap;
va_start(ap, format);
_string_append(file, format, ap);
va_end(ap);
}
// 读取file到content
static bool string_load(const std::string& file, std::string& content) {
assert(!file.empty());
FILE* fp = fopen(file.c_str(), "rb");
if (fp == nullptr) {
return false;
}
// check size
fseek(fp, 0, SEEK_END);
size_t bufsize = ftell(fp);
rewind(fp);
// malloc buffer
char buf[bufsize + 1];
memset(buf, 0, bufsize + 1);
size_t res = fread(buf, 1, bufsize, fp);
fclose(fp);
if (res <= 0) {
return false;
}
// to string
content = char_to_string(buf, bufsize);
return true;
}
// 格式转换部分
static int32_t bool_to_int(const bool b) {
int32_t i = 0;
if (b == true) {
i = 1;
}
return i;
}
// zero_as_false: true i不为0,则返回true
// zero_as_false: false i恒为0,则返回true
static bool int_to_bool(const int32_t i, bool zero_as_false) {
bool res = false;
if (zero_as_false && (i != 0)) {
res = true;
} else if ((!zero_as_false) && (i == 0)) {
res = true;
}
// MISRA C++ 2008: 6-6-5
return res;
}
static int32_t safe_stoi(const std::string& str) {
int32_t res = 0;
try {
res = std::stoi(str);
} catch (std::out_of_range& e) {
res = 0;
} catch (std::invalid_argument& e) {
res = 0;
}
// MISRA C++ 2008: 6-6-5
return res;
}
static int64_t safe_stoll(const std::string& str) {
int64_t res = 0;
try {
res = std::stoll(str);
} catch (std::out_of_range& e) {
res = 0;
} catch (std::invalid_argument& e) {
res = 0;
}
// MISRA C++ 2008: 6-6-5
return res;
}
static double safe_stod(const std::string& str) {
double res = 0.0;
try {
res = std::stod(str);
} catch (std::out_of_range& e) {
res = 0.0;
} catch (std::invalid_argument& e) {
res = 0.0;
}
// MISRA C++ 2008: 6-6-5
return res;
}
static std::vector<double> safe_stodv(const std::string& s) {
std::vector<double> res;
int left_i = s.find('[');
int right_i = s.find(']');
if (left_i >= 0 && right_i >= 0 && left_i < right_i) {
std::string sub = s.substr(left_i + 1, right_i - left_i - 1);
std::vector<std::string> str_vec = string_split(sub, ",");
for (auto v : str_vec) {
res.push_back(safe_stod(v));
}
}
return res;
}
// 返回当前时间,格式为 tm_hour-tm_min-tm_sec-ms
static std::string now_time() {
auto now = std::chrono::system_clock::now();
auto _time = std::chrono::system_clock::to_time_t(now);
struct tm _tm;
std::string time_str;
if (localtime_r(&_time, &_tm) != nullptr) {
auto duration = std::chrono::
duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
int64_t ms = duration.count() % 1000;
time_str = string_sprintf("%02d-%02d-%02d-%03d", _tm.tm_hour, _tm.tm_min, _tm.tm_sec, ms);
} else {
time_str = "00-00-00";
}
return time_str;
}
// 返回当前日期,格式为 tm_year-tm_mon-tm_mday
static std::string now_date() {
auto now = std::chrono::system_clock::now();
auto _time = std::chrono::system_clock::to_time_t(now);
struct tm _tm;
std::string date_str;
if (localtime_r(&_time, &_tm) != nullptr) {
date_str = string_sprintf("%04d-%02d-%02d", _tm.tm_year + 1900, _tm.tm_mon + 1, _tm.tm_mday);
} else {
date_str = "1970-01-01";
}
return date_str;
}
// 获取当前文件夹的文件列表,忽略 . ..
static std::vector<std::string> get_file_list(const std::string& path, const std::string& filter = ".*")
{
char cur_dir[] = ".";
char up_dir[] = "..";
std::vector<std::string> file_list;
DIR *dirp;
struct dirent *dp;
dirp = opendir(path.c_str());
while ((dp = readdir(dirp)) != NULL) {
if(std::regex_match(dp->d_name, std::regex(filter))){
//忽略 . 和 ..
if ((0 == strcmp(cur_dir, dp->d_name)) || (0 == strcmp(up_dir, dp->d_name)) ) {
continue;
}
file_list.push_back(std::string(dp->d_name ));
}
}
(void) closedir(dirp);
std::sort(file_list.begin(), file_list.end(), [](const std::string& s1, const std::string& s2){
return s1.compare(s2) < 0;
});
return file_list;
}
// 返回文件大小
static int64_t get_file_size(const std::string& path){
std::ifstream in(path);
in.seekg(0, std::ios::end);
return in.tellg();
}
/*
@return: 1 create success ; 2 exist ; -1 create failed
*/
static int make_dir(const std::string& path) {
if (access(path.c_str(), 0) == -1) {
int ret = mkdir(path.c_str(), S_IRUSR | S_IWUSR | S_IXUSR
| S_IRGRP | S_IWGRP | S_IXGRP
| S_IROTH | S_IXOTH );
if (ret == -1) {
return -1;
} else {
return 1;
}
}
return 2;
}
static int make_file(const std::string& file_name) {
mode_t f_attrib = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
int fd = open(file_name.c_str (), O_RDWR | O_CREAT , f_attrib);
return fd;
}
static int remove_rf(const char *dir) {
char cur_dir[] = ".";
char up_dir[] = "..";
DIR *dirp;
struct dirent *dp;
struct stat dir_stat;
if (access(dir, F_OK) != 0) { //文化是否存在
return 0;
}
if (stat(dir, &dir_stat) < 0) { //获取文件状态
return -1;
}
if (S_ISREG(dir_stat.st_mode)) { //普通文件
std::remove(dir);
} else if (S_ISDIR(dir_stat.st_mode)) { //目录文件
dirp = opendir(dir);
while ((dp=readdir(dirp)) != NULL) {
//忽略 . 和 ..
if ((0 == strcmp(cur_dir, dp->d_name)) || (0 == strcmp(up_dir, dp->d_name)) ) {
continue;
}
std::string dir_name = std::string(dir) + "/" + std::string(dp->d_name);
remove_rf(dir_name.c_str());
}
closedir(dirp);
rmdir(dir);
} else {
return -1;
}
return 0;
}
// 检查dir_name文件夹下面的满足筛选条件filter的文件个数是否已经超过上限值num_limited,并将超过的文件删掉
// 如::check_file_num(".", ".*\\.ulg", 10); # 当前路径下只保存10个ulog文件,其他的都删除掉
static void check_file_num(const std::string& dir_name, const std::string& filter, int32_t num_limited) {
std::vector<std::string> files = get_file_list(dir_name, filter);
int file_size = files.size();
for (int index = 0; file_size > num_limited; index++, file_size--){
std::string file = dir_name + "/" + files[index];
remove_rf(file.c_str());
}
}
static int copy_file(const char* src, const char* des) {
int ret = 0;
FILE* psrc = NULL, *pdes = NULL;
psrc = fopen(src, "r");
pdes = fopen(des, "w+");
if (psrc && pdes) {
int nlen = 0;
char sz_buf[1024] = {0};
while((nlen = fread(sz_buf, 1, sizeof(sz_buf), psrc)) > 0) {
fwrite(sz_buf, 1, nlen, pdes);
}
fflush(pdes);
} else {
ret = -1;
}
if (psrc) {
fclose(psrc);
}
if (pdes) {
fclose(pdes);
}
return ret;
}
// 绑定当前线程到指定的CPU上
// 比如将计算需求高的线程绑定在计算能力好的CPU上
static int set_thread_affinity(const size_t cpu_index) {
#ifdef __QNXNTO__
// QNX
if (cpu_index > 0) {
int32_t mask = 1;
size_t index = cpu_index - 1;
if (index > 0) {
mask = mask << index; // bit pos = cpu index
}
return ThreadCtl(_NTO_TCTL_RUNMASK, (void*)(intptr_t)mask); // must be void*
}
#else
// Linux
if (cpu_index > 0) {
size_t cpus = std::thread::hardware_concurrency();
cpu_set_t set;
CPU_ZERO(&set);
size_t index = cpu_index - 1; // cpu index from 0
CPU_SET((index % cpus), &set);
return pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
}
#endif
return 0;
}
};
/*
文件读取器(待优化,暂不放入代码库)
使用方法:
file文件内容: xiaonuo beijing 19960515 25 # 分别表示 姓名 住址 生日 年龄
std::string name;
std::string address;
std::string birthday;
int age;
DataReader reader;
reader.read(name, address, birthday, age);
if (reader.is_finished())
std::cout << "name: " << name << "address: " << address << "birthday: " << birthday << "age: " << age;
*/
#include <log/logging.h>
#include <fstream>
class DataReader {
public:
DataReader(const std::string& path, char spliter = '\t') {
file.open(path);
FATAL_IF_NOT(file.is_open()) << "open file failed! path: " << path;
}
~DataReader() {
}
template<typename T>
DataReader& read(T& t) {
file >> t;
return *this;
}
template<typename T, typename... Args>
DataReader& read(T& t, Args& ... args) {
file >> t;
read(args...);
return *this;
}
bool is_finished(){
return file.peek() == EOF;
};
private:
std::ifstream file;
};
// 待优化,暂不放入代码库
#pragma once
#include <log/logging.h>
#include <fstream>
class DataRecoder {
public:
DataRecoder(const std::string& path, char spliter = '\t'): spliter(spliter) {
file.open(path);
FATAL_IF_NOT(file.is_open()) << "open file failed! path: " << path;
}
~DataRecoder() {
save();
}
template<typename T>
DataRecoder& record(T t) {
file << t << std::endl;
return *this;
}
template<typename T, typename... Args>
DataRecoder& record(T t, Args ... args) {
file << t << spliter;
record(args...);
return *this;
}
void save() {
file.flush();
}
private:
std::ofstream file;
char spliter;
};
#pragma once
#include <string>
#include "dlfcn.h"
#include "app_preference.hpp"
#include "app_util.hpp"
#include "app_config.hpp"
#include <diag/diagnose.hpp>
typedef void (*VoidFunc)();
class DlUtils{
DlUtils();
static std::map<std::string, void*>& get_plug_map(){
static std::map<std::string, void*> plug_map;
return plug_map;
}
public:
static void initial_path(char* argv[]){
char absolute_path[10240];
realpath(argv[0], absolute_path);
std::string exe_file(absolute_path);
std::string exe_name = exe_file.substr(exe_file.find_last_of('/') + 1);
std::string app_folder = DlUtils::path_go_back(exe_file, 2);
std::string root_folder = DlUtils::path_go_back(app_folder);
std::string bin_folder = DlUtils::path_go_back(exe_file);
std::string lib_folder = app_folder + "/lib";
std::string config_folder = app_folder + "/config";
auto config_dir = opendir(config_folder.c_str());
if (config_dir){
closedir(config_dir);
} else {
config_folder = root_folder + "/config";
config_dir = opendir(config_folder.c_str());
if (config_dir){
closedir(config_dir);
} else {
FATAL() << "no config dir found!";
}
}
std::string ini_folder = config_folder + "/ini";
appPref.set_string_data("exe_file", exe_file);
appPref.set_string_data("exe_name", exe_name);
appPref.set_string_data("app_folder", app_folder);
appPref.set_string_data("root_folder", root_folder);
appPref.set_string_data("bin_folder", bin_folder);
appPref.set_string_data("lib_folder", lib_folder);
appPref.set_string_data("config_folder", config_folder);
appPref.set_string_data("ini_folder", ini_folder);
AppConfig::load_ini_config(ini_folder +"/*.ini");
initial_log();
Diagnose::register_server("load_plugin", [&](const std::string& name){
std::string message;
if (DlUtils::try_load_plugin(name.c_str(), message)){
DlUtils::run_plugin(name.c_str());
}
return message;
});
}
static bool try_load_plugin(const char* name, std::string& message){
std::string lib_folder = appPref.get_string_data("lib_folder");
std::string lib_file = lib_folder + "/lib";
lib_file += name;
lib_file += ".so";
std::stringstream ss;
if (get_plug_map().find(name) != get_plug_map().end()){
ss << "load_plugin: plugin " << name << " is already loaded!";
message = ss.str();
return false;
}
auto plugin = dlopen(lib_file.c_str(), RTLD_LAZY);
if (!plugin){
ss << "load_plugin: load library faild, error " << dlerror() << " path:" << lib_file;
message = ss.str();
return false;
}
std::string load_func_name = std::string("load_") + name;
VoidFunc load = (VoidFunc)dlsym(plugin, load_func_name.c_str());
if (!load){
ss << "load_plugin: load func exec faild, error " << dlerror() << " path:" << load_func_name;
message = ss.str();
return false;
}
load();
get_plug_map()[name] = plugin;
ss << name << " load successed!";
message = ss.str();
return true;
}
static void load_plugin(const char* name){
std::string message;
bool res = try_load_plugin(name, message);
FATAL_IF_NOT(res) << message;
}
static void run_plugin(const char* name){
std::string run_func_name = std::string("run_") + name;
if (get_plug_map().find(name) == get_plug_map().end()){
FATAL() << "run_plugin: plugin " << name << " is not loaded!";
}
auto plugin = get_plug_map()[name];
VoidFunc run = (VoidFunc)dlsym(plugin, run_func_name.c_str());
if (!run){
FATAL() << "run_plugin: run func exec faild, error " << dlerror();
}
run();
}
static void unload_plugin(const char* name){
std::string unload_func_name = std::string("unload_") + name;
if (get_plug_map().find(name) == get_plug_map().end()){
FATAL() << "unload_plugin: plugin " << name << " is not loaded!";
}
auto plugin = get_plug_map()[name];
VoidFunc unload = (VoidFunc)dlsym(plugin, unload_func_name.c_str());
if (!unload){
FATAL() << "unload_plugin: unload func exec faild, error " << dlerror();
}
unload();
}
static std::string path_go_back(const std::string& path, int times = 1){
std::string path_res = path;
for (int i = 0; i < times; i++){
path_res = path_res.substr(0, path_res.find_last_of('/'));
}
return path_res;
}
private:
static void initial_log()
{
if (appPref.has_string_key("log.log_level")){
int log_level = std::stoi(appPref.get_string_data("log.log_level"));
bool level_check = log_level < LogLevel::INFO || log_level > LogLevel::FATAL;
log_level = level_check ? LogLevel::DIRECT : log_level;
LogInterface::set_log_level((LogLevel)log_level);
DIRECT() << "using log level: " << LogInterface::log_level_to_string((LogLevel)log_level);
} else {
WARNING() << "using default log level: INFO";
}
if (appPref.has_string_key("ime_log.ime_log_level")){
int log_level = std::stoi(appPref.get_string_data("ime_log.ime_log_level"));
bool level_check = log_level < LogLevel::INFO && log_level > LogLevel::FATAL;
log_level = level_check ? LogLevel::DIRECT : log_level;
ImeLogInterface::set_log_level((LogLevel)log_level);
DIRECT() << "using log level: " << ImeLogInterface::log_level_to_string((LogLevel)log_level);
} else {
WARNING() << "using default log level: INFO";
}
}
};
#pragma once
#include<geo/geo.hpp>
#include <utils/app_util.hpp>
class GpsHelper {
public:
GpsHelper() {
_geo = geo::instance();
}
bool is_initialized() const{
static bool& is_init = _geo->_map_projection_reference_point.init_done;
return is_init;
}
void initial(const double& lon, const double& lat){
if (!_geo->get_map_projection_reference().init_done) {
_geo->global_to_local_init(lat, lon, 0, AppUtil::get_current_ms());
}
}
void lonlat_2_mercator(const double& lon, const double& lat, double& x, double& y) const {
double z = 0;
_geo->global_to_local(lat, lon, -z, &x, &y, &z);
}
void mercator_2_lonlat(const double& x, const double& y, double& lon, double& lat) const {
double z = 0;
_geo->local_to_global(x, y, -z, &lat, &lon, &z);
}
double get_distance_point_2_point(const double& lon1, const double& lat1,
const double& lon2, const double& lat2){
return _geo->get_distance_to_next_waypoint(lat1, lon1, lat2, lon2);
}
private:
geo* _geo;
};
#pragma once
#include <fstream>
#include <jsoncpp/json/json.h>
class JsonWriter {
public:
JsonWriter() {
//jswBuilder["emitUTF8"] = true; // 直接输出 UTF-8 字符
_jsb["indentation"] = ""; // 压缩格式,没有换行和不必要的空白字符
}
~JsonWriter() {
if (_file_os.is_open()) {
_file_os.close();
}
}
void open(const std::string& filename, std::ios_base::openmode mode) {
_file_os.open(filename, mode);
}
void write(const Json::Value& value) {
std::unique_ptr<Json::StreamWriter> jsw(_jsb.newStreamWriter());
std::ostringstream os;
jsw->write(value, &os);
_file_os << os.str();
_file_os.flush();
}
private:
Json::StreamWriterBuilder _jsb;
std::ofstream _file_os;
};
#pragma once
namespace math{
template<typename T,
template <typename, typename = std::allocator<T> > class Container>
double average(const Container<T>& data)
{
int len = data.size();
double sum = 0.0;
for (auto x : data)
sum += x;
return sum / len;
}
template<typename T,
template <typename, typename = std::allocator<T> > class Container>
double variance(const Container<T>& data)
{
int len = data.size();
double sum = 0.0;
double avg = average(data);
for (auto x : data)
sum += pow(x - avg, 2);
return sum / len;
}
template<typename T,
template <typename, typename = std::allocator<T> > class Container>
double standar_deviation(const Container<T>& data)
{
double sd = variance(data);
return sqrt(sd);
}
}
\ No newline at end of file
#pragma once
#include <string>
#include <sys/time.h>
#include <time.h>
#include <iostream>
#include <message/messager.hpp>
#include <memory>
#include <mutex>
#include <utils/app_util.hpp>
#include <core/data_type.hpp>
#include <log/logging.h>
// TimerCounter 用于统计算法的用时,从构造函数开始计时,截止至析构函数,并输出中间运行时间
class TimerCounter {
public:
// 构造 TimerCounter, 并开始计时
// @flag 计时片段的代号
// @enable_cout 是否在析构时自动输出
// @threshold 超时阈值,如果超过threshold才输出用时, ms
TimerCounter(const std::string& flag, bool enable_cout = false, long threshold = -1) :
_enable_cout(enable_cout), _threshold(threshold) {
_start = AppUtil::get_current_us();
this->_flag = flag;
}
~TimerCounter() {
if (_enable_cout){
long delta = get_time_ms_elapsed();
if (delta > _threshold) {
//INFO() << _flag << " Time elapsed: " << delta << "ms";
}
}
}
// 在flag后追加key
void addkey(const std::string& key) {
this->_flag += key;
}
// 获取截止到当前,TimerCounter中统计的时间
long get_time_ms_elapsed() {
long end = AppUtil::get_current_us();
long n = (end - _start) / 1000;
return n;
}
private:
long _start; // 开始计时的时间
bool _enable_cout; // 是否在析构时自动输出
std::string _flag; // 计时片段的代号, 析构时可以输出
long _threshold; // 自动输出用时的阈值
};
/*
zmq的封装
zmq适配层ZmqInterface,zmq发送端ZmqPublisher,zmq响应端ZmqResponser
使用方式:
ZmqPublisher zmq_publisher;
zmq_publisher.register_publisher(output_sensor_url);
zmq_publisher.publish(std::string("hello"));
zmq_publisher.shutdown();
TODO:ZmqResponser
TODO: 空格类的不规范
*/
#pragma once
#include <zmq.h>
#include <string>
#include <log/logging.h>
#include <functional>
class ZmqInterface {
public:
~ZmqInterface() {
shutdown();
}
protected:
ZmqInterface() {
_zmq_context = zmq_ctx_new();
}
public:
void set_send_timeout(int time_ms){
int rc = 0;
rc = zmq_setsockopt (_zmq_socket, ZMQ_SNDTIMEO, &time_ms, sizeof(time_ms));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
_send_time_out = time_ms;
}
void set_recv_timeout(int time_ms){
int rc = 0;
rc = zmq_setsockopt (_zmq_socket, ZMQ_RCVTIMEO, &time_ms, sizeof(time_ms));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
_recv_time_out = time_ms;
}
void set_send_queue_size(int size){
int rc = 0;
rc = zmq_setsockopt(_zmq_socket, ZMQ_SNDHWM, &size, sizeof(size));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
}
void set_send_buffer_size(int size){
int rc = 0;
rc = zmq_setsockopt(_zmq_socket, ZMQ_SNDBUF, &size, sizeof(size));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
}
int shutdown(){
return zmq_ctx_shutdown(_zmq_context);
}
protected:
void bind(const std::string& url){
int rc = zmq_bind (_zmq_socket, url.c_str());
FATAL_IF(rc != 0) << "zmq_bind faild! url: " << url << "; errno = " << errno;
}
void connect(const std::string& url){
int rc = zmq_connect(_zmq_socket, url.c_str());
FATAL_IF(rc != 0) << "zmq_connect faild! url: " << url << "; errno = " << errno;
}
protected:
void* _zmq_context = nullptr;
void* _zmq_socket = nullptr;
int _recv_time_out = -1;
int _send_time_out = -1;
};
class ZmqPublisher : public ZmqInterface{
public:
ZmqPublisher(){
_zmq_socket = zmq_socket(_zmq_context, ZMQ_PUB);
}
void register_publisher(const std::string& url){
bind(url);
}
int publish(const std::string& data){
int size = zmq_send(_zmq_socket, (char*)data.c_str(), data.size(), 0);
return size;
}
};
class ZmqSubscriber : public ZmqInterface{
public:
ZmqSubscriber(){
_zmq_socket = zmq_socket(_zmq_context, ZMQ_SUB);
set_filter("");
}
void subscribe(const std::string& url){
connect(url);
}
int receive(std::string& buffer){
int size = zmq_recv(_zmq_socket, (char*)buffer.c_str(), buffer.size(), 0);
return size;
}
void set_filter(const std::string& filter){
int rc = zmq_setsockopt(_zmq_socket, ZMQ_SUBSCRIBE, filter.c_str(), 0);
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
}
};
class ZmqResponser : public ZmqInterface{
public:
ZmqResponser(){
_zmq_socket = zmq_socket(_zmq_context, ZMQ_REP);
_recv_buffer.resize(200 * 1024);
}
void set_reponse_func(const std::function<std::string(const std::string&)>& func){
_rep_func = func;
}
void register_responser(const std::string& url){
bind(url);
}
void receive_and_respose(){
int size = zmq_recv(_zmq_socket, (char*)_recv_buffer.c_str(), _recv_buffer.size(), 0);
if (size > 0 && _rep_func) {
auto rep_str = _rep_func(_recv_buffer.substr(0, size));
zmq_send(_zmq_socket, rep_str.data(), rep_str.length(), 0);
}
}
private:
std::function<std::string(const std::string&)> _rep_func;
std::string _recv_buffer;
};
class ZmqRequester : public ZmqInterface{
public:
ZmqRequester(){
_zmq_socket = zmq_socket(_zmq_context, ZMQ_REQ);
_recv_buffer.resize(200 * 1024);
}
void register_requester(const std::string& url){
connect(url);
_url_list.push_back(url);
}
bool request_and_receive(const std::string& req_data, std::string& rep_data){
int size = zmq_send(_zmq_socket, req_data.data(), req_data.length(), 0);
if (size > 0) {
size = zmq_recv(_zmq_socket, (char*)_recv_buffer.c_str(), _recv_buffer.size(), 0);
rep_data = _recv_buffer.substr(0, size);
}
return size >= 0;
}
void reconnect(){
zmq_close(_zmq_socket);
_zmq_socket = zmq_socket(_zmq_context, ZMQ_REQ);
for (auto& url : _url_list){
connect(url);
}
set_recv_timeout(_recv_time_out);
set_send_timeout(_send_time_out);
}
private:
std::string _recv_buffer;
std::vector<std::string> _url_list;
};
此差异已折叠。
project(iv_task)
if(${BUILD_TYPE} STREQUAL "linux-x86_64")
set(DL dl CACHE INTERNAL "DL")
set(PTHREAD pthread CACHE INTERNAL "PTHREAD")
endif()
include_directories(
${CORELIB_INCLUDE_DIR}
)
add_executable(iv_task
src/main.cpp
src/main_loop.cpp
)
target_link_libraries(iv_task
corelib
${PTHREAD}
${DL}
)
#include <dlfcn.h>
#include <map>
#include <log/logging.h>
#include <pipe/pipe_controller.hpp>
#include <pipe/timer_trigger.hpp>
#include <utils/app_config.hpp>
#include <utils/app_preference.hpp>
#include <message/messager.hpp>
#include <diag/diagnose.hpp>
#include "main_loop.h"
#include<unistd.h>
#include <utils/dl_utils.hpp>
#include <sys/wait.h>
#include <signal.h>
MainLoop main_loop;
TimerTrigger g_timer_trigger;
void stop(int){
main_loop.stop();
g_timer_trigger.stop();
}
int main(int argc, char *argv[])
{
DlUtils::initial_path(argv);
std::stringstream ss;
ss << appPref.get_string_data("exe_file") << " ";
main_loop.start();
for (int i = 1; i < argc; i++){
DlUtils::load_plugin(argv[i]);
ss << argv[i] << " ";
}
g_timer_trigger.start();
for (int i = 1; i < argc; i++){
DlUtils::run_plugin(argv[i]);
}
signal(SIGINT, &stop);
main_loop.wait();
g_timer_trigger.wait();
for (int i = 1; i < argc; i++){
DlUtils::unload_plugin(argv[i]);
}
ss << " &";
if (main_loop.is_restart()){
system(ss.str().c_str());
}
}
#include "main_loop.h"
#include <diag/diagnose.hpp>
MainLoop::MainLoop() : TimerElement(1000, "MainLoop")
{
Diagnose::register_server("exec_cmd", [&](const std::string& cmd){
std::string message;
if (cmd == "quit"){
stop();
} else if (cmd == "restart") {
stop();
_is_restart = true;
} else {
message += "cmd ";
message += cmd + " not found!";
}
message = "exec successed!";
return message;
});
}
void MainLoop::timer_func()
{
}
bool MainLoop::is_restart() const
{
return _is_restart;
}
#ifndef MAINLOOP_H
#define MAINLOOP_H
#include <pipe/timer_element.hpp>
class MainLoop : public TimerElement
{
public:
MainLoop();
// TimerElement interface
bool is_restart() const;
private:
virtual void timer_func() override;
bool _is_restart = false;
};
#endif // MAINLOOP_H
cmake_minimum_required(VERSION 2.8)
project(proto_data)
find_package(Protobuf REQUIRED)
#find_package(Glog REQUIRED)
#find_package(Gflags REQUIRED)
include_directories(
${PROTOBUF_INCLUDE_DIR}
${GLOG_INCLUDE_DIR}
)
ADDLIB(${PROJECT_NAME} data
${GLOG_LIBRARIES}
${PROTOBUF_LIBRARIES}
)
此差异已折叠。
此差异已折叠。
package message.fs;
enum Trigger {
FailAction = 1; // FailAction: 失效动作
RecoveryAction = 2; // RecoveryAction: 恢复动作
}
message FailSafeState {
required uint32 dtc_number = 1; // 故障代号,参见 DTCTable
required bool status = 2; // 状态 true: 正常 false: 异常
required int32 is_fail_act = 3; // 状态 1: 已执行  0: 未执行  2:无需执行
}
message Action {
required uint32 dtc_number = 1; // 故障代号,参见 DTCTable
required Trigger trigger = 2; // FailAction: 失效动作 RecoveryAction: 恢复动作
required bool action_state = 3; // 状态 true: 完成 false: 未完成
}
\ No newline at end of file
syntax = "proto3";
message QnxProcStatus {
int64 pid = 1; // process id
int64 tid = 2; // thread id
int64 cpuid = 3; // last cpu id
int64 priority = 4; // scheduler priority
string policy = 5; // scheduler policy: R,F,O,S
int64 mem = 6; // process memory used, bytes
float cpu_used = 7; // =used/total
string proc_name = 8; // process filename
string thread_name = 9; // thread name or id
};
message QnxProcList {
int64 timestamp = 1; // system timestamp
float cpu_user = 3; // cpu user
float cpu_kernel = 4; // cpu kernel
repeated float cpu_idle = 5; // cpu idle
int64 mem_avail = 6; // memory avail, bytes
int64 mem_total = 7; // memory total, bytes
repeated QnxProcStatus proc_list = 8;
};
cmake_minimum_required(VERSION 2.8)
project(server_log)
add_definitions(-DLOAD_PLUGIN=load_${PROJECT_NAME})
add_definitions(-DRUN_PLUGIN=run_${PROJECT_NAME})
add_definitions(-DUNLOAD_PLUGIN=unload_${PROJECT_NAME})
find_package(Protobuf REQUIRED)
include_directories(
${GLOG_INCLUDE_DIR}
${CORELIB_INCLUDE_DIR}
${PROTOBUF_INCLUDE_DIR}
${DATA_INCLUDE_DIR}
${UTILITIES_INCLUDE_DIR}
)
ADDLIB(${PROJECT_NAME} src
)
## server_log模块说明
server_log模块将订阅log_debug/log_info/log_warning/log_error/log_fatal/log_direct, 并将其分类写入到两个log文件中。
分类规则:
1. log_debug&&log_info&&log_direct 写入到 *_INFO.log 文件中.
2. log_warning&&log_error&&log_fatal 写入到 *_ERROR.log 文件中.
### 使用方式
1. 正常加载模块即可, 加载方式参见 TODO:
### 几种方式介绍
- DEBUG(): 用于非常细致的日志输出,可以控制调试的日志输出
- INFO(): 用于普通的日志输出, 可以是稍微重要一些的状态更新
- WARNING(): 用于警告级别的日志输出
- ERROR(): 用于一般错误的输出,即不影响程序运行的错误
- FATAL(): 用于严重错误的输出,即影响严重错误的错误
- DIRECT(): 最高级别的输出方式
- DATAINFO(name, value): 用于输出一个变量的值。
- **_IF(condition): 如果 condition==true,则输出内容
- **_IF_NOT(condition): 如果 condition==false,则输出内容
### 注意事项
1. 通常与utils模块中的TIME_LIMIT_EXEC(time_ms)连用。可以用于控制日志的输出频率。如:
```
// 每秒输出一条 Hello World!
TIME_LIMIT_EXEC(1000) {
INFO() << "Hello World!";
}
```
2. DEBUG/INFO/DIRECT 都是标准输出,ERROR/FATAL 是标准错误输出。
\ No newline at end of file
#include "flogger.h"
#include <stdio.h>
#include <sys/stat.h>
FLogger::FLogger(const std::string &dir) : _dir(dir){
mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
void FLogger::error(const std::string &log){
_error_size += log.size();
if(check_error_size()){
_error_size = log.size();
}
_error_ostream << log;
}
void FLogger::info(const std::string &log){
_info_size += log.size();
if(check_info_size()){
_info_size = log.size();
}
_info_ostream << log;
}
void FLogger::flush(){
_error_ostream.flush();
_info_ostream.flush();
}
void FLogger::remove_oldest_log(){
auto files = AppUtil::get_file_list(_dir, ".*\\.log");
if (!files.empty()){
std::string file = _dir + "/" + files.front();
std::remove(file.c_str());
bool failed = std::ifstream(file).is_open();
if (failed){
ERROR() << "remove error log file faild: " << file;
}
}
}
// 检查当前ERROR.log文件的写入大小, 超过上限值, 会写入新文件
// 同时, 会检查最多保留的log个数, INFO+ERROR log总个数有一个上限值 _max_num
bool FLogger::check_error_size(){
bool res = _error_size > _max_size;
if (res || !_error_ostream.is_open()){
_error_ostream = std::ofstream(_dir + "/" + AppUtil::now_date() + "_" + AppUtil::now_time() + "_ERROR.log");
check_file_num();
}
return res;
}
bool FLogger::check_info_size(){
bool res = _info_size > _max_size;
if (res|| !_info_ostream.is_open()){
_info_ostream = std::ofstream(_dir + "/" + AppUtil::now_date() + "_" + AppUtil::now_time() + "_INFO.log");
check_file_num();
}
return res;
}
void FLogger::check_file_num(){
auto files = AppUtil::get_file_list(_dir, ".*\\.log");
if (files.size() > max_num()){ // 使用 max_num() 并不比直接使用_max_num效率差, 编译优化后, 效率是相同的.
int i = files.size() - max_num();
for (; i > 0; i--){
remove_oldest_log();
}
}
}
uint64_t FLogger::max_num() const
{
return _max_num;
}
void FLogger::set_max_num(const uint64_t &max_num)
{
_max_num = max_num;
}
uint64_t FLogger::max_size() const
{
return _max_size;
}
void FLogger::set_max_size(const uint64_t &max_size)
{
_max_size = max_size;
}
#pragma once
#include <utils/app_util.hpp>
#include <log/logging.h>
#include <ostream>
// File logger
// 文件处理类,用于将log保存在 日期-时间 的文件中
class FLogger
{
public:
FLogger(const std::string& dir);
~FLogger(){
};
// 将字符串log写入文件_error_ostream
void error(const std::string& log);
// 将字符串log写入文件_info_ostream
void info(const std::string& log);
// 刷新缓冲区,立即将ostream中得内容写入到文件中
void flush();
inline uint64_t max_size() const;
void set_max_size(const uint64_t &max_size);
inline uint64_t max_num() const;
void set_max_num(const uint64_t &max_num);
private:
void remove_oldest_log();
bool check_error_size();
bool check_info_size();
void check_file_num();
private:
std::string _dir;
std::ofstream _error_ostream;
std::ofstream _info_ostream;
uint64_t _error_size = 0;
uint64_t _info_size = 0;
uint64_t _max_size = 1024 * 100;
uint64_t _max_num = 10;
};
#include "server_log.h"
#include <pipe/pipe_controller.hpp>
PipeController g_controllor;
extern "C"{
void LOAD_PLUGIN(){
g_controllor.add_element<ServerLogElement>();
}
void RUN_PLUGIN(){
g_controllor.start();
}
void UNLOAD_PLUGIN(){
g_controllor.stop();
g_controllor.wait();
}
}
#include "server_log.h"
#include <utils/app_preference.hpp>
ServerLogElement::ServerLogElement() :
TimerElement(1000, "ServerLog"),
_buffer_error(1000), _buffer_info(1000),
_floger(appPref.get_string_data("log.log_dir")){
_floger.set_max_num(AppUtil::safe_stoi(appPref.get_string_data("log.log_num")));
_floger.set_max_size(AppUtil::safe_stoi(appPref.get_string_data("log.log_size")) * 1024);
Messager::subcribe<std::string>("log_debug", [&](const std::string& log){
info(log);
});
Messager::subcribe<std::string>("log_info", [&](const std::string& log){
info(log);
});
Messager::subcribe<std::string>("log_direct", [&](const std::string& log){
info(log);
});
Messager::subcribe<std::string>("log_warning", [&](const std::string& log){
error(log);
});
Messager::subcribe<std::string>("log_error", [&](const std::string& log){
error(log);
});
Messager::subcribe<std::string>("log_fatal", [&](const std::string& log){
error(log);
});
}
void ServerLogElement::initial()
{
ServerLogElement::instance();
}
void ServerLogElement::error(const std::string &log)
{
_buffer_error.push_data(log);
}
void ServerLogElement::info(const std::string &log)
{
_buffer_info.push_data(log);
}
void ServerLogElement::timer_func()
{
if (_buffer_error.is_updated()){
auto errors = _buffer_error.get_data();
for (auto error : errors){
_floger.error(error);
}
}
if (_buffer_info.is_updated()){
auto infos = _buffer_info.get_data();
for (auto info : infos){
_floger.info(info);
}
}
_floger.flush();
}
#pragma once
#include <core/double_buffered_vector.hpp>
#include <message/messager.hpp>
#include <diag/frequence.hpp>
#include <diag/diagnose.hpp>
#include <pipe/timer_element.hpp>
#include "flogger.h"
class ServerLogElement : public TimerElement
{
public:
ServerLogElement();
public:
static void initial();
const static ServerLogElement& instance(){
static ServerLogElement l_instance;
return l_instance;
}
private:
void error(const std::string& log);
void info(const std::string& log);
DoubleBufferedVector<std::string> _buffer_error;
DoubleBufferedVector<std::string> _buffer_info;
FLogger _floger;
private:
virtual void timer_func() override;
};
cmake_minimum_required(VERSION 2.8)
project(server_proc)
set(CMAKE_CXX_STANDARD 11)
add_definitions(-DLOAD_PLUGIN=load_${PROJECT_NAME})
add_definitions(-DRUN_PLUGIN=run_${PROJECT_NAME})
add_definitions(-DUNLOAD_PLUGIN=unload_${PROJECT_NAME})
find_package(Protobuf REQUIRED)
find_package(Zmq REQUIRED)
include_directories(
${CORELIB_INCLUDE_DIR}
${PROTOBUF_INCLUDE_DIR}
${ZMQ_INCLUDE_DIR}
${DATA_INCLUDE_DIR}
${UTILITIES_INCLUDE_DIR}
)
ADDLIB(${PROJECT_NAME} src
${ZMQ_LIBRARIES}
${PROTOBUF_LIBRARIES}
proto_data
proc
)
## server_proc模块说明
这个模块定时保存(/proc/[当前进程id])目录的快照,用于保存当前进程的内存信息和当前进程及内部所有线程的CPU信息。
#include "server_proc.h"
#include "server_proc_task.h"
#include <pipe/pipe_controller.hpp>
PipeController g_controllor;
extern "C"{
void LOAD_PLUGIN(){
ServerData::initial();
g_controllor.add_element<ServerProcTask>();
}
void RUN_PLUGIN(){
g_controllor.start();
}
void UNLOAD_PLUGIN(){
g_controllor.stop();
g_controllor.wait();
}
}
#include "server_proc.h"
#include <diag/diagnose.hpp>
#include <utils/app_preference.hpp>
void ProcUtils::get_proc_list(QnxProcList &list){
parse_procfs(list);
}
void ProcUtils::get_proc_list_str(std::string &list_str){
QnxProcList list;
parse_procfs(list);
proc_list_to_string(list, list_str, -1, -1);
}
void ProcUtils::get_proc_list_str_byid(std::string &list_str, int id){
QnxProcList list;
parse_procfs(list);
proc_list_to_string(list, list_str, id, -1);
}
void ProcUtils::get_current_proc_list_str(std::string &list_str){
QnxProcList list;
parse_procfs(list);
proc_list_to_string(list, list_str, getpid(), -1);
}
#pragma once
#include <proc.h>
class ProcUtils
{
private:
ProcUtils();
public:
static void get_proc_list(QnxProcList& list);
static void get_proc_list_str(std::string& list_str);
static void get_proc_list_str_byid(std::string& list_str, int id);
static void get_current_proc_list_str(std::string& list_str);
};
此差异已折叠。
#pragma once
#include <pipe/timer_element.hpp>
#include <core/double_buffer_data.hpp>
#include <data/qnxproc.pb.h>
#include <zmq/zmq.hpp>
class ServerProcTask : public TimerElement{
public:
ServerProcTask();
// TimerElement interface
private:
void timer_func();
DoubleBufferData<std::string> _proc_text;
DoubleBufferData<QnxProcList> _proc_list;
std::string _proc_log_dir;
bool _proc_log;
bool _proc_remote;
};
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
# 此文件为本地配置文件,如有增加配置,请在流水线配置(项目opt目录下pipeline_config找到同名文件)增加同样配置
[log]
log_level = 1 # 1 INFO, 2 WARNING, 3 ERROR, 4 FALTIL, other DIRECT
log_dir = log
log_size = 100 # unit:kB
log_num = 10
proc_log = 1
[priority]
Frequence = 20
ServerLog = 10
SensorFusion = 20
RoadLoc = 15
MapMatch = 16
HadmapPilot = 18
HadmapServer = 17
ExternedLanesTask = 17
AroundMapTask = 14
InputSensorRos = 20
HadmapRefreshing = 12
ServerDiagZmq = 16
ServerDiagUdp = 16
OutputResultRos = 16
[affinity]
CpuIndex = 0
[cpu]
\ No newline at end of file
[proc]
is_remote = 0
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册