未验证 提交 349ef38c 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #13083 from taosdata/feature/3_liaohj

fix(query): set the gid for stream result.
......@@ -247,7 +247,7 @@ typedef enum ELogicConditionType {
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
#define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SHOW_SQL_LEN 1024
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
......
......@@ -125,11 +125,15 @@ static const SSysDbTableSchema userStbsSchema[] = {
static const SSysDbTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR},
};
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
};
static const SSysDbTableSchema userTblsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
......
......@@ -882,6 +882,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = uid;
} else {
pInfo->pRes->info.groupId = groupId;
}
int32_t numOfCols = pInfo->pRes->info.numOfCols;
......
......@@ -1128,7 +1128,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue;
}
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册