提交 563c0ddd 编写于 作者: H Haojun Liao

[td-11818] refactor.

上级 5ef4444c
...@@ -88,12 +88,27 @@ typedef struct SUseDbOutput { ...@@ -88,12 +88,27 @@ typedef struct SUseDbOutput {
typedef struct STableMetaOutput { typedef struct STableMetaOutput {
int32_t metaNum; int32_t metaNum;
char ctbFname[TSDB_TABLE_FNAME_LEN]; char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN]; char tbFname[TSDB_TABLE_FNAME_LEN];
SCTableMeta ctbMeta; SCTableMeta ctbMeta;
STableMeta *tbMeta; STableMeta *tbMeta;
} STableMetaOutput; } STableMetaOutput;
typedef int32_t __async_exec_fn_t(void* param); typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
...@@ -109,7 +124,9 @@ int32_t cleanupTaskQueue(); ...@@ -109,7 +124,9 @@ int32_t cleanupTaskQueue();
*/ */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
SSchema* tGetTbnameColumnSchema(); int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
const SSchema* tGetTbnameColumnSchema();
void initQueryModuleMsgHandle(); void initQueryModuleMsgHandle();
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
......
...@@ -29,6 +29,7 @@ extern "C" { ...@@ -29,6 +29,7 @@ extern "C" {
#include "tlist.h" #include "tlist.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "trpc.h" #include "trpc.h"
#include "query.h"
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp int64_t start; // start timestamp
...@@ -100,11 +101,6 @@ typedef struct SReqResultInfo { ...@@ -100,11 +101,6 @@ typedef struct SReqResultInfo {
uint32_t current; uint32_t current;
} SReqResultInfo; } SReqResultInfo;
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef struct SRequestSendRecvBody { typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now tsem_t rspSem; // not used now
void* fp; void* fp;
...@@ -119,39 +115,21 @@ typedef struct SRequestObj { ...@@ -119,39 +115,21 @@ typedef struct SRequestObj {
uint64_t requestId; uint64_t requestId;
int32_t type; // request type int32_t type; // request type
STscObj *pTscObj; STscObj *pTscObj;
SQueryExecMetric metric;
char *sqlstr; // sql string char *sqlstr; // sql string
SRequestSendRecvBody body;
int64_t self; int64_t self;
char *msgBuf; char *msgBuf;
void *pInfo; // sql parse info, generated by parser module void *pInfo; // sql parse info, generated by parser module
int32_t code; int32_t code;
SQueryExecMetric metric;
SRequestSendRecvBody body;
} SRequestObj; } SRequestObj;
typedef struct SRequestMsgBody {
int32_t msgType;
SDataBuf msgInfo;
uint64_t requestId;
uint64_t requestObjRefId;
} SRequestMsgBody;
typedef int (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp;
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
extern SAppInfo appInfo; extern SAppInfo appInfo;
extern int32_t msgObjRefPool; extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool; extern int32_t clientConnRefPool;
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest); SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest);
int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code); int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void* param, const SDataBuf* pMsg, int32_t code); extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void* param, const SDataBuf* pMsg, int32_t code);
int taos_init(); int taos_init();
...@@ -166,7 +144,7 @@ char *getConnectionDB(STscObj* pObj); ...@@ -166,7 +144,7 @@ char *getConnectionDB(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db); void setConnectionDB(STscObj* pTscObj, const char* db);
void taos_init_imp(void); void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char *str); int taos_options_imp(TSDB_OPTION option, const char *str);
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
...@@ -176,8 +154,8 @@ void initMsgHandleFp(); ...@@ -176,8 +154,8 @@ void initMsgHandleFp();
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
void* doFetchRow(SRequestObj* pRequest); void *doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
SAppInfo appInfo; SAppInfo appInfo;
int32_t msgObjRefPool = -1; int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1; int32_t clientConnRefPool = -1;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
...@@ -43,7 +43,7 @@ static void registerRequest(SRequestObj* pRequest) { ...@@ -43,7 +43,7 @@ static void registerRequest(SRequestObj* pRequest) {
assert(pTscObj != NULL); assert(pTscObj != NULL);
// connection has been released already, abort creating request. // connection has been released already, abort creating request.
pRequest->self = taosAddRef(msgObjRefPool, pRequest); pRequest->self = taosAddRef(clientReqRefPool, pRequest);
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
...@@ -167,12 +167,11 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty ...@@ -167,12 +167,11 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
// TODO generated request uuid // TODO generated request uuid
pRequest->requestId = 0; pRequest->requestId = 0;
pRequest->metric.start = taosGetTimestampMs(); pRequest->metric.start = taosGetTimestampMs();
pRequest->type = type; pRequest->type = type;
pRequest->pTscObj = pObj; pRequest->pTscObj = pObj;
pRequest->body.fp = fp; pRequest->body.fp = fp; // not used it yet
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
tsem_init(&pRequest->body.rspSem, 0, 0); tsem_init(&pRequest->body.rspSem, 0, 0);
...@@ -201,7 +200,7 @@ void destroyRequest(SRequestObj* pRequest) { ...@@ -201,7 +200,7 @@ void destroyRequest(SRequestObj* pRequest) {
return; return;
} }
taosReleaseRef(msgObjRefPool, pRequest->self); taosReleaseRef(clientReqRefPool, pRequest->self);
} }
void taos_init_imp(void) { void taos_init_imp(void) {
...@@ -238,7 +237,7 @@ void taos_init_imp(void) { ...@@ -238,7 +237,7 @@ void taos_init_imp(void) {
initTaskQueue(); initTaskQueue();
clientConnRefPool = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);
msgObjRefPool = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
taosGetAppName(appInfo.appName, NULL); taosGetAppName(appInfo.appName, NULL);
appInfo.pid = taosGetPId(); appInfo.pid = taosGetPId();
......
...@@ -14,8 +14,6 @@ static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorE ...@@ -14,8 +14,6 @@ static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorE
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
return false; return false;
...@@ -201,10 +199,10 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -201,10 +199,10 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
} }
sendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body); asyncSendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body);
} else { } else {
int64_t transporterId = 0; int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
} }
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
...@@ -274,7 +272,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con ...@@ -274,7 +272,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
SMsgSendInfo* body = buildConnectMsg(pRequest); SMsgSendInfo* body = buildConnectMsg(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(body); destroySendMsgInfo(body);
...@@ -335,7 +333,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { ...@@ -335,7 +333,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree(pMsgBody); tfree(pMsgBody);
} }
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
char *pMsg = rpcMallocCont(pInfo->msgInfo.len); char *pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, taosMsg[pInfo->msgType]); tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, taosMsg[pInfo->msgType]);
...@@ -364,7 +362,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -364,7 +362,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
assert(pMsg->ahandle != NULL); assert(pMsg->ahandle != NULL);
if (pSendInfo->requestObjRefId != 0) { if (pSendInfo->requestObjRefId != 0) {
SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(msgObjRefPool, pSendInfo->requestObjRefId); SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
assert(pRequest->self == pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId);
pRequest->metric.rsp = taosGetTimestampMs(); pRequest->metric.rsp = taosGetTimestampMs();
...@@ -391,7 +389,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -391,7 +389,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pRequest->metric.rsp - pRequest->metric.start); pRequest->metric.rsp - pRequest->metric.start);
} }
taosReleaseRef(msgObjRefPool, pSendInfo->requestObjRefId); taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
} }
SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen}; SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen};
...@@ -437,7 +435,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -437,7 +435,7 @@ void* doFetchRow(SRequestObj* pRequest) {
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(body); destroySendMsgInfo(body);
......
...@@ -35,8 +35,8 @@ void taos_cleanup(void) { ...@@ -35,8 +35,8 @@ void taos_cleanup(void) {
return; return;
} }
int32_t id = msgObjRefPool; int32_t id = clientReqRefPool;
msgObjRefPool = -1; clientReqRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
cleanupTaskQueue(); cleanupTaskQueue();
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void*, const SDataBuf* pMsg, int32_t code); int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void*, const SDataBuf* pMsg, int32_t code);
int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code) { int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
pRequest->code = code; pRequest->code = code;
sem_post(&pRequest->body.rspSem); sem_post(&pRequest->body.rspSem);
...@@ -96,7 +96,7 @@ SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { ...@@ -96,7 +96,7 @@ SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericExecCallback:handleRequestRspFp[pRequest->type]; pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericRspCallback:handleRequestRspFp[pRequest->type];
} }
return pMsgSendInfo; return pMsgSendInfo;
......
...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST}) ADD_EXECUTABLE(clientTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
clientTest clientTest
PUBLIC os util common transport gtest taos PUBLIC os util common transport gtest taos qcom
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
......
...@@ -14,7 +14,7 @@ static struct SSchema _s = { ...@@ -14,7 +14,7 @@ static struct SSchema _s = {
.name = "tbname", .name = "tbname",
}; };
SSchema* tGetTbnameColumnSchema() { const SSchema* tGetTbnameColumnSchema() {
return &_s; return &_s;
} }
...@@ -103,7 +103,7 @@ int32_t cleanupTaskQueue() { ...@@ -103,7 +103,7 @@ int32_t cleanupTaskQueue() {
static void execHelper(struct SSchedMsg* pSchedMsg) { static void execHelper(struct SSchedMsg* pSchedMsg) {
assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL);
__async_exec_fn_t* execFn = (__async_exec_fn_t*) pSchedMsg->ahandle; __async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle); int32_t code = execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) { if (code != 0 && pSchedMsg->msg != NULL) {
*(int32_t*) pSchedMsg->msg = code; *(int32_t*) pSchedMsg->msg = code;
......
...@@ -32,7 +32,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ ...@@ -32,7 +32,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
buildConnectMsg(pRequest, &body); buildConnectMsg(pRequest, &body);
int64_t transporterId = 0; int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body); destroyConnectMsg(&body);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册