未验证 提交 e378d6d4 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

feat(sql): hide telemetry table from `tables()` SQL output (#1951)

上级 8eee34ed
......@@ -39,4 +39,9 @@ public class DefaultTelemetryConfiguration implements TelemetryConfiguration {
public int getQueueCapacity() {
return 16;
}
@Override
public boolean hideTables() {
return false;
}
}
......@@ -179,6 +179,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private final boolean telemetryEnabled;
private final boolean telemetryDisableCompletely;
private final int telemetryQueueCapacity;
private final boolean telemetryHideTables;
private final LineTcpReceiverConfiguration lineTcpReceiverConfiguration = new PropLineTcpReceiverConfiguration();
private final IODispatcherConfiguration lineTcpReceiverDispatcherConfiguration = new PropLineTcpReceiverIODispatcherConfiguration();
private final boolean lineTcpEnabled;
......@@ -757,6 +758,7 @@ public class PropServerConfiguration implements ServerConfiguration {
this.telemetryEnabled = getBoolean(properties, env, "telemetry.enabled", true);
this.telemetryDisableCompletely = getBoolean(properties, env, "telemetry.disable.completely", false);
this.telemetryQueueCapacity = Numbers.ceilPow2(getInt(properties, env, "telemetry.queue.capacity", 512));
this.telemetryHideTables = getBoolean(properties, env, "telemetry.hide.tables", true);
this.o3PartitionPurgeListCapacity = getInt(properties, env, "cairo.o3.partition.purge.list.initial.capacity", 1);
parseBindTo(properties, env, "line.udp.bind.to", "0.0.0.0:9009", (a, p) -> {
......@@ -2914,6 +2916,11 @@ public class PropServerConfiguration implements ServerConfiguration {
public int getQueueCapacity() {
return telemetryQueueCapacity;
}
@Override
public boolean hideTables() {
return telemetryHideTables;
}
}
private class PropHttpMinServerConfiguration implements HttpMinServerConfiguration {
......
......@@ -30,4 +30,6 @@ public interface TelemetryConfiguration {
boolean getEnabled();
int getQueueCapacity();
boolean hideTables();
}
......@@ -48,22 +48,21 @@ import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
public class TelemetryJob extends SynchronizedJob implements Closeable {
private static final Log LOG = LogFactory.getLog(TelemetryJob.class);
private static final String WRITER_LOCK_REASON = "telemetryJob";
private final static CharSequence tableName = "telemetry";
final static CharSequence configTableName = "telemetry_config";
public final static CharSequence tableName = "telemetry";
public final static CharSequence configTableName = "telemetry_config";
static final String QDB_PACKAGE = "QDB_PACKAGE";
static final String OS_NAME = "os.name";
private static final Log LOG = LogFactory.getLog(TelemetryJob.class);
private static final String WRITER_LOCK_REASON = "telemetryJob";
private final MicrosecondClock clock;
private final CairoConfiguration configuration;
private final RingQueue<TelemetryTask> queue;
private final SCSequence subSeq;
private final SCSequence tempSequence = new SCSequence();
private boolean enabled;
private TableWriter writer;
private final QueueConsumer<TelemetryTask> myConsumer = this::newRowConsumer;
private TableWriter configWriter;
private final SCSequence tempSequence = new SCSequence();
public TelemetryJob(CairoEngine engine) throws SqlException {
this(engine, null);
......@@ -127,20 +126,6 @@ public class TelemetryJob extends SynchronizedJob implements Closeable {
}
}
private void tryAddColumn(SqlCompiler compiler, SqlExecutionContext executionContext, CharSequence columnDetails) {
try {
CompiledQuery cc = compiler.compile(
"ALTER TABLE " + configTableName + " ADD COLUMN " + columnDetails,
executionContext
);
try (QueryFuture execution = cc.execute(tempSequence)) {
execution.await();
}
} catch (SqlException ex) {
LOG.info().$("Failed to alter telemetry table [table=").$(configTableName).$(",error=").$(ex.getFlyweightMessage()).I$();
}
}
@Override
public void close() {
if (enabled) {
......@@ -162,42 +147,6 @@ public class TelemetryJob extends SynchronizedJob implements Closeable {
return false;
}
private TableWriter updateTelemetryConfig(
SqlCompiler compiler,
SqlExecutionContextImpl sqlExecutionContext,
boolean enabled
) throws SqlException {
final TableWriter configWriter = compiler.getEngine().getWriter(AllowAllCairoSecurityContext.INSTANCE, configTableName, WRITER_LOCK_REASON);
final CompiledQuery cc = compiler.compile(configTableName + " LIMIT -1", sqlExecutionContext);
try (final RecordCursor cursor = cc.getRecordCursorFactory().getCursor(sqlExecutionContext)) {
if (cursor.hasNext()) {
final Record record = cursor.getRecord();
final boolean _enabled = record.getBool(1);
Long256 l256 = record.getLong256A(0);
final CharSequence lastVersion = record.getSym(2);
// if the configuration changed to enable or disable telemetry
// we need to update the table to reflect that
if (enabled != _enabled || !configuration.getBuildInformation().getQuestDbVersion().equals(lastVersion)) {
appendConfigRow(compiler, configWriter, l256, enabled);
LOG.advisory()
.$("instance config changes [id=").$256(l256.getLong0(), l256.getLong1(), 0, 0)
.$(", enabled=").$(enabled)
.$(']').$();
} else {
LOG.advisory()
.$("instance [id=").$256(l256.getLong0(), l256.getLong1(), 0, 0)
.$(", enabled=").$(enabled)
.$(']').$();
}
} else {
// if there are no record for telemetry id we need to create one using clocks
appendConfigRow(compiler, configWriter, null, enabled);
}
}
return configWriter;
}
private void appendConfigRow(SqlCompiler compiler, TableWriter configWriter, Long256 id, boolean enabled) {
TableWriter.Row row = configWriter.newRow();
if (null == id) {
......@@ -253,4 +202,54 @@ public class TelemetryJob extends SynchronizedJob implements Closeable {
.$(']').$();
}
}
private void tryAddColumn(SqlCompiler compiler, SqlExecutionContext executionContext, CharSequence columnDetails) {
try {
CompiledQuery cc = compiler.compile(
"ALTER TABLE " + configTableName + " ADD COLUMN " + columnDetails,
executionContext
);
try (QueryFuture execution = cc.execute(tempSequence)) {
execution.await();
}
} catch (SqlException ex) {
LOG.info().$("Failed to alter telemetry table [table=").$(configTableName).$(",error=").$(ex.getFlyweightMessage()).I$();
}
}
private TableWriter updateTelemetryConfig(
SqlCompiler compiler,
SqlExecutionContextImpl sqlExecutionContext,
boolean enabled
) throws SqlException {
final TableWriter configWriter = compiler.getEngine().getWriter(AllowAllCairoSecurityContext.INSTANCE, configTableName, WRITER_LOCK_REASON);
final CompiledQuery cc = compiler.compile(configTableName + " LIMIT -1", sqlExecutionContext);
try (final RecordCursor cursor = cc.getRecordCursorFactory().getCursor(sqlExecutionContext)) {
if (cursor.hasNext()) {
final Record record = cursor.getRecord();
final boolean _enabled = record.getBool(1);
Long256 l256 = record.getLong256A(0);
final CharSequence lastVersion = record.getSym(2);
// if the configuration changed to enable or disable telemetry
// we need to update the table to reflect that
if (enabled != _enabled || !configuration.getBuildInformation().getQuestDbVersion().equals(lastVersion)) {
appendConfigRow(compiler, configWriter, l256, enabled);
LOG.advisory()
.$("instance config changes [id=").$256(l256.getLong0(), l256.getLong1(), 0, 0)
.$(", enabled=").$(enabled)
.$(']').$();
} else {
LOG.advisory()
.$("instance [id=").$256(l256.getLong0(), l256.getLong1(), 0, 0)
.$(", enabled=").$(enabled)
.$(']').$();
}
} else {
// if there are no record for telemetry id we need to create one using clocks
appendConfigRow(compiler, configWriter, null, enabled);
}
}
return configWriter;
}
}
......@@ -24,9 +24,10 @@
package io.questdb.griffin.engine.functions.catalogue;
import io.questdb.TelemetryJob;
import io.questdb.cairo.*;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.CursorFunction;
......@@ -67,7 +68,10 @@ public class TableMetadataCursorFactory implements FunctionFactory {
SqlExecutionContext sqlExecutionContext
) {
return new CursorFunction(
new TableMetadataCursor(configuration.getFilesFacade(), configuration.getRoot())
new TableMetadataCursor(
configuration.getFilesFacade(),
configuration.getRoot(),
configuration.getTelemetryConfiguration().hideTables())
) {
@Override
public boolean isRuntimeConstant() {
......@@ -79,12 +83,14 @@ public class TableMetadataCursorFactory implements FunctionFactory {
private static class TableMetadataCursor implements RecordCursorFactory {
private final FilesFacade ff;
private final TableListRecordCursor cursor;
private final boolean hideTelemetryTables;
private Path path;
public TableMetadataCursor(FilesFacade ff, CharSequence dbRoot) {
public TableMetadataCursor(FilesFacade ff, CharSequence dbRoot, boolean hideTelemetryTables) {
this.ff = ff;
path = new Path().of(dbRoot).$();
cursor = new TableListRecordCursor();
this.hideTelemetryTables = hideTelemetryTables;
}
@Override
......@@ -234,6 +240,11 @@ public class TableMetadataCursorFactory implements FunctionFactory {
}
public boolean open(CharSequence tableName) {
if (hideTelemetryTables && (Chars.equals(tableName, TelemetryJob.tableName) || Chars.equals(tableName, TelemetryJob.configTableName))) {
return false;
}
int pathLen = path.length();
try {
path.chop$().concat(tableName).concat(META_FILE_NAME).$();
......
......@@ -509,9 +509,20 @@ query.timeout.sec=60
################ Telemetry settings ##################
# Telemetry switch. Telemetry events are use to identify components of questdb that are being used. They never identify
# user not data stored in questdb. All events can be viewed via `select * from telemetry`. Switching telemetry off will
# stop all events from being collected and subsequent growth of `telemetry` table. After telemetry is switched off
# `telemetry` table cab be dropped.
#telemetry.enabled=true
# Capacity of the internal telemetry queue, which is the gateway of all telemetry events.
# This queue capacity does not require tweaking.
#telemetry.queue.capacity=512
# hides telemetry tables from `select * from tables()` output. As a result, telemetry tables
# will not be visible in the Web Console table view
#telemetry.hide.tables=true
################ Metrics settings ##################
#metrics.enabled=true
......@@ -24,8 +24,10 @@
package io.questdb.cairo;
import io.questdb.DefaultTelemetryConfiguration;
import io.questdb.MessageBus;
import io.questdb.Metrics;
import io.questdb.TelemetryConfiguration;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.griffin.DatabaseSnapshotAgent;
......@@ -81,6 +83,8 @@ public class AbstractCairoTest {
public static long writerAsyncCommandBusyWaitTimeout = -1;
public static long writerAsyncCommandMaxTimeout = -1;
public static long spinLockTimeoutUs = -1;
protected static boolean hideTelemetryTable = false;
private static TelemetryConfiguration telemetryConfiguration;
@BeforeClass
public static void setUpStatic() {
......@@ -94,6 +98,14 @@ public class AbstractCairoTest {
} catch (IOException e) {
throw new ExceptionInInitializerError();
}
telemetryConfiguration = new DefaultTelemetryConfiguration() {
@Override
public boolean hideTables() {
return hideTelemetryTable;
}
};
configuration = new DefaultCairoConfiguration(root) {
@Override
public FilesFacade getFilesFacade() {
......@@ -211,6 +223,11 @@ public class AbstractCairoTest {
public boolean isSnapshotRecoveryEnabled() {
return snapshotRecoveryEnabled == null ? super.isSnapshotRecoveryEnabled() : snapshotRecoveryEnabled;
}
@Override
public TelemetryConfiguration getTelemetryConfiguration() {
return telemetryConfiguration;
}
};
engine = new CairoEngine(configuration, metrics);
snapshotAgent = new DatabaseSnapshotAgent(engine);
......@@ -251,6 +268,7 @@ public class AbstractCairoTest {
spinLockTimeoutUs = -1;
snapshotInstanceId = null;
snapshotRecoveryEnabled = null;
hideTelemetryTable = false;
}
protected static void assertMemoryLeak(TestUtils.LeakProneCode code) throws Exception {
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin;
import io.questdb.TelemetryJob;
import io.questdb.test.tools.TestUtils;
import org.junit.Test;
public class HideTelemetryTablesTest extends AbstractGriffinTest {
@Test
public void testShow() throws Exception {
assertMemoryLeak(() -> {
compiler.compile("create table test(a int)", sqlExecutionContext);
compiler.compile("create table " + TelemetryJob.tableName + "(a int)", sqlExecutionContext);
compiler.compile("create table " + TelemetryJob.configTableName + "(a int)", sqlExecutionContext);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"tables order by 2",
sink,
"id\tname\tdesignatedTimestamp\tpartitionBy\tmaxUncommittedRows\tcommitLag\n" +
"2\ttelemetry\t\tNONE\t1000\t0\n" +
"3\ttelemetry_config\t\tNONE\t1000\t0\n" +
"1\ttest\t\tNONE\t1000\t0\n"
);
});
}
@Test
public void testHide() throws Exception {
hideTelemetryTable = true;
assertMemoryLeak(() -> {
compiler.compile("create table test(a int)", sqlExecutionContext);
compiler.compile("create table " + TelemetryJob.tableName + "(a int)", sqlExecutionContext);
compiler.compile("create table " + TelemetryJob.configTableName + "(a int)", sqlExecutionContext);
TestUtils.assertSql(
compiler,
sqlExecutionContext,
"tables",
sink,
"id\tname\tdesignatedTimestamp\tpartitionBy\tmaxUncommittedRows\tcommitLag\n" +
"1\ttest\t\tNONE\t1000\t0\n"
);
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册