提交 b0e06019 编写于 作者: Z zhihaop

feat, doc: improve the performance of sqlobjs merge, update docs of ABWD

上级 58392c19
......@@ -126,15 +126,14 @@ bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
/**
* Merge the statements into single SSqlObj.
* Merge SSqlObjs into single SSqlObj.
*
* @param fp the callback of SSqlObj.
* @param param the parameters of the callback.
* @param polls the array of SSqlObj*.
* @param nPolls the number of SSqlObj* in the array.
* @param batch the merged SSqlObj*.
* @return the merged SSqlObj.
*/
int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result);
int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch);
/**
* Merge the sql statements and execute the merged sql statement.
......@@ -147,11 +146,10 @@ void dispatcherExecute(SSqlObj** polls, size_t nPolls);
/**
* Create the async batch write dispatcher.
*
* @param batchSize When user submit an insert statement to `taos_query_ra`, the statement will be buffered
* asynchronously in the buffer instead of executing it. If the number of the buffered
* statements reach batchLen, all the statements in the buffer will be merged and sent to vnodes.
* @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time
* vnodes received the statements depends on the network quality.
* @param batchSize When user submit an insert sql to `taos_query_a`, the SSqlObj* will be buffered instead of executing
* it. If the number of the buffered rows reach `batchSize`, all the SSqlObj* will be merged and sent to vnodes.
* @param timeout The SSqlObj* will be sent to vnodes no more than `timeout` milliseconds. But the actual time
* vnodes received the SSqlObj* depends on the network quality.
*/
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(int32_t batchSize, int32_t timeoutMs);
......
......@@ -144,6 +144,8 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
void destroySTableDataBlocksList(SArray* pDataBlocks);
void destroySTableDataBlocks(STableDataBlocks* pDataBlocks);
void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList);
void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
......
......@@ -171,13 +171,14 @@ size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, si
taosArraySort(builder->rows, compareSMemRow);
// deep copy all the SMemRow to target.
for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) {
size_t nMemRows = taosArrayGetSize(builder->rows);
for (int i = 0; i < nMemRows; ++i) {
char* pRow = taosArrayGetP(builder->rows, i);
memcpy(POINTER_SHIFT(target->data, dataLen), pRow, memRowTLen(pRow));
dataLen += memRowTLen(pRow);
}
*nRows = taosArrayGetSize(builder->rows);
*nRows = nMemRows;
target->schemaLen = 0;
target->dataLen = (int32_t) htonl(dataLen);
......@@ -188,7 +189,8 @@ size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, si
size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder) {
size_t dataLen = 0;
for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) {
size_t nRows = taosArrayGetSize(builder->rows);
for (int i = 0; i < nRows; ++i) {
char* pRow = taosArrayGetP(builder->rows, i);
dataLen += memRowTLen(pRow);
}
......@@ -485,8 +487,9 @@ int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj* result) {
assert(!pInsertParam->schemaAttached);
// append each vnode data block to the builder.
for (size_t j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) {
STableDataBlocks * tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j);
size_t nBlocks = taosArrayGetSize(pInsertParam->pDataBlocks);
for (size_t j = 0; j < nBlocks; ++j) {
STableDataBlocks* tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j);
if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
......
......@@ -104,7 +104,7 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
free(context);
}
int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result) {
int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch) {
if (!polls || !nPolls) {
return TSDB_CODE_SUCCESS;
}
......@@ -124,7 +124,7 @@ int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result) {
context->handler[i].fp = pSql->fp;
context->handler[i].param = pSql->param;
}
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", nPolls);
SSqlObj *pFirst = polls[0];
......@@ -140,7 +140,7 @@ int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result) {
pFirst->fp = batchResultCallback;
pFirst->param = context;
pFirst->fetchFp = pFirst->fp;
*result = pFirst;
*batch = pFirst;
for (int i = 1; i < nPolls; ++i) {
SSqlObj *pSql = polls[i];
......@@ -249,7 +249,7 @@ void dispatcherExecute(SSqlObj** polls, size_t nPolls) {
// merge the statements into single one.
SSqlObj* merged = NULL;
code = dispatcherBatchMerge(polls, nPolls, &merged);
code = dispatcherBatchBuilder(polls, nPolls, &merged);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -560,6 +560,7 @@ void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
// make sure the timeout thread exit.
pthread_join(manager->background, NULL);
}
bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
if (!manager) {
return true;
......
......@@ -1837,32 +1837,36 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->colIdxInfo);
}
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) {
void destroySTableDataBlocks(STableDataBlocks* pDataBlocks) {
if (!pDataBlocks) {
return;
}
tfree(pDataBlocks->pData);
if (!pDataBlocks->cloned) {
tfree(pDataBlocks->params);
// free the refcount for metermeta
if (pDataBlocks->pTableMeta != NULL) {
tfree(pDataBlocks->pTableMeta);
}
tfree(pDataBlock->pData);
tscDestroyBoundColumnInfo(&pDataBlocks->boundColumnInfo);
}
tfree(pDataBlocks);
}
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) {
return;
}
if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name);
taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
if (!pDataBlock->cloned) {
tfree(pDataBlock->params);
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
tfree(pDataBlock);
destroySTableDataBlocks(pDataBlock);
}
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
......@@ -1889,18 +1893,22 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
return param;
}
void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) {
if (pDataBlockList == NULL) {
return NULL;
void destroySTableDataBlocksList(SArray* pDataBlocks) {
if (!pDataBlocks) {
return;
}
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i);
tscDestroyDataBlock(pSql, d, false);
size_t nBlocks = taosArrayGetSize(pDataBlocks);
for (size_t i = 0; i < nBlocks; ++i) {
STableDataBlocks * pDataBlock = taosArrayGetP(pDataBlocks, i);
if (pDataBlock) {
destroySTableDataBlocks(pDataBlock);
}
}
taosArrayDestroy(&pDataBlocks);
}
taosArrayDestroy(&pDataBlockList);
void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) {
destroySTableDataBlocksList(pDataBlockList);
return NULL;
}
......
......@@ -127,15 +127,11 @@ int8_t tsSortWhenGroupBy = 1;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// The async insertion batching feature.
// When user submit an insert statement to `taos_query_ra`, the statement will be buffered asynchronously instead of executing it.
// If the number of the buffered statements reach `tsAsyncBatchSize`, all the statements in the queue will be merged and sent to vnodes.
// The statements will be sent to vnodes no more than `tsAsyncBatchTimeout` milliseconds. But the actual time vnodes
// received the statements depends on the network quality.
// The taosc async insertion batching feature.
bool tsAsyncBatchEnable = true;
bool tsAsyncBatchThreadLocal = true; // if thread local enable, each thread will allocate a dispatcher.
int32_t tsAsyncBatchSize = 96;
int32_t tsAsyncBatchTimeout = 10;
int32_t tsAsyncBatchSize = 96; // suggest: 64 - 512
int32_t tsAsyncBatchTimeout = 10; // suggest: 5 - 200 (unit: milliseconds)
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册