From 010dea240011069635fd5ba1a86d9069e58f8a4a Mon Sep 17 00:00:00 2001 From: zhihaop Date: Thu, 15 Sep 2022 23:43:16 +0800 Subject: [PATCH] feat: using thread local buffer to improve performance at scale --- src/client/inc/tscBulkWrite.h | 51 +++++++++++++++++++++-- src/client/inc/tsclient.h | 4 +- src/client/src/tscAsync.c | 11 +++-- src/client/src/tscBulkWrite.c | 77 ++++++++++++++++++++++++++++------- src/client/src/tscSystem.c | 10 ++--- 5 files changed, 125 insertions(+), 28 deletions(-) diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h index 2278c9b8c5..3ecc9f6478 100644 --- a/src/client/inc/tscBulkWrite.h +++ b/src/client/inc/tscBulkWrite.h @@ -16,23 +16,29 @@ #ifndef TDENGINE_TSCBULKWRITE_H #define TDENGINE_TSCBULKWRITE_H +#include +#include #ifdef __cplusplus extern "C" { #endif -#include "tlist.h" #include "tarray.h" +#include "tlist.h" #include "tthread.h" +/** + * SAsyncBulkWriteDispatcher is an async batching dispatcher(for writing), it can buffer insertion statements, batch + * and merge them into single statement. + */ typedef struct SAsyncBulkWriteDispatcher { - // the mpmc queue to store the insertion statements. equivalent to SList. + // the buffer to store the insertion statements. equivalent to SList. SList* buffer; // the mutex to protect the buffer. pthread_mutex_t mutex; // the background thread to manage batching timeout. - pthread_t* background; + pthread_t background; // the maximum number of insertion rows in a batch. int32_t batchSize; @@ -55,6 +61,7 @@ typedef struct SAsyncBulkWriteDispatcher { // forward declaration. typedef struct SSqlObj SSqlObj; + /** * Merge the statements into single SSqlObj. * @@ -125,6 +132,44 @@ bool tscSupportBulkInsertion(SSqlObj* pSql); */ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql); +/** + * A thread local version of SAsyncBulkWriteDispatcher. + */ +typedef struct SThreadLocalDispatcher { + pthread_key_t key; + + // the maximum number of insertion rows in a batch. + int32_t batchSize; + + // the batching timeout in milliseconds. + int32_t timeoutMs; + +} SThreadLocalDispatcher; + +/** + * Create a thread local SAsyncBulkWriteDispatcher variable. + * + * @param batchSize the batchSize of SAsyncBulkWriteDispatcher. + * @param timeoutMs the timeoutMs of SAsyncBulkWriteDispatcher. + * @return the thread local SAsyncBulkWriteDispatcher. + */ +SThreadLocalDispatcher* createThreadLocalDispatcher(int32_t batchSize, int32_t timeoutMs); + +/** + * Destroy the thread local SAsyncBulkWriteDispatcher variable. + * (will destroy all the instances of SAsyncBulkWriteDispatcher in the thread local variable) + * + * @param dispatcher the thread local SAsyncBulkWriteDispatcher variable. + */ +void destroyThreadLocalDispatcher(SThreadLocalDispatcher* dispatcher); + +/** + * Get the thread local instance of SAsyncBulkWriteDispatcher. + * @param dispatcher the thread local SAsyncBulkWriteDispatcher variable. + * @return the thread local SAsyncBulkWriteDispatcher. + */ +SAsyncBulkWriteDispatcher* dispatcherThreadLocal(SThreadLocalDispatcher* dispatcher); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9062d63684..62d399d4d9 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -546,8 +546,8 @@ extern SHashObj *tscTableMetaMap; extern SCacheObj *tscVgroupListBuf; // forward declaration. -typedef struct SAsyncBulkWriteDispatcher SAsyncBulkWriteDispatcher; -extern SAsyncBulkWriteDispatcher* tscDispatcher; +typedef struct SThreadLocalDispatcher SThreadLocalDispatcher; +extern SThreadLocalDispatcher *tscDispatcher; extern int tscObjRef; extern void *tscTmr; extern void *tscQhandle; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index f2630165f2..a739bb68a8 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -397,10 +397,13 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para return; } - if (tscDispatcher != NULL && dispatcherTryBatching(tscDispatcher, pSql)) { - taosReleaseRef(tscObjRef, pSql->self); - tscDebug("sql obj %p has been buffer in insert buffer", pSql); - return; + if (tscDispatcher != NULL) { + SAsyncBulkWriteDispatcher* dispatcher = dispatcherThreadLocal(tscDispatcher); + if (dispatcherTryBatching(dispatcher, pSql)) { + taosReleaseRef(tscObjRef, pSql->self); + tscDebug("sql obj %p has been buffer in insert buffer", pSql); + return; + } } SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 982214d4d7..2683a90a7b 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -149,10 +149,9 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { if (!atomic_load_32(&dispatcher->bufferSize)) { return NULL; } - + pthread_mutex_lock(&dispatcher->mutex); - - SArray* statements = taosArrayInit(atomic_load_32(&dispatcher->bufferSize), sizeof(SSqlObj*)); + SArray* statements = taosArrayInit(0, sizeof(SSqlObj*)); if (statements == NULL) { pthread_mutex_unlock(&dispatcher->mutex); tscError("failed to poll all items: out of memory"); @@ -160,7 +159,7 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { } // get all the sql statements from the buffer. - while (atomic_load_32(&dispatcher->bufferSize)) { + while (true) { SListNode* node = tdListPopHead(dispatcher->buffer); if (!node) { break; @@ -172,10 +171,9 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { listNodeFree(node); atomic_fetch_sub_32(&dispatcher->bufferSize, 1); atomic_fetch_sub_32(&dispatcher->currentSize, statementGetInsertionRows(item)); - taosArrayPush(statements, &item); } - + pthread_mutex_unlock(&dispatcher->mutex); return statements; } @@ -285,8 +283,7 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int pthread_mutex_init(&dispatcher->mutex, NULL); // init background thread. - dispatcher->background = taosCreateThread(dispatcherTimeoutCallback, dispatcher); - if (!dispatcher->background) { + if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) { tdListFree(dispatcher->buffer); tfree(dispatcher); return NULL; @@ -299,18 +296,19 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { if (dispatcher == NULL) { return; } - + + // mark shutdown. atomic_store_8(&dispatcher->shutdown, true); - + + // make sure the timeout thread exit. + pthread_join(dispatcher->background, NULL); + // poll and send all the statements in the buffer. while (atomic_load_32(&dispatcher->bufferSize)) { SArray* statements = dispatcherPollAll(dispatcher); dispatcherExecute(statements); } - - // make sure the thread exit. - taosDestroyThread(dispatcher->background); - + // destroy the buffer. tdListFree(dispatcher->buffer); @@ -375,3 +373,54 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) } return true; } + +/** + * Destroy the SAsyncBulkWriteDispatcher create by SThreadLocalDispatcher. + * @param arg + */ +static void destroyDispatcher(void* arg) { + SAsyncBulkWriteDispatcher* dispatcher = arg; + if (!dispatcher) { + return; + } + + destroyAsyncDispatcher(dispatcher); +} + +SThreadLocalDispatcher* createThreadLocalDispatcher(int32_t batchSize, int32_t timeoutMs) { + SThreadLocalDispatcher* dispatcher = calloc(1, sizeof(SThreadLocalDispatcher)); + if (!dispatcher) { + return NULL; + } + + dispatcher->batchSize = batchSize; + dispatcher->timeoutMs = timeoutMs; + + if (pthread_key_create(&dispatcher->key, destroyDispatcher)) { + free(dispatcher); + return NULL; + } + return dispatcher; +} + +SAsyncBulkWriteDispatcher* dispatcherThreadLocal(SThreadLocalDispatcher* dispatcher) { + SAsyncBulkWriteDispatcher* value = pthread_getspecific(dispatcher->key); + if (value) { + return value; + } + + value = createAsyncBulkWriteDispatcher(dispatcher->batchSize, dispatcher->timeoutMs); + if (value) { + pthread_setspecific(dispatcher->key, value); + return value; + } + + return NULL; +} + +void destroyThreadLocalDispatcher(SThreadLocalDispatcher* dispatcher) { + if (dispatcher) { + pthread_key_delete(dispatcher->key); + free(dispatcher); + } +} diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index de467896d6..d8a2436cee 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -50,7 +50,7 @@ void *tscRpcCache; // cache to keep rpc obj int32_t tscNumOfThreads = 1; // num of rpc threads char tscLogFileName[] = "taoslog"; int tscLogFileNum = 10; -SAsyncBulkWriteDispatcher* tscDispatcher = NULL; +SThreadLocalDispatcher * tscDispatcher = NULL; static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently static pthread_once_t tscinit = PTHREAD_ONCE_INIT; @@ -60,20 +60,20 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER; static volatile int tscInitRes = 0; /** - * Init the taosc async bulk write dispatcher. + * Init the thread local async bulk write dispatcher. * * @param batchSize the batchSize of async bulk write dispatcher. * @param timeoutMs the timeout of batching in milliseconds. */ void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) { - tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs); + tscDispatcher = createThreadLocalDispatcher(batchSize, timeoutMs); } /** - * Destroy the taosc async bulk write dispatcher. + * Destroy the thread local async bulk write dispatcher. */ void tscDestroyAsyncDispatcher() { - destroyAsyncDispatcher(tscDispatcher); + destroyThreadLocalDispatcher(tscDispatcher); tscDispatcher = NULL; } -- GitLab