diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 9994d85aaf9f9ca2c5755ae9def154ce19ebf49d..8b0f92883f7f484ec705169226ec56d771743edd 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -144,32 +144,40 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { } /** - * Poll all the SSqlObj* in the dispatcher's buffer. + * Poll all the SSqlObj* in the dispatcher's buffer (No Lock). After call this function, + * you need to notify dispatcher->notFull by yourself. * * @param dispatcher the dispatcher. * @return the items in the dispatcher, SArray. */ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { - pthread_mutex_lock(&dispatcher->mutex); if (!atomic_load_32(&dispatcher->bufferSize)) { - pthread_cond_broadcast(&dispatcher->notFull); - pthread_mutex_unlock(&dispatcher->mutex); return NULL; } SArray* statements = taosArrayDup(dispatcher->buffer); if (statements == NULL) { - pthread_mutex_unlock(&dispatcher->mutex); tscError("failed to poll all items: out of memory"); return NULL; } - + atomic_store_32(&dispatcher->bufferSize, 0); atomic_store_32(&dispatcher->currentSize, 0); taosArrayClear(dispatcher->buffer); + return statements; +} + +/** + * Poll all the SSqlObj* in the dispatcher's buffer. + * + * @param dispatcher the dispatcher. + * @return the items in the dispatcher, SArray. + */ +inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatcher) { + pthread_mutex_lock(&dispatcher->mutex); + SArray* statements = dispatcherPollAll(dispatcher); pthread_cond_broadcast(&dispatcher->notFull); pthread_mutex_unlock(&dispatcher->mutex); - return statements; } @@ -178,9 +186,9 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { * * @param dispatcher the async bulk write dispatcher. * @param pSql the sql object to offer. - * @return if offer success, return the current size of the buffer. otherwise returns -1. + * @return return whether offer success. */ -inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { +inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { pthread_mutex_lock(&dispatcher->mutex); // if dispatcher is shutdown, must fail back to normal insertion. @@ -204,6 +212,14 @@ inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, atomic_fetch_add_32(&dispatcher->bufferSize, 1); int32_t numOfRows = statementGetInsertionRows(pSql); int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows); + + // the dispatcher has been shutdown or reach batch size. + if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) { + SArray* statements = dispatcherPollAll(dispatcher); + dispatcherExecute(statements); + taosArrayDestroy(&statements); + pthread_cond_broadcast(&dispatcher->notFull); + } pthread_mutex_unlock(&dispatcher->mutex); return currentSize; } @@ -305,7 +321,7 @@ static void* dispatcherTimeoutCallback(void* arg) { struct timespec timeout = afterMillis(current, dispatcher->timeoutMs); atomic_store_8(&dispatcher->exclusive, true); - SArray* statements = dispatcherPollAll(dispatcher); + SArray* statements = dispatcherLockPollAll(dispatcher); atomic_store_8(&dispatcher->exclusive, false); dispatcherExecute(statements); taosArrayDestroy(&statements); @@ -381,7 +397,7 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { // poll and send all the statements in the buffer. while (atomic_load_32(&dispatcher->bufferSize)) { - SArray* statements = dispatcherPollAll(dispatcher); + SArray* statements = dispatcherLockPollAll(dispatcher); dispatcherExecute(statements); taosArrayDestroy(&statements); } @@ -446,17 +462,7 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) } // try to offer pSql to the buffer. - int32_t currentSize = dispatcherTryOffer(dispatcher, pSql); - if (currentSize < 0) { - return false; - } - - if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) { - SArray* statements = dispatcherPollAll(dispatcher); - dispatcherExecute(statements); - taosArrayDestroy(&statements); - } - return true; + return dispatcherTryOffer(dispatcher, pSql); } /**