Commit a1ebea86 authored by Shengliang Guan

Merge remote-tracking branch 'origin/develop' into hotfix/crash

@@ -1351,6 +1351,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
 static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
   int32_t  code = TSDB_CODE_SUCCESS;
   SSqlCmd *pCmd = &pSql->cmd;
+  pSql->res.numOfRows = 0;
 
   assert(pCmd->numOfClause == 1);
   STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
@@ -1394,6 +1395,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
     fclose(fp);
     pParentSql->res.code = code;
+    tscQueueAsyncRes(pParentSql);
     return;
   }
@@ -1458,8 +1460,11 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
   free(line);
 
   if (count > 0) {
-    if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
+    code = doPackSendDataBlock(pSql, count, pTableDataBlock);
+    if (code != TSDB_CODE_SUCCESS) {
       pParentSql->res.code = code;
+      tscQueueAsyncRes(pParentSql);
+      return;
     }
   } else {
......
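Taken together, the three client hunks above make failures in the async file-import path observable: the error code is recorded on the parent SSqlObj and tscQueueAsyncRes() is called so the user's callback fires instead of the request hanging. Below is a minimal, self-contained sketch of that idiom; AsyncReq, queueAsyncRes, and the other names are hypothetical stand-ins, not TDengine APIs.

```c
/* Minimal sketch of the error-propagation idiom used above, with
 * hypothetical types standing in for the TDengine client structs.
 * On any failure in an async step, record the code on the parent
 * request and queue it so the user callback fires exactly once. */
#include <stdio.h>

typedef struct {
  int  code;               /* last error code, 0 on success */
  void (*fp)(int code);    /* user completion callback */
} AsyncReq;

static void queueAsyncRes(AsyncReq *req) {
  /* the real client enqueues this to a scheduler; call directly here */
  req->fp(req->code);
}

static int packSendBlock(int rows) {
  return rows > 0 ? 0 : -1;  /* pretend: fail on an empty block */
}

static void sendNextBlock(AsyncReq *parent, int rows) {
  int code = packSendBlock(rows);
  if (code != 0) {
    parent->code = code;     /* record the failure ...            */
    queueAsyncRes(parent);   /* ... and make the callback run     */
    return;                  /* never fall through after reporting */
  }
}

static void onDone(int code) { printf("completed, code=%d\n", code); }

int main(void) {
  AsyncReq req = {0, onDone};
  sendNextBlock(&req, 0);    /* forces the error path */
  return 0;
}
```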
@@ -721,7 +721,7 @@ int32_t dnodeGetDnodeId() {
 }
 
 void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
-  SRpcConnInfo connInfo;
+  SRpcConnInfo connInfo = {0};
   rpcGetConnInfo(rpcMsg->handle, &connInfo);
 
   SRpcIpSet ipSet = {0};
......
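The dnode hunk above zero-initializes connInfo before calling rpcGetConnInfo(). If the fill call fails or leaves fields unset, reading an uninitialized stack struct is undefined behavior; the {0} initializer makes every field start deterministic. A small illustrative sketch (ConnInfo and getConnInfo are hypothetical stand-ins):

```c
/* Why the {0} initializer matters: an uninitialized stack struct
 * contains indeterminate bytes, so if the fill function fails (or
 * fills only some fields) later reads are undefined behavior. */
#include <stdio.h>
#include <string.h>

typedef struct {
  unsigned int   clientIp;
  unsigned short clientPort;
  char           user[24];
} ConnInfo;

static int getConnInfo(void *handle, ConnInfo *info) {
  if (handle == NULL) return -1;   /* failure: *info left untouched */
  info->clientIp   = 0x7f000001;
  info->clientPort = 6030;
  strcpy(info->user, "root");
  return 0;
}

int main(void) {
  ConnInfo info = {0};             /* every field starts as zero */
  if (getConnInfo(NULL, &info) != 0) {
    /* safe: prints zeros instead of stack garbage */
    printf("ip=%u port=%hu user='%s'\n", info.clientIp, info.clientPort, info.user);
  }
  return 0;
}
```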
@@ -121,8 +121,8 @@ void mnodeCleanupSystem() {
   dnodeFreeMnodeWqueue();
   dnodeFreeMnodeRqueue();
   dnodeFreeMnodePqueue();
-  mnodeCleanupTimer();
   mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
+  mnodeCleanupTimer();
 
   mPrint("mnode is cleaned up");
 }
......
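The mnode hunk moves mnodeCleanupTimer() after mnodeCleanupComponents(), presumably so the timer module outlives any component that still touches timers during its own teardown. A toy sketch of that shutdown-ordering rule, with illustrative names only:

```c
/* A minimal sketch of the shutdown-ordering rule this hunk enforces:
 * a service (here, a toy timer module) that other components may still
 * use while they shut down must be torn down last. Names are
 * illustrative, not TDengine APIs. */
#include <stdio.h>
#include <stdbool.h>

static bool timerAlive = false;

static void timerInit(void)    { timerAlive = true; }
static void timerCleanup(void) { timerAlive = false; }

static void timerStop(const char *who) {
  if (!timerAlive) {
    printf("BUG: %s stopped a timer after timer cleanup\n", who);
    return;
  }
  printf("%s: timer stopped cleanly\n", who);
}

static void componentCleanup(void) {
  timerStop("component");  /* components may still use timers here */
}

int main(void) {
  timerInit();
  timerCleanup();          /* wrong order: timer dies before its users */
  componentCleanup();

  timerInit();
  componentCleanup();      /* right order, as in the patched function */
  timerCleanup();
  return 0;
}
```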
@@ -819,6 +819,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
     tTrace("%s, authentication shall be restarted", pConn->info);
     pConn->secured = 0;
     rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
+    if (pConn->connType != RPC_CONN_TCPC)
     pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
     return TSDB_CODE_RPC_ALREADY_PROCESSED;
   }
@@ -828,6 +829,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
     tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
     pConn->tretry++;
     rpcSendReqHead(pConn);
+    if (pConn->connType != RPC_CONN_TCPC)
     pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
     return TSDB_CODE_RPC_ALREADY_PROCESSED;
   } else {
@@ -896,8 +898,12 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
     terrno = rpcProcessReqHead(pConn, pHead);
     pConn->connType = pRecv->connType;
 
-    // client shall send the request within tsRpcTime again, double it
-    taosTmrReset(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
+    // stop idle timer
+    taosTmrStopA(&pConn->pIdleTimer);
+
+    // client shall send the request within tsRpcTime again for UDP, double it
+    if (pConn->connType != RPC_CONN_TCPS)
+      pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
   } else {
     terrno = rpcProcessRspHead(pConn, pHead);
   }
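The rpc hunks above, and the two timer hunks that follow, share one pattern: retry, idle, and progress timers are only armed when the connection is not TCP, since TCP already provides reliable delivery and connection-loss detection, while UDP needs timer-driven retransmission. A minimal sketch of the idea; ConnType and the helpers are stand-ins for the real rpc types:

```c
/* Retry timers are only armed for unreliable transports (UDP);
 * TCP detects loss and disconnects on its own, so the timer is
 * skipped. Names are illustrative stand-ins for the rpc types. */
#include <stdio.h>

typedef enum { CONN_UDP, CONN_TCP_CLIENT, CONN_TCP_SERVER } ConnType;

typedef struct {
  ConnType connType;
  int      retryTimerArmed;
} Conn;

static void startRetryTimer(Conn *pConn) { pConn->retryTimerArmed = 1; }

static void sendRequest(Conn *pConn) {
  /* ... serialize and write the request ... */
  if (pConn->connType != CONN_TCP_CLIENT)   /* mirrors the RPC_CONN_TCPC check */
    startRetryTimer(pConn);                 /* only UDP needs retransmission */
}

int main(void) {
  Conn udp = {CONN_UDP, 0}, tcp = {CONN_TCP_CLIENT, 0};
  sendRequest(&udp);
  sendRequest(&tcp);
  printf("udp timer=%d tcp timer=%d\n", udp.retryTimerArmed, tcp.retryTimerArmed);
  return 0;
}
```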
@@ -1024,6 +1030,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
     rpcAddRef(pRpc);   // add the refCount for requests
 
     // start the progress timer to monitor the response from server app
+    if (pConn->connType != RPC_CONN_TCPS)
       pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
 
     // notify the server app
@@ -1056,9 +1063,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
       for (int i=0; i<pContext->ipSet.numOfIps; ++i)
         pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]);
       rpcSendReqToServer(pRpc, pContext);
+      rpcFreeCont(rpcMsg.pCont);
     } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) {
       pContext->code = pHead->code;
       rpcProcessConnError(pContext, NULL);
+      rpcFreeCont(rpcMsg.pCont);
     } else {
       rpcNotifyClient(pContext, &rpcMsg);
     }
@@ -1187,6 +1196,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
   pConn->pContext = pContext;
   rpcSendMsgToPeer(pConn, msg, msgLen);
+  if (pConn->connType != RPC_CONN_TCPC)
   taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
 
   rpcUnlockConn(pConn);
......
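The two rpcFreeCont() additions in rpcProcessIncomingMsg() plug a leak: the received payload was previously freed only on the path that hands it to the client callback, so the redirect and not-ready branches leaked it. A sketch of the underlying ownership rule, with illustrative names:

```c
/* Ownership rule behind the rpcFreeCont() additions: a received
 * payload must be freed on every path that does not hand it off to
 * the client callback. All names are illustrative. */
#include <stdlib.h>
#include <string.h>

typedef struct { char *pCont; } Msg;

static void notifyClient(Msg *msg) {
  /* ownership transfers: the callback side owns msg->pCont now */
  free(msg->pCont);
}

static void processIncoming(int code) {
  Msg msg = { malloc(128) };
  memset(msg.pCont, 0, 128);

  if (code == 1) {              /* redirect: request is re-sent internally */
    free(msg.pCont);            /* payload no longer needed; was leaked before */
  } else if (code == 2) {       /* peer not ready: error path */
    free(msg.pCont);            /* was leaked before the fix */
  } else {
    notifyClient(&msg);         /* only this path transfers ownership */
  }
}

int main(void) {
  processIncoming(0);
  processIncoming(1);
  processIncoming(2);
  return 0;
}
```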
This diff is collapsed.