提交 07e2f961 编写于 作者: H Haojun Liao

[td-225] fix error in closing the conns.

上级 cfc7da14
...@@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond); ...@@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void tscSetFreeHeatBeat(STscObj* pObj); void tscSetFreeHeatBeat(STscObj* pObj);
bool tscShouldFreeHeatBeat(SSqlObj* pHb); bool tscShouldFreeHeartBeat(SSqlObj* pHb);
bool tscShouldBeFreed(SSqlObj* pSql); bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
......
...@@ -171,46 +171,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -171,46 +171,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle; STscObj *pObj = (STscObj *)handle;
if (pObj == NULL || pObj->signature != pObj) {
return;
}
if (pObj == NULL) return; SSqlObj* pHB = pObj->pHb;
if (pObj->signature != pObj) return; if (pObj->pTimer != tmrId || pHB == NULL) {
if (pObj->pTimer != tmrId) return; return;
if (pObj->pHb == NULL) {
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
pQueryInfo->command = TSDB_SQL_HB;
pSql->cmd.command = TSDB_SQL_HB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
taosTFree(pSql);
return;
}
pSql->cmd.command = TSDB_SQL_HB;
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
} }
if (tscShouldFreeHeatBeat(pObj->pHb)) { if (tscShouldFreeHeartBeat(pHB)) {
tscDebug("%p free HB object and release connection", pObj->pHb); tscDebug("%p free HB object and release connection", pHB);
tscFreeSqlObj(pObj->pHb); tscFreeSqlObj(pHB);
tscCloseTscObj(pObj); tscCloseTscObj(pObj);
} else { } else {
// taosMsleep(500); int32_t code = tscProcessSql(pHB);
int32_t code = tscProcessSql(pObj->pHb);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("%p failed to sent HB to server, reason:%s", pObj->pHb, tstrerror(code)); tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
} }
} }
} }
...@@ -268,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -268,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return; return;
} }
pSql->pRpcCtx = NULL; // clear the rpcCtx
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
...@@ -1956,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1956,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) {
return 0; return 0;
} }
static void createHBObj(STscObj* pObj) {
if (pObj->pHb != NULL) {
return;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
pQueryInfo->command = TSDB_SQL_HB;
pSql->cmd.command = pQueryInfo->command;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
taosTFree(pSql);
return;
}
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}
int tscProcessConnectRsp(SSqlObj *pSql) { int tscProcessConnectRsp(SSqlObj *pSql) {
char temp[TSDB_TABLE_FNAME_LEN * 2]; char temp[TSDB_TABLE_FNAME_LEN * 2];
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
...@@ -1977,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -1977,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth; pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId); pObj->connId = htonl(pConnect->connId);
createHBObj(pObj);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0; return 0;
......
...@@ -216,10 +216,15 @@ void taos_close(TAOS *taos) { ...@@ -216,10 +216,15 @@ void taos_close(TAOS *taos) {
} }
if (pObj->pHb != NULL) { if (pObj->pHb != NULL) {
if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pObj->pHb->pRpcCtx);
}
tscSetFreeHeatBeat(pObj); tscSetFreeHeatBeat(pObj);
} else { tscFreeSqlObj(pObj->pHb);
tscCloseTscObj(pObj);
} }
tscCloseTscObj(pObj);
} }
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
......
...@@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { ...@@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
} }
bool tscShouldFreeHeatBeat(SSqlObj* pHb) { bool tscShouldFreeHeartBeat(SSqlObj* pHb) {
assert(pHb == pHb->signature); assert(pHb == pHb->signature);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册