提交 c21e15d9 编写于 作者: D dapan1121

feature/qnode

上级 16800098
......@@ -193,15 +193,15 @@ typedef struct SQWorkerMgmt {
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
qDebug("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
qDebug("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
qDebug("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
......
......@@ -324,11 +324,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = msg->msg, .msgLen = msg->contentLen, .connection = pMsg};
QW_SCH_TASK_DLOG("processQuery start");
QW_SCH_TASK_DLOG("processQuery start, node:%p", node);
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processQuery end");
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
return TSDB_CODE_SUCCESS;
}
......@@ -357,11 +357,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_SCH_TASK_DLOG("processCQuery start");
QW_SCH_TASK_DLOG("processCQuery start, node:%p", node);
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processCQuery end");
QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);
return TSDB_CODE_SUCCESS;
}
......@@ -396,6 +396,8 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
......@@ -406,11 +408,11 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_SCH_TASK_DLOG("processReady start");
QW_SCH_TASK_DLOG("processReady start, node:%p", node);
QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg));
QW_SCH_TASK_DLOG("processReady end");
QW_SCH_TASK_DLOG("processReady end, node:%p", node);
return TSDB_CODE_SUCCESS;
}
......@@ -462,11 +464,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_SCH_TASK_DLOG("processFetch start");
QW_SCH_TASK_DLOG("processFetch start, node:%p", node);
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processFetch end");
QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
return TSDB_CODE_SUCCESS;
}
......@@ -520,11 +522,11 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_SCH_TASK_DLOG("processDrop start");
QW_SCH_TASK_DLOG("processDrop start, node:%p", node);
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processDrop end");
QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册