diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a34250ccab8f51355e0318bfc65d248234cb4d7e..550d8c3ecfe96b0268ce739c2a5d146913024a21 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -37,7 +37,7 @@ typedef struct SQueryExecMetric { int64_t rsp; // receive response from server } SQueryExecMetric; -typedef struct SInstanceActivity { +typedef struct SInstanceSummary { uint64_t numOfInsertsReq; uint64_t numOfInsertRows; uint64_t insertElapsedTime; @@ -48,7 +48,7 @@ typedef struct SInstanceActivity { uint64_t numOfSlowQueries; uint64_t totalRequests; uint64_t currentRequests; // the number of SRequestObj -} SInstanceActivity; +} SInstanceSummary; typedef struct SHeartBeatInfo { void *pTimer; // timer, used to send request msg to mnode @@ -57,7 +57,7 @@ typedef struct SHeartBeatInfo { typedef struct SAppInstInfo { int64_t numOfConns; SCorEpSet mgmtEp; - SInstanceActivity summary; + SInstanceSummary summary; SList *pConnList; // STscObj linked list uint32_t clusterId; void *pTransporter; @@ -100,16 +100,16 @@ typedef struct SReqResultInfo { uint32_t current; } SReqResultInfo; -typedef struct SReqMsg { - void *pMsg; +typedef struct SDataBuf { + void *pData; uint32_t len; -} SReqMsgInfo; +} SDataBuf; typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now void* fp; int64_t execId; // showId/queryId - SReqMsgInfo requestMsg; + SDataBuf requestMsg; SReqResultInfo resInfo; } SRequestSendRecvBody; @@ -121,26 +121,38 @@ typedef struct SRequestObj { STscObj *pTscObj; SQueryExecMetric metric; char *sqlstr; // sql string - SRequestSendRecvBody body; + SRequestSendRecvBody body; int64_t self; char *msgBuf; - int32_t code; void *pInfo; // sql parse info, generated by parser module + int32_t code; } SRequestObj; typedef struct SRequestMsgBody { int32_t msgType; - SReqMsgInfo msgInfo; + SDataBuf msgInfo; uint64_t requestId; uint64_t requestObjRefId; } SRequestMsgBody; -extern SAppInfo appInfo; -extern int32_t tscReqRef; -extern int32_t tscConnRef; +typedef int (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +extern SAppInfo appInfo; +extern int32_t msgObjRefPool; +extern int32_t clientConnRefPool; -SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest); -extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen); +SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest); +int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code); +extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void* param, const SDataBuf* pMsg, int32_t code); int taos_init(); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b54f7fedd788ea464714c17f033ac5b1bac48244..dca4d85ad6c707f16ff31d66fde4d4adc1959b13 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#include +#include "os.h" +#include "catalog.h" #include "clientInt.h" #include "clientLog.h" -#include "os.h" #include "query.h" #include "taosmsg.h" #include "tcache.h" @@ -32,26 +32,26 @@ #define TSC_VAR_RELEASED 0 SAppInfo appInfo; -int32_t tscReqRef = -1; -int32_t tscConnRef = -1; +int32_t msgObjRefPool = -1; +int32_t clientConnRefPool = -1; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj* pRequest) { - STscObj *pTscObj = (STscObj *)taosAcquireRef(tscConnRef, pRequest->pTscObj->id); + STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id); assert(pTscObj != NULL); // connection has been released already, abort creating request. - pRequest->self = taosAddRef(tscReqRef, pRequest); + pRequest->self = taosAddRef(msgObjRefPool, pRequest); int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); if (pTscObj->pAppInfo) { - SInstanceActivity *pActivity = &pTscObj->pAppInfo->summary; + SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary; - int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1); - int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1); + int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1); + int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self, pRequest->pTscObj->id, num, currentInst, total); } @@ -61,13 +61,13 @@ static void deregisterRequest(SRequestObj* pRequest) { assert(pRequest != NULL); STscObj* pTscObj = pRequest->pTscObj; - SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; + SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary; int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); - taosReleaseRef(tscConnRef, pTscObj->id); + taosReleaseRef(clientConnRefPool, pTscObj->id); } static void tscInitLogFile() { @@ -150,7 +150,7 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI } pthread_mutex_init(&pObj->mutex, NULL); - pObj->id = taosAddRef(tscConnRef, pObj); + pObj->id = taosAddRef(clientConnRefPool, pObj); tscDebug("connObj created, 0x%"PRIx64, pObj->id); return pObj; @@ -173,7 +173,6 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty pRequest->type = type; pRequest->pTscObj = pObj; pRequest->body.fp = fp; -// pRequest->body.requestMsg. = param; pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); tsem_init(&pRequest->body.rspSem, 0, 0); @@ -202,7 +201,7 @@ void destroyRequest(SRequestObj* pRequest) { return; } - taosReleaseRef(tscReqRef, pRequest->self); + taosReleaseRef(msgObjRefPool, pRequest->self); } void taos_init_imp(void) { @@ -238,8 +237,8 @@ void taos_init_imp(void) { initTaskQueue(); - tscConnRef = taosOpenRef(200, destroyTscObj); - tscReqRef = taosOpenRef(40960, doDestroyRequest); + clientConnRefPool = taosOpenRef(200, destroyTscObj); + msgObjRefPool = taosOpenRef(40960, doDestroyRequest); taosGetAppName(appInfo.appName, NULL); appInfo.pid = taosGetPId(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f00e7bd5781ce8f04e3f67410e08f9b44f6b2d39..7cd8e9d359bc2bf8a2ef331d51ec5a62ff66d2af 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -11,10 +11,10 @@ #include "parser.h" static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); -static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody); -static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody); +static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId); +static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -163,10 +163,11 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { int32_t code = qParseQuerySql(&cxt, &pQuery); if (qIsDclQuery(pQuery)) { SDclStmtInfo* pDcl = (SDclStmtInfo*) pQuery; + pRequest->type = pDcl->msgType; - pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen}; + pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen}; - SRequestMsgBody body = buildRequestMsgImpl(pRequest); + SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) { @@ -180,7 +181,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { return pRequest; } - SCreateTableMsg* pMsg = body.msgInfo.pMsg; + SCreateTableMsg* pMsg = body->msgInfo.pData; SName t = {0}; tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE); @@ -200,14 +201,14 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); } - sendMsgToServer(pTscObj->pTransporter, &ep, &body, &transporterId); + sendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body); } else { int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, pEpSet, &body, &transporterId); + sendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); } tsem_wait(&pRequest->body.rspSem); - destroyRequestMsgBody(&body); + destroySendMsgInfo(body); } tfree(cxt.ctx.db); @@ -270,14 +271,13 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con return NULL; } - SRequestMsgBody body = {0}; - buildConnectMsg(pRequest, &body); + SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); - destroyRequestMsgBody(&body); + destroySendMsgInfo(body); if (pRequest->code != TSDB_CODE_SUCCESS) { const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); @@ -294,15 +294,25 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con return pTscObj; } -static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { - pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgInfo.len = sizeof(SConnectMsg); - pMsgBody->requestObjRefId = pRequest->self; +static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) { + SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (pMsgSendInfo == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + pMsgSendInfo->msgType = TSDB_MSG_TYPE_CONNECT; + pMsgSendInfo->msgInfo.len = sizeof(SConnectMsg); + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType]; + pMsgSendInfo->param = pRequest; SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); if (pConnect == NULL) { + tfree(pMsgSendInfo); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return -1; + return NULL; } STscObj *pObj = pRequest->pTscObj; @@ -315,84 +325,78 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) pConnect->startTime = htobe64(appInfo.startTime); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - pMsgBody->msgInfo.pMsg = pConnect; - return 0; + pMsgSendInfo->msgInfo.pData = pConnect; + return pMsgSendInfo; } -static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) { +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { assert(pMsgBody != NULL); - tfree(pMsgBody->msgInfo.pMsg); + tfree(pMsgBody->msgInfo.pData); + tfree(pMsgBody); } -int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) { - char *pMsg = rpcMallocCont(pBody->msgInfo.len); +int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + char *pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { - tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]); + tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, taosMsg[pInfo->msgType]); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return -1; } - memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len); + memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); SRpcMsg rpcMsg = { - .msgType = pBody->msgType, + .msgType = pInfo->msgType, .pCont = pMsg, - .contLen = pBody->msgInfo.len, - .ahandle = (void*) pBody->requestObjRefId, + .contLen = pInfo->msgInfo.len, + .ahandle = (void*) pInfo, .handle = NULL, .code = 0 }; + assert(pInfo->fp != NULL); + rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); return TSDB_CODE_SUCCESS; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - int64_t requestRefId = (int64_t)pMsg->ahandle; - - SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId); - if (pRequest == NULL) { - rpcFreeCont(pMsg->pCont); - return; - } + SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; + assert(pMsg->ahandle != NULL); - assert(pRequest->self == requestRefId); - pRequest->metric.rsp = taosGetTimestampMs(); + if (pSendInfo->requestObjRefId != 0) { + SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(msgObjRefPool, pSendInfo->requestObjRefId); + assert(pRequest->self == pSendInfo->requestObjRefId); - pRequest->code = pMsg->code; + pRequest->metric.rsp = taosGetTimestampMs(); + pRequest->code = pMsg->code; - STscObj *pTscObj = pRequest->pTscObj; - if (pEpSet) { - if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { - updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); + STscObj *pTscObj = pRequest->pTscObj; + if (pEpSet) { + if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); + } } - } - /* + /* * There is not response callback function for submit response. * The actual inserted number of points is the first number. - */ - if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType], - tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start); - if (handleRequestRspFp[pRequest->type]) { - char *p = malloc(pMsg->contLen); - if (p == NULL) { - pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - terrno = pRequest->code; - } else { - memcpy(p, pMsg->pCont, pMsg->contLen); - pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen); - } + */ + if (pMsg->code == TSDB_CODE_SUCCESS) { + tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId, + taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen, + pRequest->metric.rsp - pRequest->metric.start); + } else { + tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId, + taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen, + pRequest->metric.rsp - pRequest->metric.start); } - } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, taosMsg[pMsg->msgType], - tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start); + + taosReleaseRef(msgObjRefPool, pSendInfo->requestObjRefId); } - taosReleaseRef(tscReqRef, requestRefId); + SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen}; + pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); rpcFreeCont(pMsg->pCont); - - sem_post(&pRequest->body.rspSem); } TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { @@ -429,14 +433,14 @@ void* doFetchRow(SRequestObj* pRequest) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { pRequest->type = TSDB_MSG_TYPE_SHOW_RETRIEVE; - SRequestMsgBody body = buildRequestMsgImpl(pRequest); + SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); - int64_t transporterId = 0; - STscObj* pTscObj = pRequest->pTscObj; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + int64_t transporterId = 0; + STscObj *pTscObj = pRequest->pTscObj; + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); - destroyRequestMsgBody(&body); + destroySendMsgInfo(body); pResultInfo->current = 0; if (pResultInfo->numOfRows <= pResultInfo->current) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f50765d37a82a2836cd671286d9bd64cdfc4f837..44c78076d9b3bd8e452039042af702adceba6930 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -35,14 +35,14 @@ void taos_cleanup(void) { return; } - int32_t id = tscReqRef; - tscReqRef = -1; + int32_t id = msgObjRefPool; + msgObjRefPool = -1; taosCloseRef(id); cleanupTaskQueue(); - id = tscConnRef; - tscConnRef = -1; + id = clientConnRefPool; + clientConnRefPool = -1; taosCloseRef(id); rpcCleanup(); @@ -72,7 +72,7 @@ void taos_close(TAOS* taos) { STscObj *pTscObj = (STscObj *)taos; tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - taosRemoveRef(tscConnRef, pTscObj->id); + taosRemoveRef(clientConnRefPool, pTscObj->id); } int taos_errno(TAOS_RES *tres) { @@ -130,7 +130,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { return NULL; } - return taos_query_l(taos, sql, strlen(sql)); + return taos_query_l(taos, sql, (int32_t) strlen(sql)); } TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { @@ -140,7 +140,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { SRequestObj *pRequest = (SRequestObj *) pRes; if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || - pRequest->type == TSDB_SQL_INSERT) { + pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) { return NULL; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 548ea3d725f3cb43fafc393d69fad3d48ca73a87..b18b3ecb51483777072dcbbe7d36e20b3c2b880b 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -18,45 +18,23 @@ #include "tname.h" #include "clientInt.h" #include "clientLog.h" -#include "tmsgtype.h" #include "trpc.h" -int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen); +int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void*, const SDataBuf* pMsg, int32_t code); -int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { - pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgInfo.len = sizeof(SConnectMsg); - pMsgBody->requestObjRefId = pRequest->self; - - SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); - if (pConnect == NULL) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return -1; - } - - // TODO refactor full_name - char *db; // ugly code to move the space - - STscObj *pObj = pRequest->pTscObj; - pthread_mutex_lock(&pObj->mutex); - db = strstr(pObj->db, TS_PATH_DELIMITER); - - db = (db == NULL) ? pObj->db : db + 1; - tstrncpy(pConnect->db, db, sizeof(pConnect->db)); - pthread_mutex_unlock(&pObj->mutex); - - pConnect->pid = htonl(appInfo.pid); - pConnect->startTime = htobe64(appInfo.startTime); - tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - - pMsgBody->msgInfo.pMsg = pConnect; +int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + pRequest->code = code; + sem_post(&pRequest->body.rspSem); return 0; } -int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + STscObj *pTscObj = pRequest->pTscObj; - SConnectRsp *pConnect = (SConnectRsp *)pMsg; + SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; pConnect->acctId = htonl(pConnect->acctId); pConnect->connId = htonl(pConnect->connId); pConnect->clusterId = htonl(pConnect->clusterId); @@ -81,15 +59,19 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - pRequest->body.resInfo.pRspMsg = pMsg; +// pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); + + sem_post(&pRequest->body.rspSem); return 0; } -static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { - pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg); - pMsgBody->requestObjRefId = pRequest->self; +static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) { + pMsgSendInfo->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg); + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->param = pRequest; + pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType]; SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); if (pRetrieveMsg == NULL) { @@ -97,29 +79,38 @@ static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMs } pRetrieveMsg->showId = htonl(pRequest->body.execId); - pMsgBody->msgInfo.pMsg = pRetrieveMsg; + pMsgSendInfo->msgInfo.pData = pRetrieveMsg; return TSDB_CODE_SUCCESS; } -SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest) { +SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (pRequest->type == TSDB_MSG_TYPE_SHOW_RETRIEVE) { - SRequestMsgBody body = {0}; - buildRetrieveMnodeMsg(pRequest, &body); - return body; + buildRetrieveMnodeMsg(pRequest, pMsgSendInfo); } else { assert(pRequest != NULL); - SRequestMsgBody body = { - .requestObjRefId = pRequest->self, - .msgInfo = pRequest->body.requestMsg, - .msgType = pRequest->type, - .requestId = pRequest->requestId, - }; - return body; + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->msgInfo = pRequest->body.requestMsg; + pMsgSendInfo->msgType = pRequest->type; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->param = pRequest; + + pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericExecCallback:handleRequestRspFp[pRequest->type]; } + + return pMsgSendInfo; } -int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { - SShowRsp* pShow = (SShowRsp *)pMsg; +int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + tsem_post(&pRequest->body.rspSem); + return code; + } + + SShowRsp* pShow = (SShowRsp *)pMsg->pData; pShow->showId = htonl(pShow->showId); STableMetaMsg *pMetaMsg = &(pShow->tableMeta); @@ -140,7 +131,7 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) pFields[i].bytes = pSchema[i].bytes; } - pRequest->body.resInfo.pRspMsg = pMsg; +// pRequest->body.resInfo.pRspMsg = pMsg->pData; SReqResultInfo* pResInfo = &pRequest->body.resInfo; pResInfo->fields = pFields; @@ -150,16 +141,18 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); pRequest->body.execId = pShow->showId; + tsem_post(&pRequest->body.rspSem); return 0; } -int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { - assert(msgLen >= sizeof(SRetrieveTableRsp)); +int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) { + assert(pMsg->len >= sizeof(SRetrieveTableRsp)); - tfree(pRequest->body.resInfo.pRspMsg); - pRequest->body.resInfo.pRspMsg = pMsg; + SRequestObj* pRequest = param; +// tfree(pRequest->body.resInfo.pRspMsg); +// pRequest->body.resInfo.pRspMsg = pMsg->pData; - SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg; + SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData; pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->precision = htons(pRetrieve->precision); @@ -172,29 +165,42 @@ int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows, pRetrieve->completed, pRequest->body.execId); + + tsem_post(&pRequest->body.rspSem); return 0; } -int32_t processCreateDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { // todo rsp with the vnode id list + SRequestObj* pRequest = param; + tsem_post(&pRequest->body.rspSem); } -int32_t processUseDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { - SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg; +int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { + SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData; SName name = {0}; tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB); char db[TSDB_DB_NAME_LEN] = {0}; tNameGetDbName(&name, db); + + SRequestObj* pRequest = param; setConnectionDB(pRequest->pTscObj, db); + + tsem_post(&pRequest->body.rspSem); + return 0; } -int32_t processCreateTableRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { assert(pMsg != NULL); + SRequestObj* pRequest = param; + tsem_post(&pRequest->body.rspSem); } -int32_t processDropDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { // todo: Remove cache in catalog cache. + SRequestObj* pRequest = param; + tsem_post(&pRequest->body.rspSem); } void initMsgHandleFp() { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 2938d0180a958af8ace229e009e1b682e895abe4..1a5872f77a6eb6af5d7af2c97caad74a99aaa670 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -49,55 +49,55 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } - TEST(testCase, connect_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); - taos_close(pConn); -} - - TEST(testCase, create_user_Test) { +TEST(testCase, connect_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); taos_close(pConn); } - TEST(testCase, create_account_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, create_user_Test) { +// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, create_account_Test) { +// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, drop_account_Test) { +// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} - TEST(testCase, drop_account_Test) { +TEST(testCase, show_user_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - - TEST(testCase, show_user_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "show users"); TAOS_ROW pRow = NULL; @@ -113,7 +113,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } taos_close(pConn); } - TEST(testCase, drop_user_Test) { +TEST(testCase, drop_user_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -126,7 +126,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } taos_close(pConn); } - TEST(testCase, show_db_Test) { +TEST(testCase, show_db_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -191,10 +191,15 @@ TEST(testCase, drop_db_test) { if (taos_errno(pRes) != 0) { printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); } - taos_free_result(pRes); showDB(pConn); + + pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("create to drop db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); taos_close(pConn); } @@ -248,7 +253,19 @@ TEST(testCase, show_stable_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "show stables"); + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show stables"); + if (taos_errno(pRes) != 0) { + printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index c0c3c8386b5237707881204efd00dcda07896126..361b4e71cb0ca3834f78990aaffbef26d27d4bfe 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -7,7 +7,7 @@ SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); -SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen); +SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen); SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 00f3d3f7162cb4df8de51d7755fce8911dbfaa57..7f2f15ac650845c5d15d2492bcd61e425caf2715 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -86,7 +86,7 @@ SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, cha return pMsg; } -SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen) { +SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, int32_t msgLen) { SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg)); pShowMsg->type = pShowInfo->showType; @@ -105,6 +105,12 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t m pShowMsg->payloadLen = htons(pEpAddr->n); } + if (pShowInfo->showType == TSDB_MGMT_TABLE_STB || pShowInfo->showType == TSDB_MGMT_TABLE_VGROUP) { + SName n = {0}; + tNameSetDbName(&n, pCtx->acctId, pCtx->db, strlen(pCtx->db)); + tNameGetFullDbName(&n, pShowMsg->db); + } + return pShowMsg; } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index cd11607326a33a8b277dbed0367138e269865b86..2510f354fb3e8fda29012ed1ca26a0647e3bbb80 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4090,7 +4090,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou } } - *output = buildShowMsg(pShowInfo, pCtx->requestId, pMsgBuf->buf, pMsgBuf->len); + *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len); *outputLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/; return TSDB_CODE_SUCCESS; }