提交 62be804b 编写于 作者: O obdev 提交者: ob-robot

reduce table load extera buf

上级 d3e03d64
......@@ -89,6 +89,43 @@ int ObTableLoadStoreCtx::init(int64_t ddl_task_id,
LOG_WARN("fail to push back ls tablet id", KR(ret));
}
}
if (OB_SUCC(ret)) {
is_multiple_mode_ = (!ctx_->schema_.is_heap_table_ && ctx_->param_.need_sort_) ||
ls_partition_ids_.count() > ObDirectLoadTableStore::MAX_BUCKET_CNT;
table_data_desc_.rowkey_column_num_ =
(!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0);
table_data_desc_.column_count_ = ctx_->param_.column_count_;
table_data_desc_.external_data_block_size_ = ObDirectLoadDataBlock::DEFAULT_DATA_BLOCK_SIZE;
table_data_desc_.sstable_index_block_size_ =
ObDirectLoadSSTableIndexBlock::DEFAULT_INDEX_BLOCK_SIZE;
table_data_desc_.sstable_data_block_size_ =
ObDirectLoadSSTableDataBlock::DEFAULT_DATA_BLOCK_SIZE;
table_data_desc_.extra_buf_size_ = ObDirectLoadTableDataDesc::DEFAULT_EXTRA_BUF_SIZE;
table_data_desc_.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
table_data_desc_.is_heap_table_ = ctx_->schema_.is_heap_table_;
int64_t wa_mem_limit = 0;
if (OB_FAIL(get_wa_memory_limit(wa_mem_limit))) {
LOG_WARN("failed to get work area memory limit", KR(ret), K(ctx_->param_.tenant_id_));
} else if (wa_mem_limit < ObDirectLoadMemContext::MIN_MEM_LIMIT) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("wa_mem_limit is too small", KR(ret), K(wa_mem_limit));
} else {
table_data_desc_.merge_count_per_round_ = min(wa_mem_limit
/ table_data_desc_.sstable_data_block_size_
/ ctx_->param_.session_count_, ObDirectLoadSSTableScanMerge::MAX_SSTABLE_COUNT);
table_data_desc_.max_mem_chunk_count_ = 128;
int64_t mem_chunk_size = wa_mem_limit / table_data_desc_.max_mem_chunk_count_;
if (mem_chunk_size <= ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT) {
mem_chunk_size = ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT;
table_data_desc_.max_mem_chunk_count_ = wa_mem_limit / mem_chunk_size;
}
table_data_desc_.mem_chunk_size_ = mem_chunk_size;
table_data_desc_.heap_table_mem_chunk_size_ = wa_mem_limit / ctx_->param_.session_count_;
LOG_INFO("table_data_desc init end", K(table_data_desc_));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(insert_table_param.ls_partition_ids_.assign(target_ls_partition_ids_))) {
LOG_WARN("fail to assign ls tablet ids", KR(ret));
......@@ -162,8 +199,6 @@ int ObTableLoadStoreCtx::init(int64_t ddl_task_id,
LOG_WARN("fail to init sequence", KR(ret));
}
if (OB_SUCC(ret)) {
is_multiple_mode_ = (!ctx_->schema_.is_heap_table_ && ctx_->param_.need_sort_) ||
ls_partition_ids_.count() > ObDirectLoadTableStore::MAX_BUCKET_CNT;
if (!is_multiple_mode_ && ctx_->schema_.is_heap_table_) {
is_fast_heap_table_ = true;
if (OB_ISNULL(fast_heap_table_ctx_ =
......@@ -178,41 +213,6 @@ int ObTableLoadStoreCtx::init(int64_t ddl_task_id,
}
}
}
if (OB_SUCC(ret)) {
table_data_desc_.rowkey_column_num_ =
(!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0);
table_data_desc_.column_count_ = ctx_->param_.column_count_;
table_data_desc_.external_data_block_size_ = ObDirectLoadDataBlock::DEFAULT_DATA_BLOCK_SIZE;
table_data_desc_.sstable_index_block_size_ =
ObDirectLoadSSTableIndexBlock::DEFAULT_INDEX_BLOCK_SIZE;
table_data_desc_.sstable_data_block_size_ =
ObDirectLoadSSTableDataBlock::DEFAULT_DATA_BLOCK_SIZE;
table_data_desc_.extra_buf_size_ = ObDirectLoadTableDataDesc::DEFAULT_EXTRA_BUF_SIZE;
table_data_desc_.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
table_data_desc_.is_heap_table_ = ctx_->schema_.is_heap_table_;
int64_t wa_mem_limit = 0;
if (OB_FAIL(get_wa_memory_limit(wa_mem_limit))) {
LOG_WARN("failed to get work area memory limit", KR(ret), K(ctx_->param_.tenant_id_));
} else if (wa_mem_limit < ObDirectLoadMemContext::MIN_MEM_LIMIT) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("wa_mem_limit is too small", KR(ret), K(wa_mem_limit));
} else {
table_data_desc_.merge_count_per_round_ = min(wa_mem_limit
/ table_data_desc_.sstable_data_block_size_
/ ctx_->param_.session_count_, ObDirectLoadSSTableScanMerge::MAX_SSTABLE_COUNT);
table_data_desc_.max_mem_chunk_count_ = 128;
int64_t mem_chunk_size = wa_mem_limit / table_data_desc_.max_mem_chunk_count_;
if (mem_chunk_size <= ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT) {
mem_chunk_size = ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT;
table_data_desc_.max_mem_chunk_count_ = wa_mem_limit / mem_chunk_size;
}
table_data_desc_.mem_chunk_size_ = mem_chunk_size;
table_data_desc_.heap_table_mem_chunk_size_ = wa_mem_limit / ctx_->param_.session_count_;
LOG_INFO("table_data_desc init end", K(table_data_desc_));
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
} else {
......@@ -617,6 +617,14 @@ int ObTableLoadStoreCtx::init_session_ctx_array()
for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->param_.session_count_; ++i) {
SessionContext *session_ctx = session_ctx_array_ + i;
session_ctx->autoinc_param_ = autoinc_param;
if (is_multiple_mode_) {
session_ctx->extra_buf_size_ = table_data_desc_.extra_buf_size_;
if (OB_ISNULL(session_ctx->extra_buf_ =
static_cast<char *>(allocator_.alloc(session_ctx->extra_buf_size_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", KR(ret));
}
}
}
}
return ret;
......
......@@ -129,7 +129,11 @@ public:
share::schema::ObSequenceSchema sequence_schema_;
struct SessionContext
{
SessionContext() : extra_buf_(nullptr), extra_buf_size_(0) {}
share::AutoincParam autoinc_param_;
// for multiple mode
char *extra_buf_;
int64_t extra_buf_size_;
};
SessionContext *session_ctx_array_;
private:
......
......@@ -91,9 +91,7 @@ ObTableLoadTransStoreWriter::SessionContext::SessionContext(int32_t session_id,
: session_id_(session_id),
cast_allocator_("TLD_TS_Caster", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id),
cast_params_(cast_params),
last_receive_sequence_no_(0),
extra_buf_(nullptr),
extra_buf_size_(0)
last_receive_sequence_no_(0)
{
}
......@@ -204,21 +202,10 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array()
param.result_info_ = &(trans_ctx_->ctx_->store_ctx_->result_info_);
for (int64_t i = 0; OB_SUCC(ret) && i < param_.session_count_; ++i) {
SessionContext *session_ctx = session_ctx_array_ + i;
if (store_ctx_->is_multiple_mode_) {
session_ctx->extra_buf_size_ = store_ctx_->table_data_desc_.extra_buf_size_;
if (OB_ISNULL(session_ctx->extra_buf_ =
static_cast<char *>(allocator_.alloc(session_ctx->extra_buf_size_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", KR(ret));
} else {
param.extra_buf_ = session_ctx->extra_buf_;
param.extra_buf_size_ = session_ctx->extra_buf_size_;
}
}
if (OB_FAIL(ret)) {
}
param.extra_buf_ = store_ctx_->session_ctx_array_[i].extra_buf_;
param.extra_buf_size_ = store_ctx_->session_ctx_array_[i].extra_buf_size_;
// init table_store_
else if (OB_FAIL(session_ctx->table_store_.init(param))) {
if (OB_FAIL(session_ctx->table_store_.init(param))) {
LOG_WARN("fail to init table store", KR(ret));
}
// init datum_row_
......
......@@ -108,8 +108,6 @@ private:
ObDataTypeCastParams cast_params_;
storage::ObDirectLoadTableStore table_store_;
uint64_t last_receive_sequence_no_;
char *extra_buf_;
int64_t extra_buf_size_;
};
SessionContext *session_ctx_array_;
int64_t ref_count_ CACHE_ALIGNED;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册