diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h index f83b9d33c429ecefa630b7db2c58f148f9a387b5..33a6a356b16cd4d778f3c734d40016a566b95c47 100644 --- a/src/client/inc/tscBulkWrite.h +++ b/src/client/inc/tscBulkWrite.h @@ -23,6 +23,9 @@ extern "C" { #include "tarray.h" #include "tthread.h" +// forward declaration. +typedef struct SSqlObj SSqlObj; + /** * SAsyncBulkWriteDispatcher is an async batching dispatcher(for writing), it can buffer insertion statements, batch * and merge them into single statement. @@ -51,15 +54,10 @@ typedef struct SAsyncBulkWriteDispatcher { // the number of insertion rows in the buffer. int32_t currentSize; - - // the number of item in the buffer. - volatile int32_t bufferSize; - - // while executing timeout task, the buffer will set exclusive for writing. - volatile bool exclusive; // whether the dispatcher is shutdown. volatile bool shutdown; + } SAsyncBulkWriteDispatcher; // forward declaration. diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index b723891500d4904702e539eb60651a3e1b5a813d..11b5c3902989b2d56fc72a0e1d4bb19e890a2cc2 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -151,7 +151,7 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { * @return the items in the dispatcher, SArray. */ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { - if (!atomic_load_32(&dispatcher->bufferSize)) { + if (!taosArrayGetSize(dispatcher->buffer)) { return NULL; } @@ -162,7 +162,6 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { } dispatcher->currentSize = 0; - atomic_store_32(&dispatcher->bufferSize, 0); taosArrayClear(dispatcher->buffer); return statements; } @@ -207,10 +206,8 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq } taosArrayPush(dispatcher->buffer, pSql); - tscDebug("sql obj %p has been write to insert buffer", pSql); - - atomic_fetch_add_32(&dispatcher->bufferSize, 1); dispatcher->currentSize += statementGetInsertionRows(pSql); + tscDebug("sql obj %p has been write to insert buffer", pSql); // the dispatcher has been shutdown or reach batch size. if (atomic_load_8(&dispatcher->shutdown) || dispatcher->currentSize >= dispatcher->batchSize) { @@ -315,10 +312,8 @@ static void* dispatcherTimeoutCallback(void* arg) { struct timespec current; clock_gettime(CLOCK_REALTIME, ¤t); struct timespec timeout = afterMillis(current, dispatcher->timeoutMs); - - atomic_store_8(&dispatcher->exclusive, true); + SArray* statements = dispatcherLockPollAll(dispatcher); - atomic_store_8(&dispatcher->exclusive, false); dispatcherExecute(statements); taosArrayDestroy(&statements); @@ -338,10 +333,8 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int dispatcher->currentSize = 0; dispatcher->batchSize = batchSize; dispatcher->timeoutMs = timeoutMs; - - atomic_store_32(&dispatcher->bufferSize, 0); + atomic_store_8(&dispatcher->shutdown, false); - atomic_store_8(&dispatcher->exclusive, false); // init the buffer. dispatcher->buffer = taosArrayInit(batchSize, sizeof(SSqlObj*)); @@ -392,8 +385,12 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { dispatcherShutdown(dispatcher); // poll and send all the statements in the buffer. - while (atomic_load_32(&dispatcher->bufferSize)) { + while (true) { SArray* statements = dispatcherLockPollAll(dispatcher); + if (!statements) { + break ; + } + dispatcherExecute(statements); taosArrayDestroy(&statements); } @@ -452,11 +449,6 @@ bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) return false; } - // the buffer is exclusive. - if (atomic_load_8(&dispatcher->exclusive)) { - return false; - } - // try to offer pSql to the buffer. return dispatcherTryOffer(dispatcher, pSql); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 19088d161e3fef1763c3c6363f8cac5158800648..a923757af8bb3d0a12c984f17c8e4fb2f1146e2b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2224,13 +2224,13 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa */ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { // statement array is empty. - if (statements == NULL || taosArrayGetSize(statements) == 0) { + if (!statements || !taosArrayGetSize(statements)) { return TSDB_CODE_TSC_INVALID_OPERATION; } // a.k.a SHashObj, the key value represents vgroup id. SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (pVnodeDataBlockHashList == NULL) { + if (!pVnodeDataBlockHashList) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -2307,7 +2307,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { // copy the data into vgroup data blocks. memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize); dataBuf->size += tableBlock->size - tableBlock->headerSize; - dataBuf->numOfTables += 1; + dataBuf->numOfTables += tableBlock->numOfTables; tscDestroyDataBlock(pSql, tableBlock, false); }