Commit 1a3154df authored by zhihaop

feat: add taos_query_a auto-batch support, which greatly improves the insertion performance of small SQL objects
Parent 2742bf29
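A minimal usage sketch (not part of this commit) of the workload this change optimizes: an application firing many small, independent INSERT statements through `taos_query_a()`, which are now buffered and merged client side before being sent to the vnodes. The connection parameters, the table `d0` and its schema, and the callback body are illustrative assumptions only.

```c
#include <stdio.h>
#include <taos.h>

/* callback invoked once per submitted statement; with auto batching, several
 * callbacks may share one merged result object. */
static void on_insert_done(void *param, TAOS_RES *res, int code) {
  (void)param;
  if (code != 0) {
    fprintf(stderr, "insert failed: %s\n", taos_errstr(res));
  }
  taos_free_result(res);
}

int main(void) {
  taos_init();
  TAOS *conn = taos_connect("localhost", "root", "taosdata", "test", 0);
  if (conn == NULL) {
    return 1;
  }

  char sql[256];
  for (int i = 0; i < 10000; ++i) {
    /* each statement is tiny; the client now merges them into larger submit
     * requests instead of sending one RPC per statement. */
    snprintf(sql, sizeof(sql), "insert into d0 values (now + %da, %d)", i, i);
    taos_query_a(conn, sql, on_insert_done, NULL);
  }

  /* a real program would wait for all callbacks before closing; omitted here. */
  taos_close(conn);
  taos_cleanup();
  return 0;
}
```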
......@@ -149,6 +149,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result);
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......
......@@ -256,7 +256,7 @@ typedef struct SInsertStatementParam {
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
int32_t numOfRows;
int32_t numOfFiles;
char msg[512]; // error message
......@@ -399,9 +399,10 @@ typedef struct SSqlObj {
struct SSqlObj *prev, *next;
int64_t self;
// connect alive
int64_t lastAlive;
void * pPrevContext;
bool enableBatch;
} SSqlObj;
typedef struct SSqlStream {
......@@ -496,12 +497,28 @@ void tscCloseTscObj(void *pObj);
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, TAOS **taos);
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res);
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param);
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param, bool enableBatch);
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
/**
* Initialize the async insertion auto-batch feature.
*
* @param batchLen When a user submits an insert statement through `taos_query_ra`, the statement is buffered
*                 asynchronously in the execution queue instead of being executed immediately. Once the number of
*                 buffered statements reaches batchLen, all the statements in the queue are merged and sent to the vnodes.
* @param timeout  The statements are sent to the vnodes within timeout milliseconds at most, but the actual time at
*                 which the vnodes receive them also depends on network quality.
*/
void tscInitAsyncDispatcher(int32_t batchLen, int64_t timeout);
/**
* Destroy the async auto batch dispatcher.
*/
void tscDestroyAsyncDispatcher();
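A compact model of the flush rule these two entry points set up (a sketch for illustration only; the real logic lives in `tscTryOfferInsertStatements` and the background timeout thread added in this commit). The function name and parameters below are hypothetical:

```c
#include <stdbool.h>
#include <stdint.h>

/* hypothetical helper: a buffered batch is flushed to the vnodes either when
 * enough rows have accumulated or when the configured timeout has elapsed,
 * whichever happens first. */
static bool should_flush(int32_t bufferedRows, int64_t elapsedMs,
                         int32_t batchLen, int64_t timeoutMs) {
  return bufferedRows >= batchLen || elapsedMs >= timeoutMs;
}
```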
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
void tscImportDataFromFile(SSqlObj *pSql);
......
......@@ -13,16 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdatomic.h>
#include "os.h"
#include "tutil.h"
#include "qTableMeta.h"
#include "tnote.h"
#include "tqueue.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsched.h"
#include "qTableMeta.h"
#include "tsclient.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -34,6 +36,387 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
*/
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
// The async auto-batch feature switch.
static atomic_bool asyncBatchEnable;
// The queue that stores the async insertion statements.
static taos_queue insertionQueue;
// The number of buffered rows (the current batch length) in the insertion queue.
static atomic_int currentBatchLen;
// The maximum auto-batch length, in rows.
static int asyncBatchLen;
// The batch timeout in milliseconds.
static int64_t asyncBatchTimeout;
// The state of the insertion queue. While the timeout task is executing, the queue is marked exclusive for writing
// so that the buffered statements are sent to the vnodes within `timeout` milliseconds.
static atomic_int exclusiveState;
// The background thread that manages the statement auto-batch timeout.
static pthread_t background;
/**
* Return the error result to the callback function, and release the sql object.
*
* @param pSql the sql object.
* @param code the error code of the error result.
*/
static void tscReturnsError(SSqlObj* pSql, int code) {
if (pSql == NULL) {
return;
}
pSql->res.code = code;
tscAsyncResultOnError(pSql);
}
/**
* Represents the callback function and its context.
*/
typedef struct {
__async_cb_func_t fp;
void *param;
} Runnable;
/**
* The context of `tscMergedStatementsCallBack`.
*/
typedef struct {
size_t count;
Runnable runnable[];
} BatchCallBackContext;
/**
* Proxy function that sequentially invokes the callbacks of the merged insert statements.
*
* @param param the context of `tscMergedStatementsCallBack`.
* @param tres the result object.
* @param code the error code.
*/
static void tscMergedStatementsCallBack(void *param, TAOS_RES *tres, int32_t code) {
BatchCallBackContext* context = param;
SSqlObj* res = tres;
// handle corner case [context == null].
if (context == NULL) {
tscError("context in `tscMergedStatementsCallBack` is null, which should not happen");
if (tres) {
taosReleaseRef(tscObjRef, res->self);
}
return;
}
// handle corner case [res == null].
if (res == NULL) {
tscError("tres in `tscMergedStatementsCallBack` is null, which should not happen");
free(context);
return;
}
// handle results.
tscDebug("async batch result callback, number of item: %zu", context->count);
for (int i = 0; i < context->count ; ++i) {
// the result object is shared by many sql objects.
// therefore, we need to increase the ref count.
taosAcquireRef(tscObjRef, res->self);
Runnable* runnable = &context->runnable[i];
runnable->fp(runnable->param, res, res == NULL ? code : taos_errno(res));
}
taosReleaseRef(tscObjRef, res->self);
free(param);
}
/**
* Merge the statements into a single SSqlObj.
*
* @param statements the sql statements, represented as SArray<SSqlObj*>.
* @param result     the output parameter that receives the merged SSqlObj.
* @return the status code, TSDB_CODE_SUCCESS on success.
*/
static int32_t tscMergeStatements(SArray* statements, SSqlObj** result) {
if (statements == NULL) {
return TSDB_CODE_SUCCESS;
}
size_t count = taosArrayGetSize(statements);
if (count == 0) {
return TSDB_CODE_SUCCESS;
}
// create the callback context.
BatchCallBackContext* context = calloc(1, sizeof(BatchCallBackContext) + count * sizeof(Runnable));
if (context == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("create batch call back context: %p", context);
// initialize the callback context.
context->count = count;
for (size_t i = 0; i < count; ++i) {
SSqlObj* statement = *((SSqlObj ** )taosArrayGet(statements, i));
Runnable * callback = &context->runnable[i];
callback->fp = statement->fp;
callback->param = statement->param;
}
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", count);
int32_t code = tscMergeKVPayLoadSqlObj(statements, result);
if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg);
free(context);
} else {
// set the merged sql object callback.
(*result)->fp = tscMergedStatementsCallBack;
(*result)->fetchFp = (*result)->fp;
(*result)->param = context;
}
return code;
}
/**
* Fetch all the statements from the insertion queue, clear the queue, and send the statements to the vnodes.
*/
static void tscPollThenSendAsyncQueue() {
// get the number of the items in the queue.
int sizeOfQueue = taosGetQueueItemsNumber(insertionQueue);
if (sizeOfQueue == 0) {
return;
}
int32_t code = TSDB_CODE_SUCCESS;
SArray* statements = taosArrayInit(0, sizeof(SSqlObj *));
// out of memory.
if (statements == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto cleanup;
}
// get the sql statements from the queue.
for (int i = 0; i < sizeOfQueue; ++i) {
// get a queue node from the queue.
int type;
void* node;
if (!taosReadQitem(insertionQueue, &type, &node)) {
break;
}
// get the SSqlObj* from the queue node.
SSqlObj* item = *((SSqlObj **) node);
taosFreeQitem(node);
atomic_fetch_sub(&currentBatchLen, item->cmd.insertParam.numOfRows);
// out of memory.
if (!taosArrayPush(statements, &item)) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReturnsError(item, code);
goto cleanup;
}
}
// no item left in the queue (the items have been taken by other threads).
if (taosArrayGetSize(statements) == 0) {
goto cleanup;
}
// merge the statements into single one.
SSqlObj* merged = NULL;
code = tscMergeStatements(statements, &merged);
if (code == TSDB_CODE_SUCCESS) {
tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
tscHandleMultivnodeInsert(merged);
taosArrayDestroy(&statements);
return;
}
cleanup:
if (code != TSDB_CODE_SUCCESS) {
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
}
// handling the failures.
if (statements) {
for (int i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj* item = *((SSqlObj **)taosArrayGet(statements, i));
tscReturnsError(item, code);
}
taosArrayDestroy(&statements);
}
}
/**
* The background thread to manage statement batch timeout.
*/
static void* tscAsyncBackGroundThread(void* args) {
const int64_t timeoutUs = asyncBatchTimeout * 1000L;
setThreadName("tscBackground");
while (atomic_load(&asyncBatchEnable)) {
// set the exclusive state.
atomic_fetch_or(&exclusiveState, 0x1);
int64_t t0 = taosGetTimestampNs();
tscPollThenSendAsyncQueue();
int64_t t1 = taosGetTimestampNs();
// unset the exclusive state.
atomic_fetch_and(&exclusiveState, ~0x1);
int64_t durationUs = (t1 - t0) / 1000L;
// Similar to scheduleAtFixedRate in Java: if the execution time of `tscPollThenSendAsyncQueue` exceeds
// `asyncBatchTimeout` milliseconds there is no sleep; otherwise the thread sleeps for the remaining time
// (e.g. a 10 ms poll with a 50 ms timeout leaves a 40 ms sleep).
if (durationUs < timeoutUs) {
usleep(timeoutUs - durationUs);
}
}
return args;
}
/**
* Initialize the async insertion auto-batch feature.
*
* @param batchLen When a user submits an insert statement through `taos_query_ra`, the statement is buffered
*                 asynchronously in the execution queue instead of being executed immediately. Once the number of
*                 buffered statements reaches batchLen, all the statements in the queue are merged and sent to the vnodes.
* @param timeout  The statements are sent to the vnodes within timeout milliseconds at most, but the actual time at
*                 which the vnodes receive them also depends on network quality.
*/
void tscInitAsyncDispatcher(int32_t batchLen, int64_t timeout) {
atomic_init(&asyncBatchEnable, true);
asyncBatchLen = batchLen;
asyncBatchTimeout = timeout;
// init the queue
insertionQueue = taosOpenQueue();
if (insertionQueue == NULL) {
atomic_store(&asyncBatchEnable, false);
return;
}
// init the state.
atomic_init(&exclusiveState, 0);
atomic_init(&currentBatchLen, 0);
// init background thread.
if (pthread_create(&background, NULL, tscAsyncBackGroundThread, NULL)) {
atomic_store(&asyncBatchEnable, false);
taosCloseQueue(insertionQueue);
return;
}
}
/**
* Destroy the async auto batch dispatcher.
*/
void tscDestroyAsyncDispatcher() {
atomic_store(&asyncBatchEnable, false);
// poll and send all the statements in the queue.
while (taosGetQueueItemsNumber(insertionQueue) != 0) {
tscPollThenSendAsyncQueue();
}
// clear the state.
atomic_store(&exclusiveState, 0);
// make sure the thread exit.
pthread_join(background, NULL);
// destroy the queue.
taosCloseQueue(insertionQueue);
}
/**
* Check if the current sql object supports auto batch.
* 1. the auto-batch feature must be enabled on the sql object.
* 2. it must be an `insert into ... values ...` statement.
* 3. the payload type must be the KV payload.
*
* @param pSql the sql object to check.
* @return returns true if the sql object supports auto batch.
*/
bool tscSupportAutoBatch(SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) {
return false;
}
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
// only support insert statement.
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
return false;
}
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
// file insert not support.
if (TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
return false;
}
// only support kv payload.
if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) {
return false;
}
return true;
}
/**
* Try to offer the insert statement to the queue. If the number of buffered rows reaches `asyncBatchLen`, the
* function merges the statements in the queue and sends them to the vnodes.
*
* @param pSql the insert statement to offer.
* @return true if the statement was offered successfully.
*/
bool tscTryOfferInsertStatements(SSqlObj* pSql) {
if (!atomic_load(&asyncBatchEnable)) {
return false;
}
// the sql object doesn't support auto batch.
if (!tscSupportAutoBatch(pSql)) {
return false;
}
// the buffered batch has already reached the batch size.
if (atomic_load(&currentBatchLen) >= asyncBatchLen) {
return false;
}
// the queue is exclusive for writing.
if (atomic_load(&exclusiveState) & 0x1) {
return false;
}
// allocate the queue node.
void* node = taosAllocateQitem(sizeof(SSqlObj *));
if (node == NULL) {
return false;
}
// offer the node to the queue.
memcpy(node, &pSql, sizeof(SSqlObj *));
taosWriteQitem(insertionQueue, 0, node);
tscDebug("sql obj %p has been write to insert queue", pSql);
// check whether the batch size has been reached.
int numOfRows = pSql->cmd.insertParam.numOfRows;
int batchLen = atomic_fetch_add(&currentBatchLen, numOfRows) + numOfRows;
if (batchLen >= asyncBatchLen) {
tscPollThenSendAsyncQueue();
}
return true;
}
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
SSqlCmd* pCmd = &pSql->cmd;
......@@ -64,7 +447,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pCmd->resColumnId = TSDB_RES_COL_ID;
taosAcquireRef(tscObjRef, pSql->self);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -73,23 +455,28 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
}
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscAsyncResultOnError(pSql);
tscReturnsError(pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscTryOfferInsertStatements(pSql)) {
taosReleaseRef(tscObjRef, pSql->self);
tscDebug("sql obj %p has been buffer in insert queue", pSql);
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
taosReleaseRef(tscObjRef, pSql->self);
}
// TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
taos_query_ra(taos, sqlstr, fp, param);
taos_query_ra(taos, sqlstr, fp, param, true);
}
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param, bool enableBatch) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("pObj:%p is NULL or freed", pObj);
......@@ -115,6 +502,8 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v
return NULL;
}
pSql->enableBatch = enableBatch;
doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
return pSql;
......
......@@ -1569,7 +1569,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (pInsertParam->numOfParams > 0) {
goto _clean;
}
// merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) {
if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) {
......@@ -1577,6 +1577,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
}
pCmd->insertParam.numOfRows = totalNum;
code = TSDB_CODE_SUCCESS;
goto _clean;
......
......@@ -341,7 +341,7 @@ bool sqlBufSend(TAOS *taos, char *sqlBuf) {
} while(++sleepCnt < 20);
strcat(sqlBuf, ";");
taos_query_ra(taos, sqlBuf, cbSendValues, NULL);
taos_query_ra(taos, sqlBuf, cbSendValues, NULL, false);
return true;
}
......
......@@ -215,6 +215,10 @@ void taos_init_imp(void) {
#endif
tscDebug("starting to initialize client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
if (tsAsyncBatchEnable) {
tscInitAsyncDispatcher(tsAsyncBatchLen, tsAsyncBatchTimeout);
}
}
taosSetCoreDump();
......@@ -279,6 +283,9 @@ void taos_cleanup(void) {
#ifdef LUA_EMBEDDED
scriptEnvPoolCleanup();
#endif
if (tsAsyncBatchEnable) {
tscDestroyAsyncDispatcher();
}
}
int32_t id = tscObjRef;
......
......@@ -2153,7 +2153,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList == NULL) {
pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
......@@ -2168,10 +2168,113 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa
pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
}
}
if (freeBlockMap) {
pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false);
/**
* Merge the KV-payload SQL objects into a single one (each statement here must be an insert statement).
*
* @param statements the array of statements, i.e. SArray<SSqlObj*>.
* @param result     the output parameter that receives the merged SSqlObj; must not be null.
* @return the status code, TSDB_CODE_SUCCESS on success.
*/
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
// statement array is empty.
if (statements == NULL || taosArrayGetSize(statements) == 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// i.e. SHashObj<int64_t, STableDataBlocks*>, where the key is the vgroup id.
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pVnodeDataBlockHashList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// let the first statement in the array be the merged result.
SSqlObj* merged = *((SSqlObj**) taosArrayGet(statements, 0));
SSqlCmd* pMergeCmd = &merged->cmd;
SInsertStatementParam* pMergeInsertParam = &pMergeCmd->insertParam;
SArray* pMergeDataBlocks = pMergeInsertParam->pDataBlocks;
// initialize the `pVnodeDataBlockHashList`.
assert(pMergeInsertParam->payloadType == PAYLOAD_TYPE_KV);
for (int i = 0; i < taosArrayGetSize(pMergeInsertParam->pDataBlocks); ++i) {
STableDataBlocks *pDataBlocks = *((STableDataBlocks** )taosArrayGet(pMergeInsertParam->pDataBlocks, i));
if (taosHashPut(pVnodeDataBlockHashList, &pDataBlocks->vgId, sizeof(pDataBlocks->vgId), &pDataBlocks, sizeof(STableDataBlocks *))) {
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
// merge sql obj statements[i] into sql obj `merged`.
for (int i = 1; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i));
SSqlCmd *pCmd = &pSql->cmd;
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV);
// merge all the data blocks by vgroup id.
for (int j = 0; pInsertParam->pDataBlocks && j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) {
STableDataBlocks * tableBlock = *((STableDataBlocks **) taosArrayGet(pInsertParam->pDataBlocks, j));
SSubmitBlk *pBlocks = (SSubmitBlk *)tableBlock->pData;
// skip the empty data block.
if (pBlocks->numOfRows <= 0) {
tscDebug("0x%" PRIx64 " table %s data block is empty", pInsertParam->objectId, tableBlock->tableName.tname);
continue;
}
// get the data blocks of vgroup id.
STableDataBlocks *dataBuf = NULL;
STableDataBlocks** iter = taosHashGet(pVnodeDataBlockHashList, &tableBlock->vgId, sizeof(tableBlock->vgId));
if (iter == NULL) {
dataBuf = tableBlock;
if (taosHashPut(pVnodeDataBlockHashList, &tableBlock->vgId, sizeof(tableBlock->vgId), &dataBuf, sizeof(STableDataBlocks *))) {
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (!taosArrayPush(pMergeDataBlocks, &dataBuf)) {
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
} else {
dataBuf = *iter;
}
// the allocated size is too small.
int64_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize);
if (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
char *tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
if (tmp != NULL) {
dataBuf->pData = tmp;
} else { // failed to allocate memory, free already allocated memory and return error code
tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId,
dataBuf->nAllocSize);
taosHashCleanup(pVnodeDataBlockHashList);
tfree(dataBuf->pData);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
// copy the data into vgroup data blocks.
memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize);
dataBuf->size += tableBlock->size - tableBlock->headerSize;
dataBuf->numOfTables += 1;
tscDestroyDataBlock(pSql, tableBlock, false);
}
// free the data blocks and release the sql obj, since they are no longer needed.
taosArrayDestroy(&pInsertParam->pDataBlocks);
taosReleaseRef(tscObjRef, pSql->self);
}
// clean up.
taosHashCleanup(pVnodeDataBlockHashList);
*result = merged;
return TSDB_CODE_SUCCESS;
}
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
......@@ -2283,7 +2386,11 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
pOneTableBlock = *p;
}
extractTableNameList(pSql, pInsertParam, freeBlockMap);
extractTableNameList(pSql, pInsertParam);
if (freeBlockMap) {
pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false);
}
// free the table data blocks;
pInsertParam->pDataBlocks = pVnodeDataBlockList;
......
......@@ -90,6 +90,9 @@ extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow;
extern bool tsAsyncBatchEnable;
extern int32_t tsAsyncBatchLen;
extern int64_t tsAsyncBatchTimeout;
// db parameters in client
extern int32_t tsCacheBlockSize;
......
......@@ -121,6 +121,16 @@ float tsStreamComputDelayRatio = 0.1f;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// The async insertion auto-batch feature.
// When a user submits an insert statement through `taos_query_ra`, the statement is buffered asynchronously in the
// execution queue instead of being executed immediately. Once the number of buffered statements reaches
// `tsAsyncBatchLen`, all the statements in the queue are merged and sent to the vnodes.
// The statements are sent to the vnodes within `tsAsyncBatchTimeout` milliseconds at most, but the actual time at
// which the vnodes receive them also depends on network quality.
bool tsAsyncBatchEnable = true;
int32_t tsAsyncBatchLen = 128;
int64_t tsAsyncBatchTimeout = 50;
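With the defaults above, a buffered insert is flushed after 128 rows or 50 ms, whichever comes first. The visible diff does not register these variables as taos.cfg options, so the sketch below assumes a client built from source adjusting the externs before `taos_init()` runs; treat that visibility, and the helper name, as assumptions.

```c
#include <stdbool.h>
#include <stdint.h>

extern bool    tsAsyncBatchEnable;
extern int32_t tsAsyncBatchLen;
extern int64_t tsAsyncBatchTimeout;

/* hypothetical tuning hook, called before taos_init() in a client built from source */
static void tune_auto_batch(void) {
  tsAsyncBatchEnable  = true;  /* keep auto batching on */
  tsAsyncBatchLen     = 256;   /* flush after 256 buffered rows instead of 128 */
  tsAsyncBatchTimeout = 20;    /* or after 20 ms, whichever comes first */
}
```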
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
......