diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index 295b30c56847d540f794a41d611596348cded913..85e7959aa795a9d7b84077f0b58b81c6799c40f5 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -317,7 +317,7 @@ keepColumnName 1 # enable thread local batch dispatcher # asyncBatchThreadLocal 1 -# taosc async insertion batch size, maximum 65535 +# taosc async insertion batch size, maximum 4096 # asyncBatchSize 256 # taosc async batching timeout in milliseconds, maximum 2048 diff --git a/src/client/inc/tscDataBlockMerge.h b/src/client/inc/tscBatchMerge.h similarity index 95% rename from src/client/inc/tscDataBlockMerge.h rename to src/client/inc/tscBatchMerge.h index a5b0b6e36d5593653d96448e03e32c65a4fd3386..0cfc7a9d5d6959a7b76976ce5b285ce579bb2454 100644 --- a/src/client/inc/tscDataBlockMerge.h +++ b/src/client/inc/tscBatchMerge.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TSCDATABLOCKMERGE_H -#define TDENGINE_TSCDATABLOCKMERGE_H +#ifndef TDENGINE_TSCBATCHMERGE_H +#define TDENGINE_TSCBATCHMERGE_H #include "hash.h" #include "taosmsg.h" @@ -260,14 +260,15 @@ SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOf * Merge the KV-PayLoad SQL objects into single one. * The statements here must be an insertion statement and no schema attached. * - * @param statements the array of statements. a.k.a SArray. - * @param result the returned result. result is not null! - * @return the status code. usually TSDB_CODE_SUCCESS. + * @param polls the array of SSqlObj*. + * @param nPolls the number of SSqlObj* in the array. + * @param result the returned result. result is not null! + * @return the status code. */ -int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result); +int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj *result); #ifdef __cplusplus } #endif -#endif // TDENGINE_TSCDATABLOCKMERGE_H +#endif // TDENGINE_TSCBATCHMERGE_H diff --git a/src/client/inc/tscBatchWrite.h b/src/client/inc/tscBatchWrite.h new file mode 100644 index 0000000000000000000000000000000000000000..95a6980eded773b9a588eed3b6e6201f4e0e5cf4 --- /dev/null +++ b/src/client/inc/tscBatchWrite.h @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSCBATCHWRITE_H +#define TDENGINE_TSCBATCHWRITE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tthread.h" + +// forward declaration. +typedef struct SSqlObj SSqlObj; +typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager; + +/** + * SAsyncBatchWriteDispatcher is an async batching write dispatcher (ABWD). ABWD accepts the recent SQL requests and put + * them in a queue waiting to be scheduled. When the number of requests in the queue reaches batch_size, it merges the + * requests in the queue and sends them to the server, thus reducing the network overhead caused by multiple + * communications to the server and directly improving the throughput of small object asynchronous writes. + */ +typedef struct SAsyncBatchWriteDispatcher { + // the timeout manager. + SDispatcherTimeoutManager* timeoutManager; + + // the mutex to protect the dispatcher. + pthread_mutex_t bufferMutex; + + // the cond to signal when buffer not full. + pthread_cond_t notFull; + + // the maximum number of insertion rows in a batch. + int32_t batchSize; + + // the number of insertion rows in the buffer. + int32_t currentSize; + + // the number of items in the buffer. + int32_t bufferSize; + + // whether the dispatcher is shutdown. + volatile bool shutdown; + + SSqlObj* buffer[]; +} SAsyncBatchWriteDispatcher; + +/** + * The manager of SAsyncBatchWriteDispatcher. Call dispatcherAcquire(...) to get the SAsyncBatchWriteDispatcher + * instance. SDispatcherManager will manage the life cycle of SAsyncBatchWriteDispatcher. + */ +typedef struct SDispatcherManager { + pthread_key_t key; + + // the maximum number of insertion rows in a batch. + int32_t batchSize; + + // the batching timeout in milliseconds. + int32_t timeoutMs; + + // specifies whether the dispatcher is thread local, if the dispatcher is not + // thread local, we will use the global dispatcher below. + bool isThreadLocal; + + // the global dispatcher, if thread local enabled, global will be set to NULL. + SAsyncBatchWriteDispatcher* global; + +} SDispatcherManager; + +/** + * Control the timeout of the dispatcher queue. + */ +typedef struct SDispatcherTimeoutManager { + // the dispatcher that timeout manager belongs to. + SAsyncBatchWriteDispatcher* dispatcher; + + // the background thread. + pthread_t background; + + // the mutex to sleep the background thread. + pthread_mutex_t sleepMutex; + + // the cond to signal to background thread. + pthread_cond_t timeout; + + // the batching timeout in milliseconds. + int32_t timeoutMs; + + // whether the timeout manager is shutdown. + volatile bool shutdown; +} SDispatcherTimeoutManager; + +/** + * Create the dispatcher timeout manager. + */ +SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs); + +/** + * Destroy the dispatcher timeout manager. + */ +void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager); + +/** + * Check if the timeout manager is shutdown. + * @param manager the timeout manager. + * @return whether the timeout manager is shutdown. + */ +bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager); + +/** + * Shutdown the SDispatcherTimeoutManager. + * @param manager the SDispatcherTimeoutManager. + */ +void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager); + +/** + * Merge the statements into single SSqlObj. + * + * @param fp the callback of SSqlObj. + * @param param the parameters of the callback. + * @param polls the array of SSqlObj*. + * @param nPolls the number of SSqlObj* in the array. + * @return the merged SSqlObj. + */ +int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result); + +/** + * Merge the sql statements and execute the merged sql statement. + * + * @param polls the array of SSqlObj*. + * @param nPolls the number of SSqlObj* in the array. + */ +void dispatcherExecute(SSqlObj** polls, size_t nPolls); + +/** + * Create the async batch write dispatcher. + * + * @param batchSize When user submit an insert statement to `taos_query_ra`, the statement will be buffered + * asynchronously in the buffer instead of executing it. If the number of the buffered + * statements reach batchLen, all the statements in the buffer will be merged and sent to vnodes. + * @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time + * vnodes received the statements depends on the network quality. + */ +SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(int32_t batchSize, int32_t timeoutMs); + +/** + * Destroy the async auto batch dispatcher. + */ +void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher); + +/** + * Check if the current sql object can be dispatch by ABWD. + * 1. auto batch feature on the sql object must be enabled. + * 2. must be an `insert into ... value ...` statement. + * 3. the payload type must be kv payload. + * 4. no schema attached. + * + * @param dispatcher the dispatcher. + * @param pSql the sql object to check. + * @return returns true if the sql object can be dispatch by ABWD. + */ +bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql); + +/** + * Try to offer the SSqlObj* to the dispatcher. If the number of row reach `batchSize`, the function + * will merge the SSqlObj* in the buffer and send them to the vnodes. + * + * @param pSql the insert statement to offer. + * @return if offer success, returns true. + */ +bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql); + +/** + * Create the manager of SAsyncBatchWriteDispatcher. + * + * @param batchSize the batchSize of SAsyncBatchWriteDispatcher. + * @param timeoutMs the timeoutMs of SAsyncBatchWriteDispatcher. + * @param isThreadLocal specifies whether the dispatcher is thread local. + * @return the SAsyncBatchWriteDispatcher manager. + */ +SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal); + +/** + * Destroy the SDispatcherManager. + * (will destroy all the instances of SAsyncBatchWriteDispatcher in the thread local variable) + * + * @param manager the SDispatcherManager. + */ +void destroyDispatcherManager(SDispatcherManager* manager); + +/** + * Get an instance of SAsyncBatchWriteDispatcher. + * + * @param manager the SDispatcherManager. + * @return the SAsyncBatchWriteDispatcher instance. + */ +SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCBATCHWRITE_H diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h deleted file mode 100644 index 1c4d4aa65a09514ff1a8903b345656d3e61ca04d..0000000000000000000000000000000000000000 --- a/src/client/inc/tscBulkWrite.h +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_TSCBULKWRITE_H -#define TDENGINE_TSCBULKWRITE_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "tarray.h" -#include "tthread.h" - -// forward declaration. -typedef struct SSqlObj SSqlObj; - -/** - * SAsyncBulkWriteDispatcher is an async batching dispatcher(for writing), it can buffer insertion statements, batch - * and merge them into single statement. - */ -typedef struct SAsyncBulkWriteDispatcher { - // the buffer to store the insertion statements. equivalent to SArray. - SArray* buffer; - - // the mutex to protect the dispatcher. - pthread_mutex_t bufferMutex; - - // the mutex to sleep the background thread. - pthread_mutex_t sleepMutex; - - // the cond to signal to background thread. - pthread_cond_t timeout; - - // the cond to signal when buffer not full. - pthread_cond_t notFull; - - // the background thread to manage batching timeout. - pthread_t background; - - // the maximum number of insertion rows in a batch. - int32_t batchSize; - - // the batching timeout in milliseconds. - int32_t timeoutMs; - - // the number of insertion rows in the buffer. - int32_t currentSize; - - // whether the dispatcher is shutdown. - volatile bool shutdown; - -} SAsyncBulkWriteDispatcher; - -// forward declaration. -typedef struct SSqlObj SSqlObj; - -/** - * Merge the statements into single SSqlObj. - * - * @param fp the callback of SSqlObj. - * @param param the parameters of the callback. - * @param statements the sql statements represents in SArray. - * @return the merged SSqlObj. - */ -int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result); - -/** - * Merge the sql statements and execute the merged sql statement. - * - * @param statements the array of sql statement. a.k.a SArray. - */ -void dispatcherExecute(SArray* statements); - -/** - * Create the async bulk write dispatcher. - * - * @param batchSize When user submit an insert statement to `taos_query_ra`, the statement will be buffered - * asynchronously in the buffer instead of executing it. If the number of the buffered - * statements reach batchLen, all the statements in the buffer will be merged and sent to vnodes. - * @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time - * vnodes received the statements depends on the network quality. - */ -SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs); - -/** - * Destroy the async auto batch dispatcher. - */ -void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher); - -/** - * Check if the current sql object supports bulk insertion. - * 1. auto batch feature on the sql object must be enabled. - * 2. must be an `insert into ... value ...` statement. - * 3. the payload type must be kv payload. - * 4. no schema attached. - * - * @param dispatcher the async dispatcher. - * @param pSql the sql object to check. - * @return returns true if the sql object supports auto batch. - */ -bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql); - -/** - * Try to offer the SSqlObj* to the dispatcher. If the number of row reach `batchSize`, the function - * will merge the SSqlObj* in the buffer and send them to the vnodes. - * - * @param pSql the insert statement to offer. - * @return if offer success, returns true. - */ -bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql); - -/** - * A holder of SAsyncBulkWriteDispatcher. Call dispatcherAcquire(...) to get the SAsyncBulkWriteDispatcher - * instance. This holder will manage the life cycle of SAsyncBulkWriteDispatcher. - */ -typedef struct SDispatcherHolder { - pthread_key_t key; - - // the maximum number of insertion rows in a batch. - int32_t batchSize; - - // the batching timeout in milliseconds. - int32_t timeoutMs; - - // specifies whether the dispatcher is thread local, if the dispatcher is not - // thread local, we will use the global dispatcher below. - bool isThreadLocal; - - // the global dispatcher, if thread local enabled, global will be set to NULL. - SAsyncBulkWriteDispatcher * global; - -} SDispatcherHolder; - -/** - * Create a holder of SAsyncBulkWriteDispatcher. - * - * @param batchSize the batchSize of SAsyncBulkWriteDispatcher. - * @param timeoutMs the timeoutMs of SAsyncBulkWriteDispatcher. - * @param isThreadLocal specifies whether the dispatcher is thread local. - * @return the SAsyncBulkWriteDispatcher holder. - */ -SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal); - -/** - * Destroy the holder of SAsyncBulkWriteDispatcher. - * (will destroy all the instances of SAsyncBulkWriteDispatcher in the thread local variable) - * - * @param holder the holder of SAsyncBulkWriteDispatcher. - */ -void destroyDispatcherHolder(SDispatcherHolder* holder); - -/** - * Get an instance of SAsyncBulkWriteDispatcher. - * - * @param holder the holder of SAsyncBulkWriteDispatcher. - * @return the SAsyncBulkWriteDispatcher instance. - */ -SAsyncBulkWriteDispatcher* dispatcherAcquire(SDispatcherHolder* holder); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_TSCBULKWRITE_H diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f71b345b17be40d0a6b411d9f84a5bccd9da053f..3aaf8d5216001771c2933b51c3f34d665d7a8863 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -511,18 +511,18 @@ int32_t taos_unused_session(TAOS* taos); void waitForQueryRsp(void *param, TAOS_RES *tres, int code); /** - * Init the taosc async bulk write dispatcher. + * Init the manager of async batch write dispatcher. * - * @param batchSize the batchSize of async bulk write dispatcher. - * @param timeoutMs the timeout of batching in milliseconds. + * @param batchSize the batchSize of async batch write dispatcher. + * @param timeoutMs the timeout of batching in milliseconds. * @param isThreadLocal specifies whether the dispatcher is thread local. */ -void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal); +void tscInitDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal); /** - * Destroy the async auto batch dispatcher. + * Destroy the manager of async batch write dispatcher. */ -void tscDestroyAsyncDispatcher(); +void tscDestroyDispatcherManager(); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen); @@ -553,8 +553,8 @@ extern SHashObj *tscTableMetaMap; extern SCacheObj *tscVgroupListBuf; // forward declaration. -typedef struct SDispatcherHolder SDispatcherHolder; -extern SDispatcherHolder *tscDispatcher; +typedef struct SDispatcherManager SDispatcherManager; +extern SDispatcherManager *tscDispatcherManager; extern int tscObjRef; extern void *tscTmr; extern void *tscQhandle; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index e5d7d794be09261c9c78d2a7923f98c64a850099..f4b1d392301f3f979fdfc1b2ffab4907df387ea6 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -22,11 +22,11 @@ #include "qTableMeta.h" #include "tnote.h" #include "trpc.h" +#include "tscBatchWrite.h" #include "tscLog.h" #include "tscSubquery.h" #include "tscUtil.h" #include "tsclient.h" -#include "tscBulkWrite.h" static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); @@ -395,8 +395,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para return; } - if (tscDispatcher != NULL) { - SAsyncBulkWriteDispatcher* dispatcher = dispatcherAcquire(tscDispatcher); + if (tscDispatcherManager != NULL) { + SAsyncBatchWriteDispatcher * dispatcher = dispatcherAcquire(tscDispatcherManager); if (dispatcherTryDispatch(dispatcher, pSql)) { taosReleaseRef(tscObjRef, pSql->self); tscDebug("sql obj %p has been buffer in insert buffer", pSql); diff --git a/src/client/src/tscDataBlockMerge.c b/src/client/src/tscBatchMerge.c similarity index 98% rename from src/client/src/tscDataBlockMerge.c rename to src/client/src/tscBatchMerge.c index 197e6561b869518e81a6f9bbe17466957c494044..25d65bf734473b213b21df0aecf6995ca711b500 100644 --- a/src/client/src/tscDataBlockMerge.c +++ b/src/client/src/tscBatchMerge.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tscDataBlockMerge.h" +#include "tscBatchMerge.h" /** * A util function to compare two SName. @@ -456,9 +456,9 @@ bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) { return taosArrayPush(builder->tableNames, &name); } -int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj* result) { +int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj* result) { // statement array is empty. - if (!statements || !taosArrayGetSize(statements)) { + if (!polls || !nPolls) { return TSDB_CODE_TSC_INVALID_OPERATION; } @@ -474,8 +474,8 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj* result) { } // append the existing data blocks to builder. - for (size_t i = 0; i < taosArrayGetSize(statements); ++i) { - SSqlObj *pSql = taosArrayGetP(statements, i); + for (size_t i = 0; i < nPolls; ++i) { + SSqlObj *pSql = polls[i]; SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam; if (!pInsertParam->pDataBlocks) { continue; diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBatchWrite.c similarity index 56% rename from src/client/src/tscBulkWrite.c rename to src/client/src/tscBatchWrite.c index 1a970a7cfe448da8fead8645a51cc10766638b19..4cf3d455ba21cd0969cd3cac28415ca6f2cce032 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBatchWrite.c @@ -15,11 +15,11 @@ #include "osAtomic.h" -#include "tscDataBlockMerge.h" -#include "tscBulkWrite.h" -#include "tscLog.h" +#include "tscBatchMerge.h" +#include "tscBatchWrite.h" #include "tscSubquery.h" #include "tsclient.h" +#include "tscLog.h" /** * Represents the callback function and its context. @@ -27,15 +27,15 @@ typedef struct { __async_cb_func_t fp; void* param; -} Runnable; +} SCallbackHandler; /** * The context of `batchResultCallback`. */ typedef struct { - size_t count; - Runnable runnable[]; -} BatchCallBackContext; + size_t nHandlers; + SCallbackHandler handler[]; +} SBatchCallbackContext; /** * Get the number of insertion row in the sql statement. @@ -70,8 +70,8 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) { * @param code the error code. */ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { - BatchCallBackContext* context = param; - SSqlObj* res = tres; + SBatchCallbackContext* context = param; + SSqlObj* res = tres; // handle corner case [context == null]. if (context == NULL) { @@ -90,32 +90,27 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { } // handle results. - tscDebug("async batch result callback, number of item: %zu", context->count); - for (int i = 0; i < context->count; ++i) { + tscDebug("async batch result callback, number of item: %zu", context->nHandlers); + for (int i = 0; i < context->nHandlers; ++i) { // the result object is shared by many sql objects. // therefore, we need to increase the ref count. taosAcquireRef(tscObjRef, res->self); - Runnable* runnable = &context->runnable[i]; - runnable->fp(runnable->param, res, res == NULL ? code : taos_errno(res)); + SCallbackHandler* handler = &context->handler[i]; + handler->fp(handler->param, res, code); } taosReleaseRef(tscObjRef, res->self); free(context); } -int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { - if (statements == NULL) { +int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result) { + if (!polls || !nPolls) { return TSDB_CODE_SUCCESS; } - - size_t count = taosArrayGetSize(statements); - if (count == 0) { - return TSDB_CODE_SUCCESS; - } - + // create the callback context. - BatchCallBackContext* context = calloc(1, sizeof(BatchCallBackContext) + count * sizeof(Runnable)); + SBatchCallbackContext* context = calloc(1, sizeof(SBatchCallbackContext) + nPolls * sizeof(SCallbackHandler)); if (context == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -123,17 +118,17 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { tscDebug("create batch call back context: %p", context); // initialize the callback context. - context->count = count; - for (size_t i = 0; i < count; ++i) { - SSqlObj* statement = taosArrayGetP(statements, i); - context->runnable[i].fp = statement->fp; - context->runnable[i].param = statement->param; + context->nHandlers = nPolls; + for (size_t i = 0; i < nPolls; ++i) { + SSqlObj* pSql = polls[i]; + context->handler[i].fp = pSql->fp; + context->handler[i].param = pSql->param; } // merge the statements into single one. - tscDebug("start to merge %zu sql objs", count); - SSqlObj *pFirst = taosArrayGetP(statements, 0); - int32_t code = tscMergeKVPayLoadSqlObj(statements, pFirst); + tscDebug("start to merge %zu sql objs", nPolls); + SSqlObj *pFirst = polls[0]; + int32_t code = tscMergeSSqlObjs(polls, nPolls, pFirst); if (code != TSDB_CODE_SUCCESS) { const char* msg = tstrerror(code); tscDebug("failed to merge sql objects: %s", msg); @@ -147,8 +142,8 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { pFirst->fetchFp = pFirst->fp; *result = pFirst; - for (int i = 1; i < count; ++i) { - SSqlObj *pSql = taosArrayGetP(statements, i); + for (int i = 1; i < nPolls; ++i) { + SSqlObj *pSql = polls[i]; taosReleaseRef(tscObjRef, pSql->self); } return code; @@ -159,36 +154,43 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { * you need to notify dispatcher->notFull by yourself. * * @param dispatcher the dispatcher. - * @return the items in the dispatcher, SArray. + * @param nPolls the number of polled SSqlObj*. + * @return all the SSqlObj* in the buffer. */ -inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { - if (!taosArrayGetSize(dispatcher->buffer)) { +inline static SSqlObj** dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) { + if (!dispatcher->bufferSize) { + *nPolls = 0; return NULL; } - SArray* statements = taosArrayDup(dispatcher->buffer); - if (statements == NULL) { + SSqlObj** clone = malloc(sizeof(SSqlObj*) * dispatcher->bufferSize); + if (clone == NULL) { tscError("failed to poll all items: out of memory"); + *nPolls = 0; return NULL; } + memcpy(clone, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize); + *nPolls = dispatcher->bufferSize; dispatcher->currentSize = 0; - taosArrayClear(dispatcher->buffer); - return statements; + dispatcher->bufferSize = 0; + return clone; } /** * Poll all the SSqlObj* in the dispatcher's buffer. * * @param dispatcher the dispatcher. - * @return the items in the dispatcher, SArray. + * @param nPolls the number of polled SSqlObj*. + * @return all the SSqlObj* in the buffer. */ -inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatcher) { +inline static SSqlObj** dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) { + SSqlObj** polls = NULL; pthread_mutex_lock(&dispatcher->bufferMutex); - SArray* statements = dispatcherPollAll(dispatcher); + polls = dispatcherPollAll(dispatcher, nPolls); pthread_cond_broadcast(&dispatcher->notFull); pthread_mutex_unlock(&dispatcher->bufferMutex); - return statements; + return polls; } /** @@ -198,7 +200,7 @@ inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatche * @param pSql the sql object to offer. * @return return whether offer success. */ -inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { +inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) { pthread_mutex_lock(&dispatcher->bufferMutex); // if dispatcher is shutdown, must fail back to normal insertion. @@ -216,44 +218,51 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq } } - taosArrayPush(dispatcher->buffer, pSql); + dispatcher->buffer[dispatcher->bufferSize++] = pSql; dispatcher->currentSize += statementGetInsertionRows(pSql); tscDebug("sql obj %p has been write to insert buffer", pSql); - // the dispatcher has been shutdown or reach batch size. - if (atomic_load_8(&dispatcher->shutdown) || dispatcher->currentSize >= dispatcher->batchSize) { - SArray* statements = dispatcherPollAll(dispatcher); - dispatcherExecute(statements); - taosArrayDestroy(&statements); - pthread_cond_broadcast(&dispatcher->notFull); + if (dispatcher->currentSize < dispatcher->batchSize) { + pthread_mutex_unlock(&dispatcher->bufferMutex); + return true; } + + // the dispatcher reaches batch size. + size_t nPolls = 0; + SSqlObj** polls = dispatcherPollAll(dispatcher, &nPolls); + pthread_cond_broadcast(&dispatcher->notFull); pthread_mutex_unlock(&dispatcher->bufferMutex); + + if (polls) { + dispatcherExecute(polls, nPolls); + free(polls); + } return true; } -void dispatcherExecute(SArray* statements) { +void dispatcherExecute(SSqlObj** polls, size_t nPolls) { int32_t code = TSDB_CODE_SUCCESS; // no item in the buffer (items has been taken by other threads). - if (!statements || !taosArrayGetSize(statements)) { + if (!polls || !nPolls) { return; } // merge the statements into single one. SSqlObj* merged = NULL; - code = dispatcherStatementMerge(statements, &merged); + code = dispatcherBatchMerge(polls, nPolls, &merged); if (code != TSDB_CODE_SUCCESS) { goto _error; } - tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged); + tscDebug("merging %zu sql objs into %p", nPolls, merged); tscHandleMultivnodeInsert(merged); return; _error: tscError("send async batch sql obj failed, reason: %s", tstrerror(code)); // handling the failures. - for (size_t i = 0; i < taosArrayGetSize(statements); ++i) { - SSqlObj* item = taosArrayGetP(statements, i); + for (size_t i = 0; i < nPolls; ++i) { + SSqlObj* item = polls[i]; tscReturnsError(item, code); } } @@ -277,77 +286,69 @@ static inline void afterMillis(struct timespec *t, int32_t millis) { * @param dispatcher the dispatcher thread to sleep. * @param timeout the timeout in CLOCK_REALTIME. */ -inline static void dispatcherSleepUntil(SAsyncBulkWriteDispatcher* dispatcher, struct timespec* timeout) { - pthread_mutex_lock(&dispatcher->sleepMutex); +inline static void timeoutManagerSleepUntil(SDispatcherTimeoutManager* manager, struct timespec* timeout) { + pthread_mutex_lock(&manager->sleepMutex); while (true) { // notified by dispatcherShutdown(...). - if (atomic_load_8(&dispatcher->shutdown)) { + if (isShutdownSDispatcherTimeoutManager(manager)) { break; } - if (pthread_cond_timedwait(&dispatcher->timeout, &dispatcher->sleepMutex, timeout)) { + if (pthread_cond_timedwait(&manager->timeout, &manager->sleepMutex, timeout)) { fflush(stdout); break; } } - pthread_mutex_unlock(&dispatcher->sleepMutex); + pthread_mutex_unlock(&manager->sleepMutex); } /** * The thread to manage batching timeout. */ -static void* dispatcherTimeoutCallback(void* arg) { - SAsyncBulkWriteDispatcher* dispatcher = arg; +static void* timeoutManagerCallback(void* arg) { + SDispatcherTimeoutManager* manager = arg; setThreadName("tscAsyncBackground"); - while (!atomic_load_8(&dispatcher->shutdown)) { + while (!isShutdownSDispatcherTimeoutManager(manager)) { struct timespec timeout; clock_gettime(CLOCK_REALTIME, &timeout); - afterMillis(&timeout, dispatcher->timeoutMs); + afterMillis(&timeout, manager->timeoutMs); + + size_t nPolls = 0; + SSqlObj** polls = dispatcherLockPollAll(manager->dispatcher, &nPolls); - SArray* statements = dispatcherLockPollAll(dispatcher); - dispatcherExecute(statements); - taosArrayDestroy(&statements); + if (polls) { + dispatcherExecute(polls, nPolls); + free(polls); + } // Similar to scheduleAtFixedRate in Java, if the execution time exceed // `timeoutMs` milliseconds, then there will be no sleep. - dispatcherSleepUntil(dispatcher, &timeout); + timeoutManagerSleepUntil(manager, &timeout); } return NULL; } -SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs) { - SAsyncBulkWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBulkWriteDispatcher)); +SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(int32_t batchSize, int32_t timeoutMs) { + SAsyncBatchWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBatchWriteDispatcher) + batchSize * sizeof(SSqlObj*)); if (!dispatcher) { return NULL; } dispatcher->currentSize = 0; + dispatcher->bufferSize = 0; dispatcher->batchSize = batchSize; - dispatcher->timeoutMs = timeoutMs; - atomic_store_8(&dispatcher->shutdown, false); - - // init the buffer. - dispatcher->buffer = taosArrayInit(batchSize, sizeof(SSqlObj*)); - if (!dispatcher->buffer) { - tfree(dispatcher); - return NULL; - } - + // init the mutex and the cond. pthread_mutex_init(&dispatcher->bufferMutex, NULL); - pthread_mutex_init(&dispatcher->sleepMutex, NULL); - pthread_cond_init(&dispatcher->timeout, NULL); pthread_cond_init(&dispatcher->notFull, NULL); - // init background thread. - if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) { + // init timeout manager. + dispatcher->timeoutManager = createSDispatcherTimeoutManager(dispatcher, timeoutMs); + if (!dispatcher->timeoutManager) { pthread_mutex_destroy(&dispatcher->bufferMutex); - pthread_mutex_destroy(&dispatcher->sleepMutex); - pthread_cond_destroy(&dispatcher->timeout); pthread_cond_destroy(&dispatcher->notFull); - taosArrayDestroy(&dispatcher->buffer); - tfree(dispatcher); + free(dispatcher); return NULL; } @@ -359,18 +360,14 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int * * @param dispatcher the dispatcher. */ -inline static void dispatcherShutdown(SAsyncBulkWriteDispatcher* dispatcher) { - // mark shutdown, signal shutdown to timeout thread. - pthread_mutex_lock(&dispatcher->sleepMutex); +inline static void dispatcherShutdown(SAsyncBatchWriteDispatcher* dispatcher) { atomic_store_8(&dispatcher->shutdown, true); - pthread_cond_broadcast(&dispatcher->timeout); - pthread_mutex_unlock(&dispatcher->sleepMutex); - - // make sure the timeout thread exit. - pthread_join(dispatcher->background, NULL); + if (dispatcher->timeoutManager) { + shutdownSDispatcherTimeoutManager(dispatcher->timeoutManager); + } } -void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { +void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher) { if (dispatcher == NULL) { return; } @@ -379,28 +376,25 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { // poll and send all the statements in the buffer. while (true) { - SArray* statements = dispatcherLockPollAll(dispatcher); - if (!statements) { + size_t nPolls = 0; + SSqlObj** polls = dispatcherLockPollAll(dispatcher, &nPolls); + if (!polls) { break ; } - - dispatcherExecute(statements); - taosArrayDestroy(&statements); + dispatcherExecute(polls, nPolls); + free(polls); } - - // destroy the buffer. - taosArrayDestroy(&dispatcher->buffer); - + // destroy the timeout manager. + destroySDispatcherTimeoutManager(dispatcher->timeoutManager); + // destroy the mutex. pthread_mutex_destroy(&dispatcher->bufferMutex); - pthread_mutex_destroy(&dispatcher->sleepMutex); - pthread_cond_destroy(&dispatcher->timeout); pthread_cond_destroy(&dispatcher->notFull); - + free(dispatcher); } -bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { +bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) { if (pSql == NULL || !pSql->enableBatch) { return false; } @@ -438,13 +432,13 @@ bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSq return true; } -bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { +bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) { if (atomic_load_8(&dispatcher->shutdown)) { return false; } // the sql object doesn't support bulk insertion. - if (!tscSupportBulkInsertion(dispatcher, pSql)) { + if (!dispatcherCanDispatch(dispatcher, pSql)) { return false; } @@ -453,20 +447,20 @@ bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) } /** - * Destroy the SAsyncBulkWriteDispatcher create by SDispatcherHolder. - * @param arg the thread local SAsyncBulkWriteDispatcher. + * Destroy the SAsyncBatchWriteDispatcher create by SDispatcherManager. + * @param arg the thread local SAsyncBatchWriteDispatcher. */ static void destroyDispatcher(void* arg) { - SAsyncBulkWriteDispatcher* dispatcher = arg; + SAsyncBatchWriteDispatcher* dispatcher = arg; if (!dispatcher) { return; } - destroyAsyncDispatcher(dispatcher); + destroySAsyncBatchWriteDispatcher(dispatcher); } -SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) { - SDispatcherHolder* dispatcher = calloc(1, sizeof(SDispatcherHolder)); +SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) { + SDispatcherManager* dispatcher = calloc(1, sizeof(SDispatcherManager)); if (!dispatcher) { return NULL; } @@ -481,7 +475,7 @@ SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs, return NULL; } } else { - dispatcher->global = createAsyncBulkWriteDispatcher(batchSize, timeoutMs); + dispatcher->global = createSAsyncBatchWriteDispatcher(batchSize, timeoutMs); if (!dispatcher->global) { free(dispatcher); return NULL; @@ -490,32 +484,85 @@ SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs, return dispatcher; } -SAsyncBulkWriteDispatcher* dispatcherAcquire(SDispatcherHolder* holder) { - if (!holder->isThreadLocal) { - return holder->global; +SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager) { + if (!manager->isThreadLocal) { + return manager->global; } - SAsyncBulkWriteDispatcher* value = pthread_getspecific(holder->key); + SAsyncBatchWriteDispatcher* value = pthread_getspecific(manager->key); if (value) { return value; } - value = createAsyncBulkWriteDispatcher(holder->batchSize, holder->timeoutMs); + value = createSAsyncBatchWriteDispatcher(manager->batchSize, manager->timeoutMs); if (value) { - pthread_setspecific(holder->key, value); + pthread_setspecific(manager->key, value); return value; } return NULL; } -void destroyDispatcherHolder(SDispatcherHolder* holder) { - if (holder) { - if (holder->isThreadLocal) { - pthread_key_delete(holder->key); +void destroyDispatcherManager(SDispatcherManager* manager) { + if (manager) { + if (manager->isThreadLocal) { + pthread_key_delete(manager->key); } else { - destroyAsyncDispatcher(holder->global); + destroySAsyncBatchWriteDispatcher(manager->global); } - free(holder); + free(manager); + } +} + +SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs) { + SDispatcherTimeoutManager* manager = calloc(1, sizeof(SDispatcherTimeoutManager)); + if (!manager) { + return NULL; + } + + manager->timeoutMs = timeoutMs; + manager->dispatcher = dispatcher; + atomic_store_8(&manager->shutdown, false); + + pthread_mutex_init(&manager->sleepMutex, NULL); + pthread_cond_init(&manager->timeout, NULL); + + // init background thread. + if (pthread_create(&manager->background, NULL, timeoutManagerCallback, manager)) { + pthread_mutex_destroy(&manager->sleepMutex); + pthread_cond_destroy(&manager->timeout); + free(manager); + return NULL; + } + return manager; +} + +void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { + if (!manager) { + return; + } + + shutdownSDispatcherTimeoutManager(manager); + manager->dispatcher->timeoutManager = NULL; + + pthread_mutex_destroy(&manager->sleepMutex); + pthread_cond_destroy(&manager->timeout); + free(manager); +} + +void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { + // mark shutdown, signal shutdown to timeout thread. + pthread_mutex_lock(&manager->sleepMutex); + atomic_store_8(&manager->shutdown, true); + pthread_cond_broadcast(&manager->timeout); + pthread_mutex_unlock(&manager->sleepMutex); + + // make sure the timeout thread exit. + pthread_join(manager->background, NULL); +} +bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { + if (!manager) { + return true; } + return atomic_load_8(&manager->shutdown); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 411c00bd376e1f2de0878d0b982c0f8ad5354526..90ac59bad86a7749d6d0e16a96bc82f200235b31 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -14,19 +14,19 @@ */ #include "os.h" +#include "qScript.h" #include "taosmsg.h" +#include "tconfig.h" +#include "tglobal.h" +#include "tnote.h" #include "tref.h" #include "trpc.h" -#include "tnote.h" -#include "ttimer.h" -#include "tsched.h" +#include "tscBatchWrite.h" #include "tscLog.h" +#include "tsched.h" #include "tsclient.h" -#include "tglobal.h" -#include "tconfig.h" +#include "ttimer.h" #include "ttimezone.h" -#include "qScript.h" -#include "tscBulkWrite.h" // global, not configurable #define TSC_VAR_NOT_RELEASE 1 @@ -50,7 +50,7 @@ void *tscRpcCache; // cache to keep rpc obj int32_t tscNumOfThreads = 1; // num of rpc threads char tscLogFileName[] = "taoslog"; int tscLogFileNum = 10; -SDispatcherHolder * tscDispatcher = NULL; +SDispatcherManager *tscDispatcherManager = NULL; static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently static pthread_once_t tscinit = PTHREAD_ONCE_INIT; @@ -59,16 +59,16 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER; // pthread_once can not return result code, so result code is set to a global variable. static volatile int tscInitRes = 0; -void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) { +void tscInitDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) { if (tsAsyncBatchEnable) { - tscDispatcher = createDispatcherHolder(batchSize, timeoutMs, isThreadLocal); + tscDispatcherManager = createDispatcherManager(batchSize, timeoutMs, isThreadLocal); } } -void tscDestroyAsyncDispatcher() { - if (tscDispatcher) { - destroyDispatcherHolder(tscDispatcher); - tscDispatcher = NULL; +void tscDestroyDispatcherManager() { + if (tscDispatcherManager) { + destroyDispatcherManager(tscDispatcherManager); + tscDispatcherManager = NULL; } } @@ -230,8 +230,8 @@ void taos_init_imp(void) { #endif tscDebug("starting to initialize client ..."); tscDebug("Local End Point is:%s", tsLocalEp); - - tscInitAsyncDispatcher(tsAsyncBatchSize, tsAsyncBatchTimeout, tsAsyncBatchThreadLocal); + + tscInitDispatcherManager(tsAsyncBatchSize, tsAsyncBatchTimeout, tsAsyncBatchThreadLocal); } taosSetCoreDump(); @@ -297,7 +297,7 @@ void taos_cleanup(void) { scriptEnvPoolCleanup(); #endif if (tsAsyncBatchEnable) { - tscDestroyAsyncDispatcher(); + tscDestroyDispatcherManager(); } } diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 1eceb4ae0ed69d2fff479ecbc40f81a37b75a855..67637f584675297e3b342c5c9a06f7e42b4f869c 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -1885,7 +1885,7 @@ static void doInitGlobalConfig(void) { cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; cfg.minValue = 1; - cfg.maxValue = 65535; + cfg.maxValue = 4096; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg);