提交 8cfe24ac 编写于 作者: A Alexander Kuzmenkov

Merge remote-tracking branch 'origin/master' into HEAD

......@@ -186,3 +186,4 @@
[submodule "contrib/cyrus-sasl"]
path = contrib/cyrus-sasl
url = https://github.com/cyrusimap/cyrus-sasl
branch = cyrus-sasl-2.1
......@@ -17,5 +17,4 @@ ClickHouse is an open-source column-oriented database management system that all
## Upcoming Events
* [ClickHouse for Edge Analytics](https://ones2020.sched.com/event/bWPs) on September 29, 2020.
* [ClickHouse online meetup (in Russian)](https://clck.ru/R2zB9) on October 1, 2020.
......@@ -6,7 +6,7 @@ if (ENABLE_CLANG_TIDY)
message(FATAL_ERROR "clang-tidy requires CMake version at least 3.6.")
endif()
find_program (CLANG_TIDY_PATH NAMES "clang-tidy" "clang-tidy-10" "clang-tidy-9" "clang-tidy-8")
find_program (CLANG_TIDY_PATH NAMES "clang-tidy" "clang-tidy-11" "clang-tidy-10" "clang-tidy-9" "clang-tidy-8")
if (CLANG_TIDY_PATH)
message(STATUS
......
......@@ -14,10 +14,10 @@ if (NOT ENABLE_RDKAFKA)
return()
endif()
if (NOT ARCH_ARM AND USE_LIBGSASL)
if (NOT ARCH_ARM)
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
elseif(USE_INTERNAL_RDKAFKA_LIBRARY)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM} AND USE_LIBGSASL=${USE_LIBGSASL}")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM}")
endif ()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cppkafka/CMakeLists.txt")
......
Subproject commit 6054630889fd1cd8d0659573d69badcee1e23a00
Subproject commit 9995bf9d8e14f58934d9313ac64f13780d6dd3c9
Subproject commit 297fc905e166392156f83b96aaa5f44e8a6a35c4
Subproject commit 757d947235b307675cff964f29b19d388140a9eb
......@@ -133,6 +133,10 @@
"name": "yandex/clickhouse-postgresql-java-client",
"dependent": []
},
"docker/test/integration/kerberos_kdc": {
"name": "yandex/clickhouse-kerberos-kdc",
"dependent": []
},
"docker/test/base": {
"name": "yandex/clickhouse-test-base",
"dependent": [
......
......@@ -16,7 +16,8 @@ RUN apt-get update \
odbc-postgresql \
sqlite3 \
curl \
tar
tar \
krb5-user
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
......
# docker build -t yandex/clickhouse-kerberos-kdc .
FROM centos:6.6
# old OS to make is faster and smaller
RUN yum install -y krb5-server krb5-libs krb5-auth-dialog krb5-workstation
EXPOSE 88 749
RUN touch /config.sh
# should be overwritten e.g. via docker_compose volumes
# volumes: /some_path/my_kerberos_config.sh:/config.sh:ro
ENTRYPOINT ["/bin/bash", "/config.sh"]
version: '2.3'
services:
kafka_kerberized_zookeeper:
image: confluentinc/cp-zookeeper:5.2.0
# restart: always
hostname: kafka_kerberized_zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVERS: "kafka_kerberized_zookeeper:2888:3888"
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/zookeeper_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dsun.security.krb5.debug=true"
volumes:
- ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
- /dev/urandom:/dev/random
depends_on:
- kafka_kerberos
security_opt:
- label:disable
kerberized_kafka1:
image: confluentinc/cp-kafka:5.2.0
# restart: always
hostname: kerberized_kafka1
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_LISTENERS: OUTSIDE://:19092,UNSECURED_OUTSIDE://:19093,UNSECURED_INSIDE://:9093
KAFKA_ADVERTISED_LISTENERS: OUTSIDE://kerberized_kafka1:19092,UNSECURED_OUTSIDE://kerberized_kafka1:19093,UNSECURED_INSIDE://localhost:9093
# KAFKA_LISTENERS: INSIDE://kerberized_kafka1:9092,OUTSIDE://kerberized_kafka1:19092
# KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kerberized_kafka1:19092
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: OUTSIDE:SASL_PLAINTEXT,UNSECURED_OUTSIDE:PLAINTEXT,UNSECURED_INSIDE:PLAINTEXT,
KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "kafka_kerberized_zookeeper:2181"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dsun.security.krb5.debug=true"
volumes:
- ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
- /dev/urandom:/dev/random
depends_on:
- kafka_kerberized_zookeeper
- kafka_kerberos
security_opt:
- label:disable
kafka_kerberos:
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
hostname: kafka_kerberos
volumes:
- ${KERBERIZED_KAFKA_DIR}/secrets:/tmp/keytab
- ${KERBERIZED_KAFKA_DIR}/../../kerberos_image_config.sh:/config.sh
- /dev/urandom:/dev/random
ports: [88, 749]
......@@ -27,6 +27,7 @@ export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}
export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
cd /ClickHouse/tests/integration
exec "$@"
......@@ -181,6 +181,9 @@ function run_tests
# Randomize test order.
test_files=$(for f in $test_files; do echo "$f"; done | sort -R)
# Limit profiling time to 10 minutes, not to run for too long.
profile_seconds_left=600
# Run the tests.
test_name="<none>"
for test in $test_files
......@@ -194,15 +197,24 @@ function run_tests
test_name=$(basename "$test" ".xml")
echo test "$test_name"
# Don't profile if we're past the time limit.
# Use awk because bash doesn't support floating point arithmetics.
profile_seconds=$(awk "BEGIN { print ($profile_seconds_left > 0 ? 10 : 0) }")
TIMEFORMAT=$(printf "$test_name\t%%3R\t%%3U\t%%3S\n")
# The grep is to filter out set -x output and keep only time output.
# The '2>&1 >/dev/null' redirects stderr to stdout, and discards stdout.
{ \
time "$script_dir/perf.py" --host localhost localhost --port 9001 9002 \
--runs "$CHPC_RUNS" --max-queries "$CHPC_MAX_QUERIES" \
--profile-seconds "$profile_seconds" \
-- "$test" > "$test_name-raw.tsv" 2> "$test_name-err.log" ; \
} 2>&1 >/dev/null | tee >(grep -v ^+ >> "wall-clock-times.tsv") \
|| echo "Test $test_name failed with error code $?" >> "$test_name-err.log"
profile_seconds_left=$(awk -F' ' \
'BEGIN { s = '$profile_seconds_left'; } /^profile-total/ { s -= $2 } END { print s }' \
"$test_name-raw.tsv")
done
unset TIMEFORMAT
......@@ -294,6 +306,7 @@ for test_file in *-raw.tsv
do
test_name=$(basename "$test_file" "-raw.tsv")
sed -n "s/^query\t/$test_name\t/p" < "$test_file" >> "analyze/query-runs.tsv"
sed -n "s/^profile\t/$test_name\t/p" < "$test_file" >> "analyze/query-profiles.tsv"
sed -n "s/^client-time\t/$test_name\t/p" < "$test_file" >> "analyze/client-times.tsv"
sed -n "s/^report-threshold\t/$test_name\t/p" < "$test_file" >> "analyze/report-thresholds.tsv"
sed -n "s/^skipped\t/$test_name\t/p" < "$test_file" >> "analyze/skipped-tests.tsv"
......@@ -658,13 +671,15 @@ create view test_runs as
group by test
;
create table test_times_report engine File(TSV, 'report/test-times.tsv') as
select wall_clock_time_per_test.test, real,
toDecimal64(total_client_time, 3),
create view test_times_view as
select
wall_clock_time_per_test.test test,
real,
total_client_time,
queries,
toDecimal64(query_max, 3),
toDecimal64(real / queries, 3) avg_real_per_query,
toDecimal64(query_min, 3),
query_max,
real / queries avg_real_per_query,
query_min,
runs
from test_time
-- wall clock times are also measured for skipped tests, so don't
......@@ -673,7 +688,43 @@ create table test_times_report engine File(TSV, 'report/test-times.tsv') as
on wall_clock_time_per_test.test = test_time.test
full join test_runs
on test_runs.test = test_time.test
order by avg_real_per_query desc;
;
-- WITH TOTALS doesn't work with INSERT SELECT, so we have to jump through these
-- hoops: https://github.com/ClickHouse/ClickHouse/issues/15227
create view test_times_view_total as
select
'Total' test,
sum(real),
sum(total_client_time),
sum(queries),
max(query_max),
sum(real) / sum(queries) avg_real_per_query,
min(query_min),
-- Totaling the number of runs doesn't make sense, but use the max so
-- that the reporting script doesn't complain about queries being too
-- long.
max(runs)
from test_times_view
;
create table test_times_report engine File(TSV, 'report/test-times.tsv') as
select
test,
toDecimal64(real, 3),
toDecimal64(total_client_time, 3),
queries,
toDecimal64(query_max, 3),
toDecimal64(avg_real_per_query, 3),
toDecimal64(query_min, 3),
runs
from (
select * from test_times_view
union all
select * from test_times_view_total
)
order by test = 'Total' desc, avg_real_per_query desc
;
-- report for all queries page, only main metric
create table all_tests_report engine File(TSV, 'report/all-queries.tsv') as
......@@ -694,13 +745,12 @@ create table all_tests_report engine File(TSV, 'report/all-queries.tsv') as
test, query_index, query_display_name
from queries order by test, query_index;
-- queries for which we will build flamegraphs (see below)
create table queries_for_flamegraph engine File(TSVWithNamesAndTypes,
'report/queries-for-flamegraph.tsv') as
select test, query_index from queries where unstable_show or changed_show
;
-- Report of queries that have inconsistent 'short' markings:
-- 1) have short duration, but are not marked as 'short'
-- 2) the reverse -- marked 'short' but take too long.
-- The threshold for 2) is significantly larger than the threshold for 1), to
-- avoid jitter.
create view shortness
as select
(test, query_index) in
......@@ -718,11 +768,6 @@ create view shortness
and times.query_index = query_display_names.query_index
;
-- Report of queries that have inconsistent 'short' markings:
-- 1) have short duration, but are not marked as 'short'
-- 2) the reverse -- marked 'short' but take too long.
-- The threshold for 2) is significantly larger than the threshold for 1), to
-- avoid jitter.
create table inconsistent_short_marking_report
engine File(TSV, 'report/unexpected-query-duration.tsv')
as select
......@@ -759,18 +804,15 @@ create table all_query_metrics_tsv engine File(TSV, 'report/all-query-metrics.ts
" 2> >(tee -a report/errors.log 1>&2)
# Prepare source data for metrics and flamegraphs for unstable queries.
# Prepare source data for metrics and flamegraphs for queries that were profiled
# by perf.py.
for version in {right,left}
do
rm -rf data
clickhouse-local --query "
create view queries_for_flamegraph as
select * from file('report/queries-for-flamegraph.tsv', TSVWithNamesAndTypes,
'test text, query_index int');
create view query_runs as
create view query_profiles as
with 0 as left, 1 as right
select * from file('analyze/query-runs.tsv', TSV,
select * from file('analyze/query-profiles.tsv', TSV,
'test text, query_index int, query_id text, version UInt8, time float')
where version = $version
;
......@@ -782,15 +824,12 @@ create view query_display_names as select * from
create table unstable_query_runs engine File(TSVWithNamesAndTypes,
'unstable-query-runs.$version.rep') as
select query_runs.test test, query_runs.query_index query_index,
select query_profiles.test test, query_profiles.query_index query_index,
query_display_name, query_id
from query_runs
join queries_for_flamegraph on
query_runs.test = queries_for_flamegraph.test
and query_runs.query_index = queries_for_flamegraph.query_index
from query_profiles
left join query_display_names on
query_runs.test = query_display_names.test
and query_runs.query_index = query_display_names.query_index
query_profiles.test = query_display_names.test
and query_profiles.query_index = query_display_names.query_index
;
create view query_log as select *
......
......@@ -18,9 +18,22 @@ import xml.etree.ElementTree as et
from threading import Thread
from scipy import stats
total_start_seconds = time.perf_counter()
stage_start_seconds = total_start_seconds
def reportStageEnd(stage):
global stage_start_seconds, total_start_seconds
current = time.perf_counter()
print(f'stage\t{stage}\t{current - stage_start_seconds:.3f}\t{current - total_start_seconds:.3f}')
stage_start_seconds = current
def tsv_escape(s):
return s.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n').replace('\r','')
parser = argparse.ArgumentParser(description='Run performance test.')
# Explicitly decode files as UTF-8 because sometimes we have Russian characters in queries, and LANG=C is set.
parser.add_argument('file', metavar='FILE', type=argparse.FileType('r', encoding='utf-8'), nargs=1, help='test description file')
......@@ -29,16 +42,21 @@ parser.add_argument('--port', nargs='*', default=[9000], help="Space-separated l
parser.add_argument('--runs', type=int, default=1, help='Number of query runs per server.')
parser.add_argument('--max-queries', type=int, default=None, help='Test no more than this number of queries, chosen at random.')
parser.add_argument('--queries-to-run', nargs='*', type=int, default=None, help='Space-separated list of indexes of queries to test.')
parser.add_argument('--profile-seconds', type=int, default=0, help='For how many seconds to profile a query for which the performance has changed.')
parser.add_argument('--long', action='store_true', help='Do not skip the tests tagged as long.')
parser.add_argument('--print-queries', action='store_true', help='Print test queries and exit.')
parser.add_argument('--print-settings', action='store_true', help='Print test settings and exit.')
args = parser.parse_args()
reportStageEnd('start')
test_name = os.path.splitext(os.path.basename(args.file[0].name))[0]
tree = et.parse(args.file[0])
root = tree.getroot()
reportStageEnd('parse')
# Process query parameters
subst_elems = root.findall('substitutions/substitution')
available_parameters = {} # { 'table': ['hits_10m', 'hits_100m'], ... }
......@@ -112,15 +130,21 @@ if not args.long:
sys.exit(0)
# Print report threshold for the test if it is set.
ignored_relative_change = 0.05
if 'max_ignored_relative_change' in root.attrib:
print(f'report-threshold\t{root.attrib["max_ignored_relative_change"]}')
ignored_relative_change = float(root.attrib["max_ignored_relative_change"])
print(f'report-threshold\t{ignored_relative_change}')
reportStageEnd('before-connect')
# Open connections
servers = [{'host': host, 'port': port} for (host, port) in zip(args.host, args.port)]
servers = [{'host': host or args.host[0], 'port': port or args.port[0]} for (host, port) in itertools.zip_longest(args.host, args.port)]
all_connections = [clickhouse_driver.Client(**server) for server in servers]
for s in servers:
print('server\t{}\t{}'.format(s['host'], s['port']))
for i, s in enumerate(servers):
print(f'server\t{i}\t{s["host"]}\t{s["port"]}')
reportStageEnd('connect')
# Run drop queries, ignoring errors. Do this before all other activity, because
# clickhouse_driver disconnects on error (this is not configurable), and the new
......@@ -135,6 +159,8 @@ for conn_index, c in enumerate(all_connections):
except:
pass
reportStageEnd('drop-1')
# Apply settings.
# If there are errors, report them and continue -- maybe a new test uses a setting
# that is not in master, but the queries can still run. If we have multiple
......@@ -152,6 +178,8 @@ for conn_index, c in enumerate(all_connections):
except:
print(traceback.format_exc(), file=sys.stderr)
reportStageEnd('settings')
# Check tables that should exist. If they don't exist, just skip this test.
tables = [e.text for e in root.findall('preconditions/table_exists')]
for t in tables:
......@@ -164,6 +192,8 @@ for t in tables:
print(f'skipped\t{tsv_escape(skipped_message)}')
sys.exit(0)
reportStageEnd('preconditions')
# Run create and fill queries. We will run them simultaneously for both servers,
# to save time.
# The weird search is to keep the relative order of elements, which matters, and
......@@ -194,6 +224,9 @@ for t in threads:
for t in threads:
t.join()
reportStageEnd('create')
# By default, test all queries.
queries_to_run = range(0, len(test_queries))
if args.max_queries:
......@@ -205,6 +238,7 @@ if args.queries_to_run:
queries_to_run = args.queries_to_run
# Run test queries.
profile_total_seconds = 0
for query_index in queries_to_run:
q = test_queries[query_index]
query_prefix = f'{test_name}.query{query_index}'
......@@ -324,34 +358,49 @@ for query_index in queries_to_run:
client_seconds = time.perf_counter() - start_seconds
print(f'client-time\t{query_index}\t{client_seconds}\t{server_seconds}')
#print(all_server_times)
#print(stats.ttest_ind(all_server_times[0], all_server_times[1], equal_var = False).pvalue)
# Run additional profiling queries to collect profile data, but only if test times appeared to be different.
# We have to do it after normal runs because otherwise it will affect test statistics too much
if len(all_server_times) == 2 and stats.ttest_ind(all_server_times[0], all_server_times[1], equal_var = False).pvalue < 0.1:
run = 0
while True:
run_id = f'{query_prefix}.profile{run}'
for conn_index, c in enumerate(this_query_connections):
try:
res = c.execute(q, query_id = run_id, settings = {'query_profiler_real_time_period_ns': 10000000})
print(f'profile\t{query_index}\t{run_id}\t{conn_index}\t{c.last_query.elapsed}')
except Exception as e:
# Add query id to the exception to make debugging easier.
e.args = (run_id, *e.args)
e.message = run_id + ': ' + e.message
raise
elapsed = c.last_query.elapsed
profile_seconds += elapsed
run += 1
# Don't spend too much time for profile runs
if run > args.runs or profile_seconds > 10:
break
# And don't bother with short queries
if len(all_server_times) != 2:
continue
if len(all_server_times[0]) < 3:
# Don't fail if for some reason there are not enough measurements.
continue
pvalue = stats.ttest_ind(all_server_times[0], all_server_times[1], equal_var = False).pvalue
median = [statistics.median(t) for t in all_server_times]
# Keep this consistent with the value used in report. Should eventually move
# to (median[1] - median[0]) / min(median), which is compatible with "times"
# difference we use in report (max(median) / min(median)).
relative_diff = (median[1] - median[0]) / median[0]
print(f'diff\t{query_index}\t{median[0]}\t{median[1]}\t{relative_diff}\t{pvalue}')
if abs(relative_diff) < ignored_relative_change or pvalue > 0.05:
continue
# Perform profile runs for fixed amount of time. Don't limit the number
# of runs, because we also have short queries.
profile_start_seconds = time.perf_counter()
run = 0
while time.perf_counter() - profile_start_seconds < args.profile_seconds:
run_id = f'{query_prefix}.profile{run}'
for conn_index, c in enumerate(this_query_connections):
try:
res = c.execute(q, query_id = run_id, settings = {'query_profiler_real_time_period_ns': 10000000})
print(f'profile\t{query_index}\t{run_id}\t{conn_index}\t{c.last_query.elapsed}')
except Exception as e:
# Add query id to the exception to make debugging easier.
e.args = (run_id, *e.args)
e.message = run_id + ': ' + e.message
raise
run += 1
profile_total_seconds += time.perf_counter() - profile_start_seconds
print(f'profile-total\t{profile_total_seconds}')
reportStageEnd('run')
# Run drop queries
drop_queries = substitute_parameters(drop_query_templates)
......@@ -359,3 +408,5 @@ for conn_index, c in enumerate(all_connections):
for q in drop_queries:
c.execute(q)
print(f'drop\t{conn_index}\t{c.last_query.elapsed}\t{tsv_escape(q)}')
reportStageEnd('drop-2')
......@@ -487,7 +487,7 @@ if args.report == 'main':
for r in rows:
anchor = f'{currentTableAnchor()}.{r[0]}'
total_runs = (int(r[7]) + 1) * 2 # one prewarm run, two servers
if float(r[5]) > allowed_average_run_time * total_runs:
if r[0] != 'Total' and float(r[5]) > allowed_average_run_time * total_runs:
# FIXME should be 15s max -- investigate parallel_insert
slow_average_tests += 1
attrs[5] = f'style="background: {color_bad}"'
......@@ -495,7 +495,7 @@ if args.report == 'main':
else:
attrs[5] = ''
if float(r[4]) > allowed_single_run_time * total_runs:
if r[0] != 'Total' and float(r[4]) > allowed_single_run_time * total_runs:
slow_average_tests += 1
attrs[4] = f'style="background: {color_bad}"'
errors_explained.append([f'<a href="./all-queries.html#all-query-times.{r[0]}.0">Some query of the test \'{r[0]}\' is too slow to run. See the all queries report'])
......
......@@ -165,6 +165,22 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
### Kerberos support {#kafka-kerberos-support}
To deal with Kerberos-aware Kafka, add `security_protocol` child element with `sasl_plaintext` value. It is enough if Kerberos ticket-granting ticket is obtained and cached by OS facilities.
ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
Example:
``` xml
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
```
## Virtual Columns {#virtual-columns}
- `_topic` — Kafka topic.
......
......@@ -357,7 +357,7 @@ SELECT date_trunc('hour', now())
## now {#now}
Accepts zero arguments and returns the current time at one of the moments of request execution.
Accepts zero or one arguments(timezone) and returns the current time at one of the moments of request execution, or current time of specific timezone at one of the moments of request execution if `timezone` argument provided.
This function returns a constant, even if the request took a long time to complete.
## today {#today}
......
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
machine_translated: false
machine_translated_rev:
toc_priority: 0
toc_title: "Descripci\xF3n"
toc_title: "Descripción"
---
# ¿Qué es ClickHouse? {#what-is-clickhouse}
ClickHouse es un sistema de gestión de bases de datos orientado a columnas (DBMS) para el procesamiento analítico en línea de consultas (OLAP).
ClickHouse es un sistema de gestión de bases de datos (DBMS), orientado a columnas, para el procesamiento analítico de consultas en línea (OLAP).
En un “normal” DBMS orientado a filas, los datos se almacenan en este orden:
En un DBMS “normal”, orientado a filas, los datos se almacenan en este orden:
| Fila | Argumento | JavaEnable | Titular | GoodEvent | EventTime |
|------|-------------|------------|---------------------------|-----------|---------------------|
......@@ -36,7 +36,7 @@ Estos ejemplos solo muestran el orden en el que se organizan los datos. Los valo
Ejemplos de un DBMS orientado a columnas: Vertica, Paraccel (Actian Matrix y Amazon Redshift), Sybase IQ, Exasol, Infobright, InfiniDB, MonetDB (VectorWise y Actian Vector), LucidDB, SAP HANA, Google Dremel, Google PowerDrill, Druid y kdb+.
Different orders for storing data are better suited to different scenarios. The data access scenario refers to what queries are made, how often, and in what proportion; how much data is read for each type of query – rows, columns, and bytes; the relationship between reading and updating data; the working size of the data and how locally it is used; whether transactions are used, and how isolated they are; requirements for data replication and logical integrity; requirements for latency and throughput for each type of query, and so on.
Los diferentes modos de ordenar los datos al guardarlos se adecúan mejor a diferentes escenarios. El escenario de acceso a los datos se refiere a qué consultas se hacen, con qué frecuencia y en qué proporción; cuántos datos se leen para cada tipo de consulta - filas, columnas y bytes; la relación entre lectura y actualización de datos; el tamaño de trabajo de los datos y qué tan localmente son usados; si se usan transacciones y qué tan aisladas están;requerimientos de replicación de los datos y de integridad lógica, requerimientos de latencia y caudal (throughput) para cada tipo de consulta, y cosas por el estilo.
Cuanto mayor sea la carga en el sistema, más importante es personalizar el sistema configurado para que coincida con los requisitos del escenario de uso, y más fino será esta personalización. No existe un sistema que sea igualmente adecuado para escenarios significativamente diferentes. Si un sistema es adaptable a un amplio conjunto de escenarios, bajo una carga alta, el sistema manejará todos los escenarios igualmente mal, o funcionará bien para solo uno o algunos de los escenarios posibles.
......
......@@ -18,7 +18,7 @@ Markdown==3.2.1
MarkupSafe==1.1.1
mkdocs==1.1.2
mkdocs-htmlproofer-plugin==0.0.3
mkdocs-macros-plugin==0.4.13
mkdocs-macros-plugin==0.4.17
nltk==3.5
nose==1.3.7
protobuf==3.13.0
......
......@@ -35,12 +35,14 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_CHILD_PROCESS;
}
ShellCommand::ShellCommand(pid_t pid_, int in_fd_, int out_fd_, int err_fd_, bool terminate_in_destructor_)
ShellCommand::ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, bool terminate_in_destructor_)
: pid(pid_)
, terminate_in_destructor(terminate_in_destructor_)
, in(in_fd_)
, out(out_fd_)
, err(err_fd_) {}
, err(err_fd_)
{
}
Poco::Logger * ShellCommand::getLogger()
{
......@@ -144,12 +146,6 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
pid, pipe_stdin.fds_rw[1], pipe_stdout.fds_rw[0], pipe_stderr.fds_rw[0], terminate_in_destructor));
LOG_TRACE(getLogger(), "Started shell command '{}' with pid {}", filename, pid);
/// Now the ownership of the file descriptors is passed to the result.
pipe_stdin.fds_rw[1] = -1;
pipe_stdout.fds_rw[0] = -1;
pipe_stderr.fds_rw[0] = -1;
return res;
}
......
......@@ -30,7 +30,7 @@ private:
bool wait_called = false;
bool terminate_in_destructor;
ShellCommand(pid_t pid_, int in_fd_, int out_fd_, int err_fd_, bool terminate_in_destructor_);
ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, bool terminate_in_destructor_);
static Poco::Logger * getLogger();
......
......@@ -234,10 +234,16 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive);
job();
/// job should be reseted before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
job = {};
}
catch (...)
{
/// job should be reseted before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
job = {};
{
std::unique_lock lock(mutex);
if (!first_exception)
......
......@@ -11,6 +11,7 @@
#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
#include <ext/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool.
......@@ -161,21 +162,19 @@ public:
GlobalThreadPool::instance().scheduleOrThrow([
state = state,
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
{
try
{
/// Thread status holds raw pointer on query context, thus it always must be destroyed
/// before sending signal that permits to join this thread.
DB::ThreadStatus thread_status;
std::apply(func, args);
}
catch (...)
{
state->set();
throw;
}
state->set();
SCOPE_EXIT(state->set());
/// This moves are needed to destroy function and arguments before exit.
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
auto function = std::move(func);
auto arguments = std::move(args);
/// Thread status holds raw pointer on query context, thus it always must be destroyed
/// before sending signal that permits to join this thread.
DB::ThreadStatus thread_status;
std::apply(function, arguments);
});
}
......
......@@ -663,7 +663,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
{
WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
auto callback = [state](const Coordination::ExistsResponse & response)
auto callback = [state](const Coordination::GetResponse & response)
{
state->code = int32_t(response.error);
if (state->code)
......@@ -683,8 +683,9 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
while (!condition || !condition())
{
/// NOTE: if the node doesn't exist, the watch will leak.
impl->exists(path, callback, watch);
/// Use getData insteand of exists to avoid watch leak.
impl->get(path, callback, watch);
if (!condition)
state->event.wait();
else if (!state->event.tryWait(1000))
......
......@@ -422,6 +422,18 @@ void ZooKeeperRequest::write(WriteBuffer & out) const
}
static void removeRootPath(String & path, const String & root_path)
{
if (root_path.empty())
return;
if (path.size() <= root_path.size())
throw Exception("Received path is not longer than root_path", Error::ZDATAINCONSISTENCY);
path = path.substr(root_path.size());
}
struct ZooKeeperResponse : virtual Response
{
virtual ~ZooKeeperResponse() override = default;
......@@ -1092,8 +1104,6 @@ void ZooKeeper::sendThread()
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
if (expired)
......@@ -1278,6 +1288,30 @@ void ZooKeeper::receiveEvent()
response->removeRootPath(root_path);
}
/// Instead of setting the watch in sendEvent, set it in receiveEvent becuase need to check the response.
/// The watch shouldn't be set if the node does not exist and it will never exist like sequential ephemeral nodes.
/// By using getData() instead of exists(), a watch won't be set if the node doesn't exist.
if (request_info.watch)
{
bool add_watch = false;
/// 3 indicates the ZooKeeperExistsRequest.
// For exists, we set the watch on both node exist and nonexist case.
// For other case like getData, we only set the watch when node exists.
if (request_info.request->getOpNum() == 3)
add_watch = (response->error == Error::ZOK || response->error == Error::ZNONODE);
else
add_watch = response->error == Error::ZOK;
if (add_watch)
{
/// The key of wathces should exclude the root_path
String req_path = request_info.request->getPath();
removeRootPath(req_path, root_path);
std::lock_guard lock(watches_mutex);
watches[req_path].emplace_back(std::move(request_info.watch));
}
}
int32_t actual_length = in->count() - count_before_event;
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), Error::ZMARSHALLINGERROR);
......
......@@ -136,7 +136,7 @@ void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_
namespace
{
UInt8 getDeltaBytesSize(DataTypePtr column_type)
UInt8 getDeltaBytesSize(const IDataType * column_type)
{
if (!column_type->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec Delta is not applicable for {} because the data type is not of fixed size",
......@@ -155,7 +155,7 @@ UInt8 getDeltaBytesSize(DataTypePtr column_type)
void registerCodecDelta(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::Delta);
factory.registerCompressionCodecWithType("Delta", method_code, [&](const ASTPtr & arguments, DataTypePtr column_type) -> CompressionCodecPtr
factory.registerCompressionCodecWithType("Delta", method_code, [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 delta_bytes_size = 0;
......
......@@ -307,7 +307,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
}
}
UInt8 getDataBytesSize(DataTypePtr column_type)
UInt8 getDataBytesSize(const IDataType * column_type)
{
if (!column_type->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec DoubleDelta is not applicable for {} because the data type is not of fixed size",
......@@ -413,7 +413,7 @@ void registerCodecDoubleDelta(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::DoubleDelta);
factory.registerCompressionCodecWithType("DoubleDelta", method_code,
[&](const ASTPtr & arguments, DataTypePtr column_type) -> CompressionCodecPtr
[&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception("Codec DoubleDelta does not accept any arguments", ErrorCodes::BAD_ARGUMENTS);
......
......@@ -222,7 +222,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
}
}
UInt8 getDataBytesSize(DataTypePtr column_type)
UInt8 getDataBytesSize(const IDataType * column_type)
{
if (!column_type->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec Gorilla is not applicable for {} because the data type is not of fixed size",
......@@ -329,7 +329,7 @@ void registerCodecGorilla(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::Gorilla);
factory.registerCompressionCodecWithType("Gorilla", method_code,
[&](const ASTPtr & arguments, DataTypePtr column_type) -> CompressionCodecPtr
[&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception("Codec Gorilla does not accept any arguments", ErrorCodes::BAD_ARGUMENTS);
......
......@@ -136,7 +136,7 @@ TypeIndex baseType(TypeIndex type_idx)
return TypeIndex::Nothing;
}
TypeIndex typeIdx(const DataTypePtr & data_type)
TypeIndex typeIdx(const IDataType * data_type)
{
if (!data_type)
return TypeIndex::Nothing;
......@@ -656,7 +656,7 @@ void CompressionCodecT64::updateHash(SipHash & hash) const
void registerCodecT64(CompressionCodecFactory & factory)
{
auto reg_func = [&](const ASTPtr & arguments, DataTypePtr type) -> CompressionCodecPtr
auto reg_func = [&](const ASTPtr & arguments, const IDataType * type) -> CompressionCodecPtr
{
Variant variant = Variant::Byte;
......@@ -683,7 +683,7 @@ void registerCodecT64(CompressionCodecFactory & factory)
auto type_idx = typeIdx(type);
if (type && type_idx == TypeIndex::Nothing)
throw Exception("T64 codec is not supported for specified type", ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
throw Exception("T64 codec is not supported for specified type " + type->getName(), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
return std::make_shared<CompressionCodecT64>(type_idx, variant);
};
......
......@@ -6,6 +6,7 @@
#include <IO/ReadBuffer.h>
#include <Parsers/queryToString.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionCodecNone.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string/join.hpp>
......@@ -57,7 +58,7 @@ void CompressionCodecFactory::validateCodec(const String & family_name, std::opt
}
}
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr & ast, DataTypePtr column_type, bool sanity_check) const
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const
{
if (const auto * func = ast->as<ASTFunction>())
{
......@@ -67,6 +68,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
bool has_none = false;
std::optional<size_t> generic_compression_codec_pos;
bool can_substitute_codec_arguments = true;
for (size_t i = 0; i < func->arguments->children.size(); ++i)
{
const auto & inner_codec_ast = func->arguments->children[i];
......@@ -99,7 +101,34 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
}
else
{
result_codec = getImpl(codec_family_name, codec_arguments, column_type);
if (column_type)
{
CompressionCodecPtr prev_codec;
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
{
if (IDataType::isSpecialCompressionAllowed(substream_path))
{
result_codec = getImpl(codec_family_name, codec_arguments, &substream_type);
/// Case for column Tuple, which compressed with codec which depends on data type, like Delta.
/// We cannot substitute parameters for such codecs.
if (prev_codec && prev_codec->getHash() != result_codec->getHash())
can_substitute_codec_arguments = false;
prev_codec = result_codec;
}
};
IDataType::SubstreamPath stream_path;
column_type->enumerateStreams(callback, stream_path);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());
}
else
{
result_codec = getImpl(codec_family_name, codec_arguments, nullptr);
}
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}
......@@ -140,16 +169,30 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
result->arguments = codecs_descriptions;
return result;
/// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on
/// data type, because for the first column Delta(4) is suitable and
/// Delta(8) for the second. So we should leave codec description as is
/// and deduce them in get method for each subtype separately. For all
/// other types it's better to substitute parameters, for better
/// readability and backward compatibility.
if (can_substitute_codec_arguments)
{
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
result->arguments = codecs_descriptions;
return result;
}
else
{
return ast;
}
}
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
}
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, DataTypePtr column_type, CompressionCodecPtr current_default) const
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
{
if (current_default == nullptr)
current_default = default_codec;
......@@ -175,10 +218,16 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, DataTypePtr
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
CompressionCodecPtr codec;
if (codec_family_name == DEFAULT_CODEC_NAME)
codecs.emplace_back(current_default);
codec = current_default;
else
codecs.emplace_back(getImpl(codec_family_name, codec_arguments, column_type));
codec = getImpl(codec_family_name, codec_arguments, column_type);
if (only_generic && !codec->isGenericCompression())
continue;
codecs.emplace_back(codec);
}
CompressionCodecPtr res;
......@@ -187,6 +236,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, DataTypePtr
return codecs.back();
else if (codecs.size() > 1)
return std::make_shared<CompressionCodecMultiple>(codecs);
else
return std::make_shared<CompressionCodecNone>();
}
throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE);
......@@ -203,7 +254,7 @@ CompressionCodecPtr CompressionCodecFactory::get(const uint8_t byte_code) const
}
CompressionCodecPtr CompressionCodecFactory::getImpl(const String & family_name, const ASTPtr & arguments, DataTypePtr column_type) const
CompressionCodecPtr CompressionCodecFactory::getImpl(const String & family_name, const ASTPtr & arguments, const IDataType * column_type) const
{
if (family_name == "Multiple")
throw Exception("Codec Multiple cannot be specified directly", ErrorCodes::UNKNOWN_CODEC);
......@@ -235,7 +286,7 @@ void CompressionCodecFactory::registerCompressionCodecWithType(
void CompressionCodecFactory::registerCompressionCodec(const String & family_name, std::optional<uint8_t> byte_code, Creator creator)
{
registerCompressionCodecWithType(family_name, byte_code, [family_name, creator](const ASTPtr & ast, DataTypePtr /* data_type */)
registerCompressionCodecWithType(family_name, byte_code, [family_name, creator](const ASTPtr & ast, const IDataType * /* data_type */)
{
return creator(ast);
});
......
......@@ -26,7 +26,7 @@ class CompressionCodecFactory final : private boost::noncopyable
{
protected:
using Creator = std::function<CompressionCodecPtr(const ASTPtr & parameters)>;
using CreatorWithType = std::function<CompressionCodecPtr(const ASTPtr & parameters, DataTypePtr column_type)>;
using CreatorWithType = std::function<CompressionCodecPtr(const ASTPtr & parameters, const IDataType * column_type)>;
using SimpleCreator = std::function<CompressionCodecPtr()>;
using CompressionCodecsDictionary = std::unordered_map<String, CreatorWithType>;
using CompressionCodecsCodeDictionary = std::unordered_map<uint8_t, CreatorWithType>;
......@@ -38,7 +38,13 @@ public:
CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, DataTypePtr column_type, bool sanity_check) const;
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const;
/// Just wrapper for previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check) const
{
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check);
}
/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const;
......@@ -47,8 +53,18 @@ public:
/// information about type to improve inner settings, but every codec should
/// be able to work without information about type. Also AST can contain
/// codec, which can be alias to current default codec, which can be changed
/// in runtime.
CompressionCodecPtr get(const ASTPtr & ast, DataTypePtr column_type, CompressionCodecPtr current_default = nullptr) const;
/// in runtime. If only_generic is true than method will filter all
/// isGenericCompression() == false codecs from result. If nothing found
/// will return codec NONE. It's useful for auxiliary parts of complex columns
/// like Nullable, Array and so on. If all codecs are non generic and
/// only_generic = true, than codec NONE will be returned.
CompressionCodecPtr get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default = nullptr, bool only_generic = false) const;
/// Just wrapper for previous method.
CompressionCodecPtr get(const ASTPtr & ast, const DataTypePtr & column_type, CompressionCodecPtr current_default = nullptr, bool only_generic = false) const
{
return get(ast, column_type.get(), current_default, only_generic);
}
/// Get codec by method byte (no params available)
CompressionCodecPtr get(const uint8_t byte_code) const;
......@@ -65,7 +81,7 @@ public:
void registerSimpleCompressionCodec(const String & family_name, std::optional<uint8_t> byte_code, SimpleCreator creator);
protected:
CompressionCodecPtr getImpl(const String & family_name, const ASTPtr & arguments, DataTypePtr column_type) const;
CompressionCodecPtr getImpl(const String & family_name, const ASTPtr & arguments, const IDataType * column_type) const;
private:
CompressionCodecsDictionary family_name_with_codec;
......
......@@ -7,6 +7,7 @@
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTIdentifier.h>
#include <Compression/CompressionCodecMultiple.h>
namespace DB
......
......@@ -17,7 +17,6 @@ using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;
using Codecs = std::vector<CompressionCodecPtr>;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
/**
* Represents interface for compression codecs like LZ4, ZSTD, etc.
......
......@@ -86,7 +86,7 @@ struct MultiEnum
return right.operator==(left);
}
template <typename L>
template <typename L, typename = typename std::enable_if<!std::is_same_v<L, MultiEnum>>::type>
friend bool operator!=(L left, MultiEnum right)
{
return !(right.operator==(left));
......
......@@ -462,7 +462,7 @@ class IColumn;
M(String, format_custom_result_after_delimiter, "", "Suffix after result set (for CustomSeparated format)", 0) \
\
M(String, format_regexp, "", "Regular expression (for Regexp format)", 0) \
M(String, format_regexp_escaping_rule, "Escaped", "Field escaping rule (for Regexp format)", 0) \
M(String, format_regexp_escaping_rule, "Raw", "Field escaping rule (for Regexp format)", 0) \
M(Bool, format_regexp_skip_unmatched, false, "Skip lines unmatched by regular expression (for Regexp format", 0) \
\
M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \
......
......@@ -151,7 +151,7 @@ namespace
void DataTypeArray::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::ArraySizes);
callback(path);
callback(path, *this);
path.back() = Substream::ArrayElements;
nested->enumerateStreams(callback, path);
path.pop_back();
......
......@@ -54,7 +54,7 @@ void DataTypeLowCardinality::enumerateStreams(const StreamCallback & callback, S
path.push_back(Substream::DictionaryKeys);
dictionary_type->enumerateStreams(callback, path);
path.back() = Substream::DictionaryIndexes;
callback(path);
callback(path, *this);
path.pop_back();
}
......
......@@ -44,7 +44,7 @@ bool DataTypeNullable::onlyNull() const
void DataTypeNullable::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::NullMap);
callback(path);
callback(path, *this);
path.back() = Substream::NullableElements;
nested_data_type->enumerateStreams(callback, path);
path.pop_back();
......
......@@ -130,6 +130,18 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
}
bool IDataType::isSpecialCompressionAllowed(const SubstreamPath & path)
{
for (const Substream & elem : path)
{
if (elem.type == Substream::NullMap
|| elem.type == Substream::ArraySizes
|| elem.type == Substream::DictionaryIndexes)
return false;
}
return true;
}
void IDataType::insertDefaultInto(IColumn & column) const
{
column.insertDefault();
......
......@@ -104,10 +104,11 @@ public:
using SubstreamPath = std::vector<Substream>;
using StreamCallback = std::function<void(const SubstreamPath &)>;
using StreamCallback = std::function<void(const SubstreamPath &, const IDataType &)>;
virtual void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
callback(path, *this);
}
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
......@@ -442,6 +443,10 @@ public:
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);
/// Substream path supports special compression methods like codec Delta.
/// For all other substreams (like ArraySizes, NullMasks, etc.) we use only
/// generic compression codecs like LZ4.
static bool isSpecialCompressionAllowed(const SubstreamPath & path);
private:
friend class DataTypeFactory;
/// Customize this DataType
......@@ -685,4 +690,3 @@ template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDateTime> = t
template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDateTime64> = true;
}
......@@ -19,6 +19,7 @@
#if USE_MYSQL
# include <Core/MySQL/MySQLClient.h>
# include <Databases/MySQL/ConnectionMySQLSettings.h>
# include <Databases/MySQL/DatabaseConnectionMySQL.h>
# include <Databases/MySQL/MaterializeMySQLSettings.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
......@@ -83,7 +84,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by || (engine_name != "MaterializeMySQL" && engine_define->settings))
engine_define->sample_by || (!endsWith(engine_name, "MySQL") && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
......@@ -133,8 +134,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
, std::move(materialize_mode_settings));
}
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
return std::make_shared<DatabaseConnectionMySQL>(
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool));
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_database_settings), std::move(mysql_pool));
}
catch (...)
{
......
#include <Databases/MySQL/ConnectionMySQLSettings.h>
#include <Core/SettingsFields.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int BAD_ARGUMENTS;
}
IMPLEMENT_SETTINGS_TRAITS(ConnectionMySQLSettingsTraits, LIST_OF_CONNECTION_MYSQL_SETTINGS)
void ConnectionMySQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
throw Exception(e.message() + " for database " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
else
e.rethrow();
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
SettingsChanges & changes = storage_def.settings->changes;
#define ADD_IF_ABSENT(NAME) \
if (std::find_if(changes.begin(), changes.end(), \
[](const SettingChange & c) { return c.name == #NAME; }) \
== changes.end()) \
changes.push_back(SettingChange{#NAME, static_cast<Field>(NAME)});
APPLY_FOR_IMMUTABLE_CONNECTION_MYSQL_SETTINGS(ADD_IF_ABSENT)
#undef ADD_IF_ABSENT
}
void ConnectionMySQLSettings::loadFromQueryContext(const Context & context)
{
if (!context.hasQueryContext())
return;
const Settings & settings = context.getQueryContext().getSettingsRef();
if (settings.mysql_datatypes_support_level.value != mysql_datatypes_support_level.value)
set("mysql_datatypes_support_level", settings.mysql_datatypes_support_level.toString());
}
}
#pragma once
#include <Core/Defines.h>
#include <Core/BaseSettings.h>
#include <Core/SettingsEnums.h>
namespace DB
{
class Context;
class ASTStorage;
#define LIST_OF_CONNECTION_MYSQL_SETTINGS(M) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precison are seen as String on ClickHouse's side.", 0) \
/// Settings that should not change after the creation of a database.
#define APPLY_FOR_IMMUTABLE_CONNECTION_MYSQL_SETTINGS(M) \
M(mysql_datatypes_support_level)
DECLARE_SETTINGS_TRAITS(ConnectionMySQLSettingsTraits, LIST_OF_CONNECTION_MYSQL_SETTINGS)
/** Settings for the MySQL database engine.
* Could be loaded from a CREATE DATABASE query (SETTINGS clause) and Query settings.
*/
struct ConnectionMySQLSettings : public BaseSettings<ConnectionMySQLSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
void loadFromQueryContext(const Context & context);
};
}
......@@ -45,13 +45,13 @@ static constexpr const std::chrono::seconds cleaner_sleep_time{30};
static const std::chrono::seconds lock_acquire_timeout{10};
DatabaseConnectionMySQL::DatabaseConnectionMySQL(const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, mysqlxx::Pool && pool)
const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, std::unique_ptr<ConnectionMySQLSettings> settings_, mysqlxx::Pool && pool)
: IDatabase(database_name_)
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
, database_name_in_mysql(database_name_in_mysql_)
, mysql_datatypes_support_level(context.getQueryContext().getSettingsRef().mysql_datatypes_support_level)
, database_settings(std::move(settings_))
, mysql_pool(std::move(pool))
{
empty(); /// test database is works fine.
......@@ -133,9 +133,20 @@ static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr
columns_expression_list->children.emplace_back(column_declaration);
}
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
ASTs storage_children = ast_storage->children;
auto storage_engine_arguments = ast_storage->engine->arguments;
/// Add table_name to engine arguments
auto mysql_table_name = std::make_shared<ASTLiteral>(table_id.table_name);
auto storage_engine_arguments = table_storage_define->as<ASTStorage>()->engine->arguments;
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
/// Unset settings
storage_children.erase(
std::remove_if(storage_children.begin(), storage_children.end(),
[&](const ASTPtr & element) { return element.get() == ast_storage->settings; }),
storage_children.end());
ast_storage->settings = nullptr;
}
return create_table_query;
......@@ -273,7 +284,7 @@ std::map<String, NamesAndTypesList> DatabaseConnectionMySQL::fetchTablesColumnsL
database_name_in_mysql,
tables_name,
settings.external_table_functions_use_nulls,
mysql_datatypes_support_level);
database_settings->mysql_datatypes_support_level);
}
void DatabaseConnectionMySQL::shutdown()
......
......@@ -8,6 +8,7 @@
#include <Core/MultiEnum.h>
#include <Common/ThreadPool.h>
#include <Databases/DatabasesCommon.h>
#include <Databases/MySQL/ConnectionMySQLSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <atomic>
......@@ -36,7 +37,8 @@ public:
DatabaseConnectionMySQL(
const Context & context, const String & database_name, const String & metadata_path,
const ASTStorage * database_engine_define, const String & database_name_in_mysql, mysqlxx::Pool && pool);
const ASTStorage * database_engine_define, const String & database_name_in_mysql, std::unique_ptr<ConnectionMySQLSettings> settings_,
mysqlxx::Pool && pool);
String getEngineName() const override { return "MySQL"; }
......@@ -76,9 +78,7 @@ private:
String metadata_path;
ASTPtr database_engine_define;
String database_name_in_mysql;
// Cache setting for later from query context upon creation,
// so column types depend on the settings set at query-level.
MultiEnum<MySQLDataTypesSupport> mysql_datatypes_support_level;
std::unique_ptr<ConnectionMySQLSettings> database_settings;
std::atomic<bool> quit{false};
std::condition_variable cond;
......
......@@ -17,6 +17,7 @@ SRCS(
DatabaseOrdinary.cpp
DatabasesCommon.cpp
DatabaseWithDictionaries.cpp
MySQL/ConnectionMySQLSettings.cpp
MySQL/DatabaseConnectionMySQL.cpp
MySQL/DatabaseMaterializeMySQL.cpp
MySQL/FetchTablesColumnsList.cpp
......
......@@ -368,6 +368,8 @@ void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorRawBLOB(FormatFactory & factory);
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory);
/// Output only (presentational) formats.
......@@ -428,6 +430,9 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this);
registerInputFormatProcessorRawBLOB(*this);
registerOutputFormatProcessorRawBLOB(*this);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorORC(*this);
registerOutputFormatProcessorORC(*this);
......@@ -458,6 +463,7 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorRegexp(*this);
registerInputFormatProcessorJSONAsString(*this);
registerInputFormatProcessorLineAsString(*this);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(*this);
#endif
......
......@@ -5,11 +5,19 @@
#include <Functions/FunctionFactory.h>
#include <Core/Field.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <time.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
......@@ -35,7 +43,7 @@ private:
class FunctionBaseNow : public IFunctionBaseImpl
{
public:
explicit FunctionBaseNow(time_t time_) : time_value(time_), return_type(std::make_shared<DataTypeDateTime>()) {}
explicit FunctionBaseNow(time_t time_, DataTypePtr return_type_) : time_value(time_), return_type(return_type_) {}
String getName() const override { return "now"; }
......@@ -72,14 +80,44 @@ public:
bool isDeterministic() const override { return false; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
static FunctionOverloadResolverImplPtr create(const Context &) { return std::make_unique<NowOverloadResolver>(); }
DataTypePtr getReturnType(const DataTypes &) const override { return std::make_shared<DataTypeDateTime>(); }
DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() > 1)
{
throw Exception("Arguments size of function " + getName() + " should be 0 or 1", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (arguments.size() == 1 && !isStringOrFixedString(arguments[0].type))
{
throw Exception(
"Arguments of function " + getName() + " should be String or FixedString", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
if (arguments.size() == 1)
{
return std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 0, 0));
}
return std::make_shared<DataTypeDateTime>();
}
FunctionBaseImplPtr build(const ColumnsWithTypeAndName &, const DataTypePtr &) const override
FunctionBaseImplPtr build(const ColumnsWithTypeAndName & arguments, const DataTypePtr &) const override
{
return std::make_unique<FunctionBaseNow>(time(nullptr));
if (arguments.size() > 1)
{
throw Exception("Arguments size of function " + getName() + " should be 0 or 1", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (arguments.size() == 1 && !isStringOrFixedString(arguments[0].type))
{
throw Exception(
"Arguments of function " + getName() + " should be String or FixedString", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
if (arguments.size() == 1)
return std::make_unique<FunctionBaseNow>(
time(nullptr), std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 0, 0)));
return std::make_unique<FunctionBaseNow>(time(nullptr), std::make_shared<DataTypeDateTime>());
}
};
......
......@@ -54,7 +54,7 @@ ReadBufferFromFile::ReadBufferFromFile(
ReadBufferFromFile::ReadBufferFromFile(
int fd_,
int & fd_,
const std::string & original_file_name,
size_t buf_size,
char * existing_memory,
......@@ -63,6 +63,7 @@ ReadBufferFromFile::ReadBufferFromFile(
ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
{
fd_ = -1;
}
......
......@@ -29,7 +29,10 @@ public:
char * existing_memory = nullptr, size_t alignment = 0);
/// Use pre-opened file descriptor.
ReadBufferFromFile(int fd, const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
ReadBufferFromFile(
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, size_t alignment = 0);
~ReadBufferFromFile() override;
......
......@@ -59,7 +59,7 @@ WriteBufferFromFile::WriteBufferFromFile(
/// Use pre-opened file descriptor.
WriteBufferFromFile::WriteBufferFromFile(
int fd_,
int & fd_,
const std::string & original_file_name,
size_t buf_size,
char * existing_memory,
......@@ -68,6 +68,7 @@ WriteBufferFromFile::WriteBufferFromFile(
WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
{
fd_ = -1;
}
......
......@@ -39,7 +39,7 @@ public:
/// Use pre-opened file descriptor.
WriteBufferFromFile(
int fd,
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
......
#include <Processors/Formats/Impl/JSONAsStringRowInputFormat.h>
#include <Formats/JSONEachRowUtils.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <common/find_symbols.h>
#include <IO/ReadHelpers.h>
......@@ -8,17 +10,22 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
IRowInputFormat(header_, in_, std::move(params_)), buf(in)
{
if (header_.columns() > 1 || header_.getDataTypes()[0]->getTypeId() != TypeIndex::String)
{
throw Exception("This input format is only suitable for tables with a single column of type String.", ErrorCodes::LOGICAL_ERROR);
}
if (header_.columns() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"This input format is only suitable for tables with a single column of type String but the number of columns is {}",
header_.columns());
if (!isString(removeNullable(removeLowCardinality(header_.getByPosition(0).type))))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"This input format is only suitable for tables with a single column of type String but the column type is {}",
header_.getByPosition(0).type->getName());
}
void JSONAsStringRowInputFormat::resetParser()
......
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/RawBLOBRowInputFormat.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
RawBLOBRowInputFormat::RawBLOBRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_))
{
if (header_.columns() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"This input format is only suitable for tables with a single column of type String but the number of columns is {}",
header_.columns());
if (!isString(removeNullable(removeLowCardinality(header_.getByPosition(0).type))))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"This input format is only suitable for tables with a single column of type String but the column type is {}",
header_.getByPosition(0).type->getName());
}
bool RawBLOBRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in.eof())
return false;
/// One excessive copy.
String blob;
readStringUntilEOF(blob, in);
columns.at(0)->insertData(blob.data(), blob.size());
return false;
}
void registerInputFormatProcessorRawBLOB(FormatFactory & factory)
{
factory.registerInputFormatProcessor("RawBLOB", [](
ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<RawBLOBRowInputFormat>(sample, buf, params);
});
}
}
#pragma once
#include <Processors/Formats/IRowInputFormat.h>
namespace DB
{
class ReadBuffer;
/// This format slurps all input data into single value.
/// This format can only parse a table with single field of type String or similar.
class RawBLOBRowInputFormat : public IRowInputFormat
{
public:
RawBLOBRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension &) override;
String getName() const override { return "RawBLOBRowInputFormat"; }
};
}
#include <Processors/Formats/Impl/RawBLOBRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteBuffer.h>
namespace DB
{
RawBLOBRowOutputFormat::RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback)
: IRowOutputFormat(header_, out_, callback)
{
}
void RawBLOBRowOutputFormat::writeField(const IColumn & column, const IDataType &, size_t row_num)
{
StringRef value = column.getDataAt(row_num);
out.write(value.data, value.size);
}
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("RawBLOB", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample, callback);
});
}
}
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/IRowOutputFormat.h>
namespace DB
{
class WriteBuffer;
/** This format only allows to output columns of type String
* or types that have contiguous representation in memory.
* They are output as raw bytes without any delimiters or escaping.
*
* The difference between RawBLOB and TSVRaw:
* - data is output in binary, no escaping;
* - no delimiters between values;
* - no newline at the end of each value.
*
* The difference between RawBLOB and RowBinary:
* - strings are output without their lengths.
*
* If you are output more than one value, the output format is ambiguous and you may not be able to read data back.
*/
class RawBLOBRowOutputFormat : public IRowOutputFormat
{
public:
RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback);
String getName() const override { return "RawBLOBRowOutputFormat"; }
void writeField(const IColumn & column, const IDataType &, size_t row_num) override;
};
}
......@@ -47,6 +47,8 @@ SRCS(
Formats/Impl/PrettySpaceBlockOutputFormat.cpp
Formats/Impl/ProtobufRowInputFormat.cpp
Formats/Impl/ProtobufRowOutputFormat.cpp
Formats/Impl/RawBLOBRowInputFormat.cpp
Formats/Impl/RawBLOBRowOutputFormat.cpp
Formats/Impl/RegexpRowInputFormat.cpp
Formats/Impl/TabSeparatedRowInputFormat.cpp
Formats/Impl/TabSeparatedRowOutputFormat.cpp
......
......@@ -426,6 +426,16 @@ CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_
return getCodecOrDefault(column_name, CompressionCodecFactory::instance().getDefaultCodec());
}
ASTPtr ColumnsDescription::getCodecDescOrDefault(const String & column_name, CompressionCodecPtr default_codec) const
{
const auto it = columns.get<1>().find(column_name);
if (it == columns.get<1>().end() || !it->codec)
return default_codec->getFullCodecDesc();
return it->codec;
}
ColumnsDescription::ColumnTTLs ColumnsDescription::getColumnTTLs() const
{
ColumnTTLs ret;
......
......@@ -115,6 +115,7 @@ public:
bool hasCompressionCodec(const String & column_name) const;
CompressionCodecPtr getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const;
CompressionCodecPtr getCodecOrDefault(const String & column_name) const;
ASTPtr getCodecDescOrDefault(const String & column_name, CompressionCodecPtr default_codec) const;
String toString() const;
static ColumnsDescription parse(const String & str);
......
#include <Storages/JoinSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(joinSettingsTraits, LIST_OF_JOIN_SETTINGS)
void JoinSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
throw Exception(e.message() + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
else
e.rethrow();
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
namespace DB
{
class ASTStorage;
#define JOIN_RELATED_SETTINGS(M) \
M(Bool, persistent, true, "Disable setting to avoid the overhead of writing to disk for StorageJoin", 0)
#define LIST_OF_JOIN_SETTINGS(M) \
JOIN_RELATED_SETTINGS(M) \
FORMAT_FACTORY_SETTINGS(M)
DECLARE_SETTINGS_TRAITS(joinSettingsTraits, LIST_OF_JOIN_SETTINGS)
/** Settings for the Join engine.
* Could be loaded from a CREATE TABLE query (SETTINGS clause).
*/
struct JoinSettings : public BaseSettings<joinSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}
......@@ -51,7 +51,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
for (const NameAndTypePair & column : columns)
{
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
[&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_path */)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
},
......@@ -62,7 +62,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
const String mrk_extension = data_part->getMarksFileExtension();
for (const auto & column_name : empty_columns)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_path */)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
/// Delete files if they are no longer shared with another column.
......
......@@ -124,7 +124,8 @@ private:
/// Watch for the node in front of us.
--my_node_it;
if (!zookeeper.existsWatch(path + "/" + *my_node_it, nullptr, task->getWatchCallback()))
std::string get_path_value;
if (!zookeeper.tryGetWatch(path + "/" + *my_node_it, get_path_value, nullptr, task->getWatchCallback()))
task->schedule();
success = true;
......
......@@ -1452,7 +1452,7 @@ NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
for (const NameAndTypePair & column : source_part->getColumns())
{
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
[&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
},
......@@ -1470,7 +1470,7 @@ NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path);
/// Delete files if they are no longer shared with another column.
......@@ -1491,7 +1491,7 @@ NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
String escaped_name_from = escapeForFileName(command.column_name);
String escaped_name_to = escapeForFileName(command.rename_to);
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_from = IDataType::getFileNameForStream(command.column_name, substream_path);
......@@ -1524,7 +1524,7 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
/// Skip updated files
for (const auto & entry : updated_header)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
......
......@@ -77,7 +77,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
if (checksums.empty())
return size;
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(column_name, substream_path);
......@@ -155,7 +155,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
for (const NameAndTypePair & name_type : columns)
{
IDataType::SubstreamPath stream_path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
......@@ -177,7 +177,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
std::optional<UInt64> marks_size;
for (const NameAndTypePair & name_type : columns)
{
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
auto file_path = path + IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension;
......@@ -205,7 +205,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const String & column_name, const IDa
{
bool res = true;
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(column_name, substream_path);
......@@ -222,7 +222,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const String & column_name, const IDa
String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const
{
String filename;
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column.name, substream_path);
......
......@@ -3,6 +3,7 @@
namespace DB
{
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
......@@ -30,14 +31,37 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
{
const auto & storage_columns = metadata_snapshot->getColumns();
for (const auto & column : columns_list)
addStreams(column.name, *column.type, storage_columns.getCodecDescOrDefault(column.name, default_codec));
}
void MergeTreeDataPartWriterCompact::addStreams(const String & name, const IDataType & type, const ASTPtr & effective_codec_desc)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
{
auto codec = storage_columns.getCodecOrDefault(column.name, default_codec);
auto & stream = streams_by_codec[codec->getHash()];
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type.
if (compressed_streams.count(stream_name))
return;
CompressionCodecPtr compression_codec;
/// If we can use special codec than just get it
if (IDataType::isSpecialCompressionAllowed(substream_path))
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
else /// otherwise return only generic codecs and don't use info about data_type
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
UInt64 codec_id = compression_codec->getHash();
auto & stream = streams_by_codec[codec_id];
if (!stream)
stream = std::make_shared<CompressedStream>(plain_hashing, codec);
stream = std::make_shared<CompressedStream>(plain_hashing, compression_codec);
compressed_streams.push_back(stream);
}
compressed_streams.emplace(stream_name, stream);
};
IDataType::SubstreamPath stream_path;
type.enumerateStreams(callback, stream_path);
}
void MergeTreeDataPartWriterCompact::write(
......@@ -110,18 +134,37 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
auto name_and_type = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
{
auto & stream = compressed_streams[i];
/// Tricky part, because we share compressed streams between different columns substreams.
/// Compressed streams write data to the single file, but with different compression codecs.
/// So we flush each stream (using next()) before using new one, because otherwise we will override
/// data in result file.
CompressedStreamPtr prev_stream;
auto stream_getter = [&, this](const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name_and_type->name, substream_path);
auto & result_stream = compressed_streams[stream_name];
/// Write one compressed block per column in granule for more optimal reading.
if (prev_stream && prev_stream != result_stream)
{
/// Offset should be 0, because compressed block is written for every granule.
assert(result_stream->hashing_buf.offset() == 0);
prev_stream->hashing_buf.next();
}
prev_stream = result_stream;
return &result_stream->hashing_buf;
};
/// Offset should be 0, because compressed block is written for every granule.
assert(stream->hashing_buf.offset() == 0);
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(UInt64(0), marks);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream, current_row, rows_to_write);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, current_row, rows_to_write);
/// Write one compressed block per column in granule for more optimal reading.
stream->hashing_buf.next();
/// Each type always have at least one substream
prev_stream->hashing_buf.next(); //-V522
}
++from_mark;
......@@ -145,13 +188,14 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
const CompressedStreamPtr & stream,
size_t from_row, size_t number_of_rows)
IDataType::OutputStreamGetter stream_getter,
size_t from_row,
size_t number_of_rows)
{
IDataType::SerializeBinaryBulkStatePtr state;
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = [&stream](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->hashing_buf; };
serialize_settings.getter = stream_getter;
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
......
......@@ -30,6 +30,8 @@ private:
void addToChecksums(MergeTreeDataPartChecksums & checksums);
void addStreams(const String & name, const IDataType & type, const ASTPtr & effective_codec_desc);
Block header;
/** Simplified SquashingTransform. The original one isn't suitable in this case
......@@ -52,22 +54,25 @@ private:
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
/// Compressed stream which allows to write with codec.
struct CompressedStream
{
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer hashing_buf;
CompressedStream(WriteBuffer & buf, const CompressionCodecPtr & codec)
: compressed_buf(buf, codec), hashing_buf(compressed_buf) {}
: compressed_buf(buf, codec)
, hashing_buf(compressed_buf) {}
};
using CompressedStreamPtr = std::shared_ptr<CompressedStream>;
/// Create compressed stream for every different codec.
/// Create compressed stream for every different codec. All streams write to
/// a single file on disk.
std::unordered_map<UInt64, CompressedStreamPtr> streams_by_codec;
/// For better performance save pointer to stream by every column.
std::vector<CompressedStreamPtr> compressed_streams;
/// Stream for each column's substreams path (look at addStreams).
std::unordered_map<String, CompressedStreamPtr> compressed_streams;
/// marks -> marks_file
std::unique_ptr<WriteBufferFromFileBase> marks_file;
......@@ -76,7 +81,7 @@ private:
/// Write single granule of one column (rows between 2 marks)
static void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
const CompressedStreamPtr & stream,
IDataType::OutputStreamGetter stream_getter,
size_t from_row,
size_t number_of_rows);
};
......
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
#include <Interpreters/Context.h>
#include <Compression/CompressionFactory.h>
namespace DB
{
......@@ -28,28 +29,35 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
{
const auto & columns = metadata_snapshot->getColumns();
for (const auto & it : columns_list)
addStreams(it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec), settings.estimated_size);
addStreams(it.name, *it.type, columns.getCodecDescOrDefault(it.name, default_codec), settings.estimated_size);
}
void MergeTreeDataPartWriterWide::addStreams(
const String & name,
const IDataType & type,
const CompressionCodecPtr & effective_codec,
const ASTPtr & effective_codec_desc,
size_t estimated_size)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type.
if (column_streams.count(stream_name))
return;
CompressionCodecPtr compression_codec;
/// If we can use special codec then just get it
if (IDataType::isSpecialCompressionAllowed(substream_path))
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
else /// otherwise return only generic codecs and don't use info about the data_type
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,
data_part->volume->getDisk(),
part_path + stream_name, DATA_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
effective_codec,
compression_codec,
settings.max_compress_block_size,
estimated_size,
settings.aio_threshold);
......@@ -130,7 +138,7 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
size_t number_of_rows,
DB::IDataType::SubstreamPath & path)
{
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
......@@ -170,7 +178,7 @@ size_t MergeTreeDataPartWriterWide::writeSingleGranule(
type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
......@@ -251,7 +259,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
current_column_mark++;
}
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
......@@ -312,7 +320,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
{
writeSingleMark(column_name, *column_type, offset_columns, 0, path);
/// Memoize information about offsets
column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
......
......@@ -66,7 +66,7 @@ private:
void addStreams(
const String & name,
const IDataType & type,
const CompressionCodecPtr & effective_codec,
const ASTPtr & effective_codec_desc,
size_t estimated_size);
SerializationStates serialization_states;
......
......@@ -162,7 +162,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);
......
......@@ -120,7 +120,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
{
for (const auto & column : columns_list)
{
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(column.name, substream_path) + ".bin";
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
......
#include <Storages/SetSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(setSettingsTraits, LIST_OF_SET_SETTINGS)
void SetSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
throw Exception(e.message() + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
else
e.rethrow();
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
namespace DB
{
class ASTStorage;
#define SET_RELATED_SETTINGS(M) \
M(Bool, persistent, true, "Disable setting to avoid the overhead of writing to disk for StorageSet", 0)
#define LIST_OF_SET_SETTINGS(M) \
SET_RELATED_SETTINGS(M) \
FORMAT_FACTORY_SETTINGS(M)
DECLARE_SETTINGS_TRAITS(setSettingsTraits, LIST_OF_SET_SETTINGS)
/** Settings for the Set engine.
* Could be loaded from a CREATE TABLE query (SETTINGS clause).
*/
struct SetSettings : public BaseSettings<setSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}
......@@ -44,8 +44,9 @@ StorageJoin::StorageJoin(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite_,
const Context & context_)
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_}
const Context & context_,
bool persistent_)
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_, persistent_}
, key_names(key_names_)
, use_nulls(use_nulls_)
, limits(limits_)
......@@ -118,6 +119,7 @@ void registerStorageJoin(StorageFactory & factory)
auto join_overflow_mode = settings.join_overflow_mode;
auto join_any_take_last_row = settings.join_any_take_last_row;
auto old_any_join = settings.any_join_distinct_right_table_keys;
bool persistent = true;
if (args.storage_def && args.storage_def->settings)
{
......@@ -135,6 +137,12 @@ void registerStorageJoin(StorageFactory & factory)
join_any_take_last_row = setting.value;
else if (setting.name == "any_join_distinct_right_table_keys")
old_any_join = setting.value;
else if (setting.name == "persistent")
{
auto join_settings = std::make_unique<JoinSettings>();
join_settings->loadFromQuery(*args.storage_def);
persistent = join_settings->persistent;
}
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + args.engine_name,
......@@ -217,7 +225,8 @@ void registerStorageJoin(StorageFactory & factory)
args.columns,
args.constraints,
join_any_take_last_row,
args.context);
args.context,
persistent);
};
factory.registerStorage("Join", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
......
......@@ -3,6 +3,7 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/StorageSet.h>
#include <Storages/JoinSettings.h>
#include <Parsers/ASTTablesInSelectQuery.h>
......@@ -72,7 +73,8 @@ protected:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite,
const Context & context_);
const Context & context_,
bool persistent_);
};
}
......@@ -362,7 +362,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
{
IDataType::SerializeBinaryBulkSettings settings;
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
......@@ -382,7 +382,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
if (serialize_states.count(name) == 0)
type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
......@@ -400,7 +400,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (!written_streams.emplace(stream_name).second)
......@@ -487,7 +487,7 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type)
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
......@@ -597,7 +597,7 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
* (Example: for Array data type, first stream is array sizes; and number of array sizes is the number of arrays).
*/
IDataType::SubstreamPath substream_root_path;
column_type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column_type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column_name, substream_path);
......
......@@ -12,6 +12,8 @@
#include <Interpreters/Set.h>
#include <Interpreters/Context.h>
#include <Poco/DirectoryIterator.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
namespace DB
......@@ -35,7 +37,7 @@ public:
SetOrJoinBlockOutputStream(
StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_, const String & backup_tmp_path_,
const String & backup_file_name_);
const String & backup_file_name_, bool persistent_);
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
......@@ -50,6 +52,7 @@ private:
WriteBufferFromFile backup_buf;
CompressedWriteBuffer compressed_backup_buf;
NativeBlockOutputStream backup_stream;
bool persistent;
};
......@@ -58,7 +61,8 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_,
const String & backup_tmp_path_,
const String & backup_file_name_)
const String & backup_file_name_,
bool persistent_)
: table(table_)
, metadata_snapshot(metadata_snapshot_)
, backup_path(backup_path_)
......@@ -67,6 +71,7 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
, backup_buf(backup_tmp_path + backup_file_name)
, compressed_backup_buf(backup_buf)
, backup_stream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock())
, persistent(persistent_)
{
}
......@@ -76,24 +81,28 @@ void SetOrJoinBlockOutputStream::write(const Block & block)
Block sorted_block = block.sortColumns();
table.insertBlock(sorted_block);
backup_stream.write(sorted_block);
if (persistent)
backup_stream.write(sorted_block);
}
void SetOrJoinBlockOutputStream::writeSuffix()
{
table.finishInsert();
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
if (persistent)
{
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
}
}
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
UInt64 id = ++increment;
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin");
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin", persistent);
}
......@@ -102,8 +111,10 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
: IStorage(table_id_)
const Context & context_,
bool persistent_)
: IStorage(table_id_),
persistent(persistent_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
......@@ -124,8 +135,9 @@ StorageSet::StorageSet(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_},
const Context & context_,
bool persistent_)
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_, persistent_},
set(std::make_shared<Set>(SizeLimits(), false, true))
{
......@@ -229,8 +241,16 @@ void registerStorageSet(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageSet::create(args.relative_data_path, args.table_id, args.columns, args.constraints, args.context);
});
bool has_settings = args.storage_def->settings;
auto set_settings = std::make_unique<SetSettings>();
if (has_settings)
{
set_settings->loadFromQuery(*args.storage_def);
}
return StorageSet::create(args.relative_data_path, args.table_id, args.columns, args.constraints, args.context, set_settings->persistent);
}, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
......
......@@ -3,6 +3,7 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/SetSettings.h>
namespace DB
......@@ -31,10 +32,12 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
const Context & context_,
bool persistent_);
String base_path;
String path;
bool persistent;
std::atomic<UInt64> increment = 0; /// For the backup file names.
......@@ -82,7 +85,8 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
const Context & context_,
bool persistent_);
};
}
......@@ -390,7 +390,7 @@ void StorageTinyLog::addFiles(const String & column_name, const IDataType & type
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
ErrorCodes::DUPLICATE_COLUMN);
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
if (!files.count(stream_name))
......
......@@ -20,6 +20,7 @@ SRCS(
getStructureOfRemoteTable.cpp
IndicesDescription.cpp
IStorage.cpp
JoinSettings.cpp
KeyDescription.cpp
LiveView/StorageLiveView.cpp
LiveView/TemporaryLiveViewCleaner.cpp
......@@ -108,6 +109,7 @@ SRCS(
ReadInOrderOptimizer.cpp
registerStorages.cpp
SelectQueryDescription.cpp
SetSettings.cpp
StorageBuffer.cpp
StorageDictionary.cpp
StorageDistributed.cpp
......
......@@ -45,7 +45,6 @@ def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
f.write("=".join([var, value]) + "\n")
return full_path
def subprocess_check_call(args):
# Uncomment for debugging
# print('run:', ' ' . join(args))
......@@ -125,6 +124,7 @@ class ClickHouseCluster:
self.base_zookeeper_cmd = None
self.base_mysql_cmd = []
self.base_kafka_cmd = []
self.base_kerberized_kafka_cmd = []
self.base_rabbitmq_cmd = []
self.base_cassandra_cmd = []
self.pre_zookeeper_commands = []
......@@ -133,6 +133,7 @@ class ClickHouseCluster:
self.with_mysql = False
self.with_postgres = False
self.with_kafka = False
self.with_kerberized_kafka = False
self.with_rabbitmq = False
self.with_odbc_drivers = False
self.with_hdfs = False
......@@ -169,7 +170,7 @@ class ClickHouseCluster:
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_rabbitmq=False,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False, with_cassandra=False,
......@@ -207,6 +208,7 @@ class ClickHouseCluster:
zookeeper_config_path=self.zookeeper_config_path,
with_mysql=with_mysql,
with_kafka=with_kafka,
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
with_mongo=with_mongo,
with_redis=with_redis,
......@@ -290,6 +292,13 @@ class ClickHouseCluster:
p.join(docker_compose_yml_dir, 'docker_compose_kafka.yml')]
cmds.append(self.base_kafka_cmd)
if with_kerberized_kafka and not self.with_kerberized_kafka:
self.with_kerberized_kafka = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')])
self.base_kerberized_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')]
cmds.append(self.base_kerberized_kafka_cmd)
if with_rabbitmq and not self.with_rabbitmq:
self.with_rabbitmq = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')])
......@@ -608,6 +617,11 @@ class ClickHouseCluster:
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
self.wait_schema_registry_to_start(120)
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
env = os.environ.copy()
env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
self.kerberized_kafka_docker_id = self.get_instance_docker_id('kerberized_kafka1')
if self.with_rabbitmq and self.base_rabbitmq_cmd:
subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
......@@ -788,9 +802,12 @@ services:
- {instance_config_dir}:/etc/clickhouse-server/
- {db_dir}:/var/lib/clickhouse/
- {logs_dir}:/var/log/clickhouse-server/
- /etc/passwd:/etc/passwd:ro
{binary_volume}
{odbc_bridge_volume}
{odbc_ini_path}
{keytab_path}
{krb5_conf}
entrypoint: {entrypoint_cmd}
tmpfs: {tmpfs}
cap_add:
......@@ -820,7 +837,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_mongo,
with_redis, with_minio,
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
hostname=None, env_variables=None,
......@@ -839,6 +856,7 @@ class ClickHouseInstance:
self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
self.custom_dictionaries_paths = [p.abspath(p.join(base_path, c)) for c in custom_dictionaries]
self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
self.kerberos_secrets_dir = p.abspath(p.join(base_path, 'secrets'))
self.macros = macros if macros is not None else {}
self.with_zookeeper = with_zookeeper
self.zookeeper_config_path = zookeeper_config_path
......@@ -848,6 +866,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql
self.with_kafka = with_kafka
self.with_kerberized_kafka = with_kerberized_kafka
self.with_rabbitmq = with_rabbitmq
self.with_mongo = with_mongo
self.with_redis = with_redis
......@@ -863,6 +882,13 @@ class ClickHouseInstance:
else:
self.odbc_ini_path = ""
if with_kerberized_kafka:
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
else:
self.keytab_path = ""
self.krb5_conf = ""
self.docker_client = None
self.ip_address = None
self.client = None
......@@ -1192,6 +1218,9 @@ class ClickHouseInstance:
if self.with_zookeeper:
shutil.copy(self.zookeeper_config_path, conf_d_dir)
if self.with_kerberized_kafka:
shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, 'secrets')))
# Copy config.d configs
print "Copy custom test config files {} to {}".format(self.custom_main_config_paths, self.config_d_dir)
for path in self.custom_main_config_paths:
......@@ -1227,6 +1256,9 @@ class ClickHouseInstance:
depends_on.append("kafka1")
depends_on.append("schema-registry")
if self.with_kerberized_kafka:
depends_on.append("kerberized_kafka1")
if self.with_rabbitmq:
depends_on.append("rabbitmq1")
......@@ -1290,6 +1322,8 @@ class ClickHouseInstance:
user=os.getuid(),
env_file=env_file,
odbc_ini_path=odbc_ini_path,
keytab_path=self.keytab_path,
krb5_conf=self.krb5_conf,
entrypoint_cmd=entrypoint_cmd,
networks=networks,
app_net=app_net,
......
......@@ -155,7 +155,9 @@ if __name__ == "__main__":
elif image == "yandex/clickhouse-postgresql-java-client":
env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
elif image == "yandex/clickhouse-integration-test":
env_tags += "-e {}={}".format("DOCKER_BASE_TAG", tag)
env_tags += "-e {}={} ".format("DOCKER_BASE_TAG", tag)
elif image == "yandex/clickhouse-kerberos-kdc":
env_tags += "-e {}={}".format("DOCKER_KERBEROS_KDC_TAG", tag)
else:
logging.info("Unknown image {}".format(image))
......
import random
import string
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_compression_codec_byte(node, table_name, part_name, filename):
cmd = "tail -c +17 /var/lib/clickhouse/data/default/{}/{}/{}.bin | od -x -N 1 | head -n 1 | awk '{{print $2}}'".format(
table_name, part_name, filename)
return node.exec_in_container(["bash", "-c", cmd]).strip()
CODECS_MAPPING = {
'NONE' : '0002',
'LZ4': '0082',
'LZ4HC': '0082', # not an error, same byte
'ZSTD': '0090',
'Multiple': '0091',
'Delta': '0092',
'T64': '0093',
}
def test_nested_compression_codec(start_cluster):
for i, node in enumerate([node1, node2]):
node.query("""
CREATE TABLE compression_table (
key UInt64,
column_ok Nullable(UInt64) CODEC(Delta, LZ4),
column_array Array(Array(UInt64)) CODEC(T64, LZ4),
column_bad LowCardinality(Int64) CODEC(Delta)
) ENGINE = ReplicatedMergeTree('/t', '{}') ORDER BY tuple() PARTITION BY key
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0;
""".format(i), settings={"allow_suspicious_codecs" : "1", "allow_suspicious_low_cardinality_types" : "1"})
node1.query("INSERT INTO compression_table VALUES (1, 1, [[77]], 32)")
node2.query("SYSTEM SYNC REPLICA compression_table", timeout=5)
node1.query("DETACH TABLE compression_table")
node2.query("DETACH TABLE compression_table")
node1.query("ATTACH TABLE compression_table")
node2.query("ATTACH TABLE compression_table")
for node in [node1, node2]:
assert get_compression_codec_byte(node, "compression_table", "1_0_0_0", "column_ok") == CODECS_MAPPING['Multiple']
assert get_compression_codec_byte(node, "compression_table", "1_0_0_0", "column_ok.null") == CODECS_MAPPING['LZ4']
assert get_compression_codec_byte(node1, "compression_table", "1_0_0_0", "column_array") == CODECS_MAPPING['Multiple']
assert get_compression_codec_byte(node2, "compression_table", "1_0_0_0", "column_array.size0") == CODECS_MAPPING['LZ4']
assert get_compression_codec_byte(node2, "compression_table", "1_0_0_0", "column_array.size1") == CODECS_MAPPING['LZ4']
assert get_compression_codec_byte(node2, "compression_table", "1_0_0_0", "column_bad.dict") == CODECS_MAPPING['Delta']
assert get_compression_codec_byte(node1, "compression_table", "1_0_0_0", "column_bad") == CODECS_MAPPING['NONE']
......@@ -124,6 +124,9 @@ def test_clickhouse_dml_for_mysql_database(started_cluster):
clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i``d`) select number from numbers(10000)")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '10000'
clickhouse_node.query("DROP DATABASE test_database")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
mysql_node.query("DROP DATABASE test_database")
......@@ -160,6 +163,36 @@ def test_bad_arguments_for_mysql_database_engine(started_cluster):
mysql_node.query("DROP DATABASE test_bad_arguments")
def test_data_types_support_level_for_mysql_database_engine(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET 'utf8'")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', test, 'root', 'clickhouse')",
settings={"mysql_datatypes_support_level": "decimal,datetime64"})
assert "SETTINGS mysql_datatypes_support_level = \\'decimal,datetime64\\'" in clickhouse_node.query("SHOW CREATE DATABASE test_database FORMAT TSV")
clickhouse_node.query("DETACH DATABASE test_database")
# without context settings
clickhouse_node.query("ATTACH DATABASE test_database")
assert "SETTINGS mysql_datatypes_support_level = \\'decimal,datetime64\\'" in clickhouse_node.query("SHOW CREATE DATABASE test_database FORMAT TSV")
clickhouse_node.query(
"CREATE DATABASE test_database_1 ENGINE = MySQL('mysql1:3306', test, 'root', 'clickhouse') SETTINGS mysql_datatypes_support_level = 'decimal,datetime64'",
settings={"mysql_datatypes_support_level": "decimal"})
assert "SETTINGS mysql_datatypes_support_level = \\'decimal,datetime64\\'" in clickhouse_node.query("SHOW CREATE DATABASE test_database_1 FORMAT TSV")
clickhouse_node.query("DETACH DATABASE test_database_1")
# without context settings
clickhouse_node.query("ATTACH DATABASE test_database_1")
assert "SETTINGS mysql_datatypes_support_level = \\'decimal,datetime64\\'" in clickhouse_node.query("SHOW CREATE DATABASE test_database_1 FORMAT TSV")
clickhouse_node.query("DROP DATABASE test_database")
clickhouse_node.query("DROP DATABASE test_database_1")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
mysql_node.query("DROP DATABASE test")
decimal_values = [0.123, 0.4, 5.67, 8.91011, 123456789.123, -0.123, -0.4, -5.67, -8.91011, -123456789.123]
timestamp_values = ['2015-05-18 07:40:01.123', '2019-09-16 19:20:11.123']
timestamp_values_no_subsecond = ['2015-05-18 07:40:01', '2019-09-16 19:20:11']
......
<yandex>
<kafka>
<auto_offset_reset>earliest</auto_offset_reset>
<!-- Debugging of possible issues, like:
- https://github.com/edenhill/librdkafka/issues/2077
- https://github.com/edenhill/librdkafka/issues/1778
- #5615
XXX: for now this messages will appears in stderr.
-->
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_mechanism>GSSAPI</sasl_mechanism>
<sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
<sasl_kerberos_keytab>/tmp/keytab/clickhouse.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/instance@TEST.CLICKHOUSE.TECH</sasl_kerberos_principal>
<debug>security</debug>
<api_version_request>false</api_version_request>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</yandex>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>
\ No newline at end of file
#!/bin/bash
set -x # trace
: "${REALM:=TEST.CLICKHOUSE.TECH}"
: "${DOMAIN_REALM:=test.clickhouse.tech}"
: "${KERB_MASTER_KEY:=masterkey}"
: "${KERB_ADMIN_USER:=admin}"
: "${KERB_ADMIN_PASS:=admin}"
create_config() {
: "${KDC_ADDRESS:=$(hostname -f)}"
cat>/etc/krb5.conf<<EOF
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = $REALM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15s
renew_lifetime = 15s
forwardable = true
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
default_tkt_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
default_tgs_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
permitted_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
[realms]
$REALM = {
kdc = $KDC_ADDRESS
admin_server = $KDC_ADDRESS
}
[domain_realm]
.$DOMAIN_REALM = $REALM
$DOMAIN_REALM = $REALM
EOF
cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
$REALM = {
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
master_key_type = des3-hmac-sha1
supported_enctypes = arcfour-hmac:normal des3-hmac-sha1:normal des-cbc-crc:normal des:normal des:v4 des:norealm des:onlyrealm des:afs3
default_principal_flags = +preauth
}
EOF
}
create_db() {
/usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
}
start_kdc() {
mkdir -p /var/log/kerberos
/etc/rc.d/init.d/krb5kdc start
/etc/rc.d/init.d/kadmin start
chkconfig krb5kdc on
chkconfig kadmin on
}
restart_kdc() {
/etc/rc.d/init.d/krb5kdc restart
/etc/rc.d/init.d/kadmin restart
}
create_admin_user() {
kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
}
create_keytabs() {
kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@${REALM}"
kadmin.local -q "addprinc -randkey kafka/kerberized_kafka1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/kerberized_kafka1@${REALM}"
kadmin.local -q "addprinc -randkey zkclient@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/zkclient.keytab zkclient@${REALM}"
kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"
chmod g+r /tmp/keytab/clickhouse.keytab
}
main() {
if [ ! -f /kerberos_initialized ]; then
create_config
create_db
create_admin_user
start_kdc
touch /kerberos_initialized
fi
if [ ! -f /var/kerberos/krb5kdc/principal ]; then
while true; do sleep 1000; done
else
start_kdc
create_keytabs
tail -F /var/log/kerberos/krb5kdc.log
fi
}
[[ "$0" == "${BASH_SOURCE[0]}" ]] && main "$@"
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kerberized_kafka.keytab"
principal="kafka/kerberized_kafka1@TEST.CLICKHOUSE.TECH";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CLICKHOUSE.TECH";
};
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15s
renew_lifetime = 15s
forwardable = true
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = kafka_kerberos
admin_server = kafka_kerberos
}
[domain_realm]
.TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kafka_kerberized_zookeeper.keytab"
principal="zookeeper/kafka_kerberized_zookeeper@TEST.CLICKHOUSE.TECH";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CLICKHOUSE.TECH";
};
import os.path as p
import random
import threading
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager
import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
with_kerberized_kafka=True,
clickhouse_path_dir="clickhouse_path"
)
kafka_id = '' # instance.cluster.kafka_docker_id
# Helpers
def check_kafka_is_available():
# plaintext
p = subprocess.Popen(('docker',
'exec',
'-i',
kafka_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'localhost:9093'),
stdout=subprocess.PIPE)
p.communicate()
return p.returncode == 0
def wait_kafka_is_available(max_retries=50):
retries = 0
while True:
if check_kafka_is_available():
break
else:
retries += 1
if retries > max_retries:
raise "Kafka is not available"
print("Waiting for Kafka to start up")
time.sleep(1)
def kafka_produce(topic, messages, timestamp=None):
producer = KafkaProducer(bootstrap_servers="localhost:9093")
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
print ("Produced {} messages for topic {}".format(len(messages), topic))
# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
try:
global kafka_id
cluster.start()
kafka_id = instance.cluster.kerberized_kafka_docker_id
print("kafka_id is {}".format(kafka_id))
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
wait_kafka_is_available()
print("kafka is available - running test")
yield # run test
# Tests
@pytest.mark.timeout(180) # wait to build containers
def test_kafka_json_as_string(kafka_cluster):
kafka_produce('kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
instance.query('''
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
result = instance.query('SELECT * FROM test.kafka;')
expected = '''\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
'''
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
def test_kafka_json_as_string_no_kdc(kafka_cluster):
kafka_produce('kafka_json_as_string_no_kdc', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
kafka_cluster.pause_container('kafka_kerberos')
time.sleep(45) # wait for ticket expiration
instance.query('''
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
result = instance.query('SELECT * FROM test.kafka_no_kdc;')
expected = ''
kafka_cluster.unpause_container('kafka_kerberos')
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
assert instance.contains_in_log("Ticket expired")
assert instance.contains_in_log("Kerberos ticket refresh failed")
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")
cluster.shutdown()
......@@ -17,3 +17,55 @@ WHERE id LIKE 'id%'
GROUP BY id;
DROP TABLE tags;
-- https://github.com/ClickHouse/ClickHouse/issues/15294
drop table if exists TestTable;
create table TestTable (column String, start DateTime, end DateTime) engine MergeTree order by start;
insert into TestTable (column, start, end) values('test', toDateTime('2020-07-20 09:00:00'), toDateTime('2020-07-20 20:00:00')),('test1', toDateTime('2020-07-20 09:00:00'), toDateTime('2020-07-20 20:00:00')),('test2', toDateTime('2020-07-20 09:00:00'), toDateTime('2020-07-20 20:00:00'));
SELECT column,
(SELECT d from (select [1, 2, 3, 4] as d)) as d
FROM TestTable
where column == 'test'
GROUP BY column;
drop table TestTable;
-- https://github.com/ClickHouse/ClickHouse/issues/11407
drop table if exists aaa;
drop table if exists bbb;
CREATE TABLE aaa (
id UInt16,
data String
)
ENGINE = MergeTree()
PARTITION BY tuple()
ORDER BY id;
INSERT INTO aaa VALUES (1, 'sef'),(2, 'fre'),(3, 'jhg');
CREATE TABLE bbb (
id UInt16,
data String
)
ENGINE = MergeTree()
PARTITION BY tuple()
ORDER BY id;
INSERT INTO bbb VALUES (2, 'fre'), (3, 'jhg');
with (select groupArray(id) from bbb) as ids
select *
from aaa
where has(ids, id)
order by id;
drop table aaa;
drop table bbb;
CREATE TABLE delta_codec_synthetic (`id` UInt64 NULL CODEC(Delta, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` UInt64 NULL CODEC(DoubleDelta, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` UInt64 NULL CODEC(Gorilla, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(Delta, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(DoubleDelta, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(Gorilla, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
......
----- Default Settings -----
1
----- Settings persistent=1 -----
1
----- Settings persistent=0 -----
DROP TABLE IF EXISTS set;
DROP TABLE IF EXISTS number;
CREATE TABLE number (number UInt64) ENGINE = Memory();
INSERT INTO number values (1);
SELECT '----- Default Settings -----';
CREATE TABLE set (val UInt64) ENGINE = Set();
INSERT INTO set VALUES (1);
DETACH TABLE set;
ATTACH TABLE set;
SELECT number FROM number WHERE number IN set LIMIT 1;
DROP TABLE set;
SELECT '----- Settings persistent=1 -----';
CREATE TABLE set (val UInt64) ENGINE = Set() SETTINGS persistent=1;
INSERT INTO set VALUES (1);
DETACH TABLE set;
ATTACH TABLE set;
SELECT number FROM number WHERE number IN set LIMIT 1;
DROP TABLE set;
SELECT '----- Settings persistent=0 -----';
CREATE TABLE set (val UInt64) ENGINE = Set() SETTINGS persistent=0;
INSERT INTO set VALUES (1);
DETACH TABLE set;
ATTACH TABLE set;
SELECT number FROM number WHERE number IN set LIMIT 1;
DROP TABLE set;
DROP TABLE number;
----- Default Settings -----
1 21
----- Settings persistent=1 -----
1 21
----- Settings persistent=0 -----
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册