提交 ddc5d955 编写于 作者: Z zhihaop

feat: move global variable tscDispatcher to tscSystem.c

上级 67db8da1
...@@ -22,7 +22,6 @@ extern "C" { ...@@ -22,7 +22,6 @@ extern "C" {
#include "tlist.h" #include "tlist.h"
#include "tarray.h" #include "tarray.h"
#include "tsclient.h"
#include "tthread.h" #include "tthread.h"
typedef struct SAsyncBulkWriteDispatcher { typedef struct SAsyncBulkWriteDispatcher {
...@@ -54,26 +53,8 @@ typedef struct SAsyncBulkWriteDispatcher { ...@@ -54,26 +53,8 @@ typedef struct SAsyncBulkWriteDispatcher {
volatile bool shutdown; volatile bool shutdown;
} SAsyncBulkWriteDispatcher; } SAsyncBulkWriteDispatcher;
// forward declaration.
typedef struct SSqlObj SSqlObj;
/**
* Return the error result to the callback function, and release the sql object.
*
* @param pSql the sql object.
* @param code the error code of the error result.
*/
void tscReturnsError(SSqlObj* pSql, int code);
/**
* Proxy function to perform sequentially insert operation.
*
* @param param the context of `batchResultCallback`.
* @param tres the result object.
* @param code the error code.
*/
void batchResultCallback(void* param, TAOS_RES* tres, int32_t code);
/** /**
* Merge the statements into single SSqlObj. * Merge the statements into single SSqlObj.
* *
...@@ -84,14 +65,6 @@ void batchResultCallback(void* param, TAOS_RES* tres, int32_t code); ...@@ -84,14 +65,6 @@ void batchResultCallback(void* param, TAOS_RES* tres, int32_t code);
*/ */
int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result); int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result);
/**
* Get the number of insertion row in the sql statement.
*
* @param pSql the sql statement.
* @return int32_t the number of insertion row.
*/
inline static int32_t statementGetInsertionRows(SSqlObj* pSql) { return pSql->cmd.insertParam.numOfRows; }
/** /**
* Poll all the SSqlObj* in the dispatcher's buffer. * Poll all the SSqlObj* in the dispatcher's buffer.
* *
......
...@@ -544,6 +544,9 @@ extern SHashObj *tscVgroupMap; ...@@ -544,6 +544,9 @@ extern SHashObj *tscVgroupMap;
extern SHashObj *tscTableMetaMap; extern SHashObj *tscTableMetaMap;
extern SCacheObj *tscVgroupListBuf; extern SCacheObj *tscVgroupListBuf;
// forward declaration.
typedef struct SAsyncBulkWriteDispatcher SAsyncBulkWriteDispatcher;
extern SAsyncBulkWriteDispatcher* tscDispatcher;
extern int tscObjRef; extern int tscObjRef;
extern void *tscTmr; extern void *tscTmr;
extern void *tscQhandle; extern void *tscQhandle;
......
...@@ -37,26 +37,6 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf ...@@ -37,26 +37,6 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
*/ */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static SAsyncBulkWriteDispatcher* tscDispatcher;
/**
* Init the taosc async bulk write dispatcher.
*
* @param batchSize the batchSize of async bulk write dispatcher.
* @param timeoutMs the timeout of batching in milliseconds.
*/
void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) {
tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs);
}
/**
* Destroy the taosc async bulk write dispatcher.
*/
void tscDestroyAsyncDispatcher() {
destroyAsyncDispatcher(tscDispatcher);
tscDispatcher = NULL;
}
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) { void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "osAtomic.h" #include "osAtomic.h"
#include "tsclient.h"
#include "tscBulkWrite.h" #include "tscBulkWrite.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscLog.h" #include "tscLog.h"
...@@ -35,7 +36,21 @@ typedef struct { ...@@ -35,7 +36,21 @@ typedef struct {
Runnable runnable[]; Runnable runnable[];
} BatchCallBackContext; } BatchCallBackContext;
void tscReturnsError(SSqlObj *pSql, int code) { /**
* Get the number of insertion row in the sql statement.
*
* @param pSql the sql statement.
* @return int32_t the number of insertion row.
*/
inline static int32_t statementGetInsertionRows(SSqlObj* pSql) { return pSql->cmd.insertParam.numOfRows; }
/**
* Return the error result to the callback function, and release the sql object.
*
* @param pSql the sql object.
* @param code the error code of the error result.
*/
inline static void tscReturnsError(SSqlObj *pSql, int code) {
if (pSql == NULL) { if (pSql == NULL) {
return; return;
} }
...@@ -44,7 +59,14 @@ void tscReturnsError(SSqlObj *pSql, int code) { ...@@ -44,7 +59,14 @@ void tscReturnsError(SSqlObj *pSql, int code) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { /**
* Proxy function to perform sequentially insert operation.
*
* @param param the context of `batchResultCallback`.
* @param tres the result object.
* @param code the error code.
*/
static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
BatchCallBackContext* context = param; BatchCallBackContext* context = param;
SSqlObj* res = tres; SSqlObj* res = tres;
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "tconfig.h" #include "tconfig.h"
#include "ttimezone.h" #include "ttimezone.h"
#include "qScript.h" #include "qScript.h"
#include "tscBulkWrite.h"
// global, not configurable // global, not configurable
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
...@@ -49,6 +50,7 @@ void *tscRpcCache; // cache to keep rpc obj ...@@ -49,6 +50,7 @@ void *tscRpcCache; // cache to keep rpc obj
int32_t tscNumOfThreads = 1; // num of rpc threads int32_t tscNumOfThreads = 1; // num of rpc threads
char tscLogFileName[] = "taoslog"; char tscLogFileName[] = "taoslog";
int tscLogFileNum = 10; int tscLogFileNum = 10;
SAsyncBulkWriteDispatcher* tscDispatcher = NULL;
static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
...@@ -57,6 +59,24 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER; ...@@ -57,6 +59,24 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER;
// pthread_once can not return result code, so result code is set to a global variable. // pthread_once can not return result code, so result code is set to a global variable.
static volatile int tscInitRes = 0; static volatile int tscInitRes = 0;
/**
* Init the taosc async bulk write dispatcher.
*
* @param batchSize the batchSize of async bulk write dispatcher.
* @param timeoutMs the timeout of batching in milliseconds.
*/
void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) {
tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs);
}
/**
* Destroy the taosc async bulk write dispatcher.
*/
void tscDestroyAsyncDispatcher() {
destroyAsyncDispatcher(tscDispatcher);
tscDispatcher = NULL;
}
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册