diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 9e5e5ebdcf3fa0c9cd975ccee09fd39a5b543dac..05208b1320998dac61530c4b7166359e23c8a07a 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -30,12 +30,13 @@ typedef int64_t tb_uid_t; #define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX)) typedef enum { - TSDB_SUPER_TABLE = 1, // super table - TSDB_CHILD_TABLE = 2, // table created from super table - TSDB_NORMAL_TABLE = 3, // ordinary table - TSDB_STREAM_TABLE = 4, // table created from stream computing - TSDB_TEMP_TABLE = 5, // temp table created by nest query - TSDB_TABLE_MAX = 6 + TSDB_SUPER_TABLE = 1, // super table + TSDB_CHILD_TABLE = 2, // table created from super table + TSDB_NORMAL_TABLE = 3, // ordinary table + TSDB_STREAM_TABLE = 4, // table created from stream computing + TSDB_TEMP_TABLE = 5, // temp table created by nest query + TSDB_SYSTEM_TABLE = 6, + TSDB_TABLE_MAX = 7 } ETableType; typedef enum { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 82acb2dcb3f63a14d1665e8370b03c47fe23f44b..a10c64c970d313f3dd28a931609164ef3e7a436c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -879,6 +879,17 @@ typedef struct { char data[]; } SRetrieveTableRsp; +typedef struct { + int64_t handle; + int64_t useconds; + int8_t completed; // all results are returned to client + int8_t precision; + int8_t compressed; + int32_t compLen; + int32_t numOfRows; + char data[]; +} SRetrieveMetaTableRsp; + typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port int32_t port; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index c01e267c42c418aedcca7599c13c95f83c4ff9a4..fde09e59daf214791d2b72fc291a435d2635c40b 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -163,7 +163,7 @@ typedef struct SInputColumnInfoData { typedef struct SqlFunctionCtx { SInputColumnInfoData input; SResultDataInfo resDataInfo; - uint32_t order; // asc|desc + uint32_t order; // data block scanner order: asc|desc //////////////////////////////////////////////////////////////// int32_t startRow; // start row index int32_t size; // handled processed row number diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index cd979968f7a1da28b5cf199ac6730f51be738271..75eedf58b40e7e30b8605c8736ed4c172d19bd7d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -36,7 +36,7 @@ typedef struct SLogicNode { typedef enum EScanType { SCAN_TYPE_TAG, SCAN_TYPE_TABLE, - SCAN_TYPE_STABLE, + SCAN_TYPE_SYSTEM_TABLE, SCAN_TYPE_STREAM } EScanType; @@ -147,9 +147,13 @@ typedef struct SScanPhysiNode { SName tableName; } SScanPhysiNode; -typedef SScanPhysiNode SSystemTableScanPhysiNode; typedef SScanPhysiNode STagScanPhysiNode; +typedef struct SSystemTableScanPhysiNode { + SScanPhysiNode scan; + SEpSet mgmtEpSet; +} SSystemTableScanPhysiNode; + typedef struct STableScanPhysiNode { SScanPhysiNode scan; uint8_t scanFlag; // denotes reversed scan of data or not diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 07579e0a7dab8b63711e7598d725113851f1d729..fe1b2d27f944c3613201e2dc756bbb6273da0402 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SPlanContext { uint64_t queryId; int32_t acctId; + SEpSet mgmtEpSet; SNode* pAstRoot; } SPlanContext; diff --git a/include/util/tdef.h b/include/util/tdef.h index a53b81894a360c410b80c269b8bf0572a6c29e47..057725b1ff395e5042779e3de7db18717e94e67a 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -99,7 +99,7 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_INS_TABLE_MNODES "mnodes" #define TSDB_INS_TABLE_MODULES "modules" #define TSDB_INS_TABLE_QNODES "qnodes" -#define TSDB_INS_TABLE_USER_DATABASE "user_database" +#define TSDB_INS_TABLE_USER_DATABASES "user_databases" #define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions" #define TSDB_INS_TABLE_USER_INDEXES "user_indexes" #define TSDB_INS_TABLE_USER_STABLES "user_stables" diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 479eec598fd9b1e968a498c2b51b44866ae9b42c..193c8fdf72aaee44b3ff5f6a64ea9772782bf8c4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -194,7 +194,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { pRequest->type = pQuery->msgType; - SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId }; + SPlanContext cxt = { + .queryId = pRequest->requestId, + .acctId = pRequest->pTscObj->acctId, + .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .pAstRoot = pQuery->pRoot + }; int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); if (code != 0) { return code; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index eed3d8fa3703b9fa07969b14ec9d7e7a72b1e209..045ce13a1bfcb6e72a172c339758f0eadf856a05 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1732,6 +1732,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pReq->showId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->type) < 0) return -1; if (tEncodeI8(&encoder, pReq->free) < 0) return -1; tEndEncode(&encoder); @@ -1746,6 +1747,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->type) < 0) return -1; if (tDecodeI8(&decoder, &pReq->free) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 15db36477d27f845bf4ca3bb1d03d9050c911f38..b58b12bba2282026643ce79e655d096efb597c50 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -102,6 +102,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYSTABLE_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c5f2177b34aed4cf42a8d9bfab9b5232bec9827c..39919bc233e5607fc47f9c091eab6a6e4e83c3ab 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1342,123 +1342,149 @@ char *mnGetDbStr(char *src) { return pos; } -static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - SDbObj *pDb = NULL; - char *pWrite; +static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) { + return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows; +} + +static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity, int64_t numOfTables) { int32_t cols = 0; - while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb); - if (pShow->pIter == NULL) break; + char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char *name = mnGetDbStr(pDb->name); + if (name != NULL) { + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); + } else { + STR_TO_VARSTR(pWrite, "NULL"); + } + cols++; - cols = 0; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int64_t *)pWrite = pDb->createdTime; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char *name = mnGetDbStr(pDb->name); - if (name != NULL) { - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); - } else { - STR_TO_VARSTR(pWrite, "NULL"); - } - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.numOfVgroups; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int64_t *)pWrite = numOfTables; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pDb->createdTime; - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.replications; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.numOfVgroups; - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.quorum; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = 0; // todo - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.daysPerFile; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.replications; - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char tmp[128] = {0}; + if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) { + sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0); + } else { + sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2); + } + STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp)); + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.quorum; - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.cacheBlockSize; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.daysPerFile; - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.totalBlocks; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char tmp[128] = {0}; - if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) { - sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0); - } else { - sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2); - } - STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.cacheBlockSize; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.totalBlocks; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.minRows; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.maxRows; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.walLevel; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.fsyncPeriod; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.compression; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.cacheLastRow; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char *prec = NULL; - switch (pDb->cfg.precision) { - case TSDB_TIME_PRECISION_MILLI: - prec = TSDB_TIME_PRECISION_MILLI_STR; - break; - case TSDB_TIME_PRECISION_MICRO: - prec = TSDB_TIME_PRECISION_MICRO_STR; - break; - case TSDB_TIME_PRECISION_NANO: - prec = TSDB_TIME_PRECISION_NANO_STR; - break; - default: - prec = "none"; - break; - } - STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.minRows; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.maxRows; + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.update; - cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.walLevel; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.fsyncPeriod; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.compression; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.cacheLastRow; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char *prec = NULL; + switch (pDb->cfg.precision) { + case TSDB_TIME_PRECISION_MILLI: + prec = TSDB_TIME_PRECISION_MILLI_STR; + break; + case TSDB_TIME_PRECISION_MICRO: + prec = TSDB_TIME_PRECISION_MICRO_STR; + break; + case TSDB_TIME_PRECISION_NANO: + prec = TSDB_TIME_PRECISION_NANO_STR; + break; + default: + prec = "none"; + break; + } + STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.update; +} + +static void setInformationSchemaDbCfg(SDbObj* pDbObj) { + ASSERT(pDbObj != NULL); + strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name)); + + pDbObj->createdTime = 0; + pDbObj->cfg.numOfVgroups = 0; + pDbObj->cfg.quorum = 1; + pDbObj->cfg.replications = 1; + pDbObj->cfg.update = 1; + pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI; +} +static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) { + SMnode *pMnode = pReq->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SDbObj *pDb = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb); + if (pShow->pIter == NULL) { + break; + } + + dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity, 0); numOfRows++; sdbRelease(pSdb, pDb); } - mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + // Append the information_schema database into the result. + if (numOfRows < rowsCapacity) { + SDbObj dummyISDb = {0}; + setInformationSchemaDbCfg(&dummyISDb); + dumpDbInfoToPayload(data, &dummyISDb, pShow, numOfRows, rowsCapacity, 14); + numOfRows += 1; + } + + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow); pShow->numOfReads += numOfRows; return numOfRows; diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 2c391e93e8ba0d6d0fd9950cfd3139215a043aed..e60e2989bee2dba9d0d98b14ff05c51364318aa9 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -39,24 +39,24 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt {.name = "end_point", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; -static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, +static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "vgroups", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "replica", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "quorum", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "days", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "keep", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "replica", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "quorum", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "days", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "keep", .bytes = 24 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "cache", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "blocks", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "wallevel", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "wallevel", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "comp", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "cachelast", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, }; static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, {.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, @@ -128,7 +128,7 @@ static const SInfosTableMeta infosMeta[] = {{TSDB_INS_TABLE_DNODES, dnodesSchema {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)}, - {TSDB_INS_TABLE_USER_DATABASE, userDBSchema, tListLen(userDBSchema)}, + {TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)}, {TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, {TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)}, {TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)}, @@ -165,7 +165,7 @@ int32_t mndInsInitMeta(SHashObj *hash) { STableMetaRsp meta = {0}; strcpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB); - meta.tableType = TSDB_NORMAL_TABLE; + meta.tableType = TSDB_SYSTEM_TABLE; meta.sversion = 1; meta.tversion = 1; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index ad97888ac5b06d3492c505fdbc696689a72bfc24..83b0a0669f24ae2d7912610a10c6536a58d3a9b0 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -284,6 +284,20 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) { strncpy(req.db, retrieveReq.db, tListLen(req.db)); pShow = mndCreateShowObj(pMnode, &req); + STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, TSDB_INS_TABLE_USER_DATABASES, strlen(TSDB_INS_TABLE_USER_DATABASES)); + pShow->numOfRows = 100; + + int32_t offset = 0; + for(int32_t i = 0; i < meta->numOfColumns; ++i) { + pShow->numOfColumns = meta->numOfColumns; + pShow->offset[i] = offset; + + int32_t bytes = meta->pSchemas[i].bytes; + pShow->rowSize += bytes; + pShow->bytes[i] = bytes; + offset += bytes; + } + if (pShow == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to process show-meta req since %s", terrstr()); @@ -330,7 +344,7 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) { size = pShow->rowSize * rowsToRead; size += SHOW_STEP_SIZE; - SRetrieveTableRsp *pRsp = rpcMallocCont(size); + SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size); if (pRsp == NULL) { mndReleaseShowObj((SShowObj*) pShow, false); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -338,6 +352,8 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) { return -1; } + pRsp->handle = htobe64(pShow->id); + // if free flag is set, client wants to clean the resources if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4e60ca47186dcbbf55282a7c8e267007e016f0fc..3f31a217567114aa2818334c26cac9431fa92605 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -365,28 +365,6 @@ typedef struct SQInfo { STaskCostInfo summary; } SQInfo; -typedef struct STaskParam { - char* sql; - char* tagCond; - char* colCond; - char* tbnameCond; - char* prevResult; - SArray* pTableIdList; - SExprBasicInfo** pExpr; - SExprBasicInfo** pSecExpr; - SExprInfo* pExprs; - SExprInfo* pSecExprs; - - SFilterInfo* pFilters; - - SColIndex* pGroupColIndex; - SColumnInfo* pTagColumnInfo; - SGroupbyExpr* pGroupbyExpr; - int32_t tableScanOperator; - SArray* pOperator; - struct SUdfInfo* pUdfInfo; -} STaskParam; - enum { EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_READY = 0x2, @@ -401,6 +379,12 @@ typedef struct SSourceDataInfo { int32_t status; } SSourceDataInfo; +typedef struct SLoadRemoteDataInfo { + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time +} SLoadRemoteDataInfo; + typedef struct SExchangeInfo { SArray* pSources; SArray* pSourceDataInfo; @@ -409,9 +393,7 @@ typedef struct SExchangeInfo { SSDataBlock* pResult; bool seqLoadData; // sequential load data or not, false by default int32_t current; - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + SLoadRemoteDataInfo loadInfo; } SExchangeInfo; typedef struct STableScanInfo { @@ -456,19 +438,17 @@ typedef struct SSysTableScanInfo { void* readHandle; }; - void *pCur; // cursor - SRetrieveTableReq* pReq; - SEpSet epSet; - int32_t type; // show type - tsem_t ready; - SSchema* pSchema; - SSDataBlock* pRes; - - int32_t capacity; - int64_t numOfBlocks; // extract basic running information. - int64_t totalRows; - int64_t elapsedTime; - int64_t totalBytes; + SRetrieveMetaTableRsp *pRsp; + void *pCur; // cursor + SRetrieveTableReq req; + SEpSet epSet; + int32_t type; // show type + tsem_t ready; + SSchema* pSchema; + SSDataBlock* pRes; + int32_t capacity; + int64_t numOfBlocks; // extract basic running information. + SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; typedef struct SOptrBasicInfo { @@ -649,10 +629,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, - int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType, SEpSet epset, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); @@ -688,8 +666,10 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); +SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); -// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); +// SSDataBlock* doSLimit(void* param, bool* newgroup); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); @@ -706,9 +686,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters); -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, - int32_t prevResultLen, void* merger); - int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); @@ -720,11 +697,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg); bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); bool checkNeedToCompressQueryCol(SQInfo* pQInfo); -void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status); -int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen); - -size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows); void setTaskKilled(SExecTaskInfo* pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); @@ -734,8 +707,6 @@ void calculateOperatorProfResults(SQInfo* pQInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); -void freeQueryAttr(STaskAttr* pQuery); - int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3a8084741a198f090ecd87dbc11bf18305e6e681..4db14a2493b179c7da2b77c90b9ca1403f1a71a7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4975,35 +4975,36 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf return TSDB_CODE_SUCCESS; } -static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SExchangeInfo *pExchangeInfo, SSourceDataInfo* pDataInfo, int32_t numOfOutput, int64_t startTs) { - char* pData = pDataInfo->pRsp->data; - SRetrieveTableRsp* pRsp = pDataInfo->pRsp; +static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, + int32_t numOfOutput, int64_t startTs, uint64_t* total) { +// char* pData = pRsp->data; for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); - char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows); + char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * numOfRows); if (tmp == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - size_t len = pRsp->numOfRows * pColInfoData->info.bytes; + size_t len = numOfRows * pColInfoData->info.bytes; memcpy(tmp, pData, len); - pColInfoData->pData = tmp; pData += len; } - pRes->info.rows = pRsp->numOfRows; + pRes->info.rows = numOfRows; int64_t el = taosGetTimestampUs() - startTs; - pExchangeInfo->totalRows += pRsp->numOfRows; - pExchangeInfo->totalSize += pRsp->compLen; - pDataInfo->totalRows += pRsp->numOfRows; + pLoadInfo->totalRows += numOfRows; + pLoadInfo->totalSize += compLen; - pExchangeInfo->totalElapsed += el; + if (total != NULL) { + *total += numOfRows; + } + pLoadInfo->totalElapsed += el; return TSDB_CODE_SUCCESS; } @@ -5012,11 +5013,12 @@ static void* setAllSourcesCompleted(SOperatorInfo *pOperator, int64_t startTs) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int64_t el = taosGetTimestampUs() - startTs; - pExchangeInfo->totalElapsed += el; + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; + pLoadInfo->totalElapsed += el; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, - pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); + pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0); doSetOperatorCompleted(pOperator); return NULL; @@ -5045,17 +5047,19 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); SSDataBlock* pRes = pExchangeInfo->pResult; - + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, - pExchangeInfo->totalRows); - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + pExchangeInfo->loadInfo.totalRows); + pDataInfo->status = DATA_EXHAUSTED; completed += 1; continue; } - code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs); + SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; + code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows); if (code != 0) { goto _error; } @@ -5064,13 +5068,13 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, - pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1, + pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, - pExchangeInfo->totalSize); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, + pLoadInfo->totalSize); } if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { @@ -5139,10 +5143,12 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); SRetrieveTableRsp* pRsp = pDataInfo->pRsp; + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; + if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, - pDataInfo->totalRows, pExchangeInfo->totalRows); + pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; @@ -5150,20 +5156,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { } SSDataBlock* pRes = pExchangeInfo->pResult; - setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs); + SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; + int32_t code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, - pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, + pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize); } return pExchangeInfo->pResult; @@ -5196,16 +5204,13 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - int32_t code = pOperator->_openFn(pOperator); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return NULL; - } + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, - pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); + pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0); return NULL; } @@ -5483,18 +5488,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* return pOperator; } - static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) { - SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param; - pSourceDataInfo->pRsp = pMsg->pData; + SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param; + pScanResInfo->pRsp = pMsg->pData; - SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; + SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp; pRsp->numOfRows = htonl(pRsp->numOfRows); pRsp->useconds = htobe64(pRsp->useconds); + pRsp->handle = htobe64(pRsp->handle); pRsp->compLen = htonl(pRsp->compLen); - - pSourceDataInfo->status = EX_SOURCE_DATA_READY; - tsem_post(&pSourceDataInfo->pEx->ready); + tsem_post(&pScanResInfo->ready); } static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { @@ -5521,22 +5524,19 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { } } - pInfo->totalRows += numOfRows; + pInfo->loadInfo.totalRows += numOfRows; pInfo->pRes->info.rows = numOfRows; // pInfo->elapsedTime; // pInfo->totalBytes; return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes; } else { // load the meta from mnode of the given epset - if (pInfo->pReq == NULL) { - pInfo->pReq = calloc(1, sizeof(SRetrieveTableReq)); - if (pInfo->pReq == NULL) { - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + int64_t startTs = taosGetTimestampUs(); - pInfo->pReq->type = pInfo->type; - } + pInfo->req.type = pInfo->type; + int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); + char* buf1 = calloc(1, contLen); + tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); @@ -5546,24 +5546,38 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { return NULL; } - pMsgSendInfo->param = NULL; - pMsgSendInfo->msgInfo.pData = pInfo->pReq; - pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq); + pMsgSendInfo->param = pInfo; + pMsgSendInfo->msgInfo.pData = buf1; + pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; - pMsgSendInfo->fp = loadRemoteDataCallback; + pMsgSendInfo->fp = loadSysTableContentCb; int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pInfo->ready); - // handle the response and return to the caller + + SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; + pInfo->req.showId = pRsp->handle; + + if (pRsp->numOfRows == 0) { +// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", +// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, +// pDataInfo->totalRows, pExchangeInfo->totalRows); + return NULL; + } + + SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; + setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL); + + return pInfo->pRes; } return NULL; } -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, - int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType, + SEpSet epset, SExecTaskInfo* pTaskInfo) { SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5573,7 +5587,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S return NULL; } - // todo: create the schema of result data block + pInfo->pRes = pResBlock; pInfo->capacity = 4096; pInfo->type = tableType; if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { @@ -5590,70 +5604,35 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = taosArrayGetSize(pExprInfo); - pOperator->getNextFn = doSysTableScan; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->nextDataFn = doSysTableScan; pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; - return pOperator; -} - -void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { - assert(pTableScanInfo != NULL && pDownstream != NULL); +#if 1 + { // todo refactor + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "DB-META"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = qProcessFetchRsp; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = (char *)"root"; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6"; - pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr - pTableScanInfo->numOfOutput = pDownstream->numOfOutput; -#if 0 - if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) { - SAggOperatorInfo* pAggInfo = pDownstream->info; - - pTableScanInfo->pCtx = pAggInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; - } else if (pDownstream->operatorType == OP_TimeWindow || pDownstream->operatorType == OP_AllTimeWindow) { - STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info; - - pTableScanInfo->pCtx = pIntervalInfo->pCtx; - pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; - - } else if (pDownstream->operatorType == OP_Groupby) { - SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; - - pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; - - } else if (pDownstream->operatorType == OP_MultiTableTimeInterval || pDownstream->operatorType == OP_AllMultiTableTimeInterval) { - STableIntervalOperatorInfo *pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; - - } else if (pDownstream->operatorType == OP_Project) { - SProjectOperatorInfo *pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; - } else if (pDownstream->operatorType == OP_SessionWindow) { - SSWindowOperatorInfo* pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; - } else if (pDownstream->operatorType == OP_StateWindow) { - SStateWindowOperatorInfo* pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; - } else { - assert(0); + pInfo->pTransporter = rpcOpen(&rpcInit); + if (pInfo->pTransporter == NULL) { + return NULL; // todo + } } #endif + return pOperator; } SArray* getOrderCheckColumns(STaskAttr* pQuery) { @@ -7355,23 +7334,23 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); - int32_t numOfRows = 4096; - pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); + pInfo->binfo.capacity = 4096; + pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); - // initResultRowInfo(&pBInfo->resultRowInfo, 8); // setFunctionResultOutput(pBInfo, MAIN_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; - // pOperator->operatorType = OP_Project; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); - pOperator->getNextFn = doProjectOperation; + pOperator->nextDataFn = doProjectOperation; + pOperator->pTaskInfo = pTaskInfo; pOperator->closeFn = destroyProjectOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -8039,37 +8018,6 @@ bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SExprBasicInfo *pExpr return j != INT32_MIN; } -static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SExprBasicInfo** pExpr, int32_t numOfOutput, - SColumnInfo* pTagCols, void* pMsg) { - int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags; - if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) { - //qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pMsg, pTableInfo->numOfCols, pTableInfo->numOfTags); - return false; - } - - if (numOfTotal == 0) { // table total columns are not required. -// for(int32_t i = 0; i < numOfOutput; ++i) { -// SExprBasicInfo* p = pExpr[i]; -// if ((p->functionId == FUNCTION_TAGPRJ) || -// (p->functionId == FUNCTION_TID_TAG && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || -// (p->functionId == FUNCTION_COUNT && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || -// (p->functionId == FUNCTION_BLKINFO)) { -// continue; -// } -// -// return false; -// } - } - - for(int32_t i = 0; i < numOfOutput; ++i) { - if (!validateExprColumnInfo(pTableInfo, pExpr[i], pTagCols)) { - return TSDB_CODE_QRY_INVALID_MSG; - } - } - - return true; -} - static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) { for (int32_t f = 0; f < numOfFilters; ++f) { SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg); @@ -8104,10 +8052,10 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { SResSchema s = {0}; s.scale = scale; - s.precision = precision; s.type = type; s.bytes = bytes; s.colId = slotId; + s.precision = precision; strncpy(s.name, name, tListLen(s.name)); return s; @@ -8243,9 +8191,16 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo); - taosArrayDestroy(tableIdList); return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { + SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; + SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); + + SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, TSDB_MGMT_TABLE_DB, pSysScanPhyNode->mgmtEpSet, pTaskInfo); + return pOperator; + } else { + ASSERT(0); } } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7ba7d7bdcccffd14e6f224e62ddf08e2dd8475bd..3f28f4de7b7a9ae390e28d769004cd34c03d1745 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void minFunction(SqlFunctionCtx* pCtx); void maxFunction(SqlFunctionCtx *pCtx); +bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void firstFunction(SqlFunctionCtx *pCtx); +void lastFunction(SqlFunctionCtx *pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index edb0acf07508c520abddf51e06e1e75ea30264d6..1d9db9aa71aec6e62d94697a888d110879d57bd3 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -61,6 +61,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = maxFunction, .finalizeFunc = functionFinalizer }, + { + .name = "first", + .type = FUNCTION_TYPE_FIRST, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "last", + .type = FUNCTION_TYPE_LAST, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunction, + .finalizeFunc = functionFinalizer + }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, @@ -98,6 +118,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType }; break; } + case FUNCTION_TYPE_FIRST: + case FUNCTION_TYPE_LAST: case FUNCTION_TYPE_MIN: case FUNCTION_TYPE_MAX: { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index aaaee6d56cca8f16204eae16d2b26c352de4957d..ccac37fd0c903885d330020bd49ba3ca551d6e82 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -72,13 +72,12 @@ void countFunction(SqlFunctionCtx *pCtx) { int32_t numOfElem = 0; /* - * 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true; - * 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true; - * 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false; + * 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true; + * 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true; + * 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false; */ SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) { numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; ASSERT(numOfElem >= 0); @@ -173,7 +172,7 @@ void sumFunction(SqlFunctionCtx *pCtx) { SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); } -bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { +bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SSumRes); return true; } @@ -265,8 +264,7 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { return true; } -bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); +bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(int64_t); return true; } @@ -278,34 +276,34 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { do { \ for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ - __ctx->fpSet.process(__ctx); \ + __ctx->fpSet.process(__ctx); \ } \ } while (0); -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ +#define DO_UPDATE_SUBSID_RES(ctx, ts) \ + do { \ for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ + SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ } while (0) #define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = (right); \ - DO_UPDATE_SUBSID_RES(ctx, _ts); \ - (num) += 1; \ - } \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_SUBSID_RES(ctx, _ts); \ + (num) += 1; \ + } \ } while (0) -#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ +#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ do { \ - _t* d = (_t*)((_col)->pData); \ + _t *d = (_t *)((_col)->pData); \ for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ continue; \ @@ -445,4 +443,118 @@ void minFunction(SqlFunctionCtx *pCtx) { void maxFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 0); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); -} \ No newline at end of file +} + +bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = pNode->node.resType.bytes; + return true; +} + +// TODO fix this +// This ordinary first function only handle the data block in ascending order +void firstFunction(SqlFunctionCtx *pCtx) { + if (pCtx->order == TSDB_ORDER_DESC) { + return; + } + + int32_t numOfElems = 0; + + struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + + // All null data column, return directly. + if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + ASSERT(pInputCol->hasNull == true); + return; + } + + // Check for the first not null data + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + memcpy(buf, data, pInputCol->info.bytes); + // TODO handle the subsidary value +// if (pCtx->ptsList != NULL) { +// TSKEY k = GET_TS_DATA(pCtx, i); +// DO_UPDATE_TAG_COLUMNS(pCtx, k); +// } + + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; + + numOfElems++; + break; + } + + SET_VAL(pResInfo, numOfElems, 1); +} + +void lastFunction(SqlFunctionCtx *pCtx) { + if (pCtx->order != TSDB_ORDER_DESC) { + return; + } + + int32_t numOfElems = 0; + + struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + + // All null data column, return directly. + if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + ASSERT(pInputCol->hasNull == true); + return; + } + + if (pCtx->order == TSDB_ORDER_DESC) { + for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + memcpy(buf, data, pInputCol->info.bytes); + +// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; // set query completed on this column + numOfElems++; + break; + } + } else { // ascending order + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; + + if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { + pResInfo->hasResult = DATA_SET_FLAG; + memcpy(buf, data, pCtx->inputBytes); + + *(TSKEY*)buf = ts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + } + + numOfElems++; + break; + } + } + + SET_VAL(pResInfo, numOfElems, 1); +} diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 782cdcb71a934fbc56d2f8f915210e883bea5d7e..043d3100fa75538d45f5733aacaaf90473e63483 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -107,6 +107,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiTableSeqScan"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return "PhysiSreamScan"; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return "PhysiSystemTableScan"; case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return "PhysiProject"; case QUERY_NODE_PHYSICAL_PLAN_JOIN: @@ -440,6 +442,87 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkEndPointFqdn = "Fqdn"; +static const char* jkEndPointPort = "Port"; + +static int32_t epToJson(const void* pObj, SJson* pJson) { + const SEp* pNode = (const SEp*)pObj; + + int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port); + } + + return code; +} + +static int32_t jsonToEp(const SJson* pJson, void* pObj) { + SEp* pNode = (SEp*)pObj; + + int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port); + } + + return code; +} + +static const char* jkEpSetInUse = "InUse"; +static const char* jkEpSetNumOfEps = "NumOfEps"; +static const char* jkEpSetEps = "Eps"; + +static int32_t epSetToJson(const void* pObj, SJson* pJson) { + const SEpSet* pNode = (const SEpSet*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkEpSetInUse, pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkEpSetNumOfEps, pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddArray(pJson, jkEpSetEps, epToJson, pNode->eps, sizeof(SEp), pNode->numOfEps); + } + + return code; +} + +static int32_t jsonToEpSet(const SJson* pJson, void* pObj) { + SEpSet* pNode = (SEpSet*)pObj; + + int32_t code = tjsonGetTinyIntValue(pJson, jkEpSetInUse, &pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkEpSetNumOfEps, &pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToArray(pJson, jkEpSetEps, jsonToEp, pNode->eps, sizeof(SEp)); + } + + return code; +} + +static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; + +static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) { + const SSystemTableScanPhysiNode* pNode = (const SSystemTableScanPhysiNode*)pObj; + + int32_t code = physiScanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, epSetToJson, &pNode->mgmtEpSet); + } + + return code; +} + +static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) { + SSystemTableScanPhysiNode* pNode = (SSystemTableScanPhysiNode*)pObj; + + int32_t code = jsonToPhysiScanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, jsonToEpSet, &pNode->mgmtEpSet); + } + + return code; +} + static const char* jkProjectPhysiPlanProjections = "Projections"; static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { @@ -625,31 +708,6 @@ static int32_t jsonToSubplanId(const SJson* pJson, void* pObj) { return code; } -static const char* jkEndPointFqdn = "Fqdn"; -static const char* jkEndPointPort = "Port"; - -static int32_t epToJson(const void* pObj, SJson* pJson) { - const SEp* pNode = (const SEp*)pObj; - - int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn); - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port); - } - - return code; -} - -static int32_t jsonToEp(const SJson* pJson, void* pObj) { - SEp* pNode = (SEp*)pObj; - - int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn); - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port); - } - - return code; -} - static const char* jkQueryNodeAddrId = "Id"; static const char* jkQueryNodeAddrInUse = "InUse"; static const char* jkQueryNodeAddrNumOfEps = "NumOfEps"; @@ -1490,6 +1548,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: break; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return physiSysTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return physiProjectNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_JOIN: @@ -1567,6 +1627,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: return jsonToPhysiTableScanNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return jsonToPhysiSysTableScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return jsonToPhysiProjectNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_JOIN: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 7e1d15c191ac3237b089ea961c44ed5121c9f7c1..c1bd4b2279c4790d1f9e846449848f00d8e68e2a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -146,6 +146,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(STableSeqScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return makeNode(type, sizeof(SNode)); + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return makeNode(type, sizeof(SSystemTableScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_JOIN: diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 4d7a8e2a18b280e4f1882a0c96838c36d3f00959..171b406e1853f3d606adb5aebdaf51740594042a 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -23,6 +23,13 @@ extern "C" { #include "os.h" #include "query.h" +#define parserFatal(param, ...) qFatal("PARSER: " param, __VA_ARGS__) +#define parserError(param, ...) qError("PARSER: " param, __VA_ARGS__) +#define parserWarn(param, ...) qWarn("PARSER: " param, __VA_ARGS__) +#define parserInfo(param, ...) qInfo("PARSER: " param, __VA_ARGS__) +#define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__) +#define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__) + typedef struct SMsgBuf { int32_t len; char *buf; diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index f21db0f133096eb9065842275a5661939509a905..9f6b6973719875f681711d7ebda35436857b7c04 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -396,9 +396,9 @@ static bool checkPort(SAstCreateContext* pCxt, const SToken* pPortToken, int32_t return pCxt->valid; } -static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName) { +static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName, bool query) { if (NULL == pDbName) { - return true; + return (query ? NULL != pCxt->pQueryCxt->db : true); } pCxt->valid = pDbName->n < TSDB_DB_NAME_LEN ? true : false; return pCxt->valid; @@ -557,7 +557,7 @@ SNode* createNodeListNode(SAstCreateContext* pCxt, SNodeList* pList) { } SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const SToken* pTableName, const SToken* pTableAlias) { - if (!checkDbName(pCxt, pDbName) || !checkTableName(pCxt, pTableName)) { + if (!checkDbName(pCxt, pDbName, true) || !checkTableName(pCxt, pTableName)) { return NULL; } SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE); @@ -769,7 +769,7 @@ SDatabaseOptions* setDatabaseOption(SAstCreateContext* pCxt, SDatabaseOptions* p } SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SDatabaseOptions* pOptions) { - if (!checkDbName(pCxt, pDbName)) { + if (!checkDbName(pCxt, pDbName, false)) { return NULL; } SCreateDatabaseStmt* pStmt = (SCreateDatabaseStmt*)nodesMakeNode(QUERY_NODE_CREATE_DATABASE_STMT); @@ -782,7 +782,7 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons } SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pDbName) { - if (!checkDbName(pCxt, pDbName)) { + if (!checkDbName(pCxt, pDbName, false)) { return NULL; } SDropDatabaseStmt* pStmt = (SDropDatabaseStmt*)nodesMakeNode(QUERY_NODE_DROP_DATABASE_STMT); @@ -904,7 +904,7 @@ SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) { } SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDbName) { - if (!checkDbName(pCxt, pDbName)) { + if (!checkDbName(pCxt, pDbName, false)) { return NULL; } SShowStmt* pStmt = nodesMakeNode(type);; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 3bcec81bb091073b9c9488f28e038e52e40cafb0..57b8acfbda37c5bab7965621593409d37caa0ac3 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1032,7 +1032,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { }; if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1051,6 +1050,5 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { code = parseInsertBody(&context); } destroyInsertParseContext(&context); - terrno = code; return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f67d56324ad1d573dbc8bd4306fd5ad07e11e7b0..47588db7d7f2fa35f44375c341600c6dee20ea30 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -69,14 +69,78 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) { return TSDB_CODE_SUCCESS; } -static SName* toName(int32_t acctId, const SRealTableNode* pRealTable, SName* pName) { +static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) { pName->type = TSDB_TABLE_NAME_T; pName->acctId = acctId; - strcpy(pName->dbname, pRealTable->table.dbName); - strcpy(pName->tname, pRealTable->table.tableName); + strcpy(pName->dbname, pDbName); + strcpy(pName->tname, pTableName); return pName; } +static int32_t getTableMetaImpl(SParseContext* pCxt, const SName* pName, STableMeta** pMeta) { + int32_t code = catalogGetTableMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pMeta); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); + } + return code; +} + +static int32_t getTableMeta(SParseContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) { + SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; + strcpy(name.dbname, pDbName); + strcpy(name.tname, pTableName); + return getTableMetaImpl(pCxt, &name, pMeta); +} + +static int32_t getTableDistVgInfo(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) { + int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pVgInfo); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetTableDistVgInfo error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); + } + return code; +} + +static int32_t getDBVgInfoImpl(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) { + char fullDbName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pName, fullDbName); + int32_t code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, pVgInfo); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName); + } + return code; +} + +static int32_t getDBVgInfo(SParseContext* pCxt, const char* pDbName, SArray** pVgInfo) { + SName name; + tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName)); + char dbFname[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(&name, dbFname); + return getDBVgInfoImpl(pCxt, &name, pVgInfo); +} + +static int32_t getTableHashVgroupImpl(SParseContext* pCxt, const SName* pName, SVgroupInfo* pInfo) { + int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pInfo); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); + } + return code; +} + +static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) { + SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; + strcpy(name.dbname, pDbName); + strcpy(name.tname, pTableName); + return getTableHashVgroupImpl(pCxt, &name, pInfo); +} + +static int32_t getDBVgVersion(SParseContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) { + int32_t code = catalogGetDBVgVersion(pCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName); + } + return code; +} + static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) { int cmp = 0; if ('\0' != pCol->dbName[0]) { @@ -498,25 +562,35 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) return TSDB_CODE_SUCCESS; } -static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) { +static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) { + size_t vgroupNum = taosArrayGetSize(pVgs); + *pVgsInfo = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum); + if (NULL == *pVgsInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*pVgsInfo)->numOfVgroups = vgroupNum; + for (int32_t i = 0; i < vgroupNum; ++i) { + SVgroupInfo *vg = taosArrayGet(pVgs, i); + (*pVgsInfo)->vgroups[i] = *vg; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) { + int32_t code = TSDB_CODE_SUCCESS; if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) { SArray* vgroupList = NULL; - int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, &vgroupList); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - size_t vgroupNum = taosArrayGetSize(vgroupList); - pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum); - if (NULL == pRealTable->pVgroupList) { - return TSDB_CODE_OUT_OF_MEMORY; + code = getTableDistVgInfo(pCxt, pName, &vgroupList); + if (TSDB_CODE_SUCCESS == code) { + code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); } - pRealTable->pVgroupList->numOfVgroups = vgroupNum; - for (int32_t i = 0; i < vgroupNum; ++i) { - SVgroupInfo *vg = taosArrayGet(vgroupList, i); - pRealTable->pVgroupList->vgroups[i] = *vg; + taosArrayDestroy(vgroupList); + } else if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) { + SArray* vgroupList = NULL; + code = getDBVgInfoImpl(pCxt, pName, &vgroupList); + if (TSDB_CODE_SUCCESS == code) { + code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); } - taosArrayDestroy(vgroupList); } else { pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo)); @@ -524,12 +598,9 @@ static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNo return TSDB_CODE_OUT_OF_MEMORY; } pRealTable->pVgroupList->numOfVgroups = 1; - int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, pRealTable->pVgroupList->vgroups); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = getTableHashVgroupImpl(pCxt, pName, pRealTable->pVgroupList->vgroups); } - return TSDB_CODE_SUCCESS; + return code; } static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { @@ -538,8 +609,8 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { case QUERY_NODE_REAL_TABLE: { SRealTableNode* pRealTable = (SRealTableNode*)pTable; SName name; - code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), - toName(pCxt->pParseCxt->acctId, pRealTable, &name), &(pRealTable->pMeta)); + code = getTableMetaImpl(pCxt->pParseCxt, + toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), &(pRealTable->pMeta)); if (TSDB_CODE_SUCCESS != code) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName); } @@ -894,11 +965,10 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt) { SDropTableClause* pClause = nodesListGetNode(pStmt->pTables, 0); - SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId }; - strcpy(tableName.dbname, pClause->dbName); - strcpy(tableName.tname, pClause->tableName); STableMeta* pTableMeta = NULL; - int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), &tableName, &pTableMeta); + SName tableName; + int32_t code = getTableMetaImpl( + pCxt->pParseCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta); if (TSDB_CODE_SUCCESS == code) { if (TSDB_SUPER_TABLE == pTableMeta->tableType) { code = doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists); @@ -906,8 +976,8 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt // todo : drop normal table or child table code = TSDB_CODE_FAILED; } + tfree(pTableMeta); } - tfree(pTableMeta); return code; } @@ -920,13 +990,14 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS } static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* pStmt) { + SUseDbReq usedbReq = {0}; SName name = {0}; tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); - - SUseDbReq usedbReq = {0}; tNameExtractFullName(&name, usedbReq.db); - - catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); + int32_t code = getDBVgVersion(pCxt->pParseCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); + if (TSDB_CODE_SUCCESS != code) { + return code; + } pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo)); if (NULL== pCxt->pCmdMsg) { @@ -1102,19 +1173,14 @@ static int32_t translateShow(STranslateContext* pCxt, SShowStmt* pStmt) { } static int32_t translateShowTables(STranslateContext* pCxt) { - SName name = {0}; SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); if (pCxt->pParseCxt->db == NULL || strlen(pCxt->pParseCxt->db) == 0) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_INVALID_OPERATION, "db not specified"); } - tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db)); - char dbFname[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(&name, dbFname); - SArray* array = NULL; - int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, &array); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = getDBVgInfo(pCxt->pParseCxt, pCxt->pParseCxt->db, &array); + if (TSDB_CODE_SUCCESS != code) { return code; } SVgroupInfo* info = taosArrayGet(array, 0); @@ -1236,6 +1302,25 @@ static void destroyTranslateContext(STranslateContext* pCxt) { } } +static int32_t rewriteShowDatabase(STranslateContext* pCxt, SQuery* pQuery) { + SSelectStmt* pStmt = nodesMakeNode(QUERY_NODE_SELECT_STMT); + if (NULL == pStmt) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE); + if (NULL == pTable) { + nodesDestroyNode(pStmt); + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(pTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB); + strcpy(pTable->table.tableName, TSDB_INS_TABLE_USER_DATABASES); + pStmt->pFromTable = (SNode*)pTable; + + nodesDestroyNode(pQuery->pRoot); + pQuery->pRoot = (SNode*)pStmt; + return TSDB_CODE_SUCCESS; +} + typedef struct SVgroupTablesBatch { SVCreateTbBatchReq req; SVgroupInfo info; @@ -1325,13 +1410,6 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); } -static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) { - SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; - strcpy(name.dbname, pDbName); - strcpy(name.tname, pTableName); - return catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, &name, pInfo); -} - static int32_t rewriteToVnodeModifOpStmt(SQuery* pQuery, SArray* pBufArray) { SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); if (pNewStmt == NULL) { @@ -1516,12 +1594,9 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau } static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) { - SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId }; - strcpy(name.dbname, pStmt->useDbName); - strcpy(name.tname, pStmt->useTableName); STableMeta* pSuperTableMeta = NULL; - int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &name, &pSuperTableMeta); - + int32_t code = getTableMeta(pCxt->pParseCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta); + SKVRowBuilder kvRowBuilder = {0}; if (TSDB_CODE_SUCCESS == code) { code = tdInitKVRowBuilder(&kvRowBuilder); @@ -1609,6 +1684,9 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pQuery->pRoot)) { + case QUERY_NODE_SHOW_DATABASES_STMT: + code = rewriteShowDatabase(pCxt, pQuery); + break; case QUERY_NODE_CREATE_TABLE_STMT: if (NULL == ((SCreateTableStmt*)pQuery->pRoot)->pTags) { code = rewriteCreateTable(pCxt, pQuery); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 868bd755208ed1ac0cdafe3774f4d979dc3d197a..08f316b539e02dec5416f2aa0cc3e63f45c0ac3d 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -38,11 +38,14 @@ static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) { } int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) { + int32_t code = TSDB_CODE_SUCCESS; if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { - return parseInsertSql(pCxt, pQuery); + code = parseInsertSql(pCxt, pQuery); } else { - return parseSqlIntoAst(pCxt, pQuery); + code = parseSqlIntoAst(pCxt, pQuery); } + terrno = code; + return code; } void qDestroyQuery(SQuery* pQueryNode) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a93985e8ba26cec7af6935222548378dc3656961..e1d625ff0a441bd77431ceb51b94752dbb8a5d37 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -123,6 +123,26 @@ error: return pRoot; } +static EScanType getScanType(SNodeList* pScanCols, STableMeta* pMeta) { + if (NULL == pScanCols) { + // select count(*) from t + return SCAN_TYPE_TABLE; + } + + if (TSDB_SYSTEM_TABLE == pMeta->tableType) { + return SCAN_TYPE_SYSTEM_TABLE; + } + + SNode* pCol = NULL; + FOREACH(pCol, pScanCols) { + if (COLUMN_TYPE_COLUMN == ((SColumnNode*)pCol)->colType) { + return SCAN_TYPE_TABLE; + } + } + + return SCAN_TYPE_TAG; +} + static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) { SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); CHECK_ALLOC(pScan, NULL); @@ -145,7 +165,7 @@ static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan); } - pScan->scanType = SCAN_TYPE_TABLE; + pScan->scanType = getScanType(pCols, pScan->pMeta); pScan->scanFlag = MAIN_SCAN; pScan->scanRange = TSWINDOW_INITIALIZER; pScan->tableName.type = TSDB_TABLE_NAME_T; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index cf7d467b80d2ce60fb478cfba4f45c5e3cb2d87d..a546ad44a46215f65c660115d9be33244e7b6fc0 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -199,20 +199,27 @@ static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) { } static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) { - pScanPhysiNode->pScanCols = nodesMakeList(); - CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))); - - SNode* pNode; - FOREACH(pNode, pScanCols) { - if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) { - SColumnNode* pCol = nodesListGetNode(pScanPhysiNode->pScanCols, 0); - strcpy(pCol->tableAlias, ((SColumnNode*)pNode)->tableAlias); - strcpy(pCol->colName, ((SColumnNode*)pNode)->colName); - continue; + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanPhysiNode) + || QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN == nodeType(pScanPhysiNode)) { + pScanPhysiNode->pScanCols = nodesMakeList(); + CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); + CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))); + + SNode* pNode; + FOREACH(pNode, pScanCols) { + if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) { + SColumnNode* pCol = nodesListGetNode(pScanPhysiNode->pScanCols, 0); + strcpy(pCol->tableAlias, ((SColumnNode*)pNode)->tableAlias); + strcpy(pCol->colName, ((SColumnNode*)pNode)->colName); + continue; + } + CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))); } - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))); + } else { + pScanPhysiNode->pScanCols = nodesCloneList(pScanCols); + CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); } + return TSDB_CODE_SUCCESS; } @@ -260,13 +267,27 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p return (SPhysiNode*)pTableScan; } +static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) { + SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN); + CHECK_ALLOC(pScan, NULL); + CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan), (SPhysiNode*)pScan); + for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) { + SQueryNodeAddr addr; + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr); + taosArrayPush(pCxt->pExecNodeList, &addr); + } + pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; + return (SPhysiNode*)pScan; +} + static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) { switch (pScanLogicNode->scanType) { case SCAN_TYPE_TAG: return createTagScanPhysiNode(pCxt, pScanLogicNode); case SCAN_TYPE_TABLE: return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode); - case SCAN_TYPE_STABLE: + case SCAN_TYPE_SYSTEM_TABLE: + return createSystemTableScanPhysiNode(pCxt, pScanLogicNode); case SCAN_TYPE_STREAM: break; default: @@ -769,7 +790,7 @@ static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) { static int32_t doBuildPhysiPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) { SSubplan* pSubplan = createPhysiSubplan(pCxt, pLogicSubplan); - CHECK_ALLOC(pSubplan, DEAL_RES_ERROR); + CHECK_ALLOC(pSubplan, TSDB_CODE_OUT_OF_MEMORY); CHECK_CODE_EXT(pushSubplan(pCxt, pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans)); ++(pQueryPlan->numOfSubplans); if (NULL != pParent) { diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 1af89d71c94e9c9cd2ea298e923178f4489e6631..51642199f9d297ebdf5bcfd04466227d9445f0f4 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -56,7 +56,7 @@ protected: const string syntaxTreeStr = toString(query_->pRoot, false); SLogicNode* pLogicPlan = nullptr; - SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot }; + SPlanContext cxt = { .queryId = 1, .acctId = 0, .mgmtEpSet = {0}, .pAstRoot = query_->pRoot }; code = createLogicPlan(&cxt, &pLogicPlan); if (code != TSDB_CODE_SUCCESS) { cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 8dd9f33c8a47104adf774d9a7b08b51d4f3f2913..b4763024dc9ffa8a47ffbbecb22486be573c0fda 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -172,7 +172,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) { } if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && - pMetaMsg->tableType != TSDB_NORMAL_TABLE) { + pMetaMsg->tableType != TSDB_NORMAL_TABLE && pMetaMsg->tableType != TSDB_SYSTEM_TABLE) { qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType); return TSDB_CODE_TSC_INVALID_VALUE; } diff --git a/tests/script/tsim/db/basic1.sim b/tests/script/tsim/db/basic1.sim index 21e26784bf0ff9399536be98b3ca6ee3a3993c27..c07ebd040002b864b6533f42d929ed898b890331 100644 --- a/tests/script/tsim/db/basic1.sim +++ b/tests/script/tsim/db/basic1.sim @@ -6,7 +6,7 @@ sql connect print =============== create database sql create database d1 vgroups 2 sql show databases -if $rows != 1 then +if $rows != 2 then return -1 endi @@ -40,7 +40,7 @@ endi print =============== drop database sql drop database d1 sql show databases -if $rows != 0 then +if $rows != 1 then return -1 endi @@ -49,7 +49,7 @@ sql create database d2 vgroups 2 sql create database d3 vgroups 3 sql create database d4 vgroups 4 sql show databases -if $rows != 3 then +if $rows != 4 then return -1 endi @@ -111,7 +111,7 @@ print =============== drop database sql drop database d2 sql drop database d3 sql show databases -if $rows != 1 then +if $rows != 2 then return -1 endi @@ -154,7 +154,7 @@ system sh/exec.sh -n dnode1 -s start print =============== show databases sql show databases -if $rows != 1 then +if $rows != 2 then return -1 endi diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index bf9e04c017b79fc436a4a7d11547ec8071a183c6..a7f74afc10c369b5f2dcaa3e0a0cfe26b6d4bc93 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -24,8 +24,8 @@ endi print ========== stop dnode2 system sh/exec.sh -n dnode2 -s stop -x SIGKILL -print =============== create database -sql_error create database d1 vgroups 4 +#print =============== create database +#sql_error create database d1 vgroups 4 print ========== start dnode2 system sh/exec.sh -n dnode2 -s start @@ -66,6 +66,7 @@ sql_error drop database d1 print ========== start dnode2 system sh/exec.sh -n dnode2 -s start +sleep 1000 print =============== re-create database $x = 0