提交 e17a105e 编写于 作者: wmmhello's avatar wmmhello

feat:[TD-23117] add schema for ins_topics

上级 ae540d10
...@@ -253,6 +253,7 @@ typedef enum ELogicConditionType { ...@@ -253,6 +253,7 @@ typedef enum ELogicConditionType {
#define TSDB_IPv4ADDR_LEN 16 #define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128 #define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SQL_LEN 2048 #define TSDB_SHOW_SQL_LEN 2048
#define TSDB_SHOW_SCHEMA_JSON_LEN TSDB_MAX_COLUMNS * 256
#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_SHOW_SUBQUERY_LEN 1000
......
...@@ -280,7 +280,9 @@ static const SSysDbTableSchema topicSchema[] = { ...@@ -280,7 +280,9 @@ static const SSysDbTableSchema topicSchema[] = {
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
// TODO config {.name = "schema", .bytes = TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "meta", .bytes = 4 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
}; };
......
...@@ -32,6 +32,7 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb); ...@@ -32,6 +32,7 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
SSdbRaw *mndDbActionEncode(SDbObj *pDb); SSdbRaw *mndDbActionEncode(SDbObj *pDb);
const char *mndGetDbStr(const char *src); const char *mndGetDbStr(const char *src);
const char *mndGetStableStr(const char *src);
int32_t mndProcessCompactDbReq(SRpcMsg *pReq); int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
......
...@@ -521,6 +521,7 @@ typedef struct { ...@@ -521,6 +521,7 @@ typedef struct {
char* physicalPlan; char* physicalPlan;
SSchemaWrapper schema; SSchemaWrapper schema;
int64_t stbUid; int64_t stbUid;
char stbName[TSDB_TABLE_FNAME_LEN];
// forbid condition // forbid condition
int64_t ntbUid; int64_t ntbUid;
SArray* ntbColIds; SArray* ntbColIds;
......
...@@ -1513,6 +1513,13 @@ const char *mndGetDbStr(const char *src) { ...@@ -1513,6 +1513,13 @@ const char *mndGetDbStr(const char *src) {
return pos; return pos;
} }
const char *mndGetStableStr(const char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER);
if (pos != NULL) ++pos;
if (pos == NULL) return src;
return mndGetDbStr(pos);
}
static int64_t getValOfDiffPrecision(int8_t unit, int64_t val) { static int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
int64_t v = 0; int64_t v = 0;
switch (unit) { switch (unit) {
......
...@@ -285,6 +285,7 @@ void dumpTopic(SSdb *pSdb, SJson *json) { ...@@ -285,6 +285,7 @@ void dumpTopic(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "subType", i642str(pObj->subType)); tjsonAddStringToObject(item, "subType", i642str(pObj->subType));
tjsonAddStringToObject(item, "withMeta", i642str(pObj->withMeta)); tjsonAddStringToObject(item, "withMeta", i642str(pObj->withMeta));
tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid)); tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid));
tjsonAddStringToObject(item, "stbName", mndGetStableStr(pObj->stbName));
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen)); tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
tjsonAddStringToObject(item, "astLen", i642str(pObj->astLen)); tjsonAddStringToObject(item, "astLen", i642str(pObj->astLen));
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen)); tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
......
...@@ -109,6 +109,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -109,6 +109,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
...@@ -196,6 +197,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -196,6 +197,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char)); pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
if (pTopic->sql == NULL) { if (pTopic->sql == NULL) {
...@@ -460,6 +462,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -460,6 +462,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
return -1; return -1;
} }
strcpy(topicObj.stbName, pCreate->subStbName);
topicObj.stbUid = pStb->uid; topicObj.stbUid = pStb->uid;
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
} }
...@@ -830,6 +833,43 @@ int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { ...@@ -830,6 +833,43 @@ int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
return 0; return 0;
} }
static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson){
char* string = NULL;
cJSON* columns = cJSON_CreateArray();
if (columns == NULL) {
return;
}
for (int i = 0; i < nCols; i++) {
cJSON* column = cJSON_CreateObject();
SSchema* s = schema + i;
cJSON* cname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(column, "name", cname);
cJSON* ctype = cJSON_CreateString(tDataTypes[s->type].name);
cJSON_AddItemToObject(column, "type", ctype);
int32_t length = 0;
if (s->type == TSDB_DATA_TYPE_BINARY) {
length = s->bytes - VARSTR_HEADER_SIZE;
} else if (s->type == TSDB_DATA_TYPE_NCHAR || s->type == TSDB_DATA_TYPE_JSON) {
length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
} else{
length = s->bytes;
}
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
cJSON_AddItemToArray(columns, column);
}
string = cJSON_PrintUnformatted(columns);
cJSON_Delete(columns);
size_t len = strlen(string);
if(string && len <= TSDB_SHOW_SCHEMA_JSON_LEN){
STR_TO_VARSTR(schemaJson, string);
}else{
mError("mndRetrieveTopic build schema error json:%p, json len:%zu", string, len);
}
taosMemoryFree(string);
}
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -868,6 +908,47 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -868,6 +908,47 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
char schemaJson[TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE] = {0};
if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){
schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson);
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
SStbObj *pStb = mndAcquireStb(pMnode, pTopic->stbName);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
return -1;
}
schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
mndReleaseStb(pMnode, pStb);
}else{
STR_TO_VARSTR(schemaJson, "NULL");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)schemaJson, false);
char mete[4 + VARSTR_HEADER_SIZE] = {0};
if(pTopic->withMeta){
STR_TO_VARSTR(mete, "yes");
}else{
STR_TO_VARSTR(mete, "no");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)mete, false);
char type[8 + VARSTR_HEADER_SIZE] = {0};
if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){
STR_TO_VARSTR(type, "column");
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
STR_TO_VARSTR(type, "stable");
}else{
STR_TO_VARSTR(type, "db");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)type, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
# rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
# updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def test(self):
tdLog.info("create database, stb, ctb")
tdSql.execute("create database if not exists db1 vgroups 4 wal_retention_period 3600")
tdSql.execute("create table if not exists db1.st(ts timestamp, c1 int, c2 bool, c3 tinyint, c4 double, c5 nchar(8)) tags(t1 int, t2 float, t3 binary(4))")
tdSql.execute("create table if not exists db1.nt(ts timestamp, c1 smallint, c2 float, c3 binary(64), c4 bigint)")
tdSql.execute("create table if not exists db1.st1 using db1.st tags(1, 9.3, \"st1\")")
tdLog.info("create topic")
tdSql.execute("create topic topic_1 as database db1")
tdSql.execute("create topic topic_2 as stable db1.st")
tdSql.execute("create topic topic_3 as select * from db1.nt")
tdSql.execute("create topic topic_4 as select ts,c3,c5 from db1.st")
tdSql.query("select * from information_schema.ins_topics")
tdSql.checkRows(4)
# tdSql.checkData(0, 1, 51)
# tdSql.checkData(0, 4, 940)
# tdSql.checkData(1, 1, 23)
# tdSql.checkData(1, 4, None)
tdLog.printNoPrefix("======== test case end ...... ")
def run(self):
self.test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册