From dda4ddcedab9d96e33424134be095f8fe287fef1 Mon Sep 17 00:00:00 2001 From: zhihaop Date: Thu, 15 Sep 2022 17:53:57 +0800 Subject: [PATCH] feat: add config for async batch insertion --- packaging/cfg/taos.cfg | 9 ++++++++ src/client/inc/tscBulkWrite.h | 7 +----- src/client/src/tscBulkWrite.c | 5 ++++- src/common/src/tglobal.c | 42 ++++++++++++++++++++++++++++++----- src/util/inc/tconfig.h | 2 +- 5 files changed, 51 insertions(+), 14 deletions(-) diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index 7d77a0b23e..8206c138b2 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -310,3 +310,12 @@ keepColumnName 1 # unit Hour. Latency of data migration # keepTimeOffset 0 + +# taosc async batching insertion +# asyncBatchEnable 1 + +# taosc async insertion batchsize +# asyncBatchSize 256 + +# taosc async batching timeout in milliseconds +# asyncBatchTimeout 5 \ No newline at end of file diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h index 1bb3b8adbd..2278c9b8c5 100644 --- a/src/client/inc/tscBulkWrite.h +++ b/src/client/inc/tscBulkWrite.h @@ -78,7 +78,7 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher); * * @param dispatcher the async bulk write dispatcher. * @param pSql the sql object to offer. - * @return int32_t if offer success, return the current size of the buffer. otherwise returns -1. + * @return if offer success, return the current size of the buffer. otherwise returns -1. */ int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql); @@ -89,11 +89,6 @@ int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) */ void dispatcherExecute(SArray* statements); -/** - * The thread to manage batching timeout. - */ -void* dispatcherTimeoutCallback(void* arg); - /** * Create the async bulk write dispatcher. * diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 998a5ec0d4..982214d4d7 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -232,7 +232,10 @@ void dispatcherExecute(SArray* statements) { taosArrayDestroy(&statements); } -void* dispatcherTimeoutCallback(void* arg) { +/** + * The thread to manage batching timeout. + */ +static void* dispatcherTimeoutCallback(void* arg) { SAsyncBulkWriteDispatcher* dispatcher = arg; setThreadName("tscBackground"); diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 9138ed7a26..96602e3375 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -121,15 +121,14 @@ float tsStreamComputDelayRatio = 0.1f; int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance -// The async insertion auto batch feature. -// When user submit an insert statement to `taos_query_ra`, the statement will be buffered asynchronously in the -// execution queue instead of executing it. If the number of the buffered statements reach `tsAsyncBatchLen`, all -// the statements in the queue will be merged and sent to vnodes. +// The async insertion batching feature. +// When user submit an insert statement to `taos_query_ra`, the statement will be buffered asynchronously instead of executing it. +// If the number of the buffered statements reach `tsAsyncBatchSize`, all the statements in the queue will be merged and sent to vnodes. // The statements will be sent to vnodes no more than `tsAsyncBatchTimeout` milliseconds. But the actual time vnodes // received the statements depends on the network quality. bool tsAsyncBatchEnable = true; -int32_t tsAsyncBatchSize = 128; -int64_t tsAsyncBatchTimeout = 50; +int32_t tsAsyncBatchSize = 256; +int64_t tsAsyncBatchTimeout = 5; // the maximum allowed query buffer size during query processing for each data node. // -1 no limit (default) @@ -1833,6 +1832,37 @@ static void doInitGlobalConfig(void) { cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + + cfg.option = "asyncBatchEnable"; + cfg.ptr = &tsAsyncBatchEnable; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "asyncBatchSize"; + cfg.ptr = &tsAsyncBatchSize; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 1; + cfg.maxValue = 65536; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "asyncBatchTimeout"; + cfg.ptr = &tsAsyncBatchTimeout; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 0; + cfg.maxValue = 65536; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); #else // if TD_TSZ macro define, have 5 count configs, so must add 5 diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index 872da82a8e..cd46d51de1 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define TSDB_CFG_MAX_NUM 134 +#define TSDB_CFG_MAX_NUM 137 #define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 -- GitLab