提交 02994997 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

put all the protections all multi-threads

上级 83edc110
......@@ -80,53 +80,51 @@ typedef struct _RpcConn {
int sid; // session ID
uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID
char meterId[TSDB_UNI_LEN];
char spi;
char encrypt;
uint8_t secret[TSDB_KEY_LEN];
uint8_t ckey[TSDB_KEY_LEN];
char meterId[TSDB_UNI_LEN]; // user ID for the link
char spi; // security parameter index
char encrypt; // encryption, 0:1
uint8_t secret[TSDB_KEY_LEN]; // secret for the link
uint8_t ckey[TSDB_KEY_LEN]; // ciphering key
uint16_t localPort; // for UDP only
uint32_t peerUid;
uint32_t peerUid; // peer UID
uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port
char peerIpstr[20]; // peer IP string
uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId;
uint8_t outType;
char inType;
uint16_t inTranId; // transcation ID for incoming msg
uint8_t outType; // message type for outgoing request
char inType; // message type for incoming request
void *chandle; // handle passed by TCP/UDP connection layer
void *ahandle; // handle provided by upper app layter
int retry;
int retry; // number of retry for sending request
int tretry; // total retry
void *pTimer;
void *pIdleTimer;
char *pRspMsg; // including header
int rspMsgLen;
char *pReqMsg; // including header
int reqMsgLen;
SRpcInfo *pRpc;
SRpcReqContext *pContext;
void *pTimer; // retry timer to monitor the response
void *pIdleTimer; // idle timer
char *pRspMsg; // response message including header
int rspMsgLen; // response messag length
char *pReqMsg; // request message including header
int reqMsgLen; // request message length
SRpcInfo *pRpc; // the associated SRpcInfo
SRpcReqContext *pContext; // request context
} SRpcConn;
typedef struct {
char version : 4;
char comp : 4;
char tcp : 2;
char spi : 3;
char encrypt : 3;
uint16_t tranId;
uint32_t uid; // for unique ID inside a client
uint32_t sourceId;
uint32_t destId;
uint32_t destIp;
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only
char empty[1];
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
char version:4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4
char tcp:2; // tcp flag
char spi:3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID
uint32_t uid; // for unique ID inside a client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint8_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
uint8_t content[0]; // message body starts from here
} SRpcHeader;
typedef struct {
......@@ -178,7 +176,9 @@ void (*taosCloseConn[])(void *chandle) = {
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort);
static void rpcCloseConn(void *thandle);
static SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet);
static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr);
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr);
static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, char code);
......@@ -190,6 +190,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader);
static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId);
static void rpcProcessProgressTimer(void *param, void *tmrId);
static void rpcFreeMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
......@@ -350,6 +351,12 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) {
pthread_mutex_lock(&pRpc->mutex);
if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) {
tTrace("%s pConn:%p, connection is already released, rsp wont be sent", pRpc->label, pConn);
pthread_mutex_lock(&pRpc->mutex);
return;
}
// set msg header
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
......@@ -364,14 +371,16 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) {
// set pConn parameters
pConn->inType = 0;
rpcFreeMsg(pConn->pRspMsg);
// response message is released until new response is sent
rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
pthread_mutex_unlock(&pRpc->mutex);
taosTmrStopA(&pConn->pTimer);
rpcSendDataToPeer(pConn, msg, msgLen);
return;
......@@ -401,29 +410,26 @@ void rpcSendSimpleRsp(void *thandle, int32_t code) {
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) {
SRpcConn *pConn;
if ( (uint8_t)(rpcGetConnObj(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 )
return NULL;
pConn = rpcAllocateClientConn(pRpc);
strcpy(pConn->peerIpstr, peerIpStr);
pConn->peerIp = inet_addr(peerIpStr);
pConn->peerPort = peerPort;
pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
strcpy(pConn->meterId, pRpc->meterId);
// if it is client, it shall set up connection first
if (taosOpenConn[pRpc->connType]) {
pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else {
tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort);
terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;
if (pConn) {
strcpy(pConn->peerIpstr, peerIpStr);
pConn->peerIp = inet_addr(peerIpStr);
pConn->peerPort = peerPort;
strcpy(pConn->meterId, pRpc->meterId);
if (taosOpenConn[pRpc->connType]) {
pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else {
tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort);
terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;
}
}
}
......@@ -432,10 +438,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
static void rpcCloseConn(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
assert(pConn);
SRpcInfo *pRpc = pConn->pRpc;
assert(pRpc);
pthread_mutex_lock(&pRpc->mutex);
......@@ -443,83 +446,111 @@ static void rpcCloseConn(void *thandle) {
taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer);
rpcFreeMsg(pConn->pRspMsg);
rpcFreeMsg(pConn->pReqMsg);
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pRpc->hash, hashstr);
if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) {
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
}
if (pRpc->idPool) taosFreeId(pRpc->idPool, pConn->sid);
taosFreeId(pRpc->idPool, pConn->sid);
tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn);
memset(pConn, 0, sizeof(SRpcConn));
// reset the link parameters
pConn->meterId[0] = 0;
pConn->outType = 0;
pConn->inType = 0;
pConn->inTranId = 0;
pConn->outTranId = 0;
pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0;
pConn->pRspMsg = NULL;
pConn->rspMsgLen = 0;
pConn->pContext = NULL;
tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn);
pthread_mutex_unlock(&pRpc->mutex);
}
static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) {
SRpcConn * pConn = NULL;
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
SRpcConn *pConn = NULL;
if (sid == 0) {
if (req) {
int osid = sid;
SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pRpc->hash, hashstr);
if (ppConn) pConn = *ppConn;
if (pConn == NULL) {
sid = taosAllocateId(pRpc->idPool);
if (sid <= 0) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
return TSDB_CODE_MAX_SESSIONS;
} else {
tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid,
taosIdPoolNumOfUsed(pRpc->idPool), osid);
}
} else {
sid = pConn->sid;
tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid, pConn->meterId);
}
} else {
return TSDB_CODE_UNEXPECTED_RESPONSE;
}
int sid = taosAllocateId(pRpc->idPool);
if (sid <= 0) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
terrno = TSDB_CODE_MAX_SESSIONS;
} else {
if (pRpc->connList[sid].meterId[0] == 0) {
tError("%s sid:%d session is already released", pRpc->label, sid);
return TSDB_CODE_INVALID_VALUE;
}
}
tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool));
pConn = pRpc->connList + sid;
memset(pConn, 0, sizeof(SRpcConn));
pConn->pRpc = pRpc;
pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid);
pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
}
pConn = pRpc->connList + sid;
return pConn;
}
if (pConn->meterId[0] == 0) {
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) {
SRpcConn *pConn;
// check if it is already allocated
pConn = *(SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
if (pConn) return pConn;
int sid = taosAllocateId(pRpc->idPool);
if (sid <= 0) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
terrno = TSDB_CODE_MAX_SESSIONS;
} else {
pConn = pRpc->connList + sid;
memset(pConn, 0, sizeof(SRpcConn));
memcpy(pConn->meterId, meterId, tListLen(pConn->meterId));
pConn->pRpc = pRpc;
pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid);
if (pRpc->afp) {
int ret = (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
if (ret != 0) {
tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released
memset(pConn, 0, sizeof(SRpcConn));
return ret;
}
if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released
terrno = TSDB_CODE_INVALID_USER;
pConn = NULL;
}
}
if (pConn) {
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid);
tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid);
}
return pConn;
}
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr) {
SRpcConn *pConn = NULL;
if (sid) {
pConn = pRpc->connList + sid;
} else {
pConn = rpcAllocateServerConn(pRpc, meterId, hashstr);
}
if (pConn) {
if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) {
tTrace("%s pConn:%p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId);
return TSDB_CODE_MISMATCHED_METER_ID;
terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL;
}
}
*ppConn = pConn;
return TSDB_CODE_SUCCESS;
return pConn;
}
SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet) {
......@@ -600,8 +631,8 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn);
pConn->tretry++;
tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED;
} else {
......@@ -647,9 +678,8 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d
}
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId);
code = rpcGetConnObj(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr);
if (code != TSDB_CODE_SUCCESS) return code;
pConn = rpcGetConnObj(pRpc, sid, pHeader->meterId, hashstr);
if (pConn == NULL ) return terrno;
*ppConn = pConn;
sid = pConn->sid;
......@@ -739,6 +769,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
pHeader = rpcDecompressRpcMsg(pHeader);
if ( rpcIsReq(pHeader->msgType) ) {
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn, 0);
} else {
// it's a response
......@@ -833,6 +864,8 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) {
int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType;
pthread_mutex_lock(&pRpc->mutex);
// set the message header
pHeader->version = 1;
pHeader->msgType = msgType;
......@@ -854,6 +887,8 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) {
pConn->reqMsgLen = msgLen;
pConn->pContext = pContext;
pthread_mutex_unlock(&pRpc->mutex);
rpcSendDataToPeer(pConn, msg, msgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
}
......@@ -863,10 +898,6 @@ static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) {
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = (SRpcHeader *)data;
assert(data);
assert(dataLen>0);
assert(pHeader->msgType > 0);
dataLen = rpcAddAuthPart(pConn, data, dataLen);
if ( rpcIsReq(pHeader->msgType)) {
......@@ -896,6 +927,7 @@ static void rpcProcessConnError(void *param, void *id) {
SRpcInfo *pRpc = pContext->pRpc;
if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) {
rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp));
if ( rsp ) {
STaosRsp *pRsp = (STaosRsp *)(rsp+sizeof(SRpcHeader));
......@@ -922,13 +954,11 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
int reportDisc = 0;
SRpcInfo *pRpc = pConn->pRpc;
assert(pRpc);
if (pRpc == NULL) return; // it means it is already released
pthread_mutex_lock(&pRpc->mutex);
if (pConn->outType == 0) {
tTrace("%s pConn:%p, outtype is zero, it is already processed", pRpc->label, pConn);
} else {
if (pConn->outType && pConn->meterId[0]) {
tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL;
pConn->retry++;
......@@ -937,36 +967,62 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
rpcSendDataToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else {
// close the connection
tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
reportDisc = 1;
}
} else {
tTrace("%s pConn:%p, retry timer not processed", pRpc->label, pConn);
}
pthread_mutex_unlock(&pRpc->mutex);
pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
if (reportDisc) rpcProcessConnError(pConn->pContext, NULL);
if (reportDisc) {
rpcProcessConnError(pConn->pContext, NULL);
rpcCloseConn(pConn);
}
}
static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc;
assert(pRpc);
if (pConn->pIdleTimer != tmrId) {
tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId);
return;
pthread_mutex_lock(&pRpc->mutex);
if (pConn->inType == 0 && pConn->meterId[0]) {
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn);
} else {
tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId);
}
pthread_mutex_unlock(&pRpc->mutex);
}
static void rpcProcessProgressTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc;
pthread_mutex_lock(&pRpc->mutex);
if (pConn->inType && pConn->meterId[0]) {
tTrace("%s pConn:%p, progress timer expired, send progress", pRpc->label, pConn);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else {
tTrace("%s pConn:%p, progress timer:%p not processed", pRpc->label, pConn, tmrId);
}
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn);
pthread_mutex_unlock(&pRpc->mutex);
}
static void rpcFreeMsg(void *msg) {
if ( msg == NULL ) return;
char *req = ((char *)msg) - sizeof(SRpcReqContext);
free(req);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册