diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h index 6a5f805157fe51b6aea3f89e34896c05f90376d8..1bb3b8adbd7e5d31a2411379c2d2278346b73e1b 100644 --- a/src/client/inc/tscBulkWrite.h +++ b/src/client/inc/tscBulkWrite.h @@ -22,7 +22,6 @@ extern "C" { #include "tlist.h" #include "tarray.h" -#include "tsclient.h" #include "tthread.h" typedef struct SAsyncBulkWriteDispatcher { @@ -54,26 +53,8 @@ typedef struct SAsyncBulkWriteDispatcher { volatile bool shutdown; } SAsyncBulkWriteDispatcher; - - - -/** - * Return the error result to the callback function, and release the sql object. - * - * @param pSql the sql object. - * @param code the error code of the error result. - */ -void tscReturnsError(SSqlObj* pSql, int code); - -/** - * Proxy function to perform sequentially insert operation. - * - * @param param the context of `batchResultCallback`. - * @param tres the result object. - * @param code the error code. - */ -void batchResultCallback(void* param, TAOS_RES* tres, int32_t code); - +// forward declaration. +typedef struct SSqlObj SSqlObj; /** * Merge the statements into single SSqlObj. * @@ -84,14 +65,6 @@ void batchResultCallback(void* param, TAOS_RES* tres, int32_t code); */ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result); -/** - * Get the number of insertion row in the sql statement. - * - * @param pSql the sql statement. - * @return int32_t the number of insertion row. - */ -inline static int32_t statementGetInsertionRows(SSqlObj* pSql) { return pSql->cmd.insertParam.numOfRows; } - /** * Poll all the SSqlObj* in the dispatcher's buffer. * diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 5d1f08b1621273ea73cc407726fbdb9a6d5973b8..3c0a1a6a00e01d56050a0db8c5b45cf9734c398a 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -544,6 +544,9 @@ extern SHashObj *tscVgroupMap; extern SHashObj *tscTableMetaMap; extern SCacheObj *tscVgroupListBuf; +// forward declaration. +typedef struct SAsyncBulkWriteDispatcher SAsyncBulkWriteDispatcher; +extern SAsyncBulkWriteDispatcher* tscDispatcher; extern int tscObjRef; extern void *tscTmr; extern void *tscQhandle; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 7a0b9ae0a7405bc1d9de22927f748b088148f6eb..9049fb3a97d62385c4d3494b0063652924587074 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -37,26 +37,6 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf */ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); -static SAsyncBulkWriteDispatcher* tscDispatcher; - -/** - * Init the taosc async bulk write dispatcher. - * - * @param batchSize the batchSize of async bulk write dispatcher. - * @param timeoutMs the timeout of batching in milliseconds. - */ -void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) { - tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs); -} - -/** - * Destroy the taosc async bulk write dispatcher. - */ -void tscDestroyAsyncDispatcher() { - destroyAsyncDispatcher(tscDispatcher); - tscDispatcher = NULL; -} - void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) { SSqlCmd* pCmd = &pSql->cmd; diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index bacc2408cd320da93c001a312ef1575fd755e7fe..998a5ec0d40adaf9bb59155386074c43a24004ea 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -15,6 +15,7 @@ #include "osAtomic.h" +#include "tsclient.h" #include "tscBulkWrite.h" #include "tscSubquery.h" #include "tscLog.h" @@ -35,7 +36,21 @@ typedef struct { Runnable runnable[]; } BatchCallBackContext; -void tscReturnsError(SSqlObj *pSql, int code) { +/** + * Get the number of insertion row in the sql statement. + * + * @param pSql the sql statement. + * @return int32_t the number of insertion row. + */ +inline static int32_t statementGetInsertionRows(SSqlObj* pSql) { return pSql->cmd.insertParam.numOfRows; } + +/** + * Return the error result to the callback function, and release the sql object. + * + * @param pSql the sql object. + * @param code the error code of the error result. + */ +inline static void tscReturnsError(SSqlObj *pSql, int code) { if (pSql == NULL) { return; } @@ -44,7 +59,14 @@ void tscReturnsError(SSqlObj *pSql, int code) { tscAsyncResultOnError(pSql); } -void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { +/** + * Proxy function to perform sequentially insert operation. + * + * @param param the context of `batchResultCallback`. + * @param tres the result object. + * @param code the error code. + */ +static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { BatchCallBackContext* context = param; SSqlObj* res = tres; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 620525263f9a53cb5269812ddf2dbd71f5bd028d..de467896d66d94bb7f646b634291f84882a650af 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -26,6 +26,7 @@ #include "tconfig.h" #include "ttimezone.h" #include "qScript.h" +#include "tscBulkWrite.h" // global, not configurable #define TSC_VAR_NOT_RELEASE 1 @@ -49,6 +50,7 @@ void *tscRpcCache; // cache to keep rpc obj int32_t tscNumOfThreads = 1; // num of rpc threads char tscLogFileName[] = "taoslog"; int tscLogFileNum = 10; +SAsyncBulkWriteDispatcher* tscDispatcher = NULL; static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently static pthread_once_t tscinit = PTHREAD_ONCE_INIT; @@ -57,6 +59,24 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER; // pthread_once can not return result code, so result code is set to a global variable. static volatile int tscInitRes = 0; +/** + * Init the taosc async bulk write dispatcher. + * + * @param batchSize the batchSize of async bulk write dispatcher. + * @param timeoutMs the timeout of batching in milliseconds. + */ +void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) { + tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs); +} + +/** + * Destroy the taosc async bulk write dispatcher. + */ +void tscDestroyAsyncDispatcher() { + destroyAsyncDispatcher(tscDispatcher); + tscDispatcher = NULL; +} + void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) { taosGetDisk(); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);