提交 cbc3b52d 编写于 作者: H Haojun Liao

[td-13039] add system table scanner.

上级 459b0ef4
......@@ -836,6 +836,8 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
void tFreeSShowRsp(SShowRsp* pRsp);
typedef struct {
int32_t type;
char db[TSDB_DB_FNAME_LEN];
int64_t showId;
int8_t free;
} SRetrieveTableReq;
......
......@@ -134,6 +134,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "mnode-systable-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "mnode-trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL)
......
......@@ -106,6 +106,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG,
......
......@@ -353,6 +353,23 @@ typedef struct {
char payload[];
} SShowObj;
typedef struct {
int64_t id;
int8_t type;
int8_t replica;
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
int32_t numOfReads;
int32_t payloadLen;
void* pIter;
SMnode* pMnode;
char db[TSDB_DB_FNAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
char payload[];
} SSysTableRetrieveObj;
typedef struct {
int32_t vgId; // -1 for unassigned
int32_t status;
......
......@@ -25,6 +25,7 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static int32_t mndProcessShowReq(SMnodeMsg *pReq);
static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq);
static bool mndCheckRetrieveFinished(SShowObj *pShow);
static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq);
int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
......@@ -38,6 +39,7 @@ int32_t mndInitShow(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_SHOW, mndProcessShowReq);
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_RETRIEVE, mndProcessRetrieveReq);
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
return 0;
}
......@@ -261,6 +263,106 @@ static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t rowsToRead = 0;
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveTableReq retrieveReq = {0};
if (tDeserializeSRetrieveTableReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SShowObj* pShow = NULL;
if (retrieveReq.showId == 0) {
SShowReq req = {0};
req.type = retrieveReq.type;
strncpy(req.db, retrieveReq.db, tListLen(req.db));
pShow = mndCreateShowObj(pMnode, &req);
if (pShow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta req since %s", terrstr());
return -1;
}
} else {
pShow = mndAcquireShowObj(pMnode, retrieveReq.showId);
if (pShow == NULL) {
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
return -1;
}
}
ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
if (retrieveFp == NULL) {
mndReleaseShowObj((SShowObj*) pShow, false);
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
return -1;
}
mDebug("show:0x%" PRIx64 ", start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow->numOfReads,
pShow->numOfRows, mndShowStr(pShow->type));
if (mndCheckRetrieveFinished((SShowObj*) pShow)) {
mDebug("show:0x%" PRIx64 ", read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads,
pShow->numOfRows);
pShow->numOfReads = pShow->numOfRows;
}
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsToRead = pShow->numOfRows - pShow->numOfReads;
}
/* return no more than 100 tables in one round trip */
if (rowsToRead > SHOW_STEP_SIZE) rowsToRead = SHOW_STEP_SIZE;
/*
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
* issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
*/
if (rowsToRead < 0) rowsToRead = 0;
size = pShow->rowSize * rowsToRead;
size += SHOW_STEP_SIZE;
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj((SShowObj*) pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
return -1;
}
// if free flag is set, client wants to clean the resources
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead);
}
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);
pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
pReq->pCont = pRsp;
pReq->contLen = size;
if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
pRsp->completed = 1;
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj((SShowObj*) pShow, true);
} else {
mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
mndReleaseShowObj((SShowObj*) pShow, false);
}
return TSDB_CODE_SUCCESS;
}
char *mndShowStr(int32_t showType) {
switch (showType) {
case TSDB_MGMT_TABLE_ACCT:
......
......@@ -410,12 +410,10 @@ typedef struct STableScanInfo {
int32_t numOfSkipped;
int32_t numOfBlockStatis;
int64_t numOfRows;
int32_t order; // scan order
int32_t times; // repeat counts
int32_t current;
int32_t reverseTimes; // 0 by default
SqlFunctionCtx* pCtx; // next operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset;
......@@ -424,7 +422,6 @@ typedef struct STableScanInfo {
int32_t numOfOutput;
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
} STableScanInfo;
......@@ -443,6 +440,20 @@ typedef struct SStreamBlockScanInfo {
void* readerHandle; // stream block reader handle
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
void *pTransporter;
SEpSet epSet;
int32_t type; // show type
tsem_t ready;
void *readHandle;
SSchema *pSchema;
SSDataBlock *pRes;
int64_t numOfBlocks; // extract basic running information.
int64_t totalRows;
int64_t elapsedTime;
int64_t totalBytes;
} SSysTableScanInfo;
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
......@@ -612,13 +623,14 @@ typedef struct SOrderOperatorInfo {
uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSystemScanOperatorInfo(void* pSystemTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
......
......@@ -5353,7 +5353,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pInfo->order = order;
pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN;
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
......@@ -5456,6 +5455,80 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
return pOperator;
}
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param;
pSourceDataInfo->pRsp = pMsg->pData;
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
pSourceDataInfo->status = DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
}
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
// build message and send to mnode to fetch the content of system tables.
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
SRetrieveTableReq* req = calloc(1, sizeof(SRetrieveTableReq));
if (req == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
req->type = pInfo->type;
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pMsgSendInfo->param = NULL;
pMsgSendInfo->msgInfo.pData = req;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready);
// handle the response and return to the caller
return NULL;
}
SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->readHandle = pSysTableReadHandle;
pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doSysTableScan;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
assert(pTableScanInfo != NULL && pDownstream != NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册