From 0b4a5b130fc069caae2a625096328046fa689a2d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 May 2020 10:17:38 +0000 Subject: [PATCH] [TD-354] add sql parser for stream table --- src/inc/taosdef.h | 1 + src/mnode/src/mnodeProfile.c | 137 +++-------------------------------- src/mnode/src/mnodeShow.c | 66 ++++++++++------- src/mnode/src/mnodeTable.c | 131 +++++++++++++++++++++++++++++++-- src/vnode/src/vnodeWrite.c | 7 +- 5 files changed, 179 insertions(+), 163 deletions(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index ce50fcbcf0..b2bfe7c590 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -203,6 +203,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_COL_NAME_LEN 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE +#define TSDB_MAX_SQL_SHOW_LEN 256 #define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 6mb #define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 64 diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 4cb560e066..a37f5436c6 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -346,123 +346,6 @@ int32_t mnodeGetStreams(SShowObj *pShow, void *pConn) { return 0; } -int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - int32_t cols = 0; - SSchema *pSchema = pMeta->schema; - - pShow->bytes[cols] = TSDB_USER_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 14; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip:port:id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "exec time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "time(us)"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_SHOW_SQL_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "sql"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "cycles"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - - pShow->numOfRows = 1000000; - pShow->pIter = NULL; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - - mnodeGetStreams(pShow, pConn); - return 0; -} - -int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - int32_t numOfRows = 0; - char *pWrite; - int32_t cols = 0; - - SStreamShow *pStreamShow = (SStreamShow *)pShow->pIter; - - if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index; - - while (numOfRows < rows) { - SStreamDesc *pNode = pStreamShow->sdesc + pStreamShow->index; - SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index]; - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pCDesc->user); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pCDesc->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu:%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pCDesc->port), - pNode->streamId); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->ctime; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->useconds; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->sql); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pNode->num; - cols++; - - numOfRows++; - pStreamShow->index++; - } - - if (numOfRows == 0) { - tfree(pStreamShow->cdesc); - tfree(pStreamShow->connInfo); - tfree(pStreamShow); - } - - pShow->numOfReads += numOfRows; - return numOfRows; -} int32_t mnodeKillStream(char *qidstr, void *pConn) { // char *temp, *chr, idstr[64]; @@ -750,18 +633,16 @@ int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { } int32_t mnodeInitProfile() { - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries); - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns); - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); - mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg); - mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); - mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); + // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta); + // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries); + // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta); + // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns); + + // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg); + // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); + // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); return 0; } -void mnodeCleanupProfile() { -} +void mnodeCleanupProfile() {} diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 0a522e8f99..0973163cf9 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -47,9 +47,10 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg); static void mnodeFreeShowObj(void *data); -static bool mnodeCheckQhandle(uint64_t qhandle); -static void *mnodeSaveQhandle(void *qhandle, int32_t size); -static void mnodeFreeQhandle(void *qhandle, bool forceRemove); +static bool mnodeCheckShowObj(SShowObj *pShow); +static bool mnodeCheckShowFinished(SShowObj *pShow); +static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size); +static void mnodeCleanupShowObj(void *pShow, bool forceRemove); extern void *tsMnodeTmr; static void *tsQhandleCache = NULL; @@ -129,7 +130,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { strcpy(pShow->db, pShowMsg->db); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - pShow = mnodeSaveQhandle(pShow, showObjSize); + pShow = mnodeSaveShowObj(pShow, showObjSize); if (pShow == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -143,7 +144,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SUCCESS; } else { mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); - mnodeFreeQhandle(pShow, true); + mnodeCleanupShowObj(pShow, true); return code; } } @@ -155,17 +156,24 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->rpcMsg.pCont; pRetrieve->qhandle = htobe64(pRetrieve->qhandle); + SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; + mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); + /* * in case of server restart, apps may hold qhandle created by server before * restart, which is actually invalid, therefore, signature check is required. */ - if (!mnodeCheckQhandle(pRetrieve->qhandle)) { - mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle); + if (!mnodeCheckShowObj(pShow)) { + mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pShow); return TSDB_CODE_INVALID_QHANDLE; } - - SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; - mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); + + if (mnodeCheckShowFinished(pShow)) { + mTrace("retrieve:%p, qhandle:%p already read finished, numOfReads:%d numOfRows:%d", pRetrieve, pShow, pShow->numOfReads, pShow->numOfRows); + pShow->numOfReads = pShow->numOfRows; + //mnodeCleanupShowObj(pShow, true); + //return TSDB_CODE_SUCCESS; + } if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { rowsToRead = pShow->numOfRows - pShow->numOfReads; @@ -190,7 +198,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { if (rowsRead < 0) { rpcFreeCont(pRsp); - mnodeFreeQhandle(pShow, false); + mnodeCleanupShowObj(pShow, false); assert(false); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -202,11 +210,11 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { pMsg->rpcRsp.len = size; if (rowsToRead == 0) { - mnodeFreeQhandle(pShow, true); + mnodeCleanupShowObj(pShow, true); } else { - mnodeFreeQhandle(pShow, false); + mnodeCleanupShowObj(pShow, false); } - + return TSDB_CODE_SUCCESS; } @@ -301,23 +309,29 @@ static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) { return code; } -static bool mnodeCheckQhandle(uint64_t qhandle) { - void *pSaved = taosCacheAcquireByData(tsQhandleCache, (void *)qhandle); - if (pSaved == (void *)qhandle) { - mTrace("show:%p, is retrieved", qhandle); +static bool mnodeCheckShowFinished(SShowObj *pShow) { + if (pShow->pIter == NULL && pShow->numOfReads != 0) { + return true; + } + return false; +} + +static bool mnodeCheckShowObj(SShowObj *pShow) { + SShowObj *pSaved = taosCacheAcquireByData(tsQhandleCache, pShow); + if (pSaved == pShow) { return true; } else { - mTrace("show:%p, is already released", qhandle); + mTrace("show:%p, is already released", pShow); return false; } } -static void *mnodeSaveQhandle(void *qhandle, int32_t size) { +static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) { if (tsQhandleCache != NULL) { char key[24]; - sprintf(key, "show:%p", qhandle); - void *newQhandle = taosCachePut(tsQhandleCache, key, qhandle, size, 60); - free(qhandle); + sprintf(key, "show:%p", pShow); + SShowObj *newQhandle = taosCachePut(tsQhandleCache, key, pShow, size, 60); + free(pShow); mTrace("show:%p, is saved", newQhandle); return newQhandle; @@ -332,7 +346,7 @@ static void mnodeFreeShowObj(void *data) { mTrace("show:%p, is destroyed", pShow); } -static void mnodeFreeQhandle(void *qhandle, bool forceRemove) { - mTrace("show:%p, is released, force:%s", qhandle, forceRemove ? "true" : "false"); - taosCacheRelease(tsQhandleCache, &qhandle, forceRemove); +static void mnodeCleanupShowObj(void *pShow, bool forceRemove) { + mTrace("show:%p, is released, force:%s", pShow, forceRemove ? "true" : "false"); + taosCacheRelease(tsQhandleCache, &pShow, forceRemove); } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4ce8c88281..c444fd1455 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -59,9 +59,11 @@ static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableOb static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); - +static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); + static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg); @@ -567,7 +569,9 @@ int32_t mnodeInitTables() { mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables); - + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); + return TSDB_CODE_SUCCESS; } @@ -1386,7 +1390,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO if (pTable->info.type == TSDB_CHILD_TABLE && pMsg != NULL) { memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData->data, tagDataLen); - memcpy(pCreate->data + totalCols * sizeof(SSchema) + tagDataLen, pTable->sql, pTable->sqlLen); + } + + if (pTable->info.type == TSDB_STREAM_TABLE && pMsg != NULL) { + memcpy(pCreate->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); } return pCreate; @@ -1516,7 +1523,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { return terrno; } - SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pMsg->pTable); + SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable); if (pMDCreate == NULL) { return terrno; } @@ -1879,7 +1886,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) { } SMDCreateTableMsg *pMDCreate = NULL; - pMDCreate = mnodeBuildCreateChildTableMsg(NULL, (SChildTableObj *) pTable); + pMDCreate = mnodeBuildCreateChildTableMsg(NULL, (SChildTableObj *)pTable); if (pMDCreate == NULL) { mnodeDecTableRef(pTable); return terrno; @@ -2248,3 +2255,115 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { return code; } + +static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SDbObj *pDb = mnodeGetDb(pShow->db); + if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "table_name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created_time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_MAX_SQL_SHOW_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = pDb->numOfTables; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + mnodeDecDbRef(pDb); + return 0; +} + +static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + SDbObj *pDb = mnodeGetDb(pShow->db); + if (pDb == NULL) return 0; + + + int32_t numOfRows = 0; + SChildTableObj *pTable = NULL; + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; + + char prefix[64] = {0}; + strcpy(prefix, pDb->name); + strcat(prefix, TS_PATH_DELIMITER); + int32_t prefixLen = strlen(prefix); + + while (numOfRows < rows) { + pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable); + if (pTable == NULL) break; + + // not belong to current db + if (strncmp(pTable->info.tableId, prefix, prefixLen) || pTable->info.type != TSDB_STREAM_TABLE) { + mnodeDecTableRef(pTable); + continue; + } + + char tableName[TSDB_TABLE_NAME_LEN + 1] = {0}; + + // pattern compare for table name + mnodeExtractTableName(pTable->info.tableId, tableName); + + if (pShow->payloadLen > 0 && patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { + mnodeDecTableRef(pTable); + continue; + } + + int32_t cols = 0; + + char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, TSDB_TABLE_NAME_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *) pWrite = pTable->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pTable->numOfColumns; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTable->sql, TSDB_MAX_SQL_SHOW_LEN); + cols++; + + numOfRows++; + mnodeDecTableRef(pTable); + } + + pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 4; + + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeDecDbRef(pDb); + + return numOfRows; +} \ No newline at end of file diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 6854dd3c78..6d65d10335 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -106,8 +106,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { SMDCreateTableMsg *pTable = pCont; int32_t code = 0; - char sql[1024] = "\0"; - + vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId); int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfTags = htons(pTable->numOfTags); @@ -152,8 +151,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe tsdbTableSetTagValue(&tCfg, dataRow, false); } + // only normal has sql string if (pTable->tableType == TSDB_STREAM_TABLE) { - // TODO: set sql value + char *sql = pTable->data + totalCols * sizeof(SSchema); + vTrace("vgId:%d, table:%s is creating, sql:%s", pVnode->vgId, pTable->tableId, sql); tsdbTableSetStreamSql(&tCfg, sql, false); } -- GitLab