提交 27a3a1d7 编写于 作者: D dapan1121

feature/qnode

上级 7602f916
......@@ -915,11 +915,19 @@ typedef struct {
char data[];
} SRetrieveMetaTableRsp;
typedef struct SExplainExecInfo {
uint64_t startupCost;
uint64_t totalCost;
uint64_t numOfRows;
} SExplainExecInfo;
typedef struct {
int32_t numOfItems;
char data[];
int32_t numOfPlans;
SExplainExecInfo *subplanInfo;
} SExplainRsp;
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
......
......@@ -19,17 +19,11 @@
typedef struct SExplainCtx SExplainCtx;
typedef struct SExplainExecInfo {
uint64_t startupCost;
uint64_t totalCost;
uint64_t numOfRows;
} SExplainExecInfo;
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs);
int32_t qExecExplainEnd(SExplainCtx *pCtx);
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx);
......
......@@ -217,6 +217,7 @@ int32_t nodesStringToList(const char* pStr, SNodeList** pList);
int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len);
char *nodesGetNameFromColumnNode(SNode *pNode);
int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots);
#ifdef __cplusplus
}
......
......@@ -76,6 +76,7 @@ void taosWLockLatch(SRWLatch *pLatch);
void taosWUnLockLatch(SRWLatch *pLatch);
void taosRLockLatch(SRWLatch *pLatch);
void taosRUnLockLatch(SRWLatch *pLatch);
int32_t taosWTryLockLatch(SRWLatch *pLatch);
// copy on read
#define taosCorBeginRead(x) \
......
......@@ -2787,6 +2787,48 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq)
return 0;
}
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfPlans) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
SExplainExecInfo *info = &pRsp->subplanInfo[i];
if (tEncodeU64(&encoder, info->startupCost) < 0) return -1;
if (tEncodeU64(&encoder, info->totalCost) < 0) return -1;
if (tEncodeU64(&encoder, info->numOfRows) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1;
if (pRsp->numOfPlans > 0) {
pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo));
if (pRsp->subplanInfo == NULL) return -1;
}
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].startupCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1;
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......
......@@ -26,39 +26,53 @@ extern "C" {
#define EXPLAIN_MAX_GROUP_NUM 100
//newline area
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d width=%d"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d width=%d"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d width=%d"
#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d"
#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d"
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d"
#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d"
#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d"
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s"
#define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
#define EXPLAIN_SORT_FORMAT "Sort"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
#define EXPLAIN_SESSION_FORMAT "Session"
#define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s"
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
#define EXPLAIN_OUTPUT_FORMAT "Output: "
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
//append area
#define EXPLAIN_GROUPS_FORMAT " groups=%d"
#define EXPLAIN_WIDTH_FORMAT " width=%d"
#define EXPLAIN_LOOPS_FORMAT " loops=%d"
#define EXPLAIN_REVERSE_FORMAT " reverse=%d"
#define EXPLAIN_LEFT_PARENTHESIS_FORMAT " ("
#define EXPLAIN_RIGHT_PARENTHESIS_FORMAT ")"
#define EXPLAIN_BLANK_FORMAT " "
#define EXPLAIN_COST_FORMAT "cost=%.2f..%.2f"
#define EXPLAIN_ROWS_FORMAT "rows=%" PRIu64
#define EXPLAIN_COLUMNS_FORMAT "columns=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_GROUPS_FORMAT "groups=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_LOOPS_FORMAT "loops=%d"
#define EXPLAIN_REVERSE_FORMAT "reverse=%d"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
typedef struct SExplainGroup {
int32_t nodeNum;
int32_t physiPlanExecNum;
int32_t physiPlanNum;
int32_t physiPlanExecIdx;
SRWLatch lock;
SSubplan *plan;
SArray *execInfo;
SArray *nodeExecInfo; //Array<SExplainRsp>
} SExplainGroup;
typedef struct SExplainResNode {
SNodeList* pChildren;
SPhysiNode* pNode;
void* pExecInfo;
SNodeList* pChildren;
SPhysiNode* pNode;
SArray* pExecInfo; // Array<SExplainExecInfo>
} SExplainResNode;
typedef struct SQueryExplainRowInfo {
......@@ -68,19 +82,21 @@ typedef struct SQueryExplainRowInfo {
} SQueryExplainRowInfo;
typedef struct SExplainCtx {
double ratio;
bool verbose;
EExplainMode mode;
double ratio;
bool verbose;
int32_t rootGroupId;
int32_t dataSize;
bool execDone;
int64_t reqStartTs;
int64_t jobStartTs;
int64_t jobDoneTs;
char *tbuf;
SArray *rows;
int32_t groupDoneNum;
SHashObj *groupHash;
SRWLatch lock;
int32_t rootGroupId;
int32_t dataSize;
bool execDone;
int64_t reqStartTs;
int64_t jobStartTs;
int64_t jobDoneTs;
char *tbuf;
SArray *rows;
int32_t groupDoneNum;
SHashObj *groupHash; // Hash<SExplainGroup>
} SExplainCtx;
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
......
此差异已折叠。
......@@ -36,6 +36,7 @@ extern "C" {
#include "thash.h"
#include "tlockfree.h"
#include "tpagedbuf.h"
#include "tmsg.h"
struct SColumnFilterElem;
......@@ -160,7 +161,7 @@ typedef struct STaskCostInfo {
typedef struct SOperatorCostInfo {
uint64_t openCost;
uint64_t execCost;
uint64_t totalCost;
} SOperatorCostInfo;
// The basic query information extracted from the SQueryInfo tree to support the
......@@ -714,6 +715,7 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum);
#ifdef __cplusplus
}
......
......@@ -9523,11 +9523,27 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **p
*pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo));
if (NULL == *pRes) {
qError("malloc %d failed", capacity * sizeof(SExplainExecInfo));
qError("malloc %d failed", (*capacity) * (int32_t)sizeof(SExplainExecInfo));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
}
(*pRes)[*resNum].numOfRows = operator->resultInfo.totalRows;
(*pRes)[*resNum].startupCost = operator->cost.openCost;
(*pRes)[*resNum].totalCost = operator->cost.totalCost;
++(*resNum);
int32_t code = 0;
for (int32_t i = 0; i < operator->numOfDownstream; ++i) {
code = getOperatorExplainExecInfo(operator->pDownstream[i], pRes, capacity, resNum);
if (code) {
taosMemoryFreeClear(*pRes);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -1052,4 +1052,26 @@ char *nodesGetNameFromColumnNode(SNode *pNode) {
return ((SColumnNode *)pNode)->colName;
}
int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots) {
if (NULL == pSlots || pSlots->length <= 0) {
return 0;
}
SNode* pNode = NULL;
int32_t num = 0;
FOREACH(pNode, pSlots) {
if (QUERY_NODE_SLOT_DESC != pNode->type) {
continue;
}
SSlotDescNode *descNode = (SSlotDescNode *)pNode;
if (descNode->output) {
++num;
}
}
return num;
}
......@@ -23,7 +23,7 @@ extern "C" {
#include "qworkerInt.h"
#include "dataSinkMgt.h"
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
......@@ -38,6 +38,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComple
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
......
......@@ -549,7 +549,10 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t resNum = 0;
QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
QW_ERR_RET(qwBuildAndSendExplainRsp(&ctx->ctrlConnInfo, &rsp));
SQWConnInfo connInfo = {0};
connInfo.handle = ctx->ctrlConnInfo.handle;
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
}
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
......
......@@ -85,6 +85,27 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
tSerializeSExplainRsp(pRsp, contLen, &rsp);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
};
tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen);
......
......@@ -1043,6 +1043,19 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
atomic_store_ptr(&pJob->resData, pRsp);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
schProcessOnDataFetched(pJob);
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode) {
......@@ -1061,7 +1074,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_CREATE_TABLE_RSP: {
SVCreateTbBatchRsp batchRsp = {0};
if (msg) {
tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp);
SCH_ERR_JRET(tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp));
if (batchRsp.rspList) {
int32_t num = taosArrayGetSize(batchRsp.rspList);
for (int32_t i = 0; i < num; ++i) {
......@@ -1099,7 +1112,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp rsp = {0};
if (msg) {
tDeserializeSQueryTableRsp(msg, msgSize, &rsp);
SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
SCH_ERR_JRET(rsp.code);
}
......@@ -1126,13 +1139,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
case TDMT_VND_EXPLAIN_RSP: {
SExplainRsp *taskRsp = (SExplainRsp *)msg;
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (!SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -1140,22 +1151,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (pJob->resData) {
SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
taosMemoryFreeClear(taskRsp);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SExplainRsp rsp = {0};
if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
taosMemoryFree(rsp.subplanInfo);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, taskRsp, pTask->plan->id.groupId, &pRsp));
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
if (pRsp) {
atomic_store_ptr(&pJob->resData, pRsp);
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
schProcessOnDataFetched(pJob);
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
break;
}
......@@ -1168,8 +1177,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx));
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
return TSDB_CODE_SUCCESS;
}
......@@ -1278,6 +1292,10 @@ int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code)
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
}
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
......@@ -1366,6 +1384,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_RES_READY:
*fp = schHandleReadyCallback;
break;
case TDMT_VND_EXPLAIN:
*fp = schHandleExplainCallback;
break;
case TDMT_VND_FETCH:
*fp = schHandleFetchCallback;
break;
......@@ -1386,6 +1407,43 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
return TSDB_CODE_SUCCESS;
}
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->transport;
msgSendInfo->param = param;
msgSendInfo->fp = fp;
*pMsgSendInfo = msgSendInfo;
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(param);
taosMemoryFree(msgSendInfo);
SCH_RET(code);
}
void schFreeRpcCtxVal(const void *arg) {
if (NULL == arg) {
return;
......@@ -1472,8 +1530,8 @@ _return:
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
SSchTaskCallbackParam *param = NULL;
SMsgSendInfo *pMsgSendInfo = NULL;
SMsgSendInfo *pReadyMsgSendInfo = NULL;
SMsgSendInfo *pExplainMsgSendInfo = NULL;
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pCtx->args) {
......@@ -1481,31 +1539,18 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
int32_t msgType = TDMT_VND_RES_READY_RSP;
SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t msgType = TDMT_VND_RES_READY_RSP;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_RES_READY, &fp));
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->transport;
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
msgType = TDMT_VND_EXPLAIN_RSP;
ctxVal.val = pExplainMsgSendInfo;
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1518,8 +1563,16 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
_return:
taosHashCleanup(pCtx->args);
taosMemoryFreeClear(param);
taosMemoryFreeClear(pMsgSendInfo);
if (pReadyMsgSendInfo) {
taosMemoryFreeClear(pReadyMsgSendInfo->param);
taosMemoryFreeClear(pReadyMsgSendInfo);
}
if (pExplainMsgSendInfo) {
taosMemoryFreeClear(pExplainMsgSendInfo->param);
taosMemoryFreeClear(pExplainMsgSendInfo);
}
SCH_RET(code);
}
......@@ -1702,32 +1755,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
SSchTrans *trans = (SSchTrans *)transport;
SMsgSendInfo *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = trans->transInst;
SMsgSendInfo *pMsgSendInfo = NULL;
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pMsgSendInfo));
pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = trans->transHandle;
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
......@@ -1744,8 +1778,11 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
_return:
taosMemoryFreeClear(param);
taosMemoryFreeClear(pMsgSendInfo);
if (pMsgSendInfo) {
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
}
SCH_RET(code);
}
......
......@@ -53,6 +53,21 @@ void taosWLockLatch(SRWLatch *pLatch) {
}
}
int32_t taosWTryLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch;
oLatch = atomic_load_32(pLatch);
if (oLatch) {
return -1;
}
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) {
return 0;
}
return -1;
}
void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); }
void taosRLockLatch(SRWLatch *pLatch) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册