提交 2e4c947f 编写于 作者: N nroskill 提交者: LINGuanRen

slog writer task use dynamic memory alloc

上级 619e5e3d
......@@ -104,8 +104,7 @@ int ObMsInfoTask::update_curr_member_list(const common::ObMemberList& curr_membe
return ret;
}
ObSlogWriterQueueThread::ObSlogWriterQueueThread()
: inited_(false), partition_service_(NULL), free_queue_(), tasks_(NULL), tg_id_(-1)
ObSlogWriterQueueThread::ObSlogWriterQueueThread() : inited_(false), partition_service_(NULL), free_queue_(), tg_id_(-1)
{}
ObSlogWriterQueueThread::~ObSlogWriterQueueThread()
......@@ -115,49 +114,34 @@ ObSlogWriterQueueThread::~ObSlogWriterQueueThread()
void ObSlogWriterQueueThread::destroy()
{
int ret = OB_SUCCESS;
inited_ = false;
partition_service_ = NULL;
if (NULL != tasks_) {
ob_free(tasks_);
ObMsInfoTask *task = nullptr;
while (OB_SUCC(free_queue_.pop((ObLink *&)task))) {
if (OB_NOT_NULL(task)) {
ob_free(task);
}
}
tasks_ = NULL;
STORAGE_LOG(INFO, "ObSlogWriterQueueThread destroy");
}
int ObSlogWriterQueueThread::init(ObPartitionService* partition_service, int tg_id)
{
int ret = OB_SUCCESS;
const int64_t max_task_num = OB_MAX_PARTITION_NUM_PER_SERVER * 2;
tg_id_ = tg_id;
if (inited_) {
ret = OB_INIT_TWICE;
STORAGE_LOG(WARN, "ObSlogWriterQueueThread has already been inited", K(ret));
} else if (NULL == partition_service) {
} else if (OB_ISNULL(partition_service)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), KP(partition_service));
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
STORAGE_LOG(WARN, "ObSimpleThreadPool inited error.", K(ret));
} else if (OB_SUCCESS != (ret = free_queue_.init(max_task_num))) {
STORAGE_LOG(WARN, "initialize fixed queue of tasks failed", K(ret));
} else {
int64_t size = sizeof(ObMsInfoTask) * max_task_num;
ObMemAttr attr(common::OB_SERVER_TENANT_ID, ObModIds::OB_CALLBACK_TASK);
if (NULL == (tasks_ = (ObMsInfoTask*)ob_malloc(size, attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(ERROR, "no memory", K(ret), K(size));
} else {
for (int64_t i = max_task_num - 1; OB_SUCC(ret) && i >= 0; --i) {
(void)new (tasks_ + i) ObMsInfoTask;
if (OB_SUCCESS != (ret = free_queue_.push(&tasks_[i]))) {
STORAGE_LOG(WARN, "push free task failed", K(ret));
}
}
if (OB_SUCC(ret)) {
partition_service_ = partition_service;
inited_ = true;
STORAGE_LOG(INFO, "ObSlogWriterQueueThread init success", K_(tg_id));
}
}
partition_service_ = partition_service;
inited_ = true;
STORAGE_LOG(INFO, "ObSlogWriterQueueThread init success", K_(tg_id));
}
if (OB_SUCCESS != ret && !inited_) {
destroy();
......@@ -171,9 +155,25 @@ int ObSlogWriterQueueThread::get_task(ObMsInfoTask*& task)
if (!inited_) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObSlogWriterQueueThread not init", K(ret));
} else if (OB_SUCCESS != (ret = free_queue_.pop(task))) {
STORAGE_LOG(WARN, "pop free task failed", K(ret));
} else if (OB_SUCC(free_queue_.pop((ObLink *&)task))) {
// do nothing
} else if (OB_SUCC(alloc_task(task))) {
// do nothing
} else {
STORAGE_LOG(ERROR, "get task failed");
}
return ret;
}
int ObSlogWriterQueueThread::alloc_task(ObMsInfoTask *&task)
{
const ObMemAttr attr(common::OB_SERVER_TENANT_ID, ObModIds::OB_CALLBACK_TASK);
int ret = OB_SUCCESS;
if (OB_ISNULL(task = (ObMsInfoTask *)ob_malloc(sizeof(ObMsInfoTask), attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "alloc task failed");
} else {
task = new (task) ObMsInfoTask;
}
return ret;
}
......@@ -184,11 +184,14 @@ void ObSlogWriterQueueThread::free_task(ObMsInfoTask* task)
if (!inited_) {
tmp_ret = OB_NOT_INIT;
STORAGE_LOG(ERROR, "ObSlogWriterQueueThread not init", K(tmp_ret));
} else if (NULL == task) {
} else if (OB_ISNULL(task)) {
tmp_ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(ERROR, "ObSlogWriterQueueThread invalid argument", K(tmp_ret), K(task));
} else if (OB_SUCCESS != (tmp_ret = free_queue_.push(task))) {
STORAGE_LOG(ERROR, "push free task failed", K(tmp_ret));
} else if (free_queue_.size() >= MAX_FREE_TASK_NUM) {
ob_free(task);
} else if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = free_queue_.push(task)))) {
STORAGE_LOG(WARN, "push free task failed", K(tmp_ret));
ob_free(task);
}
}
......@@ -202,9 +205,9 @@ int ObSlogWriterQueueThread::push(const ObMsInfoTask* task)
} else if (!task->is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(*task));
} else if (OB_SUCCESS != (ret = get_task(saved_task))) {
} else if (OB_FAIL(get_task(saved_task))) {
STORAGE_LOG(WARN, "get free task failed", K(ret));
} else if (NULL == saved_task) {
} else if (OB_ISNULL(saved_task)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected error, saved_task shouldn't be null", K(ret));
} else {
......
......@@ -15,7 +15,7 @@
#include "common/ob_partition_key.h"
#include "common/ob_member_list.h"
#include "lib/queue/ob_fixed_queue.h"
#include "lib/queue/ob_link_queue.h"
#include "lib/thread/thread_mgr_interface.h"
#include "share/ob_proposal_id.h"
#include "clog/ob_log_define.h"
......@@ -24,7 +24,7 @@ namespace oceanbase {
namespace storage {
class ObPartitionService;
class ObMsInfoTask {
class ObMsInfoTask : public common::ObLink {
public:
ObMsInfoTask()
: pkey_(),
......@@ -141,6 +141,7 @@ class ObSlogWriterQueueThread : public lib::TGTaskHandler {
public:
static const int64_t QUEUE_THREAD_NUM = 4;
static const int64_t MINI_MODE_QUEUE_THREAD_NUM = 2;
static const int64_t MAX_FREE_TASK_NUM = 1024;
static const int64_t SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD = clog::CLOG_LEADER_RECONFIRM_SYNC_TIMEOUT;
ObSlogWriterQueueThread();
virtual ~ObSlogWriterQueueThread();
......@@ -157,13 +158,13 @@ public:
private:
int get_task(ObMsInfoTask*& task);
int alloc_task(ObMsInfoTask*& task);
void free_task(ObMsInfoTask* task);
private:
bool inited_;
ObPartitionService* partition_service_;
common::ObFixedQueue<ObMsInfoTask> free_queue_;
ObMsInfoTask* tasks_;
common::ObLinkQueue free_queue_;
int tg_id_;
private:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册