未验证 提交 c6a8e4d6 编写于 作者: T tavplubix 提交者: GitHub

Merge pull request #8958 from ClickHouse/json-each-row-array-fix

Fixed error in JSONEachRow when data is in array.
......@@ -34,7 +34,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
unit.is_last = !have_more_data;
unit.status = READY_TO_PARSE;
scheduleParserThreadForUnitWithNumber(current_unit_number);
scheduleParserThreadForUnitWithNumber(segmentator_ticket_number);
++segmentator_ticket_number;
if (!have_more_data)
......@@ -49,12 +49,13 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
}
}
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_ticket_number)
{
try
{
setThreadName("ChunkParser");
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
/*
......@@ -64,9 +65,9 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n
* can use it from multiple threads simultaneously.
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
auto parser = std::make_unique<InputStreamFromInputFormat>(
input_processor_creator(read_buffer, header,
row_input_format_params, format_settings));
auto format = input_processor_creator(read_buffer, header, row_input_format_params, format_settings);
format->setCurrentUnitNumber(current_ticket_number);
auto parser = std::make_unique<InputStreamFromInputFormat>(std::move(format));
unit.block_ext.block.clear();
unit.block_ext.block_missing_values.clear();
......
......@@ -213,9 +213,9 @@ private:
std::deque<ProcessingUnit> processing_units;
void scheduleParserThreadForUnitWithNumber(size_t unit_number)
void scheduleParserThreadForUnitWithNumber(size_t ticket_number)
{
pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number));
pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, ticket_number));
}
void finishAndWait()
......
......@@ -145,9 +145,19 @@ BlockInputStreamPtr FormatFactory::getInput(
// Doesn't make sense to use parallel parsing with less than four threads
// (segmentator + two parsers + reader).
if (settings.input_format_parallel_parsing
&& file_segmentation_engine
&& settings.max_threads >= 4)
bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4;
if (parallel_parsing && name == "JSONEachRow")
{
/// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix()
/// For JSONEachRow we can safely skip whitespace characters
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == '[')
parallel_parsing = false; /// Disable it for JSONEachRow if data is in square brackets (see JSONEachRowRowInputFormat)
}
if (parallel_parsing)
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
......
......@@ -38,6 +38,13 @@ public:
static const BlockMissingValues none;
return none;
}
size_t getCurrentUnitNumber() const { return current_unit_number; }
void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; }
private:
/// Number of currently parsed chunk (if parallel parsing is enabled)
size_t current_unit_number = 0;
};
}
......@@ -77,6 +77,8 @@ protected:
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
size_t getTotalRows() const { return total_rows; }
private:
Params params;
......
......@@ -216,46 +216,37 @@ void JSONEachRowRowInputFormat::readNestedData(const String & name, MutableColum
bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
/// Set flag data_in_square_brackets if data starts with '['.
if (!in.eof() && parsing_stage == ParsingStage::START)
{
parsing_stage = ParsingStage::PROCESS;
skipWhitespaceIfAny(in);
if (*in.position() == '[')
{
data_in_square_brackets = true;
++in.position();
}
}
if (!allow_new_rows)
return false;
skipWhitespaceIfAny(in);
/// We consume ;, or \n before scanning a new row, instead scanning to next row at the end.
/// We consume , or \n before scanning a new row, instead scanning to next row at the end.
/// The reason is that if we want an exact number of rows read with LIMIT x
/// from a streaming table engine with text data format, like File or Kafka
/// then seeking to next ;, or \n would trigger reading of an extra row at the end.
/// Semicolon is added for convenience as it could be used at end of INSERT query.
if (!in.eof() && (*in.position() == ',' || *in.position() == ';'))
++in.position();
/// Finish reading rows if data is in square brackets and ']' received.
skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == ']' && data_in_square_brackets)
bool is_first_row = getCurrentUnitNumber() == 0 && getTotalRows() == 1;
if (!in.eof())
{
data_in_square_brackets = false;
parsing_stage = ParsingStage::FINISH;
++in.position();
return false;
/// There may be optional ',' (but not before the first row)
if (!is_first_row && *in.position() == ',')
++in.position();
else if (!data_in_square_brackets && *in.position() == ';')
{
/// ';' means the end of query (but it cannot be before ']')
return allow_new_rows = false;
}
else if (data_in_square_brackets && *in.position() == ']')
{
/// ']' means the end of query
return allow_new_rows = false;
}
}
skipWhitespaceIfAny(in);
if (in.eof() || parsing_stage == ParsingStage::FINISH)
{
if (data_in_square_brackets)
throw Exception("Unexpected end of data: received end of stream instead of ']'.", ErrorCodes::INCORRECT_DATA);
if (in.eof())
return false;
}
size_t num_columns = columns.size();
......@@ -291,6 +282,32 @@ void JSONEachRowRowInputFormat::resetParser()
prev_positions.clear();
}
void JSONEachRowRowInputFormat::readPrefix()
{
skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == '[')
{
++in.position();
data_in_square_brackets = true;
}
}
void JSONEachRowRowInputFormat::readSuffix()
{
skipWhitespaceIfAny(in);
if (data_in_square_brackets)
{
assertChar(']', in);
skipWhitespaceIfAny(in);
}
if (!in.eof() && *in.position() == ';')
{
++in.position();
skipWhitespaceIfAny(in);
}
assertEOF(in);
}
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
{
......
......@@ -24,6 +24,9 @@ public:
String getName() const override { return "JSONEachRowRowInputFormat"; }
void readPrefix() override;
void readSuffix() override;
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......@@ -71,15 +74,7 @@ private:
/// This flag is needed to know if data is in square brackets.
bool data_in_square_brackets = false;
/// This is needed to know the stage of parsing.
enum class ParsingStage
{
START,
PROCESS,
FINISH
};
ParsingStage parsing_stage = ParsingStage::START;
bool allow_new_rows = true;
};
}
......@@ -11,36 +11,36 @@ DATA_DIR=$CUR_DIR/data_avro
echo === input
echo = primitive
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, b_int Int32, c_long Int64, d_float Float32, e_double Float64, f_bytes String, g_string String' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, c_long Int64, g_string String' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String, c_long Int64, a_bool UInt8' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, b_int Int32, c_long Int64, d_float Float32, e_double Float64, f_bytes String, g_string String' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, c_long Int64, g_string String' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String, c_long Int64, a_bool UInt8' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String' -q 'select * from table'
echo = complex
cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_enum_to_string String, b_enum_to_enum Enum('t' = 1, 'f' = 0), c_array_string Array(String), d_array_array_string Array(Array(String)), e_union_null_string Nullable(String), f_union_long_null Nullable(Int64), g_fixed FixedString(32)" -q 'select * from table'
cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "g_fixed FixedString(32)" -q 'select * from table'
cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_enum_to_string String, b_enum_to_enum Enum('t' = 1, 'f' = 0), c_array_string Array(String), d_array_array_string Array(Array(String)), e_union_null_string Nullable(String), f_union_long_null Nullable(Int64), g_fixed FixedString(32)" -q 'select * from table'
cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "g_fixed FixedString(32)" -q 'select * from table'
echo = logical_types
cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_date Date, b_timestamp_millis DateTime64(3, 'UTC'), c_timestamp_micros DateTime64(6, 'UTC')" -q 'select * from table'
cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_date Int32, b_timestamp_millis Int64, c_timestamp_micros Int64' -q 'select * from table'
cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_date Date, b_timestamp_millis DateTime64(3, 'UTC'), c_timestamp_micros DateTime64(6, 'UTC')" -q 'select * from table'
cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_date Int32, b_timestamp_millis Int64, c_timestamp_micros Int64' -q 'select * from table'
echo = references
cat $DATA_DIR/references.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a String, c String" -q 'select * from table'
cat $DATA_DIR/references.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a String, c String" -q 'select * from table'
echo = compression
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
cat $DATA_DIR/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
cat $DATA_DIR/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
#snappy is optional
#cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
#cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
echo = other
#no data
cat $DATA_DIR/empty.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
cat $DATA_DIR/empty.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
# type mismatch
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int32' -q 'select count() from table'
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int32' -q 'select count() from table'
# field not found
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'b Int64' -q 'select count() from table' 2>&1 | grep -i 'not found' -o
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'b Int64' -q 'select count() from table' 2>&1 | grep -i 'not found' -o
......@@ -52,20 +52,20 @@ echo === output
echo = primitive
S1="a_bool UInt8, b_int Int32, c_long Int64, d_float Float32, e_double Float64, f_bytes String, g_string String"
echo '1,1,2,3.4,5.6,"b1","s1"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S1" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S1" -q 'select * from table'
echo '1,1,2,3.4,5.6,"b1","s1"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S1" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S1" -q 'select * from table'
echo = complex
S2="a_enum_to_string String, b_enum_to_enum Enum('t' = 1, 'f' = 0), c_array_string Array(String), d_array_array_string Array(Array(String)), e_union_null_string Nullable(String), f_union_long_null Nullable(Int64), g_fixed FixedString(32)"
echo "\"A\",\"t\",\"['s1','s2']\",\"[['a1'],['a2']]\",\"s1\",\N,\"79cd909892d7e7ade1987cc7422628ba\"" | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S2" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S2" -q 'select * from table'
echo "\"A\",\"t\",\"['s1','s2']\",\"[['a1'],['a2']]\",\"s1\",\N,\"79cd909892d7e7ade1987cc7422628ba\"" | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S2" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S2" -q 'select * from table'
echo = logical_types
S3="a_date Date, b_timestamp_millis DateTime64(3, 'UTC'), c_timestamp_micros DateTime64(6, 'UTC')"
echo '"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S3" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S3" -q 'select * from table'
echo '"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S3" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S3" -q 'select * from table'
echo = other
S4="a Int64"
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(0) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(1000) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(0) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(1000) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
# type supported via conversion
${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" | wc -c
\ No newline at end of file
DROP TABLE IF EXISTS json_square_brackets;
CREATE TABLE json_square_brackets (id UInt32, name String) ENGINE = Memory;
INSERT INTO json_square_brackets FORMAT JSONEachRow [{"id": 1, "name": "name1"}, {"id": 2, "name": "name2"}]
INSERT INTO json_square_brackets FORMAT JSONEachRow [{"id": 1, "name": "name1"}, {"id": 2, "name": "name2"}];
INSERT INTO json_square_brackets FORMAT JSONEachRow[];
INSERT INTO json_square_brackets FORMAT JSONEachRow [ ] ;
INSERT INTO json_square_brackets FORMAT JSONEachRow ;
SELECT * FROM json_square_brackets ORDER BY id;
DROP TABLE IF EXISTS json_square_brackets;
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CUR_DIR/../shell_config.sh
${CLICKHOUSE_LOCAL} --query "SELECT '[' || arrayStringConcat(arrayMap(x -> '{\"id\": 1, \"name\": \"name1\"}', range(1000000)), ',') || ']'" | ${CLICKHOUSE_LOCAL} --query "SELECT count() FROM table" --input-format JSONEachRow --structure 'id UInt32, name String'
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册