From a40139855584ff308377187d5cedeac7fac4fcaa Mon Sep 17 00:00:00 2001 From: zhihaop Date: Sat, 17 Sep 2022 19:43:42 +0800 Subject: [PATCH] feat: remove unnecessary atomic variable usage --- src/client/inc/tscBulkWrite.h | 10 +++++----- src/client/src/tscBulkWrite.c | 19 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h index f7d7f2bf54..92fcc19f22 100644 --- a/src/client/inc/tscBulkWrite.h +++ b/src/client/inc/tscBulkWrite.h @@ -48,13 +48,13 @@ typedef struct SAsyncBulkWriteDispatcher { // the batching timeout in milliseconds. int32_t timeoutMs; - + + // the number of insertion rows in the buffer. + int32_t currentSize; + // the number of item in the buffer. volatile int32_t bufferSize; - - // the number of insertion rows in the buffer. - volatile int32_t currentSize; - + // while executing timeout task, the buffer will set exclusive for writing. volatile bool exclusive; diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 8b0f92883f..33bb37f0f3 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -161,8 +161,8 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { return NULL; } + dispatcher->currentSize = 0; atomic_store_32(&dispatcher->bufferSize, 0); - atomic_store_32(&dispatcher->currentSize, 0); taosArrayClear(dispatcher->buffer); return statements; } @@ -195,14 +195,14 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq // usually not happen, unless taos_query_a(...) after taos_close(...). if (atomic_load_8(&dispatcher->shutdown)) { pthread_mutex_unlock(&dispatcher->mutex); - return -1; + return false; } // the buffer is full. - while (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) { + while (dispatcher->currentSize >= dispatcher->batchSize) { if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->mutex)) { pthread_mutex_unlock(&dispatcher->mutex); - return -1; + return false; } } @@ -210,18 +210,17 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq tscDebug("sql obj %p has been write to insert buffer", pSql); atomic_fetch_add_32(&dispatcher->bufferSize, 1); - int32_t numOfRows = statementGetInsertionRows(pSql); - int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows); + dispatcher->currentSize += statementGetInsertionRows(pSql); // the dispatcher has been shutdown or reach batch size. - if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) { + if (atomic_load_8(&dispatcher->shutdown) || dispatcher->currentSize >= dispatcher->batchSize) { SArray* statements = dispatcherPollAll(dispatcher); dispatcherExecute(statements); taosArrayDestroy(&statements); pthread_cond_broadcast(&dispatcher->notFull); } pthread_mutex_unlock(&dispatcher->mutex); - return currentSize; + return true; } void dispatcherExecute(SArray* statements) { @@ -338,12 +337,12 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int if (!dispatcher) { return NULL; } - + + dispatcher->currentSize = 0; dispatcher->batchSize = batchSize; dispatcher->timeoutMs = timeoutMs; atomic_store_32(&dispatcher->bufferSize, 0); - atomic_store_32(&dispatcher->currentSize, 0); atomic_store_8(&dispatcher->shutdown, false); atomic_store_8(&dispatcher->exclusive, false); -- GitLab