From d2605d0ece2b186b205c545fd0e1bf2d379327c5 Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 22 Feb 2020 11:46:59 +0800 Subject: [PATCH] connect from client to mgmt --- src/client/inc/tsclient.h | 8 +- src/client/src/tscAsync.c | 11 +- src/client/src/tscPrepare.c | 1 - src/client/src/tscProfile.c | 2 +- src/client/src/tscServer.c | 467 +++++++++--------------------------- src/client/src/tscSql.c | 22 +- src/client/src/tscSub.c | 1 - 7 files changed, 123 insertions(+), 389 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 8753142c30..5e58ebe574 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -324,14 +324,12 @@ typedef struct _sql_obj { short vnode; int64_t stime; uint32_t queryId; - void * thandle; - SRpcIpSet *ipSet; void * pStream; void * pSubscription; char * sqlstr; char retry; char maxRetry; - uint8_t index; + SRpcIpSet *ipList; char freed : 4; char listed : 4; tsem_t rspSem; @@ -379,8 +377,8 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); void tscInitMsgs(); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); -void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle); -int tscProcessSql(SSqlObj *pSql); +void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code); +int tscProcessSql(SSqlObj *pSql); void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index a70a314298..b4fb277652 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -318,13 +318,6 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { int cmd = pCmd->command; int code = pRes->code ? -pRes->code : pRes->numOfRows; - if ((tscKeepConn[cmd] == 0 || (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS)) && - pSql->pStream == NULL) { - if (pSql->thandle) taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pTscObj->user); - - pSql->thandle = NULL; - } - // in case of async insert, restore the user specified callback function bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); @@ -454,8 +447,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { tscTrace("%p failed to renew meterMeta", pSql); tsem_post(&pSql->rspSem); } else { - tscTrace("%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d", - pSql, pSql->cmd.command, pSql->res.code, pSql->thandle, pSql->retry); + tscTrace("%p renew meterMeta successfully, command:%d, code:%d, retry:%d", + pSql, pSql->cmd.command, pSql->res.code, pSql->retry); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); assert(pMeterMetaInfo->pMeterMeta == NULL); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 4ab63c18e9..cb991691f5 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -451,7 +451,6 @@ static int insertStmtExecute(STscStmt* stmt) { pRes->numOfTotalInCurrentClause = 0; pRes->qhandle = 0; - pSql->thandle = NULL; tscDoQuery(pSql); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index b7c7623998..d6af2f59dc 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -145,7 +145,7 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) { if (pSql == NULL) return; - tscTrace("%p query is killed, queryId:%d thandle:%p", pSql, killId, pSql->thandle); + tscTrace("%p query is killed, queryId:%d", pSql, killId); taos_stop_query(pSql); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ac760cc296..e12841a2e7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -123,8 +123,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->killConnection) { tscKillConnection(pObj); } else { - if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId); - if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId); + if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId)); + if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { tscTrace("heart beat failed, code:%d", code); @@ -165,7 +165,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { } if (tscShouldFreeHeatBeat(pObj->pHb)) { - tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle); + tscTrace("%p free HB object and release connection", pObj); //taosCloseRpcConn(pObj->pHb->thandle); tscFreeSqlObj(pObj->pHb); @@ -176,344 +176,113 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { tscProcessSql(pObj->pHb); } - -void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { - STscObj *pTscObj = pSql->pTscObj; - if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) { - *pCode = 0; - pSql->retry++; - pSql->index = pSql->index % tscMgmtIpList.numOfIps; - if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1; - void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user); - - -// if (thandle == NULL) { -// SRpcConnInit connInit; -// memset(&connInit, 0, sizeof(connInit)); -// connInit.cid = 0; -// connInit.sid = 0; -// connInit.meterId = pSql->pTscObj->user; -// connInit.peerId = 0; -// connInit.shandle = pTscMgmtConn; -// connInit.ahandle = pSql; -// connInit.peerPort = tsMgmtShellPort; -// connInit.spi = 1; -// connInit.encrypt = 0; -// connInit.secret = pSql->pTscObj->pass; -// -// connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; -// thandle = taosOpenRpcConn(&connInit, pCode); -// } - - pSql->thandle = thandle; - pSql->ip = tscMgmtIpList.ip[pSql->index]; - pSql->vnode = TSC_MGMT_VNODE; - tscTrace("%p mgmt index:%d ip:0x%x is picked up, pConn:%p", pSql, pSql->index, tscMgmtIpList.ip[pSql->index], - pSql->thandle); - } - - // the pSql->res.code is the previous error(status) code. - if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) { - if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) { - *pCode = pSql->res.code; - } - - tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode); - } -} - -void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { - SVPeerDesc *pVPeersDesc = NULL; - static int vidIndex = 0; - STscObj * pTscObj = pSql->pTscObj; - - pSql->thandle = NULL; - - SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - - if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { // multiple vnode query - SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex); - if (vnodeList != NULL) { - pVPeersDesc = vnodeList->vpeerDesc; - } - } else { - SMeterMeta *pMeta = pMeterMetaInfo->pMeterMeta; - if (pMeta == NULL) { - tscError("%p pMeterMeta is NULL", pSql); - pSql->retry = pSql->maxRetry; - return; - } - pVPeersDesc = pMeta->vpeerDesc; - } - - if (pVPeersDesc == NULL) { - pSql->retry = pSql->maxRetry; - tscError("%p pVPeerDesc is NULL", pSql); - } - - while (pSql->retry < pSql->maxRetry) { - (pSql->retry)++; - char ipstr[40] = {0}; - if (pVPeersDesc[pSql->index].ip == 0) { - /* - * in the edge edition, ip is 0, and at this time we use masterIp instead - * in the cluster edition, ip is vnode ip - */ - pVPeersDesc[pSql->index].ip = tscMgmtIpList.ip[0]; - } - *pCode = TSDB_CODE_SUCCESS; - - void *thandle = - taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user); - -// if (thandle == NULL) { -// SRpcConnInit connInit; -// tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); -// memset(&connInit, 0, sizeof(connInit)); -// connInit.cid = vidIndex; -// connInit.sid = 0; -// connInit.spi = 0; -// connInit.encrypt = 0; -// connInit.meterId = pSql->pTscObj->user; -// connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); -// connInit.shandle = pVnodeConn; -// connInit.ahandle = pSql; -// connInit.peerIp = ipstr; -// connInit.peerPort = tsVnodeShellPort; -// thandle = taosOpenRpcConn(&connInit, pCode); -// vidIndex = (vidIndex + 1) % tscNumOfThreads; -// } - - pSql->thandle = thandle; - pSql->ip = pVPeersDesc[pSql->index].ip; - pSql->vnode = pVPeersDesc[pSql->index].vnode; - tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode, - pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); - - //TODO fetch from vpeerdesc - pSql->ipSet = &tscMgmtIpList; - break; - } - - // the pSql->res.code is the previous error(status) code. - if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) { - if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) { - *pCode = pSql->res.code; - } - - tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode); - } -} - int tscSendMsgToServer(SSqlObj *pSql) { uint8_t code = TSDB_CODE_NETWORK_UNAVAIL; - if (pSql->thandle == NULL) { - if (pSql->cmd.command < TSDB_SQL_MGMT) - tscGetConnToVnode(pSql, &code); - else - tscGetConnToMgmt(pSql, &code); + /* + * the total length of message + * rpc header + actual message body + digest + * + * the pSql object may be released automatically during insert procedure, in which the access of + * message body by using "if (pHeader->msgType & 1)" may cause the segment fault. + * + */ + + // the memory will be released by taosProcessResponse, so no memory leak here + char *pStart = rpcMallocCont(pSql->cmd.payloadLen); + if (NULL == pStart) { + tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); + return TSDB_CODE_CLI_OUT_OF_MEMORY; } - if (pSql->thandle) { + tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); + + if (pStart) { /* - * the total length of message - * rpc header + actual message body + digest - * - * the pSql object may be released automatically during insert procedure, in which the access of - * message body by using "if (pHeader->msgType & 1)" may cause the segment fault. - * + * this SQL object may be released by other thread due to the completion of this query even before the log + * is dumped to log file. So the signature needs to be kept in a local variable. */ - size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + 100; + uint64_t signature = (uint64_t) pSql->signature; + //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart); - // the memory will be released by taosProcessResponse, so no memory leak here - char *pStart = rpcMallocCont(pSql->cmd.payloadLen); - if (NULL == pStart) { - tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); - return TSDB_CODE_CLI_OUT_OF_MEMORY; + if (pSql->cmd.command < TSDB_SQL_MGMT) { + rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); + } else { + rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); } - memcpy(pStart, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); - - tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - - if (pStart) { - /* - * this SQL object may be released by other thread due to the completion of this query even before the log - * is dumped to log file. So the signature needs to be kept in a local variable. - */ - uint64_t signature = (uint64_t)pSql->signature; - //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart); - if (pSql->cmd.command < TSDB_SQL_MGMT) { - rpcSendRequest(pTscMgmtConn, &tscMgmtIpList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); - } else { - SRpcIpSet rpcSet = tscMgmtIpList; - rpcSendRequest(pVnodeConn, &rpcSet, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); - } - - tscTrace("%p send msg code:%d sig:%p", pSql, code, signature); - } + tscTrace("%p send msg code:%d sig:%p", pSql, code, signature); } return code; } -void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) { -// SIpList *pIpList = (SIpList *)(cont); -// tscSetMgmtIpList(pIpList); -// -// if (pSql->cmd.command < TSDB_SQL_READ) { -// tsMasterIndex = 0; -// pSql->index = 0; -// } else { -// pSql->index++; -// } -// -// tscPrintMgmtIp(); -} - -void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { - if (ahandle == NULL) return NULL; - - SIntMsg *pMsg = (SIntMsg *)msg; +void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) { SSqlObj *pSql = (SSqlObj *)ahandle; - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - STscObj *pObj = pSql->pTscObj; - int code = TSDB_CODE_NETWORK_UNAVAIL; - - if (pSql->signature != pSql) { + if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); - return NULL; - } - - if (pSql->thandle != thandle) { - tscError("%p thandle:%p is different from received:%p", pSql, pSql->thandle, thandle); - return NULL; + return; } - tscTrace("%p msg:%p is received from server, pConn:%p", pSql, msg, thandle); + SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; + STscObj *pObj = pSql->pTscObj; + tscTrace("%p msg:%p is received from server", pSql, pCont); if (pSql->freed || pObj->signature != pObj) { tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, pObj, pObj->signature); - taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user); tscFreeSqlObj(pSql); - return ahandle; + rpcFreeCont(pCont); + return; } SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - if (msg == NULL) { - tscTrace("%p no response from ip:%s", pSql, taosIpStr(pSql->ip)); - - pSql->index++; - pSql->thandle = NULL; - // todo taos_stop_query() in async model + if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID || + code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE || + code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION || + code == TSDB_CODE_TABLE_ID_MISMATCH) { /* - * in case of - * 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the request to server. - * 2. retrieve, do NOT re-issue the retrieve request since the qhandle may have been released by server + * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized, + * the virtual node may have not create table till now, so try again by using the new metermeta. + * 2. this requested table may have been removed by other client, so we need to renew the + * metermeta here. + * + * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been + * removed. So, renew metermeta and try again. + * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore. */ - if (pCmd->command != TSDB_SQL_FETCH && pCmd->command != TSDB_SQL_RETRIEVE && pCmd->command != TSDB_SQL_KILL_QUERY && - pRes->code != TSDB_CODE_QUERY_CANCELLED) { - code = tscSendMsgToServer(pSql); - if (code == 0) return NULL; - } + if (pCmd->command == TSDB_SQL_CONNECT) { + code = TSDB_CODE_NETWORK_UNAVAIL; + rpcFreeCont(pCont); + return; + } else if (pCmd->command == TSDB_SQL_HB) { + code = TSDB_CODE_NOT_READY; + rpcFreeCont(pCont); + return; + } else { + tscTrace("%p it shall renew meter meta, code:%d", pSql, code); - // renew meter meta in case it is changed - if (pCmd->command < TSDB_SQL_FETCH && pRes->code != TSDB_CODE_QUERY_CANCELLED) { pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; + pSql->res.code = (uint8_t)code; // keep the previous error code + code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); - pRes->code = code; - if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql; if (pMeterMetaInfo->pMeterMeta) { - code = tscSendMsgToServer(pSql); - if (code == 0) return pSql; + tscSendMsgToServer(pSql); + rpcFreeCont(pCont); + return; } } - } else { - uint16_t rspCode = pMsg->content[0]; - - if (rspCode == TSDB_CODE_REDIRECT) { - tscTrace("%p it shall be redirected!", pSql); - taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); - pSql->thandle = NULL; - - if (pCmd->command > TSDB_SQL_MGMT) { - tscProcessMgmtRedirect(pSql, pMsg->content + 1); - } else if (pCmd->command == TSDB_SQL_INSERT) { - pSql->index++; - pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; - } else { - pSql->index++; - } - - code = tscSendMsgToServer(pSql); - if (code == 0) return pSql; - msg = NULL; - } else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID || - rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || - rspCode == TSDB_CODE_NETWORK_UNAVAIL || rspCode == TSDB_CODE_NOT_ACTIVE_SESSION || - rspCode == TSDB_CODE_TABLE_ID_MISMATCH) { - /* - * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized, - * the virtual node may have not create table till now, so try again by using the new metermeta. - * 2. this requested table may have been removed by other client, so we need to renew the - * metermeta here. - * - * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been - * removed. So, renew metermeta and try again. - * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore. - */ - pSql->thandle = NULL; - taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); - - if (pCmd->command == TSDB_SQL_CONNECT) { - code = TSDB_CODE_NETWORK_UNAVAIL; - } else if (pCmd->command == TSDB_SQL_HB) { - code = TSDB_CODE_NOT_READY; - } else { - tscTrace("%p it shall renew meter meta, code:%d", pSql, rspCode); - - pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; - pSql->res.code = (uint8_t)rspCode; // keep the previous error code - - code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); - if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql; - - if (pMeterMetaInfo->pMeterMeta) { - code = tscSendMsgToServer(pSql); - if (code == 0) return pSql; - } - } + } - msg = NULL; - } else { // for other error set and return to invoker - code = rspCode; - } + if (code != TSDB_CODE_SUCCESS){ // for other error set and return to invoker + rpcFreeCont(pCont); + return; } pSql->retry = 0; - if (msg) { - if (pCmd->command < TSDB_SQL_MGMT) { - if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { - if (pMeterMetaInfo->pMeterMeta) // it may be deleted - pMeterMetaInfo->pMeterMeta->index = pSql->index; - } else { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex); - pVnodeSidList->index = pSql->index; - } - } else { - if (pCmd->command > TSDB_SQL_READ) - tsSlaveIndex = pSql->index; - else - tsMasterIndex = pSql->index; - } - } - if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem); pRes->rspLen = 0; @@ -523,11 +292,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { tscTrace("%p query is cancelled, code:%d", pSql, pRes->code); } - if (msg && pRes->code != TSDB_CODE_QUERY_CANCELLED) { - assert(pMsg->msgType == pCmd->msgType + 1); - pRes->code = pMsg->content[0]; - pRes->rspType = pMsg->msgType; - pRes->rspLen = pMsg->msgLen - sizeof(SIntMsg); + if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { + assert(type == pCmd->msgType + 1); + pRes->code = (int8_t)code; + pRes->rspType = type; + pRes->rspLen = contLen; char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); if (tmp == NULL) { @@ -535,7 +304,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { } else { pRes->pRsp = tmp; if (pRes->rspLen) { - memcpy(pRes->pRsp, pMsg->content + 1, pRes->rspLen - 1); + memcpy(pRes->pRsp, pCont, pRes->rspLen); } } @@ -548,7 +317,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { * There is not response callback function for submit response. * The actual inserted number of points is the first number. */ - if (pMsg->msgType == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) { + if (type == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) { pRes->numOfRows += *(int32_t *)pRes->pRsp; tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, @@ -558,14 +327,6 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { } } - if (tscKeepConn[pCmd->command] == 0 || - (pRes->code != TSDB_CODE_SUCCESS && pRes->code != TSDB_CODE_ACTION_IN_PROGRESS)) { - if (pSql->thandle != NULL) { - taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user); - pSql->thandle = NULL; - } - } - if (pSql->fp == NULL) { tsem_post(&pSql->rspSem); } else { @@ -607,7 +368,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { } } - return ahandle; + rpcFreeCont(pCont); } static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); @@ -767,28 +528,27 @@ int tscProcessSql(SSqlObj *pSql) { } tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type); - pSql->retry = 0; if (pSql->cmd.command < TSDB_SQL_MGMT) { - pSql->maxRetry = TSDB_VNODES_SUPPORT; - // the pMeterMetaInfo cannot be NULL if (pMeterMetaInfo == NULL) { pSql->res.code = TSDB_CODE_OTHERS; return pSql->res.code; } - if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { - pSql->index = pMeterMetaInfo->pMeterMeta->index; - } else { // it must be the parent SSqlObj for super table query - if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { - int32_t idx = pMeterMetaInfo->vnodeIndex; - - SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); - pSql->index = pSidList->index; - } - } + //TODO change the connect info in metadata + return TSDB_CODE_OTHERS; +// if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { +// pSql->index = pMeterMetaInfo->pMeterMeta->index; +// } else { // it must be the parent SSqlObj for super table query +// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { +// int32_t idx = pMeterMetaInfo->vnodeIndex; +// +// SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); +// pSql->index = pSidList->index; +// } +// } } else if (pSql->cmd.command < TSDB_SQL_LOCAL) { - pSql->index = pSql->cmd.command < TSDB_SQL_READ ? tsMasterIndex : tsSlaveIndex; + pSql->ipList = &tscMgmtIpList; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -1309,7 +1069,7 @@ void tscKillMetricQuery(SSqlObj *pSql) { for (int i = 0; i < pSql->numOfSubs; ++i) { SSqlObj *pSub = pSql->pSubs[i]; - if (pSub == NULL || pSub->thandle == NULL) { + if (pSub == NULL) { continue; } @@ -1481,7 +1241,8 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { pMsg = buf + tsRpcHeadSize; - pShellMsg = (SShellSubmitMsg *)pMsg; + //TODO set iplist + //pShellMsg = (SShellSubmitMsg *)pMsg; //pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip), // htons(pShellMsg->vnode)); @@ -1514,20 +1275,21 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { - SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - - char * pStart = buf + tsRpcHeadSize; - SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; - - if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pColumnModel == NULL, query on meter - SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; - pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); - } else { // query on metric - SMetricMeta * pMetricMeta = pMeterMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); - pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode); - } + //TODO +// SSqlCmd * pCmd = &pSql->cmd; +// SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); +// +// char * pStart = buf + tsRpcHeadSize; +// SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; +// +// if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pColumnModel == NULL, query on meter +// SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; +// pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); +// } else { // query on metric +// SMetricMeta * pMetricMeta = pMeterMetaInfo->pMetricMeta; +// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); +// pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode); +// } } /* @@ -3449,18 +3211,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { pRes->row = 0; - /** - * If the query result is exhausted, or current query is to free resource at server side, - * the connection will be recycled. - */ - if ((pRes->numOfRows == 0 && !(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pRes->offset > 0)) || - ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) { - tscTrace("%p no result or free resource, recycle connection", pSql); - taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user); - pSql->thandle = NULL; - } else { - tscTrace("%p numOfRows:%d, offset:%d, not recycle connection", pSql, pRes->numOfRows, pRes->offset); - } + tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset); return 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0ea4c64204..976cd64ba8 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -64,14 +64,18 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const } if (ip && ip[0]) { - tscMgmtIpList.numOfIps = 3; + tscMgmtIpList.numOfIps = 2; + tscMgmtIpList.index = 0; tscMgmtIpList.ip[0] = inet_addr(ip); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); - tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); - tscMgmtIpList.index = 0; - tscMgmtIpList.port = tsMgmtShellPort; + if (tsSecondIp[0]) { + tscMgmtIpList.numOfIps = 3; + tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); + } } + tscMgmtIpList.port = port ? port : tsMgmtShellPort; + pObj = (STscObj *)malloc(sizeof(STscObj)); if (NULL == pObj) { globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -208,7 +212,6 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. */ pRes->qhandle = 0; - pSql->thandle = NULL; if (pRes->code == TSDB_CODE_SUCCESS) { tscDoQuery(pSql); @@ -713,7 +716,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { /* Query rsp is not received from vnode, so the qhandle is NULL */ tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); if (pSql->fp != NULL) { - pSql->thandle = NULL; tscFreeSqlObj(pSql); tscTrace("%p Async SqlObj is freed by app", pSql); } else if (keepCmd) { @@ -774,7 +776,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { * * Then this object will be reused and no free operation is required. */ - pSql->thandle = NULL; if (keepCmd) { tscFreeSqlResult(pSql); tscTrace("%p sql result is freed by app while sql command is kept", pSql); @@ -785,7 +786,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { } } else { // if no free resource msg is sent to vnode, we free this object immediately. - pSql->thandle = NULL; if (pSql->fp) { assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); @@ -899,11 +899,6 @@ void taos_stop_query(TAOS_RES *res) { return; } - if (pSql->thandle == NULL) { - tscTrace("%p no connection, abort cancel", res); - return; - } - //taosStopRpcConn(pSql->thandle); tscTrace("%p query is cancelled", res); } @@ -1147,7 +1142,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. */ pRes->qhandle = 0; - pSql->thandle = NULL; free(str); if (pRes->code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 610c119e6d..6bf53fae37 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -382,7 +382,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pRes->numOfRows = 1; pRes->numOfTotal = 0; pRes->qhandle = 0; - pSql->thandle = NULL; pSql->cmd.command = TSDB_SQL_SELECT; pQueryInfo->type = type; -- GitLab