提交 88e1000d 编写于 作者: S SanmuWangZJU 提交者: wangzelin.wzl

[liboblog] opensource liboblog

上级 9543929e
......@@ -166,7 +166,7 @@ void* easy_pool_default_realloc(void* ptr, size_t size)
if (size) {
return realloc(ptr, size);
} else if (ptr) {
// free(ptr);
free(ptr);
}
return 0;
......
......@@ -113,6 +113,8 @@ public:
{
return is_updated_;
}
bool is_use_ssl() const { return is_use_ssl_; }
void disable_ssl() { is_use_ssl_ = false; }
int64_t to_string(char* buf, const int64_t buf_len) const
{
int64_t pos = 0;
......@@ -179,6 +181,7 @@ protected:
bool is_updated_;
bool is_stop_;
bool is_use_ssl_;
ObMySQLConnection::Mode mode_;
int tg_id_;
......
......@@ -100,7 +100,7 @@ public:
*
*/
inline int clone(const ObString& rv, ObDataBuffer& buf);
int clone(const ObString& rv, ObDataBuffer& buf);
// reset
void reset()
......
......@@ -53,6 +53,7 @@ add_subdirectory(archive)
add_subdirectory(election)
add_subdirectory(storage)
add_subdirectory(observer)
add_subdirectory(liboblog)
if (OB_ENABLE_SERVER_PCH)
target_precompile_headers(ob_base INTERFACE ${ob_server_pchs})
......
add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0)
add_subdirectory(src)
add_subdirectory(tests)
add_library(oblog_base INTERFACE)
target_include_directories(oblog_base INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(oblog_base
INTERFACE ob_base oblogmsg
)
add_library(oblog_miner INTERFACE)
target_include_directories(oblog_miner INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(oblog_miner
INTERFACE ob_base oblogmsg_static
)
set(SRC_LIST
liboblog.h liboblog.cpp
ob_concurrent_seq_queue.h ob_concurrent_seq_queue.cpp
ob_easy_hazard_map.h
ob_log_adapt_string.h ob_log_adapt_string.cpp
ob_log_adapt_string.h ob_log_adapt_string.cpp
ob_log_all_svr_cache.h ob_log_all_svr_cache.cpp
ob_log_binlog_record.h ob_log_binlog_record.cpp
ob_log_binlog_record_pool.h ob_log_binlog_record_pool.cpp
ob_log_binlog_record_queue.h ob_log_binlog_record_queue.cpp
ob_log_cluster_id_filter.cpp ob_log_cluster_id_filter.h
ob_log_common.h
ob_log_config.h ob_log_config.cpp
ob_log_ddl_handler.cpp ob_log_ddl_handler.h
ob_log_ddl_parser.h ob_log_ddl_parser.cpp
ob_log_dlist.h
ob_log_dml_parser.h ob_log_dml_parser.cpp
ob_log_fake_common_config.h
ob_log_fetch_log_rpc.h ob_log_fetch_log_rpc.cpp
ob_log_fetch_stat_info.h ob_log_fetch_stat_info.cpp
ob_log_fetch_stream.h ob_log_fetch_stream.cpp
ob_log_fetch_stream_container.h ob_log_fetch_stream_container.cpp
ob_log_fetch_stream_pool.h ob_log_fetch_stream_pool.cpp
ob_log_fetch_stream_type.h ob_log_fetch_stream_type.cpp
ob_log_fetcher.h ob_log_fetcher.cpp
ob_log_fetcher_dead_pool.h ob_log_fetcher_dead_pool.cpp
ob_log_fetcher_dispatcher.cpp ob_log_fetcher_dispatcher.h
ob_log_fetcher_heartbeat_worker.cpp ob_log_fetcher_heartbeat_worker.h
ob_log_fetcher_heartbeat_worker.h ob_log_fetcher_heartbeat_worker.cpp
ob_log_fetcher_idle_pool.h ob_log_fetcher_idle_pool.cpp
ob_log_formatter.h ob_log_formatter.cpp
ob_log_entry_task_pool.h ob_log_entry_task_pool.cpp
ob_log_row_list.h ob_log_row_list.cpp
ob_log_row_data_index.h ob_log_row_data_index.cpp
ob_log_store_service.h
ob_log_store_service_stat.h ob_log_store_service_stat.cpp
ob_log_mock_store_service.h
ob_log_storager.h ob_log_storager.cpp
ob_log_reader_plug_in.h ob_log_reader_plug_in.cpp
ob_log_data_processor.h ob_log_data_processor.cpp
ob_log_hbase_mode.h ob_log_hbase_mode.cpp
ob_log_work_mode.h ob_log_work_mode.cpp
ob_log_instance.h ob_log_instance.cpp
ob_log_lighty_list.h
ob_log_main.c
ob_log_meta_manager.h ob_log_meta_manager.cpp
ob_log_mysql_connector.h ob_log_mysql_connector.cpp
ob_log_mysql_proxy.h ob_log_mysql_proxy.cpp
ob_log_part_fetch_ctx.h ob_log_part_fetch_ctx.cpp
ob_log_part_fetch_mgr.h ob_log_part_fetch_mgr.cpp
ob_log_part_mgr.h ob_log_part_mgr.cpp
ob_log_table_id_cache.h ob_log_table_id_cache.cpp
ob_log_part_progress_controller.h ob_log_part_progress_controller.cpp
ob_log_part_serve_info.h
ob_log_part_svr_list.h ob_log_part_svr_list.cpp
ob_log_svr_blacklist.h ob_log_svr_blacklist.cpp
ob_log_part_trans_dispatcher.cpp ob_log_part_trans_dispatcher.h
ob_log_part_trans_parser.h ob_log_part_trans_parser.cpp
ob_log_part_trans_resolver.h ob_log_part_trans_resolver.cpp
ob_log_part_trans_resolver_factory.cpp ob_log_part_trans_resolver_factory.h
ob_log_part_trans_task.cpp ob_log_part_trans_task.h
ob_log_part_trans_task.h ob_log_part_trans_task.cpp
ob_log_part_trans_task_queue.h ob_log_part_trans_task_queue.cpp
ob_log_entry_wrapper.h ob_log_entry_wrapper.cpp
ob_log_resource_collector.h ob_log_resource_collector.cpp
ob_log_resource_recycle_task.h
ob_log_rpc.h ob_log_rpc.cpp
ob_log_schema_cache_info.h ob_log_schema_cache_info.cpp
ob_log_schema_getter.h ob_log_schema_getter.cpp
ob_log_sequencer1.h ob_log_sequencer1.cpp
ob_log_server_priority.h ob_log_server_priority.cpp
ob_log_sql_server_provider.h ob_log_sql_server_provider.cpp
ob_log_start_log_id_locator.h ob_log_start_log_id_locator.cpp
ob_log_stream_worker.h ob_log_stream_worker.cpp
ob_log_svr_finder.h ob_log_svr_finder.cpp
ob_log_svr_stream.h ob_log_svr_stream.cpp
ob_log_systable_helper.h ob_log_systable_helper.cpp
ob_log_table_matcher.h ob_log_table_matcher.cpp
ob_log_start_schema_matcher.h ob_log_start_schema_matcher.cpp
ob_log_task_pool.h
ob_log_timer.h ob_log_timer.cpp
ob_log_timezone_info_getter.h ob_log_timezone_info_getter.cpp
ob_log_trace_id.h ob_log_trace_id.cpp
ob_log_trans_ctx.h ob_log_trans_ctx.cpp
ob_log_trans_ctx_mgr.h ob_log_trans_ctx_mgr.cpp
ob_log_trans_log.h ob_log_trans_log.cpp
ob_log_trans_stat_mgr.h ob_log_trans_stat_mgr.cpp
ob_log_utils.h ob_log_utils.cpp
ob_map_queue.h
ob_map_queue_thread.h
ob_ms_queue_thread.h
ob_obj2str_helper.h ob_obj2str_helper.cpp
ob_seq_thread.h
ob_small_arena.h ob_small_arena.cpp
ob_log_tenant_task_queue.h ob_log_tenant_task_queue.cpp
ob_log_tenant.h ob_log_tenant.cpp
ob_log_tenant_mgr.h ob_log_tenant_mgr.cpp
ob_log_part_info.h ob_log_part_info.cpp
ob_log_part_callback.h
ob_log_ref_state.h ob_log_ref_state.cpp
ob_log_committer.h ob_log_committer.cpp
)
add_library(oblog_objects OBJECT ${SRC_LIST})
target_link_libraries(oblog_objects PUBLIC oblog_base)
target_compile_definitions(oblog_objects PRIVATE ENABLE_DEBUG_LOG)
target_compile_options(oblog_objects PRIVATE -Werror)
disable_pch(oblog_objects)
set(LGPL_DEPS "-L${DEP_DIR}/lib/mariadb -l:libmariadbclient.a -laio")
if (OB_SO_CACHE)
add_library(oblog SHARED IMPORTED GLOBAL)
set_target_properties(oblog PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/liboblog.so"
IMPORTED_LINK_INTERFACE_LIBRARIES oblog_miner)
else()
add_library(oblog SHARED ${CMAKE_BINARY_DIR}/src/observer/ob_version.cpp)
target_link_libraries(oblog
PUBLIC oblog_base oblib
PRIVATE
-Wl,--whole-archive
$<TARGET_OBJECTS:oblog_objects>
-Wl,--no-whole-archive
-Wl,--start-group
oceanbase_static
-Wl,--end-group
-static-libgcc -static-libstdc++
-Wl,-Bsymbolic
-Wl,-e,so_main
${LGPL_DEPS}
easy
)
add_dependencies(oblog oblog_objects)
set_target_properties(oblog PROPERTIES SOVERSION 1 VERSION 1.0.0)
endif()
add_library(oblog_objects_miner OBJECT ${SRC_LIST})
disable_pch(oblog_objects_miner)
target_link_libraries(oblog_objects_miner PUBLIC oblog_miner)
add_library(oblog_static
STATIC
EXCLUDE_FROM_ALL
${CMAKE_BINARY_DIR}/src/observer/ob_version.cpp)
target_link_libraries(oblog_static
PUBLIC oblog_objects_miner oblib
-Wl,--start-group
oceanbase_static
-Wl,--end-group
PRIVATE -static-libgcc -static-libstdc++
${LGPL_DEPS}
)
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "liboblog.h"
#include <locale.h>
#include <curl/curl.h>
#include "lib/allocator/ob_malloc.h" // ob_set_memory_size_limit
#include "lib/utility/utility.h" // get_phy_mem_size
#include "ob_log_common.h" // MAX_MEMORY_USAGE_PERCENT
#include "ob_log_instance.h" // ObLogInstance
using namespace oceanbase::common;
namespace oceanbase
{
namespace liboblog
{
ObLogFactory::ObLogFactory()
{
// set max memory limit
lib::set_memory_limit(get_phy_mem_size() * MAX_MEMORY_USAGE_PERCENT / 100);
CURLcode curl_code = curl_global_init(CURL_GLOBAL_ALL);
if (OB_UNLIKELY(CURLE_OK != curl_code)) {
OBLOG_LOG(ERROR, "curl_global_init fail", K(curl_code));
}
setlocale(LC_ALL, "");
setlocale(LC_TIME, "en_US.UTF-8");
}
ObLogFactory::~ObLogFactory()
{
curl_global_cleanup();
}
IObLog *ObLogFactory::construct_oblog()
{
return ObLogInstance::get_instance();
}
void ObLogFactory::deconstruct(IObLog *log)
{
UNUSED(log);
ObLogInstance::destroy_instance();
}
} // namespace liboblog
} // namespace oceanbase
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIBOBLOG_LIBOBLOG_
#define OCEANBASE_LIBOBLOG_LIBOBLOG_
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif
#ifndef __STDC_CONSTANT_MACROS
#define __STDC_CONSTANT_MACROS
#endif
#include <fnmatch.h> // FNM_CASEFOLD
#include <stdint.h>
#include <map>
#include <LogRecord.h>
using namespace oceanbase::logmessage;
namespace oceanbase
{
namespace liboblog
{
struct ObLogError
{
enum ErrLevel
{
ERR_WARN = 0,
ERR_ABORT,
} level_; ///< error level
int errno_; ///< error number
const char *errmsg_; ///< error message
};
typedef void (* ERROR_CALLBACK) (const ObLogError &err);
class IObLog
{
public:
virtual ~IObLog() {};
public:
/*
* init liboblog
* @param config_file config file name
* @param start_timestamp start timestamp (by second)
* @param err_cb error callback function pointer
*/
virtual int init(const char *config_file,
const uint64_t start_timestamp,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* init liboblog
* @param configs config by map
* @param start_timestamp start timestamp (by secon)
* @param err_cb error callback function pointer
*/
virtual int init(const std::map<std::string, std::string>& configs,
const uint64_t start_timestamp,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* init liboblog
* @param configs config by map
* @param start_timestamp start timestamp by microsecond
* @param err_cb error callback function pointer
*/
virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string>& configs,
const uint64_t start_timestamp_usec,
ERROR_CALLBACK err_cb = NULL) = 0;
virtual void destroy() = 0;
/*
* fetch next binlog record from OB cluster
* @param record binlog record, memory allocated by oblog, support release_record(corresponding times) after mutli next_record
* @param OB_SUCCESS success
* @param OB_TIMEOUT timeout
* @param other errorcode fail
*/
virtual int next_record(ILogRecord **record, const int64_t timeout_us) = 0;
/*
* fetch next binlog record from OB cluster
* @param [out] record binlog record, memory allocated by oblog, support release_record(corresponding tiems) after mutli next_record
* @param [out] major_version major version of ILogRecord
* @param [out] tenant_id tenant id of ILogRecord
*
* @param OB_SUCCESS success
* @param OB_TIMEOUT timeout
* @param other error code fail
*/
virtual int next_record(ILogRecord **record,
int32_t &major_version,
uint64_t &tenant_id,
const int64_t timeout_us) = 0;
/*
* release recorcd for EACH ILogRecord
* @param record
*/
virtual void release_record(ILogRecord *record) = 0;
/*
* Launch liboblog
* @retval OB_SUCCESS on success
* @retval ! OB_SUCCESS on fail
*/
virtual int launch() = 0;
/*
* Stop liboblog
*/
virtual void stop() = 0;
/// Match the TableGroup being served
/// Currently, TableGroup refers to a specific Database in the format "Tenant.Database".
///
/// @param [in] pattern target pattern string
/// @param [out] is_matched match result
/// @param [in] fnmatch_flags fnmatch flags
///
/// @retval OB_SUCCESS success
/// @retval other value fail
virtual int table_group_match(const char *pattern, bool &is_matched, int fnmatch_flags = FNM_CASEFOLD) = 0;
/// get all serving tenant TableGroup list
///
/// @param [out] table_groups tablegroup list
///
/// @retval OB_SUCCESS success
/// @retval other value fail
virtual int get_table_groups(std::vector<std::string> &table_groups) = 0;
/// get all serving tenant id list after oblog inited
///
/// @param [out] tenant_ids tenant ids that oblog serving
///
/// @retval OB_SUCCESS success
/// @retval other value fail
virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
};
class ObLogFactory
{
public:
ObLogFactory();
~ObLogFactory();
public:
IObLog *construct_oblog();
void deconstruct(IObLog *log);
};
}
}
#endif // OCEANBASE_LIBOBLOG_LIBOBLOG_
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX LIB
#include "ob_concurrent_seq_queue.h"
#include <linux/futex.h> // futex
#include "lib/ob_define.h"
#include "lib/time/ob_time_utility.h" // ObTimeUtility
#include "share/ob_errno.h" // KR
namespace oceanbase
{
namespace common
{
static struct timespec make_timespec(int64_t us)
{
timespec ts;
ts.tv_sec = us / 1000000;
ts.tv_nsec = 1000 * (us % 1000000);
return ts;
}
#define futex(...) syscall(SYS_futex,__VA_ARGS__)
inline int futex_wake(volatile int *p, int val)
{
return static_cast<int>(futex((int *)p, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0));
}
// 0: Woken up by FUTEX_WAKE
// ETIMEDOUT: Timeout
// EWOULDBLOCK: Target value changed, not equal to the incoming comparison value
// EINTR: woken up by various signals
inline int futex_wait(volatile int *p, int val, const timespec *timeout)
{
int ret = 0;
if (0 != futex((int *)p, FUTEX_WAIT_PRIVATE, val, timeout, NULL, 0)) {
ret = errno;
}
return ret;
}
/////////////////////////////////////////////////////////////////////////////
ObConcurrentSeqQueue::ObConcurrentSeqQueue(): items_(NULL), limit_(0), size_(0)
{}
ObConcurrentSeqQueue::~ObConcurrentSeqQueue()
{
destroy();
}
int ObConcurrentSeqQueue::init(const int64_t limit, const ObMemAttr &memattr)
{
int ret = OB_SUCCESS;
int64_t alloc_size = sizeof(SeqItem) * limit;
if (OB_UNLIKELY(0 >= limit)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", KR(ret), K(limit));
} else if (OB_UNLIKELY(limit_ > 0 || NULL != items_)) {
ret = OB_INIT_TWICE;
} else if (OB_ISNULL(items_ = (SeqItem *)ob_malloc(alloc_size, memattr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("allocate memory failed", K(alloc_size), KR(ret), K(items_));
} else {
limit_ = limit;
size_ = 0;
memset(items_, 0, sizeof(SeqItem) * limit);
for (int64_t i = 0; i < limit; i++) {
// default to NOT READY state
items_[i].seq_ = i;
}
}
return ret;
}
void ObConcurrentSeqQueue::destroy()
{
if (NULL != items_) {
ob_free(items_);
items_ = NULL;
}
limit_ = 0;
size_ = 0;
}
inline bool ObConcurrentSeqQueue::is_inited_() const
{
return NULL != items_ && limit_ > 0;
}
inline ObConcurrentSeqQueue::SeqItem &ObConcurrentSeqQueue::seq_item_(const int64_t seq)
{
return items_[seq % limit_];
}
int ObConcurrentSeqQueue::wait_on_item_(SeqItem &item,
const int64_t cmp_val,
const int64_t end_time)
{
int ret = OB_SUCCESS;
// Sleep time for one operation
// We set it relatively short to avoid the cost of "false sleep" caused by int32_t overflow
static const int64_t WAIT_TIME_ON_OP = 10L * 1000L;
int64_t wait_time_us = end_time - ObTimeUtility::current_time();
wait_time_us = std::min(wait_time_us, WAIT_TIME_ON_OP);
if (wait_time_us <= 0) {
ret = OB_TIMEOUT;
} else {
volatile int *p = reinterpret_cast<volatile int *>(&item.seq_);
int cmp_val_int32 = static_cast<int>(cmp_val & INT32_MASK);
timespec ts = make_timespec(wait_time_us);
/// Note: Our data is int64_t, but futex only supports int32_t.
/// This is a direct comparison of the lower 32 bits of int64_t, since our data is incremented
/// Only after the int32_t value overflows will the value be misclassified, leading to a false sleep.
/// We consider the probability of this occurring to be extremely low, and even if it does,
/// we reduce the impact by making the sleep wait time relatively short.
int futex_err = futex_wait(p, cmp_val_int32, &ts);
if (futex_err == ETIMEDOUT) {
ret = OB_TIMEOUT;
} else {
// 成功
}
}
return ret;
}
int ObConcurrentSeqQueue::update_seq_(SeqItem &item,
const int64_t expected_cur_seq,
const int64_t new_seq)
{
int ret = OB_SUCCESS;
// If the setup fails, it means there is a concurrent scenario and exit with an error
int64_t cur_seq = ATOMIC_CAS(&item.seq_, expected_cur_seq, new_seq);
if (OB_UNLIKELY(cur_seq != expected_cur_seq)) {
LOG_ERROR("update seq value fail, must have other threads updating the same item",
K(cur_seq), K(expected_cur_seq), K(new_seq), K(item.seq_), K(item.data_));
ret = OB_STATE_NOT_MATCH;
} else {
volatile int *p = reinterpret_cast<volatile int *>(&item.seq_);
// wake up all threads waiting on that element
// Note: The reason for waking up all threads here is to avoid waking up threads that are not the target threads
// For example: there may be multiple consuming/producing threads waiting for different seq values on this slot.
// In order to successfully wake up the target thread, we can only broadcast, because we don't have the ability to wake up a specific thread via futex
int64_t wake_num = futex_wake(p, INT32_MAX);
LOG_DEBUG("wake_up after update seq", "old_val", expected_cur_seq, "new_val", new_seq,
K(wake_num));
}
return ret;
}
int ObConcurrentSeqQueue::push(void *data, const int64_t seq, const int64_t timeout_us)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_())) {
ret = OB_NOT_INIT;
LOG_ERROR("ObConcurrentSeqQueue not init", K(items_), K(limit_));
} else if (OB_UNLIKELY(seq < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", K(seq));
} else {
bool ready_to_push = false;
SeqItem &item = seq_item_(seq);
int64_t end_time = ObTimeUtility::current_time() + timeout_us;
while (! ready_to_push && OB_SUCCESS == ret) {
// First save the current seq value
int64_t item_seq = ATOMIC_LOAD(&item.seq_);
// should not be greater than target seq
if (OB_UNLIKELY(item_seq > seq)) {
ret = OB_ENTRY_EXIST;
LOG_ERROR("invalid sequence: ENTRY_EXIST", K(seq), K(item.seq_));
} else {
ready_to_push = (item_seq == seq);
}
// Wait if the data is not ready to be pushed
if (OB_SUCCESS == ret && ! ready_to_push) {
ret = wait_on_item_(item, item_seq, end_time);
}
}
if (OB_SUCCESS == ret) {
// Set the data once the element is ready
item.data_ = data;
__sync_synchronize();
// seq -> seq + 1
if (OB_FAIL(update_seq_(item, seq, seq + 1))) {
LOG_ERROR("update seq fail after push data", KR(ret), K(seq));
} else {
ATOMIC_INC(&size_);
}
}
}
return ret;
}
int ObConcurrentSeqQueue::pop(void *&data, const int64_t asked_seq, const int64_t timeout_us)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_())) {
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(asked_seq < 0)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(ERROR, "invalid sequence number", K(asked_seq));
} else {
bool ready_to_pop = false;
SeqItem &item = seq_item_(asked_seq);
int64_t end_time = ObTimeUtility::current_time() + timeout_us;
// The value becomes seq + 1, indicating that the data is ready
int64_t ready_seq = asked_seq + 1;
// Loop to wait for seq to become ready
while (! ready_to_pop && OB_SUCCESS == ret) {
// First save the current seq value
int64_t item_seq = ATOMIC_LOAD(&item.seq_);
if (OB_UNLIKELY(item_seq > ready_seq)) {
ret = OB_ENTRY_NOT_EXIST;
LOG_ERROR("invalid sequence: ENTRY_NOT_EXIST", K(asked_seq), K(ready_seq), K(item_seq));
} else {
ready_to_pop = (item_seq == ready_seq);
}
// waif if data is not ready
if (! ready_to_pop && OB_SUCCESS == ret) {
ret = wait_on_item_(item, item_seq, end_time);
}
}
// Take out the data, update the seq and prepare the next round of slots
if (OB_SUCCESS == ret) {
data = item.data_;
__sync_synchronize();
// update value of seq FROM asked_seq + 1 TO asked_seq + limit_
if (OB_FAIL(update_seq_(item, asked_seq + 1, asked_seq + limit_))) {
LOG_ERROR("update seq fail after pop data", K(asked_seq));
} else {
ATOMIC_DEC(&size_);
}
}
}
return ret;
}
}; // end namespace common
}; // end namespace oceanbase
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_COMMON_OB_CONCURRENT_SEQ_QUEUE_H__
#define OCEANBASE_COMMON_OB_CONCURRENT_SEQ_QUEUE_H__
#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN
#include "lib/allocator/ob_malloc.h" // default_memattr
namespace oceanbase
{
namespace common
{
// A fixed-length, concurrent Sequence Queue, where the sequence number is specified for push and pop.
// Usage scenarios:
// 1. Different seqs are pushed in parallel, and a specific seq can only be pushed by one thread
// 2. Different seqs are popped in parallel, and a specific seq can only be popped by one thread
// 3. different seqs do not guarantee the order of push and pop, there may be 0 <= M < N, pop(N) operation earlier than push(M)
//
// Implementation idea.
// Assuming that there are a total of H slots, each of which holds an element whose seq number is predetermined.
// For slot X, it stores the element seq number: X + K * H, K >= 0
//
// Each slot element is accompanied by a seq variable, which takes the following values, for slot X.
// 1. X + K * H: data is not ready, pending production
// 2. X + K * H + 1: data is ready, pending consumption
//
// Producer push element No. M: wait for the value of the element's seq variable to change to M, set the data, and then mark seq as M + 1, indicating that the data is ready.
// Consumer pop element #M: wait for the element's seq variable value to become M + 2, take the data out, then change seq to M + H and wait for the next round of push
//
class ObConcurrentSeqQueue
{
public:
ObConcurrentSeqQueue();
~ObConcurrentSeqQueue();
public:
int init(const int64_t queue_size, const ObMemAttr &memattr = default_memattr);
/// @retval OB_SUCCESS success
/// @retval OB_TIMEOUT timeout
/// @retval other value fail
int push(void *data, const int64_t seq, const int64_t timeout_us);
/// @retval OB_SUCCESS success
/// @retval OB_TIMEOUT timeout
/// @retval other value fail
int pop(void *&data, const int64_t seq, const int64_t timeout_us);
void destroy();
/// valid task number;
int64_t size() const { return ATOMIC_LOAD(&size_); }
private:
static const int64_t INT32_MASK = ((1LL << 32) - 1LL);
struct SeqItem
{
volatile int64_t seq_;
void *volatile data_;
};
private:
bool is_inited_() const;
SeqItem &seq_item_(const int64_t seq);
int wait_on_item_(SeqItem &item, const int64_t cmp_val, const int64_t end_time);
int update_seq_(SeqItem &item,
const int64_t expected_cur_seq,
const int64_t new_seq);
private:
SeqItem *items_ CACHE_ALIGNED;
int64_t limit_;
// task number
int64_t size_ CACHE_ALIGNED;
private:
DISALLOW_COPY_AND_ASSIGN(ObConcurrentSeqQueue);
};
} // end namespace common
} // end namespace oceanbase
#endif /* OCEANBASE_COMMON_OB_CONCURRENT_SEQ_QUEUE_H__ */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIBOBLOG_EASY_HAZARD_MAP_
#define OCEANBASE_LIBOBLOG_EASY_HAZARD_MAP_
#include "lib/atomic/ob_atomic.h" // ATOMIC_*
#include "lib/objectpool/ob_small_obj_pool.h" // ObSmallObjPool
#include "lib/hash/ob_concurrent_hash_map_with_hazard_value.h" // ObConcurrentHashMapWithHazardValue
#include "share/ob_errno.h" // KR
namespace oceanbase
{
namespace liboblog
{
template <class K, class V>
class ObEasyHazardMap
{
typedef common::ObConcurrentHashMapWithHazardValue<K, V*> EMap;
typedef common::ObSmallObjPool<V> EPool;
class EAlloc : public EMap::IValueAlloc
{
public:
EAlloc() : inited_(false), pool_() {}
~EAlloc() { destroy(); }
public:
V* alloc();
void free(V* value);
int64_t get_alloc_count() const { return pool_.get_alloc_count(); }
int64_t get_free_count() const { return pool_.get_free_count(); }
public:
int init(const int64_t max_cached_count,
const int64_t block_size,
const char *label,
const uint64_t tenant_id);
void destroy();
private:
bool inited_;
EPool pool_;
};
public:
ObEasyHazardMap() : inited_(false), valid_count_(0), alloc_(), map_() {}
virtual ~ObEasyHazardMap() { destroy(); }
public:
int init(const int64_t max_cached_count,
const int64_t block_size,
const char *label,
const uint64_t tenant_id = common::OB_SERVER_TENANT_ID);
void destroy();
int64_t get_valid_count() const { return valid_count_; }
int64_t get_alloc_count() const { return alloc_.get_alloc_count(); }
int64_t get_free_count() const { return alloc_.get_free_count(); }
/// 元素的分配与释放
V* alloc() { return alloc_.alloc(); }
void free(V *value) { alloc_.free(value); }
/// contains key or not
///
/// @param key key to find
///
/// @retval OB_ENTRY_EXIST element exist
/// @retval OB_ENTRY_NOT_EXIST element not exist
/// @retval other errcode fail
int contains_key(const K &key);
/// insert element
/// NOTICE:
/// 1. The interface has no get semantics, no need to call revert(), if the element is to be used further, call get() once
/// 2. Elements must be allocated and freed using the alloc()/free() functions provided by this class
///
/// @param [in] key key
/// @param [int] value Value
///
/// @retval OB_SUCCESS success
/// @retval OB_ENTRY_EXIST element already exist
/// @retval other errcode fail
int insert(const K &key, V *value);
/// Get Key-Value record, support creating a new Value when it does not exist
///
/// @param [in] key key
/// @param [out] value value of key
/// @param [in] enable_create Whether to allow the creation of a new Value object if it does not exist
///
/// @retval OB_SUCCESS success
/// @retval OB_ENTRY_NOT_EXIST Does not exist, return when enable_create is false
/// @retval other errcode fail
int get(const K &key, V *&value, bool enable_create = false);
/// Return to Value object
///
/// @param value Value object
///
/// @retval OB_SUCCESS success
/// @retval other errcode fail
int revert(V *value);
/// Delete Key-Value records
///
/// @param key key of target operation
///
/// @retval OB_SUCCESS success
/// @retval OB_ENTRY_NOT_EXIST key not exist
/// @retval other errcode fail
int remove(const K &key);
void print_state(const char *mod_str) const;
template <typename Function> int for_each(Function &fn)
{
int ret = common::OB_SUCCESS;
if (! inited_) {
ret = common::OB_NOT_INIT;
} else {
ret = map_.for_each(fn);
}
return ret;
}
private:
bool inited_;
int64_t valid_count_;
EAlloc alloc_;
EMap map_;
private:
DISALLOW_COPY_AND_ASSIGN(ObEasyHazardMap);
};
///////////////////////////////////////////////////////////////////////////////////
template <class K, class V>
int ObEasyHazardMap<K, V>::EAlloc::init(const int64_t max_cached_count,
const int64_t block_size,
const char *label,
const uint64_t tenant_id)
{
int ret = common::OB_SUCCESS;
if (inited_) {
ret = common::OB_INIT_TWICE;
} else if (max_cached_count <= 0 || block_size <= 0) {
ret = common::OB_INVALID_ARGUMENT;
} else if (OB_FAIL(pool_.init(max_cached_count, label, tenant_id, block_size))) {
LIB_LOG(ERROR, "init value pool fail", KR(ret), K(max_cached_count), K(block_size));
} else {
inited_ = true;
}
return ret;
}
template <class K, class V>
void ObEasyHazardMap<K, V>::EAlloc::destroy()
{
inited_ = false;
pool_.destroy();
}
template <class K, class V>
V* ObEasyHazardMap<K, V>::EAlloc::alloc()
{
int ret = common::OB_SUCCESS;
V *ret_obj = NULL;
if (! inited_) {
ret = common::OB_NOT_INIT;
} else if (OB_FAIL(pool_.alloc(ret_obj))) {
LIB_LOG(ERROR, "alloc value from pool fail", KR(ret));
} else {
ret_obj->reset();
}
return ret_obj;
}
template <class K, class V>
void ObEasyHazardMap<K, V>::EAlloc::free(V *value)
{
int ret = common::OB_SUCCESS;
if (inited_ && NULL != value) {
if (OB_FAIL(pool_.free(value))) {
LIB_LOG(ERROR, "free value fail", K(value), KR(ret));
} else {
value = NULL;
}
}
}
/////////////////////////////////////////////////////////////////////////////////////
template <class K, class V>
int ObEasyHazardMap<K, V>::init(const int64_t max_cached_count,
const int64_t block_size,
const char *label,
const uint64_t tenant_id)
{
int ret = common::OB_SUCCESS;
if (inited_) {
ret = common::OB_INIT_TWICE;
} else if (max_cached_count <= 0 || block_size <= 0) {
ret = common::OB_INVALID_ARGUMENT;
} else if (OB_FAIL(alloc_.init(max_cached_count, block_size, label, tenant_id))) {
LIB_LOG(ERROR, "init allocator fail", KR(ret), K(max_cached_count), K(block_size));
} else if (OB_FAIL(map_.init(&alloc_))) {
LIB_LOG(ERROR, "init map fail", KR(ret));
} else {
valid_count_ = 0;
inited_ = true;
}
return ret;
}
template <class K, class V>
void ObEasyHazardMap<K, V>::destroy()
{
inited_ = false;
valid_count_ = 0;
// FIXME: can't call destroy of EAlloc cause EMap don't have destroy funtion
// but EMap relays on EAlloc
// TODO: recycle each element in Map
}
template <class K, class V>
int ObEasyHazardMap<K, V>::contains_key(const K &key)
{
int ret = common::OB_SUCCESS;
if (OB_UNLIKELY(! inited_)) {
ret = common::OB_NOT_INIT;
} else {
ret = map_.contains_key(key);
}
return ret;
}
template <class K, class V>
int ObEasyHazardMap<K, V>::insert(const K &key, V *value)
{
int ret = common::OB_SUCCESS;
if (OB_UNLIKELY(! inited_)) {
ret = common::OB_NOT_INIT;
} else if (OB_FAIL(map_.put_refactored(key, value))) {
if (common::OB_ENTRY_EXIST == ret) {
// element exist
} else {
LIB_LOG(WARN, "put value into easy hazard map fail", KR(ret), K(key), K(value));
}
} else {
// Inserted successfully, increase the number of valid
ATOMIC_INC(&valid_count_);
}
return ret;
}
template <class K, class V>
int ObEasyHazardMap<K, V>::get(const K &key, V *&value, bool enable_create)
{
int ret = common::OB_SUCCESS;
if (! inited_) {
ret = common::OB_NOT_INIT;
} else if (OB_SUCC(map_.get_refactored(key, value))) {
// succ
} else if (OB_LIKELY(common::OB_ENTRY_NOT_EXIST == ret) && OB_LIKELY(enable_create)) {
// Create a new record when the record does not exist and is allowed to create a new record
while (common::OB_ENTRY_NOT_EXIST == ret) {
if (OB_SUCC(map_.create_refactored(key, value))) {
// Created successfully and returned the object just created
ATOMIC_INC(&valid_count_);
} else if (OB_UNLIKELY(common::OB_ENTRY_EXIST == ret)) {
// Create operational conflicts and get them through the get interface
LIB_LOG(DEBUG, "create value conflict, get value instead", K(key));
ret = map_.get_refactored(key, value);
if (OB_UNLIKELY(common::OB_SUCCESS != ret)
&& OB_UNLIKELY(common::OB_ENTRY_NOT_EXIST != ret)) {
LIB_LOG(ERROR, "get value from map fail", KR(ret));
} else if (OB_UNLIKELY(common::OB_ENTRY_NOT_EXIST == ret)) {
// The second get record still does not exist, which means the record has been deleted in the meantime, try again next time
LIB_LOG(WARN, "value not exist after create-get. retry immediately.");
}
} else {
LIB_LOG(ERROR, "create value from map fail", KR(ret));
}
}
} else if (OB_UNLIKELY(common::OB_ENTRY_NOT_EXIST != ret)) {
LIB_LOG(ERROR, "get value from map fail", KR(ret), K(key));
}
return ret;
}
template <class K, class V>
int ObEasyHazardMap<K, V>::revert(V *value)
{
int ret = common::OB_SUCCESS;
if (! inited_) {
ret = common::OB_NOT_INIT;
} else if (OB_UNLIKELY(NULL == value)) {
ret = common::OB_INVALID_ARGUMENT;
} else if (OB_FAIL(map_.revert_value(value))) {
LIB_LOG(ERROR, "revert value fail", KR(ret), K(value));
} else {
// succ
value = NULL;
}
return ret;
}
template <class K, class V>
int ObEasyHazardMap<K, V>::remove(const K &key)
{
int ret = common::OB_SUCCESS;
if (! inited_) {
ret = common::OB_NOT_INIT;
} else if (OB_FAIL(map_.remove_refactored(key))) {
if (common::OB_ENTRY_NOT_EXIST != ret) {
LIB_LOG(ERROR, "remove value fail", KR(ret), K(key));
}
} else {
// succ
ATOMIC_DEC(&valid_count_);
}
return ret;
}
template <class K, class V>
void ObEasyHazardMap<K, V>::print_state(const char *mod_str) const
{
_LIB_LOG(INFO, "%s VALID=%ld HAZARD_CACHED=%ld ALLOC=%ld FREE=%ld",
mod_str,
get_valid_count(),
get_alloc_count() - get_free_count() - get_valid_count(),
get_alloc_count(),
get_free_count());
}
} // namespace liboblog
} // namespace oceanbase
#endif /* OCEANBASE_LIBOBLOG_EASY_HAZARD_MAP_ */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#include "ob_log_adapt_string.h"
#include "lib/allocator/ob_malloc.h" // ob_free
#include "lib/utility/ob_print_utils.h" // databuff_printf
using namespace oceanbase::common;
namespace oceanbase
{
namespace liboblog
{
ObLogAdaptString::ObLogAdaptString(const char *label) :
attr_(500, label),
buf_()
{}
ObLogAdaptString::~ObLogAdaptString()
{
if (NULL != buf_.get_data()) {
ob_free(buf_.get_data());
}
buf_.reset();
}
int ObLogAdaptString::append(const char *data)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data)) {
LOG_ERROR("invalid argument", K(data));
ret = OB_INVALID_ARGUMENT;
} else {
int64_t size = strlen(data);
char *data_buf = NULL;
// Prepare memory even if the data length is 0, because the empty string case should be supported
if (OB_FAIL(alloc_buf_(size, data_buf))) {
LOG_ERROR("allocate buffer fail", KR(ret), K(size), K(data_buf));
}
// Non-empty string case requires buffer to be valid
else if (OB_UNLIKELY(size > 0 && NULL == data_buf)) {
LOG_ERROR("data buffer is invalid", K(data_buf), K(size));
ret = OB_ERR_UNEXPECTED;
} else if (size > 0) {
// copy data into data_buf
(void)MEMCPY(data_buf, data, size);
}
}
return ret;
}
int ObLogAdaptString::append_int64(const int64_t int_val)
{
int ret = OB_SUCCESS;
static const int64_t MAX_INT_CHAR_LEN = 32;
char data_buf[MAX_INT_CHAR_LEN];
// First print the number to the buffer, then append to the end
if (OB_FAIL(databuff_printf(data_buf, sizeof(data_buf), "%ld", int_val))) {
LOG_ERROR("databuff_printf fail", KR(ret), K(sizeof(data_buf)), K(data_buf), K(int_val));
} else if (OB_FAIL(append(data_buf))) {
LOG_ERROR("append string fail", KR(ret), K(sizeof(data_buf)), K(int_val));
} else {
// success
}
return ret;
}
// Supports calling the append function again after cstr
int ObLogAdaptString::cstr(const char *&str)
{
int ret = OB_SUCCESS;
if (buf_.get_data() == NULL) {
str = "";
}
// Require that there must be space left to store \0
else if (OB_UNLIKELY(buf_.get_remain() <= 0)) {
LOG_ERROR("remain buffer is not enough", K(buf_));
ret = OB_ERR_UNEXPECTED;
} else {
// Fill \0, but do not change the pos position, the purpose is to support continued filling
buf_.get_data()[buf_.get_position()] = '\0';
str = buf_.get_data();
}
return ret;
}
int ObLogAdaptString::alloc_buf_(const int64_t data_size, char *&data_buf)
{
static const int64_t STRING_DEFAULT_SIZE = 8 * _K_;
int ret = OB_SUCCESS;
// The prepared buffer should always be larger than the data length, as it will be filled with \0 at the end.
int64_t expected_buf_size = data_size + 1;
data_buf = NULL;
// First prepare the buffer, if the buffer is empty, then create a new buffer
if (NULL == buf_.get_data()) {
int64_t alloc_size = std::max(expected_buf_size, STRING_DEFAULT_SIZE);
char *new_buf = static_cast<char *>(ob_malloc(alloc_size, attr_));
if (OB_ISNULL(new_buf)) {
LOG_ERROR("allocate memory fail", K(new_buf), K(alloc_size), K(expected_buf_size));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_UNLIKELY(! buf_.set_data(new_buf, alloc_size))) {
LOG_ERROR("set data fail", K(buf_), K(new_buf), K(alloc_size));
ret = OB_ERR_UNEXPECTED;
}
}
// If there is not enough space left in the buffer, reallocate a larger space
else if (buf_.get_remain() < expected_buf_size) {
int64_t realloc_size = buf_.get_capacity() + std::max(expected_buf_size, STRING_DEFAULT_SIZE);
char *new_buf = static_cast<char *>(ob_realloc(buf_.get_data(), realloc_size, attr_));
if (OB_ISNULL(new_buf)) {
LOG_ERROR("realloc memory fail", K(new_buf), K(realloc_size), K(expected_buf_size));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
int64_t pos = buf_.get_position();
buf_.reset();
if (OB_UNLIKELY(! buf_.set_data(new_buf, realloc_size))) {
LOG_ERROR("set data fail", K(buf_), K(new_buf), K(realloc_size));
ret = OB_ERR_UNEXPECTED;
}
// Reallocate previously allocated memory
else if (OB_ISNULL(buf_.alloc(pos))) {
LOG_ERROR("allocate old memory from buf fail", K(pos), K(buf_));
ret = OB_ERR_UNEXPECTED;
}
}
}
if (OB_SUCCESS == ret) {
// After the buffer is ready, allocate the memory, allocate the memory of the size of the data, here you can not allocate \0 memory, because it will repeatedly fill the data
// Allocate memory only if data_size is greater than 0
if (data_size > 0) {
if (OB_ISNULL(data_buf = static_cast<char *>(buf_.alloc(data_size)))) {
LOG_ERROR("allocate buffer fail", KR(ret), K(data_size), K(buf_));
ret = OB_ERR_UNEXPECTED;
} else {
// success
}
}
}
return ret;
}
}
}
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OB_LOG_ADAPT_STRING_H__
#define OCEANBASE_OB_LOG_ADAPT_STRING_H__
#include "lib/alloc/alloc_struct.h" // ObMemAttr
#include "common/data_buffer.h" // ObDataBuffer
#include "ob_log_utils.h" // _K_
namespace oceanbase
{
namespace liboblog
{
class ObLogAdaptString
{
public:
explicit ObLogAdaptString(const char *label);
virtual ~ObLogAdaptString();
int append(const char *data);
int append_int64(const int64_t int_val);
// Supports calling append function again after cstr to fill
// If the user has not called the append function, the empty string is returned, for compatibility with std::string
int cstr(const char *&str);
public:
TO_STRING_KV(K_(buf));
private:
int alloc_buf_(const int64_t data_size, char *&data_buf);
private:
lib::ObMemAttr attr_;
common::ObDataBuffer buf_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogAdaptString);
};
}
}
#endif
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#include "ob_log_all_svr_cache.h"
#include "lib/allocator/ob_mod_define.h" // ObModIds
#include "lib/utility/ob_macro_utils.h" // OB_ISNULL, ...
#include "lib/oblog/ob_log_module.h" // LOG_*
#include "ob_log_instance.h" // IObLogErrHandler
#include "ob_log_systable_helper.h" // IObLogSysTableHelper
#include "ob_log_config.h" // ObLogConfig
using namespace oceanbase::common;
using namespace oceanbase::share;
namespace oceanbase
{
namespace liboblog
{
int64_t ObLogAllSvrCache::g_all_server_cache_update_interval=
ObLogConfig::default_all_server_cache_update_interval_sec * _SEC_;
int64_t ObLogAllSvrCache::g_all_zone_cache_update_interval=
ObLogConfig::default_all_zone_cache_update_interval_sec * _SEC_;
ObRegion ObLogAllSvrCache::g_assign_region=ObRegion("");
ObLogAllSvrCache::ObLogAllSvrCache() :
tid_(0),
err_handler_(NULL),
systable_helper_(NULL),
stop_flag_(true),
cur_version_(0),
cur_zone_version_(0),
zone_need_update_(false),
zone_last_update_tstamp_(OB_INVALID_TIMESTAMP),
is_region_info_valid_(true),
svr_map_(),
zone_map_()
{}
ObLogAllSvrCache::~ObLogAllSvrCache()
{
destroy();
}
const char *ObLogAllSvrCache::print_svr_status(StatusType status)
{
const char *str = "UNKNOWN";
int ret = OB_SUCCESS;
if (OB_FAIL(ObServerStatus::display_status_str(status, str))) {
str = "UNKNOWN";
}
return str;
}
bool ObLogAllSvrCache::is_svr_avail(const common::ObAddr &svr)
{
bool bool_ret = false;
RegionPriority region_priority = REGION_PRIORITY_UNKNOWN;
bool_ret = is_svr_avail(svr, region_priority);
return bool_ret;
}
bool ObLogAllSvrCache::is_svr_avail(
const common::ObAddr &svr,
RegionPriority &region_priority)
{
int ret = OB_SUCCESS;
bool bool_ret = false;
region_priority = REGION_PRIORITY_UNKNOWN;
SvrItem svr_item;
ZoneItem zone_item;
if (OB_FAIL(get_svr_item_(svr, svr_item))) {
bool_ret = false;
} else if (OB_FAIL(get_zone_item_(svr_item.zone_, zone_item))) {
LOG_ERROR("failed to get zone item", KR(ret), K(svr_item), K(zone_item));
} else if (is_svr_serve_(svr_item)) {
bool_ret = true;
// get region priority of server if server is available
region_priority = svr_item.region_priority_;
LOG_DEBUG("is svr avail", K(svr), K(region_priority), K(zone_item));
} else {
region_priority = REGION_PRIORITY_UNKNOWN;
LOG_DEBUG("svr not avail", K(svr), K(zone_item), K(svr_item));
}
return bool_ret;
}
int ObLogAllSvrCache::get_svr_item_(const common::ObAddr &svr, SvrItem &item)
{
int ret = OB_SUCCESS;
int64_t cur_ver = ATOMIC_LOAD(&cur_version_);
if (OB_FAIL(svr_map_.get(svr, item))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_ERROR("get server item from map fail", KR(ret), K(svr));
}
} else if (item.version_ < cur_ver) {
// treat as invalid record if version of data little than current version
ret = OB_ENTRY_NOT_EXIST;
} else {
// succ
}
LOG_DEBUG("[STAT] [ALL_SVR_CACHE] [GET_SVR_ITEM]", KR(ret), K(svr),
"status", OB_SUCCESS == ret ? print_svr_status(item.status_) : "NOT_EXIST",
"svr_ver", item.version_, K(cur_ver),
"zone", item.zone_,
"region_priority", item.region_priority_);
return ret;
}
int ObLogAllSvrCache::get_region_priority_(const common::ObRegion &region,
RegionPriority &priority)
{
int ret = OB_SUCCESS;
if (is_assign_region_(region)) {
// specified region
priority = REGION_PRIORITY_HIGH;
} else {
// other region or empty region
priority = REGION_PRIORITY_LOW;
}
LOG_DEBUG("get region priority", K(region), K(g_assign_region));
return ret;
};
bool ObLogAllSvrCache::is_svr_serve_(const SvrItem &svr_item) const
{
bool bool_ret = false;
StatusType status = svr_item.status_;
bool_ret = ObServerStatus::OB_SERVER_ACTIVE == status
|| ObServerStatus::OB_SERVER_DELETING == status;
return bool_ret;
}
bool ObLogAllSvrCache::is_assign_region_(const common::ObRegion &region) const
{
bool bool_ret = false;
// ignore case
bool_ret = (0 == strncasecmp(g_assign_region.ptr(),
region.ptr(),
g_assign_region.size()));
return bool_ret;
}
int ObLogAllSvrCache::init(IObLogSysTableHelper &systable_helper, IObLogErrHandler &err_handler)
{
int ret = OB_SUCCESS;
int pthread_ret = 0;
if (OB_FAIL(svr_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init svr map fail", KR(ret));
} else if (OB_FAIL(zone_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init zone map fail", KR(ret));
} else {
tid_ = 0;
stop_flag_ = false;
cur_version_ = 0;
cur_zone_version_ = 0;
zone_need_update_ = false;
zone_last_update_tstamp_ = OB_INVALID_TIMESTAMP;
is_region_info_valid_ = true;
err_handler_ = &err_handler;
systable_helper_ = &systable_helper;
LOG_INFO("init all svr cache succ");
if (OB_UNLIKELY(0 != (pthread_ret = pthread_create(&tid_, NULL, thread_func_, this)))) {
LOG_ERROR("create thread for all server cache fail", K(pthread_ret), KERRNOMSG(pthread_ret));
ret = OB_ERR_UNEXPECTED;
}
}
return ret;
}
void ObLogAllSvrCache::destroy()
{
stop_flag_ = true;
if (0 != tid_) {
int pthread_ret = pthread_join(tid_, NULL);
if (0 != pthread_ret) {
LOG_ERROR("pthread_join fail", K(tid_), K(pthread_ret), KERRNOMSG(pthread_ret));
}
tid_ = 0;
}
err_handler_ = NULL;
systable_helper_ = NULL;
cur_version_ = 0;
cur_zone_version_ = 0;
zone_need_update_ = false;
zone_last_update_tstamp_ = OB_INVALID_TIMESTAMP;
is_region_info_valid_ = true;
(void)svr_map_.destroy();
(void)zone_map_.destroy();
LOG_INFO("destroy all svr cache succ");
}
void *ObLogAllSvrCache::thread_func_(void *arg)
{
ObLogAllSvrCache *host = static_cast<ObLogAllSvrCache *>(arg);
if (NULL != host) {
host->run();
}
return NULL;
}
void ObLogAllSvrCache::run()
{
int ret = OB_SUCCESS;
LOG_INFO("all svr cache thread start");
while (! stop_flag_ && OB_SUCCESS == ret) {
if (need_update_zone_()) {
if (OB_FAIL(update_zone_cache_())) {
LOG_ERROR("update zone cache error", KR(ret));
} else if (OB_FAIL(purge_stale_zone_records_())) {
LOG_ERROR("purge stale records fail", KR(ret));
} else {
// do nothing
}
}
if (OB_SUCC(ret)) {
int64_t all_svr_cache_update_interval = ATOMIC_LOAD(&g_all_server_cache_update_interval);
if (REACH_TIME_INTERVAL(all_svr_cache_update_interval)) {
if (OB_FAIL(update_server_cache_())) {
LOG_ERROR("update server cache error", KR(ret));
} else if (OB_FAIL(purge_stale_records_())) {
LOG_ERROR("purge stale records fail", KR(ret));
} else {
// succ
}
}
}
// sleep
usec_sleep(USLEEP_INTERVAL);
}
if (OB_SUCCESS != ret) {
if (NULL != err_handler_) {
err_handler_->handle_error(ret, "all server cache update thread exits, err=%d", ret);
}
}
LOG_INFO("all svr cache thread stop", KR(ret));
}
void ObLogAllSvrCache::configure(const ObLogConfig & config)
{
int ret = OB_SUCCESS;
int64_t all_server_cache_update_interval_sec = config.all_server_cache_update_interval_sec;
ATOMIC_STORE(&g_all_server_cache_update_interval, all_server_cache_update_interval_sec * _SEC_);
int64_t all_zone_cache_update_interval_sec = config.all_zone_cache_update_interval_sec;
ATOMIC_STORE(&g_all_zone_cache_update_interval, all_zone_cache_update_interval_sec * _SEC_);
if (OB_FAIL(g_assign_region.assign(config.region.str()))) {
LOG_ERROR("g_assign_region assign fail", KR(ret), K(g_assign_region));
}
LOG_INFO("[CONFIG]", K(all_server_cache_update_interval_sec));
LOG_INFO("[CONFIG]", K(all_zone_cache_update_interval_sec));
LOG_INFO("[CONFIG]", K(g_assign_region));
}
bool ObLogAllSvrCache::need_update_zone_()
{
bool bool_ret = false;
int64_t all_zone_cache_update_interval = ATOMIC_LOAD(&g_all_zone_cache_update_interval);
bool is_region_info_valid = ATOMIC_LOAD(&is_region_info_valid_);
int64_t update_delta_time = get_timestamp() - zone_last_update_tstamp_;
if (!is_region_info_valid) {
bool_ret = false;
}
// need update if never update
else if (OB_INVALID_TIMESTAMP == zone_last_update_tstamp_) {
bool_ret = true;
}
// update if set zone_need_update_
else if (zone_need_update_) {
bool_ret = true;
}
// update by interval
else if (update_delta_time >= all_zone_cache_update_interval) {
bool_ret = true;
}
return bool_ret;
}
int ObLogAllSvrCache::update_zone_cache_()
{
int ret = OB_SUCCESS;
IObLogSysTableHelper::AllZoneRecordArray record_array;
IObLogSysTableHelper::AllZoneTypeRecordArray zone_type_record_array;
record_array.reset();
if (OB_ISNULL(systable_helper_)) {
LOG_ERROR("invalid systable helper", K(systable_helper_));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(systable_helper_->query_all_zone_info(record_array))) {
if (OB_NEED_RETRY == ret) {
LOG_WARN("query all zone info need retry", KR(ret));
ret = OB_SUCCESS;
} else if (OB_ITEM_NOT_SETTED == ret) {
ATOMIC_STORE(&is_region_info_valid_, false);
LOG_INFO("'region' is not availalbe in __all_zone table. would not update zone cache",
K_(is_region_info_valid));
ret = OB_SUCCESS;
} else {
LOG_ERROR("query all zone info fail", KR(ret));
}
} else if (OB_FAIL(systable_helper_->query_all_zone_type(zone_type_record_array))) {
if (OB_NEED_RETRY == ret) {
LOG_WARN("query all zone type need retry", KR(ret));
ret = OB_SUCCESS;
} else if (OB_ITEM_NOT_SETTED == ret) {
LOG_INFO("'zone_type' is not availalbe in __all_zone table. would not update zone cache", K(zone_type_record_array));
ret = OB_SUCCESS;
} else {
LOG_ERROR("query all zone type fail", KR(ret));
}
} else {
int64_t next_version = cur_zone_version_ + 1;
for (int64_t index = 0; OB_SUCCESS == ret && index < record_array.count(); index++) {
IObLogSysTableHelper::AllZoneRecord &record = record_array.at(index);
common::ObZone &zone = record.zone_;
common::ObRegion &region = record.region_;
_LOG_INFO("[STAT] [ALL_ZONE] INDEX=%ld/%ld ZONE=%s REGION=%s VERSION=%lu",
index, record_array.count(), to_cstring(zone), to_cstring(region), next_version);
ZoneItem item;
item.reset(next_version, region);
LOG_DEBUG("update zone cache item", K(zone), K(item));
if (OB_FAIL(zone_map_.insert_or_update(zone, item))) {
LOG_ERROR("zone_map_ insert_or_update fail", KR(ret), K(zone), K(item));
}
}
for (int64_t idx = 0; OB_SUCCESS == ret && idx < zone_type_record_array.count(); idx ++) {
ZoneItem item;
IObLogSysTableHelper::AllZoneTypeRecord &record = zone_type_record_array.at(idx);
common::ObZone &zone = record.zone_;
common::ObZoneType &zone_type = record.zone_type_;
if (OB_FAIL(get_zone_item_(zone, item))) {
LOG_ERROR("fail to get zone item from cache by zone", KR(ret), K(zone));
} else {
item.set_zone_type(zone_type);
if (OB_FAIL(zone_map_.insert_or_update(zone, item))) {
LOG_ERROR("zone_map_ insert_or_update set zone_type fail", KR(ret), K(zone), K(item), K(zone_type));
}
}
_LOG_INFO("[STAT] [ALL_ZONE] INDEX=%ld/%ld ZONE=%s ZONE_TYPE=%s VERSION=%lu",
idx, zone_type_record_array.count(), to_cstring(zone), zone_type_to_str(item.get_zone_type()), next_version);
}
ATOMIC_INC(&cur_zone_version_);
_LOG_INFO("[STAT] [ALL_ZONE] COUNT=%ld VERSION=%lu", record_array.count(), cur_zone_version_);
}
if (OB_SUCC(ret)) {
zone_need_update_ = false;
zone_last_update_tstamp_ = get_timestamp();
}
return ret;
}
int ObLogAllSvrCache::update_server_cache_()
{
int ret = OB_SUCCESS;
IObLogSysTableHelper::AllServerRecordArray record_array(common::ObModIds::OB_LOG_ALL_SERVER_ARRAY, common::OB_MALLOC_NORMAL_BLOCK_SIZE);
record_array.reset();
if (OB_ISNULL(systable_helper_)) {
LOG_ERROR("invalid systable helper", K(systable_helper_));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(systable_helper_->query_all_server_info(record_array))) {
if (OB_NEED_RETRY == ret) {
LOG_WARN("query all server info need retry", KR(ret));
ret = OB_SUCCESS;
} else {
LOG_ERROR("query all server info fail", KR(ret));
}
} else {
int64_t next_version = cur_version_ + 1;
for (int64_t index = 0; OB_SUCCESS == ret && index < record_array.count(); index++) {
IObLogSysTableHelper::AllServerRecord &record = record_array.at(index);
ObAddr svr;
svr.set_ip_addr(record.svr_ip_, record.svr_port_);
const char *status_str = NULL;
int tmp_ret = ObServerStatus::display_status_str(record.status_, status_str);
if (OB_SUCCESS != tmp_ret) {
LOG_ERROR("invalid server status, can not cast to string", K(tmp_ret),
K(record.status_), K(svr));
}
ZoneItem zone_item;
RegionPriority region_priority = REGION_PRIORITY_UNKNOWN;
if (OB_FAIL(get_zone_item_(record.zone_, zone_item))) {
LOG_ERROR("get_zone_item_ fail", KR(ret), "zone", record.zone_, K(zone_item));
} else if (OB_FAIL(get_region_priority_(zone_item.region_, region_priority))) {
LOG_ERROR("get priority based region fail", KR(ret), K(svr),
"region", zone_item.region_,
"region_priority", print_region_priority(region_priority));
} else {
SvrItem item;
item.reset(record.status_, next_version, record.zone_, region_priority);
LOG_DEBUG("update cache item", K(item));
if (OB_FAIL(svr_map_.insert_or_update(svr, item))) {
LOG_ERROR("svr_map_ insert_or_update fail", KR(ret), K(svr), K(item));
}
}
_LOG_INFO("[STAT] [ALL_SERVER_LIST] INDEX=%ld/%ld SERVER=%s STATUS=%d(%s) "
"ZONE=%s REGION=%s(%s) VERSION=%lu",
index, record_array.count(), to_cstring(svr), record.status_, status_str,
to_cstring(record.zone_), to_cstring(zone_item.region_),
print_region_priority(region_priority), next_version);
}
ATOMIC_INC(&cur_version_);
_LOG_INFO("[STAT] [ALL_SERVER_LIST] COUNT=%ld VERSION=%lu", record_array.count(), cur_version_);
}
return ret;
}
int ObLogAllSvrCache::get_zone_item_(const common::ObZone &zone,
ZoneItem &zone_item)
{
int ret = OB_SUCCESS;
bool is_region_info_valid = ATOMIC_LOAD(&is_region_info_valid_);
zone_item.reset();
if (!is_region_info_valid) {
LOG_DEBUG("region is invalid, do not use");
} else {
if (OB_FAIL(zone_map_.get(zone, zone_item))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_ERROR("zone_map_ get zone_item fail", KR(ret), K(zone), K(zone_item));
} else {
// update all zone cache if can't get region info in __zone_map
zone_need_update_ = true;
LOG_DEBUG("zone_map_ get zone_item not exist, need update", KR(ret), K(zone));
}
}
int64_t cur_zone_ver = ATOMIC_LOAD(&cur_zone_version_);
if (OB_SUCCESS == ret) {
if (zone_item.version_ < cur_zone_ver) {
// treate as invalid record if version little than current version
ret = OB_ENTRY_NOT_EXIST;
} else {
// do nothing
}
}
}
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
zone_item.reset();
}
return ret;
}
int ObLogAllSvrCache::purge_stale_records_()
{
int ret = OB_SUCCESS;
StaleRecPurger purger(cur_version_);
if (OB_FAIL(svr_map_.remove_if(purger))) {
LOG_ERROR("remove if fail", KR(ret), K(cur_version_));
} else {
_LOG_INFO("[STAT] [ALL_SERVER_LIST] [PURGE] PURGE_COUNT=%ld CUR_COUNT=%ld VERSION=%lu",
purger.purge_count_, svr_map_.count(), cur_version_);
}
return ret;
}
int ObLogAllSvrCache::purge_stale_zone_records_()
{
int ret = OB_SUCCESS;
StaleZoneRecPurger purger(cur_zone_version_);
if (OB_FAIL(zone_map_.remove_if(purger))) {
LOG_ERROR("zone_map_ remove if fail", KR(ret), K(cur_zone_version_));
} else {
_LOG_INFO("[STAT] [ALL_ZONE] [PURGE] PURGE_COUNT=%ld CUR_COUNT=%ld VERSION=%lu",
purger.purge_count_, zone_map_.count(), cur_zone_version_);
}
return ret;
}
bool ObLogAllSvrCache::StaleRecPurger::operator()(const common::ObAddr &svr,
const SvrItem &svr_item)
{
bool need_purge = (svr_item.version_ < cur_ver_);
if (need_purge) {
purge_count_++;
_LOG_INFO("[STAT] [ALL_SERVER_LIST] [PURGE] SERVER=%s VERSION=%lu/%lu",
to_cstring(svr), svr_item.version_, cur_ver_);
}
return need_purge;
}
bool ObLogAllSvrCache::StaleZoneRecPurger::operator()(const common::ObZone &zone,
const ZoneItem &zone_item)
{
bool need_purge = (zone_item.version_ < cur_ver_);
if (need_purge) {
purge_count_++;
_LOG_INFO("[STAT] [ALL_ZONE] [PURGE] ZONE=%s VERSION=%lu/%lu",
zone.ptr(), zone_item.version_, cur_ver_);
}
return need_purge;
}
void ObLogAllSvrCache::set_update_interval_(const int64_t time)
{
ATOMIC_STORE(&g_all_server_cache_update_interval, time);
}
}
}
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIBOBLOG_OB_LOG_ALL_SVR_CACHE_H__
#define OCEANBASE_LIBOBLOG_OB_LOG_ALL_SVR_CACHE_H__
#include <pthread.h> // pthread_*
#include "lib/net/ob_addr.h" // ObAddr
#include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap
#include "common/ob_zone.h" // ObZone
#include "common/ob_zone_type.h" // ObZoneType
#include "common/ob_region.h" // ObRegin
#include "share/ob_server_status.h" // ObServerStatus
#include "ob_log_utils.h" // _SEC_
#include "ob_log_server_priority.h" // RegionPriority
namespace oceanbase
{
namespace liboblog
{
class IObLogAllSvrCache
{
public:
typedef share::ObServerStatus::DisplayStatus StatusType;
static const int64_t USLEEP_INTERVAL = 1 * 1000 * 1000;
public:
virtual ~IObLogAllSvrCache() {}
// check server is available or not
// server avail means server can serve for RPC(locate start log id, fetch log, ...)
virtual bool is_svr_avail(const common::ObAddr &svr) = 0;
// 1. check server is available or not
// 2. if svr is available, return region priority of region, otherwise return REGION_PRIORITY_UNKNOWN
// server is available means:
// 1. server status is ACTIVE or DELETING
// 2. server not in ENCRYPTION zone
//
// @param [in] svr server addr
// @param [out] region_priority region priority
//
// @retval true server is available
// @retval false server is not available
virtual bool is_svr_avail(
const common::ObAddr &svr,
RegionPriority &region_priority) = 0;
};
///////////////////// ObLogAllSvrCache //////////////////////
class IObLogErrHandler;
class IObLogSysTableHelper;
class ObLogConfig;
class ObLogAllSvrCache : public IObLogAllSvrCache
{
// class static variables
public:
static int64_t g_all_server_cache_update_interval;
static int64_t g_all_zone_cache_update_interval;
// specified region(used for svr priority)
static common::ObRegion g_assign_region;
public:
ObLogAllSvrCache();
virtual ~ObLogAllSvrCache();
public:
virtual bool is_svr_avail(const common::ObAddr &svr);
virtual bool is_svr_avail(
const common::ObAddr &svr,
RegionPriority &region_priority);
static const char *print_svr_status(StatusType status);
public:
int init(IObLogSysTableHelper &systable_helper, IObLogErrHandler &err_handler);
void destroy();
void run();
public:
static void configure(const ObLogConfig & config);
private:
struct SvrItem;
int get_svr_item_(const common::ObAddr &svr, SvrItem &item);
// 1. get region of specified zone from zone_map_
// 2. refresh cache of __all_zone and retry query if query return ret == OB_ENTRY_NOT_EXIST
struct ZoneItem;
int get_zone_item_(const common::ObZone &zone, ZoneItem &zone_item);
// two priroity of region, named from high to low:
// 1. region_priority = REGION_PRIORITY_HIGH if current region = specified region(g_assign_zone)
// 2. other region or empty region(no retion info for lower version of observer)
// region_priority = REGION_PRIORITY_LOW
int get_region_priority_(const common::ObRegion &region, RegionPriority &priority);
bool is_assign_region_(const common::ObRegion &region) const;
static void *thread_func_(void *arg);
bool need_update_zone_();
int update_zone_cache_();
int update_server_cache_();
int purge_stale_records_();
int purge_stale_zone_records_();
// NOTE: server serve in such cases:
// 1. server status is ACTIVE or DELETING
bool is_svr_serve_(const SvrItem &svr_item) const;
private:
struct SvrItem
{
StatusType status_;
uint64_t version_;
common::ObZone zone_;
RegionPriority region_priority_;
void reset(const StatusType status,
const uint64_t version,
const common::ObZone &zone,
const RegionPriority region_priority)
{
status_ = status;
version_ = version;
zone_ = zone;
region_priority_ = region_priority;
}
TO_STRING_KV(K_(status), K_(version), K_(zone),
"region_priority", print_region_priority(region_priority_));
};
typedef common::ObLinearHashMap<common::ObAddr, SvrItem> SvrMap;
struct ZoneItem
{
// Compatibility: the observer of the lower version does not have the region field,
// and the region is empty by default
common::ObRegion region_;
common::ObZoneType zone_type_;
uint64_t version_;
void reset()
{
version_ = -1;
region_.reset();
zone_type_ = common::ZONE_TYPE_INVALID;
}
void reset(const uint64_t version,
const common::ObRegion &region)
{
version_ = version;
region_ = region;
zone_type_ = common::ZONE_TYPE_INVALID;
}
void set_zone_type(const common::ObZoneType &zone_type)
{
zone_type_ = zone_type;
}
const common::ObZoneType& get_zone_type() const { return zone_type_; }
TO_STRING_KV(K_(region), "zone_type", zone_type_to_str(zone_type_), K_(version));
};
typedef common::ObLinearHashMap<common::ObZone, ZoneItem> ZoneMap;
struct StaleRecPurger
{
uint64_t cur_ver_;
int64_t purge_count_;
explicit StaleRecPurger(const int64_t ver) : cur_ver_(ver), purge_count_(0)
{}
bool operator()(const common::ObAddr &svr, const SvrItem &svr_item);
};
struct StaleZoneRecPurger
{
uint64_t cur_ver_;
int64_t purge_count_;
explicit StaleZoneRecPurger(const int64_t ver) : cur_ver_(ver), purge_count_(0)
{}
bool operator()(const common::ObZone &zone, const ZoneItem &zone_item);
};
// set g_all_server_cache_update_interval for unitest
static void set_update_interval_(const int64_t time);
private:
pthread_t tid_;
IObLogErrHandler *err_handler_;
IObLogSysTableHelper *systable_helper_;
bool stop_flag_ CACHE_ALIGNED;
uint64_t cur_version_ CACHE_ALIGNED;
uint64_t cur_zone_version_ CACHE_ALIGNED;
bool zone_need_update_;
int64_t zone_last_update_tstamp_;
// For low version observer compatibility, region information exists by default,
// if update_zone_cache does not query the record, no region information exists
bool is_region_info_valid_ CACHE_ALIGNED;
SvrMap svr_map_;
ZoneMap zone_map_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogAllSvrCache);
};
}
}
#endif
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#include <MetaInfo.h> // ITableMeta
#include "ob_log_binlog_record.h"
#include "ob_log_utils.h"
#include "ob_log_instance.h" // TCTX
using namespace oceanbase::common;
namespace oceanbase
{
namespace liboblog
{
const char *ObLogBR::COLUMN_CHANGED_LABEL_PTR = "";
const char *ObLogBR::COLUMN_UNCHANGED_LABEL_PTR = NULL;
ObLogBR::ObLogBR() : ObLogResourceRecycleTask(ObLogResourceRecycleTask::BINLOG_RECORD_TASK),
data_(NULL),
is_serilized_(false),
host_(NULL),
log_entry_task_(NULL),
next_(NULL),
valid_(true),
precise_timestamp_(0),
freeze_version_(),
tenant_id_(OB_INVALID_TENANT_ID),
ddl_schema_version_(OB_INVALID_VERSION),
part_trans_task_count_(0)
{
}
ObLogBR::~ObLogBR()
{
reset();
destruct_data_();
}
void ObLogBR::construct_data_(const bool creating_binlog_record)
{
data_ = LogMsgFactory::createLogRecord(TCTX.drc_message_factory_binlog_record_type_, creating_binlog_record);
if (OB_ISNULL(data_)) {
OBLOG_LOG(ERROR, "LogMsgFactory::createLogRecord fails");
} else {
// set user data pointer to the pointer hold the binlog record
data_->setUserData(this);
}
}
void ObLogBR::destruct_data_()
{
if (NULL != data_) {
LogMsgFactory::destroy(data_);
data_ = NULL;
}
}
void ObLogBR::reset()
{
if (NULL != data_) {
data_->clear();
// note reset all filed used by liboblog, cause clear() may won't reset fields
// clear new/old column array
data_->setNewColumn(NULL, 0);
data_->setOldColumn(NULL, 0);
// clear TableMeta and IDBMeta
data_->setTableMeta(NULL);
data_->setTbname(NULL);
data_->setDBMeta(NULL);
data_->setDbname(NULL);
// set user data pointer to the pointer hold the binlog record
data_->setUserData(this);
}
host_ = NULL;
log_entry_task_ = NULL;
next_ = NULL;
valid_ = true;
precise_timestamp_ = 0;
freeze_version_.reset();
tenant_id_ = OB_INVALID_TENANT_ID;
part_trans_task_count_ = 0;
}
int ObLogBR::set_table_meta(ITableMeta *table_meta)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else if (OB_ISNULL(table_meta)) {
LOG_ERROR("invalid argument", K(table_meta));
ret = OB_INVALID_ARGUMENT;
} else {
data_->setTableMeta(table_meta);
data_->setTbname(table_meta->getName());
}
return ret;
}
int ObLogBR::set_db_meta(IDBMeta *db_meta)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else if (OB_ISNULL(db_meta)) {
LOG_ERROR("invalid argument", K(db_meta));
ret = OB_INVALID_ARGUMENT;
} else {
data_->setDBMeta(db_meta);
data_->setDbname(db_meta->getName());
}
return ret;
}
int ObLogBR::init_dml_data_first(const RecordType type,
const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(EUNKNOWN == type)
|| OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
LOG_ERROR("invalid argument", K(type), K(tenant_id));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(init_binlog_record_first_(type))) {
LOG_ERROR("init_binlog_record_first_ fail", KR(ret), K(type), K(tenant_id));
} else {
tenant_id_ = tenant_id;
set_next(NULL);
valid_ = true;
}
return ret;
}
int ObLogBR::init_dml_data_second(const RecordType type,
const uint64_t cluster_id,
const int64_t tenant_id,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const common::ObVersion &freeze_version,
const int64_t commit_version)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else if (OB_FAIL(init_binlog_record_second_(type, cluster_id, trace_id, trace_info, unique_id, commit_version))) {
LOG_ERROR("init_binlog_record_second_ fail", KR(ret), K(type), K(cluster_id), K(trace_id), K(trace_info),
K(unique_id), K(commit_version));
} else {
LOG_DEBUG("init_dml_data_second succ", "type", print_record_type(type), K(cluster_id), K(tenant_id),
K(trace_id), K(trace_info), K(unique_id), K(commit_version));
set_precise_timestamp(commit_version);
freeze_version_ = freeze_version;
}
return ret;
}
int ObLogBR::init_data(const RecordType type,
const uint64_t cluster_id,
const uint64_t tenant_id,
const int64_t ddl_schema_version,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const common::ObVersion &freeze_version,
const int64_t commit_version,
const int64_t part_trans_task_count,
const common::ObString *major_version_str)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(EUNKNOWN == type)
|| OB_UNLIKELY(! is_valid_cluster_id(cluster_id))
|| OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)
|| OB_UNLIKELY(commit_version <= 0)
|| OB_UNLIKELY(OB_INVALID_TIMESTAMP == ddl_schema_version)) {
LOG_ERROR("invalid argument", K(type), K(cluster_id), K(commit_version), K(tenant_id), K(ddl_schema_version));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(verify_part_trans_task_count_(type, part_trans_task_count))) {
LOG_ERROR("verify_part_trans_task_count_ fail", KR(ret), K(type),
"type", print_record_type(type), K(part_trans_task_count));
} else if (OB_FAIL(init_binlog_record_(type, cluster_id, trace_id, trace_info, unique_id,
commit_version, major_version_str))) {
LOG_ERROR("init_binlog_record_ fail", KR(ret), K(type), K(cluster_id), K(tenant_id), K(trace_id), K(trace_info),
K(unique_id), K(commit_version), K(major_version_str));
} else {
set_precise_timestamp(commit_version);
tenant_id_ = tenant_id;
freeze_version_ = freeze_version;
ddl_schema_version_ = ddl_schema_version;
set_next(NULL);
part_trans_task_count_ = part_trans_task_count;
valid_ = true;
}
return ret;
}
int ObLogBR::init_binlog_record_(const RecordType type,
const uint64_t cluster_id,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const int64_t commit_version,
const common::ObString *major_version_str)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else if (OB_FAIL(init_binlog_record_first_(type))) {
LOG_ERROR("init_binlog_record_first_ fail", KR(ret), K(type));
} else if (OB_FAIL(init_binlog_record_second_(type, cluster_id, trace_id, trace_info, unique_id,
commit_version, major_version_str))) {
LOG_ERROR("init_binlog_record_second_ fail", KR(ret), K(type), K(cluster_id), K(trace_id), K(trace_info),
K(unique_id), K(commit_version), K(major_version_str));
} else {
// succ
}
return ret;
}
int ObLogBR::init_binlog_record_first_(const RecordType type)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else {
// set to invalid_data
int src_category = SRC_NO;
// NOTE: init checkpint to 0 (sec/microsecond)
uint64_t checkpoint_sec = 0;
uint64_t checkpoint_usec = 0;
data_->setRecordType(type);
data_->setSrcCategory(src_category);
data_->setCheckpoint(checkpoint_sec, checkpoint_usec);
data_->setId(0); // always set id to 0
data_->setSrcType(SRC_OCEANBASE_1_0); // for OB 1.0
// means that two consecutive statements operate on different fields
// set this field to true for performance
data_->setFirstInLogevent(true);
}
return ret;
}
int ObLogBR::init_binlog_record_second_(const RecordType type,
const uint64_t cluster_id,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const int64_t commit_version,
const common::ObString *major_version_str)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else {
// treate cluster_id as thread_id
// convert from 64 bit to 32 bit
data_->setThreadId(static_cast<uint32_t>(cluster_id));
// set trans commit timestamp (second)
data_->setTimestamp(commit_version / 1000000);
// set trans commit timestamp (microsecond)
// note: combine getTimestamp() and getRecordUsec() as complete trans commit timestamp
data_->setRecordUsec(static_cast<uint32_t>(commit_version % 1000000));
// won't use this field
data_->putFilterRuleVal("0", 1);
// set unique id to binlog record
data_->putFilterRuleVal(unique_id.ptr(), unique_id.length());
// set OBTraceID
data_->putFilterRuleVal(trace_id.ptr(), trace_id.length());
// TODO setObTraceInfo has bug, relay on drc message support in new release
UNUSED(trace_info);
// data_->setObTraceInfo(trace_info.ptr());
// put major version(from int32_t to char*) to the forth field
if (EBEGIN == type) {
if (OB_ISNULL(major_version_str)) {
LOG_ERROR("major version str for EBEGIN statement should not be null!", KR(ret), K(cluster_id),
K(type), K(trace_id));
} else {
data_->putFilterRuleVal(major_version_str->ptr(), major_version_str->length());
}
}
}
return ret;
}
int ObLogBR::put_old(ILogRecord *br, const bool is_changed)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(br)) {
LOG_ERROR("invalid argument", K(br));
ret = OB_INVALID_ARGUMENT;
} else {
// DRC proto
// mark value of OldCol to empty string, use global unique empty string value
// value of unchanged OldCol as NULL
const char *val = is_changed ? ObLogBR::COLUMN_CHANGED_LABEL_PTR :
ObLogBR::COLUMN_UNCHANGED_LABEL_PTR;
int64_t pos = (NULL == val ? 0 : strlen(val));
(void)br->putOld(val, static_cast<int>(pos));
}
return ret;
}
int ObLogBR::get_record_type(int &record_type)
{
int ret = OB_SUCCESS;
record_type = 0;
if (OB_ISNULL(data_)) {
LOG_ERROR("data_ is null", K(data_));
ret = OB_ERR_UNEXPECTED;
} else {
record_type = data_->recordType();
}
return ret;
}
int ObLogBR::setInsertRecordTypeForHBasePut(const RecordType type)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
LOG_ERROR("ILogRecord has not been created");
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(EINSERT != type)) {
LOG_ERROR("invalid argument", "type", print_record_type(type));
} else {
data_->setRecordType(type);
}
return ret;
}
int ObLogBR::verify_part_trans_task_count_(const RecordType type,
const int64_t part_trans_task_count)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(EUNKNOWN == type)) {
LOG_ERROR("invalid argument", K(type), "type", print_record_type(type));
ret = OB_INVALID_ARGUMENT;
} else {
if ((EDDL == type) || (EBEGIN == type) || (ECOMMIT == type)) {
// verify part_trans_task_count, should greater than 0 if DDL/BEGIN/COMMIT
if (OB_UNLIKELY(part_trans_task_count <= 0)) {
LOG_ERROR("part_trans_task_count is not greater than 0", K(type),
"type", print_record_type(type), K(part_trans_task_count));
ret = OB_ERR_UNEXPECTED;
} else {
// do nothing
}
}
}
return ret;
}
// unserilized Binlog record
ObLogUnserilizedBR::ObLogUnserilizedBR() : ObLogBR()
{
construct_unserilized_data_();
ObLogBR::reset();
}
ObLogUnserilizedBR::~ObLogUnserilizedBR()
{
}
void ObLogUnserilizedBR::construct_unserilized_data_()
{
const bool creating_binlog_record = true;
construct_data_(creating_binlog_record);
}
// serilized Binlog Record
ObLogSerilizedBR::ObLogSerilizedBR() : ObLogBR()
{
construct_serilized_data_();
ObLogBR::reset();
}
ObLogSerilizedBR::~ObLogSerilizedBR()
{
}
void ObLogSerilizedBR::construct_serilized_data_()
{
const bool creating_binlog_record = false;
construct_data_(creating_binlog_record);
}
} // end namespace liboblog
} // end namespace oceanbase
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIBOBLOG_BINLOG_RECORD_
#define OCEANBASE_LIBOBLOG_BINLOG_RECORD_
#include <LogRecord.h> // ILogRecord
#include <LogMsgFactory.h> // createLogRecord
#include "share/ob_define.h"
#include "lib/oblog/ob_log_module.h" // OBLOG_LOG
#include "lib/string/ob_string.h" // ObString
#include "common/ob_range.h" // ObVersion
#include "ob_log_resource_recycle_task.h" // ObLogResourceRecycleTask
using namespace oceanbase::logmessage;
namespace oceanbase
{
namespace liboblog
{
class ObLogBR : public ObLogResourceRecycleTask
{
public:
static const char *COLUMN_CHANGED_LABEL_PTR;
static const char *COLUMN_UNCHANGED_LABEL_PTR;
public:
ObLogBR();
virtual ~ObLogBR();
public:
static int put_old(ILogRecord *br, const bool is_changed);
public:
void reset();
// init INSERT/UPDATE/DELETE Binlog Record
// set record_type/srcCategory/checkpoint/scrType/firstInLogevent
int init_dml_data_first(const RecordType type,
const uint64_t tenant_id);
// read persist data, fill data after deserialize
//
// INSERT/UPDATE/DELETE
// threadId/timestamp(checkpoint)/filterRuleVal
int init_dml_data_second(const RecordType type,
const uint64_t cluster_id,
const int64_t tenant_id,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const common::ObVersion &freeze_version,
const int64_t commit_version);
// init Binlog Record of DDL/BEGIN/COMMIT
int init_data(const RecordType type,
const uint64_t cluster_id,
const uint64_t tenant_id,
const int64_t ddl_schema_version,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const common::ObVersion &freeze_version,
const int64_t commit_version,
const int64_t part_trans_task_count = 0,
const common::ObString *major_version_str = NULL);
ILogRecord *get_data() { return data_; }
int get_record_type(int &record_type);
void set_next(ObLogBR *next) {next_ = next;};
ObLogBR *get_next() {return next_;};
void set_is_valid(const bool is_valid) { valid_ = is_valid; }
bool is_valid() const { return valid_; }
int set_table_meta(ITableMeta *table_meta);
int set_db_meta(IDBMeta *db_meta);
inline void set_precise_timestamp(int64_t precise_timestamp) { precise_timestamp_ = precise_timestamp; }
inline int64_t get_precise_timestamp() const { return precise_timestamp_; }
inline void *get_host() { return host_; }
void set_host(void *host) { host_ = host; }
inline void *get_log_entry_task() { return log_entry_task_; }
void set_log_entry_task(void *log_entry_task) { log_entry_task_ = log_entry_task; }
inline bool is_serilized() const { return is_serilized_; }
void set_serilized(const bool is_serilized) { is_serilized_ = is_serilized; }
int32_t get_major_version() const { return freeze_version_.major_; }
uint64_t get_tenant_id() const { return tenant_id_; }
int64_t get_ddl_schema_version() const { return ddl_schema_version_; }
int64_t get_part_trans_task_count() const { return part_trans_task_count_; }
// for put operation of HBASE: store data type as update, new value use full-column mode, old value is empty
// special treatment for liboblog:
// TODO:observer add new dml operation type to represend PUT operation
int setInsertRecordTypeForHBasePut(const RecordType type);
public:
TO_STRING_KV("is_ser", is_serilized_,
K_(valid));
private:
int verify_part_trans_task_count_(const RecordType type,
const int64_t part_trans_task_count);
// unique id of BinlogRecord: Pkey + LogId + row_index
// 1. DML statement
// set unique id by puFilterRuleVal
// 2. DDL statement
// set tenant id and schema version
int init_binlog_record_(const RecordType type,
const uint64_t cluster_id,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const int64_t commit_version,
const common::ObString *major_version_str = NULL);
int init_binlog_record_first_(const RecordType type);
int init_binlog_record_second_(const RecordType type,
const uint64_t cluster_id,
const common::ObString &trace_id,
const common::ObString &trace_info,
const common::ObString &unique_id,
const int64_t commit_version,
const common::ObString *major_version_str = NULL);
private:
ILogRecord *data_; ///< real BinlogRecord
// is binglog record serilized, reference construct_data_ function implementation
// 1. If binglog record has not been serilized, is_serilized_ = false
// 2. If binglog record parse from string, is_serilized_ = true
// is_serilized_ can only be modified by set_serilized function
bool is_serilized_;
void *host_; ///< record corresponsding RowIndex
void *log_entry_task_;
ObLogBR *next_;
bool valid_; ///< statement is valid or not
int64_t precise_timestamp_; ///< precise timestamp in micro seconds
common::ObVersion freeze_version_;
uint64_t tenant_id_;
// corresponding schema version for DDL
// use schema version allocated by Sequencer for DML
// 0 for HEARTBEAT
int64_t ddl_schema_version_;
// Number of tasks in the transaction partition, i.e. number of participants in the transaction
// 1. DDL
// 2. DML begin/commit binglog record will carry this info
int64_t part_trans_task_count_;
protected:
/*
* LogMsgFactory
* static ILogRecord* createLogRecord(
* const std::string& type = DFT_BR, bool creating = true);
*
* @param creating is to differentiate two kinds of usage, if creating is
* true, it means the created binlog record has not been
* serilized, all in-memory functions can be called. Otherwise
* if creating is false, only after-serialized function could
* be called
*/
void construct_data_(const bool creating_binlog_record);
private:
void destruct_data_();
};
class ObLogUnserilizedBR : public ObLogBR
{
public:
ObLogUnserilizedBR();
virtual ~ObLogUnserilizedBR();
private:
// Build unserialized ILogRecord, in-memory operations
void construct_unserilized_data_();
};
class ObLogSerilizedBR : public ObLogBR
{
public:
ObLogSerilizedBR();
virtual ~ObLogSerilizedBR();
private:
// Build serialized ILogRecord, parse based on persistent data
void construct_serilized_data_();
};
} // end namespace liboblog
} // end namespace oceanbase
#endif // end OCEANBASE_LIBOBLOG_BINLOG_RECORD_
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#include "ob_log_binlog_record_pool.h"
using namespace oceanbase::common;
namespace oceanbase
{
namespace liboblog
{
ObLogBRPool::ObLogBRPool() : inited_(false), unserilized_pool_(), serilized_pool_()
{
}
ObLogBRPool::~ObLogBRPool()
{
destroy();
}
int ObLogBRPool::init(const int64_t fixed_br_count)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(inited_)) {
LOG_ERROR("BRPool has been initialized");
ret = OB_INIT_TWICE;
} else if (OB_UNLIKELY(fixed_br_count <= 0)) {
LOG_ERROR("invalid argument", K(fixed_br_count));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(unserilized_pool_.init(fixed_br_count, ObModIds::OB_LOG_BINLOG_RECORD_POOL))) {
LOG_ERROR("initialize binlog record pool fail", KR(ret), K(fixed_br_count));
} else if (OB_FAIL(serilized_pool_.init(fixed_br_count, ObModIds::OB_LOG_BINLOG_RECORD_POOL))) {
LOG_ERROR("initialize binlog record pool fail", KR(ret), K(fixed_br_count));
} else {
inited_ = true;
}
return ret;
}
void ObLogBRPool::destroy()
{
inited_ = false;
unserilized_pool_.destroy();
serilized_pool_.destroy();
}
int ObLogBRPool::alloc(const bool is_serilized, ObLogBR *&br, void *host/* = NULL */, void *log_entry_task/*=NULL*/)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("BRPool has not been initialized");
ret = OB_NOT_INIT;
} else {
if (! is_serilized) {
ObLogUnserilizedBR *unserilized_br = NULL;
if (OB_FAIL(unserilized_pool_.alloc(unserilized_br))) {
LOG_ERROR("alloc binlog record fail", KR(ret));
} else {
br = unserilized_br;
}
} else {
ObLogSerilizedBR *serilized_br = NULL;
if (OB_FAIL(serilized_pool_.alloc(serilized_br))) {
LOG_ERROR("alloc binlog record fail", KR(ret));
} else {
br = serilized_br;
}
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(br)) {
LOG_ERROR("alloc binlog record fail", K(br));
ret = OB_ERR_UNEXPECTED;
} else {
br->set_host(host);
br->set_log_entry_task(log_entry_task);
br->set_serilized(is_serilized);
}
}
}
return ret;
}
void ObLogBRPool::free(ObLogBR *br)
{
int ret = OB_SUCCESS;
if (OB_LIKELY(inited_) && OB_LIKELY(NULL != br)) {
const bool is_serilized = br->is_serilized();
// recycle memory
br->reset();
if (! is_serilized) {
ObLogUnserilizedBR *unserilized_br = NULL;
if (OB_ISNULL(unserilized_br = static_cast<ObLogUnserilizedBR *>(br))) {
LOG_ERROR("unserilized_br is NULL");
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(unserilized_pool_.free(unserilized_br))) {
LOG_ERROR("free binlog record fail", KR(ret), K(br));
}
} else {
ObLogSerilizedBR *serilized_br = NULL;
if (OB_ISNULL(serilized_br = static_cast<ObLogSerilizedBR *>(br))) {
LOG_ERROR("serilized_br is NULL");
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(serilized_pool_.free(serilized_br))) {
LOG_ERROR("free binlog record fail", KR(ret), K(br));
}
}
if (OB_SUCC(ret)) {
br = NULL;
}
}
}
void ObLogBRPool::print_stat_info() const
{
_LOG_INFO("[STAT] [BR_POOL] [UNSER](TOTAL=%ld FREE=%ld FIXED=%ld) "
"[SER](TOTAL=%ld FREE=%ld FIXED=%ld)",
unserilized_pool_.get_alloc_count(), unserilized_pool_.get_free_count(), unserilized_pool_.get_fixed_count(),
serilized_pool_.get_alloc_count(), serilized_pool_.get_free_count(), serilized_pool_.get_fixed_count());
}
} // namespace liboblog
} // namespace oceanbase
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SRC_LIBOBLOG_OB_LOG_BINLOG_RECORD_POOL_
#define OCEANBASE_SRC_LIBOBLOG_OB_LOG_BINLOG_RECORD_POOL_
#include "ob_log_binlog_record.h" // ObLogBR
#include "lib/objectpool/ob_small_obj_pool.h" // ObSmallObjPool
namespace oceanbase
{
namespace liboblog
{
class IObLogBRPool
{
public:
virtual ~IObLogBRPool() {}
public:
// If host is valid, then set host to binlog record: ObLogBR::set_host()
// is_serilized = false, to allocate in-memory ILogRecord, i.e. for serialization
// is_serilized = true, for allocating deserialized ILogRecord
virtual int alloc(const bool is_serilized, ObLogBR *&br, void *host = NULL, void *log_entry_task = NULL) = 0;
virtual void free(ObLogBR *br) = 0;
virtual void print_stat_info() const = 0;
};
//////////////////////////////////////////////////////////////////////////////
class ObLogBRPool : public IObLogBRPool
{
typedef common::ObSmallObjPool<ObLogUnserilizedBR> UnserilizedBRObjPool;
typedef common::ObSmallObjPool<ObLogSerilizedBR> SerilizedBRObjPool;
public:
ObLogBRPool();
virtual ~ObLogBRPool();
public:
int alloc(const bool is_serilized, ObLogBR *&br, void *host = NULL, void *log_entry_task = NULL);
void free(ObLogBR *br);
void print_stat_info() const;
public:
int init(const int64_t fixed_br_count);
void destroy();
private:
bool inited_;
UnserilizedBRObjPool unserilized_pool_;
SerilizedBRObjPool serilized_pool_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogBRPool);
};
} // namespace liboblog
} // namespace oceanbase
#endif /* OCEANBASE_SRC_LIBOBLOG_OB_LOG_BINLOG_RECORD_POOL_ */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#include "share/ob_define.h"
#include "ob_log_binlog_record_queue.h"
#include "ob_log_utils.h" // get_timestamp
using namespace oceanbase::common;
namespace oceanbase
{
namespace liboblog
{
int BRQueue::init(const int64_t queue_size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(inited_)) {
LOG_ERROR("BRQueue has been initialized");
ret = OB_INIT_TWICE;
} else if (0 >= queue_size) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_SUCCESS != (ret = queue_.init(queue_size))) {
LOG_ERROR("init fixed queue fail", KR(ret), K(queue_size));
} else {
dml_br_count_ = 0;
ddl_br_count_ = 0;
part_trans_task_count_ = 0;
inited_ = true;
}
return ret;
}
void BRQueue::destroy()
{
inited_ = false;
dml_br_count_ = 0;
ddl_br_count_ = 0;
part_trans_task_count_ = 0;
queue_.destroy();
}
int BRQueue::push(ObLogBR *data, const int64_t timeout)
{
int ret = OB_SUCCESS;
ILogRecord *br_data = NULL;
bool need_accumulate_stat = true;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("BRQueue has not been initialized");
ret = OB_NOT_INIT;
} else if (OB_ISNULL(data)) {
LOG_ERROR("invalid argument", K(data));
ret = OB_INVALID_ARGUMENT;
} else if (OB_ISNULL(br_data = data->get_data())) {
LOG_ERROR("binlog record data is invalid", K(data));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(do_stat_for_part_trans_task_count_(*data, need_accumulate_stat))) {
LOG_ERROR("do_stat_for_part_trans_task_count_ fail", KR(ret), K(need_accumulate_stat));
} else {
int64_t end_time = timeout + get_timestamp();
int record_type = br_data->recordType();
if (EDDL == record_type) {
ATOMIC_INC(&ddl_br_count_);
} else if (HEARTBEAT != record_type && EBEGIN != record_type && ECOMMIT != record_type) {
ATOMIC_INC(&dml_br_count_);
} else {
// do nothing
}
while (true) {
ret = queue_.push(data);
if (OB_UNLIKELY(OB_SIZE_OVERFLOW != ret)) {
break;
}
int64_t left_time = end_time - get_timestamp();
if (OB_UNLIKELY(left_time <= 0)) {
ret = OB_TIMEOUT;
break;
}
cond_.timedwait(left_time);
}
if (OB_FAIL(ret)) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("push data into fixed queue fail", KR(ret), K(data));
}
} else {
cond_.signal();
}
}
return ret;
}
int BRQueue::pop(ILogRecord *&record, const int64_t timeout)
{
int ret = OB_SUCCESS;
int32_t major_version = 0;
uint64_t tenant_id = OB_INVALID_ID;
if (OB_FAIL(pop(record, major_version, tenant_id, timeout))) {
LOG_ERROR("pop BinlogRecord faili", KR(ret), K(record));
}
return ret;
}
int BRQueue::pop(ILogRecord *&record,
int32_t &major_version,
uint64_t &tenant_id,
const int64_t timeout)
{
int ret = OB_SUCCESS;
ObLogBR *next_br = NULL;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("BRQueue has not been initialized");
ret = OB_NOT_INIT;
} else if (OB_FAIL(pop_next_br_(next_br, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("pop binlog record from br_queue fail", KR(ret));
}
} else if (OB_ISNULL(next_br)) {
LOG_ERROR("pop binlog record from br_queue fail", KR(ret), K(next_br));
ret = OB_ERR_UNEXPECTED;
} else {
record = next_br->get_data();
major_version = next_br->get_major_version();
tenant_id = next_br->get_tenant_id();
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(record)) {
LOG_ERROR("binlog record data is invalid", K(record), K(next_br));
ret = OB_ERR_UNEXPECTED;
} else {
int record_type = record->recordType();
if (EDDL == record_type) {
ATOMIC_DEC(&ddl_br_count_);
} else if (HEARTBEAT != record_type && EBEGIN != record_type && ECOMMIT != record_type) {
ATOMIC_DEC(&dml_br_count_);
} else {
// do nothing
}
}
}
return ret;
}
int BRQueue::pop_next_br_(ObLogBR *&data, const int64_t timeout)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("BRQueue has not been initialized");
ret = OB_NOT_INIT;
} else {
int64_t end_time = timeout + get_timestamp();
while (true) {
ret = queue_.pop(data);
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
break;
}
int64_t left_time = end_time - get_timestamp();
if (OB_UNLIKELY(left_time <= 0)) {
ret = OB_TIMEOUT;
break;
}
cond_.timedwait(left_time);
}
if (OB_FAIL(ret)) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("pop data from fixed queue fail", KR(ret));
}
} else {
bool need_accumulate_stat = false;
if (OB_FAIL(do_stat_for_part_trans_task_count_(*data, need_accumulate_stat))) {
LOG_ERROR("do_stat_for_part_trans_task_count_ fail", KR(ret), K(need_accumulate_stat));
}
cond_.signal();
}
}
return ret;
}
int64_t BRQueue::get_dml_br_count() const
{
return ATOMIC_LOAD(&dml_br_count_);
}
int64_t BRQueue::get_ddl_br_count() const
{
return ATOMIC_LOAD(&ddl_br_count_);
}
int64_t BRQueue::get_part_trans_task_count() const
{
return ATOMIC_LOAD(&part_trans_task_count_);
}
int BRQueue::do_stat_for_part_trans_task_count_(ObLogBR &data,
bool need_accumulate_stat)
{
int ret = OB_SUCCESS;
int record_type = 0;
int64_t part_trans_task_count = 0;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("BRQueue has not been initialized");
ret = OB_NOT_INIT;
} else if (OB_FAIL(data.get_record_type(record_type))) {
LOG_ERROR("data get_record_type fail", KR(ret),
"record_type", print_record_type(record_type));
} else {
part_trans_task_count = data.get_part_trans_task_count();
if ((EDDL == record_type) || (EBEGIN == record_type)) {
if (need_accumulate_stat) {
// enter BRQueue
(void)ATOMIC_AAF(&part_trans_task_count_, part_trans_task_count);
} else {
// leave BRQueue
(void)ATOMIC_AAF(&part_trans_task_count_, -part_trans_task_count);
}
} else {
// do nothing
}
}
return ret;
}
} // namespace liboblog
} // namespace oceanbase
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIBOBLOG_OB_LOG_BINLOG_RECORD_QUEUE_
#define OCEANBASE_LIBOBLOG_OB_LOG_BINLOG_RECORD_QUEUE_
#include "lib/queue/ob_fixed_queue.h" // ObFixedQueue
#include "common/ob_queue_thread.h" // ObCond
#include "ob_log_binlog_record.h" // ObLogBR
namespace oceanbase
{
namespace liboblog
{
class BRQueue
{
public:
BRQueue() :
inited_(false),
queue_(),
cond_(),
dml_br_count_(0),
ddl_br_count_(0),
part_trans_task_count_(0)
{}
virtual ~BRQueue() { destroy(); }
public:
int init(const int64_t queue_size);
void destroy();
// To support large transactions - implement a streaming commit model where each push and pop is a separate ObLogBR
int push(ObLogBR *data, const int64_t timeout);
int pop(ILogRecord *&record, const int64_t timeout);
int pop(ILogRecord *&record, int32_t &major_version, uint64_t &tenant_id, const int64_t timeout);
int64_t get_dml_br_count() const;
int64_t get_ddl_br_count() const;
int64_t get_part_trans_task_count() const;
private:
int pop_next_br_(ObLogBR *&data, const int64_t timeout);
int do_stat_for_part_trans_task_count_(ObLogBR &data,
bool need_accumulate_stat);
private:
bool inited_;
common::ObFixedQueue<ObLogBR> queue_;
common::ObCond cond_;
int64_t dml_br_count_ CACHE_ALIGNED;
int64_t ddl_br_count_ CACHE_ALIGNED;
// Statistics on the number of partitioned transaction tasks
int64_t part_trans_task_count_ CACHE_ALIGNED;
};
} // namespace liboblog
} // namespace oceanbase
#endif /* OCEANBASE_LIBOBLOG_OB_LOG_BINLOG_RECORD_QUEUE_ */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG_PARSER
#include "ob_log_cluster_id_filter.h" // ObLogClusterIDFilter
#include "lib/string/ob_string.h" // ObString
#include "ob_log_utils.h" // get_timestamp, get_record_type, split_int64
using namespace oceanbase::common;
namespace oceanbase
{
namespace liboblog
{
const char IObLogClusterIDFilter::DEFAULT_CLUSTER_ID_BLACK_LIST_DELIMITER = '|';
ObLogClusterIDFilter::ObLogClusterIDFilter() :
inited_(false),
cluster_id_ignored_part_trans_count_(0),
last_cluster_id_ignored_part_trans_count_(0),
last_stat_time_(0),
cluster_id_black_list_()
{}
ObLogClusterIDFilter::~ObLogClusterIDFilter()
{
destroy();
}
int ObLogClusterIDFilter::init(const char *cluster_id_black_list,
const int64_t cluster_id_black_value_min,
const int64_t cluster_id_black_value_max)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(inited_)) {
LOG_ERROR("init twice", K(inited_));
ret = OB_INIT_TWICE;
} else if (OB_UNLIKELY(OB_ISNULL(cluster_id_black_list)
|| OB_UNLIKELY(cluster_id_black_value_min > cluster_id_black_value_max))) {
LOG_ERROR("invalid argument", K(cluster_id_black_list), K(cluster_id_black_value_min),
K(cluster_id_black_value_max));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(init_cluster_id_black_list_(cluster_id_black_list,
cluster_id_black_value_min, cluster_id_black_value_max))) {
LOG_ERROR("init cluster id black list fail", KR(ret), K(cluster_id_black_list),
K(cluster_id_black_value_min), K(cluster_id_black_value_max));
} else {
inited_ = true;
}
return ret;
}
int ObLogClusterIDFilter::init_cluster_id_black_list_(const char *cluster_id_black_list,
const int64_t cluster_id_black_value_min,
const int64_t cluster_id_black_value_max)
{
int ret = OB_SUCCESS;
ObString str(cluster_id_black_list);
const char delimiter = DEFAULT_CLUSTER_ID_BLACK_LIST_DELIMITER;
if (OB_ISNULL(cluster_id_black_list)
|| OB_UNLIKELY(cluster_id_black_value_min > cluster_id_black_value_max)) {
LOG_ERROR("invalid argument", K(cluster_id_black_list), K(cluster_id_black_value_min),
K(cluster_id_black_value_max));
ret = OB_INVALID_ARGUMENT;
}
// split into int64 data
else if (OB_FAIL(split_int64(str, delimiter, cluster_id_black_list_))) {
LOG_ERROR("fail to parse cluster_id_black_list",
KR(ret), K(str), K(delimiter), K(cluster_id_black_list_));
} else {
_LOG_INFO("[STAT] [CLUSTER_ID_BLACK_LIST] count=%ld, black_list='%s', min=%ld, max=%ld",
cluster_id_black_list_.count(), cluster_id_black_list,
cluster_id_black_value_min, cluster_id_black_value_max);
// Check the validity of each element and whether it is within a reasonable range
for (int64_t idx = 0; OB_SUCCESS == ret && idx < cluster_id_black_list_.count(); idx++) {
int64_t cluster_id = cluster_id_black_list_.at(idx);
_LOG_INFO("[STAT] [CLUSTER_ID_BLACK_LIST] idx=%ld, cluster_id=%ld", idx, cluster_id);
if (OB_UNLIKELY(cluster_id < cluster_id_black_value_min)
|| OB_UNLIKELY(cluster_id > cluster_id_black_value_max)) {
LOG_ERROR("invalid cluster id in black list, which is out of range",
K(cluster_id), K(cluster_id_black_value_min),
K(cluster_id_black_value_max),
K(cluster_id_black_list));
ret = OB_INVALID_CONFIG;
}
}
}
return ret;
}
void ObLogClusterIDFilter::destroy()
{
inited_ = false;
cluster_id_ignored_part_trans_count_ = 0;
last_cluster_id_ignored_part_trans_count_ = 0;
last_stat_time_ = 0;
cluster_id_black_list_.destroy();
}
void ObLogClusterIDFilter::stat_ignored_tps()
{
int64_t cur_time = get_timestamp();
int64_t cur_count = ATOMIC_LOAD(&cluster_id_ignored_part_trans_count_);
int64_t last_count = ATOMIC_LOAD(&last_cluster_id_ignored_part_trans_count_);
int64_t delta_time = (cur_time - last_stat_time_) / 1000000;
if (last_stat_time_ > 0 && delta_time > 0) {
double tps = static_cast<double>(cur_count - last_count) / static_cast<double>(delta_time);
_LOG_INFO("[TPS_STAT] CLUSTER_ID_IGNORED_PART_TPS=%.3lf", tps);
}
last_cluster_id_ignored_part_trans_count_ = cur_count;
last_stat_time_ = cur_time;
}
int ObLogClusterIDFilter::check_is_served(const uint64_t cluster_id, bool &is_served,
const bool stat_tps)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("not inited", K(inited_));
ret = OB_NOT_INIT;
} else {
is_served = true;
for (int64_t idx = 0; OB_SUCCESS == ret && is_served && idx < cluster_id_black_list_.count(); idx++) {
// not serve if in blacklist
if (cluster_id == cluster_id_black_list_.at(idx)) {
is_served = false;
}
}
if (! is_served && stat_tps) {
(void)ATOMIC_FAA(&cluster_id_ignored_part_trans_count_, 1);
}
}
return ret;
}
}
}
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LIBOBLOG_OB_LOG_CLUSTER_ID_FILTER_H_
#define OCEANBASE_LIBOBLOG_OB_LOG_CLUSTER_ID_FILTER_H_
#include "lib/utility/ob_macro_utils.h" // CACHE_ALIGNED
#include "lib/container/ob_se_array.h" // ObSEArray
namespace oceanbase
{
namespace liboblog
{
class IObLogClusterIDFilter
{
public:
static const char DEFAULT_CLUSTER_ID_BLACK_LIST_DELIMITER;
public:
virtual ~IObLogClusterIDFilter() {}
public:
virtual int check_is_served(const uint64_t cluster_id, bool &is_served,
const bool stat_tps = true) = 0;
virtual void stat_ignored_tps() = 0;
};
class ObLogClusterIDFilter : public IObLogClusterIDFilter
{
static const int64_t DEFAULT_CLUSTER_ID_BLACK_LIST_SIZE = 8;
public:
ObLogClusterIDFilter();
virtual ~ObLogClusterIDFilter();
public:
virtual int check_is_served(const uint64_t cluster_id, bool &is_served,
const bool stat_tps = true);
virtual void stat_ignored_tps();
public:
int init(const char *cluster_id_black_list,
const int64_t cluster_id_black_value_min,
const int64_t cluster_id_black_value_max);
void destroy();
private:
int init_cluster_id_black_list_(const char *cluster_id_black_list,
const int64_t cluster_id_black_value_min,
const int64_t cluster_id_black_value_max);
private:
bool inited_;
// TPS statistics based on cluster_id filtering
// The TPS statistics here refers to the number of partition transactions
int64_t cluster_id_ignored_part_trans_count_ CACHE_ALIGNED;
int64_t last_cluster_id_ignored_part_trans_count_ CACHE_ALIGNED;
int64_t last_stat_time_ CACHE_ALIGNED;
// blacklist of cluster id
common::ObSEArray<int64_t, DEFAULT_CLUSTER_ID_BLACK_LIST_SIZE> cluster_id_black_list_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogClusterIDFilter);
};
}
}
#endif
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册