未验证 提交 69259d92 编写于 作者: Z zhenshan.cao 提交者: GitHub

Call ReleaseUnused of memorypool in arrow (#12893)

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 19b0a94d
...@@ -307,7 +307,8 @@ CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) { ...@@ -307,7 +307,8 @@ CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) {
} }
auto table = arrow::Table::Make(p->schema, {array}); auto table = arrow::Table::Make(p->schema, {array});
p->output = std::make_shared<wrapper::PayloadOutputStream>(); p->output = std::make_shared<wrapper::PayloadOutputStream>();
ast = parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), p->output, 1024 * 1024 * 1024); auto mem_pool = arrow::default_memory_pool();
ast = parquet::arrow::WriteTable(*table, mem_pool, p->output, 1024 * 1024 * 1024);
if (!ast.ok()) { if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR); st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message()); st.error_msg = ErrorMsg(ast.message());
...@@ -344,6 +345,8 @@ CStatus ReleasePayloadWriter(CPayloadWriter handler) { ...@@ -344,6 +345,8 @@ CStatus ReleasePayloadWriter(CPayloadWriter handler) {
st.error_msg = nullptr; st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadWriter *>(handler); auto p = reinterpret_cast<wrapper::PayloadWriter *>(handler);
if (p != nullptr) delete p; if (p != nullptr) delete p;
auto mem_pool = arrow::default_memory_pool();
mem_pool->ReleaseUnused();
return st; return st;
} }
...@@ -352,7 +355,8 @@ CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_siz ...@@ -352,7 +355,8 @@ CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_siz
auto p = new wrapper::PayloadReader; auto p = new wrapper::PayloadReader;
p->bValues = nullptr; p->bValues = nullptr;
p->input = std::make_shared<wrapper::PayloadInputStream>(buffer, buf_size); p->input = std::make_shared<wrapper::PayloadInputStream>(buffer, buf_size);
auto st = parquet::arrow::OpenFile(p->input, arrow::default_memory_pool(), &p->reader); auto mem_pool = arrow::default_memory_pool();
auto st = parquet::arrow::OpenFile(p->input, mem_pool, &p->reader);
if (!st.ok()) { if (!st.ok()) {
delete p; delete p;
return nullptr; return nullptr;
...@@ -366,7 +370,6 @@ CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_siz ...@@ -366,7 +370,6 @@ CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_siz
assert(p->column != nullptr); assert(p->column != nullptr);
assert(p->column->chunks().size() == 1); assert(p->column->chunks().size() == 1);
p->array = p->column->chunk(0); p->array = p->column->chunk(0);
switch (columnType) { switch (columnType) {
case ColumnType::BOOL : case ColumnType::BOOL :
case ColumnType::INT8 : case ColumnType::INT8 :
...@@ -533,5 +536,7 @@ CStatus ReleasePayloadReader(CPayloadReader payloadReader) { ...@@ -533,5 +536,7 @@ CStatus ReleasePayloadReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader); auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
delete[] p->bValues; delete[] p->bValues;
delete p; delete p;
auto mem_pool = arrow::default_memory_pool();
mem_pool->ReleaseUnused();
return st; return st;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册