提交 124b8b66 编写于 作者: dengyihao's avatar dengyihao

enh(rpc): fix hb problem

上级 eefb0a20
......@@ -43,6 +43,7 @@ typedef struct SRpcMsg {
int32_t code;
void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client
int64_t refId; //
int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not
......
......@@ -275,7 +275,7 @@ static void dmCloseNodes(SDnode *pDnode) {
static void dmProcessProcHandle(void *handle) {
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE, .refId = -1};
rpcSendResponse(&rpcMsg);
}
......
......@@ -134,7 +134,8 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pDnode->status != DND_STAT_RUNNING) {
dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (isReq) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle};
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
......@@ -143,7 +144,8 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (isReq && pMsg->pCont == NULL) {
dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle};
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
return;
}
......@@ -151,7 +153,8 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pWrapper == NULL) {
dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (isReq) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle};
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
......@@ -317,7 +320,7 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
if (code != 0) {
dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
if (pRpc->msgType & 1U) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno, .refId = pRpc->refId};
dmSendRsp(pWrapper, &rsp);
}
......
......@@ -105,7 +105,7 @@ void dmStopMonitorThread(SDnode *pDnode) {
}
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnode *pDnode = pInfo->ahandle;
SDnode * pDnode = pInfo->ahandle;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg);
......@@ -150,7 +150,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (pRpc->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId};
rpcSendResponse(&rsp);
}
......
......@@ -177,7 +177,7 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
SServerStatusRsp statusRsp = {0};
dmGetServerStatus(pDnode, &statusRsp);
SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle};
SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
if (rspLen < 0) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -25,10 +25,10 @@ extern "C" {
#include "ttimer.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 3000
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 3000
enum {
QW_PHASE_PRE_QUERY = 1,
......@@ -60,7 +60,6 @@ enum {
QW_WRITE,
};
enum {
QW_NOT_EXIST_RET_ERR = 1,
QW_NOT_EXIST_ADD,
......@@ -73,65 +72,65 @@ typedef struct SQWDebug {
} SQWDebug;
typedef struct SQWConnInfo {
void *handle;
void *ahandle;
void * handle;
void * ahandle;
int64_t refId;
} SQWConnInfo;
typedef struct SQWMsg {
void *node;
int32_t code;
char *msg;
int32_t msgLen;
SQWConnInfo connInfo;
void * node;
int32_t code;
char * msg;
int32_t msgLen;
SQWConnInfo connInfo;
} SQWMsg;
typedef struct SQWHbInfo {
SSchedulerHbRsp rsp;
SQWConnInfo connInfo;
SSchedulerHbRsp rsp;
SQWConnInfo connInfo;
} SQWHbInfo;
typedef struct SQWPhaseInput {
int32_t code;
int32_t code;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
} SQWPhaseOutput;
typedef struct SQWTaskStatus {
int64_t refId; // job's refId
int32_t code;
int8_t status;
typedef struct SQWTaskStatus {
int64_t refId; // job's refId
int32_t code;
int8_t status;
} SQWTaskStatus;
typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t phase;
int8_t taskType;
int8_t explain;
bool queryFetched;
bool queryEnd;
bool queryContinue;
bool queryInQueue;
int32_t rspCode;
SQWConnInfo ctrlConnInfo;
SQWConnInfo dataConnInfo;
int8_t events[QW_EVENT_MAX];
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
SRWLatch lock;
int8_t phase;
int8_t taskType;
int8_t explain;
bool queryFetched;
bool queryEnd;
bool queryContinue;
bool queryInQueue;
int32_t rspCode;
SQWConnInfo ctrlConnInfo;
SQWConnInfo dataConnInfo;
int8_t events[QW_EVENT_MAX];
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
} SQWTaskCtx;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
int32_t lastAccessTs; // timestamp in second
SRWLatch hbConnLock;
SQWConnInfo hbConnInfo;
SQueryNodeEpId hbEpId;
SQueryNodeEpId hbEpId;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
SHashObj * tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
// Qnode/Vnode level task management
......@@ -139,100 +138,146 @@ typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
int8_t nodeType;
int32_t nodeId;
void *timer;
void * timer;
tmr_h hbTimer;
SRWLatch schLock;
// SRWLatch ctxLock;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb;
} SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) \
do { \
*(uint64_t *)(id) = (qId); \
*(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \
} while (0)
#define QW_GET_QTID(id, qId, tId) \
do { \
(qId) = *(uint64_t *)(id); \
(tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
} while (0)
#define QW_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define QW_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define QW_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DUMP(param, ...) do { if (gQWDebug.dumpEnable) { qDebug("QW:%p " param, mgmt, __VA_ARGS__); } } while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) qDebugL("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
#define QW_DUMP(param, ...) \
do { \
if (gQWDebug.dumpEnable) { \
qDebug("QW:%p " param, mgmt, __VA_ARGS__); \
} \
} while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
qDebugL("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) \
do { \
if (gQWDebug.lockEnable) { \
qDebug(__VA_ARGS__); \
} \
} while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
#define QW_LOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
#ifdef __cplusplus
}
......
......@@ -369,7 +369,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < childNum; ++n) {
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
SSubplan * child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -401,7 +401,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < parentNum; ++n) {
SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
SSubplan * parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -491,7 +491,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SSchLevel level = {0};
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
SSchLevel *pLevel = NULL;
SSchLevel * pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START;
......@@ -1267,7 +1267,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTask *pTask = NULL;
SSchTask * pTask = NULL;
SSchJob *pJob = schAcquireJob(pParam->refId);
if (NULL == pJob) {
......@@ -1617,8 +1617,8 @@ _return:
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
SSchHbCallbackParam *param = NULL;
SMsgSendInfo *pMsgSendInfo = NULL;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SMsgSendInfo * pMsgSendInfo = NULL;
SQueryNodeAddr * addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
......@@ -1759,10 +1759,10 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
}
SRpcCtxVal dst = {0};
void *pIter = taosHashIterate(pSrc->args, NULL);
void * pIter = taosHashIterate(pSrc->args, NULL);
while (pIter) {
SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
int32_t *msgType = taosHashGetKey(pIter, NULL);
int32_t * msgType = taosHashGetKey(pIter, NULL);
dst = *pVal;
dst.val = NULL;
......@@ -1916,7 +1916,7 @@ _return:
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
void * msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
bool persistHandle = false;
......@@ -2673,7 +2673,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SSchTask * pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
taosArrayPush(pSub, &subDesc);
......@@ -2734,7 +2734,7 @@ void schedulerFreeTaskList(SArray *taskList) {
void schedulerDestroy(void) {
if (schMgmt.jobRef) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
int64_t refId = 0;
int64_t refId = 0;
while (pJob) {
refId = pJob->refId;
......@@ -2751,12 +2751,12 @@ void schedulerDestroy(void) {
}
if (schMgmt.hbConnections) {
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
while (pIter != NULL) {
SSchHbTrans *hb = pIter;
schFreeRpcCtx(&hb->rpcCtx);
pIter = taosHashIterate(schMgmt.hbConnections, pIter);
}
}
taosHashCleanup(schMgmt.hbConnections);
schMgmt.hbConnections = NULL;
}
......
......@@ -45,11 +45,12 @@ typedef struct SSrvConn {
struct sockaddr_in addr;
struct sockaddr_in locaddr;
int spi;
char info[64];
char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
int64_t refId;
int spi;
char info[64];
char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
} SSrvConn;
typedef struct SSrvMsg {
......@@ -89,6 +90,13 @@ typedef struct SServerObj {
uv_async_t* pAcceptAsync; // just to quit from from accept thread
} SServerObj;
// handle
typedef struct SExHandle {
void* handle;
int64_t refId;
SWorkThrdObj* pThrd;
} SExHandle;
static const char* notify = "a";
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
......@@ -130,6 +138,15 @@ static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister};
static int exHandlesMgt;
void uvOpenExHandleMgt(int size);
int64_t uvAddExHandle(void* p);
int32_t uvRemoveExHandle(int64_t refId);
int32_t uvReleaseExHandle(int64_t refId);
void uvDestoryExHandle(void* handle);
SExHandle* uvAcquireExHandle(int64_t refId);
static void uvDestroyConn(uv_handle_t* handle);
// server and worker thread
......@@ -168,6 +185,25 @@ static bool addHandleToAcceptloop(void* arg);
uv_loop_close(loop); \
} while (0);
#define ASYNC_ERR_JRET(thrd) \
do { \
if (thrd->quit) { \
tTrace("worker thread already quit, ignore msg"); \
goto _return; \
} \
} while (0)
#define ASYNC_CHECK_HANDLE(exh1, refId) \
do { \
if (refId != -1) { \
SExHandle* exh2 = uvAcquireExHandle(refId); \
if (exh2 == NULL || exh1 != exh2) { \
tTrace("server conn %p except, may already freed, ignore msg", exh2 ? exh2->handle : NULL); \
goto _return; \
} \
} \
} while (0)
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SSrvConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
......@@ -233,7 +269,11 @@ static void uvHandleReq(SSrvConn* pConn) {
// 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg.handle = pConn;
transMsg.handle = (void*)uvAcquireExHandle(pConn->refId);
if (pHead->noResp == 1) {
// transMsg.refId = -1;
}
uvReleaseExHandle(pConn->refId);
STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
......@@ -436,6 +476,18 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.handle;
int64_t refId = transMsg.refId;
SExHandle* exh2 = uvAcquireExHandle(refId);
if (exh2 == NULL || exh1 != exh2) {
uvReleaseExHandle(refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
......@@ -658,6 +710,13 @@ static SSrvConn* createConn(void* hThrd) {
pConn->broken = false;
pConn->status = ConnNormal;
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = pConn;
exh->pThrd = pThrd;
exh->refId = uvAddExHandle(exh);
uvAcquireExHandle(exh->refId);
pConn->refId = exh->refId;
transRefSrvHandle(pConn);
tTrace("server conn %p created", pConn);
return pConn;
......@@ -667,6 +726,9 @@ static void destroyConn(SSrvConn* conn, bool clear) {
if (conn == NULL) {
return;
}
uvReleaseExHandle(conn->refId);
uvRemoveExHandle(conn->refId);
transDestroyBuffer(&conn->readBuf);
if (clear) {
tTrace("server conn %p to be destroyed", conn);
......@@ -705,6 +767,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port;
uv_loop_init(srv->loop);
uvOpenExHandleMgt(10000);
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
thrd->quit = false;
......@@ -749,6 +813,35 @@ End:
transCloseServer(srv);
return NULL;
}
void uvOpenExHandleMgt(int size) {
// added into once later
exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
}
int64_t uvAddExHandle(void* p) {
// acquire extern handle
return taosAddRef(exHandlesMgt, p);
}
int32_t uvRemoveExHandle(int64_t refId) {
// acquire extern handle
return taosRemoveRef(exHandlesMgt, refId);
}
SExHandle* uvAcquireExHandle(int64_t refId) {
// acquire extern handle
return (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
}
int32_t uvReleaseExHandle(int64_t refId) {
// release extern handle
return taosReleaseRef(exHandlesMgt, refId);
}
void uvDestoryExHandle(void* handle) {
if (handle == NULL) {
return;
}
taosMemoryFree(handle);
}
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
thrd->quit = true;
if (QUEUE_IS_EMPTY(&thrd->conn)) {
......@@ -759,7 +852,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
taosMemoryFree(msg);
}
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
// release handle to rpc init
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
if (!transQueuePush(&conn->srvMsgs, msg)) {
......@@ -862,55 +954,66 @@ void transUnrefSrvHandle(void* handle) {
}
void transReleaseSrvHandle(void* handle) {
if (handle == NULL) {
return;
}
SSrvConn* pConn = handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
SExHandle* exh = handle;
// TODO(yihaoDeng): not safy here,
int64_t refId = exh->refId;
ASYNC_CHECK_HANDLE(exh, refId);
STransMsg tmsg = {.code = 0, .handle = handle, .ahandle = NULL};
SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = exh->refId};
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
srvMsg->type = Release;
srvMsg->pConn = pConn;
tTrace("server conn %p start to release", pConn);
tTrace("server conn %p start to release", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId);
return;
_return:
uvReleaseExHandle(refId);
}
void transSendResponse(const STransMsg* pMsg) {
if (pMsg->handle == NULL) {
return;
}
SSrvConn* pConn = pMsg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
if (pThrd->quit) {
return;
}
void transSendResponse(const STransMsg* msg) {
SExHandle* exh = msg->handle;
int64_t refId = msg->refId;
ASYNC_CHECK_HANDLE(exh, refId);
SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn;
srvMsg->msg = *pMsg;
srvMsg->msg = *msg;
srvMsg->type = Normal;
tTrace("server conn %p start to send resp (1/2)", pConn);
tTrace("server conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId);
return;
_return:
uvReleaseExHandle(refId);
}
void transRegisterMsg(const STransMsg* msg) {
if (msg->handle == NULL) {
return;
}
SSrvConn* pConn = msg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
SExHandle* exh = NULL;
int64_t refId = msg->refId;
ASYNC_CHECK_HANDLE(exh, refId);
SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn;
srvMsg->msg = *msg;
srvMsg->type = Register;
tTrace("server conn %p start to register brokenlink callback", pConn);
tTrace("server conn %p start to register brokenlink callback", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId);
return;
_return:
uvReleaseExHandle(refId);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
SSrvConn* pConn = thandle;
SExHandle* ex = thandle;
SSrvConn* pConn = ex->handle;
struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册