提交 d01a0c59 编写于 作者: S slguan

fix compile error for rpc interface changed

上级 3870a01e
......@@ -379,7 +379,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
......
......@@ -320,7 +320,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
void tscProcessAsyncRes(SSchedMsg *pMsg) {
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
STscObj *pTscObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......
......@@ -98,6 +98,7 @@ void tscSetMgmtIpList(SRpcIpSet *pIpList) {
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
*/
UNUSED_FUNC
static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2;
return tscMgmtIpList.numOfIps * factor;
......@@ -185,20 +186,35 @@ int tscSendMsgToServer(SSqlObj *pSql) {
pSql->ipList->port = tsVnodeShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.handle = pSql,
.code = 0
};
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
} else {
pSql->ipList->port = tsMgmtShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.handle = pSql,
.code = 0
};
rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg);
}
return TSDB_CODE_SUCCESS;
}
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
tscPrint("response:%s is received, len:%d error:%s", taosMsg[(uint8_t)type], contLen, tstrerror(code));
SSqlObj *pSql = (SSqlObj *)ahandle;
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
if (pSql == NULL || pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
return;
......@@ -207,24 +223,24 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
tscTrace("%p msg:%p is received from server", pSql, pCont);
tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);
if (pSql->freed || pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
pObj, pObj->signature);
tscFreeSqlObj(pSql);
rpcFreeCont(pCont);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (pCont == NULL) {
code = TSDB_CODE_NETWORK_UNAVAIL;
if (rpcMsg->pCont == NULL) {
rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
} else {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID ||
code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE ||
code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION ||
code == TSDB_CODE_TABLE_ID_MISMATCH) {
if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION ||
rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
/*
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
* the virtual node may have not create table till now, so try again by using the new metermeta.
......@@ -236,24 +252,24 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/
if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL;
rpcFreeCont(pCont);
rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
rpcFreeCont(rpcMsg->pCont);
return;
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
rpcFreeCont(pCont);
rpcMsg->code = TSDB_CODE_NOT_READY;
rpcFreeCont(rpcMsg->pCont);
return;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, code);
tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t) code; // keep the previous error code
pSql->res.code = rpcMsg->code; // keep the previous error code
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
rpcMsg->code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
if (pMeterMetaInfo->pMeterMeta) {
tscSendMsgToServer(pSql);
rpcFreeCont(pCont);
rpcFreeCont(rpcMsg->pCont);
return;
}
}
......@@ -266,16 +282,16 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
pRes->rspLen = 0;
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
pRes->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_NETWORK_UNAVAIL;
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
} else {
tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
}
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
assert(type == pCmd->msgType + 1);
pRes->code = (int32_t)code;
pRes->rspType = type;
pRes->rspLen = contLen;
assert(rpcMsg->msgType == pCmd->msgType + 1);
pRes->code = (int32_t)rpcMsg->code;
pRes->rspType = rpcMsg->msgType;
pRes->rspLen = rpcMsg->contLen;
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
if (tmp == NULL) {
......@@ -283,7 +299,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
} else {
pRes->pRsp = tmp;
if (pRes->rspLen) {
memcpy(pRes->pRsp, pCont, pRes->rspLen);
memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
}
}
......@@ -296,8 +312,8 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
if (type == TSDB_MSG_TYPE_SUBMIT_RSP) {
SShellSubmitRspMsg *pMsg = pRes->pRsp;
if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
pMsg->code = htonl(pMsg->code);
pMsg->numOfRows = htonl(pMsg->numOfRows);
pMsg->affectedRows = htonl(pMsg->affectedRows);
......@@ -316,14 +332,14 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
tsem_post(&pSql->rspSem);
} else {
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
code = (*tscProcessMsgRsp[pCmd->command])(pSql);
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
int command = pCmd->command;
void *taosres = tscKeepConn[command] ? pSql : NULL;
code = pRes->code ? -pRes->code : pRes->numOfRows;
rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;
tscTrace("%p Async SQL result:%d res:%p", pSql, code, taosres);
tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
/*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
......@@ -335,9 +351,9 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
*/
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation
(*pSql->fp)(pSql, taosres, code);
(*pSql->fp)(pSql, taosres, rpcMsg->code);
} else {
(*pSql->fp)(pSql->param, taosres, code);
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
}
if (shouldFree) {
......@@ -353,7 +369,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
}
}
rpcFreeCont(pCont);
rpcFreeCont(rpcMsg->pCont);
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
......@@ -1206,7 +1222,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
SRetrieveTableMsg *pRetrieveMsg = (SShellSubmitMsg *)pMsg;
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pMsg += sizeof(pSql->res.qhandle);
......@@ -1221,13 +1237,13 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
SShellSubmitMsg *pShellMsg;
char * pMsg;
SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
//SShellSubmitMsg *pShellMsg;
//char * pMsg;
//SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
//STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
pMsg = buf + tsRpcHeadSize;
//pMsg = buf + tsRpcHeadSize;
//TODO set iplist
//pShellMsg = (SShellSubmitMsg *)pMsg;
......@@ -1999,7 +2015,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
pMsg += sizeof(STagData);
} else { // create (super) table
pSchema = pCreateTableMsg->schema;
pSchema = (SSchema *)pCreateTableMsg->schema;
for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
......@@ -2039,7 +2055,7 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SAlterTableMsg *pAlterTableMsg;
char * pMsg, *pStart;
char * pMsg;
int msgLen = 0;
int size = 0;
......@@ -2557,7 +2573,6 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
STableMeta *pMeta;
SSchema * pSchema;
uint8_t ieType;
pMeta = (STableMeta *)pSql->res.pRsp;
......@@ -2661,7 +2676,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
for (i = 0; i < totalNum; i++) {
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
STableMeta * pMeta = &pMultiMeta->metas;
STableMeta * pMeta = pMultiMeta->metas;
pMeta->sid = htonl(pMeta->sid);
pMeta->sversion = htons(pMeta->sversion);
......@@ -3023,7 +3038,7 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
int tscProcessQueryRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SQueryTableRsp *pQuery = (SRetrieveTableRsp *)pRes->pRsp;
SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
pQuery->qhandle = htobe64(pQuery->qhandle);
pRes->qhandle = pQuery->qhandle;
......@@ -3035,7 +3050,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
......
......@@ -72,7 +72,7 @@ int32_t tscInitRpc(const char *user, const char *secret) {
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = user;
rpcInit.user = (char*)user;
rpcInit.ckey = "key";
rpcInit.secret = secretEncrypt;
......
......@@ -359,7 +359,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
type == TSDB_MSG_TYPE_SHOW )
pContext->connType = RPC_CONN_TCPC;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册