diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ef36285ddc53506c5e65d181212443e17b096f36..eb91b619f8cdeb7491f7ec1ba6b05b5dd392fab6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -836,6 +836,8 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); void tFreeSShowRsp(SShowRsp* pRsp); typedef struct { + int32_t type; + char db[TSDB_DB_FNAME_LEN]; int64_t showId; int8_t free; } SRetrieveTableReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 0f0c4729bc70cf4fbcfdc56d331794fe8e846158..78770f2c50f7254e0ba0169a109b336403e97bf1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -134,6 +134,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp) TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "mnode-systable-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "mnode-trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index b27150c9fbe2dc422bd0abbc23dbe6f43b31e02c..ff1ddc88c03d83a159ebe1bca6b03e6889b315bd 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -106,6 +106,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, + QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_JOIN, QUERY_NODE_PHYSICAL_PLAN_AGG, diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a36168b622dcc124a4c47105be5d0877330c0377..3856cd3ff3ad09120ea66044a4cdd55418909fc5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -353,6 +353,23 @@ typedef struct { char payload[]; } SShowObj; +typedef struct { + int64_t id; + int8_t type; + int8_t replica; + int16_t numOfColumns; + int32_t rowSize; + int32_t numOfRows; + int32_t numOfReads; + int32_t payloadLen; + void* pIter; + SMnode* pMnode; + char db[TSDB_DB_FNAME_LEN]; + int16_t offset[TSDB_MAX_COLUMNS]; + int32_t bytes[TSDB_MAX_COLUMNS]; + char payload[]; +} SSysTableRetrieveObj; + typedef struct { int32_t vgId; // -1 for unassigned int32_t status; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 8fd0c282e15224a051225f04d911a98d2191e0a0..ad97888ac5b06d3492c505fdbc696689a72bfc24 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -25,6 +25,7 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static int32_t mndProcessShowReq(SMnodeMsg *pReq); static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq); static bool mndCheckRetrieveFinished(SShowObj *pShow); +static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq); int32_t mndInitShow(SMnode *pMnode) { SShowMgmt *pMgmt = &pMnode->showMgmt; @@ -38,6 +39,7 @@ int32_t mndInitShow(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_SHOW, mndProcessShowReq); mndSetMsgHandle(pMnode, TDMT_MND_SHOW_RETRIEVE, mndProcessRetrieveReq); + mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq); return 0; } @@ -261,6 +263,106 @@ static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) { return TSDB_CODE_SUCCESS; } +static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SShowMgmt *pMgmt = &pMnode->showMgmt; + int32_t rowsToRead = 0; + int32_t size = 0; + int32_t rowsRead = 0; + + SRetrieveTableReq retrieveReq = {0}; + if (tDeserializeSRetrieveTableReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + SShowObj* pShow = NULL; + + if (retrieveReq.showId == 0) { + SShowReq req = {0}; + req.type = retrieveReq.type; + strncpy(req.db, retrieveReq.db, tListLen(req.db)); + + pShow = mndCreateShowObj(pMnode, &req); + if (pShow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to process show-meta req since %s", terrstr()); + return -1; + } + } else { + pShow = mndAcquireShowObj(pMnode, retrieveReq.showId); + if (pShow == NULL) { + terrno = TSDB_CODE_MND_INVALID_SHOWOBJ; + mError("failed to process show-retrieve req:%p since %s", pShow, terrstr()); + return -1; + } + } + + ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type]; + if (retrieveFp == NULL) { + mndReleaseShowObj((SShowObj*) pShow, false); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr()); + return -1; + } + + mDebug("show:0x%" PRIx64 ", start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow->numOfReads, + pShow->numOfRows, mndShowStr(pShow->type)); + + if (mndCheckRetrieveFinished((SShowObj*) pShow)) { + mDebug("show:0x%" PRIx64 ", read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads, + pShow->numOfRows); + pShow->numOfReads = pShow->numOfRows; + } + + if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { + rowsToRead = pShow->numOfRows - pShow->numOfReads; + } + + /* return no more than 100 tables in one round trip */ + if (rowsToRead > SHOW_STEP_SIZE) rowsToRead = SHOW_STEP_SIZE; + + /* + * the actual number of table may be larger than the value of pShow->numOfRows, if a query is + * issued during a continuous create table operation. Therefore, rowToRead may be less than 0. + */ + if (rowsToRead < 0) rowsToRead = 0; + size = pShow->rowSize * rowsToRead; + + size += SHOW_STEP_SIZE; + SRetrieveTableRsp *pRsp = rpcMallocCont(size); + if (pRsp == NULL) { + mndReleaseShowObj((SShowObj*) pShow, false); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr()); + return -1; + } + + // if free flag is set, client wants to clean the resources + if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { + rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead); + } + + mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead); + + pRsp->numOfRows = htonl(rowsRead); + pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision + + pReq->pCont = pRsp; + pReq->contLen = size; + + if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { + pRsp->completed = 1; + mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); + mndReleaseShowObj((SShowObj*) pShow, true); + } else { + mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id); + mndReleaseShowObj((SShowObj*) pShow, false); + } + + return TSDB_CODE_SUCCESS; +} + char *mndShowStr(int32_t showType) { switch (showType) { case TSDB_MGMT_TABLE_ACCT: diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1cc25938deb979a4e5971be971304c6193c9968a..d46978b26daa354f850464ea1f3a0f672e9466e0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -405,17 +405,15 @@ typedef struct SExchangeInfo { } SExchangeInfo; typedef struct STableScanInfo { - void* pTsdbReadHandle; - int32_t numOfBlocks; // extract basic running information. - int32_t numOfSkipped; - int32_t numOfBlockStatis; - int64_t numOfRows; - - int32_t order; // scan order - int32_t times; // repeat counts - int32_t current; - int32_t reverseTimes; // 0 by default - + void* pTsdbReadHandle; + int32_t numOfBlocks; // extract basic running information. + int32_t numOfSkipped; + int32_t numOfBlockStatis; + int64_t numOfRows; + int32_t order; // scan order + int32_t times; // repeat counts + int32_t current; + int32_t reverseTimes; // 0 by default SqlFunctionCtx* pCtx; // next operator query context SResultRowInfo* pResultRowInfo; int32_t* rowCellInfoOffset; @@ -424,8 +422,7 @@ typedef struct STableScanInfo { int32_t numOfOutput; int64_t elapsedTime; int32_t prevGroupId; // previous table group id - - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan } STableScanInfo; typedef struct STagScanInfo { @@ -443,6 +440,20 @@ typedef struct SStreamBlockScanInfo { void* readerHandle; // stream block reader handle } SStreamBlockScanInfo; +typedef struct SSysTableScanInfo { + void *pTransporter; + SEpSet epSet; + int32_t type; // show type + tsem_t ready; + void *readHandle; + SSchema *pSchema; + SSDataBlock *pRes; + int64_t numOfBlocks; // extract basic running information. + int64_t totalRows; + int64_t elapsedTime; + int64_t totalBytes; +} SSysTableScanInfo; + typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; int32_t* rowCellInfoOffset; // offset value for each row result cell info @@ -612,13 +623,14 @@ typedef struct SOrderOperatorInfo { uint64_t totalElapsed; // total elapsed time } SOrderOperatorInfo; -SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSystemScanOperatorInfo(void* pSystemTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fcb98ea167a0c23f890fe9c7c203fea45c0ea5dd..7e46a7a63d972f5c240649877ddab0c1952c23ae 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5280,7 +5280,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = size; - pOperator->nextDataFn = doLoadRemoteData; + pOperator->nextDataFn = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; #if 1 @@ -5348,19 +5348,18 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, } pInfo->pTsdbReadHandle = pTsdbReadHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->nextDataFn = doTableScan; + pOperator->nextDataFn = doTableScan; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -5456,6 +5455,80 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp return pOperator; } + +static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) { + SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param; + pSourceDataInfo->pRsp = pMsg->pData; + + SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; + pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->compLen = htonl(pRsp->compLen); + + pSourceDataInfo->status = DATA_READY; + tsem_post(&pSourceDataInfo->pEx->ready); +} + +static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { +// build message and send to mnode to fetch the content of system tables. + SOperatorInfo* pOperator = (SOperatorInfo*) param; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSysTableScanInfo* pInfo = pOperator->info; + + SRetrieveTableReq* req = calloc(1, sizeof(SRetrieveTableReq)); + if (req == NULL) { + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + req->type = pInfo->type; + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pMsgSendInfo->param = NULL; + pMsgSendInfo->msgInfo.pData = req; + pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq); + pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; + pMsgSendInfo->fp = loadRemoteDataCallback; + + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); + + tsem_wait(&pInfo->ready); + // handle the response and return to the caller + + return NULL; +} + +SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo) { + SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pInfo->readHandle = pSysTableReadHandle; + pOperator->name = "SysTableScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->numOfOutput = taosArrayGetSize(pExprInfo); + pOperator->nextDataFn = doSysTableScan; + pOperator->pTaskInfo = pTaskInfo; + + return pOperator; +} + void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { assert(pTableScanInfo != NULL && pDownstream != NULL);