未验证 提交 533f3c04 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10857 from taosdata/feature/supportQ

handle except
......@@ -228,8 +228,8 @@ typedef struct SConnBuffer {
typedef void (*AsyncCB)(uv_async_t* handle);
typedef struct {
void* pThrd;
queue qmsg;
void* pThrd;
queue qmsg;
TdThreadMutex mtx; // protect qmsg;
} SAsyncItem;
......@@ -273,11 +273,52 @@ void transCloseClient(void* arg);
void transCloseServer(void* arg);
void transCtxInit(STransCtx* ctx);
void transCtxDestroy(STransCtx* ctx);
void transCtxCleanup(STransCtx* ctx);
void transCtxClear(STransCtx* ctx);
void transCtxMerge(STransCtx* dst, STransCtx* src);
void* transCtxDumpVal(STransCtx* ctx, int32_t key);
// queue sending msgs
typedef struct {
SArray* q;
void (*free)(void* arg);
} STransQueue;
/*
* init queue
* note: queue'size is small, default 1
*/
void transQueueInit(STransQueue* queue, void (*free)(void* arg));
/*
* put arg into queue
* if queue'size > 1, return false; else return true
*/
bool transQueuePush(STransQueue* queue, void* arg);
/*
* pop head from queue
*/
void* transQueuePop(STransQueue* queue);
/*
* get head from queue
*/
void* transQueueGet(STransQueue* queue);
/*
* queue empty or not
*/
bool transQueueEmpty(STransQueue* queue);
/*
* clear queue
*/
void transQueueClear(STransQueue* queue);
/*
* destroy queue
*/
void transQueueDestroy(STransQueue* queue);
#ifdef __cplusplus
}
#endif
......
......@@ -25,13 +25,14 @@ typedef struct SCliConn {
void* hostThrd;
SConnBuffer readBuf;
void* data;
SArray* cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
bool broken; // link broken or not
STransCtx ctx;
// SArray* cliMsgs;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
STransCtx ctx;
bool broken; // link broken or not
ConnStatus status; //
int release; // 1: release
// spi configure
......@@ -56,14 +57,14 @@ typedef struct SCliMsg {
} SCliMsg;
typedef struct SCliThrdObj {
TdThread thread;
TdThread thread;
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_timer_t timer;
void* pool; // conn pool
// msg queue
queue msg;
queue msg;
TdThreadMutex msgMtx;
uint64_t nextTimeout; // next timeout
......@@ -181,12 +182,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
static void* cliWorkThread(void* arg);
bool cliMaySendCachedMsg(SCliConn* conn) {
if (taosArrayGetSize(conn->cliMsgs) > 0) {
if (!transQueueEmpty(&conn->cliMsgs)) {
cliSend(conn);
return true;
} else {
return false;
}
return false;
}
void cliHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd;
......@@ -195,6 +195,7 @@ void cliHandleResp(SCliConn* conn) {
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead);
......@@ -204,11 +205,7 @@ void cliHandleResp(SCliConn* conn) {
CONN_SHOULD_RELEASE(conn, pHead);
SCliMsg* pMsg = NULL;
if (taosArrayGetSize(conn->cliMsgs) > 0) {
pMsg = taosArrayGetP(conn->cliMsgs, 0);
taosArrayRemove(conn->cliMsgs, 0);
}
SCliMsg* pMsg = transQueuePop(&conn->cliMsgs);
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
......@@ -264,7 +261,7 @@ _RETURN:
}
void cliHandleExcept(SCliConn* pConn) {
if (taosArrayGetSize(pConn->cliMsgs) == 0) {
if (transQueueEmpty(&pConn->cliMsgs)) {
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
transUnrefCliHandle(pConn);
return;
......@@ -274,11 +271,7 @@ void cliHandleExcept(SCliConn* pConn) {
STrans* pTransInst = pThrd->pTransInst;
do {
SCliMsg* pMsg = NULL;
if (taosArrayGetSize(pConn->cliMsgs) > 0) {
pMsg = taosArrayGetP(pConn->cliMsgs, 0);
taosArrayRemove(pConn->cliMsgs, 0);
}
SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
......@@ -303,7 +296,7 @@ void cliHandleExcept(SCliConn* pConn) {
}
destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
} while (taosArrayGetSize(pConn->cliMsgs) > 0);
} while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn);
}
......@@ -380,21 +373,20 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SCliThrdObj* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
char key[128] = {0};
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
transCtxCleanup(&conn->ctx);
transQueueClear(&conn->cliMsgs);
conn->status = ConnNormal;
transCtxDestroy(&conn->ctx);
char key[128] = {0};
tstrncpy(key, conn->ip, strlen(conn->ip));
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->status = ConnNormal;
// list already create before
assert(plist != NULL);
taosArrayClear(conn->cliMsgs);
QUEUE_PUSH(&plist->conn, &conn->conn);
assert(!QUEUE_IS_EMPTY(&plist->conn));
}
......@@ -445,7 +437,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
conn->writeReq.data = conn;
conn->connReq.data = conn;
conn->cliMsgs = taosArrayInit(2, sizeof(void*));
transQueueInit(&conn->cliMsgs, NULL);
QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd;
conn->status = ConnNormal;
......@@ -465,18 +458,18 @@ static void cliDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data;
free(conn->ip);
free(conn->stream);
transCtxDestroy(&conn->ctx);
taosArrayDestroy(conn->cliMsgs);
transCtxCleanup(&conn->ctx);
transQueueDestroy(&conn->cliMsgs);
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn);
}
static bool cliHandleNoResp(SCliConn* conn) {
bool res = false;
SArray* msgs = conn->cliMsgs;
if (taosArrayGetSize(msgs) > 0) {
SCliMsg* pMsg = taosArrayGetP(msgs, 0);
bool res = false;
if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs);
if (REQUEST_NO_RESP(&pMsg->msg)) {
taosArrayRemove(msgs, 0);
transQueuePop(&conn->cliMsgs);
// taosArrayRemove(msgs, 0);
destroyCmsg(pMsg);
res = true;
}
......@@ -509,8 +502,9 @@ static void cliSendCb(uv_write_t* req, int status) {
void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn);
assert(taosArrayGetSize(pConn->cliMsgs) > 0);
SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0);
// assert(taosArrayGetSize(pConn->cliMsgs) > 0);
assert(!transQueueEmpty(&pConn->cliMsgs));
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs);
STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd;
......@@ -600,9 +594,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if (T_REF_VAL_GET(conn) == 2) {
transUnrefCliHandle(conn);
taosArrayPush(conn->cliMsgs, &pMsg);
if (taosArrayGetSize(conn->cliMsgs) >= 2) {
return; // send one by one
if (!transQueuePush(&conn->cliMsgs, pMsg)) {
return;
}
cliSend(conn);
} else {
......@@ -643,17 +636,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->hThrdIdx = pCtx->hThrdIdx;
transCtxMerge(&conn->ctx, &pCtx->appCtx);
if (taosArrayGetSize(conn->cliMsgs) > 0) {
taosArrayPush(conn->cliMsgs, &pMsg);
if (!transQueuePush(&conn->cliMsgs, pMsg)) {
return;
}
taosArrayPush(conn->cliMsgs, &pMsg);
transDestroyBuffer(&conn->readBuf);
cliSend(conn);
} else {
conn = cliCreateConn(pThrd);
taosArrayPush(conn->cliMsgs, &pMsg);
transQueuePush(&conn->cliMsgs, pMsg);
conn->hThrdIdx = pCtx->hThrdIdx;
conn->ip = strdup(pMsg->ctx->ip);
......
......@@ -228,7 +228,7 @@ void transCtxInit(STransCtx* ctx) {
// init transCtx
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
void transCtxDestroy(STransCtx* ctx) {
void transCtxCleanup(STransCtx* ctx) {
if (ctx->args == NULL) {
return;
}
......@@ -276,4 +276,49 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
return (void*)ret;
}
void transQueueInit(STransQueue* queue, void (*free)(void* arg)) {
queue->q = taosArrayInit(2, sizeof(void*));
queue->free = free;
}
bool transQueuePush(STransQueue* queue, void* arg) {
taosArrayPush(queue->q, &arg);
if (taosArrayGetSize(queue->q) > 1) {
return false;
}
return true;
}
void* transQueuePop(STransQueue* queue) {
if (taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
taosArrayRemove(queue->q, 0);
return ptr;
}
void* transQueueGet(STransQueue* queue) {
if (taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
return ptr;
}
bool transQueueEmpty(STransQueue* queue) {
//
return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
if (queue->free != NULL) {
for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
void* p = taosArrayGetP(queue->q, i);
queue->free(p);
}
}
taosArrayClear(queue->q);
}
void transQueueDestroy(STransQueue* queue) {
transQueueClear(queue);
taosArrayDestroy(queue->q);
}
#endif
......@@ -37,7 +37,7 @@ typedef struct SSrvConn {
void* pTransInst; // rpc init
void* ahandle; //
void* hostThrd;
SArray* srvMsgs;
STransQueue srvMsgs;
SSrvRegArg regArg;
bool broken; // conn broken;
......@@ -62,12 +62,12 @@ typedef struct SSrvMsg {
} SSrvMsg;
typedef struct SWorkThrdObj {
TdThread thread;
uv_pipe_t* pipe;
uv_os_fd_t fd;
uv_loop_t* loop;
SAsyncPool* asyncPool;
queue msg;
TdThread thread;
uv_pipe_t* pipe;
uv_os_fd_t fd;
uv_loop_t* loop;
SAsyncPool* asyncPool;
queue msg;
TdThreadMutex msgMtx;
queue conn;
......@@ -76,7 +76,7 @@ typedef struct SWorkThrdObj {
} SWorkThrdObj;
typedef struct SServerObj {
TdThread thread;
TdThread thread;
uv_tcp_t server;
uv_loop_t* loop;
......@@ -106,8 +106,7 @@ static const char* notify = "a";
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
taosArrayPush(conn->srvMsgs, &srvMsg); \
if (taosArrayGetSize(conn->srvMsgs) > 1) { \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
......@@ -271,20 +270,16 @@ void uvOnSendCb(uv_write_t* req, int status) {
transClearBuffer(&conn->readBuf);
if (status == 0) {
tTrace("server conn %p data already was written on stream", conn);
if (conn->srvMsgs != NULL) {
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
taosArrayRemove(conn->srvMsgs, 0);
if (!transQueueEmpty(&conn->srvMsgs)) {
SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal;
transUnrefSrvHandle(conn);
}
destroySmsg(msg);
// send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) {
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
if (!transQueueEmpty(&conn->srvMsgs)) {
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs);
if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
......@@ -294,7 +289,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
taosArrayRemove(conn->srvMsgs, 0);
transQueuePop(&conn->srvMsgs);
free(msg);
} else {
uvStartSendRespInternal(msg);
......@@ -373,10 +368,7 @@ static void uvStartSendResp(SSrvMsg* smsg) {
transUnrefSrvHandle(pConn);
}
taosArrayPush(pConn->srvMsgs, &smsg);
if (taosArrayGetSize(pConn->srvMsgs) > 1) {
tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
if (!transQueuePush(&pConn->srvMsgs, smsg)) {
return;
}
uvStartSendRespInternal(smsg);
......@@ -608,14 +600,15 @@ static SSrvConn* createConn(void* hThrd) {
QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("server conn %p created", pConn);
transQueueInit(&pConn->srvMsgs, NULL);
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false;
pConn->status = ConnNormal;
transRefSrvHandle(pConn);
tTrace("server conn %p created", pConn);
return pConn;
}
......@@ -625,11 +618,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
}
transDestroyBuffer(&conn->readBuf);
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
destroySmsg(msg);
}
conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
transQueueDestroy(&conn->srvMsgs);
if (clear) {
tTrace("server conn %p to be destroyed", conn);
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
......@@ -724,8 +713,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
// release handle to rpc init
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
taosArrayPush(conn->srvMsgs, &msg);
if (taosArrayGetSize(conn->srvMsgs) > 1) {
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}
uvStartSendRespInternal(msg);
......@@ -744,8 +732,7 @@ void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn;
tDebug("server conn %p register brokenlink callback", conn);
if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) {
taosArrayPush(conn->srvMsgs, &msg);
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}
conn->regArg.notifyCount = 0;
......
......@@ -144,7 +144,7 @@ class TransCtxEnv : public ::testing::Test {
// TODO
}
virtual void TearDown() {
transCtxDestroy(ctx);
transCtxCleanup(ctx);
// formate
}
STransCtx *ctx;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册