提交 f8506512 编写于 作者: O obdev 提交者: ob-robot

fix table load table ctx ref count problem

上级 dd355e11
......@@ -163,6 +163,7 @@ ob_set_subtarget(ob_server table_load
table_load/ob_table_load_finish_processor.cpp
table_load/ob_table_load_general_table_compactor.cpp
table_load/ob_table_load_get_status_processor.cpp
table_load/ob_table_load_manager.cpp
table_load/ob_table_load_merger.cpp
table_load/ob_table_load_obj_cast.cpp
table_load/ob_table_load_parallel_merge_ctx.cpp
......
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// suzhi.yt <suzhi.yt@oceanbase.com>
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_manager.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "share/rc/ob_tenant_base.h"
namespace oceanbase
{
namespace observer
{
using namespace common;
using namespace common::hash;
using namespace lib;
using namespace table;
/**
* TableCtxHandle
*/
ObTableLoadManager::TableCtxHandle::TableCtxHandle()
: table_ctx_(nullptr)
{
}
ObTableLoadManager::TableCtxHandle::TableCtxHandle(ObTableLoadTableCtx *table_ctx)
: table_ctx_(table_ctx)
{
if (OB_NOT_NULL(table_ctx_)) {
table_ctx_->inc_ref_count();
}
}
ObTableLoadManager::TableCtxHandle::TableCtxHandle(const TableCtxHandle &other)
: table_ctx_(other.table_ctx_)
{
if (OB_NOT_NULL(table_ctx_)) {
table_ctx_->inc_ref_count();
}
}
ObTableLoadManager::TableCtxHandle::~TableCtxHandle()
{
reset();
}
void ObTableLoadManager::TableCtxHandle::reset()
{
if (nullptr != table_ctx_) {
table_ctx_->dec_ref_count();
table_ctx_ = nullptr;
}
}
void ObTableLoadManager::TableCtxHandle::set(ObTableLoadTableCtx *table_ctx)
{
reset();
if (OB_NOT_NULL(table_ctx)) {
table_ctx_ = table_ctx;
table_ctx_->inc_ref_count();
}
}
ObTableLoadManager::TableCtxHandle &ObTableLoadManager::TableCtxHandle::operator=(
const TableCtxHandle &other)
{
if (this != &other) {
set(other.table_ctx_);
}
return *this;
}
/**
* ObTableLoadManager
*/
ObTableLoadManager::ObTableLoadManager()
: is_inited_(false)
{
}
ObTableLoadManager::~ObTableLoadManager()
{
}
int ObTableLoadManager::init()
{
int ret = OB_SUCCESS;
const int64_t bucket_num = 1024;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadManager init twice", KR(ret), KP(this));
} else {
if (OB_FAIL(
table_ctx_map_.create(bucket_num, "TLD_TableCtxMap", "TLD_TableCtxMap", MTL_ID()))) {
LOG_WARN("fail to create table ctx map", KR(ret), K(bucket_num));
} else {
is_inited_ = true;
}
}
return ret;
}
int ObTableLoadManager::add_table_ctx(uint64_t table_id, ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(OB_INVALID_ID == table_id || nullptr == table_ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(table_id), KP(table_ctx));
} else if (OB_UNLIKELY(table_ctx->is_dirty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dirty table ctx", KR(ret), KP(table_ctx));
} else {
TableCtxHandle handle(table_ctx); // inc_ref_count
if (OB_FAIL(table_ctx_map_.set_refactored(table_id, handle))) {
if (OB_UNLIKELY(OB_HASH_EXIST != ret)) {
LOG_WARN("fail to set refactored", KR(ret), K(table_id));
} else {
ret = OB_ENTRY_EXIST;
}
} else {
handle.take(); // keep reference count
}
}
return ret;
}
int ObTableLoadManager::remove_table_ctx(uint64_t table_id)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(OB_INVALID_ID == table_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(table_id));
} else {
TableCtxHandle handle;
// remove from map
if (OB_FAIL(table_ctx_map_.erase_refactored(table_id, &handle))) {
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
} else {
ret = OB_ENTRY_NOT_EXIST;
}
}
// add to dirty list
else {
ObTableLoadTableCtx *table_ctx = handle.get();
table_ctx->set_dirty();
ObMutexGuard guard(mutex_);
OB_ASSERT(dirty_list_.add_last(table_ctx));
}
}
return ret;
}
int ObTableLoadManager::get_table_ctx(uint64_t table_id, ObTableLoadTableCtx *&table_ctx)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
} else {
table_ctx = nullptr;
TableCtxHandle handle;
if (OB_FAIL(table_ctx_map_.get_refactored(table_id, handle))) {
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
} else {
ret = OB_ENTRY_NOT_EXIST;
}
} else {
table_ctx = handle.take(); // keep reference count
}
}
return ret;
}
int ObTableLoadManager::get_inactive_table_ctx_list(
ObIArray<ObTableLoadTableCtx *> &table_ctx_array)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
} else {
table_ctx_array.reset();
auto fn = [&ret, &table_ctx_array](HashMapPair<uint64_t, TableCtxHandle> &entry) -> int {
TableCtxHandle handle = entry.second; // inc_ref_count
ObTableLoadTableCtx *table_ctx = handle.get();
OB_ASSERT(nullptr != table_ctx);
if (table_ctx->get_ref_count() > 2) { // 2 = map + handle
// skip active table ctx
} else if (OB_FAIL(table_ctx_array.push_back(table_ctx))) {
LOG_WARN("fail to push back", KR(ret));
} else {
handle.take(); // keep reference count
}
return ret;
};
if (OB_FAIL(table_ctx_map_.foreach_refactored(fn))) {
LOG_WARN("fail to foreach map", KR(ret));
}
if (OB_FAIL(ret)) {
for (int64_t i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
put_table_ctx(table_ctx);
}
table_ctx_array.reset();
}
}
return ret;
}
void ObTableLoadManager::put_table_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(table_ctx));
} else {
OB_ASSERT(table_ctx->dec_ref_count() >= 0);
}
}
int ObTableLoadManager::get_releasable_table_ctx_list(
ObIArray<ObTableLoadTableCtx *> &table_ctx_array)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
} else {
table_ctx_array.reset();
ObMutexGuard guard(mutex_);
ObTableLoadTableCtx *table_ctx = nullptr;
DLIST_FOREACH_REMOVESAFE(table_ctx, dirty_list_)
{
if (table_ctx->get_ref_count() > 0) {
// wait all task exit
} else if (OB_FAIL(table_ctx_array.push_back(table_ctx))) {
LOG_WARN("fail to push back", KR(ret));
} else {
OB_ASSERT(OB_NOT_NULL(dirty_list_.remove(table_ctx)));
}
}
}
return ret;
}
} // namespace observer
} // namespace oceanbase
......@@ -4,148 +4,58 @@
#pragma once
#include "lib/allocator/ob_malloc.h"
#include "lib/hash/ob_concurrent_hash_map.h"
#include "lib/hash/ob_hashmap.h"
#include "lib/list/ob_dlist.h"
#include "lib/lock/ob_mutex.h"
#include "lib/objectpool/ob_concurrency_objpool.h"
#include "observer/table_load/ob_table_load_struct.h"
namespace oceanbase
{
namespace observer
{
class ObTableLoadTableCtx;
template<class KeyType, class ValueType>
class ObTableLoadManager
{
public:
ObTableLoadManager();
~ObTableLoadManager();
int init();
int put(const KeyType &key, ValueType *value);
int get(const KeyType &key, ValueType *&value);
int remove(const KeyType &key, ValueType *value);
void free_value(ValueType *value);
template<typename... Args>
int new_and_insert(const KeyType &key, ValueType *&value, const Args&... args);
template<typename... Args>
int get_or_new(const KeyType &key, ValueType *&value, bool &is_new, const Args&... args);
template <typename Function>
int for_each(Function &fn) { return value_map_.foreach_refactored(fn); }
int add_table_ctx(uint64_t table_id, ObTableLoadTableCtx *table_ctx);
int remove_table_ctx(uint64_t table_id);
// table ctx holds a reference count
int get_table_ctx(uint64_t table_id, ObTableLoadTableCtx *&table_ctx);
// all table ctx hold a reference count
int get_inactive_table_ctx_list(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
void put_table_ctx(ObTableLoadTableCtx *table_ctx);
// table ctx no reference counting
int get_releasable_table_ctx_list(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
private:
template<typename... Args>
int new_value(const KeyType &key, ValueType *&value, const Args&... args);
private:
common::hash::ObHashMap<KeyType, ValueType *> value_map_;
struct TableCtxHandle
{
public:
TableCtxHandle();
TableCtxHandle(ObTableLoadTableCtx *table_ctx);
TableCtxHandle(const TableCtxHandle &other);
~TableCtxHandle();
void reset();
void set(ObTableLoadTableCtx *table_ctx);
TableCtxHandle &operator=(const TableCtxHandle &other);
OB_INLINE ObTableLoadTableCtx *get() const { return table_ctx_; }
OB_INLINE ObTableLoadTableCtx *take()
{
ObTableLoadTableCtx *table_ctx = table_ctx_;
table_ctx_ = nullptr;
return table_ctx;
}
private:
ObTableLoadTableCtx *table_ctx_;
};
typedef common::hash::ObHashMap<uint64_t, TableCtxHandle> TableCtxMap;
TableCtxMap table_ctx_map_;
lib::ObMutex mutex_;
common::ObDList<ObTableLoadTableCtx> dirty_list_;
bool is_inited_;
};
template<class KeyType, class ValueType>
int ObTableLoadManager<KeyType, ValueType>::init()
{
return value_map_.create(1024, "TLD_LoadMgr", "TLD_LoadMgr");
}
template<class KeyType, class ValueType>
int ObTableLoadManager<KeyType, ValueType>::put(const KeyType &key, ValueType *value)
{
return value_map_.set_refactored(key, value);
}
template<class KeyType, class ValueType>
int ObTableLoadManager<KeyType, ValueType>::get(const KeyType &key, ValueType *&value)
{
return value_map_.get_refactored(key, value);
}
template<class KeyType, class ValueType>
int ObTableLoadManager<KeyType, ValueType>::remove(const KeyType &key, ValueType *value)
{
return value_map_.erase_refactored(key);
}
template<class KeyType, class ValueType>
template<typename... Args>
int ObTableLoadManager<KeyType, ValueType>::new_value(const KeyType &key, ValueType *&value,
const Args&... args)
{
int ret = common::OB_SUCCESS;
lib::ObMutexGuard guard(mutex_);
ret = get(key, value);
if (ret == common::OB_HASH_NOT_EXIST) {
ret = common::OB_SUCCESS;
if (OB_ISNULL(value = OB_NEW(ValueType, ObMemAttr(MTL_ID(), "TLD_MgrValue"), args...))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to new value", KR(ret), K(key));
} else {
if (OB_FAIL(value->init())) {
OB_LOG(WARN, "fail to init value", KR(ret));
} else if (OB_FAIL(put(key, value))) {
OB_LOG(WARN, "fail to put value", KR(ret));
}
if (OB_FAIL(ret)) {
free_value(value);
value = nullptr;
}
}
} else if (ret != common::OB_SUCCESS) {
OB_LOG(WARN, "fail to get value", KR(ret));
} else { // 已存在
ret = common::OB_HASH_EXIST;
}
return ret;
}
template<class KeyType, class ValueType>
void ObTableLoadManager<KeyType, ValueType>::free_value(ValueType *value)
{
OB_DELETE(ValueType, "TLD_MgrValue", value);
}
template<class KeyType, class ValueType>
template<typename... Args>
int ObTableLoadManager<KeyType, ValueType>::new_and_insert(const KeyType &key, ValueType *&value,
const Args&... args)
{
int ret = common::OB_SUCCESS;
ValueType *tmp_value = nullptr;
ret = get(key, tmp_value);
if (OB_LIKELY(common::OB_HASH_NOT_EXIST == ret)) {
ret = new_value(key, value, args...);
}
if (OB_FAIL(ret)) {
if (OB_LIKELY(common::OB_HASH_EXIST == ret)) {
OB_LOG(WARN, "value already exist", KR(ret), K(key));
} else {
OB_LOG(WARN, "fail to call contains key", KR(ret), K(key));
}
}
return ret;
}
template<class KeyType, class ValueType>
template<typename... Args>
int ObTableLoadManager<KeyType, ValueType>::get_or_new(const KeyType &key, ValueType *&value,
bool &is_new, const Args&... args)
{
int ret = common::OB_SUCCESS;
value = nullptr;
is_new = false;
ret = get(key, value);
if (common::OB_HASH_NOT_EXIST == ret) {
ret = common::OB_SUCCESS;
if (OB_FAIL(new_value(key, value, args...))) {
if (OB_UNLIKELY(common::OB_HASH_EXIST != ret)) {
OB_LOG(WARN, "fail to new value", KR(ret));
} else { // 已存在
ret = common::OB_SUCCESS;
}
} else {
is_new = true;
}
} else if (ret != common::OB_SUCCESS) {
OB_LOG(WARN, "fail to get value", KR(ret));
}
return ret;
}
} // namespace observer
} // namespace oceanbase
} // namespace observer
} // namespace oceanbase
......@@ -5,8 +5,8 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "share/rc/ob_tenant_base.h"
#include "share/schema/ob_table_schema.h"
......@@ -44,47 +44,51 @@ void ObTableLoadService::ObGCTask::runTimerTask()
LOG_WARN("ObTableLoadService::ObGCTask not init", KR(ret), KP(this));
} else {
LOG_DEBUG("table load start gc", K(tenant_id_));
auto fn = [this](common::hash::HashMapPair<uint64_t, ObTableLoadTableCtx *> &entry) -> int {
int ret = OB_SUCCESS;
uint64_t table_id = entry.first;
ObTableLoadTableCtx *table_ctx = nullptr;
if (OB_FAIL(service_.get_table_ctx(table_id, table_ctx))) {
} else if (table_ctx->is_dirty()) {
LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), "ref_count", table_ctx->get_ref_count());
} else if (table_ctx->get_ref_count() > 1) {
// wait all task exit
} else {
ObArray<ObTableLoadTableCtx *> inactive_table_ctx_array;
if (OB_FAIL(service_.manager_.get_inactive_table_ctx_list(inactive_table_ctx_array))) {
LOG_WARN("fail to get inactive table ctx list", KR(ret), K(tenant_id_));
}
for (int64_t i = 0; i < inactive_table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = inactive_table_ctx_array.at(i);
const uint64_t table_id = table_ctx->param_.table_id_;
const uint64_t target_table_id = table_ctx->param_.target_table_id_;
// check if table ctx is removed
if (table_ctx->is_dirty()) {
LOG_DEBUG("table load ctx is dirty", K(tenant_id_), "table_id", table_ctx->param_.table_id_,
"ref_count", table_ctx->get_ref_count());
}
// check if table ctx is activated
else if (table_ctx->get_ref_count() > 2) {
LOG_DEBUG("table load ctx is active", K(tenant_id_), "table_id",
table_ctx->param_.table_id_, "ref_count", table_ctx->get_ref_count());
}
// check if table ctx can be recycled
else {
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
uint64_t target_table_id = table_ctx->param_.target_table_id_;
if (target_table_id == OB_INVALID_ID) {
// do nothing because hidden table has not been created
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id_,
target_table_id,
schema_guard,
table_schema))) {
LOG_INFO("hidden table has not been created, gc table load ctx", K(tenant_id_),
K(table_id), K(target_table_id));
service_.remove_table_ctx(table_ctx);
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id_, target_table_id,
schema_guard, table_schema))) {
if (OB_UNLIKELY(OB_TABLE_NOT_EXIST != ret)) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(target_table_id));
} else {
// table not exist, gc table load ctx
LOG_DEBUG("table not exist gc table load ctx", K(tenant_id_), K(target_table_id));
LOG_INFO("hidden table not exist, gc table load ctx", K(tenant_id_), K(table_id),
K(target_table_id));
service_.remove_table_ctx(table_ctx);
}
} else if (table_schema->is_in_recyclebin()) {
// table in recyclebin, gc table load ctx
LOG_DEBUG("table is in recyclebin gc table load ctx", K(tenant_id_), K(target_table_id));
LOG_INFO("hidden table is in recyclebin, gc table load ctx", K(tenant_id_), K(table_id),
K(target_table_id));
service_.remove_table_ctx(table_ctx);
} else {
LOG_DEBUG("table load ctx is running", K(target_table_id));
LOG_DEBUG("table load ctx is running", K(tenant_id_), K(table_id), K(target_table_id));
}
}
if (OB_NOT_NULL(table_ctx)) {
service_.put_table_ctx(table_ctx);
table_ctx = nullptr;
}
return ret;
};
service_.table_ctx_manager_.for_each(fn);
service_.put_table_ctx(table_ctx);
}
}
}
......@@ -113,29 +117,24 @@ void ObTableLoadService::ObReleaseTask::runTimerTask()
LOG_WARN("ObTableLoadService::ObReleaseTask not init", KR(ret), KP(this));
} else {
LOG_DEBUG("table load start release", K(tenant_id_));
ObArray<ObTableLoadTableCtx *> releasable_ctx_array;
{
ObMutexGuard guard(service_.mutex_);
ObTableLoadTableCtx *table_ctx = nullptr;
DLIST_FOREACH_REMOVESAFE(table_ctx, service_.dirty_list_)
{
if (table_ctx->get_ref_count() > 0) {
// wait all task exit
} else if (OB_FAIL(releasable_ctx_array.push_back(table_ctx))) {
LOG_WARN("fail to push back", KR(ret));
} else {
abort_unless(OB_NOT_NULL(service_.dirty_list_.remove(table_ctx)));
}
}
ObArray<ObTableLoadTableCtx *> releasable_table_ctx_array;
if (OB_FAIL(service_.manager_.get_releasable_table_ctx_list(releasable_table_ctx_array))) {
LOG_WARN("fail to get releasable table ctx list", KR(ret), K(tenant_id_));
}
for (int64_t i = 0; i < releasable_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = releasable_ctx_array.at(i);
LOG_INFO("free table ctx", KP(table_ctx));
service_.table_ctx_manager_.free_value(table_ctx);
for (int64_t i = 0; i < releasable_table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = releasable_table_ctx_array.at(i);
const uint64_t table_id = table_ctx->param_.table_id_;
const uint64_t target_table_id = table_ctx->param_.target_table_id_;
LOG_INFO("free table ctx", K(tenant_id_), K(table_id), K(target_table_id), KP(table_ctx));
OB_DELETE(ObTableLoadTableCtx, "TLD_TableCtxVal", table_ctx);
}
}
}
/**
* ObTableLoadService
*/
int ObTableLoadService::mtl_init(ObTableLoadService *&service)
{
int ret = OB_SUCCESS;
......@@ -149,7 +148,7 @@ int ObTableLoadService::mtl_init(ObTableLoadService *&service)
return ret;
}
int ObTableLoadService::create_ctx(const ObTableLoadParam &param, ObTableLoadTableCtx *&ctx,
int ObTableLoadService::create_ctx(const ObTableLoadParam &param, ObTableLoadTableCtx *&table_ctx,
bool &is_new)
{
int ret = OB_SUCCESS;
......@@ -158,12 +157,12 @@ int ObTableLoadService::create_ctx(const ObTableLoadParam &param, ObTableLoadTab
ret = OB_ERR_SYS;
LOG_WARN("null table load service", KR(ret));
} else {
ret = service->create_table_ctx(param, ctx, is_new);
ret = service->create_table_ctx(param, table_ctx, is_new);
}
return ret;
}
int ObTableLoadService::get_ctx(const ObTableLoadKey &key, ObTableLoadTableCtx *&ctx)
int ObTableLoadService::get_ctx(const ObTableLoadKey &key, ObTableLoadTableCtx *&table_ctx)
{
int ret = OB_SUCCESS;
ObTableLoadService *service = nullptr;
......@@ -171,12 +170,12 @@ int ObTableLoadService::get_ctx(const ObTableLoadKey &key, ObTableLoadTableCtx *
ret = OB_ERR_SYS;
LOG_WARN("null table load service", KR(ret));
} else {
ret = service->get_table_ctx(key.table_id_, ctx);
ret = service->get_table_ctx(key.table_id_, table_ctx);
}
return ret;
}
void ObTableLoadService::put_ctx(ObTableLoadTableCtx *ctx)
void ObTableLoadService::put_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
ObTableLoadService *service = nullptr;
......@@ -184,11 +183,11 @@ void ObTableLoadService::put_ctx(ObTableLoadTableCtx *ctx)
ret = OB_ERR_SYS;
LOG_WARN("null table load service", KR(ret));
} else {
service->put_table_ctx(ctx);
service->put_table_ctx(table_ctx);
}
}
int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *ctx)
int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
ObTableLoadService *service = nullptr;
......@@ -196,7 +195,7 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *ctx)
ret = OB_ERR_SYS;
LOG_WARN("null table load service", KR(ret));
} else {
ret = service->remove_table_ctx(ctx);
ret = service->remove_table_ctx(table_ctx);
}
return ret;
}
......@@ -212,7 +211,7 @@ int ObTableLoadService::init(uint64_t tenant_id)
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadService init twice", KR(ret), KP(this));
} else if (OB_FAIL(table_ctx_manager_.init())) {
} else if (OB_FAIL(manager_.init())) {
LOG_WARN("fail to init table ctx manager", KR(ret));
} else if (OB_FAIL(gc_task_.init(tenant_id))) {
LOG_WARN("fail to init gc task", KR(ret));
......@@ -232,7 +231,7 @@ int ObTableLoadService::start()
LOG_WARN("ObTableLoadService not init", KR(ret), KP(this));
} else {
gc_timer_.set_run_wrapper(MTL_CTX());
if (OB_FAIL(gc_timer_.init("TableLoadGc"))) {
if (OB_FAIL(gc_timer_.init("TLD_GC"))) {
LOG_WARN("fail to init gc timer", KR(ret));
} else if (OB_FAIL(gc_timer_.schedule(gc_task_, GC_INTERVAL, true))) {
LOG_WARN("fail to schedule gc task", KR(ret));
......@@ -261,80 +260,73 @@ void ObTableLoadService::destroy()
gc_timer_.destroy();
}
int ObTableLoadService::create_table_ctx(const ObTableLoadParam &param, ObTableLoadTableCtx *&ctx,
bool &is_new)
int ObTableLoadService::create_table_ctx(const ObTableLoadParam &param,
ObTableLoadTableCtx *&table_ctx, bool &is_new)
{
int ret = OB_SUCCESS;
table_ctx = nullptr;
is_new = false;
if (OB_UNLIKELY(!param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param));
} else {
if (OB_FAIL(table_ctx_manager_.get_or_new(param.table_id_, ctx, is_new, param))) {
LOG_WARN("fail to new and insert table ctx", KR(ret));
} else if (OB_ISNULL(ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null table ctx", KR(ret), K(param));
} else if (OB_UNLIKELY(ctx->is_dirty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dirty table ctx", KR(ret));
} else {
ctx->inc_ref_count();
const uint64_t table_id = param.table_id_;
ObTableLoadTableCtx *new_table_ctx = nullptr;
if (OB_FAIL(manager_.get_table_ctx(table_id, table_ctx))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
LOG_WARN("fail to get table ctx", KR(ret), K(table_id));
} else {
table_ctx = nullptr;
ret = OB_SUCCESS;
}
}
if (OB_SUCC(ret) && nullptr == table_ctx) {
if (OB_ISNULL(new_table_ctx =
OB_NEW(ObTableLoadTableCtx, ObMemAttr(MTL_ID(), "TLD_TableCtxVal"), param))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new table ctx", KR(ret), K(param));
} else if (OB_FAIL(new_table_ctx->init())) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(manager_.add_table_ctx(table_id, new_table_ctx))) {
LOG_WARN("fail to add table ctx", KR(ret), K(table_id));
} else {
table_ctx = new_table_ctx;
is_new = true;
}
}
if (OB_FAIL(ret)) {
if (nullptr != new_table_ctx) {
OB_DELETE(ObTableLoadTableCtx, "TLD_TableCtxVal", new_table_ctx);
new_table_ctx = nullptr;
}
}
}
return ret;
}
int ObTableLoadService::get_table_ctx(uint64_t table_id, ObTableLoadTableCtx *&ctx)
int ObTableLoadService::remove_table_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_ID == table_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(table_id));
} else if (OB_FAIL(table_ctx_manager_.get(table_id, ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(table_id));
} else if (OB_ISNULL(ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null table ctx", KR(ret), K(table_id));
} else if (OB_UNLIKELY(ctx->is_dirty())) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("table ctx is dirty", KR(ret));
} else {
ctx->inc_ref_count();
const uint64_t table_id = table_ctx->param_.table_id_;
if (OB_FAIL(manager_.remove_table_ctx(table_id))) {
LOG_WARN("fail to remove table ctx", KR(ret), K(table_id));
}
return ret;
}
void ObTableLoadService::put_table_ctx(ObTableLoadTableCtx *ctx)
int ObTableLoadService::get_table_ctx(uint64_t table_id, ObTableLoadTableCtx *&ctx)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(ctx));
} else {
abort_unless(ctx->dec_ref_count() >= 0);
if (OB_FAIL(manager_.get_table_ctx(table_id, ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(table_id));
}
return ret;
}
int ObTableLoadService::remove_table_ctx(ObTableLoadTableCtx *ctx)
void ObTableLoadService::put_table_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(ctx));
} else if (OB_UNLIKELY(ctx->is_dirty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dirty table ctx", KR(ret), KP(ctx));
} else {
ctx->set_dirty();
if (OB_FAIL(table_ctx_manager_.remove(ctx->param_.table_id_, ctx))) {
LOG_WARN("fail to remove table ctx", KR(ret));
} else {
ObMutexGuard guard(mutex_);
abort_unless(dirty_list_.add_last(ctx));
}
}
return ret;
manager_.put_table_ctx(table_ctx);
}
} // namespace observer
} // namespace oceanbase
} // namespace observer
} // namespace oceanbase
......@@ -4,7 +4,6 @@
#pragma once
#include "lib/list/ob_dlist.h"
#include "lib/task/ob_timer.h"
#include "observer/table_load/ob_table_load_manager.h"
#include "observer/table_load/ob_table_load_struct.h"
......@@ -32,9 +31,9 @@ public:
void destroy();
public:
int create_table_ctx(const ObTableLoadParam &param, ObTableLoadTableCtx *&ctx, bool &is_new);
int remove_table_ctx(ObTableLoadTableCtx *ctx);
int get_table_ctx(uint64_t table_id, ObTableLoadTableCtx *&ctx);
void put_table_ctx(ObTableLoadTableCtx *ctx);
int remove_table_ctx(ObTableLoadTableCtx *ctx);
private:
static const int64_t GC_INTERVAL = 30LL * 1000 * 1000; // 30s
static const int64_t RELEASE_INTERVAL = 1LL * 1000 * 1000; // 1s
......@@ -65,9 +64,7 @@ private:
bool is_inited_;
};
private:
ObTableLoadManager<uint64_t, ObTableLoadTableCtx> table_ctx_manager_;
mutable lib::ObMutex mutex_;
common::ObDList<ObTableLoadTableCtx> dirty_list_;
ObTableLoadManager manager_;
common::ObTimer gc_timer_;
ObGCTask gc_task_;
ObReleaseTask release_task_;
......
......@@ -310,6 +310,7 @@ int ObDirectLoadSSTableDataFuse::init(const ObDirectLoadDataFuseParam &param,
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(param), KP(origin_table), K(range));
} else {
allocator_.set_tenant_id(MTL_ID());
// construct iters
if (OB_FAIL(origin_table->scan(range, allocator_, origin_iter_))) {
LOG_WARN("fail to scan origin table", KR(ret));
......@@ -379,6 +380,7 @@ int ObDirectLoadMultipleSSTableDataFuse::init(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(param), KP(origin_table), K(range));
} else {
allocator_.set_tenant_id(MTL_ID());
if (OB_FAIL(range_.assign(param.tablet_id_, range))) {
LOG_WARN("fail to assign range", KR(ret));
}
......
......@@ -8,6 +8,7 @@
#include "lib/lock/ob_mutex.h"
#include "lib/list/ob_list.h"
#include "share/ob_errno.h"
#include "share/rc/ob_tenant_base.h"
namespace oceanbase
{
......@@ -18,7 +19,7 @@ template<class T>
class ObDirectLoadEasyQueue //性能很差的一个queue,主要为了方便使用
{
public:
ObDirectLoadEasyQueue() : malloc_("TLD_easy_queue"), queue_(malloc_) {}
ObDirectLoadEasyQueue() : malloc_(ObMemAttr(MTL_ID(), "TLD_EasyQueue")), queue_(malloc_) {}
int push(const T &e) {
int ret = OB_SUCCESS;
......
......@@ -5,6 +5,7 @@
#define USING_LOG_PREFIX STORAGE
#include "storage/direct_load/ob_direct_load_fast_heap_table.h"
#include "share/rc/ob_tenant_base.h"
namespace oceanbase
{
......@@ -90,12 +91,15 @@ int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam &
} else if (OB_UNLIKELY(!param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param));
} else if (OB_FAIL(copy_col_stat(param))){
LOG_WARN("fail to inner init", KR(ret), K(param));
} else {
meta_.tablet_id_ = param.tablet_id_;
meta_.row_count_ = param.row_count_;
is_inited_ = true;
allocator_.set_tenant_id(MTL_ID());
if (OB_FAIL(copy_col_stat(param))){
LOG_WARN("fail to inner init", KR(ret), K(param));
} else {
is_inited_ = true;
}
}
return ret;
}
......
......@@ -125,6 +125,7 @@ int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildP
LOG_WARN("invalid args", KR(ret), K(param));
} else {
param_ = param;
allocator_.set_tenant_id(MTL_ID());
if (param_.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) {
LOG_WARN("fail to inner init sql statistics", KR(ret));
} else if (OB_FAIL(param_.fast_heap_table_ctx_->get_tablet_context(
......
......@@ -49,6 +49,7 @@ int ObDirectLoadFastHeapTableContext::init(uint64_t tenant_id,
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(ls_partition_ids), K(target_ls_partition_ids));
} else {
allocator_.set_tenant_id(MTL_ID());
if (OB_FAIL(create_all_tablet_contexts(tenant_id, ls_partition_ids, target_ls_partition_ids, reserved_parallel))) {
LOG_WARN("fail to create all tablet contexts", KR(ret));
} else {
......@@ -71,7 +72,7 @@ int ObDirectLoadFastHeapTableContext::create_all_tablet_contexts(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(ls_partition_ids), K(target_ls_partition_ids));
} else if (OB_FAIL(
tablet_ctx_map_.create(ls_partition_ids.count(), "TLD_TabInsCtx", "TLD_TabInsCtx"))) {
tablet_ctx_map_.create(ls_partition_ids.count(), "TLD_TabInsCtx", "TLD_TabInsCtx", MTL_ID()))) {
LOG_WARN("fail to create tablet ctx map", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < ls_partition_ids.count(); ++i) {
......
......@@ -115,6 +115,7 @@ ObDirectLoadMemContext::~ObDirectLoadMemContext()
int ObDirectLoadMemContext::init()
{
int ret = OB_SUCCESS;
allocator_.set_tenant_id(MTL_ID());
if (OB_FAIL(mem_dump_queue_.init(1024))) {
STORAGE_LOG(WARN, "fail to init mem dump queue", KR(ret));
}
......
......@@ -89,7 +89,7 @@ int ObDirectLoadMemLoader::work()
LOG_WARN("some error ocurr", KR(ret));
}
if (OB_SUCC(ret)) {
chunk = OB_NEW(ChunkType, "TLD_row_chunk");
chunk = OB_NEW(ChunkType, ObMemAttr(MTL_ID(), "TLD_MemChunkVal"));
if (chunk == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate mem", KR(ret));
......
......@@ -27,7 +27,7 @@ public:
int init()
{
return map_.create(1024, "TLD_multi_map", "TLD_multi_map");
return map_.create(1024, "TLD_multi_map", "TLD_multi_map", MTL_ID());
}
virtual ~ObDirectLoadMultiMapNoLock()
......
......@@ -189,7 +189,7 @@ int ObDirectLoadMultipleHeapTableSorter::work()
LOG_WARN("some error ocurr", KR(ret));
}
if (OB_SUCC(ret)) {
chunk = OB_NEW(ChunkType, "TLD_row_chunk", mem_ctx_->table_data_desc_.heap_table_mem_chunk_size_);
chunk = OB_NEW(ChunkType, ObMemAttr(MTL_ID(), "TLD_MemChunkVal"), mem_ctx_->table_data_desc_.heap_table_mem_chunk_size_);
if (chunk == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate mem", KR(ret));
......
......@@ -174,6 +174,7 @@ int ObDirectLoadMultipleSSTable::copy(const ObDirectLoadMultipleSSTable &other)
LOG_WARN("invalid args", KR(ret), K(other));
} else {
reset();
allocator_.set_tenant_id(MTL_ID());
meta_ = other.meta_;
if (OB_FAIL(start_key_.deep_copy(other.start_key_, allocator_))) {
LOG_WARN("fail to deep copy rowkey", KR(ret));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册