提交 6b7ea1d2 编写于 作者: Z zhihaop

fix: pDataBlock->numOfTables is not correct

上级 4a69ca01
...@@ -23,6 +23,9 @@ extern "C" { ...@@ -23,6 +23,9 @@ extern "C" {
#include "tarray.h" #include "tarray.h"
#include "tthread.h" #include "tthread.h"
// forward declaration.
typedef struct SSqlObj SSqlObj;
/** /**
* SAsyncBulkWriteDispatcher is an async batching dispatcher(for writing), it can buffer insertion statements, batch * SAsyncBulkWriteDispatcher is an async batching dispatcher(for writing), it can buffer insertion statements, batch
* and merge them into single statement. * and merge them into single statement.
...@@ -52,14 +55,9 @@ typedef struct SAsyncBulkWriteDispatcher { ...@@ -52,14 +55,9 @@ typedef struct SAsyncBulkWriteDispatcher {
// the number of insertion rows in the buffer. // the number of insertion rows in the buffer.
int32_t currentSize; int32_t currentSize;
// the number of item in the buffer.
volatile int32_t bufferSize;
// while executing timeout task, the buffer will set exclusive for writing.
volatile bool exclusive;
// whether the dispatcher is shutdown. // whether the dispatcher is shutdown.
volatile bool shutdown; volatile bool shutdown;
} SAsyncBulkWriteDispatcher; } SAsyncBulkWriteDispatcher;
// forward declaration. // forward declaration.
......
...@@ -151,7 +151,7 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { ...@@ -151,7 +151,7 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
* @return the items in the dispatcher, SArray<SSqlObj*>. * @return the items in the dispatcher, SArray<SSqlObj*>.
*/ */
inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
if (!atomic_load_32(&dispatcher->bufferSize)) { if (!taosArrayGetSize(dispatcher->buffer)) {
return NULL; return NULL;
} }
...@@ -162,7 +162,6 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -162,7 +162,6 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
} }
dispatcher->currentSize = 0; dispatcher->currentSize = 0;
atomic_store_32(&dispatcher->bufferSize, 0);
taosArrayClear(dispatcher->buffer); taosArrayClear(dispatcher->buffer);
return statements; return statements;
} }
...@@ -207,10 +206,8 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq ...@@ -207,10 +206,8 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq
} }
taosArrayPush(dispatcher->buffer, pSql); taosArrayPush(dispatcher->buffer, pSql);
tscDebug("sql obj %p has been write to insert buffer", pSql);
atomic_fetch_add_32(&dispatcher->bufferSize, 1);
dispatcher->currentSize += statementGetInsertionRows(pSql); dispatcher->currentSize += statementGetInsertionRows(pSql);
tscDebug("sql obj %p has been write to insert buffer", pSql);
// the dispatcher has been shutdown or reach batch size. // the dispatcher has been shutdown or reach batch size.
if (atomic_load_8(&dispatcher->shutdown) || dispatcher->currentSize >= dispatcher->batchSize) { if (atomic_load_8(&dispatcher->shutdown) || dispatcher->currentSize >= dispatcher->batchSize) {
...@@ -316,9 +313,7 @@ static void* dispatcherTimeoutCallback(void* arg) { ...@@ -316,9 +313,7 @@ static void* dispatcherTimeoutCallback(void* arg) {
clock_gettime(CLOCK_REALTIME, &current); clock_gettime(CLOCK_REALTIME, &current);
struct timespec timeout = afterMillis(current, dispatcher->timeoutMs); struct timespec timeout = afterMillis(current, dispatcher->timeoutMs);
atomic_store_8(&dispatcher->exclusive, true);
SArray* statements = dispatcherLockPollAll(dispatcher); SArray* statements = dispatcherLockPollAll(dispatcher);
atomic_store_8(&dispatcher->exclusive, false);
dispatcherExecute(statements); dispatcherExecute(statements);
taosArrayDestroy(&statements); taosArrayDestroy(&statements);
...@@ -339,9 +334,7 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int ...@@ -339,9 +334,7 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
dispatcher->batchSize = batchSize; dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs; dispatcher->timeoutMs = timeoutMs;
atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_8(&dispatcher->shutdown, false); atomic_store_8(&dispatcher->shutdown, false);
atomic_store_8(&dispatcher->exclusive, false);
// init the buffer. // init the buffer.
dispatcher->buffer = taosArrayInit(batchSize, sizeof(SSqlObj*)); dispatcher->buffer = taosArrayInit(batchSize, sizeof(SSqlObj*));
...@@ -392,8 +385,12 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -392,8 +385,12 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
dispatcherShutdown(dispatcher); dispatcherShutdown(dispatcher);
// poll and send all the statements in the buffer. // poll and send all the statements in the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) { while (true) {
SArray* statements = dispatcherLockPollAll(dispatcher); SArray* statements = dispatcherLockPollAll(dispatcher);
if (!statements) {
break ;
}
dispatcherExecute(statements); dispatcherExecute(statements);
taosArrayDestroy(&statements); taosArrayDestroy(&statements);
} }
...@@ -452,11 +449,6 @@ bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) ...@@ -452,11 +449,6 @@ bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql)
return false; return false;
} }
// the buffer is exclusive.
if (atomic_load_8(&dispatcher->exclusive)) {
return false;
}
// try to offer pSql to the buffer. // try to offer pSql to the buffer.
return dispatcherTryOffer(dispatcher, pSql); return dispatcherTryOffer(dispatcher, pSql);
} }
......
...@@ -2224,13 +2224,13 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa ...@@ -2224,13 +2224,13 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa
*/ */
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
// statement array is empty. // statement array is empty.
if (statements == NULL || taosArrayGetSize(statements) == 0) { if (!statements || !taosArrayGetSize(statements)) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
// a.k.a SHashObj<int64_t, STableDataBlocks*>, the key value represents vgroup id. // a.k.a SHashObj<int64_t, STableDataBlocks*>, the key value represents vgroup id.
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pVnodeDataBlockHashList == NULL) { if (!pVnodeDataBlockHashList) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -2307,7 +2307,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { ...@@ -2307,7 +2307,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
// copy the data into vgroup data blocks. // copy the data into vgroup data blocks.
memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize); memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize);
dataBuf->size += tableBlock->size - tableBlock->headerSize; dataBuf->size += tableBlock->size - tableBlock->headerSize;
dataBuf->numOfTables += 1; dataBuf->numOfTables += tableBlock->numOfTables;
tscDestroyDataBlock(pSql, tableBlock, false); tscDestroyDataBlock(pSql, tableBlock, false);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册