diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 877cfb130c3b338d701571c945d4b9b53db6bd91..86b51dbb85a70cb95d21ecebcbfa6598fed8bab3 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -88,12 +88,27 @@ typedef struct SUseDbOutput { typedef struct STableMetaOutput { int32_t metaNum; char ctbFname[TSDB_TABLE_FNAME_LEN]; - char tbFname[TSDB_TABLE_FNAME_LEN]; + char tbFname[TSDB_TABLE_FNAME_LEN]; SCTableMeta ctbMeta; STableMeta *tbMeta; } STableMetaOutput; -typedef int32_t __async_exec_fn_t(void* param); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); @@ -109,7 +124,9 @@ int32_t cleanupTaskQueue(); */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); -SSchema* tGetTbnameColumnSchema(); +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + +const SSchema* tGetTbnameColumnSchema(); void initQueryModuleMsgHandle(); extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 550d8c3ecfe96b0268ce739c2a5d146913024a21..eba904c0b8ff4a010d4265828b1a3c932e1e0c7f 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -29,6 +29,7 @@ extern "C" { #include "tlist.h" #include "tmsgtype.h" #include "trpc.h" +#include "query.h" typedef struct SQueryExecMetric { int64_t start; // start timestamp @@ -100,11 +101,6 @@ typedef struct SReqResultInfo { uint32_t current; } SReqResultInfo; -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now void* fp; @@ -119,39 +115,21 @@ typedef struct SRequestObj { uint64_t requestId; int32_t type; // request type STscObj *pTscObj; - SQueryExecMetric metric; char *sqlstr; // sql string - SRequestSendRecvBody body; int64_t self; char *msgBuf; void *pInfo; // sql parse info, generated by parser module int32_t code; + SQueryExecMetric metric; + SRequestSendRecvBody body; } SRequestObj; -typedef struct SRequestMsgBody { - int32_t msgType; - SDataBuf msgInfo; - uint64_t requestId; - uint64_t requestObjRefId; -} SRequestMsgBody; - -typedef int (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - extern SAppInfo appInfo; -extern int32_t msgObjRefPool; +extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest); -int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code); +int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void* param, const SDataBuf* pMsg, int32_t code); int taos_init(); @@ -166,7 +144,7 @@ char *getConnectionDB(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); void taos_init_imp(void); -int taos_options_imp(TSDB_OPTION option, const char *str); +int taos_options_imp(TSDB_OPTION option, const char *str); void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); @@ -176,8 +154,8 @@ void initMsgHandleFp(); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); -void* doFetchRow(SRequestObj* pRequest); -void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +void *doFetchRow(SRequestObj* pRequest); +void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); #ifdef __cplusplus } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index dca4d85ad6c707f16ff31d66fde4d4adc1959b13..7fcdafaf442b33e33821627a343e8266f9e9e27c 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -32,7 +32,7 @@ #define TSC_VAR_RELEASED 0 SAppInfo appInfo; -int32_t msgObjRefPool = -1; +int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; @@ -43,7 +43,7 @@ static void registerRequest(SRequestObj* pRequest) { assert(pTscObj != NULL); // connection has been released already, abort creating request. - pRequest->self = taosAddRef(msgObjRefPool, pRequest); + pRequest->self = taosAddRef(clientReqRefPool, pRequest); int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); @@ -167,12 +167,11 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty // TODO generated request uuid pRequest->requestId = 0; - pRequest->metric.start = taosGetTimestampMs(); pRequest->type = type; pRequest->pTscObj = pObj; - pRequest->body.fp = fp; + pRequest->body.fp = fp; // not used it yet pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); tsem_init(&pRequest->body.rspSem, 0, 0); @@ -201,7 +200,7 @@ void destroyRequest(SRequestObj* pRequest) { return; } - taosReleaseRef(msgObjRefPool, pRequest->self); + taosReleaseRef(clientReqRefPool, pRequest->self); } void taos_init_imp(void) { @@ -238,7 +237,7 @@ void taos_init_imp(void) { initTaskQueue(); clientConnRefPool = taosOpenRef(200, destroyTscObj); - msgObjRefPool = taosOpenRef(40960, doDestroyRequest); + clientReqRefPool = taosOpenRef(40960, doDestroyRequest); taosGetAppName(appInfo.appName, NULL); appInfo.pid = taosGetPId(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7cd8e9d359bc2bf8a2ef331d51ec5a62ff66d2af..ed211e142c4de1ce85ed525aae38af873e8f3564 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -14,8 +14,6 @@ static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorE static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { return false; @@ -201,10 +199,10 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); } - sendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body); } else { int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); } tsem_wait(&pRequest->body.rspSem); @@ -274,7 +272,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); destroySendMsgInfo(body); @@ -335,7 +333,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } -int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { char *pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, taosMsg[pInfo->msgType]); @@ -364,7 +362,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { assert(pMsg->ahandle != NULL); if (pSendInfo->requestObjRefId != 0) { - SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(msgObjRefPool, pSendInfo->requestObjRefId); + SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId); pRequest->metric.rsp = taosGetTimestampMs(); @@ -391,7 +389,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { pRequest->metric.rsp - pRequest->metric.start); } - taosReleaseRef(msgObjRefPool, pSendInfo->requestObjRefId); + taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen}; @@ -437,7 +435,7 @@ void* doFetchRow(SRequestObj* pRequest) { int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); destroySendMsgInfo(body); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 44c78076d9b3bd8e452039042af702adceba6930..af85f1b310abbbe0e691b743bfdc929e14e60de8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -35,8 +35,8 @@ void taos_cleanup(void) { return; } - int32_t id = msgObjRefPool; - msgObjRefPool = -1; + int32_t id = clientReqRefPool; + clientReqRefPool = -1; taosCloseRef(id); cleanupTaskQueue(); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index b18b3ecb51483777072dcbbe7d36e20b3c2b880b..700a7a20cfc3ba1c1a4614782738b2f962e0ab2c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -22,7 +22,7 @@ int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void*, const SDataBuf* pMsg, int32_t code); -int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; pRequest->code = code; sem_post(&pRequest->body.rspSem); @@ -96,7 +96,7 @@ SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->param = pRequest; - pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericExecCallback:handleRequestRspFp[pRequest->type]; + pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericRspCallback:handleRequestRspFp[pRequest->type]; } return pMsgSendInfo; diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index a3f2ad88bb6d8fc35e17ec76dd18539655a607fd..6886206363885e5fc8dc53706585d385065b0810 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(clientTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( clientTest - PUBLIC os util common transport gtest taos + PUBLIC os util common transport gtest taos qcom ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 829f426c9ddf1e2be1d3cab0220188eb1ad0d0c8..df3fa40004a4df3180ea0855e0fa8efbbe4bfdea 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -14,7 +14,7 @@ static struct SSchema _s = { .name = "tbname", }; -SSchema* tGetTbnameColumnSchema() { +const SSchema* tGetTbnameColumnSchema() { return &_s; } @@ -103,7 +103,7 @@ int32_t cleanupTaskQueue() { static void execHelper(struct SSchedMsg* pSchedMsg) { assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); - __async_exec_fn_t* execFn = (__async_exec_fn_t*) pSchedMsg->ahandle; + __async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle; int32_t code = execFn(pSchedMsg->thandle); if (code != 0 && pSchedMsg->msg != NULL) { *(int32_t*) pSchedMsg->msg = code; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 4185b3176c7f531f8ac3664b3aba4550d2874681..8e57f127771cd1b808afe4adf48575e2a1bc0362 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -32,7 +32,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ buildConnectMsg(pRequest, &body); int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); tsem_wait(&pRequest->body.rspSem); destroyConnectMsg(&body);