提交 0b4a5b13 编写于 作者: S Shengliang Guan

[TD-354] add sql parser for stream table

上级 3528ed61
......@@ -203,6 +203,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_COL_NAME_LEN 64
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 256
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 6mb
#define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 64
......
......@@ -346,123 +346,6 @@ int32_t mnodeGetStreams(SShowObj *pShow, void *pConn) {
return 0;
}
int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = TSDB_USER_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 14;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port:id");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "exec time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "time(us)");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sql");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cycles");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pIter = NULL;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mnodeGetStreams(pShow, pConn);
return 0;
}
int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
char *pWrite;
int32_t cols = 0;
SStreamShow *pStreamShow = (SStreamShow *)pShow->pIter;
if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index;
while (numOfRows < rows) {
SStreamDesc *pNode = pStreamShow->sdesc + pStreamShow->index;
SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index];
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pCDesc->user);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
uint32_t ip = pCDesc->ip;
sprintf(pWrite, "%d.%d.%d.%d:%hu:%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pCDesc->port),
pNode->streamId);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pNode->ctime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pNode->stime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pNode->useconds;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pNode->sql);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pNode->num;
cols++;
numOfRows++;
pStreamShow->index++;
}
if (numOfRows == 0) {
tfree(pStreamShow->cdesc);
tfree(pStreamShow->connInfo);
tfree(pStreamShow);
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
int32_t mnodeKillStream(char *qidstr, void *pConn) {
// char *temp, *chr, idstr[64];
......@@ -750,18 +633,16 @@ int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
}
int32_t mnodeInitProfile() {
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
// mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta);
// mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries);
// mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta);
// mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns);
// mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg);
// mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
// mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
return 0;
}
void mnodeCleanupProfile() {
}
void mnodeCleanupProfile() {}
......@@ -47,9 +47,10 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
static void mnodeFreeShowObj(void *data);
static bool mnodeCheckQhandle(uint64_t qhandle);
static void *mnodeSaveQhandle(void *qhandle, int32_t size);
static void mnodeFreeQhandle(void *qhandle, bool forceRemove);
static bool mnodeCheckShowObj(SShowObj *pShow);
static bool mnodeCheckShowFinished(SShowObj *pShow);
static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size);
static void mnodeCleanupShowObj(void *pShow, bool forceRemove);
extern void *tsMnodeTmr;
static void *tsQhandleCache = NULL;
......@@ -129,7 +130,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
strcpy(pShow->db, pShowMsg->db);
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
pShow = mnodeSaveQhandle(pShow, showObjSize);
pShow = mnodeSaveShowObj(pShow, showObjSize);
if (pShow == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -143,7 +144,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS;
} else {
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code));
mnodeFreeQhandle(pShow, true);
mnodeCleanupShowObj(pShow, true);
return code;
}
}
......@@ -155,17 +156,24 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->rpcMsg.pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type));
/*
* in case of server restart, apps may hold qhandle created by server before
* restart, which is actually invalid, therefore, signature check is required.
*/
if (!mnodeCheckQhandle(pRetrieve->qhandle)) {
mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle);
if (!mnodeCheckShowObj(pShow)) {
mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pShow);
return TSDB_CODE_INVALID_QHANDLE;
}
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type));
if (mnodeCheckShowFinished(pShow)) {
mTrace("retrieve:%p, qhandle:%p already read finished, numOfReads:%d numOfRows:%d", pRetrieve, pShow, pShow->numOfReads, pShow->numOfRows);
pShow->numOfReads = pShow->numOfRows;
//mnodeCleanupShowObj(pShow, true);
//return TSDB_CODE_SUCCESS;
}
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsToRead = pShow->numOfRows - pShow->numOfReads;
......@@ -190,7 +198,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
if (rowsRead < 0) {
rpcFreeCont(pRsp);
mnodeFreeQhandle(pShow, false);
mnodeCleanupShowObj(pShow, false);
assert(false);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
......@@ -202,11 +210,11 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.len = size;
if (rowsToRead == 0) {
mnodeFreeQhandle(pShow, true);
mnodeCleanupShowObj(pShow, true);
} else {
mnodeFreeQhandle(pShow, false);
mnodeCleanupShowObj(pShow, false);
}
return TSDB_CODE_SUCCESS;
}
......@@ -301,23 +309,29 @@ static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
return code;
}
static bool mnodeCheckQhandle(uint64_t qhandle) {
void *pSaved = taosCacheAcquireByData(tsQhandleCache, (void *)qhandle);
if (pSaved == (void *)qhandle) {
mTrace("show:%p, is retrieved", qhandle);
static bool mnodeCheckShowFinished(SShowObj *pShow) {
if (pShow->pIter == NULL && pShow->numOfReads != 0) {
return true;
}
return false;
}
static bool mnodeCheckShowObj(SShowObj *pShow) {
SShowObj *pSaved = taosCacheAcquireByData(tsQhandleCache, pShow);
if (pSaved == pShow) {
return true;
} else {
mTrace("show:%p, is already released", qhandle);
mTrace("show:%p, is already released", pShow);
return false;
}
}
static void *mnodeSaveQhandle(void *qhandle, int32_t size) {
static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) {
if (tsQhandleCache != NULL) {
char key[24];
sprintf(key, "show:%p", qhandle);
void *newQhandle = taosCachePut(tsQhandleCache, key, qhandle, size, 60);
free(qhandle);
sprintf(key, "show:%p", pShow);
SShowObj *newQhandle = taosCachePut(tsQhandleCache, key, pShow, size, 60);
free(pShow);
mTrace("show:%p, is saved", newQhandle);
return newQhandle;
......@@ -332,7 +346,7 @@ static void mnodeFreeShowObj(void *data) {
mTrace("show:%p, is destroyed", pShow);
}
static void mnodeFreeQhandle(void *qhandle, bool forceRemove) {
mTrace("show:%p, is released, force:%s", qhandle, forceRemove ? "true" : "false");
taosCacheRelease(tsQhandleCache, &qhandle, forceRemove);
static void mnodeCleanupShowObj(void *pShow, bool forceRemove) {
mTrace("show:%p, is released, force:%s", pShow, forceRemove ? "true" : "false");
taosCacheRelease(tsQhandleCache, &pShow, forceRemove);
}
......@@ -59,9 +59,11 @@ static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableOb
static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg);
......@@ -567,7 +569,9 @@ int32_t mnodeInitTables() {
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams);
return TSDB_CODE_SUCCESS;
}
......@@ -1386,7 +1390,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
if (pTable->info.type == TSDB_CHILD_TABLE && pMsg != NULL) {
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData->data, tagDataLen);
memcpy(pCreate->data + totalCols * sizeof(SSchema) + tagDataLen, pTable->sql, pTable->sqlLen);
}
if (pTable->info.type == TSDB_STREAM_TABLE && pMsg != NULL) {
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
}
return pCreate;
......@@ -1516,7 +1523,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
return terrno;
}
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pMsg->pTable);
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable);
if (pMDCreate == NULL) {
return terrno;
}
......@@ -1879,7 +1886,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
}
SMDCreateTableMsg *pMDCreate = NULL;
pMDCreate = mnodeBuildCreateChildTableMsg(NULL, (SChildTableObj *) pTable);
pMDCreate = mnodeBuildCreateChildTableMsg(NULL, (SChildTableObj *)pTable);
if (pMDCreate == NULL) {
mnodeDecTableRef(pTable);
return terrno;
......@@ -2248,3 +2255,115 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
return code;
}
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "table_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_MAX_SQL_SHOW_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sql");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = pDb->numOfTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mnodeDecDbRef(pDb);
return 0;
}
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
int32_t numOfRows = 0;
SChildTableObj *pTable = NULL;
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0};
strcpy(prefix, pDb->name);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable);
if (pTable == NULL) break;
// not belong to current db
if (strncmp(pTable->info.tableId, prefix, prefixLen) || pTable->info.type != TSDB_STREAM_TABLE) {
mnodeDecTableRef(pTable);
continue;
}
char tableName[TSDB_TABLE_NAME_LEN + 1] = {0};
// pattern compare for table name
mnodeExtractTableName(pTable->info.tableId, tableName);
if (pShow->payloadLen > 0 && patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable);
continue;
}
int32_t cols = 0;
char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, TSDB_TABLE_NAME_LEN);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *) pWrite = pTable->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTable->sql, TSDB_MAX_SQL_SHOW_LEN);
cols++;
numOfRows++;
mnodeDecTableRef(pTable);
}
pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 4;
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
mnodeDecDbRef(pDb);
return numOfRows;
}
\ No newline at end of file
......@@ -106,8 +106,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
char sql[1024] = "\0";
vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
......@@ -152,8 +151,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
// only normal has sql string
if (pTable->tableType == TSDB_STREAM_TABLE) {
// TODO: set sql value
char *sql = pTable->data + totalCols * sizeof(SSchema);
vTrace("vgId:%d, table:%s is creating, sql:%s", pVnode->vgId, pTable->tableId, sql);
tsdbTableSetStreamSql(&tCfg, sql, false);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册