提交 3a01542c 编写于 作者: dengyihao's avatar dengyihao

merge trans

上级 dbe325e1
......@@ -135,6 +135,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = NULL};
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
}
......@@ -23,7 +23,7 @@
static void *dmThreadRoutine(void *param) {
SDnodeMgmt *pMgmt = param;
SDnode *pDnode = pMgmt->pDnode;
SDnode * pDnode = pMgmt->pDnode;
int64_t lastStatusTime = taosGetTimestampMs();
int64_t lastMonitorTime = lastStatusTime;
......@@ -55,7 +55,7 @@ static void *dmThreadRoutine(void *param) {
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle;
SDnode *pDnode = pMgmt->pDnode;
SDnode * pDnode = pMgmt->pDnode;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
dTrace("msg:%p, will be processed in dnode queue", pMsg);
......
......@@ -183,7 +183,7 @@ typedef struct {
#pragma pack(pop)
typedef enum { Normal, Quit, Release, Register } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
......
......@@ -53,6 +53,7 @@ typedef struct SCliMsg {
queue q;
uint64_t st;
STransMsgType type;
int sent; //(0: no send, 1: alread sent)
} SCliMsg;
typedef struct SCliThrdObj {
......@@ -135,6 +136,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
......@@ -146,6 +149,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \
} \
destroyCmsg(pMsg); \
return; \
} \
} while (0)
......@@ -198,8 +202,18 @@ static void* cliWorkThread(void* arg);
bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pCliMsg = NULL;
int i = 0;
do {
pCliMsg = transQueueGet(&conn->cliMsgs, i++);
if (pCliMsg && 0 == pCliMsg->sent) {
break;
}
} while (pCliMsg != NULL);
if (pCliMsg == NULL) {
return false;
}
cliSend(conn);
return true;
}
return false;
}
......@@ -218,33 +232,27 @@ void cliHandleResp(SCliConn* conn) {
transMsg.msgType = pHead->msgType;
transMsg.ahandle = NULL;
CONN_SHOULD_RELEASE(conn, pHead);
SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL;
CONN_SHOULD_RELEASE(conn, pHead);
if (CONN_NO_PERSIST_BY_APP(conn)) {
pMsg = transQueuePop(&conn->cliMsgs);
/// uint64_t ahandle = (uint64_t)pHead->ahandle;
// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
}
tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle);
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle);
}
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle);
} else {
uint64_t ahandle = (uint64_t)pHead->ahandle;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
if (pMsg == NULL) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType);
if (transMsg.ahandle == NULL) {
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle);
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
}
tDebug("cli conn %p construct ahandle %p, persist: 1", conn, transMsg.ahandle);
} else {
pCtx = pMsg ? pMsg->ctx : NULL;
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
......@@ -419,7 +427,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
transCtxCleanup(&conn->ctx);
transQueueClear(&conn->cliMsgs);
conn->status = ConnNormal;
conn->status = ConnInPool;
char key[128] = {0};
tstrncpy(key, conn->ip, strlen(conn->ip));
......@@ -546,7 +554,21 @@ void cliSend(SCliConn* pConn) {
// assert(taosArrayGetSize(pConn->cliMsgs) > 0);
assert(!transQueueEmpty(&pConn->cliMsgs));
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, 0);
SCliMsg* pCliMsg = NULL;
int i = 0;
do {
pCliMsg = transQueueGet(&pConn->cliMsgs, i++);
if (pCliMsg && 0 == pCliMsg->sent) {
break;
}
} while (pCliMsg != NULL);
if (pCliMsg == NULL) {
return;
}
pCliMsg->sent = 1;
STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd;
......@@ -558,7 +580,7 @@ void cliSend(SCliConn* pConn) {
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pCtx->ahandle;
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
int msgLen = transMsgLenFromCont(pMsg->contLen);
......@@ -868,6 +890,7 @@ void transReleaseCliHandle(void* handle) {
STransMsg tmsg = {.handle = handle};
SCliMsg* cmsg = calloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
cmsg->type = Release;
......
......@@ -93,25 +93,25 @@ typedef struct SServerObj {
static const char* notify = "a";
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.handle = (void*)conn, .code = 0}; \
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
......@@ -823,7 +823,7 @@ void transReleaseSrvHandle(void* handle) {
SSrvConn* pConn = handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
STransMsg tmsg = {.handle = handle, .code = 0};
STransMsg tmsg = {.code = 0, .handle = handle, .ahandle = NULL};
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册