未验证 提交 f4885ef4 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20128 from taosdata/fix/TD-22639

fix:table name is null
...@@ -354,7 +354,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -354,7 +354,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
int32_t dataIndex = 0; int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
if (i < pos->slotId) { if (nullIndex >= numOfNULL || i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
......
...@@ -497,7 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -497,7 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
} }
} }
pCreateTbReq->ctb.tagNum = size; pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
STag* pTag = NULL; STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
...@@ -510,15 +510,12 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -510,15 +510,12 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
pCreateTbReq->ctb.pTag = (uint8_t*)pTag; pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name // set table name
SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); if (!pDataBlock->info.parTbName[0]) {
if (colDataIsNull_s(pTbColInfo, rowId)) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
} else { } else {
void* pTbData = colDataGetData(pTbColInfo, rowId); pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
} }
taosArrayPush(reqs.pArray, pCreateTbReq); taosArrayPush(reqs.pArray, pCreateTbReq);
} }
......
...@@ -992,26 +992,34 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* ...@@ -992,26 +992,34 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId; pTmpBlock->info.id.groupId = groupId;
char* tbName = pSrcBlock->info.parTbName;
if (pTableSup->numOfExprs > 0) { if (pTableSup->numOfExprs > 0) {
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1);
char* tbName = pSrcBlock->info.parTbName;
memset(tbName, 0, TSDB_TABLE_NAME_LEN); memset(tbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); int32_t len = 0;
memcpy(tbName, varDataVal(pData), len); if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
len = TMIN(sizeof(TSDB_DATA_NULL_STR), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, TSDB_DATA_NULL_STR, len);
} else {
void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, varDataVal(pData), len);
}
streamStatePutParName(pState, groupId, tbName); streamStatePutParName(pState, groupId, tbName);
memcpy(pTmpBlock->info.parTbName, tbName, len); memcpy(pTmpBlock->info.parTbName, tbName, len);
pDestBlock->info.rows--; pDestBlock->info.rows--;
} else { } else {
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
colDataAppendNULL(pTbNameCol, pDestBlock->info.rows); colDataAppendNULL(pTbNameCol, pDestBlock->info.rows);
pSrcBlock->info.parTbName[0] = 0; tbName[0] = 0;
} }
if (pTagSup->numOfExprs > 0) { if (pTagSup->numOfExprs > 0) {
projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL); projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
pDestBlock->info.rows--; pDestBlock->info.rows--;
} else {
memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
} }
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
......
...@@ -301,7 +301,7 @@ print $data00, $data01, $data02, $data03 ...@@ -301,7 +301,7 @@ print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13 print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23 print $data20, $data21, $data22, $data23
loop2: loop3:
sleep 300 sleep 300
...@@ -317,47 +317,182 @@ if $rows != 2 then ...@@ -317,47 +317,182 @@ if $rows != 2 then
print $data00, $data01, $data02, $data03 print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13 print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23 print $data20, $data21, $data22, $data23
goto loop2 goto loop3
endi endi
if $data01 != 10 then if $data01 != 10 then
print =====data01=$data01 print =====data01=$data01
goto loop2 goto loop3
endi endi
if $data02 != 20 then if $data02 != 20 then
print =====data02=$data02 print =====data02=$data02
goto loop2 goto loop3
endi endi
if $data03 != 1 then if $data03 != 1 then
print =====data03=$data03 print =====data03=$data03
goto loop2 goto loop3
endi endi
if $data04 != NULL then if $data04 != NULL then
print =====data04=$data04 print =====data04=$data04
goto loop2 goto loop3
endi endi
if $data11 != 40 then if $data11 != 40 then
print =====data11=$data11 print =====data11=$data11
goto loop2 goto loop3
endi endi
if $data12 != 50 then if $data12 != 50 then
print =====data12=$data12 print =====data12=$data12
goto loop2 goto loop3
endi endi
if $data13 != 1 then if $data13 != 1 then
print =====data13=$data13 print =====data13=$data13
goto loop2 goto loop3
endi endi
if $data14 != NULL then if $data14 != NULL then
print =====data14=$data14 print =====data14=$data14
goto loop2 goto loop3
endi
print ===== step7
sql create database result5 vgroups 1;
sql create database test5 vgroups 4;
sql use test5;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,2,3);
sql create table t2 using st tags(4,5,6);
sql create stable result5.streamt5(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int);
sql create stream streams5 trigger at_once into result5.streamt5(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg3 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL);
$loop_count = 0
print select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print sql select * from result5.streamt5 order by tg1;
sql select * from result5.streamt5 order by tg1;
print $data00, $data01, $data02, $data03 $data04 $data05 $data06 $data07
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
if $rows != 1 then
print =====rows=$rows
goto loop4
endi
if $data01 != NULL then
print =====data01=$data01
goto loop4
endi
if $data02 != NULL then
print =====data02=$data02
goto loop4
endi
if $data03 != 1 then
print =====data03=$data03
goto loop4
endi
if $data04 != NULL then
print =====data04=$data04
goto loop4
endi
if $data05 != 2 then
print =====data05=$data05
goto loop4
endi
if $data06 != 2 then
print =====data06=$data06
goto loop4
endi
if $data07 != NULL then
print =====data07=$data07
goto loop4
endi
sql drop stream if exists streams4;
sql drop stream if exists streams5;
sql drop database if exists test4;
sql drop database if exists test5;
sql drop database if exists result4;
sql drop database if exists result5;
print ===== step8
sql drop stream if exists streams8;
sql drop database if exists test8;
sql create database test8 vgroups 1;
sql use test8;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams8 trigger at_once into streamt8 as select _wstart as ts, count(*) c1, count(d) c2, count(c) c3 from t1 partition by tbname interval(10s) ;
sql drop stream streams8;
sql create stream streams71 trigger at_once into streamt8(ts, c2) tags(group_id)as select _wstart, count(*) from t1 partition by tbname as group_id interval(10s);
sql insert into t1 values(1648791233000,1,2,3,1.0);
loop8:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt8;
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
if $rows != 1 then
print =====rows=$rows
goto loop8
endi
if $data01 != NULL then
print =====data01=$data01
goto loop8
endi
if $data02 != 1 then
print =====data02=$data02
goto loop8
endi
if $data03 != NULL then
print =====data03=$data03
goto loop8
endi endi
print ======over print ======over
......
...@@ -39,7 +39,10 @@ sql select table_name from information_schema.ins_tables where db_name="result" ...@@ -39,7 +39,10 @@ sql select table_name from information_schema.ins_tables where db_name="result"
if $rows != 2 then if $rows != 2 then
print =====rows=$rows print =====rows=$rows
print $data00 $data10 print $data00
print $data10
print $data20
print $data30
goto loop0 goto loop0
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册