From 5fcff75c92b02b14d31e93f5ffdc9baffe992096 Mon Sep 17 00:00:00 2001 From: zhihaop Date: Fri, 16 Sep 2022 20:12:14 +0800 Subject: [PATCH] feat: improve the performance of async bulk write dispatcher --- src/client/inc/tscBulkWrite.h | 32 +++---- src/client/src/tscBulkWrite.c | 157 +++++++++++++++++++++++----------- 2 files changed, 115 insertions(+), 74 deletions(-) diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h index dd7fe9fcdf..f7d7f2bf54 100644 --- a/src/client/inc/tscBulkWrite.h +++ b/src/client/inc/tscBulkWrite.h @@ -31,11 +31,14 @@ typedef struct SAsyncBulkWriteDispatcher { // the buffer to store the insertion statements. equivalent to SArray. SArray* buffer; - // the mutex to protect the buffer. + // the mutex to protect the dispatcher. pthread_mutex_t mutex; // the cond to signal to background thread. - pthread_cond_t cond; + pthread_cond_t timeout; + + // the cond to signal to background thread. + pthread_cond_t notFull; // the background thread to manage batching timeout. pthread_t background; @@ -73,24 +76,7 @@ typedef struct SSqlObj SSqlObj; int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result); /** - * Poll all the SSqlObj* in the dispatcher's buffer. - * - * @param dispatcher the dispatcher. - * @return the items in the dispatcher, SArray. - */ -SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher); - -/** - * @brief Try to offer the SSqlObj* to the dispatcher. - * - * @param dispatcher the async bulk write dispatcher. - * @param pSql the sql object to offer. - * @return if offer success, return the current size of the buffer. otherwise returns -1. - */ -int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql); - -/** - * @brief Merge the sql statements and execute the merged sql statement. + * Merge the sql statements and execute the merged sql statement. * * @param statements the array of sql statement. a.k.a SArray. */ @@ -117,11 +103,12 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher); * 1. auto batch feature on the sql object must be enabled. * 2. must be an `insert into ... value ...` statement. * 3. the payload type must be kv payload. - * + * + * @param dispatcher the async dispatcher. * @param pSql the sql object to check. * @return returns true if the sql object supports auto batch. */ -bool tscSupportBulkInsertion(SSqlObj* pSql); +bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql); /** * Try to offer the SSqlObj* to the buffer. If the number of row reach `asyncBatchSize`, the function @@ -174,6 +161,7 @@ void destroyDispatcherHolder(SDispatcherHolder* holder); /** * Get an instance of SAsyncBulkWriteDispatcher. + * * @param holder the holder of SAsyncBulkWriteDispatcher. * @return the SAsyncBulkWriteDispatcher instance. */ diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index f78aafa96a..9994d85aaf 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -143,12 +143,20 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { return code; } -SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { +/** + * Poll all the SSqlObj* in the dispatcher's buffer. + * + * @param dispatcher the dispatcher. + * @return the items in the dispatcher, SArray. + */ +inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { + pthread_mutex_lock(&dispatcher->mutex); if (!atomic_load_32(&dispatcher->bufferSize)) { + pthread_cond_broadcast(&dispatcher->notFull); + pthread_mutex_unlock(&dispatcher->mutex); return NULL; } - - pthread_mutex_lock(&dispatcher->mutex); + SArray* statements = taosArrayDup(dispatcher->buffer); if (statements == NULL) { pthread_mutex_unlock(&dispatcher->mutex); @@ -159,27 +167,38 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { atomic_store_32(&dispatcher->bufferSize, 0); atomic_store_32(&dispatcher->currentSize, 0); taosArrayClear(dispatcher->buffer); + pthread_cond_broadcast(&dispatcher->notFull); pthread_mutex_unlock(&dispatcher->mutex); return statements; } -int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { - // pre-check: the buffer is full. - if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) { - return -1; - } - +/** + * @brief Try to offer the SSqlObj* to the dispatcher. + * + * @param dispatcher the async bulk write dispatcher. + * @param pSql the sql object to offer. + * @return if offer success, return the current size of the buffer. otherwise returns -1. + */ +inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { pthread_mutex_lock(&dispatcher->mutex); - - // double-check: the buffer is full. - if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) { + + // if dispatcher is shutdown, must fail back to normal insertion. + // usually not happen, unless taos_query_a(...) after taos_close(...). + if (atomic_load_8(&dispatcher->shutdown)) { pthread_mutex_unlock(&dispatcher->mutex); return -1; } + + // the buffer is full. + while (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) { + if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->mutex)) { + pthread_mutex_unlock(&dispatcher->mutex); + return -1; + } + } taosArrayPush(dispatcher->buffer, pSql); - tscDebug("sql obj %p has been write to insert buffer", pSql); atomic_fetch_add_32(&dispatcher->bufferSize, 1); @@ -205,9 +224,7 @@ void dispatcherExecute(SArray* statements) { tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged); tscHandleMultivnodeInsert(merged); - taosArrayDestroy(&statements); return; - _error: tscError("send async batch sql obj failed, reason: %s", tstrerror(code)); @@ -216,7 +233,6 @@ _error: SSqlObj* item = *((SSqlObj**)taosArrayGet(statements, i)); tscReturnsError(item, code); } - taosArrayDestroy(&statements); } /** @@ -245,43 +261,58 @@ static inline int64_t durationMillis(struct timespec s, struct timespec t) { return d; } +/** + * Sleep until `timeout` timespec. When dispatcherShutdown(...) called, the function will return immediately. + * + * @param dispatcher the dispatcher thread to sleep. + * @param timeout the timeout in CLOCK_REALTIME. + */ +inline static void dispatcherSleepUntil(SAsyncBulkWriteDispatcher* dispatcher, struct timespec timeout) { + struct timespec current; + clock_gettime(CLOCK_REALTIME, ¤t); + + // if current > timeout, no sleep required. + if (durationMillis(current, timeout) <= 0) { + return; + } + + if (pthread_mutex_timedlock(&dispatcher->mutex, &timeout)) { + return; + } + + while (true) { + // notified by dispatcherShutdown(...). + if (atomic_load_8(&dispatcher->shutdown)) { + break; + } + if (pthread_cond_timedwait(&dispatcher->timeout, &dispatcher->mutex, &timeout)) { + break; + } + } + pthread_mutex_unlock(&dispatcher->mutex); +} + /** * The thread to manage batching timeout. */ static void* dispatcherTimeoutCallback(void* arg) { SAsyncBulkWriteDispatcher* dispatcher = arg; - setThreadName("tscBackground"); + setThreadName("tscAsyncBackground"); while (!atomic_load_8(&dispatcher->shutdown)) { - struct timespec t1, t2; - clock_gettime(CLOCK_REALTIME, &t1); + struct timespec current; + clock_gettime(CLOCK_REALTIME, ¤t); + struct timespec timeout = afterMillis(current, dispatcher->timeoutMs); atomic_store_8(&dispatcher->exclusive, true); SArray* statements = dispatcherPollAll(dispatcher); atomic_store_8(&dispatcher->exclusive, false); - dispatcherExecute(statements); - clock_gettime(CLOCK_REALTIME, &t2); - + taosArrayDestroy(&statements); + // Similar to scheduleAtFixedRate in Java, if the execution time exceed // `timeoutMs` milliseconds, then there will be no sleep. - struct timespec t3 = afterMillis(t1, dispatcher->timeoutMs); - if (durationMillis(t2, t3) > 0) { - if (pthread_mutex_timedlock(&dispatcher->mutex, &t3)) { - continue; - } - - while (true) { - if (atomic_load_8(&dispatcher->shutdown)) { - break; - } - - if (pthread_cond_timedwait(&dispatcher->cond, &dispatcher->mutex, &t3)) { - break; - } - } - pthread_mutex_unlock(&dispatcher->mutex); - } + dispatcherSleepUntil(dispatcher, timeout); } return NULL; } @@ -309,10 +340,14 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int // init the mutex and the cond. pthread_mutex_init(&dispatcher->mutex, NULL); - pthread_cond_init(&dispatcher->cond, NULL); + pthread_cond_init(&dispatcher->timeout, NULL); + pthread_cond_init(&dispatcher->notFull, NULL); // init background thread. if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) { + pthread_mutex_destroy(&dispatcher->mutex); + pthread_cond_destroy(&dispatcher->timeout); + pthread_cond_destroy(&dispatcher->notFull); taosArrayDestroy(&dispatcher->buffer); tfree(dispatcher); return NULL; @@ -321,24 +356,34 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int return dispatcher; } -void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { - if (dispatcher == NULL) { - return; - } - +/** + * Shutdown the dispatcher and join the timeout thread. + * + * @param dispatcher the dispatcher. + */ +inline static void dispatcherShutdown(SAsyncBulkWriteDispatcher* dispatcher) { // mark shutdown, signal shutdown to timeout thread. pthread_mutex_lock(&dispatcher->mutex); atomic_store_8(&dispatcher->shutdown, true); - pthread_cond_signal(&dispatcher->cond); + pthread_cond_broadcast(&dispatcher->timeout); pthread_mutex_unlock(&dispatcher->mutex); - + // make sure the timeout thread exit. pthread_join(dispatcher->background, NULL); +} + +void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { + if (dispatcher == NULL) { + return; + } + + dispatcherShutdown(dispatcher); // poll and send all the statements in the buffer. while (atomic_load_32(&dispatcher->bufferSize)) { SArray* statements = dispatcherPollAll(dispatcher); dispatcherExecute(statements); + taosArrayDestroy(&statements); } // destroy the buffer. @@ -346,10 +391,13 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { // destroy the mutex. pthread_mutex_destroy(&dispatcher->mutex); + pthread_cond_destroy(&dispatcher->timeout); + pthread_cond_destroy(&dispatcher->notFull); free(dispatcher); } -bool tscSupportBulkInsertion(SSqlObj* pSql) { + +bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { if (pSql == NULL || !pSql->enableBatch) { return false; } @@ -373,6 +421,11 @@ bool tscSupportBulkInsertion(SSqlObj* pSql) { if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) { return false; } + + // too many insertion rows, fail back to normal insertion. + if (statementGetInsertionRows(pSql) >= dispatcher->batchSize) { + return false; + } return true; } @@ -383,7 +436,7 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) } // the sql object doesn't support bulk insertion. - if (!tscSupportBulkInsertion(pSql)) { + if (!tscSupportBulkInsertion(dispatcher, pSql)) { return false; } @@ -397,11 +450,11 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) if (currentSize < 0) { return false; } - - // the buffer is full or reach batch size. - if (currentSize >= dispatcher->batchSize) { + + if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) { SArray* statements = dispatcherPollAll(dispatcher); dispatcherExecute(statements); + taosArrayDestroy(&statements); } return true; } -- GitLab