Commit 558324e7 authored by maxim

fixed reading of parquet files containing columns of type `list`

Parent 76869bb6
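The core of the fix is an Arrow upgrade in which the arrow::Column class was removed and arrow::Table::column() began returning arrow::ChunkedArray directly; most of the C++ hunks below track that change. A minimal sketch of the before/after access pattern (illustrative only; countChunks is a hypothetical helper, Arrow 0.15-era headers assumed):

#include <arrow/api.h>
#include <cstdint>
#include <memory>

// Hypothetical helper, not part of the patch: after the upgrade, per-chunk
// access loses the intermediate ->data() hop.
int64_t countChunks(const std::shared_ptr<arrow::Table> & table)
{
    int64_t total = 0;
    for (int i = 0; i < table->num_columns(); ++i)
    {
        // Older Arrow: table->column(i)->data()->num_chunks()
        // Now:         table->column(i)->num_chunks()
        std::shared_ptr<arrow::ChunkedArray> column = table->column(i);
        total += column->num_chunks();
    }
    return total;
}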
include(ExternalProject)
# === thrift
set(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/thrift/lib/cpp)
......@@ -70,6 +72,106 @@ add_custom_command(OUTPUT orc_proto.pb.h orc_proto.pb.cc
--cpp_out="${CMAKE_CURRENT_BINARY_DIR}"
"${PROTO_DIR}/orc_proto.proto")
# === flatbuffers
##############################################################
# fbs - Step 1: download flatbuffers and build flatc executable
# TODO get rid of downloads. Should work offline.
##############################################################
set(FLATBUFFERS_VERSION v1.11.0) #flatbuffers-v1.11.0.tar.gz
if (DEFINED ENV{ARROW_FLATBUFFERS_URL})
set(FLATBUFFERS_SOURCE_URL "$ENV{ARROW_FLATBUFFERS_URL}")
else()
set(FLATBUFFERS_SOURCE_URL "https://github.com/google/flatbuffers/archive/${FLATBUFFERS_VERSION}.tar.gz")
endif()
macro(build_flatbuffers)
message(STATUS "Building flatbuffers from source")
set(FLATBUFFERS_PREFIX
"${CMAKE_CURRENT_BINARY_DIR}/flatbuffers_ep-prefix/src/flatbuffers_ep-install")
if(MSVC)
set(FLATBUFFERS_CMAKE_CXX_FLAGS /EHsc)
else()
set(FLATBUFFERS_CMAKE_CXX_FLAGS -fPIC)
endif()
set(FLATBUFFERS_COMPILER "${FLATBUFFERS_PREFIX}/bin/flatc")
set(
FLATBUFFERS_STATIC_LIB
"${FLATBUFFERS_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}flatbuffers${CMAKE_STATIC_LIBRARY_SUFFIX}"
)
# We always need to do release builds, otherwise flatc will not be installed.
externalproject_add(flatbuffers_ep
URL ${FLATBUFFERS_SOURCE_URL}
BUILD_BYPRODUCTS ${FLATBUFFERS_COMPILER} ${FLATBUFFERS_STATIC_LIB}
CMAKE_ARGS ${EP_COMMON_CMAKE_ARGS}
"-DCMAKE_BUILD_TYPE=RELEASE"
"-DCMAKE_CXX_FLAGS=${FLATBUFFERS_CMAKE_CXX_FLAGS}"
"-DCMAKE_INSTALL_PREFIX:PATH=${FLATBUFFERS_PREFIX}"
"-DFLATBUFFERS_BUILD_TESTS=OFF"
${EP_LOG_OPTIONS})
file(MAKE_DIRECTORY "${FLATBUFFERS_PREFIX}/include")
add_library(flatbuffers::flatbuffers STATIC IMPORTED)
set_target_properties(flatbuffers::flatbuffers
PROPERTIES IMPORTED_LOCATION "${FLATBUFFERS_STATIC_LIB}"
INTERFACE_INCLUDE_DIRECTORIES
"${FLATBUFFERS_PREFIX}/include")
add_executable(flatbuffers::flatc IMPORTED)
set_target_properties(flatbuffers::flatc
PROPERTIES IMPORTED_LOCATION "${FLATBUFFERS_COMPILER}")
add_dependencies(flatbuffers::flatbuffers flatbuffers_ep)
add_dependencies(flatbuffers::flatc flatbuffers_ep)
endmacro()
build_flatbuffers()
###################################
# fbs - Step 2: compile *.fbs files
###################################
set(ARROW_IPC_SRC_DIR ${ARROW_SRC_DIR}/arrow/ipc)
set(ARROW_FORMAT_SRC_DIR ${ARROW_SRC_DIR}/../../format)
set(FBS_OUTPUT_FILES "${ARROW_IPC_SRC_DIR}/File_generated.h" "${ARROW_IPC_SRC_DIR}/Message_generated.h"
"${ARROW_IPC_SRC_DIR}/feather_generated.h")
set(FBS_SRC
${ARROW_FORMAT_SRC_DIR}/Message.fbs
${ARROW_FORMAT_SRC_DIR}/File.fbs
${ARROW_FORMAT_SRC_DIR}/Schema.fbs
${ARROW_FORMAT_SRC_DIR}/Tensor.fbs
${ARROW_FORMAT_SRC_DIR}/SparseTensor.fbs
${ARROW_IPC_SRC_DIR}/feather.fbs)
foreach(FIL ${FBS_SRC})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
list(APPEND ABS_FBS_SRC ${ABS_FIL})
endforeach()
get_target_property(FLATC_EXECUTABLE flatbuffers::flatc IMPORTED_LOCATION)
if(NOT FLATC_EXECUTABLE)
get_target_property(FLATC_EXECUTABLE flatbuffers::flatc IMPORTED_LOCATION_RELEASE)
endif()
# TODO compiled files should be stored in binary build dir instead of among submodule sources
set(FLATBUFFERS_COMPILED_OUT_DIR ${ARROW_IPC_SRC_DIR})
message(STATUS "flatc: ${FLATC_EXECUTABLE} -c -o ${FLATBUFFERS_COMPILED_OUT_DIR}/ ${ABS_FBS_SRC}")
add_custom_command(TARGET flatbuffers_ep
POST_BUILD
COMMAND ${FLATC_EXECUTABLE}
-c
-o
${FLATBUFFERS_COMPILED_OUT_DIR}/
${ABS_FBS_SRC}
DEPENDS flatbuffers::flatc ${ABS_FBS_SRC}
COMMENT "Running flatc compiler on ${ABS_FBS_SRC}"
VERBATIM)
# The arrow-cmake file calls an ORC CMake subroutine that detects certain compiler features.
# Apple Clang failed to compile that detection code unless the C++11 standard was specified,
# so those features were detected as absent, and ORC itself then failed to compile.
......@@ -86,6 +188,7 @@ configure_file("${ORC_SOURCE_SRC_DIR}/Adaptor.hh.in" "${ORC_BUILD_INCLUDE_DIR}/A
set(ORC_SRCS
${ARROW_SRC_DIR}/arrow/adapters/orc/adapter.cc
${ARROW_SRC_DIR}/arrow/adapters/orc/adapter_util.cc
${ORC_SOURCE_SRC_DIR}/Exceptions.cc
${ORC_SOURCE_SRC_DIR}/OrcFile.cc
${ORC_SOURCE_SRC_DIR}/Reader.cc
......@@ -119,58 +222,91 @@ set(ORC_SRCS
# === arrow
set(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/arrow)
configure_file("${LIBRARY_DIR}/util/config.h.cmake" "${LIBRARY_DIR}/util/config.h")
# arrow/cpp/src/arrow/CMakeLists.txt
set(ARROW_SRCS
${LIBRARY_DIR}/array.cc
${LIBRARY_DIR}/builder.cc
${LIBRARY_DIR}/array/builder_adaptive.cc
${LIBRARY_DIR}/array/builder_base.cc
${LIBRARY_DIR}/array/builder_binary.cc
${LIBRARY_DIR}/array/builder_decimal.cc
${LIBRARY_DIR}/array/builder_dict.cc
${LIBRARY_DIR}/array/builder_nested.cc
${LIBRARY_DIR}/array/builder_primitive.cc
${LIBRARY_DIR}/buffer.cc
${LIBRARY_DIR}/builder.cc
${LIBRARY_DIR}/compare.cc
${LIBRARY_DIR}/extension_type.cc
${LIBRARY_DIR}/memory_pool.cc
${LIBRARY_DIR}/pretty_print.cc
${LIBRARY_DIR}/record_batch.cc
${LIBRARY_DIR}/result.cc
${LIBRARY_DIR}/scalar.cc
${LIBRARY_DIR}/sparse_tensor.cc
${LIBRARY_DIR}/status.cc
${LIBRARY_DIR}/table.cc
${LIBRARY_DIR}/table_builder.cc
${LIBRARY_DIR}/table.cc
${LIBRARY_DIR}/tensor.cc
${LIBRARY_DIR}/sparse_tensor.cc
${LIBRARY_DIR}/type.cc
${LIBRARY_DIR}/visitor.cc
${LIBRARY_DIR}/array/builder_adaptive.cc
${LIBRARY_DIR}/array/builder_base.cc
${LIBRARY_DIR}/array/builder_binary.cc
${LIBRARY_DIR}/array/builder_decimal.cc
${LIBRARY_DIR}/array/builder_dict.cc
${LIBRARY_DIR}/array/builder_nested.cc
${LIBRARY_DIR}/array/builder_primitive.cc
${LIBRARY_DIR}/array/builder_union.cc
${LIBRARY_DIR}/array/concatenate.cc
${LIBRARY_DIR}/array/dict_internal.cc
${LIBRARY_DIR}/array/diff.cc
${LIBRARY_DIR}/csv/converter.cc
${LIBRARY_DIR}/csv/chunker.cc
${LIBRARY_DIR}/csv/column-builder.cc
${LIBRARY_DIR}/csv/column_builder.cc
${LIBRARY_DIR}/csv/options.cc
${LIBRARY_DIR}/csv/parser.cc
${LIBRARY_DIR}/csv/reader.cc
${LIBRARY_DIR}/ipc/dictionary.cc
${LIBRARY_DIR}/ipc/feather.cc
${LIBRARY_DIR}/ipc/file_to_stream.cc
${LIBRARY_DIR}/ipc/message.cc
${LIBRARY_DIR}/ipc/metadata_internal.cc
${LIBRARY_DIR}/ipc/options.cc
${LIBRARY_DIR}/ipc/reader.cc
${LIBRARY_DIR}/ipc/stream_to_file.cc
${LIBRARY_DIR}/ipc/writer.cc
${LIBRARY_DIR}/io/buffered.cc
${LIBRARY_DIR}/io/compressed.cc
${LIBRARY_DIR}/io/file.cc
${LIBRARY_DIR}/io/interfaces.cc
${LIBRARY_DIR}/io/memory.cc
${LIBRARY_DIR}/io/readahead.cc
${LIBRARY_DIR}/io/slow.cc
${LIBRARY_DIR}/util/bit-util.cc
${LIBRARY_DIR}/util/basic_decimal.cc
${LIBRARY_DIR}/util/bit_util.cc
# ${LIBRARY_DIR}/util/compression_brotli.cc
${LIBRARY_DIR}/util/compression_bz2.cc
${LIBRARY_DIR}/util/compression.cc
${LIBRARY_DIR}/util/cpu-info.cc
${LIBRARY_DIR}/util/compression_lz4.cc
${LIBRARY_DIR}/util/compression_snappy.cc
${LIBRARY_DIR}/util/compression_zlib.cc
${LIBRARY_DIR}/util/compression_zstd.cc
${LIBRARY_DIR}/util/cpu_info.cc
${LIBRARY_DIR}/util/decimal.cc
${LIBRARY_DIR}/util/int-util.cc
${LIBRARY_DIR}/util/io-util.cc
${LIBRARY_DIR}/util/logging.cc
${LIBRARY_DIR}/util/int_util.cc
${LIBRARY_DIR}/util/io_util.cc
${LIBRARY_DIR}/util/key_value_metadata.cc
${LIBRARY_DIR}/util/task-group.cc
${LIBRARY_DIR}/util/thread-pool.cc
${LIBRARY_DIR}/util/logging.cc
${LIBRARY_DIR}/util/memory.cc
${LIBRARY_DIR}/util/string_builder.cc
${LIBRARY_DIR}/util/string.cc
${LIBRARY_DIR}/util/task_group.cc
${LIBRARY_DIR}/util/thread_pool.cc
${LIBRARY_DIR}/util/trie.cc
# ${LIBRARY_DIR}/util/uri.cc
${LIBRARY_DIR}/util/utf8.cc
${LIBRARY_DIR}/vendored/base64.cpp
${ORC_SRCS}
)
......@@ -179,7 +315,7 @@ set(ARROW_SRCS ${ARROW_SRCS}
${LIBRARY_DIR}/compute/kernels/boolean.cc
${LIBRARY_DIR}/compute/kernels/cast.cc
${LIBRARY_DIR}/compute/kernels/hash.cc
${LIBRARY_DIR}/compute/kernels/util-internal.cc
${LIBRARY_DIR}/compute/kernels/util_internal.cc
)
if (LZ4_INCLUDE_DIR AND LZ4_LIBRARY)
......@@ -221,6 +357,11 @@ endif()
add_library(${ARROW_LIBRARY} ${ARROW_SRCS})
# Arrow dependencies on external projects (ep): flatbuffers and Boost
add_dependencies(${ARROW_LIBRARY} flatbuffers_ep)
target_link_libraries(${ARROW_LIBRARY} PRIVATE boost_system_internal boost_filesystem_internal boost_regex_internal)
target_link_libraries(${ARROW_LIBRARY} PRIVATE flatbuffers::flatbuffers)
if (USE_INTERNAL_PROTOBUF_LIBRARY)
add_dependencies(${ARROW_LIBRARY} protoc)
endif()
......@@ -255,23 +396,25 @@ set(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/parquet)
# arrow/cpp/src/parquet/CMakeLists.txt
set(PARQUET_SRCS
${LIBRARY_DIR}/arrow/reader.cc
${LIBRARY_DIR}/arrow/record_reader.cc
${LIBRARY_DIR}/arrow/reader_internal.cc
${LIBRARY_DIR}/arrow/schema.cc
${LIBRARY_DIR}/arrow/writer.cc
${LIBRARY_DIR}/bloom_filter.cc
${LIBRARY_DIR}/column_reader.cc
${LIBRARY_DIR}/column_scanner.cc
${LIBRARY_DIR}/column_writer.cc
${LIBRARY_DIR}/deprecated_io.cc
${LIBRARY_DIR}/encoding.cc
${LIBRARY_DIR}/file_reader.cc
${LIBRARY_DIR}/file_writer.cc
${LIBRARY_DIR}/metadata.cc
${LIBRARY_DIR}/murmur3.cc
${LIBRARY_DIR}/platform.cc
${LIBRARY_DIR}/printer.cc
${LIBRARY_DIR}/properties.cc
${LIBRARY_DIR}/schema.cc
${LIBRARY_DIR}/statistics.cc
${LIBRARY_DIR}/types.cc
${LIBRARY_DIR}/util/comparison.cc
${LIBRARY_DIR}/util/memory.cc
)
#list(TRANSFORM PARQUET_SRCS PREPEND ${LIBRARY_DIR}/) # cmake 3.12
list(APPEND PARQUET_SRCS
......@@ -292,7 +435,7 @@ endif()
# === tools
set(TOOLS_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/tools/parquet)
set(PARQUET_TOOLS parquet-dump-schema parquet-reader parquet-scan)
set(PARQUET_TOOLS parquet_dump_schema parquet_reader parquet_scan)
foreach(TOOL ${PARQUET_TOOLS})
add_executable(${TOOL} ${TOOLS_DIR}/${TOOL}.cc)
target_link_libraries(${TOOL} PRIVATE ${PARQUET_LIBRARY})
......
/**
* Autogenerated by Thrift Compiler (0.11.0)
* Autogenerated by Thrift Compiler (0.12.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
......
/**
* Autogenerated by Thrift Compiler (0.11.0)
* Autogenerated by Thrift Compiler (0.12.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
......
......@@ -61,14 +61,14 @@ namespace DB
/// Inserts numeric data directly into internal column data to reduce overhead
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
static void fillColumnWithNumericData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
static void fillColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
{
auto & column_data = static_cast<VectorType &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
std::shared_ptr<arrow::Array> chunk = arrow_column->chunk(chunk_i);
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
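For context on the buffers[1] access above, a hedged standalone sketch of reading raw int64 values from an Arrow array's value buffer; copyInt64Values is a hypothetical name, and a possible slice offset must be honored (Arrow 0.15-era headers assumed):

#include <arrow/api.h>
#include <cstdint>
#include <cstring>
#include <vector>

std::vector<int64_t> copyInt64Values(const arrow::Array & chunk)
{
    // buffers[0] is the validity bitmap, buffers[1] holds the packed values.
    std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
    const auto * raw = reinterpret_cast<const int64_t *>(buffer->data()) + chunk.offset();
    std::vector<int64_t> out(chunk.length());
    std::memcpy(out.data(), raw, chunk.length() * sizeof(int64_t));
    return out;
}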
......@@ -80,15 +80,15 @@ namespace DB
/// Inserts chars and offsets directly into internal column data to reduce overhead.
/// Internal offsets are shifted one to the right relative to Arrow's, so the last offset maps to the end of all chars.
/// Internal strings are also null-terminated (see the worked example after this hunk).
static void fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
static void fillColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(*internal_column).getChars();
PaddedPODArray<UInt64> & column_offsets = assert_cast<ColumnString &>(*internal_column).getOffsets();
size_t chars_t_size = 0;
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
const size_t chunk_length = chunk.length();
chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1);
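The offset shift described above, as a worked example with hypothetical values: Arrow keeps N + 1 offsets starting at zero and no terminators, while ClickHouse keeps N offsets, each pointing just past a string's trailing zero byte.

#include <cassert>
#include <cstdint>
#include <vector>

int main()
{
    // Arrow layout for the strings "ab", "cde".
    std::vector<int32_t> arrow_offsets = {0, 2, 5};
    const char * arrow_chars = "abcde";

    std::vector<char> chars;        // ClickHouse chars: null-terminated strings
    std::vector<uint64_t> offsets;  // ClickHouse offsets: shifted one to the right
    for (size_t i = 0; i + 1 < arrow_offsets.size(); ++i)
    {
        chars.insert(chars.end(), arrow_chars + arrow_offsets[i], arrow_chars + arrow_offsets[i + 1]);
        chars.push_back('\0');
        offsets.push_back(chars.size()); // points past the terminator
    }
    assert((offsets == std::vector<uint64_t>{3, 7})); // "ab\0" ends at 3, "cde\0" at 7
}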
......@@ -98,9 +98,9 @@ namespace DB
column_chars_t.reserve(chars_t_size);
column_offsets.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
const size_t chunk_length = chunk.length();
......@@ -118,14 +118,14 @@ namespace DB
}
}
static void fillColumnWithBooleanData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
static void fillColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
{
auto & column_data = assert_cast<ColumnVector<UInt8> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::BooleanArray & chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->data()->chunk(chunk_i)));
arrow::BooleanArray & chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->chunk(chunk_i)));
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
......@@ -135,14 +135,14 @@ namespace DB
}
/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16, so the value must be range-checked before saving
static void fillColumnWithDate32Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
static void fillColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<UInt16> & column_data = assert_cast<ColumnVector<UInt16> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->data()->chunk(chunk_i)));
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->chunk(chunk_i)));
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
{
......@@ -150,7 +150,7 @@ namespace DB
if (days_num > DATE_LUT_MAX_DAY_NUM)
{
// TODO: will it rollback correctly?
throw Exception{"Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name()
throw Exception{"Input value " + std::to_string(days_num) + " of a column \"" + internal_column->getName()
+ "\" is greater than "
"max allowed Date value, which is "
+ std::to_string(DATE_LUT_MAX_DAY_NUM),
......@@ -163,14 +163,14 @@ namespace DB
}
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32, so the value must be range-checked before saving
static void fillColumnWithDate64Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
static void fillColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
{
auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
auto & chunk = static_cast<arrow::Date64Array &>(*(arrow_column->data()->chunk(chunk_i)));
auto & chunk = static_cast<arrow::Date64Array &>(*(arrow_column->chunk(chunk_i)));
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
{
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / 1000); // Always? in ms
......@@ -179,14 +179,14 @@ namespace DB
}
}
static void fillColumnWithTimestampData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
static void fillColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
{
auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
auto & chunk = static_cast<arrow::TimestampArray &>(*(arrow_column->data()->chunk(chunk_i)));
auto & chunk = static_cast<arrow::TimestampArray &>(*(arrow_column->chunk(chunk_i)));
const auto & type = static_cast<const ::arrow::TimestampType &>(*chunk.type());
UInt32 divide = 1;
......@@ -215,15 +215,15 @@ namespace DB
}
}
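The elided middle of this hunk scales each timestamp by its Arrow time unit before storing it as seconds; a hedged sketch of that conversion (toDateTimeSeconds is a hypothetical name):

#include <arrow/type.h>
#include <cstdint>

uint32_t toDateTimeSeconds(int64_t value, arrow::TimeUnit::type unit)
{
    int64_t divide = 1;
    switch (unit)
    {
        case arrow::TimeUnit::SECOND: divide = 1; break;
        case arrow::TimeUnit::MILLI:  divide = 1000; break;
        case arrow::TimeUnit::MICRO:  divide = 1000 * 1000; break;
        case arrow::TimeUnit::NANO:   divide = 1000 * 1000 * 1000; break;
    }
    return static_cast<uint32_t>(value / divide);
}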
static void fillColumnWithDecimalData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
static void fillColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
{
auto & column = assert_cast<ColumnDecimal<Decimal128> &>(*internal_column);
auto & column_data = column.getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
auto & chunk = static_cast<arrow::DecimalArray &>(*(arrow_column->data()->chunk(chunk_i)));
auto & chunk = static_cast<arrow::DecimalArray &>(*(arrow_column->chunk(chunk_i)));
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
{
column_data.emplace_back(chunk.IsNull(value_i) ? Decimal128(0) : *reinterpret_cast<const Decimal128 *>(chunk.Value(value_i))); // TODO: copy column
......@@ -232,14 +232,14 @@ namespace DB
}
/// Creates a null bytemap from Arrow's null bitmap
static void fillByteMapFromArrowColumn(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & bytemap)
static void fillByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & bytemap)
{
PaddedPODArray<UInt8> & bytemap_data = assert_cast<ColumnVector<UInt8> &>(*bytemap).getData();
bytemap_data.reserve(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->num_chunks()); ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
std::shared_ptr<arrow::Array> chunk = arrow_column->chunk(chunk_i);
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
bytemap_data.emplace_back(chunk->IsNull(value_i));
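A note on the loop above: Arrow packs validity into a bitmap where 1 means valid, while a ClickHouse null bytemap stores one byte per row where 1 means NULL, hence the per-value IsNull() expansion. A standalone sketch (makeNullBytemap is a hypothetical name):

#include <arrow/api.h>
#include <cstdint>
#include <vector>

std::vector<uint8_t> makeNullBytemap(const arrow::ChunkedArray & column)
{
    std::vector<uint8_t> bytemap;
    bytemap.reserve(column.length());
    for (int chunk_i = 0; chunk_i < column.num_chunks(); ++chunk_i)
    {
        const auto & chunk = *column.chunk(chunk_i);
        for (int64_t i = 0; i < chunk.length(); ++i)
            bytemap.push_back(chunk.IsNull(i)); // inverted relative to Arrow's validity bit
    }
    return bytemap;
}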
......@@ -255,7 +255,7 @@ namespace DB
columns_list.reserve(header.rows());
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::Column>>;
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
if (!read_status.ok())
throw Exception{"Error while reading " + format_name + " data: " + read_status.ToString(),
ErrorCodes::CANNOT_READ_ALL_DATA};
......@@ -270,10 +270,10 @@ namespace DB
++row_group_current;
NameToColumnPtr name_to_column_ptr;
for (size_t i = 0, num_columns = static_cast<size_t>(table->num_columns()); i < num_columns; ++i)
for (const auto & column_name : table->ColumnNames())
{
std::shared_ptr<arrow::Column> arrow_column = table->column(i);
name_to_column_ptr[arrow_column->name()] = arrow_column;
std::shared_ptr<arrow::ChunkedArray> arrow_column = table->GetColumnByName(column_name);
name_to_column_ptr[column_name] = arrow_column;
}
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
......@@ -285,7 +285,7 @@ namespace DB
throw Exception{"Column \"" + header_column.name + "\" is not presented in input data",
ErrorCodes::THERE_IS_NO_COLUMN};
std::shared_ptr<arrow::Column> arrow_column = name_to_column_ptr[header_column.name];
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[header_column.name];
arrow::Type::type arrow_type = arrow_column->type()->id();
// TODO: check if a column is const?
......@@ -313,7 +313,7 @@ namespace DB
}
else
{
throw Exception{"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name()
throw Exception{"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + header_column.name
+ "\" is not supported for conversion from a " + format_name + " data format",
ErrorCodes::CANNOT_CONVERT_TYPE};
}
......@@ -374,7 +374,7 @@ namespace DB
throw Exception
{
"Unsupported " + format_name + " type \"" + arrow_column->type()->name() + "\" of an input column \""
+ arrow_column->name() + "\"",
+ header_column.name + "\"",
ErrorCodes::UNKNOWN_TYPE
};
}
......
......@@ -44,9 +44,11 @@ namespace DB
buffer = std::make_unique<arrow::Buffer>(file_data);
// TODO: maybe use parquet::RandomAccessSource?
auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer));
file_reader = std::make_unique<parquet::arrow::FileReader>(::arrow::default_memory_pool(),
std::move(reader));
auto status = parquet::arrow::FileReader::Make(
::arrow::default_memory_pool(),
parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer)),
&file_reader);
row_group_total = file_reader->num_row_groups();
row_group_current = 0;
}
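For reference, a hedged end-to-end sketch of the new factory used above, reading a table whose `list` column arrives as chunks of arrow::ListArray; readListColumn is a hypothetical name, and Arrow/Parquet 0.15-era APIs are assumed:

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <memory>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>

arrow::Status readListColumn(const std::shared_ptr<arrow::Buffer> & file_data)
{
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
        arrow::default_memory_pool(),
        parquet::ParquetFileReader::Open(std::make_shared<arrow::io::BufferReader>(file_data)),
        &reader));

    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));

    // Every column is a ChunkedArray now; a `list` column's chunks are ListArrays.
    auto list_chunk = std::static_pointer_cast<arrow::ListArray>(table->column(0)->chunk(0));
    (void)list_chunk->values(); // flattened child values; per-row bounds via value_offset(i)
    return arrow::Status::OK();
}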
......
......@@ -21,9 +21,10 @@
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/util/decimal.h>
#include <arrow/util/memory.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <parquet/util/memory.h>
#include <parquet/deprecated_io.h>
namespace DB
......@@ -238,22 +239,39 @@ static const PaddedPODArray<UInt8> * extractNullBytemapPtr(ColumnPtr column)
}
class OstreamOutputStream : public parquet::OutputStream
class OstreamOutputStream : public arrow::io::OutputStream
{
public:
explicit OstreamOutputStream(WriteBuffer & ostr_) : ostr(ostr_) { is_open_ = true; }
virtual ~OstreamOutputStream() {}
virtual void Close() {}
virtual int64_t Tell() { return total_length; }
virtual void Write(const uint8_t * data, int64_t length)
{
~OstreamOutputStream() override {}
// FileInterface
::arrow::Status Close() override
{
    is_open_ = false;
    return ::arrow::Status::OK();
}
::arrow::Status Tell(int64_t * position) const override
{
    *position = total_length;
    return ::arrow::Status::OK();
}
bool closed() const override { return !is_open_; }
// Writable
::arrow::Status Write(const void * data, int64_t length) override
{
    ostr.write(reinterpret_cast<const char *>(data), length);
    total_length += length;
    // TODO: wrap the write in try/catch?
    return ::arrow::Status::OK();
}
private:
WriteBuffer & ostr;
int64_t total_length = 0;
bool is_open_ = false;
PARQUET_DISALLOW_COPY_AND_ASSIGN(OstreamOutputStream);
};
......@@ -396,7 +414,6 @@ void ParquetBlockOutputFormat::consume(Chunk chunk)
arrow::default_memory_pool(),
sink,
props, /*parquet::default_writer_properties(),*/
parquet::arrow::default_arrow_writer_properties(),
&file_writer);
if (!status.ok())
throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
......