提交 a54e96fc 编写于 作者: 陶建辉(Jeff)

fix issue #662

上级 dea69ae5
......@@ -59,7 +59,7 @@ typedef struct {
uint8_t secret[TSDB_KEY_LEN];
uint8_t ckey[TSDB_KEY_LEN];
short localPort; // for UDP only
short localPort; // for UDP only
uint32_t peerUid;
uint32_t peerIp; // peer IP
short peerPort; // peer port
......@@ -164,8 +164,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) {
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
if (pConn->tranId == 0) __sync_add_and_fetch_32(&pConn->tranId, 1);
pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1);
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
......@@ -196,8 +196,8 @@ char *taosBuildReqMsgWithSize(void *param, char type, int size) {
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
if (pConn->tranId == 0) __sync_add_and_fetch_32(&pConn->tranId, 1);
pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1);
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
......@@ -219,6 +219,11 @@ char *taosBuildRspMsgWithSize(void *param, char type, int size) {
size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest);
pMsg = (char *)malloc((size_t)size);
if (pMsg == NULL) {
tError("pConn:%p, malloc(%d) failed when building a type:%d message", pConn, size, type);
return NULL;
}
memset(pMsg, 0, (size_t)size);
pHeader = (STaosHeader *)pMsg;
pHeader->version = 1;
......@@ -324,6 +329,11 @@ void *taosOpenRpc(SRpcInit *pRpc) {
int size = (int)sizeof(SRpcChann) * pRpc->numOfChanns;
pServer->channList = (SRpcChann *)malloc((size_t)size);
if (pServer->channList == NULL) {
tError("%s, failed to malloc channList", pRpc->label);
tfree(pServer);
return NULL;
}
memset(pServer->channList, 0, (size_t)size);
pServer->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
......@@ -334,7 +344,14 @@ void *taosOpenRpc(SRpcInit *pRpc) {
return NULL;
}
if (pServer->numOfChanns == 1) taosOpenRpcChann(pServer, 0, pRpc->sessionsPerChann);
if (pServer->numOfChanns == 1) {
int retVal = taosOpenRpcChann(pServer, 0, pRpc->sessionsPerChann);
if (0 != retVal) {
tError("%s, failed to open rpc chann", pRpc->label);
taosCloseRpc(pServer);
return NULL;
}
}
tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads);
......@@ -354,13 +371,12 @@ int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) {
pChann = pServer->channList + cid;
memset(pChann, 0, sizeof(SRpcChann));
int size = (int)sizeof(SRpcConn) * sessions;
pChann->connList = (SRpcConn *)malloc((size_t)size);
size_t size = sizeof(SRpcConn) * sessions;
pChann->connList = (SRpcConn *)calloc(1, size);
if (pChann->connList == NULL) {
tError("%s cid:%d, failed to allocate memory for taos connections", pServer->label, cid);
tError("%s cid:%d, failed to allocate memory for taos connections, size:%d", pServer->label, cid, size);
return -1;
}
memset(pChann->connList, 0, (size_t)size);
if (pServer->idMgmt == TAOS_ID_FREE) {
pChann->idPool = taosInitIdPool(sessions);
......@@ -387,7 +403,7 @@ int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) {
pChann->qhandle = qhandle ? qhandle : pServer->qhandle;
return 0;
return TSDB_CODE_SUCCESS;
}
void taosCloseRpcChann(void *handle, int cid) {
......@@ -472,7 +488,7 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon
pChann = pServer->channList + chann;
if (pServer->idMgmt == TAOS_ID_FREE) {
if ((sid == 0) || (pChann->connList[sid].signature == NULL)) {
if (sid == 0) {
if (req) {
int osid = sid;
SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pChann->hash, hashstr);
......@@ -494,7 +510,12 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon
} else {
return TSDB_CODE_UNEXPECTED_RESPONSE;
}
}
} else {
if (pChann->connList[sid].signature == NULL) {
tError("%s cid:%d, sid:%d session is already released", pServer->label, chann, sid);
return TSDB_CODE_INVALID_VALUE;
}
}
}
pConn = pChann->connList + sid;
......@@ -515,8 +536,7 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon
if (pServer->afp) {
int ret = (*pServer->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
if (ret != 0) {
tTrace("%s cid:%d sid:%d id:%s, meterId not there, localPort:%d pConn:%p", pServer->label, chann, sid, pConn->meterId,
pConn->localPort, pConn);
tTrace("%s cid:%d sid:%d id:%s, meterId not there pConn:%p", pServer->label, chann, sid, pConn->meterId, pConn);
return ret;
}
}
......@@ -529,8 +549,8 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon
}
taosAddStrHash(pChann->hash, hashstr, (char *)&pConn);
tTrace("%s cid:%d sid:%d id:%s, TAOS connection is allocated, localPort:%d pConn:%p",
pServer->label, chann, sid, pConn->meterId, pConn->localPort, pConn);
tTrace("%s cid:%d sid:%d id:%s, TAOS connection is allocated, localPort:%d pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn->localPort, pConn);
} else {
if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) {
tTrace("%s cid:%d sid:%d id:%s, meterId is not matched, received:%s", pServer->label, chann, sid, pConn->meterId,
......@@ -566,9 +586,8 @@ void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code) {
if (taosOpenConn[pServer->type]) {
pConn->chandle = (*taosOpenConn[pServer->type])(pServer->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s cid:%d sid:%d id:%s, nw connection is set up, ip:%s:%hu localPort:%d pConn:%p",
pServer->label, pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort,
pConn->localPort, pConn);
tTrace("%s cid:%d sid:%d id:%s, nw connection is set up, ip:%s:%hu localPort:%d pConn:%p", pServer->label,
pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort, pConn);
} else {
tError("%s cid:%d sid:%d id:%s, failed to set up nw connection to ip:%s:%hu", pServer->label, pConn->chann,
pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort);
......@@ -629,18 +648,16 @@ int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) {
if (pHeader->msgType & 1) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace(
"%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, len:%d tranId:%d "
"pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->tranId, pConn);
tTrace("%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId, pConn);
} else {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace(
"%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, code:%u len:%d "
"tranId:%d pConn:%p",
"%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, (uint8_t)pHeader->content[0], dataLen, pHeader->tranId, pConn);
pConn->peerPort, (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId,
pConn);
}
writtenLen = (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle);
......@@ -650,8 +667,6 @@ int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) {
pConn->sid, pConn->meterId, dataLen, writtenLen, strerror(errno));
// assert ( writtenLen == dataLen );
tDump(data, dataLen);
tTrace("%s msg sent, len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label, dataLen, pHeader->sourceId,
pHeader->destId, pHeader->tranId, pConn);
return 0;
}
......@@ -697,12 +712,13 @@ void taosProcessResponse(SRpcConn *pConn) {
if (pConn->pHead == NULL) pConn->pTail = NULL;
}
pthread_mutex_unlock(&pChann->mutex);
if (msg) {
taosSendDataToPeer(pConn, msg, msgLen);
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
pthread_mutex_unlock(&pChann->mutex);
}
int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pServer, int dataLen, uint32_t ip,
......@@ -712,7 +728,7 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
SRpcChann *pChann;
int msgLen;
char hashstr[40] = {0};
int reSend = 0;
// int reSend = 0;
*ppConn = NULL;
uint32_t destId = htonl(pHeader->destId);
......@@ -794,24 +810,24 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s cid:%d sid:%d id:%s, time diff:%d is too big, msg discarded pConn:%p, timestamp:%d",
pServer->label, chann, sid, pConn->meterId, delta, pConn, htonl(pDigest->timeStamp));
tWarn("%s cid:%d sid:%d id:%s, time diff:%d is too big, msg discarded pConn:%p, timestamp:%d", pServer->label,
chann, sid, pConn->meterId, delta, pConn, htonl(pDigest->timeStamp));
// the requirement of goldwind, should not return error in this case
code = TSDB_CODE_INVALID_TIME_STAMP;
goto _exit;
}
if (taosAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tTrace("%s cid:%d sid:%d id:%s, authentication failed, msg discarded pConn:%p",
pServer->label, chann, sid, pConn->meterId, pConn);
tTrace("%s cid:%d sid:%d id:%s, authentication failed, msg discarded pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
code = TSDB_CODE_AUTH_FAILURE;
goto _exit;
}
} else {
// if it is request or response with code 0, msg shall be discarded
if ((pHeader->msgType & 1) || (pHeader->content[0] == 0)) {
tTrace("%s cid:%d sid:%d id:%s, auth spi not matched, msg discarded pConn:%p",
pServer->label, chann, sid, pConn->meterId, pConn);
tTrace("%s cid:%d sid:%d id:%s, auth spi not matched, msg discarded pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
code = TSDB_CODE_AUTH_FAILURE;
goto _exit;
}
......@@ -829,9 +845,9 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
pConn->peerId = pHeader->sourceId;
} else {
if (pConn->peerId != pHeader->sourceId) {
tTrace("%s cid:%d sid:%d id:%s, source Id is changed, old:0x%08x new:0x%08x pConn:%p",
pServer->label, chann, sid, pConn->meterId, pConn->peerId, pHeader->sourceId, pConn);
code = TSDB_CODE_SESSION_ALREADY_EXIST;
tTrace("%s cid:%d sid:%d id:%s, source Id is changed, old:0x%08x new:0x%08x pConn:%p", pServer->label, chann,
sid, pConn->meterId, pConn->peerId, pHeader->sourceId, pConn);
code = TSDB_CODE_INVALID_VALUE;
goto _exit;
}
}
......@@ -842,9 +858,9 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
taosMsg[pHeader->msgType], pConn);
taosSendQuickRsp(pConn, (char)(pHeader->msgType + 1), TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) {
tTrace("%s cid:%d sid:%d id:%s, %s is already processed, tranId:%d pConn:%p",
pServer->label, chann, sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->inTranId, pConn);
reSend = 1;
tTrace("%s cid:%d sid:%d id:%s, %s is already processed, tranId:%d pConn:%p", pServer->label, chann, sid,
pConn->meterId, taosMsg[pHeader->msgType], pConn->inTranId, pConn);
taosReSendRspToPeer(pConn);
} else {
tTrace("%s cid:%d sid:%d id:%s, mismatched message %s and tranId pConn:%p", pServer->label, chann, sid,
pConn->meterId, taosMsg[pHeader->msgType], pConn);
......@@ -856,8 +872,8 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
}
if (pConn->inType != 0) {
tTrace("%s cid:%d sid:%d id:%s, last session is not finished, inTranId:%d tranId:%d pConn:%p",
pServer->label, chann, sid, pConn->meterId, pConn->inTranId, pHeader->tranId, pConn);
tTrace("%s cid:%d sid:%d id:%s, last session is not finished, inTranId:%d tranId:%d pConn:%p", pServer->label,
chann, sid, pConn->meterId, pConn->inTranId, pHeader->tranId, pConn);
code = TSDB_CODE_LAST_SESSION_NOT_FINISHED;
goto _exit;
}
......@@ -897,8 +913,8 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p",
pServer->label, chann, sid, pHeader->meterId, pConn);
tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p", pServer->label, chann, sid,
pHeader->meterId, pConn);
pConn->tretry++;
taosTmrReset(taosProcessTaosTimer, tsRpcProgressTime, pConn, pChann->tmrCtrl, &pConn->pTimer);
code = TSDB_CODE_ALREADY_PROCESSED;
......@@ -921,7 +937,7 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
_exit:
pthread_mutex_unlock(&pChann->mutex);
if (reSend) taosReSendRspToPeer(pConn);
// if (reSend) taosReSendRspToPeer(pConn);
return code;
}
......@@ -1028,8 +1044,6 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
}
pHeader = (STaosHeader *)data;
tTrace("%s msg received, len:%d source:0x%08x dest:0x%08x tranId:%d", pServer->label, dataLen, pHeader->sourceId,
pHeader->destId, pHeader->tranId);
msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
code = (uint8_t)taosProcessMsgHeader(pHeader, &pConn, pServer, dataLen, ip, port, chandle);
......@@ -1044,16 +1058,19 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
}
if (code == TSDB_CODE_ALREADY_PROCESSED) {
tTrace("%s cid:%d sid:%d id:%s, %s wont be processed tranId:%d pConn:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->tranId, pConn);
tTrace("%s cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label,
chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId),
pHeader->tranId, pConn);
free(data);
return pConn;
}
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d tranId:%d pConn:%p",
pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0],
dataLen, pHeader->tranId, pConn);
tTrace(
"%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d source:0x%08x dest:0x%08x "
"tranId:%d pConn:%p",
pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0],
dataLen, pHeader->sourceId, htonl(pHeader->destId), pHeader->tranId, pConn);
}
if (code != 0) {
......@@ -1085,9 +1102,8 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
}
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p",
pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn,
pConn->pTimer);
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
}
pChann = pServer->channList + pConn->chann;
......@@ -1168,8 +1184,8 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
} else {
assert(pConn->pMsgNode == NULL);
if (pConn->pMsgNode) {
tError("%s cid:%d sid:%d id:%s, bug, there shall be no pengding req pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, pConn);
tError("%s cid:%d sid:%d id:%s, bug, there shall be no pengding req pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, pConn);
}
pConn->outType = msgType;
......@@ -1180,8 +1196,6 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
}
}
pthread_mutex_unlock(&pChann->mutex);
if (msgLen) {
taosSendDataToPeer(pConn, (char *)pHeader, msgLen);
if (msgType & 1U) {
......@@ -1189,6 +1203,8 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
}
}
pthread_mutex_unlock(&pChann->mutex);
return contLen;
}
......@@ -1288,12 +1304,13 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
}
}
pthread_mutex_unlock(&pChann->mutex);
if (pHeader) {
(*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle);
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
pthread_mutex_unlock(&pChann->mutex);
}
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, short *peerPort, int *cid, int *sid) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册