提交 d595fe6b 编写于 作者: dengyihao's avatar dengyihao

add ahandle

上级 6eb799d7
...@@ -135,6 +135,6 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -135,6 +135,6 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = NULL};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -158,7 +158,8 @@ typedef struct { ...@@ -158,7 +158,8 @@ typedef struct {
char secured : 2; char secured : 2;
char spi : 2; char spi : 2;
uint32_t code; // del later uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later
uint32_t msgType; uint32_t msgType;
int32_t msgLen; int32_t msgLen;
uint8_t content[0]; // message body starts from here uint8_t content[0]; // message body starts from here
...@@ -296,20 +297,25 @@ void transQueueInit(STransQueue* queue, void (*free)(void* arg)); ...@@ -296,20 +297,25 @@ void transQueueInit(STransQueue* queue, void (*free)(void* arg));
* if queue'size > 1, return false; else return true * if queue'size > 1, return false; else return true
*/ */
bool transQueuePush(STransQueue* queue, void* arg); bool transQueuePush(STransQueue* queue, void* arg);
/*
* the size of queue
*/
int32_t transQueueSize(STransQueue* queue);
/* /*
* pop head from queue * pop head from queue
*/ */
void* transQueuePop(STransQueue* queue); void* transQueuePop(STransQueue* queue);
/* /*
* get head from queue * get ith from queue
*/ */
void* transQueueGet(STransQueue* queue); void* transQueueGet(STransQueue* queue, int i);
/*
* rm ith from queue
*/
void* transQueueRm(STransQueue* queue, int i);
/* /*
* queue empty or not * queue empty or not
*/ */
bool transQueueEmpty(STransQueue* queue); bool transQueueEmpty(STransQueue* queue);
/* /*
* clear queue * clear queue
......
...@@ -25,12 +25,11 @@ typedef struct SCliConn { ...@@ -25,12 +25,11 @@ typedef struct SCliConn {
void* hostThrd; void* hostThrd;
SConnBuffer readBuf; SConnBuffer readBuf;
void* data; void* data;
// SArray* cliMsgs; STransQueue cliMsgs;
STransQueue cliMsgs; queue conn;
queue conn; uint64_t expireTime;
uint64_t expireTime; int hThrdIdx;
int hThrdIdx; STransCtx ctx;
STransCtx ctx;
bool broken; // link broken or not bool broken; // link broken or not
ConnStatus status; // ConnStatus status; //
...@@ -151,6 +150,22 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -151,6 +150,22 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \ } \
} while (0) } while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
for (; i < sz; i++) { \
pMsg = transQueueGet(&conn->cliMsgs, i); \
if (pMsg != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
break; \
} \
} \
if (i == sz) { \
pMsg = NULL; \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
} \
} while (0)
#define CONN_HANDLE_THREAD_QUIT(thrd) \ #define CONN_HANDLE_THREAD_QUIT(thrd) \
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
...@@ -205,16 +220,36 @@ void cliHandleResp(SCliConn* conn) { ...@@ -205,16 +220,36 @@ void cliHandleResp(SCliConn* conn) {
CONN_SHOULD_RELEASE(conn, pHead); CONN_SHOULD_RELEASE(conn, pHead);
SCliMsg* pMsg = transQueuePop(&conn->cliMsgs); SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL;
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (CONN_NO_PERSIST_BY_APP(conn)) {
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs);
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); pCtx = pMsg ? pMsg->ctx : NULL;
if (transMsg.ahandle == NULL) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
}
tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle);
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle);
} }
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; uint64_t ahandle = (uint64_t)pHead->ahandle;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
if (pMsg == NULL) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
}
tDebug("cli conn %p construct ahandle %p, persist: 1", conn, transMsg.ahandle);
} else {
pCtx = pMsg ? pMsg->ctx : NULL;
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.ahandle);
}
} }
// buf's mem alread translated to transMsg.pCont // buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
...@@ -259,8 +294,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -259,8 +294,6 @@ void cliHandleResp(SCliConn* conn) {
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
_RETURN:
return;
} }
void cliHandleExcept(SCliConn* pConn) { void cliHandleExcept(SCliConn* pConn) {
...@@ -282,11 +315,14 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -282,11 +315,14 @@ void cliHandleExcept(SCliConn* pConn) {
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.ahandle = NULL; transMsg.ahandle = NULL;
transMsg.handle = pConn;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
tDebug("cli conn %p construct msgType %s ahandle %p", pConn, TMSG_INFO(transMsg.msgType), transMsg.ahandle);
if (transMsg.ahandle == NULL) { if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("cli conn %p construct brokenlink ahandle %p", pConn, transMsg.ahandle);
} }
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
...@@ -472,7 +508,7 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -472,7 +508,7 @@ static void cliDestroy(uv_handle_t* handle) {
static bool cliHandleNoResp(SCliConn* conn) { static bool cliHandleNoResp(SCliConn* conn) {
bool res = false; bool res = false;
if (!transQueueEmpty(&conn->cliMsgs)) { if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs); SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) { if (REQUEST_NO_RESP(&pMsg->msg)) {
transQueuePop(&conn->cliMsgs); transQueuePop(&conn->cliMsgs);
// taosArrayRemove(msgs, 0); // taosArrayRemove(msgs, 0);
...@@ -510,7 +546,7 @@ void cliSend(SCliConn* pConn) { ...@@ -510,7 +546,7 @@ void cliSend(SCliConn* pConn) {
// assert(taosArrayGetSize(pConn->cliMsgs) > 0); // assert(taosArrayGetSize(pConn->cliMsgs) > 0);
assert(!transQueueEmpty(&pConn->cliMsgs)); assert(!transQueueEmpty(&pConn->cliMsgs));
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs); SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, 0);
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
...@@ -522,7 +558,9 @@ void cliSend(SCliConn* pConn) { ...@@ -522,7 +558,9 @@ void cliSend(SCliConn* pConn) {
pMsg->contLen = 0; pMsg->contLen = 0;
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen); pHead->ahandle = (uint64_t)pCtx->ahandle;
int msgLen = transMsgLenFromCont(pMsg->contLen);
if (!pConn->secured) { if (!pConn->secured) {
char* buf = calloc(1, msgLen + sizeof(STransUserMsg)); char* buf = calloc(1, msgLen + sizeof(STransUserMsg));
......
...@@ -305,14 +305,34 @@ void* transQueuePop(STransQueue* queue) { ...@@ -305,14 +305,34 @@ void* transQueuePop(STransQueue* queue) {
taosArrayRemove(queue->q, 0); taosArrayRemove(queue->q, 0);
return ptr; return ptr;
} }
int32_t transQueueSize(STransQueue* queue) {
// Get size
return taosArrayGetSize(queue->q);
}
void* transQueueGet(STransQueue* queue, int i) {
if (taosArrayGetSize(queue->q) == 0) {
return NULL;
}
if (i >= taosArrayGetSize(queue->q)) {
return NULL;
}
void* transQueueGet(STransQueue* queue) { void* ptr = taosArrayGetP(queue->q, i);
return ptr;
}
void* transQueueRm(STransQueue* queue, int i) {
if (taosArrayGetSize(queue->q) == 0) { if (taosArrayGetSize(queue->q) == 0) {
return NULL; return NULL;
} }
void* ptr = taosArrayGetP(queue->q, 0); if (i >= taosArrayGetSize(queue->q)) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, i);
taosArrayRemove(queue->q, i);
return ptr; return ptr;
} }
bool transQueueEmpty(STransQueue* queue) { bool transQueueEmpty(STransQueue* queue) {
// //
return taosArrayGetSize(queue->q) == 0; return taosArrayGetSize(queue->q) == 0;
......
...@@ -190,7 +190,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -190,7 +190,7 @@ static void uvHandleReq(SSrvConn* pConn) {
transMsg.pCont = pHead->content; transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
transMsg.ahandle = NULL; transMsg.ahandle = (void*)pHead->ahandle;
transMsg.handle = NULL; transMsg.handle = NULL;
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
...@@ -280,7 +280,7 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -280,7 +280,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
destroySmsg(msg); destroySmsg(msg);
// send second data, just use for push // send second data, just use for push
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs); msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg->type == Register && conn->status == ConnAcquire) { if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0; conn->regArg.notifyCount = 0;
conn->regArg.init = 1; conn->regArg.init = 1;
...@@ -326,6 +326,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { ...@@ -326,6 +326,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
pMsg->contLen = 0; pMsg->contLen = 0;
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->ahandle;
// pHead->secured = pMsg->code == 0 ? 1 : 0; // // pHead->secured = pMsg->code == 0 ? 1 : 0; //
if (!pConn->secured) { if (!pConn->secured) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册