diff --git a/.gitignore b/.gitignore index d1f1dc4dedb31da5b41a1046d7a4419881746071..b0a36fdeddeb2815d5ff5ba1c6a2241f13f678f5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,12 @@ build/ .ycm_extra_conf.py .vscode/ +.cache/ +compile_commands.json .idea/ cmake-build-debug/ cmake-build-release/ +cmake-build-relwithdebinfo/ cscope.out cscope.files tags diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index a6cbe6f3fb5278c230b40ab0d814932888a7e548..9b07ed7ab3c2ead684d8318c2a13989bc87ad7b9 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -313,3 +313,12 @@ keepColumnName 1 # unit Hour. Latency of data migration # keepTimeOffset 0 + +# taosc write batch size, maximum 4096, suggested value 64 ~ 512, default 0, 0 means disable write batching. +# writeBatchSize 0 + +# taosc write batch timeout in milliseconds, maximum 2048, suggested value 2 ~ 100, default 10. +# writeBatchTimeout 10 + +# using thread local write batching. this option is not available when writeBatchSize = 0. +# writeBatchThreadLocal 0 \ No newline at end of file diff --git a/src/client/inc/tscBatchMerge.h b/src/client/inc/tscBatchMerge.h new file mode 100644 index 0000000000000000000000000000000000000000..12fe66c9cf6df4ee1d0be24def41d7e05efd8b82 --- /dev/null +++ b/src/client/inc/tscBatchMerge.h @@ -0,0 +1,274 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +*/ + +#ifndef TDENGINE_TSCBATCHMERGE_H +#define TDENGINE_TSCBATCHMERGE_H + +#include "hash.h" +#include "taosmsg.h" +#include "tarray.h" +#include "tscUtil.h" +#include "tsclient.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * A builder of SSubmitBlk. + */ +typedef struct SSubmitBlkBuilder { + // the metadata of the SSubmitBlk. + SSubmitBlk* metadata; + + // the array stores all the rows in a table, aka SArray. + SArray* rows; + +} SSubmitBlkBuilder; + +/** + * The builder to build SSubmitMsg::blocks. + */ +typedef struct SSubmitMsgBlocksBuilder { + // SHashObj. + SHashObj* blockBuilders; + int64_t vgId; +} SSubmitMsgBlocksBuilder; + +/** + * STableDataBlocksBuilder is a tool to build data blocks by append the existing data blocks in a vnode. + */ +typedef struct STableDataBlocksBuilder { + SSubmitMsgBlocksBuilder* blocksBuilder; + STableDataBlocks* firstBlock; + int64_t vgId; +} STableDataBlocksBuilder; + +/** + * STableDataBlocksListBuilder is a tool to build vnode data blocks list by appending exist data blocks. + */ +typedef struct STableDataBlocksListBuilder { + SHashObj* dataBlocksBuilders; +} STableDataBlocksListBuilder; + +/** + * A Builder to build SInsertStatementParam::pTableNameList. + */ +typedef struct STableNameListBuilder { + // store the unsorted table names, SArray. + SArray* pTableNameList; +} STableNameListBuilder; + +/** + * Create a SSubmitBlkBuilder using exist metadata. + * + * @param metadata the metadata. + * @return the SSubmitBlkBuilder. + */ +SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata); + +/** + * Destroy the SSubmitBlkBuilder. + * + * @param builder the SSubmitBlkBuilder. + */ +void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder); + +/** + * Append a SSubmitBlk* to the builder. The table uid in pBlock must be the same with the builder's. + * + * @param builder the SSubmitBlkBuilder. + * @param pBlock the pBlock to append. + * @return whether the append is success. + */ +bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk *pBlock); + + +/** + * Build and write SSubmitBlk to `target` + * + * @param builder the SSubmitBlkBuilder. + * @param target the target to write. + * @param nRows the number of rows in SSubmitBlk*. + * @return the writen bytes. + */ +size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows); + +/** + * Get the expected writen bytes of `writeSSubmitBlkBuilder`. + * + * @param builder the SSubmitBlkBuilder. + * @return the expected writen bytes of `writeSSubmitBlkBuilder`. + */ +size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder); + +/** + * Create a SSubmitMsgBuilder. + * + * @param vgId the vgId of SSubmitMsg. + * @return the SSubmitMsgBuilder. + */ +SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId); + +/** + * Get the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @return the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. + */ +size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder); + +/** + * Build and write SSubmitMsg::blocks to `pBlocks` + * + * @param builder the SSubmitBlkBuilder. + * @param pBlocks the target to write. + * @param nRows the number of row in SSubmitMsg::blocks. + * @return the writen bytes. + */ +size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows); + +/** + * Get the number of block in SSubmitMsgBlocksBuilder. + * @param builder the SSubmitMsgBlocksBuilder. + * @return the number of SSubmitBlk block. + */ +size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder); + +/** + * Destroy the SSubmitMsgBlocksBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder to destroy. + */ +void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder); + +/** + * Append SSubmitMsg* to the SSubmitMsgBlocksBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @param pBlocks the SSubmitBlk in SSubmitMsg::blocks. + * @param nBlocks the number of blocks in SSubmitMsg. + * @return whether the append is success. + */ +bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks); + +/** + * Create the STableDataBlocksBuilder. + * + * @param vgId the vgId of STableDataBlocksBuilder. + * @return the STableDataBlocksBuilder. + */ +STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId); + +/** + * Destroy the STableDataBlocksBuilder. + * @param builder the STableDataBlocksBuilder. + */ +void destroySTableDataBlocksBuilder(STableDataBlocksBuilder *builder); + +/** + * Append a data blocks to STableDataBlocksBuilder. + * @param builder the STableDataBlocksBuilder. + * @param dataBlocks the dataBlocks to append. the vgId of dataBlocks must be same with the STableDataBlocksBuilder. + * @return whether the append is success. + */ +bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks); + +/** + * Build the data blocks for single vnode. + * @param builder the STableDataBlocksBuilder. + * @param nRows the number of row in STableDataBlocks. + * @return the data blocks for single vnode. + */ +STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows); + +/** + * Create the STableDataBlocksListBuilder. + * + * @return the STableDataBlocksListBuilder. + */ +STableDataBlocksListBuilder* createSTableDataBlocksListBuilder(); + +/** + * Destroy the STableDataBlocksListBuilder. + * + * @param builder the STableDataBlocksListBuilder. + */ +void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder); + +/** + * Append a data blocks to STableDataBlocksListBuilder. + * + * @param builder the STableDataBlocksListBuilder. + * @param dataBlocks the data blocks. + * @return whether the append is success. + */ +bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks); + +/** + * Build the vnode data blocks list. + * + * @param builder the STableDataBlocksListBuilder. + * @param nTables the number of table in vnode data blocks list. + * @param nRows the number of row in vnode data blocks list. + * @return the vnode data blocks list. + */ +SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows); + +/** + * Create STableNameListBuilder. + */ +STableNameListBuilder* createSTableNameListBuilder(); + +/** + * Destroy the STableNameListBuilder. + * @param builder the STableNameListBuilder. + */ +void destroySTableNameListBuilder(STableNameListBuilder* builder); + +/** + * Insert a SName to builder. + * + * @param builder the STableNameListBuilder. + * @param name the table name. + * @return whether it is success. + */ +bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name); + +/** + * Build the STable name list. + * + * @param builder the STableNameListBuilder. + * @param numOfTables the number of table. + * @return the STable name list. + */ +SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables); + +/** + * Merge the KV-PayLoad SQL objects into single one. + * The statements here must be an insertion statement and no schema attached. + * + * @param polls the array of SSqlObj*. + * @param nPolls the number of SSqlObj* in the array. + * @param result the returned result. result is not null! + * @return the status code. + */ +int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj *result); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCBATCHMERGE_H diff --git a/src/client/inc/tscBatchWrite.h b/src/client/inc/tscBatchWrite.h new file mode 100644 index 0000000000000000000000000000000000000000..0e7d0136b068c30ced3cbe158bc0e60bc660cae4 --- /dev/null +++ b/src/client/inc/tscBatchWrite.h @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSCBATCHWRITE_H +#define TDENGINE_TSCBATCHWRITE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tthread.h" + +// forward declaration. +typedef struct STscObj STscObj; +typedef struct SSqlObj SSqlObj; +typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager; + +/** + * SAsyncBatchWriteDispatcher is an async batching write dispatcher (ABWD). ABWD accepts the recent SQL requests and put + * them in a queue waiting to be scheduled. When the number of requests in the queue reaches batch_size, it merges the + * requests in the queue and sends them to the server, thus reducing the network overhead caused by multiple + * communications to the server and directly improving the throughput of small object asynchronous writes. + */ +typedef struct SAsyncBatchWriteDispatcher { + // the client object. + STscObj* pClient; + + // the timeout manager. + SDispatcherTimeoutManager* timeoutManager; + + // the mutex to protect the dispatcher. + pthread_mutex_t bufferMutex; + + // the cond to signal when buffer not full. + pthread_cond_t notFull; + + // the maximum number of insertion rows in a batch. + int32_t batchSize; + + // the number of insertion rows in the buffer. + int32_t currentSize; + + // the number of items in the buffer. + int32_t bufferSize; + + // whether the dispatcher is shutdown. + bool shutdown; + + SSqlObj* buffer[]; +} SAsyncBatchWriteDispatcher; + +/** + * The manager of SAsyncBatchWriteDispatcher. Call dispatcherAcquire(...) to get the SAsyncBatchWriteDispatcher + * instance. SDispatcherManager will manage the life cycle of SAsyncBatchWriteDispatcher. + */ +typedef struct SDispatcherManager { + pthread_key_t key; + + // the maximum number of insertion rows in a batch. + int32_t batchSize; + + // the batching timeout in milliseconds. + int32_t timeoutMs; + + // specifies whether the dispatcher is thread local, if the dispatcher is not + // thread local, we will use the global dispatcher below. + bool isThreadLocal; + + // the global dispatcher, if thread local enabled, global will be set to NULL. + SAsyncBatchWriteDispatcher* pGlobal; + + // the client object. + STscObj* pClient; + +} SDispatcherManager; + +/** + * Control the timeout of the dispatcher queue. + */ +typedef struct SDispatcherTimeoutManager { + // the dispatcher that timeout manager belongs to. + SAsyncBatchWriteDispatcher* dispatcher; + + // the background thread. + pthread_t background; + + // the mutex to sleep the background thread. + pthread_mutex_t sleepMutex; + + // the cond to signal to background thread. + pthread_cond_t timeout; + + // the batching timeout in milliseconds. + int32_t timeoutMs; + + // whether the timeout manager is shutdown. + bool shutdown; +} SDispatcherTimeoutManager; + +/** + * A batch that polls from SAsyncBatchWriteDispatcher::buffer. + */ +typedef struct SBatchRequest { + size_t nRequests; + SSqlObj* pRequests[]; +} SBatchRequest; + +/** + * Create the dispatcher timeout manager. + */ +SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs); + +/** + * Destroy the dispatcher timeout manager. + */ +void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager); + +/** + * Check if the timeout manager is shutdown. + * @param manager the timeout manager. + * @return whether the timeout manager is shutdown. + */ +bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager); + +/** + * Shutdown the SDispatcherTimeoutManager. + * @param manager the SDispatcherTimeoutManager. + */ +void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager); + +/** + * Merge SSqlObjs into single SSqlObj. + * + * @param pRequest the batch request. + * @param batch the batch SSqlObj*. + * @return the status code. + */ +int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch); + +/** + * Merge the sql statements and execute the merged sql statement asynchronously. + * + * @param pRequest the batch request. the request will be promised to free after calling this function. + */ +void dispatcherAsyncExecute(SBatchRequest* pRequest); + +/** + * Merge the sql statements and execute the merged sql statement. + * + * @param pRequest the batch request. you must call free(pRequest) after calling this function. + */ +void dispatcherExecute(SBatchRequest* pRequest); + +/** + * Create the async batch write dispatcher. + * + * @param pClient the client object. + * @param batchSize When user submit an insert sql to `taos_query_a`, the SSqlObj* will be buffered instead of executing + * it. If the number of the buffered rows reach `batchSize`, all the SSqlObj* will be merged and sent to vnodes. + * @param timeout The SSqlObj* will be sent to vnodes no more than `timeout` milliseconds. But the actual time + * vnodes received the SSqlObj* depends on the network quality. + */ +SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, int32_t batchSize, int32_t timeoutMs); + +/** + * Destroy the async auto batch dispatcher. + */ +void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher); + +/** + * Check if the current sql object can be dispatch by ABWD. + * 1. auto batch feature on the sql object must be enabled. + * 2. must be an `insert into ... value ...` statement. + * 3. the payload type must be kv payload. + * 4. no schema attached. + * + * @param dispatcher the dispatcher. + * @param pSql the sql object to check. + * @return returns true if the sql object can be dispatch by ABWD. + */ +bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql); + +/** + * Try to offer the SSqlObj* to the dispatcher. If the number of row reach `batchSize`, the function + * will merge the SSqlObj* in the buffer and send them to the vnodes. + * + * @param pSql the insert statement to offer. + * @return if offer success, returns true. + */ +bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql); + +/** + * Create the manager of SAsyncBatchWriteDispatcher. + * + * @param pClient the client object. + * @param batchSize the batchSize of SAsyncBatchWriteDispatcher. + * @param timeoutMs the timeoutMs of SAsyncBatchWriteDispatcher. + * @param isThreadLocal specifies whether the dispatcher is thread local. + * @return the SAsyncBatchWriteDispatcher manager. + */ +SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, bool isThreadLocal); + +/** + * Destroy the SDispatcherManager. + * (will destroy all the instances of SAsyncBatchWriteDispatcher in the thread local variable) + * + * @param manager the SDispatcherManager. + */ +void destroyDispatcherManager(SDispatcherManager* manager); + +/** + * Get an instance of SAsyncBatchWriteDispatcher. + * + * @param manager the SDispatcherManager. + * @return the SAsyncBatchWriteDispatcher instance. + */ +SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCBATCHWRITE_H diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 2699f4dbda4f74b204ba89f108b9c2a91d6cb4e9..2e6c99f68cae6d801a55395170df4e25c137505f 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -144,6 +144,8 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, uint32_t offset); +void destroySTableDataBlocksList(SArray* pDataBlocks); +void destroySTableDataBlocks(STableDataBlocks* pDataBlocks); void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList); void* tscDestroyUdfArrayList(SArray* pUdfList); void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 65854905bdd12ce7452eddada0b051ff7d1cdcbe..9c2a7220c6340c360440c62bdca5f015dc73232a 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -45,6 +45,7 @@ typedef enum { // forward declaration struct SSqlInfo; +typedef struct SDispatcherManager SDispatcherManager; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); typedef void (*_freeSqlSupporter)(void **); @@ -256,7 +257,7 @@ typedef struct SInsertStatementParam { int32_t batchSize; // for parameter ('?') binding and batch processing int32_t numOfParams; - + int32_t numOfRows; int32_t numOfFiles; char msg[512]; // error message @@ -352,7 +353,8 @@ typedef struct STscObj { SRpcCorEpSet *tscCorMgmtEpSet; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj - + + SDispatcherManager*dispatcherManager; SReqOrigin from; } STscObj; @@ -400,9 +402,10 @@ typedef struct SSqlObj { struct SSqlObj *prev, *next; int64_t self; - // connect alive + int64_t lastAlive; void * pPrevContext; + bool enableBatch; } SSqlObj; typedef struct SSqlStream { @@ -503,7 +506,7 @@ void tscCloseTscObj(void *pObj); TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, TAOS **taos); TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res); -TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param); +TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param, bool enableBatch); // get taos connection unused session number int32_t taos_unused_session(TAOS* taos); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index e75ce8dc0106bafa53411f585ad8b708c1c1bf99..c8ef467da4a15feebd3d76976679a3c22e8a091f 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -13,16 +13,19 @@ * along with this program. If not, see . */ + #include "os.h" +#include "osAtomic.h" +#include "tarray.h" #include "tutil.h" +#include "qTableMeta.h" #include "tnote.h" #include "trpc.h" +#include "tscBatchWrite.h" #include "tscLog.h" #include "tscSubquery.h" #include "tscUtil.h" -#include "tsched.h" -#include "qTableMeta.h" #include "tsclient.h" static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); @@ -334,6 +337,26 @@ bool appendTagsFilter(SSqlObj* pSql) { return false; } + // check tags is blank or '' + char* p1 = pTscObj->tags; + if (strcmp(p1, "\'\'") == 0) { + tscDebug("TAGS 0x%" PRIx64 " tags is empty. user=%s", pSql->self, pTscObj->user); + return false; + } + bool blank = true; + while(*p1 != 0) { + if(*p1 != ' ') { + blank = false; + break; + } + ++p1; + } + // result + if(blank) { + tscDebug("TAGS 0x%" PRIx64 " tags is all blank. user=%s", pSql->self, pTscObj->user); + return false; + } + char * p = insertTags(pSql->sqlstr, pTscObj->tags); if(p == NULL) { return false; @@ -378,7 +401,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para pCmd->resColumnId = TSDB_RES_COL_ID; taosAcquireRef(tscObjRef, pSql->self); - int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { @@ -392,18 +414,27 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para taosReleaseRef(tscObjRef, pSql->self); return; } - - SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); + + if (pObj->dispatcherManager != NULL) { + SAsyncBatchWriteDispatcher * dispatcher = dispatcherAcquire(pObj->dispatcherManager); + if (dispatcherTryDispatch(dispatcher, pSql)) { + taosReleaseRef(tscObjRef, pSql->self); + tscDebug("sql obj %p has been buffer in insert buffer", pSql); + return; + } + } + + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); executeQuery(pSql, pQueryInfo); taosReleaseRef(tscObjRef, pSql->self); } // TODO return the correct error code to client in tscQueueAsyncError void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) { - taos_query_ra(taos, sqlstr, fp, param); + taos_query_ra(taos, sqlstr, fp, param, tsWriteBatchSize > 0); } -TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) { +TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param, bool enableBatch) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { tscError("pObj:%p is NULL or freed", pObj); @@ -429,6 +460,8 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v return NULL; } + pSql->enableBatch = enableBatch; + doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen); return pSql; diff --git a/src/client/src/tscBatchMerge.c b/src/client/src/tscBatchMerge.c new file mode 100644 index 0000000000000000000000000000000000000000..177de143f9ab26ba63c7b9db6819e137876220e0 --- /dev/null +++ b/src/client/src/tscBatchMerge.c @@ -0,0 +1,554 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +*/ + +#include "tscBatchMerge.h" + +/** + * A util function to compare two SName. + */ +static int32_t compareSName(const void *left, const void *right) { + if (left == right) { + return 0; + } + SName* x = *(SName** ) left; + SName* y = *(SName** ) right; + return memcmp(x, y, sizeof(SName)); +} + +/** + * A util function to sort SArray by key. + */ +static int32_t compareSMemRow(const void *x, const void *y) { + TSKEY left = memRowKey(*(void **) x); + TSKEY right = memRowKey(*(void **) y); + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} + +/** + * If the SSubmitBlkBuilder of pBlock->uid is present, returns it. Otherwise, build a new SSubmitBlkBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @param pBlock the SSubmitBlk. + * @return the SSubmitBlkBuilder (NULL means failure). + */ +inline static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlock) { + SSubmitBlkBuilder** iter = taosHashGet(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid)); + SSubmitBlkBuilder* blocksBuilder = NULL; + if (iter) { + return *iter; + } + + blocksBuilder = createSSubmitBlkBuilder(pBlock); + if (!blocksBuilder) { + return NULL; + } + + if (taosHashPut(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid), &blocksBuilder, sizeof(SArray*))) { + destroySSubmitBlkBuilder(blocksBuilder); + return NULL; + } + + return blocksBuilder; +} + +SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* nTables) { + if (!taosArrayGetSize(builder->pTableNameList)) { + *nTables = 0; + return NULL; + } + + // sort and unique. + taosArraySort(builder->pTableNameList, compareSName); + size_t tail = 0; + size_t nNames = taosArrayGetSize(builder->pTableNameList); + for (size_t i = 1; i < nNames; ++i) { + SName* last = taosArrayGetP(builder->pTableNameList, tail); + SName* current = taosArrayGetP(builder->pTableNameList, i); + if (memcmp(last, current, sizeof(SName)) != 0) { + ++tail; + taosArraySet(builder->pTableNameList, tail, ¤t); + } + } + + // build table names list. + SName** tableNames = calloc(tail + 1, sizeof(SName*)); + if (!tableNames) { + return NULL; + } + + // clone data. + for (size_t i = 0; i <= tail; ++i) { + SName* clone = malloc(sizeof(SName)); + if (!clone) { + goto error; + } + memcpy(clone, taosArrayGetP(builder->pTableNameList, i), sizeof(SName)); + tableNames[i] = clone; + } + + *nTables = tail + 1; + return tableNames; + +error: + for (size_t i = 0; i <= tail; ++i) { + if (tableNames[i]) { + free(tableNames[i]); + } + } + free(tableNames); + return NULL; +} + +SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) { + SSubmitBlkBuilder* builder = calloc(1, sizeof(SSubmitBlkBuilder)); + if (!builder) { + return NULL; + } + + builder->rows = taosArrayInit(1, sizeof(SMemRow)); + if (!builder->rows) { + free(builder); + return NULL; + } + + builder->metadata = calloc(1, sizeof(SSubmitBlk)); + if (!builder->metadata) { + taosArrayDestroy(&builder->rows); + free(builder); + return NULL; + } + memcpy(builder->metadata, metadata, sizeof(SSubmitBlk)); + + return builder; +} + +void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) { + if (!builder) { + return; + } + taosArrayDestroy(&builder->rows); + free(builder->metadata); + free(builder); +} + +bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* pBlock) { + assert(pBlock->uid == builder->metadata->uid); + assert(pBlock->schemaLen == 0); + + // shadow copy all the SMemRow to SSubmitBlkBuilder::rows. + char* pRow = pBlock->data; + char* pEnd = pBlock->data + htonl(pBlock->dataLen); + while (pRow < pEnd) { + if (!taosArrayPush(builder->rows, &pRow)) { + return false; + } + pRow += memRowTLen(pRow); + } + + return true; +} + +size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows) { + memcpy(target, builder->metadata, sizeof(SSubmitBlk)); + + // sort SSubmitBlkBuilder::rows by timestamp. + uint32_t dataLen = 0; + taosArraySort(builder->rows, compareSMemRow); + + // deep copy all the SMemRow to target. + size_t nMemRows = taosArrayGetSize(builder->rows); + for (int i = 0; i < nMemRows; ++i) { + char* pRow = taosArrayGetP(builder->rows, i); + memcpy(POINTER_SHIFT(target->data, dataLen), pRow, memRowTLen(pRow)); + dataLen += memRowTLen(pRow); + } + + *nRows = nMemRows; + + target->schemaLen = 0; + target->dataLen = (int32_t) htonl(dataLen); + target->numOfRows = (int16_t) htons(*nRows); + + return dataLen + sizeof(SSubmitBlk); +} + +size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder) { + size_t dataLen = 0; + size_t nRows = taosArrayGetSize(builder->rows); + for (int i = 0; i < nRows; ++i) { + char* pRow = taosArrayGetP(builder->rows, i); + dataLen += memRowTLen(pRow); + } + return dataLen + sizeof(SSubmitBlk); +} + +SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId) { + SSubmitMsgBlocksBuilder* builder = calloc(1, sizeof(SSubmitMsgBlocksBuilder)); + if (!builder) { + return NULL; + } + builder->vgId = vgId; + + builder->blockBuilders = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (!builder->blockBuilders) { + free(builder); + return NULL; + } + return builder; +} + +size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { + size_t nWrite = 0; + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + SSubmitBlkBuilder* blocksBuilder = *iter; + nWrite += nWriteSSubmitBlkBuilder(blocksBuilder); + iter = taosHashIterate(builder->blockBuilders, iter); + } + return nWrite; +} + +size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows) { + size_t nWrite = 0; + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + + // copy all the SSubmitBlk to pBlocks. + while (iter) { + size_t nSubRows = 0; + SSubmitBlkBuilder* blocksBuilder = *iter; + SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite); + nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock, &nSubRows); + *nRows += nSubRows; + iter = taosHashIterate(builder->blockBuilders, iter); + } + return nWrite; +} + +size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) { + return taosHashGetSize(builder->blockBuilders); +} + +void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { + if (!builder) { + return; + } + + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + destroySSubmitBlkBuilder(*iter); + iter = taosHashIterate(builder->blockBuilders, iter); + } + taosHashCleanup(builder->blockBuilders); + free(builder); +} + +bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks) { + SSubmitBlk* pBlock = pBlocks; + for (size_t i = 0; i < nBlocks; ++i) { + // not support SSubmitBlk with schema. + assert(pBlock->schemaLen == 0); + + // get the builder of specific table (by pBlock->uid). + SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock); + if (!blocksBuilder) { + return false; + } + + if (!appendSSubmitBlkBuilder(blocksBuilder, pBlock)) { + return false; + } + + // go to next block. + size_t blockSize = sizeof (SSubmitBlk) + htonl(pBlock->dataLen); + pBlock = POINTER_SHIFT(pBlock, blockSize); + } + return true; +} + +STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId) { + STableDataBlocksBuilder* builder = calloc(1, sizeof(STableDataBlocksBuilder)); + if (!builder) { + return NULL; + } + + builder->blocksBuilder = createSSubmitMsgBuilder(vgId); + if (!builder->blocksBuilder) { + free(builder); + return NULL; + } + + builder->vgId = vgId; + builder->firstBlock = NULL; + return builder; +} + +void destroySTableDataBlocksBuilder(STableDataBlocksBuilder* builder) { + if (!builder) { + return; + } + + destroySSubmitMsgBuilder(builder->blocksBuilder); + free(builder); +} + +bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks) { + // the data blocks vgId must be same with builder vgId. + if (!dataBlocks || dataBlocks->vgId != builder->vgId) { + return false; + } + + if (!builder->firstBlock) { + builder->firstBlock = dataBlocks; + } + + SSubmitBlk* pBlocks = (SSubmitBlk *)(dataBlocks->pData + dataBlocks->headerSize); + return appendSSubmitMsgBlocks(builder->blocksBuilder, pBlocks, dataBlocks->numOfTables); +} + +STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows) { + SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder; + STableDataBlocks *firstBlock = builder->firstBlock; + if (!firstBlock) { + return NULL; + } + + size_t nWriteSize = nWriteSSubmitMsgBuilder(builder->blocksBuilder); + size_t nHeaderSize = firstBlock->headerSize; + size_t nAllocSize = nWriteSize + nHeaderSize; + + // allocate data blocks. + STableDataBlocks* dataBlocks = NULL; + int32_t code = tscCreateDataBlock(nAllocSize, 0, (int32_t) nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + // build the header (using first block). + dataBlocks->size = nHeaderSize; + memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize); + + // build the SSubmitMsg::blocks. + dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize), nRows); + dataBlocks->numOfTables = (int32_t) nBlockSSubmitMsgBlocksBuilder(blocksBuilder); + return dataBlocks; +} + +STableDataBlocksListBuilder* createSTableDataBlocksListBuilder() { + STableDataBlocksListBuilder* builder = calloc(1, sizeof(STableDataBlocksListBuilder)); + if (!builder) { + return NULL; + } + + builder->dataBlocksBuilders = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (!builder->dataBlocksBuilders) { + free(builder); + return NULL; + } + + return builder; +} + +void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder) { + if (!builder) { + return; + } + + STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); + while (iter) { + destroySTableDataBlocksBuilder(*iter); + iter = taosHashIterate(builder->dataBlocksBuilders, iter); + } + taosHashCleanup(builder->dataBlocksBuilders); + free(builder); +} + +bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks) { + // get the data blocks builder of specific vgId. + STableDataBlocksBuilder** item = taosHashGet(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId)); + STableDataBlocksBuilder* blocksBuilder = NULL; + if (item) { + blocksBuilder = *item; + } else { + blocksBuilder = createSTableDataBlocksBuilder(dataBlocks->vgId); + if (!blocksBuilder) { + return false; + } + + if (taosHashPut(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId), &blocksBuilder, sizeof(STableDataBlocksBuilder*))) { + destroySTableDataBlocksBuilder(blocksBuilder); + return false; + } + } + + // append to this builder. + return appendSTableDataBlocksBuilder(blocksBuilder, dataBlocks); +} + +SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows) { + SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); + if (!pVnodeDataBlockList) { + return NULL; + } + + // build data blocks of each vgId. + STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); + while (iter) { + size_t nSubRows = 0; + STableDataBlocksBuilder* dataBlocksBuilder = *iter; + STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder, &nSubRows); + if (!dataBlocks) { + goto error; + } + *nTables += dataBlocks->numOfTables; + *nRows += nSubRows; + + taosArrayPush(pVnodeDataBlockList, &dataBlocks); + iter = taosHashIterate(builder->dataBlocksBuilders, iter); + } + return pVnodeDataBlockList; + +error: + for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) { + STableDataBlocks* dataBlocks = taosArrayGetP(pVnodeDataBlockList, i); + tscDestroyDataBlock(NULL, dataBlocks, false); + } + taosArrayDestroy(&pVnodeDataBlockList); + return NULL; +} + +STableNameListBuilder* createSTableNameListBuilder() { + STableNameListBuilder* builder = calloc(1, sizeof(STableNameListBuilder)); + if (!builder) { + return NULL; + } + + builder->pTableNameList = taosArrayInit(1, sizeof(SName*)); + if (!builder->pTableNameList) { + free(builder); + return NULL; + } + + return builder; +} + +void destroySTableNameListBuilder(STableNameListBuilder* builder) { + if (!builder) { + return; + } + taosArrayDestroy(&builder->pTableNameList); + free(builder); +} + +bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) { + return taosArrayPush(builder->pTableNameList, &name); +} + +int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj* result) { + // statement array is empty. + if (!polls || !nPolls) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + STableDataBlocksListBuilder* builder = createSTableDataBlocksListBuilder(); + if (!builder) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + STableNameListBuilder* nameListBuilder = createSTableNameListBuilder(); + if (!nameListBuilder) { + destroySTableDataBlocksListBuilder(builder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + // append the existing data blocks to builder. + for (size_t i = 0; i < nPolls; ++i) { + SSqlObj *pSql = polls[i]; + SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam; + if (!pInsertParam->pDataBlocks) { + continue; + } + + assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); + assert(!pInsertParam->schemaAttached); + + // append each vnode data block to the builder. + size_t nBlocks = taosArrayGetSize(pInsertParam->pDataBlocks); + for (size_t j = 0; j < nBlocks; ++j) { + STableDataBlocks* tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j); + if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int k = 0; k < pInsertParam->numOfTables; ++k) { + if (!insertSTableNameListBuilder(nameListBuilder, pInsertParam->pTableNameList[k])) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + } + } + + // build the vnode data blocks. + size_t nBlocks = 0; + size_t nRows = 0; + SInsertStatementParam* pInsertParam = &result->cmd.insertParam; + SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &nBlocks, &nRows); + if (!pVnodeDataBlocksList) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + // build the table name list. + size_t nTables = 0; + SName** pTableNameList = buildSTableNameListBuilder(nameListBuilder, &nTables); + if (!pTableNameList) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + if (nTables != nBlocks) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + // replace table name list. + if (pInsertParam->pTableNameList) { + destroyTableNameList(pInsertParam); + } + pInsertParam->pTableNameList = pTableNameList; + pInsertParam->numOfTables = (int32_t) nTables; + + // replace vnode data blocks. + if (pInsertParam->pDataBlocks) { + tscDestroyBlockArrayList(result, pInsertParam->pDataBlocks); + } + pInsertParam->pDataBlocks = pVnodeDataBlocksList; + pInsertParam->numOfRows = (int32_t) nRows; + + // clean up. + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_SUCCESS; +} diff --git a/src/client/src/tscBatchWrite.c b/src/client/src/tscBatchWrite.c new file mode 100644 index 0000000000000000000000000000000000000000..7c1c7b14157c54675ac5f5df060e68c3843226e5 --- /dev/null +++ b/src/client/src/tscBatchWrite.c @@ -0,0 +1,603 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "osAtomic.h" + +#include "tscBatchMerge.h" +#include "tscBatchWrite.h" +#include "tscLog.h" +#include "tscSubquery.h" +#include "tsclient.h" + +/** + * Represents the callback function and its context. + */ +typedef struct { + __async_cb_func_t fp; + void* param; +} SCallbackHandler; + +/** + * The context of `batchResultCallback`. + */ +typedef struct { + size_t nHandlers; + SCallbackHandler handler[]; +} SBatchCallbackContext; + +/** + * Get the number of insertion row in the sql statement. + * + * @param pSql the sql statement. + * @return int32_t the number of insertion row. + */ +inline static int32_t statementGetInsertionRows(SSqlObj* pSql) { return pSql->cmd.insertParam.numOfRows; } + +/** + * Return the error result to the callback function, and release the sql object. + * + * @param pSql the sql object. + * @param code the error code of the error result. + */ +inline static void tscReturnsError(SSqlObj* pSql, int code) { + if (pSql == NULL) { + return; + } + + pSql->res.code = code; + tscAsyncResultOnError(pSql); +} + +/** + * Proxy function to perform sequentially insert operation. + * + * @param param the context of `batchResultCallback`. + * @param tres the result object. + * @param code the error code. + */ +static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { + SBatchCallbackContext* context = param; + SSqlObj* res = tres; + + // handle corner case [context == null]. + if (context == NULL) { + tscError("context in `batchResultCallback` is null, which should not happen"); + if (tres) { + taosReleaseRef(tscObjRef, res->self); + } + return; + } + + // handle corner case [res == null]. + if (res == NULL) { + tscError("tres in `batchResultCallback` is null, which should not happen"); + free(context); + return; + } + + // handle results. + tscDebug("async batch result callback, number of item: %zu", context->nHandlers); + for (int i = 0; i < context->nHandlers; ++i) { + // the result object is shared by many sql objects. + // therefore, we need to increase the ref count. + taosAcquireRef(tscObjRef, res->self); + + SCallbackHandler* handler = &context->handler[i]; + handler->fp(handler->param, res, code); + } + + taosReleaseRef(tscObjRef, res->self); + free(context); +} + +int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch) { + assert(pRequest); + assert(pRequest->pRequests); + assert(pRequest->nRequests); + + // create the callback context. + SBatchCallbackContext* context = + calloc(1, sizeof(SBatchCallbackContext) + pRequest->nRequests * sizeof(SCallbackHandler)); + if (context == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + tscDebug("create batch call back context: %p", context); + + // initialize the callback context. + context->nHandlers = pRequest->nRequests; + for (size_t i = 0; i < pRequest->nRequests; ++i) { + SSqlObj* pSql = pRequest->pRequests[i]; + context->handler[i].fp = pSql->fp; + context->handler[i].param = pSql->param; + } + + // merge the statements into single one. + tscDebug("start to merge %zu sql objs", pRequest->nRequests); + SSqlObj* pFirst = pRequest->pRequests[0]; + int32_t code = tscMergeSSqlObjs(pRequest->pRequests, pRequest->nRequests, pFirst); + if (code != TSDB_CODE_SUCCESS) { + const char* msg = tstrerror(code); + tscDebug("failed to merge sql objects: %s", msg); + free(context); + return code; + } + + pFirst->fp = batchResultCallback; + pFirst->param = context; + pFirst->fetchFp = pFirst->fp; + taosAcquireRef(tscObjRef, pFirst->self); + *batch = pFirst; + + for (int i = 0; i < pRequest->nRequests; ++i) { + SSqlObj* pSql = pRequest->pRequests[i]; + taosReleaseRef(tscObjRef, pSql->self); + } + return code; +} + +/** + * Poll all the SSqlObj* in the dispatcher's buffer (No Lock). After call this function, + * you need to notify dispatcher->notFull by yourself. + * + * @param dispatcher the dispatcher. + * @param nPolls the number of polled SSqlObj*. + * @return all the SSqlObj* in the buffer. + */ +inline static SBatchRequest* dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher) { + if (!dispatcher->bufferSize) { + return NULL; + } + + SBatchRequest* pRequest = malloc(sizeof(SBatchRequest) + sizeof(SSqlObj*) * dispatcher->bufferSize); + if (pRequest == NULL) { + tscError("failed to poll all items: out of memory"); + return NULL; + } + + memcpy(pRequest->pRequests, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize); + pRequest->nRequests = dispatcher->bufferSize; + dispatcher->currentSize = 0; + dispatcher->bufferSize = 0; + return pRequest; +} + +/** + * Poll all the SSqlObj* in the dispatcher's buffer. + * + * @param dispatcher the dispatcher. + * @return all the SSqlObj* in the buffer. + */ +inline static SBatchRequest* dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher) { + SBatchRequest* pRequest = NULL; + pthread_mutex_lock(&dispatcher->bufferMutex); + pRequest = dispatcherPollAll(dispatcher); + pthread_cond_broadcast(&dispatcher->notFull); + pthread_mutex_unlock(&dispatcher->bufferMutex); + return pRequest; +} + +/** + * @brief Try to offer the SSqlObj* to the dispatcher. + * + * @param dispatcher the async bulk write dispatcher. + * @param pSql the sql object to offer. + * @return return whether offer success. + */ +inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) { + pthread_mutex_lock(&dispatcher->bufferMutex); + + // if dispatcher is shutdown, must fail back to normal insertion. + // usually not happen, unless taos_query_a(...) after taos_close(...). + if (atomic_load_8(&dispatcher->shutdown)) { + pthread_mutex_unlock(&dispatcher->bufferMutex); + return false; + } + + // the buffer is full. + while (dispatcher->currentSize >= dispatcher->batchSize) { + if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->bufferMutex)) { + pthread_mutex_unlock(&dispatcher->bufferMutex); + return false; + } + } + + dispatcher->buffer[dispatcher->bufferSize++] = pSql; + dispatcher->currentSize += statementGetInsertionRows(pSql); + tscDebug("sql obj %p has been write to insert buffer", pSql); + + if (dispatcher->currentSize < dispatcher->batchSize) { + pthread_mutex_unlock(&dispatcher->bufferMutex); + return true; + } + + // the dispatcher reaches batch size. + SBatchRequest* pRequest = dispatcherPollAll(dispatcher); + pthread_cond_broadcast(&dispatcher->notFull); + pthread_mutex_unlock(&dispatcher->bufferMutex); + + if (pRequest) { + dispatcherAsyncExecute(pRequest); + } + return true; +} + +void dispatcherExecute(SBatchRequest* pRequest) { + int32_t code = TSDB_CODE_SUCCESS; + // no item in the buffer (items has been taken by other threads). + if (!pRequest) { + return; + } + + assert(pRequest->pRequests); + assert(pRequest->nRequests); + + // merge the statements into single one. + SSqlObj* pSql = NULL; + code = dispatcherBatchBuilder(pRequest, &pSql); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + tscDebug("merging %zu sql objs into %p", pRequest->nRequests, pSql); + tscHandleMultivnodeInsert(pSql); + return; +_error: + tscError("send async batch sql obj failed, reason: %s", tstrerror(code)); + + // handling the failures. + for (size_t i = 0; i < pRequest->nRequests; ++i) { + SSqlObj* item = pRequest->pRequests[i]; + tscReturnsError(item, code); + } +} + +/** + * Get the timespec after `millis` ms + * + * @param t the timespec. + * @param millis the duration in milliseconds. + * @return the timespec after `millis` ms. + */ +static inline void afterMillis(struct timespec* t, int32_t millis) { + t->tv_nsec += millis * 1000000L; + t->tv_sec += t->tv_nsec / 1000000000L; + t->tv_nsec %= 1000000000L; +} + +/** + * Sleep until `timeout` timespec. When dispatcherShutdown(...) called, the function will return immediately. + * + * @param dispatcher the dispatcher thread to sleep. + * @param timeout the timeout in CLOCK_REALTIME. + */ +inline static void timeoutManagerSleepUntil(SDispatcherTimeoutManager* manager, struct timespec* timeout) { + pthread_mutex_lock(&manager->sleepMutex); + while (true) { + // notified by dispatcherShutdown(...). + if (isShutdownSDispatcherTimeoutManager(manager)) { + break; + } + if (pthread_cond_timedwait(&manager->timeout, &manager->sleepMutex, timeout)) { + fflush(stdout); + break; + } + } + pthread_mutex_unlock(&manager->sleepMutex); +} + +/** + * The thread to manage batching timeout. + */ +static void* timeoutManagerCallback(void* arg) { + SDispatcherTimeoutManager* manager = arg; + setThreadName("tscAsyncBackground"); + + while (!isShutdownSDispatcherTimeoutManager(manager)) { + struct timespec timeout; + clock_gettime(CLOCK_REALTIME, &timeout); + afterMillis(&timeout, manager->timeoutMs); + + SBatchRequest* pRequest = dispatcherLockPollAll(manager->dispatcher); + if (pRequest) { + dispatcherAsyncExecute(pRequest); + } + + // Similar to scheduleAtFixedRate in Java, if the execution time exceed + // `timeoutMs` milliseconds, then there will be no sleep. + timeoutManagerSleepUntil(manager, &timeout); + } + return NULL; +} + +SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, int32_t batchSize, int32_t timeoutMs) { + SAsyncBatchWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBatchWriteDispatcher) + batchSize * sizeof(SSqlObj*)); + if (!dispatcher) { + return NULL; + } + + assert(pClient != NULL); + + dispatcher->pClient = pClient; + dispatcher->currentSize = 0; + dispatcher->bufferSize = 0; + dispatcher->batchSize = batchSize; + atomic_store_8(&dispatcher->shutdown, false); + + // init the mutex and the cond. + pthread_mutex_init(&dispatcher->bufferMutex, NULL); + pthread_cond_init(&dispatcher->notFull, NULL); + + // init timeout manager. + dispatcher->timeoutManager = createSDispatcherTimeoutManager(dispatcher, timeoutMs); + if (!dispatcher->timeoutManager) { + pthread_mutex_destroy(&dispatcher->bufferMutex); + pthread_cond_destroy(&dispatcher->notFull); + free(dispatcher); + return NULL; + } + + return dispatcher; +} + +/** + * Shutdown the dispatcher and join the timeout thread. + * + * @param dispatcher the dispatcher. + */ +inline static void dispatcherShutdown(SAsyncBatchWriteDispatcher* dispatcher) { + atomic_store_8(&dispatcher->shutdown, true); + if (dispatcher->timeoutManager) { + shutdownSDispatcherTimeoutManager(dispatcher->timeoutManager); + } +} + +void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher) { + if (dispatcher == NULL) { + return; + } + + dispatcherShutdown(dispatcher); + + // poll and send all the statements in the buffer. + while (true) { + SBatchRequest* pRequest = dispatcherLockPollAll(dispatcher); + if (!pRequest) { + break; + } + dispatcherExecute(pRequest); + free(pRequest); + } + // destroy the timeout manager. + destroySDispatcherTimeoutManager(dispatcher->timeoutManager); + + // destroy the mutex. + pthread_mutex_destroy(&dispatcher->bufferMutex); + pthread_cond_destroy(&dispatcher->notFull); + + free(dispatcher); +} + +bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) { + if (pSql == NULL || !pSql->enableBatch) { + return false; + } + + SSqlCmd* pCmd = &pSql->cmd; + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); + + // only support insert statement. + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { + return false; + } + + SInsertStatementParam* pInsertParam = &pCmd->insertParam; + + // file insert not support. + if (TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { + return false; + } + + // only support kv payload. + if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) { + return false; + } + + // no schema attached. + if (pInsertParam->schemaAttached) { + return false; + } + + // too many insertion rows, fail back to normal insertion. + if (statementGetInsertionRows(pSql) >= dispatcher->batchSize) { + return false; + } + + return true; +} + +bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) { + if (atomic_load_8(&dispatcher->shutdown)) { + return false; + } + + // the sql object doesn't support bulk insertion. + if (!dispatcherCanDispatch(dispatcher, pSql)) { + return false; + } + + // try to offer pSql to the buffer. + return dispatcherTryOffer(dispatcher, pSql); +} + +/** + * Destroy the SAsyncBatchWriteDispatcher create by SDispatcherManager. + * @param arg the thread local SAsyncBatchWriteDispatcher. + */ +static void destroyDispatcher(void* arg) { + SAsyncBatchWriteDispatcher* dispatcher = arg; + if (!dispatcher) { + return; + } + + destroySAsyncBatchWriteDispatcher(dispatcher); +} + +SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, + bool isThreadLocal) { + SDispatcherManager* dispatcher = calloc(1, sizeof(SDispatcherManager)); + if (!dispatcher) { + return NULL; + } + + assert(pClient != NULL); + + dispatcher->pClient = pClient; + dispatcher->batchSize = batchSize; + dispatcher->timeoutMs = timeoutMs; + dispatcher->isThreadLocal = isThreadLocal; + + if (isThreadLocal) { + if (pthread_key_create(&dispatcher->key, destroyDispatcher)) { + free(dispatcher); + return NULL; + } + } else { + dispatcher->pGlobal = createSAsyncBatchWriteDispatcher(pClient, batchSize, timeoutMs); + if (!dispatcher->pGlobal) { + free(dispatcher); + return NULL; + } + } + return dispatcher; +} + +SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager) { + if (!manager->isThreadLocal) { + return manager->pGlobal; + } + + SAsyncBatchWriteDispatcher* value = pthread_getspecific(manager->key); + if (value) { + return value; + } + + value = createSAsyncBatchWriteDispatcher(manager->pClient, manager->batchSize, manager->timeoutMs); + if (value) { + pthread_setspecific(manager->key, value); + return value; + } + + return NULL; +} + +void destroyDispatcherManager(SDispatcherManager* manager) { + if (manager) { + if (manager->isThreadLocal) { + pthread_key_delete(manager->key); + } + + if (manager->pGlobal) { + destroySAsyncBatchWriteDispatcher(manager->pGlobal); + } + free(manager); + } +} + +SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs) { + SDispatcherTimeoutManager* manager = calloc(1, sizeof(SDispatcherTimeoutManager)); + if (!manager) { + return NULL; + } + + manager->timeoutMs = timeoutMs; + manager->dispatcher = dispatcher; + atomic_store_8(&manager->shutdown, false); + + pthread_mutex_init(&manager->sleepMutex, NULL); + pthread_cond_init(&manager->timeout, NULL); + + // init background thread. + if (pthread_create(&manager->background, NULL, timeoutManagerCallback, manager)) { + pthread_mutex_destroy(&manager->sleepMutex); + pthread_cond_destroy(&manager->timeout); + free(manager); + return NULL; + } + return manager; +} + +void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { + if (!manager) { + return; + } + + shutdownSDispatcherTimeoutManager(manager); + manager->dispatcher->timeoutManager = NULL; + + pthread_mutex_destroy(&manager->sleepMutex); + pthread_cond_destroy(&manager->timeout); + free(manager); +} + +void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { + // mark shutdown, signal shutdown to timeout thread. + pthread_mutex_lock(&manager->sleepMutex); + atomic_store_8(&manager->shutdown, true); + pthread_cond_broadcast(&manager->timeout); + pthread_mutex_unlock(&manager->sleepMutex); + + // make sure the timeout thread exit. + pthread_join(manager->background, NULL); +} + +bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { + if (!manager) { + return true; + } + return atomic_load_8(&manager->shutdown); +} + +/** + * The proxy function to call `dispatcherExecute`. + * + * @param pMsg the schedule message. + */ +static void dispatcherExecuteProxy(struct SSchedMsg* pMsg) { + SBatchRequest* pRequest = pMsg->ahandle; + if (!pRequest) { + return; + } + + pMsg->ahandle = NULL; + dispatcherExecute(pRequest); + free(pRequest); +} + +void dispatcherAsyncExecute(SBatchRequest* pRequest) { + if (!pRequest) { + return; + } + + assert(pRequest->pRequests); + assert(pRequest->nRequests); + + SSchedMsg schedMsg = {0}; + schedMsg.fp = dispatcherExecuteProxy; + schedMsg.ahandle = (void*) pRequest; + schedMsg.thandle = (void*) 1; + schedMsg.msg = 0; + taosScheduleTask(tscQhandle, &schedMsg); +} diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 38aee8a6787d5ae99b1d98ec0b2fadd42208ac5a..750d670d1be0f00d011f05e8ff960ef875d85519 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1569,7 +1569,7 @@ int tsParseInsertSql(SSqlObj *pSql) { if (pInsertParam->numOfParams > 0) { goto _clean; } - + // merge according to vgId if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) { if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) { @@ -1577,6 +1577,7 @@ int tsParseInsertSql(SSqlObj *pSql) { } } + pCmd->insertParam.numOfRows = totalNum; code = TSDB_CODE_SUCCESS; goto _clean; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 8eb7f15fde6a878ff617e047826df91c9e2c1878..b7cc85908c425ed711121b91ccf0835d61a24cb9 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -862,7 +862,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char* msg3 = "name too long"; const char* msg5 = "invalid user rights"; const char* msg7 = "not support options"; - const char* msg8 = "tags filter length must over 3 bytes."; + const char* msg8 = "tags filter string length must less than 255 bytes."; pCmd->command = pInfo->type; @@ -903,15 +903,14 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { } } else if (pUser->type == TSDB_ALTER_USER_TAGS) { SStrToken* pTags = &pUser->tags; - if(pTags->n < 4) - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8); - } else { + if (pTags->n > TSDB_TAGS_LEN - 1 ) return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8); + } else { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); } } - break; - } + break; + } case TSDB_SQL_CFG_LOCAL: { SMiscInfo *pMiscInfo = pInfo->pMiscInfo; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 409c41fb1a2d0952090ff2ef90581e37007e3679..7dd1c71fb69b94d95d83a0a9fd782a802b74724f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -21,6 +21,7 @@ #include "tnote.h" #include "trpc.h" #include "tscLog.h" +#include "tscBatchWrite.h" #include "tscSubquery.h" #include "tscUtil.h" #include "tsclient.h" @@ -163,15 +164,30 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa pSql->fp = fp; pSql->param = param; pSql->cmd.command = TSDB_SQL_CONNECT; - + + pObj->dispatcherManager = NULL; + if (tsWriteBatchSize > 1 && !tscEmbedded) { + pObj->dispatcherManager = createDispatcherManager(pObj, tsWriteBatchSize, tsWriteBatchTimeout, tsWriteBatchThreadLocal); + if (!pObj->dispatcherManager) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscReleaseRpc(pRpcObj); + free(pSql); + free(pObj); + return NULL; + } + } + if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + if (pObj->dispatcherManager) { + destroyDispatcherManager(pObj->dispatcherManager); + } tscReleaseRpc(pRpcObj); free(pSql); free(pObj); return NULL; } - + if (taos != NULL) { *taos = pObj; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 7ea24ce8290fe5c5fda2841e836d2cdf82e2bd40..ac6b7355d52543727413393f45ca09b581ceab7a 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -339,7 +339,7 @@ bool sqlBufSend(TAOS *taos, char *sqlBuf) { } while(++sleepCnt < 20); strcat(sqlBuf, ";"); - taos_query_ra(taos, sqlBuf, cbSendValues, NULL); + taos_query_ra(taos, sqlBuf, cbSendValues, NULL, false); return true; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 944b85e996db3364181d9a2ce4132e827cc3f406..50c7ceb733ed405b89b9fc8d4fe3491aa7fd89ec 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -14,18 +14,19 @@ */ #include "os.h" +#include "qScript.h" #include "taosmsg.h" +#include "tconfig.h" +#include "tglobal.h" +#include "tnote.h" #include "tref.h" #include "trpc.h" -#include "tnote.h" -#include "ttimer.h" -#include "tsched.h" +#include "tscBatchWrite.h" #include "tscLog.h" +#include "tsched.h" #include "tsclient.h" -#include "tglobal.h" -#include "tconfig.h" +#include "ttimer.h" #include "ttimezone.h" -#include "qScript.h" // global, not configurable #define TSC_VAR_NOT_RELEASE 1 @@ -319,6 +320,49 @@ void taos_cleanup(void) { taosTmrCleanUp(p); } +/** + * Set the option value (int32, uint16, int16, int8). + * @param cfg the config. + * @param str the value string. + * @return whether is set or not. + */ +static bool taos_set_option_int(SGlobalCfg * cfg, const char* str) { + if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) { + char* p = NULL; + errno = 0; + long value = strtol(str, &p, 10); + + if (errno != 0 || p == str) { + tscError("failed to parse option: %s, value: %s", cfg->option, str); + return false; + } + + if ((float) value < cfg->minValue || (float) value > cfg->maxValue) { + tscError("failed to set option: %s, setValue: %ld, minValue: %f, maxValue: %f", cfg->option, value, cfg->minValue, cfg->maxValue); + return false; + } + + if (cfg->valType == TAOS_CFG_VTYPE_INT32) { + *((int32_t*) cfg->ptr) = (int32_t) value; + } else if (cfg->valType == TAOS_CFG_VTYPE_UINT16) { + *((uint16_t*) cfg->ptr) = (uint16_t) value; + } else if (cfg->valType == TAOS_CFG_VTYPE_INT16) { + *((int16_t*) cfg->ptr) = (int16_t) value; + } else if (cfg->valType == TAOS_CFG_VTYPE_INT8) { + *((int8_t*) cfg->ptr) = (int8_t) value; + } else { + tscError("failed to set option: %s, type expected %d", cfg->option, cfg->valType); + return false; + } + + cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; + tscDebug("config option: %s has set to %s", cfg->option, str); + return true; + } + tscWarn("config option: %s, is configured by %s", cfg->option, tsCfgStatusStr[cfg->cfgStatus]); + return false; +} + static int taos_options_imp(TSDB_OPTION option, const char *pStr) { SGlobalCfg *cfg = NULL; @@ -473,6 +517,27 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; + + case TSDB_WRITE_BATCH_SIZE: { + cfg = taosGetConfigOption("writeBatchSize"); + assert(cfg != NULL); + taos_set_option_int(cfg, pStr); + break; + } + + case TSDB_WRITE_BATCH_TIMEOUT: { + cfg = taosGetConfigOption("writeBatchTimeout"); + assert(cfg != NULL); + taos_set_option_int(cfg, pStr); + break; + } + + case TSDB_WRITE_BATCH_THREAD_LOCAL: { + cfg = taosGetConfigOption("writeBatchThreadLocal"); + assert(cfg != NULL); + taos_set_option_int(cfg, pStr); + break; + } default: // TODO return the correct error code to client in the format for taos_errstr() diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6e29d6ddc3facdfac745557b49f31d80a4a2cb11..c0a96c5b3ae0e5345344a1f6968ec4ba37d4a281 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -20,6 +20,7 @@ #include "texpr.h" #include "tkey.h" #include "tmd5.h" +#include "tscBatchWrite.h" #include "tscGlobalmerge.h" #include "tscLog.h" #include "tscProfile.h" @@ -1841,32 +1842,36 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) { tfree(pColInfo->colIdxInfo); } -void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) { - if (pDataBlock == NULL) { +void destroySTableDataBlocks(STableDataBlocks* pDataBlocks) { + if (!pDataBlocks) { return; } + tfree(pDataBlocks->pData); + if (!pDataBlocks->cloned) { + tfree(pDataBlocks->params); + + // free the refcount for metermeta + if (pDataBlocks->pTableMeta != NULL) { + tfree(pDataBlocks->pTableMeta); + } - tfree(pDataBlock->pData); + tscDestroyBoundColumnInfo(&pDataBlocks->boundColumnInfo); + } + tfree(pDataBlocks); +} +void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) { + if (pDataBlock == NULL) { + return; + } + if (removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pDataBlock->tableName, name); - taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } - if (!pDataBlock->cloned) { - tfree(pDataBlock->params); - - // free the refcount for metermeta - if (pDataBlock->pTableMeta != NULL) { - tfree(pDataBlock->pTableMeta); - } - - tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo); - } - - tfree(pDataBlock); + destroySTableDataBlocks(pDataBlock); } SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, @@ -1893,18 +1898,22 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint return param; } -void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) { - if (pDataBlockList == NULL) { - return NULL; +void destroySTableDataBlocksList(SArray* pDataBlocks) { + if (!pDataBlocks) { + return; } - - size_t size = taosArrayGetSize(pDataBlockList); - for (int32_t i = 0; i < size; i++) { - void* d = taosArrayGetP(pDataBlockList, i); - tscDestroyDataBlock(pSql, d, false); + size_t nBlocks = taosArrayGetSize(pDataBlocks); + for (size_t i = 0; i < nBlocks; ++i) { + STableDataBlocks * pDataBlock = taosArrayGetP(pDataBlocks, i); + if (pDataBlock) { + destroySTableDataBlocks(pDataBlock); + } } + taosArrayDestroy(&pDataBlocks); +} - taosArrayDestroy(&pDataBlockList); +void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) { + destroySTableDataBlocksList(pDataBlockList); return NULL; } @@ -2202,42 +2211,35 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { - pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); - if (pInsertParam->pTableNameList == NULL) { - pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); - } - - STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); - int32_t i = 0; - while(p1) { - STableDataBlocks* pBlocks = *p1; - //tfree(pInsertParam->pTableNameList[i]); - - pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName); - p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1); - } - - if (freeBlockMap) { - pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false); - } -} - int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); - void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + size_t initialSize = taosHashGetSize(pInsertParam->pTableBlockHashList); + initialSize = initialSize > 128 ? 128 : initialSize; + + void* pVnodeDataBlockHashList = taosHashInit(initialSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); - - STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); - - STableDataBlocks* pOneTableBlock = *p; - + + // alloc table name list. + size_t numOfTables = taosHashGetSize(pInsertParam->pTableBlockHashList); + if (pInsertParam->pTableNameList) { + destroyTableNameList(pInsertParam); + } + pInsertParam->pTableNameList = calloc(numOfTables, sizeof(SName*)); + pInsertParam->numOfTables = (int32_t) numOfTables; + + size_t tail = 0; SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock - - while(pOneTableBlock) { + STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); + while (iter) { + STableDataBlocks* pOneTableBlock = *iter; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; + iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter); + + // extract table name list. + pInsertParam->pTableNameList[tail++] = tNameDup(&pOneTableBlock->tableName); + if (pBlocks->numOfRows > 0) { // the maximum expanded size in byte when a row-wise data is converted to SDataRow format int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; @@ -2320,19 +2322,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar dataBuf->numOfTables += 1; pBlocks->numOfRows = 0; - }else { + } else { tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname); } - - p = taosHashIterate(pInsertParam->pTableBlockHashList, p); - if (p == NULL) { - break; + + if (freeBlockMap) { + tscDestroyDataBlock(pSql, pOneTableBlock, false); } - - pOneTableBlock = *p; } - - extractTableNameList(pSql, pInsertParam, freeBlockMap); + + if (freeBlockMap) { + taosHashCleanup(pInsertParam->pTableBlockHashList); + pInsertParam->pTableBlockHashList = NULL; + } // free the table data blocks; pInsertParam->pDataBlocks = pVnodeDataBlockList; @@ -2353,6 +2355,9 @@ void tscCloseTscObj(void *param) { tscReleaseRpc(pObj->pRpcObj); pthread_mutex_destroy(&pObj->mutex); tscReleaseClusterInfo(pObj->clusterId); + + destroyDispatcherManager(pObj->dispatcherManager); + pObj->dispatcherManager = NULL; tfree(pObj); } diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 20153e76ea4af28996f32778fadf9191e5ccf9d9..bcc0d68f95cbd9ec884df5e87b3ec10bd5463883 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -92,6 +92,9 @@ extern int32_t tsRetryStreamCompDelay; extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern int32_t tsProjectExecInterval; extern int64_t tsMaxRetentWindow; +extern bool tsWriteBatchThreadLocal; +extern int32_t tsWriteBatchSize; +extern int32_t tsWriteBatchTimeout; // db parameters in client extern int32_t tsCacheBlockSize; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 57b77d32b09adf99e159405652ffd382f1cf6eae..30711dfd3997cd8f6fd10dd7af8b147b137ac0c0 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -127,6 +127,11 @@ int8_t tsSortWhenGroupBy = 1; int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance +// The tsc async write batching feature (using ABWD). +bool tsWriteBatchThreadLocal = false; // if thread local enable, each thread will allocate a dispatcher. +int32_t tsWriteBatchSize = 0; // suggest: 64 - 512, default 0, 0 means disable batching. +int32_t tsWriteBatchTimeout = 10; // suggest: 2 - 100 (unit: milliseconds) + // the maximum allowed query buffer size during query processing for each data node. // -1 no limit (default) // 0 no query allowed, queries are disabled @@ -1874,6 +1879,37 @@ static void doInitGlobalConfig(void) { cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + + cfg.option = "writeBatchThreadLocal"; + cfg.ptr = &tsWriteBatchThreadLocal; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "writeBatchSize"; + cfg.ptr = &tsWriteBatchSize; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 4096; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "writeBatchTimeout"; + cfg.ptr = &tsWriteBatchTimeout; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 1; + cfg.maxValue = 2048; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); #else // if TD_TSZ macro define, have 5 count configs, so must add 5 diff --git a/src/inc/taos.h b/src/inc/taos.h index b2e2e9f3c625e05c5ebbedec6d0c84b31a3a9259..efb3ce25d678f7af83954c15e0625f697315c714 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -60,7 +60,10 @@ typedef enum { TSDB_OPTION_TIMEZONE, TSDB_OPTION_CONFIGDIR, TSDB_OPTION_SHELL_ACTIVITY_TIMER, - TSDB_MAX_OPTIONS + TSDB_MAX_OPTIONS, + TSDB_WRITE_BATCH_SIZE, + TSDB_WRITE_BATCH_TIMEOUT, + TSDB_WRITE_BATCH_THREAD_LOCAL } TSDB_OPTION; typedef struct taosField { diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index ecd7d2d31ecf77a5c9e41b24c99b8e68dc7f880a..1bdcc532943318f5fe368b3832145ad7e06f6a56 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -377,7 +377,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { } else { int num_rows_affacted = taos_affected_rows(pSql); et = taosGetTimestampUs(); - printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6); + printf("Query OK, %d row(s) affected in set (%.6fs)\n", num_rows_affacted, (et - st) / 1E6); #ifndef WINDOWS // call auto tab diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index f866da96cdd0b3d8b1809e0eeb53b6e41cfc067d..85f20b6d24990c9229fb47e5744d232b96508d8b 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -97,7 +97,7 @@ static int32_t mnodeUserActionDecode(SSdbRow *pRow) { SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj)); if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - memcpy(pUser, pRow->rowData, tsUserUpdateSize); + memcpy(pUser, pRow->rowData, pRow->rowSize); pRow->pObj = pUser; return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index c352c9c47a99fa27a5942081b9735bfa3878c067..d3e38cf365272781ecd3b7727d1f9809f50897af 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -318,10 +318,10 @@ static int32_t getColDataFromGroupRes(SGroupResInfo* pGroupResInfo, SQueryRuntim for (int32_t i = 0; i < numRows; ++i) { SResultRow* row = taosArrayGetP(pGroupResInfo->pRows, i); row->groupIndex = i; - tFilePage* page = getResBufPage(pRuntimeEnv->pResultBuf, row->pageId); - int32_t rowsToCopy = 1; - char* out = colData + numOfResult * colBytes; - char* in = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, row->offset, colDataOffset); + tFilePage* page = getResBufPage(pRuntimeEnv->pResultBuf, row->pageId); + int32_t rowsToCopy = 1; + char* out = colData + numOfResult * colBytes; + char* in = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, row->offset, colDataOffset); memcpy(out, in, colBytes * rowsToCopy); numOfResult += rowsToCopy; } @@ -2837,6 +2837,10 @@ static bool isCachedLastQuery(SQueryAttr* pQueryAttr) { return false; } + if (pQueryAttr->pFilters != NULL || pQueryAttr->pFilterInfo != NULL) { + return false; + } + return true; } @@ -3165,7 +3169,7 @@ static bool overlapWithTimeWindow(SQueryAttr* pQueryAttr, SDataBlockInfo* pBlock return true; } - while(w.skey < pBlockInfo->window.ekey) { + while (w.skey < pBlockInfo->window.ekey) { // add one slding if (pQueryAttr->interval.slidingUnit == 'n' || pQueryAttr->interval.slidingUnit == 'y') ekey = taosTimeAdd(ekey, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, pQueryAttr->precision); @@ -3180,7 +3184,7 @@ static bool overlapWithTimeWindow(SQueryAttr* pQueryAttr, SDataBlockInfo* pBlock getAlignQueryTimeWindow(pQueryAttr, ekey, sk, ek, &w); } - while(1) { + while (1) { if (w.ekey < pBlockInfo->window.skey) { break; } @@ -6339,7 +6343,7 @@ SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SColumnInfoData col = {{0}}; col.info.colId = pExpr[i].base.colInfo.colId; col.info.bytes = pExpr[i].base.resBytes; - col.info.type = pExpr[i].base.resType; + col.info.type = pExpr[i].base.resType; taosArrayPush(pDataBlock->pDataBlock, &col); if (!found && col.info.colId == pOrderVal->orderColId) { @@ -6874,18 +6878,18 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo break; } } - + if (pCtx == NULL) { goto group_finished_exit; } - + TSKEY* tsCols = NULL; if (pBlock && pBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0); tsCols = (int64_t*)pColDataInfo->pData; assert(tsCols[0] == pBlock->info.window.skey && tsCols[pBlock->info.rows - 1] == pBlock->info.window.ekey); } - + if (pCtx->startTs == INT64_MIN) { if (pQueryAttr->range.skey == INT64_MIN) { if (NULL == tsCols) { @@ -6984,7 +6988,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo return false; } int32_t nRows = pBlock ? pBlock->info.rows : 0; - int32_t startPos = binarySearchForKey((char*) tsCols, nRows, pCtx->startTs, pQueryAttr->order.order); + int32_t startPos = binarySearchForKey((char*)tsCols, nRows, pCtx->startTs, pQueryAttr->order.order); if (ascQuery && pQueryAttr->fillType != TSDB_FILL_NEXT && pCtx->start.key == INT64_MIN) { if (startPos < 0) { @@ -8522,18 +8526,18 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator SColIndex* idx = taosArrayGet(pInfo->orderColumnList, i); offset += pExpr[idx->colIndex].base.resBytes; } - + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL) { goto _clean; } - + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); if (pInfo->pRes == NULL) { tfree(pOperator); goto _clean; } - + pOperator->name = "SLimitOperator"; pOperator->operatorType = OP_SLimit; pOperator->blockingOptr = false; @@ -10171,7 +10175,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; - pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, + HASH_ENTRY_LOCK); } pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); @@ -10373,7 +10378,7 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { taosArrayDestroy(&pTableqinfoGroupInfo->pGroupList); - SHashObj *pmap = pTableqinfoGroupInfo->map; + SHashObj* pmap = pTableqinfoGroupInfo->map; if (pmap == atomic_val_compare_exchange_ptr(&pTableqinfoGroupInfo->map, pmap, NULL)) { taosHashCleanup(pmap); } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 3abc3e9acc6c8f6e909d4d6ef5f043dc2ee3e156..9358530581a3a755b1765e6e1571bf06a428b074 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1080,7 +1080,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile int rowsToWrite = pDataCols->numOfRows; ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); - ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); + // ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); // Make buffer space if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index f207b9b5c4dd270cc4364e548449edcbee43ae4a..d6f4c79ddd8c0d22149eeead5ef23887aae69b04 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define TSDB_CFG_MAX_NUM 138 +#define TSDB_CFG_MAX_NUM 141 #define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 7e08178c42cbb7e34a0aec340352916d874a7094..649f8fa5e9697eac76f4a97eb44d5b18e06924ec 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -225,66 +225,59 @@ char *tstrstr(char *src, char *dst, bool ignoreInEsc) { } char* strtolower(char *dst, const char *src) { - int esc = 0; - char quote = 0, *p = dst, c; - - assert(dst != NULL); - - for (c = *src++; c; c = *src++) { - if (esc) { - esc = 0; - } else if (quote) { - if (c == '\\') { - esc = 1; - } else if (c == quote) { - quote = 0; + if (src == NULL || dst == NULL) { + return dst; + } + char* const ret = dst; + while (*src) { + const char ch = *(src++); + *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch; + + if (ch == '\'' || ch == '"') { + char prev = ch; + while (*src) { + const char next = *(src++); + *(dst++) = next; + if (prev != '\\' && next == ch) break; + prev = next; + } + } else if (ch == '`') { + while (*src) { + const char next = *(src++); + *(dst++) = next; + if (next == ch) break; } - } else if (c >= 'A' && c <= 'Z') { - c -= 'A' - 'a'; - } else if (c == '\'' || c == '"') { - quote = c; } - *p++ = c; } - - *p = 0; - return dst; + *(dst) = 0; + return ret; } char* strntolower(char *dst, const char *src, int32_t n) { - int esc = 0, inEsc = 0; - char quote = 0, *p = dst, c; - assert(dst != NULL); - if (n == 0) { - *p = 0; - return dst; - } - for (c = *src++; n-- > 0; c = *src++) { - if (esc) { - esc = 0; - } else if (quote) { - if (c == '\\') { - esc = 1; - } else if (c == quote) { - quote = 0; + char* const end = dst + n; + while (dst != end) { + const char ch = *(src++); + *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch; + + if (ch == '\'' || ch == '"') { + char prev = ch; + while (dst != end) { + const char next = *(src++); + *(dst++) = next; + if (prev != '\\' && next == ch) break; + prev = next; } - } else if (inEsc) { - if (c == '`') { - inEsc = 0; + } else if (ch == '`') { + while (dst != end) { + const char next = *(src++); + *(dst++) = next; + if (next == ch) break; } - } else if (c >= 'A' && c <= 'Z') { - c -= 'A' - 'a'; - } else if (inEsc == 0 && (c == '\'' || c == '"')) { - quote = c; - } else if (c == '`' && quote == 0) { - inEsc = 1; } - *p++ = c; } - - *p = 0; - return dst; + *(dst) = 0; + return dst - n; } char* strntolower_s(char *dst, const char *src, int32_t n) {