提交 d2605d0e 编写于 作者: S slguan

connect from client to mgmt

上级 a241006d
...@@ -324,14 +324,12 @@ typedef struct _sql_obj { ...@@ -324,14 +324,12 @@ typedef struct _sql_obj {
short vnode; short vnode;
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
void * thandle;
SRpcIpSet *ipSet;
void * pStream; void * pStream;
void * pSubscription; void * pSubscription;
char * sqlstr; char * sqlstr;
char retry; char retry;
char maxRetry; char maxRetry;
uint8_t index; SRpcIpSet *ipList;
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
...@@ -379,8 +377,8 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); ...@@ -379,8 +377,8 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs(); void tscInitMsgs();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle); void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
......
...@@ -318,13 +318,6 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { ...@@ -318,13 +318,6 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
int cmd = pCmd->command; int cmd = pCmd->command;
int code = pRes->code ? -pRes->code : pRes->numOfRows; int code = pRes->code ? -pRes->code : pRes->numOfRows;
if ((tscKeepConn[cmd] == 0 || (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS)) &&
pSql->pStream == NULL) {
if (pSql->thandle) taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pTscObj->user);
pSql->thandle = NULL;
}
// in case of async insert, restore the user specified callback function // in case of async insert, restore the user specified callback function
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
...@@ -454,8 +447,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -454,8 +447,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p failed to renew meterMeta", pSql); tscTrace("%p failed to renew meterMeta", pSql);
tsem_post(&pSql->rspSem); tsem_post(&pSql->rspSem);
} else { } else {
tscTrace("%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d", tscTrace("%p renew meterMeta successfully, command:%d, code:%d, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->thandle, pSql->retry); pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL); assert(pMeterMetaInfo->pMeterMeta == NULL);
......
...@@ -451,7 +451,6 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -451,7 +451,6 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->numOfTotalInCurrentClause = 0; pRes->numOfTotalInCurrentClause = 0;
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
tscDoQuery(pSql); tscDoQuery(pSql);
......
...@@ -145,7 +145,7 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) { ...@@ -145,7 +145,7 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) {
if (pSql == NULL) return; if (pSql == NULL) return;
tscTrace("%p query is killed, queryId:%d thandle:%p", pSql, killId, pSql->thandle); tscTrace("%p query is killed, queryId:%d", pSql, killId);
taos_stop_query(pSql); taos_stop_query(pSql);
} }
......
...@@ -123,8 +123,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -123,8 +123,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->killConnection) { if (pRsp->killConnection) {
tscKillConnection(pObj); tscKillConnection(pObj);
} else { } else {
if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId); if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId); if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
} }
} else { } else {
tscTrace("heart beat failed, code:%d", code); tscTrace("heart beat failed, code:%d", code);
...@@ -165,7 +165,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -165,7 +165,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
} }
if (tscShouldFreeHeatBeat(pObj->pHb)) { if (tscShouldFreeHeatBeat(pObj->pHb)) {
tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle); tscTrace("%p free HB object and release connection", pObj);
//taosCloseRpcConn(pObj->pHb->thandle); //taosCloseRpcConn(pObj->pHb->thandle);
tscFreeSqlObj(pObj->pHb); tscFreeSqlObj(pObj->pHb);
...@@ -176,344 +176,113 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -176,344 +176,113 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql(pObj->pHb); tscProcessSql(pObj->pHb);
} }
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj;
if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
*pCode = 0;
pSql->retry++;
pSql->index = pSql->index % tscMgmtIpList.numOfIps;
if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
// if (thandle == NULL) {
// SRpcConnInit connInit;
// memset(&connInit, 0, sizeof(connInit));
// connInit.cid = 0;
// connInit.sid = 0;
// connInit.meterId = pSql->pTscObj->user;
// connInit.peerId = 0;
// connInit.shandle = pTscMgmtConn;
// connInit.ahandle = pSql;
// connInit.peerPort = tsMgmtShellPort;
// connInit.spi = 1;
// connInit.encrypt = 0;
// connInit.secret = pSql->pTscObj->pass;
//
// connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
// thandle = taosOpenRpcConn(&connInit, pCode);
// }
pSql->thandle = thandle;
pSql->ip = tscMgmtIpList.ip[pSql->index];
pSql->vnode = TSC_MGMT_VNODE;
tscTrace("%p mgmt index:%d ip:0x%x is picked up, pConn:%p", pSql, pSql->index, tscMgmtIpList.ip[pSql->index],
pSql->thandle);
}
// the pSql->res.code is the previous error(status) code.
if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) {
*pCode = pSql->res.code;
}
tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
}
}
void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
SVPeerDesc *pVPeersDesc = NULL;
static int vidIndex = 0;
STscObj * pTscObj = pSql->pTscObj;
pSql->thandle = NULL;
SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { // multiple vnode query
SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
if (vnodeList != NULL) {
pVPeersDesc = vnodeList->vpeerDesc;
}
} else {
SMeterMeta *pMeta = pMeterMetaInfo->pMeterMeta;
if (pMeta == NULL) {
tscError("%p pMeterMeta is NULL", pSql);
pSql->retry = pSql->maxRetry;
return;
}
pVPeersDesc = pMeta->vpeerDesc;
}
if (pVPeersDesc == NULL) {
pSql->retry = pSql->maxRetry;
tscError("%p pVPeerDesc is NULL", pSql);
}
while (pSql->retry < pSql->maxRetry) {
(pSql->retry)++;
char ipstr[40] = {0};
if (pVPeersDesc[pSql->index].ip == 0) {
/*
* in the edge edition, ip is 0, and at this time we use masterIp instead
* in the cluster edition, ip is vnode ip
*/
pVPeersDesc[pSql->index].ip = tscMgmtIpList.ip[0];
}
*pCode = TSDB_CODE_SUCCESS;
void *thandle =
taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user);
// if (thandle == NULL) {
// SRpcConnInit connInit;
// tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip);
// memset(&connInit, 0, sizeof(connInit));
// connInit.cid = vidIndex;
// connInit.sid = 0;
// connInit.spi = 0;
// connInit.encrypt = 0;
// connInit.meterId = pSql->pTscObj->user;
// connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS));
// connInit.shandle = pVnodeConn;
// connInit.ahandle = pSql;
// connInit.peerIp = ipstr;
// connInit.peerPort = tsVnodeShellPort;
// thandle = taosOpenRpcConn(&connInit, pCode);
// vidIndex = (vidIndex + 1) % tscNumOfThreads;
// }
pSql->thandle = thandle;
pSql->ip = pVPeersDesc[pSql->index].ip;
pSql->vnode = pVPeersDesc[pSql->index].vnode;
tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);
//TODO fetch from vpeerdesc
pSql->ipSet = &tscMgmtIpList;
break;
}
// the pSql->res.code is the previous error(status) code.
if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) {
*pCode = pSql->res.code;
}
tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
}
}
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
uint8_t code = TSDB_CODE_NETWORK_UNAVAIL; uint8_t code = TSDB_CODE_NETWORK_UNAVAIL;
if (pSql->thandle == NULL) { /*
if (pSql->cmd.command < TSDB_SQL_MGMT) * the total length of message
tscGetConnToVnode(pSql, &code); * rpc header + actual message body + digest
else *
tscGetConnToMgmt(pSql, &code); * the pSql object may be released automatically during insert procedure, in which the access of
* message body by using "if (pHeader->msgType & 1)" may cause the segment fault.
*
*/
// the memory will be released by taosProcessResponse, so no memory leak here
char *pStart = rpcMallocCont(pSql->cmd.payloadLen);
if (NULL == pStart) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
if (pSql->thandle) { tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
if (pStart) {
/* /*
* the total length of message * this SQL object may be released by other thread due to the completion of this query even before the log
* rpc header + actual message body + digest * is dumped to log file. So the signature needs to be kept in a local variable.
*
* the pSql object may be released automatically during insert procedure, in which the access of
* message body by using "if (pHeader->msgType & 1)" may cause the segment fault.
*
*/ */
size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + 100; uint64_t signature = (uint64_t) pSql->signature;
//if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart);
// the memory will be released by taosProcessResponse, so no memory leak here if (pSql->cmd.command < TSDB_SQL_MGMT) {
char *pStart = rpcMallocCont(pSql->cmd.payloadLen); rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
if (NULL == pStart) { } else {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
memcpy(pStart, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
if (pStart) {
/*
* this SQL object may be released by other thread due to the completion of this query even before the log
* is dumped to log file. So the signature needs to be kept in a local variable.
*/
uint64_t signature = (uint64_t)pSql->signature;
//if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart);
if (pSql->cmd.command < TSDB_SQL_MGMT) { tscTrace("%p send msg code:%d sig:%p", pSql, code, signature);
rpcSendRequest(pTscMgmtConn, &tscMgmtIpList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
} else {
SRpcIpSet rpcSet = tscMgmtIpList;
rpcSendRequest(pVnodeConn, &rpcSet, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
}
tscTrace("%p send msg code:%d sig:%p", pSql, code, signature);
}
} }
return code; return code;
} }
void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) { void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
// SIpList *pIpList = (SIpList *)(cont);
// tscSetMgmtIpList(pIpList);
//
// if (pSql->cmd.command < TSDB_SQL_READ) {
// tsMasterIndex = 0;
// pSql->index = 0;
// } else {
// pSql->index++;
// }
//
// tscPrintMgmtIp();
}
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
if (ahandle == NULL) return NULL;
SIntMsg *pMsg = (SIntMsg *)msg;
SSqlObj *pSql = (SSqlObj *)ahandle; SSqlObj *pSql = (SSqlObj *)ahandle;
SSqlRes *pRes = &pSql->res; if (pSql == NULL || pSql->signature != pSql) {
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
int code = TSDB_CODE_NETWORK_UNAVAIL;
if (pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature); tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
return NULL; return;
}
if (pSql->thandle != thandle) {
tscError("%p thandle:%p is different from received:%p", pSql, pSql->thandle, thandle);
return NULL;
} }
tscTrace("%p msg:%p is received from server, pConn:%p", pSql, msg, thandle); SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
tscTrace("%p msg:%p is received from server", pSql, pCont);
if (pSql->freed || pObj->signature != pObj) { if (pSql->freed || pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
pObj, pObj->signature); pObj, pObj->signature);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return ahandle; rpcFreeCont(pCont);
return;
} }
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
if (msg == NULL) { if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID ||
tscTrace("%p no response from ip:%s", pSql, taosIpStr(pSql->ip)); code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE ||
code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION ||
pSql->index++; code == TSDB_CODE_TABLE_ID_MISMATCH) {
pSql->thandle = NULL;
// todo taos_stop_query() in async model
/* /*
* in case of * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
* 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the request to server. * the virtual node may have not create table till now, so try again by using the new metermeta.
* 2. retrieve, do NOT re-issue the retrieve request since the qhandle may have been released by server * 2. this requested table may have been removed by other client, so we need to renew the
* metermeta here.
*
* not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
* removed. So, renew metermeta and try again.
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/ */
if (pCmd->command != TSDB_SQL_FETCH && pCmd->command != TSDB_SQL_RETRIEVE && pCmd->command != TSDB_SQL_KILL_QUERY && if (pCmd->command == TSDB_SQL_CONNECT) {
pRes->code != TSDB_CODE_QUERY_CANCELLED) { code = TSDB_CODE_NETWORK_UNAVAIL;
code = tscSendMsgToServer(pSql); rpcFreeCont(pCont);
if (code == 0) return NULL; return;
} } else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
rpcFreeCont(pCont);
return;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, code);
// renew meter meta in case it is changed
if (pCmd->command < TSDB_SQL_FETCH && pRes->code != TSDB_CODE_QUERY_CANCELLED) {
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t)code; // keep the previous error code
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;
if (pMeterMetaInfo->pMeterMeta) { if (pMeterMetaInfo->pMeterMeta) {
code = tscSendMsgToServer(pSql); tscSendMsgToServer(pSql);
if (code == 0) return pSql; rpcFreeCont(pCont);
return;
} }
} }
} else { }
uint16_t rspCode = pMsg->content[0];
if (rspCode == TSDB_CODE_REDIRECT) {
tscTrace("%p it shall be redirected!", pSql);
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
if (pCmd->command > TSDB_SQL_MGMT) {
tscProcessMgmtRedirect(pSql, pMsg->content + 1);
} else if (pCmd->command == TSDB_SQL_INSERT) {
pSql->index++;
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
} else {
pSql->index++;
}
code = tscSendMsgToServer(pSql);
if (code == 0) return pSql;
msg = NULL;
} else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL || rspCode == TSDB_CODE_NOT_ACTIVE_SESSION ||
rspCode == TSDB_CODE_TABLE_ID_MISMATCH) {
/*
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
* the virtual node may have not create table till now, so try again by using the new metermeta.
* 2. this requested table may have been removed by other client, so we need to renew the
* metermeta here.
*
* not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
* removed. So, renew metermeta and try again.
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/
pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL;
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, rspCode);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t)rspCode; // keep the previous error code
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;
if (pMeterMetaInfo->pMeterMeta) {
code = tscSendMsgToServer(pSql);
if (code == 0) return pSql;
}
}
msg = NULL; if (code != TSDB_CODE_SUCCESS){ // for other error set and return to invoker
} else { // for other error set and return to invoker rpcFreeCont(pCont);
code = rspCode; return;
}
} }
pSql->retry = 0; pSql->retry = 0;
if (msg) {
if (pCmd->command < TSDB_SQL_MGMT) {
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
if (pMeterMetaInfo->pMeterMeta) // it may be deleted
pMeterMetaInfo->pMeterMeta->index = pSql->index;
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
pVnodeSidList->index = pSql->index;
}
} else {
if (pCmd->command > TSDB_SQL_READ)
tsSlaveIndex = pSql->index;
else
tsMasterIndex = pSql->index;
}
}
if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem); if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);
pRes->rspLen = 0; pRes->rspLen = 0;
...@@ -523,11 +292,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -523,11 +292,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
tscTrace("%p query is cancelled, code:%d", pSql, pRes->code); tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
} }
if (msg && pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
assert(pMsg->msgType == pCmd->msgType + 1); assert(type == pCmd->msgType + 1);
pRes->code = pMsg->content[0]; pRes->code = (int8_t)code;
pRes->rspType = pMsg->msgType; pRes->rspType = type;
pRes->rspLen = pMsg->msgLen - sizeof(SIntMsg); pRes->rspLen = contLen;
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
if (tmp == NULL) { if (tmp == NULL) {
...@@ -535,7 +304,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -535,7 +304,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} else { } else {
pRes->pRsp = tmp; pRes->pRsp = tmp;
if (pRes->rspLen) { if (pRes->rspLen) {
memcpy(pRes->pRsp, pMsg->content + 1, pRes->rspLen - 1); memcpy(pRes->pRsp, pCont, pRes->rspLen);
} }
} }
...@@ -548,7 +317,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -548,7 +317,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
* There is not response callback function for submit response. * There is not response callback function for submit response.
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
if (pMsg->msgType == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) { if (type == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) {
pRes->numOfRows += *(int32_t *)pRes->pRsp; pRes->numOfRows += *(int32_t *)pRes->pRsp;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
...@@ -558,14 +327,6 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -558,14 +327,6 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} }
} }
if (tscKeepConn[pCmd->command] == 0 ||
(pRes->code != TSDB_CODE_SUCCESS && pRes->code != TSDB_CODE_ACTION_IN_PROGRESS)) {
if (pSql->thandle != NULL) {
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
}
}
if (pSql->fp == NULL) { if (pSql->fp == NULL) {
tsem_post(&pSql->rspSem); tsem_post(&pSql->rspSem);
} else { } else {
...@@ -607,7 +368,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -607,7 +368,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} }
} }
return ahandle; rpcFreeCont(pCont);
} }
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
...@@ -767,28 +528,27 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -767,28 +528,27 @@ int tscProcessSql(SSqlObj *pSql) {
} }
tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type); tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
pSql->retry = 0;
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->maxRetry = TSDB_VNODES_SUPPORT;
// the pMeterMetaInfo cannot be NULL // the pMeterMetaInfo cannot be NULL
if (pMeterMetaInfo == NULL) { if (pMeterMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_OTHERS; pSql->res.code = TSDB_CODE_OTHERS;
return pSql->res.code; return pSql->res.code;
} }
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { //TODO change the connect info in metadata
pSql->index = pMeterMetaInfo->pMeterMeta->index; return TSDB_CODE_OTHERS;
} else { // it must be the parent SSqlObj for super table query // if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { // pSql->index = pMeterMetaInfo->pMeterMeta->index;
int32_t idx = pMeterMetaInfo->vnodeIndex; // } else { // it must be the parent SSqlObj for super table query
// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); // int32_t idx = pMeterMetaInfo->vnodeIndex;
pSql->index = pSidList->index; //
} // SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
} // pSql->index = pSidList->index;
// }
// }
} else if (pSql->cmd.command < TSDB_SQL_LOCAL) { } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
pSql->index = pSql->cmd.command < TSDB_SQL_READ ? tsMasterIndex : tsSlaveIndex; pSql->ipList = &tscMgmtIpList;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -1309,7 +1069,7 @@ void tscKillMetricQuery(SSqlObj *pSql) { ...@@ -1309,7 +1069,7 @@ void tscKillMetricQuery(SSqlObj *pSql) {
for (int i = 0; i < pSql->numOfSubs; ++i) { for (int i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pSub = pSql->pSubs[i]; SSqlObj *pSub = pSql->pSubs[i];
if (pSub == NULL || pSub->thandle == NULL) { if (pSub == NULL) {
continue; continue;
} }
...@@ -1481,7 +1241,8 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { ...@@ -1481,7 +1241,8 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
pMsg = buf + tsRpcHeadSize; pMsg = buf + tsRpcHeadSize;
pShellMsg = (SShellSubmitMsg *)pMsg; //TODO set iplist
//pShellMsg = (SShellSubmitMsg *)pMsg;
//pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); //pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
//tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip), //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip),
// htons(pShellMsg->vnode)); // htons(pShellMsg->vnode));
...@@ -1514,20 +1275,21 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1514,20 +1275,21 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
SSqlCmd * pCmd = &pSql->cmd; //TODO
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); // SSqlCmd * pCmd = &pSql->cmd;
// SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
char * pStart = buf + tsRpcHeadSize; //
SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; // char * pStart = buf + tsRpcHeadSize;
// SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pColumnModel == NULL, query on meter //
SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; // if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pColumnModel == NULL, query on meter
pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); // SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
} else { // query on metric // pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
SMetricMeta * pMetricMeta = pMeterMetaInfo->pMetricMeta; // } else { // query on metric
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); // SMetricMeta * pMetricMeta = pMeterMetaInfo->pMetricMeta;
pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode); // SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
} // pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode);
// }
} }
/* /*
...@@ -3449,18 +3211,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -3449,18 +3211,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->row = 0; pRes->row = 0;
/** tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
* If the query result is exhausted, or current query is to free resource at server side,
* the connection will be recycled.
*/
if ((pRes->numOfRows == 0 && !(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pRes->offset > 0)) ||
((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
tscTrace("%p no result or free resource, recycle connection", pSql);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
} else {
tscTrace("%p numOfRows:%d, offset:%d, not recycle connection", pSql, pRes->numOfRows, pRes->offset);
}
return 0; return 0;
} }
......
...@@ -64,14 +64,18 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -64,14 +64,18 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
} }
if (ip && ip[0]) { if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 3; tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.index = 0;
tscMgmtIpList.ip[0] = inet_addr(ip); tscMgmtIpList.ip[0] = inet_addr(ip);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); if (tsSecondIp[0]) {
tscMgmtIpList.index = 0; tscMgmtIpList.numOfIps = 3;
tscMgmtIpList.port = tsMgmtShellPort; tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
}
} }
tscMgmtIpList.port = port ? port : tsMgmtShellPort;
pObj = (STscObj *)malloc(sizeof(STscObj)); pObj = (STscObj *)malloc(sizeof(STscObj));
if (NULL == pObj) { if (NULL == pObj) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
...@@ -208,7 +212,6 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -208,7 +212,6 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/ */
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
tscDoQuery(pSql); tscDoQuery(pSql);
...@@ -713,7 +716,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -713,7 +716,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
/* Query rsp is not received from vnode, so the qhandle is NULL */ /* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (pSql->fp != NULL) { if (pSql->fp != NULL) {
pSql->thandle = NULL;
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
tscTrace("%p Async SqlObj is freed by app", pSql); tscTrace("%p Async SqlObj is freed by app", pSql);
} else if (keepCmd) { } else if (keepCmd) {
...@@ -774,7 +776,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -774,7 +776,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
* *
* Then this object will be reused and no free operation is required. * Then this object will be reused and no free operation is required.
*/ */
pSql->thandle = NULL;
if (keepCmd) { if (keepCmd) {
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed by app while sql command is kept", pSql); tscTrace("%p sql result is freed by app while sql command is kept", pSql);
...@@ -785,7 +786,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -785,7 +786,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
} }
} else { } else {
// if no free resource msg is sent to vnode, we free this object immediately. // if no free resource msg is sent to vnode, we free this object immediately.
pSql->thandle = NULL;
if (pSql->fp) { if (pSql->fp) {
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
...@@ -899,11 +899,6 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -899,11 +899,6 @@ void taos_stop_query(TAOS_RES *res) {
return; return;
} }
if (pSql->thandle == NULL) {
tscTrace("%p no connection, abort cancel", res);
return;
}
//taosStopRpcConn(pSql->thandle); //taosStopRpcConn(pSql->thandle);
tscTrace("%p query is cancelled", res); tscTrace("%p query is cancelled", res);
} }
...@@ -1147,7 +1142,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1147,7 +1142,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/ */
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
free(str); free(str);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
......
...@@ -382,7 +382,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -382,7 +382,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pRes->numOfRows = 1; pRes->numOfRows = 1;
pRes->numOfTotal = 0; pRes->numOfTotal = 0;
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type; pQueryInfo->type = type;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册