未验证 提交 c1c07861 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #16866 from zhihaop/improve/taosc-async-enhancement-for-2.6

feat(client): add `taos_query_a` batching support for inserting SQL
build/ build/
.ycm_extra_conf.py .ycm_extra_conf.py
.vscode/ .vscode/
.cache/
compile_commands.json
.idea/ .idea/
cmake-build-debug/ cmake-build-debug/
cmake-build-release/ cmake-build-release/
cmake-build-relwithdebinfo/
cscope.out cscope.out
cscope.files cscope.files
tags tags
......
...@@ -310,3 +310,12 @@ keepColumnName 1 ...@@ -310,3 +310,12 @@ keepColumnName 1
# unit Hour. Latency of data migration # unit Hour. Latency of data migration
# keepTimeOffset 0 # keepTimeOffset 0
# taosc write batch size, maximum 4096, suggested value 64 ~ 512, default 0, 0 means disable write batching.
# writeBatchSize 0
# taosc write batch timeout in milliseconds, maximum 2048, suggested value 2 ~ 100, default 10.
# writeBatchTimeout 10
# using thread local write batching. this option is not available when writeBatchSize = 0.
# writeBatchThreadLocal 0
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCBATCHMERGE_H
#define TDENGINE_TSCBATCHMERGE_H
#include "hash.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tscUtil.h"
#include "tsclient.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* A builder of SSubmitBlk.
*/
typedef struct SSubmitBlkBuilder {
// the metadata of the SSubmitBlk.
SSubmitBlk* metadata;
// the array stores all the rows in a table, aka SArray<SMemRow>.
SArray* rows;
} SSubmitBlkBuilder;
/**
* The builder to build SSubmitMsg::blocks.
*/
typedef struct SSubmitMsgBlocksBuilder {
// SHashObj<table_uid, SSubmitBlkBuilder*>.
SHashObj* blockBuilders;
int64_t vgId;
} SSubmitMsgBlocksBuilder;
/**
* STableDataBlocksBuilder is a tool to build data blocks by append the existing data blocks in a vnode.
*/
typedef struct STableDataBlocksBuilder {
SSubmitMsgBlocksBuilder* blocksBuilder;
STableDataBlocks* firstBlock;
int64_t vgId;
} STableDataBlocksBuilder;
/**
* STableDataBlocksListBuilder is a tool to build vnode data blocks list by appending exist data blocks.
*/
typedef struct STableDataBlocksListBuilder {
SHashObj* dataBlocksBuilders;
} STableDataBlocksListBuilder;
/**
* A Builder to build SInsertStatementParam::pTableNameList.
*/
typedef struct STableNameListBuilder {
// store the unsorted table names, SArray<SName*>.
SArray* pTableNameList;
} STableNameListBuilder;
/**
* Create a SSubmitBlkBuilder using exist metadata.
*
* @param metadata the metadata.
* @return the SSubmitBlkBuilder.
*/
SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata);
/**
* Destroy the SSubmitBlkBuilder.
*
* @param builder the SSubmitBlkBuilder.
*/
void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder);
/**
* Append a SSubmitBlk* to the builder. The table uid in pBlock must be the same with the builder's.
*
* @param builder the SSubmitBlkBuilder.
* @param pBlock the pBlock to append.
* @return whether the append is success.
*/
bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk *pBlock);
/**
* Build and write SSubmitBlk to `target`
*
* @param builder the SSubmitBlkBuilder.
* @param target the target to write.
* @param nRows the number of rows in SSubmitBlk*.
* @return the writen bytes.
*/
size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows);
/**
* Get the expected writen bytes of `writeSSubmitBlkBuilder`.
*
* @param builder the SSubmitBlkBuilder.
* @return the expected writen bytes of `writeSSubmitBlkBuilder`.
*/
size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder);
/**
* Create a SSubmitMsgBuilder.
*
* @param vgId the vgId of SSubmitMsg.
* @return the SSubmitMsgBuilder.
*/
SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId);
/**
* Get the expected writen bytes of `writeSSubmitMsgBlocksBuilder`.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @return the expected writen bytes of `writeSSubmitMsgBlocksBuilder`.
*/
size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder);
/**
* Build and write SSubmitMsg::blocks to `pBlocks`
*
* @param builder the SSubmitBlkBuilder.
* @param pBlocks the target to write.
* @param nRows the number of row in SSubmitMsg::blocks.
* @return the writen bytes.
*/
size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows);
/**
* Get the number of block in SSubmitMsgBlocksBuilder.
* @param builder the SSubmitMsgBlocksBuilder.
* @return the number of SSubmitBlk block.
*/
size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder);
/**
* Destroy the SSubmitMsgBlocksBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder to destroy.
*/
void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder);
/**
* Append SSubmitMsg* to the SSubmitMsgBlocksBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @param pBlocks the SSubmitBlk in SSubmitMsg::blocks.
* @param nBlocks the number of blocks in SSubmitMsg.
* @return whether the append is success.
*/
bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks);
/**
* Create the STableDataBlocksBuilder.
*
* @param vgId the vgId of STableDataBlocksBuilder.
* @return the STableDataBlocksBuilder.
*/
STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId);
/**
* Destroy the STableDataBlocksBuilder.
* @param builder the STableDataBlocksBuilder.
*/
void destroySTableDataBlocksBuilder(STableDataBlocksBuilder *builder);
/**
* Append a data blocks to STableDataBlocksBuilder.
* @param builder the STableDataBlocksBuilder.
* @param dataBlocks the dataBlocks to append. the vgId of dataBlocks must be same with the STableDataBlocksBuilder.
* @return whether the append is success.
*/
bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks);
/**
* Build the data blocks for single vnode.
* @param builder the STableDataBlocksBuilder.
* @param nRows the number of row in STableDataBlocks.
* @return the data blocks for single vnode.
*/
STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows);
/**
* Create the STableDataBlocksListBuilder.
*
* @return the STableDataBlocksListBuilder.
*/
STableDataBlocksListBuilder* createSTableDataBlocksListBuilder();
/**
* Destroy the STableDataBlocksListBuilder.
*
* @param builder the STableDataBlocksListBuilder.
*/
void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder);
/**
* Append a data blocks to STableDataBlocksListBuilder.
*
* @param builder the STableDataBlocksListBuilder.
* @param dataBlocks the data blocks.
* @return whether the append is success.
*/
bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks);
/**
* Build the vnode data blocks list.
*
* @param builder the STableDataBlocksListBuilder.
* @param nTables the number of table in vnode data blocks list.
* @param nRows the number of row in vnode data blocks list.
* @return the vnode data blocks list.
*/
SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows);
/**
* Create STableNameListBuilder.
*/
STableNameListBuilder* createSTableNameListBuilder();
/**
* Destroy the STableNameListBuilder.
* @param builder the STableNameListBuilder.
*/
void destroySTableNameListBuilder(STableNameListBuilder* builder);
/**
* Insert a SName to builder.
*
* @param builder the STableNameListBuilder.
* @param name the table name.
* @return whether it is success.
*/
bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name);
/**
* Build the STable name list.
*
* @param builder the STableNameListBuilder.
* @param numOfTables the number of table.
* @return the STable name list.
*/
SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables);
/**
* Merge the KV-PayLoad SQL objects into single one.
* The statements here must be an insertion statement and no schema attached.
*
* @param polls the array of SSqlObj*.
* @param nPolls the number of SSqlObj* in the array.
* @param result the returned result. result is not null!
* @return the status code.
*/
int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj *result);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCBATCHMERGE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCBATCHWRITE_H
#define TDENGINE_TSCBATCHWRITE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tthread.h"
// forward declaration.
typedef struct STscObj STscObj;
typedef struct SSqlObj SSqlObj;
typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager;
/**
* SAsyncBatchWriteDispatcher is an async batching write dispatcher (ABWD). ABWD accepts the recent SQL requests and put
* them in a queue waiting to be scheduled. When the number of requests in the queue reaches batch_size, it merges the
* requests in the queue and sends them to the server, thus reducing the network overhead caused by multiple
* communications to the server and directly improving the throughput of small object asynchronous writes.
*/
typedef struct SAsyncBatchWriteDispatcher {
// the client object.
STscObj* pClient;
// the timeout manager.
SDispatcherTimeoutManager* timeoutManager;
// the mutex to protect the dispatcher.
pthread_mutex_t bufferMutex;
// the cond to signal when buffer not full.
pthread_cond_t notFull;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
// the number of insertion rows in the buffer.
int32_t currentSize;
// the number of items in the buffer.
int32_t bufferSize;
// whether the dispatcher is shutdown.
bool shutdown;
SSqlObj* buffer[];
} SAsyncBatchWriteDispatcher;
/**
* The manager of SAsyncBatchWriteDispatcher. Call dispatcherAcquire(...) to get the SAsyncBatchWriteDispatcher
* instance. SDispatcherManager will manage the life cycle of SAsyncBatchWriteDispatcher.
*/
typedef struct SDispatcherManager {
pthread_key_t key;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
// the batching timeout in milliseconds.
int32_t timeoutMs;
// specifies whether the dispatcher is thread local, if the dispatcher is not
// thread local, we will use the global dispatcher below.
bool isThreadLocal;
// the global dispatcher, if thread local enabled, global will be set to NULL.
SAsyncBatchWriteDispatcher* pGlobal;
// the client object.
STscObj* pClient;
} SDispatcherManager;
/**
* Control the timeout of the dispatcher queue.
*/
typedef struct SDispatcherTimeoutManager {
// the dispatcher that timeout manager belongs to.
SAsyncBatchWriteDispatcher* dispatcher;
// the background thread.
pthread_t background;
// the mutex to sleep the background thread.
pthread_mutex_t sleepMutex;
// the cond to signal to background thread.
pthread_cond_t timeout;
// the batching timeout in milliseconds.
int32_t timeoutMs;
// whether the timeout manager is shutdown.
bool shutdown;
} SDispatcherTimeoutManager;
/**
* A batch that polls from SAsyncBatchWriteDispatcher::buffer.
*/
typedef struct SBatchRequest {
size_t nRequests;
SSqlObj* pRequests[];
} SBatchRequest;
/**
* Create the dispatcher timeout manager.
*/
SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs);
/**
* Destroy the dispatcher timeout manager.
*/
void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
/**
* Check if the timeout manager is shutdown.
* @param manager the timeout manager.
* @return whether the timeout manager is shutdown.
*/
bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
/**
* Shutdown the SDispatcherTimeoutManager.
* @param manager the SDispatcherTimeoutManager.
*/
void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
/**
* Merge SSqlObjs into single SSqlObj.
*
* @param pRequest the batch request.
* @param batch the batch SSqlObj*.
* @return the status code.
*/
int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch);
/**
* Merge the sql statements and execute the merged sql statement asynchronously.
*
* @param pRequest the batch request. the request will be promised to free after calling this function.
*/
void dispatcherAsyncExecute(SBatchRequest* pRequest);
/**
* Merge the sql statements and execute the merged sql statement.
*
* @param pRequest the batch request. you must call free(pRequest) after calling this function.
*/
void dispatcherExecute(SBatchRequest* pRequest);
/**
* Create the async batch write dispatcher.
*
* @param pClient the client object.
* @param batchSize When user submit an insert sql to `taos_query_a`, the SSqlObj* will be buffered instead of executing
* it. If the number of the buffered rows reach `batchSize`, all the SSqlObj* will be merged and sent to vnodes.
* @param timeout The SSqlObj* will be sent to vnodes no more than `timeout` milliseconds. But the actual time
* vnodes received the SSqlObj* depends on the network quality.
*/
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, int32_t batchSize, int32_t timeoutMs);
/**
* Destroy the async auto batch dispatcher.
*/
void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher);
/**
* Check if the current sql object can be dispatch by ABWD.
* 1. auto batch feature on the sql object must be enabled.
* 2. must be an `insert into ... value ...` statement.
* 3. the payload type must be kv payload.
* 4. no schema attached.
*
* @param dispatcher the dispatcher.
* @param pSql the sql object to check.
* @return returns true if the sql object can be dispatch by ABWD.
*/
bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* Try to offer the SSqlObj* to the dispatcher. If the number of row reach `batchSize`, the function
* will merge the SSqlObj* in the buffer and send them to the vnodes.
*
* @param pSql the insert statement to offer.
* @return if offer success, returns true.
*/
bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* Create the manager of SAsyncBatchWriteDispatcher.
*
* @param pClient the client object.
* @param batchSize the batchSize of SAsyncBatchWriteDispatcher.
* @param timeoutMs the timeoutMs of SAsyncBatchWriteDispatcher.
* @param isThreadLocal specifies whether the dispatcher is thread local.
* @return the SAsyncBatchWriteDispatcher manager.
*/
SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
/**
* Destroy the SDispatcherManager.
* (will destroy all the instances of SAsyncBatchWriteDispatcher in the thread local variable)
*
* @param manager the SDispatcherManager.
*/
void destroyDispatcherManager(SDispatcherManager* manager);
/**
* Get an instance of SAsyncBatchWriteDispatcher.
*
* @param manager the SDispatcherManager.
* @return the SAsyncBatchWriteDispatcher instance.
*/
SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCBATCHWRITE_H
...@@ -144,6 +144,8 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg); ...@@ -144,6 +144,8 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset); uint32_t offset);
void destroySTableDataBlocksList(SArray* pDataBlocks);
void destroySTableDataBlocks(STableDataBlocks* pDataBlocks);
void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList); void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList);
void* tscDestroyUdfArrayList(SArray* pUdfList); void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta); void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
......
...@@ -45,6 +45,7 @@ typedef enum { ...@@ -45,6 +45,7 @@ typedef enum {
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
typedef struct SDispatcherManager SDispatcherManager;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef void (*_freeSqlSupporter)(void **); typedef void (*_freeSqlSupporter)(void **);
...@@ -256,7 +257,7 @@ typedef struct SInsertStatementParam { ...@@ -256,7 +257,7 @@ typedef struct SInsertStatementParam {
int32_t batchSize; // for parameter ('?') binding and batch processing int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams; int32_t numOfParams;
int32_t numOfRows;
int32_t numOfFiles; int32_t numOfFiles;
char msg[512]; // error message char msg[512]; // error message
...@@ -352,7 +353,8 @@ typedef struct STscObj { ...@@ -352,7 +353,8 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet; SRpcCorEpSet *tscCorMgmtEpSet;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj int32_t numOfObj; // number of sqlObj from this tscObj
SDispatcherManager*dispatcherManager;
SReqOrigin from; SReqOrigin from;
} STscObj; } STscObj;
...@@ -400,9 +402,10 @@ typedef struct SSqlObj { ...@@ -400,9 +402,10 @@ typedef struct SSqlObj {
struct SSqlObj *prev, *next; struct SSqlObj *prev, *next;
int64_t self; int64_t self;
// connect alive
int64_t lastAlive; int64_t lastAlive;
void * pPrevContext; void * pPrevContext;
bool enableBatch;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
...@@ -503,7 +506,7 @@ void tscCloseTscObj(void *pObj); ...@@ -503,7 +506,7 @@ void tscCloseTscObj(void *pObj);
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, TAOS **taos); void *param, TAOS **taos);
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res); TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res);
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param); TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param, bool enableBatch);
// get taos connection unused session number // get taos connection unused session number
int32_t taos_unused_session(TAOS* taos); int32_t taos_unused_session(TAOS* taos);
......
...@@ -13,16 +13,19 @@ ...@@ -13,16 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include "os.h"
#include "osAtomic.h"
#include "tarray.h"
#include "tutil.h" #include "tutil.h"
#include "qTableMeta.h"
#include "tnote.h" #include "tnote.h"
#include "trpc.h" #include "trpc.h"
#include "tscBatchWrite.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsched.h"
#include "qTableMeta.h"
#include "tsclient.h" #include "tsclient.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
...@@ -398,7 +401,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para ...@@ -398,7 +401,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pCmd->resColumnId = TSDB_RES_COL_ID; pCmd->resColumnId = TSDB_RES_COL_ID;
taosAcquireRef(tscObjRef, pSql->self); taosAcquireRef(tscObjRef, pSql->self);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
...@@ -412,18 +414,27 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para ...@@ -412,18 +414,27 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
return; return;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (pObj->dispatcherManager != NULL) {
SAsyncBatchWriteDispatcher * dispatcher = dispatcherAcquire(pObj->dispatcherManager);
if (dispatcherTryDispatch(dispatcher, pSql)) {
taosReleaseRef(tscObjRef, pSql->self);
tscDebug("sql obj %p has been buffer in insert buffer", pSql);
return;
}
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo); executeQuery(pSql, pQueryInfo);
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
} }
// TODO return the correct error code to client in tscQueueAsyncError // TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
taos_query_ra(taos, sqlstr, fp, param); taos_query_ra(taos, sqlstr, fp, param, tsWriteBatchSize > 0);
} }
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) { TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param, bool enableBatch) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) { if (pObj == NULL || pObj->signature != pObj) {
tscError("pObj:%p is NULL or freed", pObj); tscError("pObj:%p is NULL or freed", pObj);
...@@ -449,6 +460,8 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v ...@@ -449,6 +460,8 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v
return NULL; return NULL;
} }
pSql->enableBatch = enableBatch;
doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen); doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
return pSql; return pSql;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscBatchMerge.h"
/**
* A util function to compare two SName.
*/
static int32_t compareSName(const void *left, const void *right) {
if (left == right) {
return 0;
}
SName* x = *(SName** ) left;
SName* y = *(SName** ) right;
return memcmp(x, y, sizeof(SName));
}
/**
* A util function to sort SArray<SMemRow> by key.
*/
static int32_t compareSMemRow(const void *x, const void *y) {
TSKEY left = memRowKey(*(void **) x);
TSKEY right = memRowKey(*(void **) y);
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
/**
* If the SSubmitBlkBuilder of pBlock->uid is present, returns it. Otherwise, build a new SSubmitBlkBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @param pBlock the SSubmitBlk.
* @return the SSubmitBlkBuilder (NULL means failure).
*/
inline static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlock) {
SSubmitBlkBuilder** iter = taosHashGet(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid));
SSubmitBlkBuilder* blocksBuilder = NULL;
if (iter) {
return *iter;
}
blocksBuilder = createSSubmitBlkBuilder(pBlock);
if (!blocksBuilder) {
return NULL;
}
if (taosHashPut(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid), &blocksBuilder, sizeof(SArray*))) {
destroySSubmitBlkBuilder(blocksBuilder);
return NULL;
}
return blocksBuilder;
}
SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* nTables) {
if (!taosArrayGetSize(builder->pTableNameList)) {
*nTables = 0;
return NULL;
}
// sort and unique.
taosArraySort(builder->pTableNameList, compareSName);
size_t tail = 0;
size_t nNames = taosArrayGetSize(builder->pTableNameList);
for (size_t i = 1; i < nNames; ++i) {
SName* last = taosArrayGetP(builder->pTableNameList, tail);
SName* current = taosArrayGetP(builder->pTableNameList, i);
if (memcmp(last, current, sizeof(SName)) != 0) {
++tail;
taosArraySet(builder->pTableNameList, tail, &current);
}
}
// build table names list.
SName** tableNames = calloc(tail + 1, sizeof(SName*));
if (!tableNames) {
return NULL;
}
// clone data.
for (size_t i = 0; i <= tail; ++i) {
SName* clone = malloc(sizeof(SName));
if (!clone) {
goto error;
}
memcpy(clone, taosArrayGetP(builder->pTableNameList, i), sizeof(SName));
tableNames[i] = clone;
}
*nTables = tail + 1;
return tableNames;
error:
for (size_t i = 0; i <= tail; ++i) {
if (tableNames[i]) {
free(tableNames[i]);
}
}
free(tableNames);
return NULL;
}
SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) {
SSubmitBlkBuilder* builder = calloc(1, sizeof(SSubmitBlkBuilder));
if (!builder) {
return NULL;
}
builder->rows = taosArrayInit(1, sizeof(SMemRow));
if (!builder->rows) {
free(builder);
return NULL;
}
builder->metadata = calloc(1, sizeof(SSubmitBlk));
if (!builder->metadata) {
taosArrayDestroy(&builder->rows);
free(builder);
return NULL;
}
memcpy(builder->metadata, metadata, sizeof(SSubmitBlk));
return builder;
}
void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) {
if (!builder) {
return;
}
taosArrayDestroy(&builder->rows);
free(builder->metadata);
free(builder);
}
bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* pBlock) {
assert(pBlock->uid == builder->metadata->uid);
assert(pBlock->schemaLen == 0);
// shadow copy all the SMemRow to SSubmitBlkBuilder::rows.
char* pRow = pBlock->data;
char* pEnd = pBlock->data + htonl(pBlock->dataLen);
while (pRow < pEnd) {
if (!taosArrayPush(builder->rows, &pRow)) {
return false;
}
pRow += memRowTLen(pRow);
}
return true;
}
size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows) {
memcpy(target, builder->metadata, sizeof(SSubmitBlk));
// sort SSubmitBlkBuilder::rows by timestamp.
uint32_t dataLen = 0;
taosArraySort(builder->rows, compareSMemRow);
// deep copy all the SMemRow to target.
size_t nMemRows = taosArrayGetSize(builder->rows);
for (int i = 0; i < nMemRows; ++i) {
char* pRow = taosArrayGetP(builder->rows, i);
memcpy(POINTER_SHIFT(target->data, dataLen), pRow, memRowTLen(pRow));
dataLen += memRowTLen(pRow);
}
*nRows = nMemRows;
target->schemaLen = 0;
target->dataLen = (int32_t) htonl(dataLen);
target->numOfRows = (int16_t) htons(*nRows);
return dataLen + sizeof(SSubmitBlk);
}
size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder) {
size_t dataLen = 0;
size_t nRows = taosArrayGetSize(builder->rows);
for (int i = 0; i < nRows; ++i) {
char* pRow = taosArrayGetP(builder->rows, i);
dataLen += memRowTLen(pRow);
}
return dataLen + sizeof(SSubmitBlk);
}
SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId) {
SSubmitMsgBlocksBuilder* builder = calloc(1, sizeof(SSubmitMsgBlocksBuilder));
if (!builder) {
return NULL;
}
builder->vgId = vgId;
builder->blockBuilders = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (!builder->blockBuilders) {
free(builder);
return NULL;
}
return builder;
}
size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) {
size_t nWrite = 0;
SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL);
while (iter) {
SSubmitBlkBuilder* blocksBuilder = *iter;
nWrite += nWriteSSubmitBlkBuilder(blocksBuilder);
iter = taosHashIterate(builder->blockBuilders, iter);
}
return nWrite;
}
size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows) {
size_t nWrite = 0;
SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL);
// copy all the SSubmitBlk to pBlocks.
while (iter) {
size_t nSubRows = 0;
SSubmitBlkBuilder* blocksBuilder = *iter;
SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite);
nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock, &nSubRows);
*nRows += nSubRows;
iter = taosHashIterate(builder->blockBuilders, iter);
}
return nWrite;
}
size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) {
return taosHashGetSize(builder->blockBuilders);
}
void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) {
if (!builder) {
return;
}
SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL);
while (iter) {
destroySSubmitBlkBuilder(*iter);
iter = taosHashIterate(builder->blockBuilders, iter);
}
taosHashCleanup(builder->blockBuilders);
free(builder);
}
bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks) {
SSubmitBlk* pBlock = pBlocks;
for (size_t i = 0; i < nBlocks; ++i) {
// not support SSubmitBlk with schema.
assert(pBlock->schemaLen == 0);
// get the builder of specific table (by pBlock->uid).
SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock);
if (!blocksBuilder) {
return false;
}
if (!appendSSubmitBlkBuilder(blocksBuilder, pBlock)) {
return false;
}
// go to next block.
size_t blockSize = sizeof (SSubmitBlk) + htonl(pBlock->dataLen);
pBlock = POINTER_SHIFT(pBlock, blockSize);
}
return true;
}
STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId) {
STableDataBlocksBuilder* builder = calloc(1, sizeof(STableDataBlocksBuilder));
if (!builder) {
return NULL;
}
builder->blocksBuilder = createSSubmitMsgBuilder(vgId);
if (!builder->blocksBuilder) {
free(builder);
return NULL;
}
builder->vgId = vgId;
builder->firstBlock = NULL;
return builder;
}
void destroySTableDataBlocksBuilder(STableDataBlocksBuilder* builder) {
if (!builder) {
return;
}
destroySSubmitMsgBuilder(builder->blocksBuilder);
free(builder);
}
bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks) {
// the data blocks vgId must be same with builder vgId.
if (!dataBlocks || dataBlocks->vgId != builder->vgId) {
return false;
}
if (!builder->firstBlock) {
builder->firstBlock = dataBlocks;
}
SSubmitBlk* pBlocks = (SSubmitBlk *)(dataBlocks->pData + dataBlocks->headerSize);
return appendSSubmitMsgBlocks(builder->blocksBuilder, pBlocks, dataBlocks->numOfTables);
}
STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows) {
SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder;
STableDataBlocks *firstBlock = builder->firstBlock;
if (!firstBlock) {
return NULL;
}
size_t nWriteSize = nWriteSSubmitMsgBuilder(builder->blocksBuilder);
size_t nHeaderSize = firstBlock->headerSize;
size_t nAllocSize = nWriteSize + nHeaderSize;
// allocate data blocks.
STableDataBlocks* dataBlocks = NULL;
int32_t code = tscCreateDataBlock(nAllocSize, 0, (int32_t) nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
// build the header (using first block).
dataBlocks->size = nHeaderSize;
memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize);
// build the SSubmitMsg::blocks.
dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize), nRows);
dataBlocks->numOfTables = (int32_t) nBlockSSubmitMsgBlocksBuilder(blocksBuilder);
return dataBlocks;
}
STableDataBlocksListBuilder* createSTableDataBlocksListBuilder() {
STableDataBlocksListBuilder* builder = calloc(1, sizeof(STableDataBlocksListBuilder));
if (!builder) {
return NULL;
}
builder->dataBlocksBuilders = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (!builder->dataBlocksBuilders) {
free(builder);
return NULL;
}
return builder;
}
void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder) {
if (!builder) {
return;
}
STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL);
while (iter) {
destroySTableDataBlocksBuilder(*iter);
iter = taosHashIterate(builder->dataBlocksBuilders, iter);
}
taosHashCleanup(builder->dataBlocksBuilders);
free(builder);
}
bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks) {
// get the data blocks builder of specific vgId.
STableDataBlocksBuilder** item = taosHashGet(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId));
STableDataBlocksBuilder* blocksBuilder = NULL;
if (item) {
blocksBuilder = *item;
} else {
blocksBuilder = createSTableDataBlocksBuilder(dataBlocks->vgId);
if (!blocksBuilder) {
return false;
}
if (taosHashPut(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId), &blocksBuilder, sizeof(STableDataBlocksBuilder*))) {
destroySTableDataBlocksBuilder(blocksBuilder);
return false;
}
}
// append to this builder.
return appendSTableDataBlocksBuilder(blocksBuilder, dataBlocks);
}
SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows) {
SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*));
if (!pVnodeDataBlockList) {
return NULL;
}
// build data blocks of each vgId.
STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL);
while (iter) {
size_t nSubRows = 0;
STableDataBlocksBuilder* dataBlocksBuilder = *iter;
STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder, &nSubRows);
if (!dataBlocks) {
goto error;
}
*nTables += dataBlocks->numOfTables;
*nRows += nSubRows;
taosArrayPush(pVnodeDataBlockList, &dataBlocks);
iter = taosHashIterate(builder->dataBlocksBuilders, iter);
}
return pVnodeDataBlockList;
error:
for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) {
STableDataBlocks* dataBlocks = taosArrayGetP(pVnodeDataBlockList, i);
tscDestroyDataBlock(NULL, dataBlocks, false);
}
taosArrayDestroy(&pVnodeDataBlockList);
return NULL;
}
STableNameListBuilder* createSTableNameListBuilder() {
STableNameListBuilder* builder = calloc(1, sizeof(STableNameListBuilder));
if (!builder) {
return NULL;
}
builder->pTableNameList = taosArrayInit(1, sizeof(SName*));
if (!builder->pTableNameList) {
free(builder);
return NULL;
}
return builder;
}
void destroySTableNameListBuilder(STableNameListBuilder* builder) {
if (!builder) {
return;
}
taosArrayDestroy(&builder->pTableNameList);
free(builder);
}
bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) {
return taosArrayPush(builder->pTableNameList, &name);
}
int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj* result) {
// statement array is empty.
if (!polls || !nPolls) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STableDataBlocksListBuilder* builder = createSTableDataBlocksListBuilder();
if (!builder) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
STableNameListBuilder* nameListBuilder = createSTableNameListBuilder();
if (!nameListBuilder) {
destroySTableDataBlocksListBuilder(builder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// append the existing data blocks to builder.
for (size_t i = 0; i < nPolls; ++i) {
SSqlObj *pSql = polls[i];
SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam;
if (!pInsertParam->pDataBlocks) {
continue;
}
assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV);
assert(!pInsertParam->schemaAttached);
// append each vnode data block to the builder.
size_t nBlocks = taosArrayGetSize(pInsertParam->pDataBlocks);
for (size_t j = 0; j < nBlocks; ++j) {
STableDataBlocks* tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j);
if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int k = 0; k < pInsertParam->numOfTables; ++k) {
if (!insertSTableNameListBuilder(nameListBuilder, pInsertParam->pTableNameList[k])) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
}
}
// build the vnode data blocks.
size_t nBlocks = 0;
size_t nRows = 0;
SInsertStatementParam* pInsertParam = &result->cmd.insertParam;
SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &nBlocks, &nRows);
if (!pVnodeDataBlocksList) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// build the table name list.
size_t nTables = 0;
SName** pTableNameList = buildSTableNameListBuilder(nameListBuilder, &nTables);
if (!pTableNameList) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (nTables != nBlocks) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// replace table name list.
if (pInsertParam->pTableNameList) {
destroyTableNameList(pInsertParam);
}
pInsertParam->pTableNameList = pTableNameList;
pInsertParam->numOfTables = (int32_t) nTables;
// replace vnode data blocks.
if (pInsertParam->pDataBlocks) {
tscDestroyBlockArrayList(result, pInsertParam->pDataBlocks);
}
pInsertParam->pDataBlocks = pVnodeDataBlocksList;
pInsertParam->numOfRows = (int32_t) nRows;
// clean up.
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "osAtomic.h"
#include "tscBatchMerge.h"
#include "tscBatchWrite.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tsclient.h"
/**
* Represents the callback function and its context.
*/
typedef struct {
__async_cb_func_t fp;
void* param;
} SCallbackHandler;
/**
* The context of `batchResultCallback`.
*/
typedef struct {
size_t nHandlers;
SCallbackHandler handler[];
} SBatchCallbackContext;
/**
* Get the number of insertion row in the sql statement.
*
* @param pSql the sql statement.
* @return int32_t the number of insertion row.
*/
inline static int32_t statementGetInsertionRows(SSqlObj* pSql) { return pSql->cmd.insertParam.numOfRows; }
/**
* Return the error result to the callback function, and release the sql object.
*
* @param pSql the sql object.
* @param code the error code of the error result.
*/
inline static void tscReturnsError(SSqlObj* pSql, int code) {
if (pSql == NULL) {
return;
}
pSql->res.code = code;
tscAsyncResultOnError(pSql);
}
/**
* Proxy function to perform sequentially insert operation.
*
* @param param the context of `batchResultCallback`.
* @param tres the result object.
* @param code the error code.
*/
static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
SBatchCallbackContext* context = param;
SSqlObj* res = tres;
// handle corner case [context == null].
if (context == NULL) {
tscError("context in `batchResultCallback` is null, which should not happen");
if (tres) {
taosReleaseRef(tscObjRef, res->self);
}
return;
}
// handle corner case [res == null].
if (res == NULL) {
tscError("tres in `batchResultCallback` is null, which should not happen");
free(context);
return;
}
// handle results.
tscDebug("async batch result callback, number of item: %zu", context->nHandlers);
for (int i = 0; i < context->nHandlers; ++i) {
// the result object is shared by many sql objects.
// therefore, we need to increase the ref count.
taosAcquireRef(tscObjRef, res->self);
SCallbackHandler* handler = &context->handler[i];
handler->fp(handler->param, res, code);
}
taosReleaseRef(tscObjRef, res->self);
free(context);
}
int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch) {
assert(pRequest);
assert(pRequest->pRequests);
assert(pRequest->nRequests);
// create the callback context.
SBatchCallbackContext* context =
calloc(1, sizeof(SBatchCallbackContext) + pRequest->nRequests * sizeof(SCallbackHandler));
if (context == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("create batch call back context: %p", context);
// initialize the callback context.
context->nHandlers = pRequest->nRequests;
for (size_t i = 0; i < pRequest->nRequests; ++i) {
SSqlObj* pSql = pRequest->pRequests[i];
context->handler[i].fp = pSql->fp;
context->handler[i].param = pSql->param;
}
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", pRequest->nRequests);
SSqlObj* pFirst = pRequest->pRequests[0];
int32_t code = tscMergeSSqlObjs(pRequest->pRequests, pRequest->nRequests, pFirst);
if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg);
free(context);
return code;
}
pFirst->fp = batchResultCallback;
pFirst->param = context;
pFirst->fetchFp = pFirst->fp;
taosAcquireRef(tscObjRef, pFirst->self);
*batch = pFirst;
for (int i = 0; i < pRequest->nRequests; ++i) {
SSqlObj* pSql = pRequest->pRequests[i];
taosReleaseRef(tscObjRef, pSql->self);
}
return code;
}
/**
* Poll all the SSqlObj* in the dispatcher's buffer (No Lock). After call this function,
* you need to notify dispatcher->notFull by yourself.
*
* @param dispatcher the dispatcher.
* @param nPolls the number of polled SSqlObj*.
* @return all the SSqlObj* in the buffer.
*/
inline static SBatchRequest* dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher) {
if (!dispatcher->bufferSize) {
return NULL;
}
SBatchRequest* pRequest = malloc(sizeof(SBatchRequest) + sizeof(SSqlObj*) * dispatcher->bufferSize);
if (pRequest == NULL) {
tscError("failed to poll all items: out of memory");
return NULL;
}
memcpy(pRequest->pRequests, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize);
pRequest->nRequests = dispatcher->bufferSize;
dispatcher->currentSize = 0;
dispatcher->bufferSize = 0;
return pRequest;
}
/**
* Poll all the SSqlObj* in the dispatcher's buffer.
*
* @param dispatcher the dispatcher.
* @return all the SSqlObj* in the buffer.
*/
inline static SBatchRequest* dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher) {
SBatchRequest* pRequest = NULL;
pthread_mutex_lock(&dispatcher->bufferMutex);
pRequest = dispatcherPollAll(dispatcher);
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->bufferMutex);
return pRequest;
}
/**
* @brief Try to offer the SSqlObj* to the dispatcher.
*
* @param dispatcher the async bulk write dispatcher.
* @param pSql the sql object to offer.
* @return return whether offer success.
*/
inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) {
pthread_mutex_lock(&dispatcher->bufferMutex);
// if dispatcher is shutdown, must fail back to normal insertion.
// usually not happen, unless taos_query_a(...) after taos_close(...).
if (atomic_load_8(&dispatcher->shutdown)) {
pthread_mutex_unlock(&dispatcher->bufferMutex);
return false;
}
// the buffer is full.
while (dispatcher->currentSize >= dispatcher->batchSize) {
if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->bufferMutex)) {
pthread_mutex_unlock(&dispatcher->bufferMutex);
return false;
}
}
dispatcher->buffer[dispatcher->bufferSize++] = pSql;
dispatcher->currentSize += statementGetInsertionRows(pSql);
tscDebug("sql obj %p has been write to insert buffer", pSql);
if (dispatcher->currentSize < dispatcher->batchSize) {
pthread_mutex_unlock(&dispatcher->bufferMutex);
return true;
}
// the dispatcher reaches batch size.
SBatchRequest* pRequest = dispatcherPollAll(dispatcher);
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->bufferMutex);
if (pRequest) {
dispatcherAsyncExecute(pRequest);
}
return true;
}
void dispatcherExecute(SBatchRequest* pRequest) {
int32_t code = TSDB_CODE_SUCCESS;
// no item in the buffer (items has been taken by other threads).
if (!pRequest) {
return;
}
assert(pRequest->pRequests);
assert(pRequest->nRequests);
// merge the statements into single one.
SSqlObj* pSql = NULL;
code = dispatcherBatchBuilder(pRequest, &pSql);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscDebug("merging %zu sql objs into %p", pRequest->nRequests, pSql);
tscHandleMultivnodeInsert(pSql);
return;
_error:
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
// handling the failures.
for (size_t i = 0; i < pRequest->nRequests; ++i) {
SSqlObj* item = pRequest->pRequests[i];
tscReturnsError(item, code);
}
}
/**
* Get the timespec after `millis` ms
*
* @param t the timespec.
* @param millis the duration in milliseconds.
* @return the timespec after `millis` ms.
*/
static inline void afterMillis(struct timespec* t, int32_t millis) {
t->tv_nsec += millis * 1000000L;
t->tv_sec += t->tv_nsec / 1000000000L;
t->tv_nsec %= 1000000000L;
}
/**
* Sleep until `timeout` timespec. When dispatcherShutdown(...) called, the function will return immediately.
*
* @param dispatcher the dispatcher thread to sleep.
* @param timeout the timeout in CLOCK_REALTIME.
*/
inline static void timeoutManagerSleepUntil(SDispatcherTimeoutManager* manager, struct timespec* timeout) {
pthread_mutex_lock(&manager->sleepMutex);
while (true) {
// notified by dispatcherShutdown(...).
if (isShutdownSDispatcherTimeoutManager(manager)) {
break;
}
if (pthread_cond_timedwait(&manager->timeout, &manager->sleepMutex, timeout)) {
fflush(stdout);
break;
}
}
pthread_mutex_unlock(&manager->sleepMutex);
}
/**
* The thread to manage batching timeout.
*/
static void* timeoutManagerCallback(void* arg) {
SDispatcherTimeoutManager* manager = arg;
setThreadName("tscAsyncBackground");
while (!isShutdownSDispatcherTimeoutManager(manager)) {
struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &timeout);
afterMillis(&timeout, manager->timeoutMs);
SBatchRequest* pRequest = dispatcherLockPollAll(manager->dispatcher);
if (pRequest) {
dispatcherAsyncExecute(pRequest);
}
// Similar to scheduleAtFixedRate in Java, if the execution time exceed
// `timeoutMs` milliseconds, then there will be no sleep.
timeoutManagerSleepUntil(manager, &timeout);
}
return NULL;
}
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, int32_t batchSize, int32_t timeoutMs) {
SAsyncBatchWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBatchWriteDispatcher) + batchSize * sizeof(SSqlObj*));
if (!dispatcher) {
return NULL;
}
assert(pClient != NULL);
dispatcher->pClient = pClient;
dispatcher->currentSize = 0;
dispatcher->bufferSize = 0;
dispatcher->batchSize = batchSize;
atomic_store_8(&dispatcher->shutdown, false);
// init the mutex and the cond.
pthread_mutex_init(&dispatcher->bufferMutex, NULL);
pthread_cond_init(&dispatcher->notFull, NULL);
// init timeout manager.
dispatcher->timeoutManager = createSDispatcherTimeoutManager(dispatcher, timeoutMs);
if (!dispatcher->timeoutManager) {
pthread_mutex_destroy(&dispatcher->bufferMutex);
pthread_cond_destroy(&dispatcher->notFull);
free(dispatcher);
return NULL;
}
return dispatcher;
}
/**
* Shutdown the dispatcher and join the timeout thread.
*
* @param dispatcher the dispatcher.
*/
inline static void dispatcherShutdown(SAsyncBatchWriteDispatcher* dispatcher) {
atomic_store_8(&dispatcher->shutdown, true);
if (dispatcher->timeoutManager) {
shutdownSDispatcherTimeoutManager(dispatcher->timeoutManager);
}
}
void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher) {
if (dispatcher == NULL) {
return;
}
dispatcherShutdown(dispatcher);
// poll and send all the statements in the buffer.
while (true) {
SBatchRequest* pRequest = dispatcherLockPollAll(dispatcher);
if (!pRequest) {
break;
}
dispatcherExecute(pRequest);
free(pRequest);
}
// destroy the timeout manager.
destroySDispatcherTimeoutManager(dispatcher->timeoutManager);
// destroy the mutex.
pthread_mutex_destroy(&dispatcher->bufferMutex);
pthread_cond_destroy(&dispatcher->notFull);
free(dispatcher);
}
bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) {
return false;
}
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
// only support insert statement.
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
return false;
}
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
// file insert not support.
if (TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
return false;
}
// only support kv payload.
if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) {
return false;
}
// no schema attached.
if (pInsertParam->schemaAttached) {
return false;
}
// too many insertion rows, fail back to normal insertion.
if (statementGetInsertionRows(pSql) >= dispatcher->batchSize) {
return false;
}
return true;
}
bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) {
if (atomic_load_8(&dispatcher->shutdown)) {
return false;
}
// the sql object doesn't support bulk insertion.
if (!dispatcherCanDispatch(dispatcher, pSql)) {
return false;
}
// try to offer pSql to the buffer.
return dispatcherTryOffer(dispatcher, pSql);
}
/**
* Destroy the SAsyncBatchWriteDispatcher create by SDispatcherManager.
* @param arg the thread local SAsyncBatchWriteDispatcher.
*/
static void destroyDispatcher(void* arg) {
SAsyncBatchWriteDispatcher* dispatcher = arg;
if (!dispatcher) {
return;
}
destroySAsyncBatchWriteDispatcher(dispatcher);
}
SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs,
bool isThreadLocal) {
SDispatcherManager* dispatcher = calloc(1, sizeof(SDispatcherManager));
if (!dispatcher) {
return NULL;
}
assert(pClient != NULL);
dispatcher->pClient = pClient;
dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs;
dispatcher->isThreadLocal = isThreadLocal;
if (isThreadLocal) {
if (pthread_key_create(&dispatcher->key, destroyDispatcher)) {
free(dispatcher);
return NULL;
}
} else {
dispatcher->pGlobal = createSAsyncBatchWriteDispatcher(pClient, batchSize, timeoutMs);
if (!dispatcher->pGlobal) {
free(dispatcher);
return NULL;
}
}
return dispatcher;
}
SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager) {
if (!manager->isThreadLocal) {
return manager->pGlobal;
}
SAsyncBatchWriteDispatcher* value = pthread_getspecific(manager->key);
if (value) {
return value;
}
value = createSAsyncBatchWriteDispatcher(manager->pClient, manager->batchSize, manager->timeoutMs);
if (value) {
pthread_setspecific(manager->key, value);
return value;
}
return NULL;
}
void destroyDispatcherManager(SDispatcherManager* manager) {
if (manager) {
if (manager->isThreadLocal) {
pthread_key_delete(manager->key);
}
if (manager->pGlobal) {
destroySAsyncBatchWriteDispatcher(manager->pGlobal);
}
free(manager);
}
}
SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs) {
SDispatcherTimeoutManager* manager = calloc(1, sizeof(SDispatcherTimeoutManager));
if (!manager) {
return NULL;
}
manager->timeoutMs = timeoutMs;
manager->dispatcher = dispatcher;
atomic_store_8(&manager->shutdown, false);
pthread_mutex_init(&manager->sleepMutex, NULL);
pthread_cond_init(&manager->timeout, NULL);
// init background thread.
if (pthread_create(&manager->background, NULL, timeoutManagerCallback, manager)) {
pthread_mutex_destroy(&manager->sleepMutex);
pthread_cond_destroy(&manager->timeout);
free(manager);
return NULL;
}
return manager;
}
void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
if (!manager) {
return;
}
shutdownSDispatcherTimeoutManager(manager);
manager->dispatcher->timeoutManager = NULL;
pthread_mutex_destroy(&manager->sleepMutex);
pthread_cond_destroy(&manager->timeout);
free(manager);
}
void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
// mark shutdown, signal shutdown to timeout thread.
pthread_mutex_lock(&manager->sleepMutex);
atomic_store_8(&manager->shutdown, true);
pthread_cond_broadcast(&manager->timeout);
pthread_mutex_unlock(&manager->sleepMutex);
// make sure the timeout thread exit.
pthread_join(manager->background, NULL);
}
bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
if (!manager) {
return true;
}
return atomic_load_8(&manager->shutdown);
}
/**
* The proxy function to call `dispatcherExecute`.
*
* @param pMsg the schedule message.
*/
static void dispatcherExecuteProxy(struct SSchedMsg* pMsg) {
SBatchRequest* pRequest = pMsg->ahandle;
if (!pRequest) {
return;
}
pMsg->ahandle = NULL;
dispatcherExecute(pRequest);
free(pRequest);
}
void dispatcherAsyncExecute(SBatchRequest* pRequest) {
if (!pRequest) {
return;
}
assert(pRequest->pRequests);
assert(pRequest->nRequests);
SSchedMsg schedMsg = {0};
schedMsg.fp = dispatcherExecuteProxy;
schedMsg.ahandle = (void*) pRequest;
schedMsg.thandle = (void*) 1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
}
...@@ -1569,7 +1569,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1569,7 +1569,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (pInsertParam->numOfParams > 0) { if (pInsertParam->numOfParams > 0) {
goto _clean; goto _clean;
} }
// merge according to vgId // merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) { if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) {
if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) {
...@@ -1577,6 +1577,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1577,6 +1577,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
} }
} }
pCmd->insertParam.numOfRows = totalNum;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto _clean; goto _clean;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "tnote.h" #include "tnote.h"
#include "trpc.h" #include "trpc.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscBatchWrite.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
...@@ -163,15 +164,30 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -163,15 +164,30 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pSql->fp = fp; pSql->fp = fp;
pSql->param = param; pSql->param = param;
pSql->cmd.command = TSDB_SQL_CONNECT; pSql->cmd.command = TSDB_SQL_CONNECT;
pObj->dispatcherManager = NULL;
if (tsWriteBatchSize > 1 && !tscEmbedded) {
pObj->dispatcherManager = createDispatcherManager(pObj, tsWriteBatchSize, tsWriteBatchTimeout, tsWriteBatchThreadLocal);
if (!pObj->dispatcherManager) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj);
free(pSql);
free(pObj);
return NULL;
}
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
if (pObj->dispatcherManager) {
destroyDispatcherManager(pObj->dispatcherManager);
}
tscReleaseRpc(pRpcObj); tscReleaseRpc(pRpcObj);
free(pSql); free(pSql);
free(pObj); free(pObj);
return NULL; return NULL;
} }
if (taos != NULL) { if (taos != NULL) {
*taos = pObj; *taos = pObj;
} }
......
...@@ -339,7 +339,7 @@ bool sqlBufSend(TAOS *taos, char *sqlBuf) { ...@@ -339,7 +339,7 @@ bool sqlBufSend(TAOS *taos, char *sqlBuf) {
} while(++sleepCnt < 20); } while(++sleepCnt < 20);
strcat(sqlBuf, ";"); strcat(sqlBuf, ";");
taos_query_ra(taos, sqlBuf, cbSendValues, NULL); taos_query_ra(taos, sqlBuf, cbSendValues, NULL, false);
return true; return true;
} }
......
...@@ -14,18 +14,19 @@ ...@@ -14,18 +14,19 @@
*/ */
#include "os.h" #include "os.h"
#include "qScript.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tnote.h" #include "tscBatchWrite.h"
#include "ttimer.h"
#include "tsched.h"
#include "tscLog.h" #include "tscLog.h"
#include "tsched.h"
#include "tsclient.h" #include "tsclient.h"
#include "tglobal.h" #include "ttimer.h"
#include "tconfig.h"
#include "ttimezone.h" #include "ttimezone.h"
#include "qScript.h"
// global, not configurable // global, not configurable
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
...@@ -319,6 +320,49 @@ void taos_cleanup(void) { ...@@ -319,6 +320,49 @@ void taos_cleanup(void) {
taosTmrCleanUp(p); taosTmrCleanUp(p);
} }
/**
* Set the option value (int32, uint16, int16, int8).
* @param cfg the config.
* @param str the value string.
* @return whether is set or not.
*/
static bool taos_set_option_int(SGlobalCfg * cfg, const char* str) {
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
char* p = NULL;
errno = 0;
long value = strtol(str, &p, 10);
if (errno != 0 || p == str) {
tscError("failed to parse option: %s, value: %s", cfg->option, str);
return false;
}
if ((float) value < cfg->minValue || (float) value > cfg->maxValue) {
tscError("failed to set option: %s, setValue: %ld, minValue: %f, maxValue: %f", cfg->option, value, cfg->minValue, cfg->maxValue);
return false;
}
if (cfg->valType == TAOS_CFG_VTYPE_INT32) {
*((int32_t*) cfg->ptr) = (int32_t) value;
} else if (cfg->valType == TAOS_CFG_VTYPE_UINT16) {
*((uint16_t*) cfg->ptr) = (uint16_t) value;
} else if (cfg->valType == TAOS_CFG_VTYPE_INT16) {
*((int16_t*) cfg->ptr) = (int16_t) value;
} else if (cfg->valType == TAOS_CFG_VTYPE_INT8) {
*((int8_t*) cfg->ptr) = (int8_t) value;
} else {
tscError("failed to set option: %s, type expected %d", cfg->option, cfg->valType);
return false;
}
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscDebug("config option: %s has set to %s", cfg->option, str);
return true;
}
tscWarn("config option: %s, is configured by %s", cfg->option, tsCfgStatusStr[cfg->cfgStatus]);
return false;
}
static int taos_options_imp(TSDB_OPTION option, const char *pStr) { static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
SGlobalCfg *cfg = NULL; SGlobalCfg *cfg = NULL;
...@@ -473,6 +517,27 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { ...@@ -473,6 +517,27 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
case TSDB_WRITE_BATCH_SIZE: {
cfg = taosGetConfigOption("writeBatchSize");
assert(cfg != NULL);
taos_set_option_int(cfg, pStr);
break;
}
case TSDB_WRITE_BATCH_TIMEOUT: {
cfg = taosGetConfigOption("writeBatchTimeout");
assert(cfg != NULL);
taos_set_option_int(cfg, pStr);
break;
}
case TSDB_WRITE_BATCH_THREAD_LOCAL: {
cfg = taosGetConfigOption("writeBatchThreadLocal");
assert(cfg != NULL);
taos_set_option_int(cfg, pStr);
break;
}
default: default:
// TODO return the correct error code to client in the format for taos_errstr() // TODO return the correct error code to client in the format for taos_errstr()
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "texpr.h" #include "texpr.h"
#include "tkey.h" #include "tkey.h"
#include "tmd5.h" #include "tmd5.h"
#include "tscBatchWrite.h"
#include "tscGlobalmerge.h" #include "tscGlobalmerge.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscProfile.h" #include "tscProfile.h"
...@@ -1841,32 +1842,36 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) { ...@@ -1841,32 +1842,36 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->colIdxInfo); tfree(pColInfo->colIdxInfo);
} }
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) { void destroySTableDataBlocks(STableDataBlocks* pDataBlocks) {
if (pDataBlock == NULL) { if (!pDataBlocks) {
return; return;
} }
tfree(pDataBlocks->pData);
if (!pDataBlocks->cloned) {
tfree(pDataBlocks->params);
// free the refcount for metermeta
if (pDataBlocks->pTableMeta != NULL) {
tfree(pDataBlocks->pTableMeta);
}
tfree(pDataBlock->pData); tscDestroyBoundColumnInfo(&pDataBlocks->boundColumnInfo);
}
tfree(pDataBlocks);
}
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) {
return;
}
if (removeMeta) { if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name); tNameExtractFullName(&pDataBlock->tableName, name);
taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
if (!pDataBlock->cloned) { destroySTableDataBlocks(pDataBlock);
tfree(pDataBlock->params);
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
tfree(pDataBlock);
} }
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
...@@ -1893,18 +1898,22 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint ...@@ -1893,18 +1898,22 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
return param; return param;
} }
void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) { void destroySTableDataBlocksList(SArray* pDataBlocks) {
if (pDataBlockList == NULL) { if (!pDataBlocks) {
return NULL; return;
} }
size_t nBlocks = taosArrayGetSize(pDataBlocks);
size_t size = taosArrayGetSize(pDataBlockList); for (size_t i = 0; i < nBlocks; ++i) {
for (int32_t i = 0; i < size; i++) { STableDataBlocks * pDataBlock = taosArrayGetP(pDataBlocks, i);
void* d = taosArrayGetP(pDataBlockList, i); if (pDataBlock) {
tscDestroyDataBlock(pSql, d, false); destroySTableDataBlocks(pDataBlock);
}
} }
taosArrayDestroy(&pDataBlocks);
}
taosArrayDestroy(&pDataBlockList); void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) {
destroySTableDataBlocksList(pDataBlockList);
return NULL; return NULL;
} }
...@@ -2202,42 +2211,35 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -2202,42 +2211,35 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result; return result;
} }
static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList == NULL) {
pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
}
STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
//tfree(pInsertParam->pTableNameList[i]);
pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
}
if (freeBlockMap) {
pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false);
}
}
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0; int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); size_t initialSize = taosHashGetSize(pInsertParam->pTableBlockHashList);
initialSize = initialSize > 128 ? 128 : initialSize;
void* pVnodeDataBlockHashList = taosHashInit(initialSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); // alloc table name list.
size_t numOfTables = taosHashGetSize(pInsertParam->pTableBlockHashList);
STableDataBlocks* pOneTableBlock = *p; if (pInsertParam->pTableNameList) {
destroyTableNameList(pInsertParam);
}
pInsertParam->pTableNameList = calloc(numOfTables, sizeof(SName*));
pInsertParam->numOfTables = (int32_t) numOfTables;
size_t tail = 0;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
while(pOneTableBlock) { while (iter) {
STableDataBlocks* pOneTableBlock = *iter;
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter);
// extract table name list.
pInsertParam->pTableNameList[tail++] = tNameDup(&pOneTableBlock->tableName);
if (pBlocks->numOfRows > 0) { if (pBlocks->numOfRows > 0) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
...@@ -2320,19 +2322,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar ...@@ -2320,19 +2322,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
pBlocks->numOfRows = 0; pBlocks->numOfRows = 0;
}else { } else {
tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname); tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname);
} }
p = taosHashIterate(pInsertParam->pTableBlockHashList, p); if (freeBlockMap) {
if (p == NULL) { tscDestroyDataBlock(pSql, pOneTableBlock, false);
break;
} }
pOneTableBlock = *p;
} }
extractTableNameList(pSql, pInsertParam, freeBlockMap); if (freeBlockMap) {
taosHashCleanup(pInsertParam->pTableBlockHashList);
pInsertParam->pTableBlockHashList = NULL;
}
// free the table data blocks; // free the table data blocks;
pInsertParam->pDataBlocks = pVnodeDataBlockList; pInsertParam->pDataBlocks = pVnodeDataBlockList;
...@@ -2353,6 +2355,9 @@ void tscCloseTscObj(void *param) { ...@@ -2353,6 +2355,9 @@ void tscCloseTscObj(void *param) {
tscReleaseRpc(pObj->pRpcObj); tscReleaseRpc(pObj->pRpcObj);
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
tscReleaseClusterInfo(pObj->clusterId); tscReleaseClusterInfo(pObj->clusterId);
destroyDispatcherManager(pObj->dispatcherManager);
pObj->dispatcherManager = NULL;
tfree(pObj); tfree(pObj);
} }
......
...@@ -92,6 +92,9 @@ extern int32_t tsRetryStreamCompDelay; ...@@ -92,6 +92,9 @@ extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval; extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow; extern int64_t tsMaxRetentWindow;
extern bool tsWriteBatchThreadLocal;
extern int32_t tsWriteBatchSize;
extern int32_t tsWriteBatchTimeout;
// db parameters in client // db parameters in client
extern int32_t tsCacheBlockSize; extern int32_t tsCacheBlockSize;
......
...@@ -127,6 +127,11 @@ int8_t tsSortWhenGroupBy = 1; ...@@ -127,6 +127,11 @@ int8_t tsSortWhenGroupBy = 1;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// The tsc async write batching feature (using ABWD).
bool tsWriteBatchThreadLocal = false; // if thread local enable, each thread will allocate a dispatcher.
int32_t tsWriteBatchSize = 0; // suggest: 64 - 512, default 0, 0 means disable batching.
int32_t tsWriteBatchTimeout = 10; // suggest: 2 - 100 (unit: milliseconds)
// the maximum allowed query buffer size during query processing for each data node. // the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default) // -1 no limit (default)
// 0 no query allowed, queries are disabled // 0 no query allowed, queries are disabled
...@@ -1860,6 +1865,37 @@ static void doInitGlobalConfig(void) { ...@@ -1860,6 +1865,37 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "writeBatchThreadLocal";
cfg.ptr = &tsWriteBatchThreadLocal;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "writeBatchSize";
cfg.ptr = &tsWriteBatchSize;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 4096;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "writeBatchTimeout";
cfg.ptr = &tsWriteBatchTimeout;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 1;
cfg.maxValue = 2048;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM);
#else #else
// if TD_TSZ macro define, have 5 count configs, so must add 5 // if TD_TSZ macro define, have 5 count configs, so must add 5
......
...@@ -60,7 +60,10 @@ typedef enum { ...@@ -60,7 +60,10 @@ typedef enum {
TSDB_OPTION_TIMEZONE, TSDB_OPTION_TIMEZONE,
TSDB_OPTION_CONFIGDIR, TSDB_OPTION_CONFIGDIR,
TSDB_OPTION_SHELL_ACTIVITY_TIMER, TSDB_OPTION_SHELL_ACTIVITY_TIMER,
TSDB_MAX_OPTIONS TSDB_MAX_OPTIONS,
TSDB_WRITE_BATCH_SIZE,
TSDB_WRITE_BATCH_TIMEOUT,
TSDB_WRITE_BATCH_THREAD_LOCAL
} TSDB_OPTION; } TSDB_OPTION;
typedef struct taosField { typedef struct taosField {
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 137 #define TSDB_CFG_MAX_NUM 140
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
...@@ -225,66 +225,59 @@ char *tstrstr(char *src, char *dst, bool ignoreInEsc) { ...@@ -225,66 +225,59 @@ char *tstrstr(char *src, char *dst, bool ignoreInEsc) {
} }
char* strtolower(char *dst, const char *src) { char* strtolower(char *dst, const char *src) {
int esc = 0; if (src == NULL || dst == NULL) {
char quote = 0, *p = dst, c; return dst;
}
assert(dst != NULL); char* const ret = dst;
while (*src) {
for (c = *src++; c; c = *src++) { const char ch = *(src++);
if (esc) { *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch;
esc = 0;
} else if (quote) { if (ch == '\'' || ch == '"') {
if (c == '\\') { char prev = ch;
esc = 1; while (*src) {
} else if (c == quote) { const char next = *(src++);
quote = 0; *(dst++) = next;
if (prev != '\\' && next == ch) break;
prev = next;
}
} else if (ch == '`') {
while (*src) {
const char next = *(src++);
*(dst++) = next;
if (next == ch) break;
} }
} else if (c >= 'A' && c <= 'Z') {
c -= 'A' - 'a';
} else if (c == '\'' || c == '"') {
quote = c;
} }
*p++ = c;
} }
*(dst) = 0;
*p = 0; return ret;
return dst;
} }
char* strntolower(char *dst, const char *src, int32_t n) { char* strntolower(char *dst, const char *src, int32_t n) {
int esc = 0, inEsc = 0;
char quote = 0, *p = dst, c;
assert(dst != NULL); assert(dst != NULL);
if (n == 0) { char* const end = dst + n;
*p = 0; while (dst != end) {
return dst; const char ch = *(src++);
} *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch;
for (c = *src++; n-- > 0; c = *src++) {
if (esc) { if (ch == '\'' || ch == '"') {
esc = 0; char prev = ch;
} else if (quote) { while (dst != end) {
if (c == '\\') { const char next = *(src++);
esc = 1; *(dst++) = next;
} else if (c == quote) { if (prev != '\\' && next == ch) break;
quote = 0; prev = next;
} }
} else if (inEsc) { } else if (ch == '`') {
if (c == '`') { while (dst != end) {
inEsc = 0; const char next = *(src++);
*(dst++) = next;
if (next == ch) break;
} }
} else if (c >= 'A' && c <= 'Z') {
c -= 'A' - 'a';
} else if (inEsc == 0 && (c == '\'' || c == '"')) {
quote = c;
} else if (c == '`' && quote == 0) {
inEsc = 1;
} }
*p++ = c;
} }
*(dst) = 0;
*p = 0; return dst - n;
return dst;
} }
char* strntolower_s(char *dst, const char *src, int32_t n) { char* strntolower_s(char *dst, const char *src, int32_t n) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册