未验证 提交 dbaa4a58 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #13644 from taosdata/feature/qnode

feat: add get table index async API
...@@ -2510,6 +2510,7 @@ typedef struct { ...@@ -2510,6 +2510,7 @@ typedef struct {
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp); int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
void tFreeSTableIndexInfo(void *pInfo);
typedef struct { typedef struct {
......
...@@ -67,6 +67,7 @@ typedef struct SCatalogReq { ...@@ -67,6 +67,7 @@ typedef struct SCatalogReq {
SArray *pUdf; // element is udf name SArray *pUdf; // element is udf name
SArray *pIndex; // element is index name SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo SArray *pUser; // element is SUserAuthInfo
SArray *pTableIndex; // element is SNAME
bool qNodeRequired; // valid qnode bool qNodeRequired; // valid qnode
bool forceUpdate; bool forceUpdate;
} SCatalogReq; } SCatalogReq;
...@@ -82,6 +83,7 @@ typedef struct SMetaData { ...@@ -82,6 +83,7 @@ typedef struct SMetaData {
SArray *pDbInfo; // pRes = SDbInfo* SArray *pDbInfo; // pRes = SDbInfo*
SArray *pTableMeta; // pRes = STableMeta* SArray *pTableMeta; // pRes = STableMeta*
SArray *pTableHash; // pRes = SVgroupInfo* SArray *pTableHash; // pRes = SVgroupInfo*
SArray *pTableIndex; // pRes = SArray<STableIndexInfo>*
SArray *pUdfList; // pRes = SFuncInfo* SArray *pUdfList; // pRes = SFuncInfo*
SArray *pIndex; // pRes = SIndexInfo* SArray *pIndex; // pRes = SIndexInfo*
SArray *pUser; // pRes = bool* SArray *pUser; // pRes = bool*
...@@ -277,7 +279,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -277,7 +279,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes); int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
......
...@@ -304,6 +304,7 @@ typedef struct SDownstreamSourceNode { ...@@ -304,6 +304,7 @@ typedef struct SDownstreamSourceNode {
typedef struct SExchangePhysiNode { typedef struct SExchangePhysiNode {
SPhysiNode node; SPhysiNode node;
int32_t srcGroupId; // group id of datasource suplans int32_t srcGroupId; // group id of datasource suplans
bool singleChannel;
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode; } SExchangePhysiNode;
......
...@@ -2491,6 +2491,15 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR ...@@ -2491,6 +2491,15 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
return 0; return 0;
} }
void tFreeSTableIndexInfo(void* info) {
if (NULL == info) {
return;
}
STableIndexInfo *pInfo = (STableIndexInfo*)info;
taosMemoryFree(pInfo->expr);
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
......
...@@ -67,6 +67,7 @@ typedef enum { ...@@ -67,6 +67,7 @@ typedef enum {
CTG_TASK_GET_DB_INFO, CTG_TASK_GET_DB_INFO,
CTG_TASK_GET_TB_META, CTG_TASK_GET_TB_META,
CTG_TASK_GET_TB_HASH, CTG_TASK_GET_TB_HASH,
CTG_TASK_GET_TB_INDEX,
CTG_TASK_GET_INDEX, CTG_TASK_GET_INDEX,
CTG_TASK_GET_UDF, CTG_TASK_GET_UDF,
CTG_TASK_GET_USER, CTG_TASK_GET_USER,
...@@ -93,6 +94,10 @@ typedef struct SCtgTbMetaCtx { ...@@ -93,6 +94,10 @@ typedef struct SCtgTbMetaCtx {
int32_t flag; int32_t flag;
} SCtgTbMetaCtx; } SCtgTbMetaCtx;
typedef struct SCtgTbIndexCtx {
SName* pName;
} SCtgTbIndexCtx;
typedef struct SCtgDbVgCtx { typedef struct SCtgDbVgCtx {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbVgCtx; } SCtgDbVgCtx;
...@@ -189,6 +194,7 @@ typedef struct SCtgJob { ...@@ -189,6 +194,7 @@ typedef struct SCtgJob {
int32_t indexNum; int32_t indexNum;
int32_t userNum; int32_t userNum;
int32_t dbInfoNum; int32_t dbInfoNum;
int32_t tbIndexNum;
} SCtgJob; } SCtgJob;
typedef struct SCtgMsgCtx { typedef struct SCtgMsgCtx {
...@@ -490,7 +496,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu ...@@ -490,7 +496,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask); int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask); int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask); int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName* name, SArray** out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask); int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
......
...@@ -1136,14 +1136,14 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps ...@@ -1136,14 +1136,14 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL)); CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
} }
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) { int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes) {
CTG_API_ENTER(); CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) { if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pRes) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL)); CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), (SName*)pTableName, pRes, NULL));
} }
......
...@@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { ...@@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -232,6 +232,35 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) ...@@ -232,6 +232,35 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_INDEX;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbIndexCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbIndexCtx* ctx = task.taskCtx;
ctx->pName = taosMemoryMalloc(sizeof(*name));
if (NULL == ctx->pName) {
taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(ctx->pName, name, sizeof(*name));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS;
}
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) { int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum; int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
if (dbNum > 0) { if (dbNum > 0) {
...@@ -329,8 +358,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* ...@@ -329,8 +358,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex); int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser); int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo); int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum; *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum;
if (*taskNum <= 0) { if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId); ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -360,6 +390,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* ...@@ -360,6 +390,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob->indexNum = indexNum; pJob->indexNum = indexNum;
pJob->userNum = userNum; pJob->userNum = userNum;
pJob->dbInfoNum = dbInfoNum; pJob->dbInfoNum = dbInfoNum;
pJob->tbIndexNum = tbIndexNum;
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask)); pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
...@@ -398,6 +429,11 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* ...@@ -398,6 +429,11 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name)); CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name));
} }
for (int32_t i = 0; i < tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i);
CTG_ERR_JRET(ctgInitGetTbIndexTask(pJob, taskIdx++, name));
}
for (int32_t i = 0; i < indexNum; ++i) { for (int32_t i = 0; i < indexNum; ++i) {
char* indexName = taosArrayGet(pReq->pIndex, i); char* indexName = taosArrayGet(pReq->pIndex, i);
CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName)); CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName));
...@@ -479,6 +515,21 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) { ...@@ -479,6 +515,21 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pTableIndex) {
pJob->jobRes.pTableIndex = taosArrayInit(pJob->tbIndexNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pTableIndex) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pTableHash, &res);
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpIndexRes(SCtgTask* pTask) { int32_t ctgDumpIndexRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pIndex) { if (NULL == pJob->jobRes.pIndex) {
...@@ -817,6 +868,20 @@ _return: ...@@ -817,6 +868,20 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out);
_return:
ctgHandleTaskEnd(pTask, code);
CTG_RET(code);
}
int32_t ctgHandleGetDbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1056,13 +1121,24 @@ _return: ...@@ -1056,13 +1121,24 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), pCtx->pName, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans; void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
CTG_ERR_RET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), NULL, pTask)); CTG_ERR_RET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), NULL, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1168,15 +1244,16 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { ...@@ -1168,15 +1244,16 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
} }
SCtgAsyncFps gCtgAsyncFps[] = { SCtgAsyncFps gCtgAsyncFps[] = {
{ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes}, {ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes},
{ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes}, {ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes},
{ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes}, {ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes},
{ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes}, {ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes},
{ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes}, {ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes},
{ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes}, {ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes},
{ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes}, {ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes},
{ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes}, {ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes},
{ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes}, {ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes},
{ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes},
}; };
int32_t ctgMakeAsyncRes(SCtgJob *pJob) { int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
......
...@@ -427,11 +427,13 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo * ...@@ -427,11 +427,13 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask) { int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName *name, SArray** out, SCtgTask* pTask) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX; int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName);
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
......
...@@ -59,6 +59,9 @@ void ctgFreeSMetaData(SMetaData* pData) { ...@@ -59,6 +59,9 @@ void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pTableHash); taosArrayDestroy(pData->pTableHash);
pData->pTableHash = NULL; pData->pTableHash = NULL;
taosArrayDestroy(pData->pTableIndex);
pData->pTableIndex = NULL;
taosArrayDestroy(pData->pUdfList); taosArrayDestroy(pData->pUdfList);
pData->pUdfList = NULL; pData->pUdfList = NULL;
...@@ -248,6 +251,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { ...@@ -248,6 +251,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
taosMemoryFreeClear(pCtx->out); taosMemoryFreeClear(pCtx->out);
break; break;
} }
case TDMT_MND_GET_TABLE_INDEX: {
SArray** pOut = (SArray**)pCtx->out;
if (pOut) {
taosArrayDestroyEx(*pOut, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCtx->out);
}
break;
}
case TDMT_MND_RETRIEVE_FUNC: { case TDMT_MND_RETRIEVE_FUNC: {
SFuncInfo* pOut = (SFuncInfo*)pCtx->out; SFuncInfo* pOut = (SFuncInfo*)pCtx->out;
taosMemoryFree(pOut->pCode); taosMemoryFree(pOut->pCode);
...@@ -344,6 +355,13 @@ void ctgFreeTask(SCtgTask* pTask) { ...@@ -344,6 +355,13 @@ void ctgFreeTask(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->res); taosMemoryFreeClear(pTask->res);
break; break;
} }
case CTG_TASK_GET_TB_INDEX: {
SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
taosMemoryFreeClear(pTask->taskCtx);
taosArrayDestroyEx(pTask->res, tFreeSTableIndexInfo);
break;
}
case CTG_TASK_GET_INDEX: { case CTG_TASK_GET_INDEX: {
taosMemoryFreeClear(pTask->taskCtx); taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res); taosMemoryFreeClear(pTask->res);
......
...@@ -32,15 +32,17 @@ extern "C" { ...@@ -32,15 +32,17 @@ extern "C" {
#define EXPLAIN_PROJECTION_FORMAT "Projection" #define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s" #define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate" #define EXPLAIN_AGG_FORMAT "Aggragate"
#define EXPLAIN_INDEF_ROWS_FORMAT "Indefinite Rows Function"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1" #define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
#define EXPLAIN_SORT_FORMAT "Sort" #define EXPLAIN_SORT_FORMAT "Sort"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s" #define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
#define EXPLAIN_FILL_FORMAT "Fill"
#define EXPLAIN_SESSION_FORMAT "Session" #define EXPLAIN_SESSION_FORMAT "Session"
#define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s" #define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s"
#define EXPLAIN_PARITION_FORMAT "Partition on Column %s" #define EXPLAIN_PARITION_FORMAT "Partition on Column %s"
#define EXPLAIN_ORDER_FORMAT "Order: %s" #define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: " #define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s" #define EXPLAIN_FILL_VALUE_FORMAT "Fill Values: "
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: " #define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]" #define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
#define EXPLAIN_OUTPUT_FORMAT "Output: " #define EXPLAIN_OUTPUT_FORMAT "Output: "
...@@ -66,6 +68,8 @@ extern "C" { ...@@ -66,6 +68,8 @@ extern "C" {
#define EXPLAIN_WIDTH_FORMAT "width=%d" #define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d" #define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
#define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64 #define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64
#define EXPLAIN_MODE_FORMAT "mode=%s"
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
typedef struct SExplainGroup { typedef struct SExplainGroup {
int32_t nodeNum; int32_t nodeNum;
......
...@@ -179,6 +179,21 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -179,6 +179,21 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = mergePhysiNode->node.pChildren; pPhysiChildren = mergePhysiNode->node.pChildren;
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
SIndefRowsFuncPhysiNode *indefPhysiNode = (SIndefRowsFuncPhysiNode *)pNode;
pPhysiChildren = indefPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
SMergeIntervalPhysiNode *intPhysiNode = (SMergeIntervalPhysiNode *)pNode;
pPhysiChildren = intPhysiNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
SFillPhysiNode *fillPhysiNode = (SFillPhysiNode *)pNode;
pPhysiChildren = fillPhysiNode->node.pChildren;
break;
}
default: default:
qError("not supported physical node type %d", pNode->type); qError("not supported physical node type %d", pNode->type);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
...@@ -212,12 +227,15 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group ...@@ -212,12 +227,15 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
SExplainRsp *rsp = NULL; SExplainRsp *rsp = NULL;
for (int32_t i = 0; i < group->nodeNum; ++i) { for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i); rsp = taosArrayGet(group->nodeExecInfo, i);
/*
if (group->physiPlanExecIdx >= rsp->numOfPlans) { if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
*/
taosArrayPush(*pExecInfo, rsp->subplanInfo);
} }
++group->physiPlanExecIdx; ++group->physiPlanExecIdx;
...@@ -599,6 +617,42 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -599,6 +617,42 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
SIndefRowsFuncPhysiNode *pIndefNode = (SIndefRowsFuncPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_INDEF_ROWS_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
if (pIndefNode->pVectorFuncs) {
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIndefNode->pVectorFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIndefNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pIndefNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIndefNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pIndefNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pIndefNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: { case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId)); SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId));
...@@ -607,7 +661,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -607,7 +661,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, group->nodeNum); EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->singleChannel ? 1 : group->nodeNum);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
...@@ -750,6 +804,106 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -750,6 +804,106 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
SMergeIntervalPhysiNode *pIntNode = (SMergeIntervalPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->window.pTspk));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pIntNode->window.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
uint8_t precision = getIntervalPrecision(pIntNode);
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT,
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision),
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision),
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, precision),
pIntNode->slidingUnit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pIntNode->window.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
SFillPhysiNode *pFillNode = (SFillPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_FILL_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pFillNode->mode));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pFillNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pFillNode->pValues) {
SNodeListNode *pValues = (SNodeListNode*)pFillNode->pValues;
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_VALUE_FORMAT);
SNode* tNode = NULL;
int32_t i = 0;
FOREACH(tNode, pValues->pNodeList) {
if (i) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
SValueNode* tValue = (SValueNode*)tNode;
char *value = nodesGetStrValueFromNode(tValue);
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, value);
taosMemoryFree(value);
++i;
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pFillNode->timeRange.skey,
pFillNode->timeRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pFillNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pFillNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode; SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT); EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT);
......
...@@ -1186,6 +1186,7 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) { ...@@ -1186,6 +1186,7 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pExchange->srcGroupId = pMerge->srcGroupId; pExchange->srcGroupId = pMerge->srcGroupId;
pExchange->singleChannel = true;
pExchange->node.pParent = (SPhysiNode*)pMerge; pExchange->node.pParent = (SPhysiNode*)pMerge;
pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc); pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc);
if (NULL == pExchange->node.pOutputDataBlockDesc) { if (NULL == pExchange->node.pOutputDataBlockDesc) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册