Commit 81ac6382 authored by Anton Popov

slightly better performance
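The change is one pattern applied across the files below: `IDataType::getDefaultSerialization()` builds a fresh `ISerialization` object behind a `shared_ptr` on every call, so invoking it per row, or on every serialize()/deserialize(), costs an allocation in a hot path. The commit hoists each such call to code that runs once (a constructor, or the top of a loop), caches the result in a `SerializationPtr`, and reuses it. A minimal sketch of the before/after shape, with hypothetical names (`SomeFunction`, `value`) that are not from this commit:

    // Before: a fresh ISerialization is allocated on every serialize() call.
    class SomeFunction
    {
        DataTypePtr type;
        Field value;
    public:
        void serialize(WriteBuffer & buf) const
        {
            type->getDefaultSerialization()->serializeBinary(value, buf); // allocates each time
        }
    };

    // After: the serialization is built once in the constructor and reused.
    class SomeFunction
    {
        DataTypePtr type;
        SerializationPtr serialization; // cached shared_ptr to const ISerialization
        Field value;
    public:
        explicit SomeFunction(const DataTypePtr & type_)
            : type(type_), serialization(type->getDefaultSerialization()) {}

        void serialize(WriteBuffer & buf) const
        {
            serialization->serializeBinary(value, buf); // no per-call allocation
        }
    };

The same hoisting appears inside function bodies (e.g. ConvertImplGenericToString below), where the serialization is fetched once before a per-row loop instead of inside it.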

Parent 6800e536
......@@ -39,6 +39,8 @@ class AggregateFunctionArgMinMax final : public IAggregateFunctionTupleArgHelper
private:
const DataTypePtr & type_res;
const DataTypePtr & type_val;
const SerializationPtr serialization_res;
const SerializationPtr serialization_val;
bool tuple_argument;
using Base = IAggregateFunctionTupleArgHelper<Data, AggregateFunctionArgMinMax<Data>, 2>;
......@@ -48,6 +50,8 @@ public:
: Base({type_res_, type_val_}, {}, tuple_argument_)
, type_res(this->argument_types[0])
, type_val(this->argument_types[1])
, serialization_res(type_res->getDefaultSerialization())
, serialization_val(type_val->getDefaultSerialization())
{
if (!type_val->isComparable())
throw Exception(
......@@ -84,14 +88,14 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
{
this->data(place).result.write(buf, *type_res);
this->data(place).value.write(buf, *type_val);
this->data(place).result.write(buf, *serialization_res);
this->data(place).value.write(buf, *serialization_val);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena * arena) const override
{
this->data(place).result.read(buf, *type_res, arena);
this->data(place).value.read(buf, *type_val, arena);
this->data(place).result.read(buf, *serialization_res, arena);
this->data(place).value.read(buf, *serialization_val, arena);
}
bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); }
......
......@@ -55,7 +55,8 @@ class AggregateFunctionGroupArrayInsertAtGeneric final
: public IAggregateFunctionDataHelper<AggregateFunctionGroupArrayInsertAtDataGeneric, AggregateFunctionGroupArrayInsertAtGeneric>
{
private:
DataTypePtr & type;
DataTypePtr type;
SerializationPtr serialization;
Field default_value;
UInt64 length_to_resize = 0; /// zero means - do not do resizing.
......@@ -63,6 +64,7 @@ public:
AggregateFunctionGroupArrayInsertAtGeneric(const DataTypes & arguments, const Array & params)
: IAggregateFunctionDataHelper<AggregateFunctionGroupArrayInsertAtDataGeneric, AggregateFunctionGroupArrayInsertAtGeneric>(arguments, params)
, type(argument_types[0])
, serialization(type->getDefaultSerialization())
{
if (!params.empty())
{
......@@ -154,7 +156,7 @@ public:
else
{
writeBinary(UInt8(0), buf);
type->getDefaultSerialization()->serializeBinary(elem, buf);
serialization->serializeBinary(elem, buf);
}
}
}
......@@ -175,7 +177,7 @@ public:
UInt8 is_null = 0;
readBinary(is_null, buf);
if (!is_null)
type->getDefaultSerialization()->deserializeBinary(arr[i], buf);
serialization->deserializeBinary(arr[i], buf);
}
}
......
......@@ -50,14 +50,14 @@ public:
assert_cast<ColVecType &>(to).insertDefault();
}
void write(WriteBuffer & buf, const IDataType & /*data_type*/) const
void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const
{
writeBinary(has(), buf);
if (has())
writeBinary(value, buf);
}
void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena *)
void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *)
{
readBinary(has_value, buf);
if (has())
......@@ -221,14 +221,14 @@ public:
assert_cast<ColumnString &>(to).insertDefault();
}
void write(WriteBuffer & buf, const IDataType & /*data_type*/) const
void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const
{
writeBinary(size, buf);
if (has())
buf.write(getData(), size);
}
void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena * arena)
void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena * arena)
{
Int32 rhs_size;
readBinary(rhs_size, buf);
......@@ -427,24 +427,24 @@ public:
to.insertDefault();
}
void write(WriteBuffer & buf, const IDataType & data_type) const
void write(WriteBuffer & buf, const ISerialization & serialization) const
{
if (!value.isNull())
{
writeBinary(true, buf);
data_type.getDefaultSerialization()->serializeBinary(value, buf);
serialization.serializeBinary(value, buf);
}
else
writeBinary(false, buf);
}
void read(ReadBuffer & buf, const IDataType & data_type, Arena *)
void read(ReadBuffer & buf, const ISerialization & serialization, Arena *)
{
bool is_not_null;
readBinary(is_not_null, buf);
if (is_not_null)
data_type.getDefaultSerialization()->deserializeBinary(value, buf);
serialization.deserializeBinary(value, buf);
}
void change(const IColumn & column, size_t row_num, Arena *)
......@@ -678,15 +678,15 @@ struct AggregateFunctionAnyHeavyData : Data
return false;
}
void write(WriteBuffer & buf, const IDataType & data_type) const
void write(WriteBuffer & buf, const ISerialization & serialization) const
{
Data::write(buf, data_type);
Data::write(buf, serialization);
writeBinary(counter, buf);
}
void read(ReadBuffer & buf, const IDataType & data_type, Arena * arena)
void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena)
{
Data::read(buf, data_type, arena);
Data::read(buf, serialization, arena);
readBinary(counter, buf);
}
......@@ -698,12 +698,14 @@ template <typename Data>
class AggregateFunctionsSingleValue final : public IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>
{
private:
DataTypePtr & type;
DataTypePtr type;
SerializationPtr serialization;
public:
AggregateFunctionsSingleValue(const DataTypePtr & type_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>({type_}, {})
, type(this->argument_types[0])
, serialization(type->getDefaultSerialization())
{
if (StringRef(Data::name()) == StringRef("min")
|| StringRef(Data::name()) == StringRef("max"))
......@@ -733,12 +735,12 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
{
this->data(place).write(buf, *type.get());
this->data(place).write(buf, *serialization);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena * arena) const override
{
this->data(place).read(buf, *type.get(), arena);
this->data(place).read(buf, *serialization, arena);
}
bool allocatesMemoryInArena() const override
......
......@@ -64,7 +64,9 @@ class AggregateFunctionMapBase : public IAggregateFunctionDataHelper<
{
private:
DataTypePtr keys_type;
SerializationPtr keys_serialization;
DataTypes values_types;
Serializations values_serializations;
public:
using Base = IAggregateFunctionDataHelper<
......@@ -72,9 +74,14 @@ public:
AggregateFunctionMapBase(const DataTypePtr & keys_type_,
const DataTypes & values_types_, const DataTypes & argument_types_)
: Base(argument_types_, {} /* parameters */), keys_type(keys_type_),
values_types(values_types_)
: Base(argument_types_, {} /* parameters */)
, keys_type(keys_type_)
, keys_serialization(keys_type->getDefaultSerialization())
, values_types(values_types_)
{
values_serializations.reserve(values_types.size());
for (const auto & type : values_types)
values_serializations.emplace_back(type->getDefaultSerialization());
}
DataTypePtr getReturnType() const override
......@@ -248,9 +255,9 @@ public:
for (const auto & elem : merged_maps)
{
keys_type->getDefaultSerialization()->serializeBinary(elem.first, buf);
keys_serialization->serializeBinary(elem.first, buf);
for (size_t col = 0; col < values_types.size(); ++col)
values_types[col]->getDefaultSerialization()->serializeBinary(elem.second[col], buf);
values_serializations[col]->serializeBinary(elem.second[col], buf);
}
}
......@@ -263,12 +270,12 @@ public:
for (size_t i = 0; i < size; ++i)
{
Field key;
keys_type->getDefaultSerialization()->deserializeBinary(key, buf);
keys_serialization->deserializeBinary(key, buf);
Array values;
values.resize(values_types.size());
for (size_t col = 0; col < values_types.size(); ++col)
values_types[col]->getDefaultSerialization()->deserializeBinary(values[col], buf);
values_serializations[col]->deserializeBinary(values[col], buf);
if constexpr (IsDecimalNumber<T>)
merged_maps[key.get<DecimalField<T>>()] = values;
......
......@@ -82,7 +82,7 @@ void NativeBlockInputStream::readData(const IDataType & type, ColumnPtr & column
auto serialization = type.getDefaultSerialization();
serialization->deserializeBinaryBulkStatePrefix(settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state, nullptr);
if (column->size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column->size()) + ". Rows expected: " + toString(rows) + ".",
......
......@@ -174,7 +174,7 @@ public:
size_t limit,
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache = nullptr) const;
SubstreamsCache * cache) const;
/** Override these methods for data types that require just single stream (most of data types).
*/
......
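A related interface cleanup: `deserializeBinaryBulkWithMultipleStreams` loses its defaulted `SubstreamsCache * cache = nullptr` parameter, so every caller must now state explicitly whether it participates in substream caching instead of silently falling back to no cache. The call sites touched in this commit (NativeBlockInputStream, MergeTreeIndexGranuleSet, MergeTreeReaderCompact) all opt out; their shape, roughly:

    // Callers now spell out the cache argument; nullptr opts out of
    // sharing deserialized substreams across subcolumns.
    serialization->deserializeBinaryBulkStatePrefix(settings, state);
    serialization->deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state, /* cache = */ nullptr);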
......@@ -110,7 +110,7 @@ void SerializationFixedString::serializeTextEscaped(const IColumn & column, size
}
void SerializationFixedString::alignStringLength(size_t n, ColumnFixedString::Chars & data, size_t string_start)
void SerializationFixedString::alignStringLength(size_t n, PaddedPODArray<UInt8> & data, size_t string_start)
{
size_t length = data.size() - string_start;
if (length < n)
......
......@@ -44,7 +44,7 @@ public:
/// Makes sure that the length of a newly inserted string to `chars` is equal to getN().
/// If the length is less than getN() the function will add zero characters up to getN().
/// If the length is greater than getN() the function will throw an exception.
static void alignStringLength(size_t n, PaddedPODArray<UInt8> & chars, size_t old_size);
static void alignStringLength(size_t n, PaddedPODArray<UInt8> & data, size_t string_start);
};
}
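A minimal usage sketch for the widened `alignStringLength` signature (assuming the usual ClickHouse headers; not code from this commit):

    // Pad a 2-byte string up to n = 4 inside a raw byte buffer.
    PaddedPODArray<UInt8> data;
    size_t string_start = data.size(); // where the newly inserted string begins
    const char s[] = "ab";
    data.insert(reinterpret_cast<const UInt8 *>(s), reinterpret_cast<const UInt8 *>(s) + 2);
    SerializationFixedString::alignStringLength(4, data, string_start);
    // data now holds {'a', 'b', 0, 0}; a string longer than 4 bytes would throw.

Taking `PaddedPODArray<UInt8>` (the same type as `ColumnFixedString::Chars`, but spelled without the column header) and naming the offset `string_start` rather than `old_size` lets callers that only hold a raw buffer use the helper.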
......@@ -439,6 +439,7 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
name,
underlying_type,
initial_type,
initial_type->getDefaultSerialization(),
type,
expression,
null_value,
......
......@@ -58,6 +58,7 @@ struct DictionaryAttribute final
const std::string name;
const AttributeUnderlyingType underlying_type;
const DataTypePtr type;
const SerializationPtr serialization;
const DataTypePtr nested_type;
const std::string expression;
const Field null_value;
......
......@@ -358,7 +358,7 @@ void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, cons
/// key_i=value_i
writeQuoted(key_description.name, out);
writeString("=", out);
key_description.type->getDefaultSerialization()->serializeTextQuoted(*key_columns[i], row, out, format_settings);
key_description.serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
}
......@@ -415,7 +415,7 @@ void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const si
writeString(", ", out);
first = false;
(*dict_struct.key)[i].type->getDefaultSerialization()->serializeTextQuoted(*key_columns[i], row, out, format_settings);
(*dict_struct.key)[i].serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
writeString(")", out);
......
......@@ -726,9 +726,10 @@ struct ConvertImplGenericToString
WriteBufferFromVector<ColumnString::Chars> write_buffer(data_to);
FormatSettings format_settings;
auto serialization = type.getDefaultSerialization();
for (size_t i = 0; i < size; ++i)
{
type.getDefaultSerialization()->serializeText(col_from, i, write_buffer, format_settings);
serialization->serializeText(col_from, i, write_buffer, format_settings);
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
}
......@@ -1109,11 +1110,12 @@ struct ConvertImplGenericFromString
size_t current_offset = 0;
FormatSettings format_settings;
auto serialization = data_type_to.getDefaultSerialization();
for (size_t i = 0; i < size; ++i)
{
ReadBufferFromMemory read_buffer(&chars[current_offset], offsets[i] - current_offset - 1);
data_type_to.getDefaultSerialization()->deserializeWholeText(column_to, read_buffer, format_settings);
serialization->deserializeWholeText(column_to, read_buffer, format_settings);
if (!read_buffer.eof())
throwExceptionForIncompletelyParsedValue(read_buffer, result_type);
......
......@@ -868,6 +868,8 @@ bool FunctionArrayElement::matchKeyToIndexNumberConst(
const IColumn & data, const Offsets & offsets,
const Field & index, PaddedPODArray<UInt64> & matched_idxs)
{
std::cerr << "index type: " << index.getTypeName() << "\n";
std::cerr << "index: " << toString(index) << "\n";
const auto * data_numeric = checkAndGetColumn<ColumnVector<DataType>>(&data);
if (!data_numeric)
return false;
......
......@@ -47,6 +47,8 @@ namespace DB
typename ColVecConc::Container & vec_concurrency = col_concurrency->getData();
std::multiset<typename ArgDataType::FieldType> ongoing_until;
auto begin_serialization = arguments[0].type->getDefaultSerialization();
auto end_serialization = arguments[1].type->getDefaultSerialization();
for (size_t i = 0; i < input_rows_count; ++i)
{
const auto begin = vec_begin[i];
......@@ -56,8 +58,8 @@ namespace DB
{
const FormatSettings default_format;
WriteBufferFromOwnString buf_begin, buf_end;
arguments[0].type->getDefaultSerialization()->serializeTextQuoted(*(arguments[0].column), i, buf_begin, default_format);
arguments[1].type->getDefaultSerialization()->serializeTextQuoted(*(arguments[1].column), i, buf_end, default_format);
begin_serialization->serializeTextQuoted(*(arguments[0].column), i, buf_begin, default_format);
end_serialization->serializeTextQuoted(*(arguments[1].column), i, buf_end, default_format);
throw Exception(
"Incorrect order of events: " + buf_begin.str() + " > " + buf_end.str(),
ErrorCodes::INCORRECT_DATA);
......
......@@ -58,11 +58,12 @@ public:
String tmp;
FormatSettings format_settings;
auto serialization = src.type->getDefaultSerialization();
for (size_t i = 0; i < size; ++i)
{
{
WriteBufferFromString out(tmp);
src.type->getDefaultSerialization()->serializeText(*src.column, i, out, format_settings);
serialization->serializeText(*src.column, i, out, format_settings);
}
res_data[i] = UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(tmp.data()), tmp.size());
......
......@@ -107,7 +107,7 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr)
auto serialization = type->getDefaultSerialization();
serialization->deserializeBinaryBulkStatePrefix(settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(new_column, rows_to_read, settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(new_column, rows_to_read, settings, state, nullptr);
block.insert(ColumnWithTypeAndName(new_column, type, column.name));
}
......
......@@ -220,14 +220,14 @@ void MergeTreeReaderCompact::readData(
auto serialization = type_in_storage->getDefaultSerialization();
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, state, nullptr);
column = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), *temp_column);
}
else
{
auto serialization = type->getDefaultSerialization();
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state, nullptr);
}
/// The buffer is left in inconsistent state after reading single offsets
......