提交 57528cd7 编写于 作者: N nroskill 提交者: LINGuanRen

fix memory leak

上级 b0e85791
......@@ -24,17 +24,12 @@
using namespace oceanbase::lib;
using namespace oceanbase::common;
ObMallocAllocator* ObMallocAllocator::instance_ = NULL;
uint64_t ObMallocAllocator::max_used_tenant_id_ = 0;
bool ObMallocAllocator::is_inited_ = false;
ObMallocAllocator::ObMallocAllocator() : locks_(), allocators_(), reserved_(0), urgent_(0)
{}
ObMallocAllocator::~ObMallocAllocator()
{}
int ObMallocAllocator::init()
{
set_root_allocator();
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < ObCtxIds::MAX_CTX_ID; i++) {
if (OB_FAIL(create_tenant_ctx_allocator(OB_SYS_TENANT_ID, i))) {
......@@ -43,7 +38,12 @@ int ObMallocAllocator::init()
LOG_ERROR("create tenant allocator fail", K(ret), K(i));
}
}
return ret;
is_inited_ = true;
}
ObMallocAllocator::~ObMallocAllocator()
{
is_inited_ = false;
}
void* ObMallocAllocator::alloc(const int64_t size)
......@@ -321,60 +321,21 @@ int ObMallocAllocator::delete_tenant_ctx_allocator(uint64_t tenant_id)
return ret;
}
int ObMallocAllocator::set_root_allocator(ObTenantCtxAllocator* allocator)
void ObMallocAllocator::set_root_allocator()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(allocator)) {
ret = OB_INVALID_ARGUMENT;
} else if (!OB_ISNULL(allocators_[OB_SERVER_TENANT_ID][0])) {
ret = OB_ENTRY_EXIST;
static ObTenantCtxAllocator allocator(OB_SERVER_TENANT_ID);
if (OB_FAIL(allocator.set_tenant_memory_mgr())) {
LOG_ERROR("set_tenant_memory_mgr failed", K(ret));
} else {
allocators_[OB_SERVER_TENANT_ID][0] = allocator;
allocators_[OB_SERVER_TENANT_ID][0] = &allocator;
}
return ret;
}
ObMallocAllocator* ObMallocAllocator::get_instance()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ObMallocAllocator::instance_)) {
ObMallocAllocator* malloc_allocator = NULL;
ObTenantCtxAllocator* allocator = new (std::nothrow) ObTenantCtxAllocator(OB_SERVER_TENANT_ID);
if (!OB_ISNULL(allocator)) {
if (OB_FAIL(allocator->set_tenant_memory_mgr())) {
LOG_ERROR("set_tenant_memory_mgr failed", K(ret));
} else {
ObMemAttr attr;
attr.tenant_id_ = OB_SERVER_TENANT_ID;
attr.label_ = ObModIds::OB_TENANT_CTX_ALLOCATOR;
void* buf = allocator->alloc(sizeof(ObMallocAllocator), attr);
if (!OB_ISNULL(buf)) {
malloc_allocator = new (buf) ObMallocAllocator();
ret = malloc_allocator->set_root_allocator(allocator);
} else {
ret = OB_ALLOCATE_MEMORY_FAILED;
}
}
} else {
ret = OB_ALLOCATE_MEMORY_FAILED;
}
if (OB_FAIL(ret)) {
if (!OB_ISNULL(malloc_allocator)) {
malloc_allocator->~ObMallocAllocator();
allocator->free(malloc_allocator);
malloc_allocator = NULL;
}
if (!OB_ISNULL(allocator)) {
delete allocator;
allocator = NULL;
}
} else {
ObMallocAllocator::instance_ = malloc_allocator;
}
}
return ObMallocAllocator::instance_;
static ObMallocAllocator instance;
return &instance;
}
int ObMallocAllocator::with_resource_handle_invoke(uint64_t tenant_id, InvokeFunc func)
......
......@@ -32,14 +32,12 @@ public:
ObMallocAllocator();
virtual ~ObMallocAllocator();
int init();
void* alloc(const int64_t size);
void* alloc(const int64_t size, const ObMemAttr& attr);
void* realloc(const void* ptr, const int64_t size, const ObMemAttr& attr);
void free(void* ptr);
int set_root_allocator(ObTenantCtxAllocator* allocator);
void set_root_allocator();
static ObMallocAllocator* get_instance();
ObTenantCtxAllocator* get_tenant_ctx_allocator(uint64_t tenant_id, uint64_t ctx_id = 0) const;
......@@ -68,6 +66,8 @@ public:
const uint64_t tenant_id, const uint64_t ctx_id, const int64_t size, const bool reserve = false);
int get_chunks(AChunk** chunks, int cap, int& cnt);
static uint64_t get_max_used_tenant_id() { return max_used_tenant_id_; }
static bool is_inited_;
private:
using InvokeFunc = std::function<int(ObTenantMemoryMgr*)>;
static int with_resource_handle_invoke(uint64_t tenant_id, InvokeFunc func);
......@@ -81,8 +81,6 @@ private:
int64_t reserved_;
int64_t urgent_;
static uint64_t max_used_tenant_id_;
static ObMallocAllocator* instance_;
}; // end of class ObMallocAllocator
} // end of namespace lib
......
......@@ -34,8 +34,8 @@ public:
normal_locker_(mutex_),
logger_locker_(mutex_),
locker_(!for_logger ? static_cast<ISetLocker&>(normal_locker_) : static_cast<ISetLocker&>(logger_locker_)),
os_(),
bs_()
bs_(),
os_()
{
bs_.set_locker(&locker_);
os_.set_locker(&locker_);
......@@ -91,8 +91,8 @@ private:
SetLocker normal_locker_;
SetLockerForLogger logger_locker_;
ISetLocker& locker_;
ObjectSet os_;
BlockSet bs_;
ObjectSet os_;
};
template <int N>
......
......@@ -121,7 +121,7 @@ void oceanbase::common::ob_free_align(void* ptr)
{
if (NULL == ptr) {
_OB_LOG(WARN, "cannot free NULL pointer.");
} else {
} else if (oceanbase::lib::ObMallocAllocator::is_inited_) {
uint8_t* sign_ptr = reinterpret_cast<uint8_t*>(static_cast<char*>(ptr) - 1);
int64_t* header_ptr = reinterpret_cast<int64_t*>(static_cast<char*>(ptr) - 1 - sizeof(int64_t));
char* origin_ptr = NULL;
......
......@@ -42,18 +42,22 @@ inline void* ob_malloc(const int64_t nbyte, const ObMemAttr& attr = default_mema
inline void ob_free(void* ptr)
{
ObIAllocator* allocator = lib::ObMallocAllocator::get_instance();
abort_unless(!OB_ISNULL(allocator));
allocator->free(ptr);
ptr = NULL;
if (OB_LIKELY(lib::ObMallocAllocator::is_inited_)) {
ObIAllocator* allocator = lib::ObMallocAllocator::get_instance();
abort_unless(!OB_ISNULL(allocator));
allocator->free(ptr);
ptr = NULL;
}
}
inline void* ob_realloc(void* ptr, const int64_t nbyte, const ObMemAttr& attr)
{
void* nptr = NULL;
ObIAllocator* allocator = lib::ObMallocAllocator::get_instance();
if (!OB_ISNULL(allocator)) {
nptr = allocator->realloc(ptr, nbyte, attr);
if (OB_LIKELY(lib::ObMallocAllocator::is_inited_)) {
ObIAllocator* allocator = lib::ObMallocAllocator::get_instance();
if (!OB_ISNULL(allocator)) {
nptr = allocator->realloc(ptr, nbyte, attr);
}
}
return nptr;
}
......
......@@ -139,8 +139,6 @@ namespace oceanbase {
namespace common {
ObIAllocator* global_default_allocator = NULL;
extern void tsi_factory_init();
extern void tsi_factory_destroy();
int ob_init_memory_pool(int64_t block_size)
{
UNUSED(block_size);
......@@ -175,17 +173,12 @@ void __attribute__((constructor(MALLOC_INIT_PRIORITY))) init_global_memory_pool(
coro.get_context().get_stack(cls->stack_addr_, cls->stack_size_);
return OB_SUCCESS;
};
ObMallocAllocator* allocator = ObMallocAllocator::get_instance();
if (OB_ISNULL(allocator) || OB_FAIL(allocator->init())) {}
global_default_allocator = ObMallocAllocator::get_instance();
tsi_factory_init();
abort_unless(OB_SUCCESS == install_ob_signal_handler());
}
void __attribute__((destructor(MALLOC_INIT_PRIORITY))) deinit_global_memory_pool()
{
tsi_factory_destroy();
}
{}
} // end namespace common
} // end namespace oceanbase
......@@ -45,8 +45,7 @@ int CWLock::unlock() const
}
//////////////////////////////////////////////////////////////////////////////////////
CRWLock::CRWLock(ELockMode lockMode)
CRWLock::CRWLock(ELockMode lockMode) : _rlock(&_rwlock), _wlock(&_rwlock)
{
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
......@@ -56,13 +55,9 @@ CRWLock::CRWLock(ELockMode lockMode)
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
}
pthread_rwlock_init(&_rwlock, &attr);
_rlock = new CRLock(&_rwlock);
_wlock = new CWLock(&_rwlock);
}
CRWLock::~CRWLock()
{
pthread_rwlock_destroy(&_rwlock);
delete _rlock;
delete _wlock;
}
......@@ -60,18 +60,18 @@ public:
CRWLock(ELockMode lockMode = NO_PRIORITY);
~CRWLock();
CRLock* rlock() const
const CRLock *rlock() const
{
return _rlock;
return &_rlock;
}
CWLock* wlock() const
const CWLock *wlock() const
{
return _wlock;
return &_wlock;
}
private:
CRLock* _rlock;
CWLock* _wlock;
CRLock _rlock;
CWLock _wlock;
pthread_rwlock_t _rwlock;
};
......
......@@ -80,41 +80,39 @@ void ObMemoryCutter::free_stack(int64_t& total_size)
void ObMemoryCutter::free_memstore(int64_t& total_size)
{
if (ObMallocAllocator::instance_ != nullptr) {
for (int64_t slot = 0; slot < ObMallocAllocator::PRESERVED_TENANT_COUNT; ++slot) {
bool has_crash = false;
do_with_crash_restore(
[&]() {
ObTenantCtxAllocator* ta = ObMallocAllocator::instance_->allocators_[slot][ObCtxIds::MEMSTORE_CTX_ID];
while (ta != nullptr) {
bool has_crash = false;
do_with_crash_restore(
[&]() {
AChunk* cur = ta->using_list_head_.next2_;
while (cur != &ta->using_list_head_) {
if (cur->check_magic_code()) {
AChunk* next = cur->next2_;
uint64_t all_size = chunk_size(cur);
free_chunk(cur, all_size);
total_size += all_size;
cur = next;
} else {
DLOG(WARN, "invalid chunk magic");
break;
}
}
},
has_crash);
if (has_crash) {
DLOG(WARN, "restore from crash, let's goon~");
for (int64_t slot = 0; slot < ObMallocAllocator::PRESERVED_TENANT_COUNT; ++slot) {
bool has_crash = false;
do_with_crash_restore(
[&]() {
ObTenantCtxAllocator* ta = ObMallocAllocator::get_instance()->allocators_[slot][ObCtxIds::MEMSTORE_CTX_ID];
while (ta != nullptr) {
bool has_crash = false;
do_with_crash_restore(
[&]() {
AChunk* cur = ta->using_list_head_.next2_;
while (cur != &ta->using_list_head_) {
if (cur->check_magic_code()) {
AChunk* next = cur->next2_;
uint64_t all_size = chunk_size(cur);
free_chunk(cur, all_size);
total_size += all_size;
cur = next;
} else {
DLOG(WARN, "invalid chunk magic");
break;
}
}
ta = ta->get_next();
}
},
has_crash);
if (has_crash) {
DLOG(WARN, "restore from crash, let's goon~");
}
},
has_crash);
if (has_crash) {
DLOG(WARN, "restore from crash, let's goon~");
}
ta = ta->get_next();
}
},
has_crash);
if (has_crash) {
DLOG(WARN, "restore from crash, let's goon~");
}
}
}
......
......@@ -19,15 +19,5 @@ TSIFactory& get_tsi_fatcory()
static TSIFactory instance;
return instance;
}
void tsi_factory_init()
{
get_tsi_fatcory().init();
}
void tsi_factory_destroy()
{
get_tsi_fatcory().destroy();
}
} // namespace common
} // namespace oceanbase
......@@ -384,43 +384,31 @@ private:
class TSIFactory;
extern TSIFactory& get_tsi_fatcory();
extern void tsi_factory_init();
extern void tsi_factory_destroy();
class TSIFactory {
static const pthread_key_t INVALID_THREAD_KEY = INT32_MAX;
public:
TSIFactory() : key_(INVALID_THREAD_KEY)
{}
~TSIFactory()
{}
public:
int init()
{
int ret = OB_SUCCESS;
if (0 != pthread_key_create(&key_, destroy_thread_data_)) {
_LIB_LOG(WARN, "pthread_key_create fail errno=%u", errno);
ret = OB_ERROR;
}
return ret;
}
int destroy()
~TSIFactory()
{
int ret = OB_SUCCESS;
if (INVALID_THREAD_KEY != key_) {
void* ptr = pthread_getspecific(key_);
destroy_thread_data_(ptr);
if (0 != pthread_key_delete(key_)) {
ret = OB_ERR_UNEXPECTED;
_LIB_LOG(WARN, "pthread_key_delete fail errno=%u", errno);
} else {
key_ = INVALID_THREAD_KEY;
}
}
return ret;
}
public:
template <class T>
T* new_instance()
{
......
......@@ -389,7 +389,10 @@ int main(int argc, char* argv[])
CoSetSched sched;
sched.CoMainRoutine::init();
sched.active_routine_ = &sched;
#ifndef OB_USE_ASAN
get_mem_leak_checker().init();
#endif
// coroutine settings
::oceanbase::common::USE_CO_LATCH = false;
......
ob_unittest(test_archive_task)
# ob_unittest(test_archive_task)
......@@ -305,7 +305,6 @@ TEST_F(TestHashMapPerformance, test_performance)
ObArray<blocksstable::MacroBlockId> block_ids;
ObArray<int64_t> sstable_block_idx;
ObArray<blocksstable::MacroBlockId> query_block_ids;
ASSERT_EQ(OB_SUCCESS, lib::ObMallocAllocator::get_instance()->init());
ASSERT_EQ(OB_SUCCESS, generate_data(count, block_ids, sstable_block_idx));
ASSERT_EQ(OB_SUCCESS, prepare_hash_map(count, 1.0));
ASSERT_EQ(OB_SUCCESS, prepare_array_hash_map(count, 1.0));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册