提交 a4013985 编写于 作者: Z zhihaop

feat: remove unnecessary atomic variable usage

上级 9d6f980b
...@@ -48,13 +48,13 @@ typedef struct SAsyncBulkWriteDispatcher { ...@@ -48,13 +48,13 @@ typedef struct SAsyncBulkWriteDispatcher {
// the batching timeout in milliseconds. // the batching timeout in milliseconds.
int32_t timeoutMs; int32_t timeoutMs;
// the number of insertion rows in the buffer.
int32_t currentSize;
// the number of item in the buffer. // the number of item in the buffer.
volatile int32_t bufferSize; volatile int32_t bufferSize;
// the number of insertion rows in the buffer.
volatile int32_t currentSize;
// while executing timeout task, the buffer will set exclusive for writing. // while executing timeout task, the buffer will set exclusive for writing.
volatile bool exclusive; volatile bool exclusive;
......
...@@ -161,8 +161,8 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -161,8 +161,8 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
return NULL; return NULL;
} }
dispatcher->currentSize = 0;
atomic_store_32(&dispatcher->bufferSize, 0); atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_32(&dispatcher->currentSize, 0);
taosArrayClear(dispatcher->buffer); taosArrayClear(dispatcher->buffer);
return statements; return statements;
} }
...@@ -195,14 +195,14 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq ...@@ -195,14 +195,14 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq
// usually not happen, unless taos_query_a(...) after taos_close(...). // usually not happen, unless taos_query_a(...) after taos_close(...).
if (atomic_load_8(&dispatcher->shutdown)) { if (atomic_load_8(&dispatcher->shutdown)) {
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->mutex);
return -1; return false;
} }
// the buffer is full. // the buffer is full.
while (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) { while (dispatcher->currentSize >= dispatcher->batchSize) {
if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->mutex)) { if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->mutex)) {
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->mutex);
return -1; return false;
} }
} }
...@@ -210,18 +210,17 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq ...@@ -210,18 +210,17 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq
tscDebug("sql obj %p has been write to insert buffer", pSql); tscDebug("sql obj %p has been write to insert buffer", pSql);
atomic_fetch_add_32(&dispatcher->bufferSize, 1); atomic_fetch_add_32(&dispatcher->bufferSize, 1);
int32_t numOfRows = statementGetInsertionRows(pSql); dispatcher->currentSize += statementGetInsertionRows(pSql);
int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows);
// the dispatcher has been shutdown or reach batch size. // the dispatcher has been shutdown or reach batch size.
if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) { if (atomic_load_8(&dispatcher->shutdown) || dispatcher->currentSize >= dispatcher->batchSize) {
SArray* statements = dispatcherPollAll(dispatcher); SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements); dispatcherExecute(statements);
taosArrayDestroy(&statements); taosArrayDestroy(&statements);
pthread_cond_broadcast(&dispatcher->notFull); pthread_cond_broadcast(&dispatcher->notFull);
} }
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->mutex);
return currentSize; return true;
} }
void dispatcherExecute(SArray* statements) { void dispatcherExecute(SArray* statements) {
...@@ -338,12 +337,12 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int ...@@ -338,12 +337,12 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
if (!dispatcher) { if (!dispatcher) {
return NULL; return NULL;
} }
dispatcher->currentSize = 0;
dispatcher->batchSize = batchSize; dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs; dispatcher->timeoutMs = timeoutMs;
atomic_store_32(&dispatcher->bufferSize, 0); atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_32(&dispatcher->currentSize, 0);
atomic_store_8(&dispatcher->shutdown, false); atomic_store_8(&dispatcher->shutdown, false);
atomic_store_8(&dispatcher->exclusive, false); atomic_store_8(&dispatcher->exclusive, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册