diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 2e4093590ddb05dbd14f8d3c26939c9185c0d54a..c6218002b9a4e924ee5f09d8fa965700e68fc889 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -112,6 +112,7 @@ typedef struct STableMetaOutput { typedef struct SDataBuf { void *pData; uint32_t len; + void *handle; } SDataBuf; typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index d265ffaa94d9712bc2ea081d74bdfc8ffe1d5adc..e200712b87013c5464e8788de4cc61d33e238747 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -444,7 +444,7 @@ static void* hbThreadFunc(void* param) { } void *abuf = buf; tSerializeSClientHbBatchReq(&abuf, pReq); - SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); + SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq, false); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index dfe7b12ce49aa870920f9bd05900b67b5f1767d8..5f084fd5a7e3cf3c48acd335a7c2a17fe6887772 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -12,10 +12,10 @@ #include "tpagedfile.h" #include "tref.h" -static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); -static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); -static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); +static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -30,17 +30,11 @@ static bool stringLengthCheck(const char* str, size_t maxsize) { return true; } -static bool validateUserName(const char* user) { - return stringLengthCheck(user, TSDB_USER_LEN - 1); -} +static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); } -static bool validatePassword(const char* passwd) { - return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); -} +static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); } -static bool validateDbName(const char* db) { - return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); -} +static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); } static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) { char key[512] = {0}; @@ -48,10 +42,12 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i return strdup(key); } -static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); -static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, + SAppInstInfo* pAppInfo); +static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); -TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { +TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, + uint16_t port) { if (taos_init() != TSDB_CODE_SUCCESS) { return NULL; } @@ -63,7 +59,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, char localDb[TSDB_DB_NAME_LEN] = {0}; if (db != NULL) { - if(!validateDbName(db)) { + if (!validateDbName(db)) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; return NULL; } @@ -79,7 +75,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return NULL; } - taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt); + taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt); } else { tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt)); } @@ -99,7 +95,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, } } - char* key = getClusterKey(user, secretEncrypt, ip, port); + char* key = getClusterKey(user, secretEncrypt, ip, port); SAppInstInfo** pInst = NULL; pthread_mutex_lock(&appInfo.mutex); @@ -108,7 +104,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = NULL; if (pInst == NULL) { p = calloc(1, sizeof(struct SAppInstInfo)); - p->mgmtEp = epSet; + p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); @@ -122,7 +118,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst); } -int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) { +int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) { *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT); if (*pRequest == NULL) { tscError("failed to malloc sqlObj"); @@ -131,7 +127,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr = malloc(sqlLen + 1); if ((*pRequest)->sqlstr == NULL) { - tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self); + tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self); (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer"); return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -140,7 +136,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; - tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); + tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } @@ -148,14 +144,14 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { STscObj* pTscObj = pRequest->pTscObj; SParseContext cxt = { - .requestId = pRequest->requestId, - .acctId = pTscObj->acctId, - .db = getDbOfConnection(pTscObj), - .pSql = pRequest->sqlstr, - .sqlLen = pRequest->sqlLen, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, - .pTransporter = pTscObj->pAppInfo->pTransporter, + .requestId = pRequest->requestId, + .acctId = pTscObj->acctId, + .db = getDbOfConnection(pTscObj), + .pSql = pRequest->sqlstr, + .sqlLen = pRequest->sqlLen, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pTransporter = pTscObj->pAppInfo->pTransporter, }; cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); @@ -174,9 +170,9 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; pRequest->type = pDcl->msgType; - pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen}; + pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen, .handle = NULL}; - STscObj* pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); int64_t transporterId = 0; @@ -202,7 +198,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SSchema* pSchema = NULL; int32_t numOfCols = 0; - int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId); + int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId); if (code != 0) { return code; } @@ -224,7 +220,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { pResInfo->fields[i].bytes = pSchema[i].bytes; - pResInfo->fields[i].type = pSchema[i].type; + pResInfo->fields[i].type = pSchema[i].type; tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); } } @@ -233,7 +229,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); + int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); if (code != TSDB_CODE_SUCCESS) { // handle error and retry } else { @@ -250,9 +246,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob); } -TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { - STscObj *pTscObj = (STscObj *)taos; - if (sqlLen > (size_t) TSDB_MAX_ALLOWED_SQL_LEN) { +TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; return NULL; @@ -260,9 +256,9 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { nPrintTsc("%s", sql) - SRequestObj *pRequest = NULL; - SQueryNode *pQueryNode = NULL; - SArray *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + SRequestObj* pRequest = NULL; + SQueryNode* pQueryNode = NULL; + SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); @@ -286,13 +282,13 @@ _return: return pRequest; } -int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) { +int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { pEpSet->version = 0; // init mnode ip set - SEpSet *mgmtEpSet = &(pEpSet->epSet); + SEpSet* mgmtEpSet = &(pEpSet->epSet); mgmtEpSet->numOfEps = 0; - mgmtEpSet->inUse = 0; + mgmtEpSet->inUse = 0; if (firstEp && firstEp[0] != 0) { if (strlen(firstEp) >= TSDB_EP_LEN) { @@ -322,14 +318,15 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe return 0; } -STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) { - STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo); +STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, + SAppInstInfo* pAppInfo) { + STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return pTscObj; } - SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT); + SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT); if (pRequest == NULL) { destroyTscObj(pTscObj); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -343,42 +340,44 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { - const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); + const char* errorMsg = + (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); printf("failed to connect to server, reason: %s\n\n", errorMsg); destroyRequest(pRequest); taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); + tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, + pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } return pTscObj; } -static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) { - SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - pMsgSendInfo->msgType = TDMT_MND_CONNECT; - pMsgSendInfo->msgInfo.len = sizeof(SConnectReq); + pMsgSendInfo->msgType = TDMT_MND_CONNECT; + pMsgSendInfo->msgInfo.len = sizeof(SConnectReq); pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; - pMsgSendInfo->param = pRequest; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; + pMsgSendInfo->param = pRequest; - SConnectReq *pConnect = calloc(1, sizeof(SConnectReq)); + SConnectReq* pConnect = calloc(1, sizeof(SConnectReq)); if (pConnect == NULL) { tfree(pMsgSendInfo); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - STscObj *pObj = pRequest->pTscObj; + STscObj* pObj = pRequest->pTscObj; char* db = getDbOfConnection(pObj); if (db != NULL) { @@ -401,17 +400,17 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; + SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; assert(pMsg->ahandle != NULL); if (pSendInfo->requestObjRefId != 0) { - SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); + SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId); pRequest->metric.rsp = taosGetTimestampMs(); pRequest->code = pMsg->code; - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; if (pEpSet) { if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); @@ -419,22 +418,22 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { } /* - * There is not response callback function for submit response. - * The actual inserted number of points is the first number. + * There is not response callback function for submit response. + * The actual inserted number of points is the first number. */ int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); + tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); + tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } - SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle}; if (pMsg->contLen > 0) { buf.pData = calloc(1, pMsg->contLen); @@ -451,7 +450,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { destroySendMsgInfo(pSendInfo); } -TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { +TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db); if (user == NULL) { user = TSDB_DEFAULT_USER; @@ -465,16 +464,17 @@ TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, cons return taos_connect_internal(ip, user, NULL, auth, db, port); } -TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) { - char ipStr[TSDB_EP_LEN] = {0}; +TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen, + const char* db, int dbLen, uint16_t port) { + char ipStr[TSDB_EP_LEN] = {0}; char dbStr[TSDB_DB_NAME_LEN] = {0}; - char userStr[TSDB_USER_LEN] = {0}; - char passStr[TSDB_PASSWORD_LEN] = {0}; + char userStr[TSDB_USER_LEN] = {0}; + char passStr[TSDB_PASSWORD_LEN] = {0}; - strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen)); + strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen)); strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen)); strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen)); - strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen)); + strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen)); return taos_connect(ipStr, userStr, passStr, dbStr, port); } @@ -492,15 +492,15 @@ void* doFetchRow(SRequestObj* pRequest) { } SReqResultInfo* pResInfo = &pRequest->body.resInfo; - int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData); + int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void**)&pResInfo->pData); if (code != TSDB_CODE_SUCCESS) { pRequest->code = code; return NULL; } setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); - tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows, - pResInfo->totalRows, pResInfo->completed, pRequest->requestId); + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); if (pResultInfo->numOfRows == 0) { return NULL; @@ -513,7 +513,7 @@ void* doFetchRow(SRequestObj* pRequest) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; - SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); + SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); epSet = pVgroupInfo->epset; } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { @@ -524,7 +524,7 @@ void* doFetchRow(SRequestObj* pRequest) { return NULL; } - SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); + SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); pShowReq->head.vgId = htonl(pVgroupInfo->vgId); @@ -535,14 +535,14 @@ void* doFetchRow(SRequestObj* pRequest) { epSet = pVgroupInfo->epset; int64_t transporterId = 0; - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; - } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { + } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - + if (pResultInfo->completed) { return NULL; } @@ -551,7 +551,7 @@ void* doFetchRow(SRequestObj* pRequest) { SMsgSendInfo* body = buildMsgInfoImpl(pRequest); int64_t transporterId = 0; - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); @@ -564,7 +564,7 @@ void* doFetchRow(SRequestObj* pRequest) { _return: - for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) { + for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); @@ -578,8 +578,8 @@ _return: static void doPrepareResPtr(SReqResultInfo* pResInfo) { if (pResInfo->row == NULL) { - pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); - pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); } } @@ -596,14 +596,14 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t int32_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { pResultInfo->length[i] = pResultInfo->fields[i].bytes; - pResultInfo->row[i] = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows); - pResultInfo->pCol[i] = pResultInfo->row[i]; + pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows); + pResultInfo->pCol[i] = pResultInfo->row[i]; offset += pResultInfo->fields[i].bytes; } } char* getDbOfConnection(STscObj* pObj) { - char *p = NULL; + char* p = NULL; pthread_mutex_lock(&pObj->mutex); size_t len = strlen(pObj->db); if (len > 0) { @@ -624,10 +624,10 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { assert(pResultInfo != NULL && pRsp != NULL); - pResultInfo->pRspMsg = (const char*) pRsp; - pResultInfo->pData = (void*) pRsp->data; + pResultInfo->pRspMsg = (const char*)pRsp; + pResultInfo->pData = (void*)pRsp->data; pResultInfo->numOfRows = htonl(pRsp->numOfRows); - pResultInfo->current = 0; + pResultInfo->current = 0; pResultInfo->completed = (pRsp->completed == 1); pResultInfo->totalRows += pResultInfo->numOfRows; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 8ab1880069dd015aab08015217ea3f925741e80b..67224e6aa2c8d10b05e06db59756219d2fd5bf06 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -105,6 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { pRetrieveMsg->showId = htobe64(pRequest->body.showInfo.execId); pMsgSendInfo->msgInfo.pData = pRetrieveMsg; pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq); + pMsgSendInfo->msgInfo.handle = NULL; } else { SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq)); if (pFetchMsg == NULL) { @@ -116,6 +117,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { pMsgSendInfo->msgInfo.pData = pFetchMsg; pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq); + pMsgSendInfo->msgInfo.handle = NULL; } } else { assert(pRequest != NULL); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index e57901ed2ed8a4fe030c405d93e913fc237cb00c..ce8ba4e665e29b21b5eb054d48bfb566c83f73ae 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -252,7 +252,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; tsem_init(¶m.rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = ¶m; @@ -381,7 +381,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i tSerializeSCMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; pRequest->type = TDMT_MND_CREATE_TOPIC; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); @@ -686,7 +686,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tsem_init(¶m->rspSem, 0, 0); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; + pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL}; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 9f678b4528e28aef8b48f94e670dbedf7c587f33..4cd040c238931cb228ffcc4ebb6fb1fd3e781f17 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp .pCont = pMsg, .contLen = pInfo->msgInfo.len, .ahandle = (void*) pInfo, - .handle = NULL, + .handle = pInfo->msgInfo.handle, .code = 0 }; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 94c833307f898462bd33c8b72f92b814c6eebecd..42270cd6453a0e51d1f2a29d045b23898ab6ffe7 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -95,6 +95,7 @@ typedef struct SSchTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* + void* handle; // task send handle } SSchTask; typedef struct SSchJobAttr { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 1b6d871274d5f3a6d35b74d039daec576c96ac75..ab6873ef9c7eb4ba6490605094f9e97f57fe3bc2 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -18,6 +18,10 @@ #include "query.h" #include "catalog.h" +typedef struct SSchTrans { + void *transInst; + void *transHandle; +}SSchTrans; static SSchedulerMgmt schMgmt = {0}; uint64_t schGenTaskId(void) { @@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in pTask = *task; SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode)); + pTask->handle = pMsg->handle; SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: @@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; + + SSchTrans *trans = (SSchTrans *)transport; + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo)); @@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t param->queryId = qId; param->taskId = tId; + pMsgSendInfo->param = param; pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; + pMsgSendInfo->msgInfo.handle = trans->transHandle; pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; int64_t transporterId = 0; - code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo); if (code) { SCH_ERR_JRET(code); } @@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, atomic_store_32(&pTask->lastMsgType, msgType); - SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); + SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle}; + SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); if (isCandidateAddr) { SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9cab863ed75407a5ccb65f9d6b45ffed76606419..f1bd1ba980f7db5ee2221c9d16c7c141566f55e7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP) { + if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP || + rpcMsg.msgType == TDMT_VND_RES_READY) { rpcMsg.handle = conn; conn->persist = 1; + tDebug("client conn %p persist by app", conn); } tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port)); + conn->secured = pHead->secured; if (conn->push != NULL && conn->ctnRdCnt != 0) { (*conn->push->callback)(conn->push->arg, &rpcMsg); conn->push = NULL; @@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) { } } conn->ctnRdCnt += 1; - conn->secured = pHead->secured; // buf's mem alread translated to rpcMsg.pCont transClearBuffer(&conn->readBuf); @@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; // user owns conn->persist = 1 - if (conn->push == NULL || conn->persist == 0) { + if (conn->push == NULL && conn->persist == 0) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); - - destroyCmsg(conn->data); - conn->data = NULL; } - + destroyCmsg(conn->data); + conn->data = NULL; // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { - uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { @@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b } static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later + if (handle->data == NULL) { + return; + } SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) {