diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 7e3b54545a5c6fb942d2a70da6c09b5f47123727..6ae6ff6d746314f77b696c4e9a38783a8f3bbd47 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -379,7 +379,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); void tscInitMsgs(); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); -void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code); +void tscProcessMsgFromServer(SRpcMsg *rpcMsg); int tscProcessSql(SSqlObj *pSql); void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 9528097d207f7d2546c3702f2c2712591fb81cc6..67ea3c5120a15ecf15e2cbdceccdd2e093bc49ef 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -320,7 +320,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { void tscProcessAsyncRes(SSchedMsg *pMsg) { SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - STscObj *pTscObj = pSql->pTscObj; SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c6efafe3329ff57e6d5c3f1dfadd1b49952c4beb..893b42d9caf7ad159372bc5f323f9ae93e57cc1a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -98,6 +98,7 @@ void tscSetMgmtIpList(SRpcIpSet *pIpList) { * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster. * Therefore, we need to multiply the retry times by factor of 2 to fix this problem. */ +UNUSED_FUNC static int32_t tscGetMgmtConnMaxRetryTimes() { int32_t factor = 2; return tscMgmtIpList.numOfIps * factor; @@ -185,20 +186,35 @@ int tscSendMsgToServer(SSqlObj *pSql) { pSql->ipList->port = tsVnodeShellPort; tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); - rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); + + SRpcMsg rpcMsg = { + .msgType = pSql->cmd.msgType, + .pCont = pMsg, + .contLen = pSql->cmd.payloadLen, + .handle = pSql, + .code = 0 + }; + rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); } else { pSql->ipList->port = tsMgmtShellPort; tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); - rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); + SRpcMsg rpcMsg = { + .msgType = pSql->cmd.msgType, + .pCont = pMsg, + .contLen = pSql->cmd.payloadLen, + .handle = pSql, + .code = 0 + }; + rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg); } return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) { - tscPrint("response:%s is received, len:%d error:%s", taosMsg[(uint8_t)type], contLen, tstrerror(code)); - SSqlObj *pSql = (SSqlObj *)ahandle; +void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { + tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); + SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); return; @@ -207,24 +223,24 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - tscTrace("%p msg:%p is received from server", pSql, pCont); + tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont); if (pSql->freed || pObj->signature != pObj) { tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, pObj, pObj->signature); tscFreeSqlObj(pSql); - rpcFreeCont(pCont); + rpcFreeCont(rpcMsg->pCont); return; } - if (pCont == NULL) { - code = TSDB_CODE_NETWORK_UNAVAIL; + if (rpcMsg->pCont == NULL) { + rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL; } else { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID || - code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE || - code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION || - code == TSDB_CODE_TABLE_ID_MISMATCH) { + if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID || + rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE || + rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION || + rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) { /* * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized, * the virtual node may have not create table till now, so try again by using the new metermeta. @@ -236,24 +252,24 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore. */ if (pCmd->command == TSDB_SQL_CONNECT) { - code = TSDB_CODE_NETWORK_UNAVAIL; - rpcFreeCont(pCont); + rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL; + rpcFreeCont(rpcMsg->pCont); return; } else if (pCmd->command == TSDB_SQL_HB) { - code = TSDB_CODE_NOT_READY; - rpcFreeCont(pCont); + rpcMsg->code = TSDB_CODE_NOT_READY; + rpcFreeCont(rpcMsg->pCont); return; } else { - tscTrace("%p it shall renew meter meta, code:%d", pSql, code); + tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code); pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; - pSql->res.code = (uint8_t) code; // keep the previous error code + pSql->res.code = rpcMsg->code; // keep the previous error code - code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); + rpcMsg->code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); if (pMeterMetaInfo->pMeterMeta) { tscSendMsgToServer(pSql); - rpcFreeCont(pCont); + rpcFreeCont(rpcMsg->pCont); return; } } @@ -266,16 +282,16 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, pRes->rspLen = 0; if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { - pRes->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_NETWORK_UNAVAIL; + pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL; } else { tscTrace("%p query is cancelled, code:%d", pSql, pRes->code); } if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { - assert(type == pCmd->msgType + 1); - pRes->code = (int32_t)code; - pRes->rspType = type; - pRes->rspLen = contLen; + assert(rpcMsg->msgType == pCmd->msgType + 1); + pRes->code = (int32_t)rpcMsg->code; + pRes->rspType = rpcMsg->msgType; + pRes->rspLen = rpcMsg->contLen; char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); if (tmp == NULL) { @@ -283,7 +299,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, } else { pRes->pRsp = tmp; if (pRes->rspLen) { - memcpy(pRes->pRsp, pCont, pRes->rspLen); + memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); } } @@ -296,8 +312,8 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, * There is not response callback function for submit response. * The actual inserted number of points is the first number. */ - if (type == TSDB_MSG_TYPE_SUBMIT_RSP) { - SShellSubmitRspMsg *pMsg = pRes->pRsp; + if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) { + SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp; pMsg->code = htonl(pMsg->code); pMsg->numOfRows = htonl(pMsg->numOfRows); pMsg->affectedRows = htonl(pMsg->affectedRows); @@ -316,14 +332,14 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, tsem_post(&pSql->rspSem); } else { if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) - code = (*tscProcessMsgRsp[pCmd->command])(pSql); + rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); - if (code != TSDB_CODE_ACTION_IN_PROGRESS) { + if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) { int command = pCmd->command; void *taosres = tscKeepConn[command] ? pSql : NULL; - code = pRes->code ? -pRes->code : pRes->numOfRows; + rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows; - tscTrace("%p Async SQL result:%d res:%p", pSql, code, taosres); + tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres); /* * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj @@ -335,9 +351,9 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, */ bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation - (*pSql->fp)(pSql, taosres, code); + (*pSql->fp)(pSql, taosres, rpcMsg->code); } else { - (*pSql->fp)(pSql->param, taosres, code); + (*pSql->fp)(pSql->param, taosres, rpcMsg->code); } if (shouldFree) { @@ -353,7 +369,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, } } - rpcFreeCont(pCont); + rpcFreeCont(rpcMsg->pCont); } static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); @@ -1206,7 +1222,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pStart = pSql->cmd.payload + tsRpcHeadSize; pMsg = pStart; - SRetrieveTableMsg *pRetrieveMsg = (SShellSubmitMsg *)pMsg; + SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg; pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); pMsg += sizeof(pSql->res.qhandle); @@ -1221,13 +1237,13 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { - SShellSubmitMsg *pShellMsg; - char * pMsg; - SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0); + //SShellSubmitMsg *pShellMsg; + //char * pMsg; + //SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0); - STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; + //STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; - pMsg = buf + tsRpcHeadSize; + //pMsg = buf + tsRpcHeadSize; //TODO set iplist //pShellMsg = (SShellSubmitMsg *)pMsg; @@ -1999,7 +2015,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData)); pMsg += sizeof(STagData); } else { // create (super) table - pSchema = pCreateTableMsg->schema; + pSchema = (SSchema *)pCreateTableMsg->schema; for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); @@ -2039,7 +2055,7 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) { int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SAlterTableMsg *pAlterTableMsg; - char * pMsg, *pStart; + char * pMsg; int msgLen = 0; int size = 0; @@ -2557,7 +2573,6 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscProcessMeterMetaRsp(SSqlObj *pSql) { STableMeta *pMeta; SSchema * pSchema; - uint8_t ieType; pMeta = (STableMeta *)pSql->res.pRsp; @@ -2661,7 +2676,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { for (i = 0; i < totalNum; i++) { SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp; - STableMeta * pMeta = &pMultiMeta->metas; + STableMeta * pMeta = pMultiMeta->metas; pMeta->sid = htonl(pMeta->sid); pMeta->sversion = htons(pMeta->sversion); @@ -3023,7 +3038,7 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) { int tscProcessQueryRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; - SQueryTableRsp *pQuery = (SRetrieveTableRsp *)pRes->pRsp; + SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp; pQuery->qhandle = htobe64(pQuery->qhandle); pRes->qhandle = pQuery->qhandle; @@ -3035,7 +3050,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) { int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - STscObj *pObj = pSql->pTscObj; SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 6e4653c2d442e58e6d8cfa58a4ed402dc44b725a..ced0d76ca4360a1ae712b6d9ce01a5d6811481d3 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -72,7 +72,7 @@ int32_t tscInitRpc(const char *user, const char *secret) { rpcInit.cfp = tscProcessMsgFromServer; rpcInit.sessions = tsMaxVnodeConnections; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.user = user; + rpcInit.user = (char*)user; rpcInit.ckey = "key"; rpcInit.secret = secretEncrypt; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 89cf34fe3832c391394c333b806e3549723ddb8d..505fba94ce51d2a398f3fb5369862f29dc5ad5e5 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -359,7 +359,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) { // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; - if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE || + if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || type == TSDB_MSG_TYPE_SHOW ) pContext->connType = RPC_CONN_TCPC;