提交 3ec1560f 编写于 作者: 5 54liuyao

create stream and use existing super table

上级 5714b5ef
......@@ -343,7 +343,8 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define IS_SET_NULL(s) (((s)->flags & COL_SET_NULL) == COL_SET_NULL)
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_FLAGS(s) ((s)->flags)
......@@ -1771,7 +1772,9 @@ typedef struct {
// 3.0.20
int64_t checkpointFreq; // ms
// 3.0.2.3
int8_t createStb;
int8_t createStb;
uint64_t targetStbUid;
SArray* fillNullCols;
} SCMCreateStreamReq;
typedef struct {
......
......@@ -207,6 +207,12 @@ typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
typedef struct SColLocation {
int16_t slotId;
col_id_t colId;
int8_t type;
} SColLocation;
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
......
......@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
tEndEncode(&encoder);
......@@ -5486,6 +5487,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1;
tEndDecode(&decoder);
......
......@@ -314,7 +314,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
}
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
} else {
pObj->targetStbUid = pCreate->targetStbUid;
}
pObj->targetDbUid = pTargetDb->uid;
mndReleaseDb(pMnode, pTargetDb);
......@@ -334,6 +338,38 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
goto FAIL;
}
int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
if(numOfNULL > 0) {
pObj->outputSchema.nCols += numOfNULL;
SSchema* pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
if (!pFullSchema) {
goto FAIL;
}
int32_t nullIndex = 0;
int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation* pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
if (i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
dataIndex++;
} else {
pFullSchema[i].bytes = 0;
pFullSchema[i].colId = pos->colId;
pFullSchema[i].flags = COL_SET_NULL;
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
pFullSchema[i].type = pos->type;
nullIndex++;
}
}
taosMemoryFree(pObj->outputSchema.pSchema);
pObj->outputSchema.pSchema = pFullSchema;
}
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
......
......@@ -323,19 +323,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
taosArrayDestroy(tagArray);
}
static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) {
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
int32_t ret = 0;
SVCreateTbBatchReq reqs = {0};
reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == reqs.pArray) {
ret = -1;
goto end;
}
taosArrayPush(reqs.pArray, req);
reqs.nReqs = 1;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
if (ret < 0) {
ret = -1;
goto end;
......@@ -350,7 +341,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) );
if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
rpcFreeCont(*pBuf);
*pBuf = NULL;
*contLen = 0;
......@@ -361,14 +352,13 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
tEncoderClear(&coder);
end:
taosArrayDestroy(reqs.pArray);
return ret;
}
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) {
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL;
int32_t tlen = 0;
encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen);
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
SRpcMsg msg = {
.msgType = TDMT_VND_CREATE_TABLE,
......@@ -387,6 +377,7 @@ _error:
tqError("failed to encode submit req since %s", terrstr());
return TSDB_CODE_OUT_OF_MEMORY;
}
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
......@@ -402,6 +393,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
void* pBuf = NULL;
SArray* tagArray = NULL;
SArray* pVals = NULL;
SArray* crTblArray = NULL;
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
......@@ -442,8 +434,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
SVCreateTbBatchReq reqs = {0};
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == reqs.pArray) {
goto _end;
}
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq;
if (!pCreateTbReq) {
goto _end;
}
......@@ -511,6 +509,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name
......@@ -524,13 +523,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
}
if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) {
goto _end;
}
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
taosArrayPush(reqs.pArray, pCreateTbReq);
}
reqs.nReqs = taosArrayGetSize(reqs.pArray);
if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
goto _end;
}
tagArray = taosArrayDestroy(tagArray);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
crTblArray = NULL;
} else {
SSubmitTbData tbData = {0};
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows);
......@@ -579,7 +580,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
goto _end;
}
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
......@@ -638,28 +639,37 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t j = 0; j < rows; j++) {
taosArrayClear(pVals);
int32_t dataIndex = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (colDataIsNull_s(pColData, j)) {
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv =
(SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
} else{
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
dataIndex++;
} else {
SValue sv;
memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv =
(SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
SValue sv;
memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
dataIndex++;
}
}
}
......@@ -716,5 +726,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
// TODO: change
}
......@@ -5740,7 +5740,7 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
int32_t index = 0;
SNode* pProj = NULL;
FOREACH(pProj, pProjections) {
SSchema* pSchema = pSchemas + index;
SSchema* pSchema = pSchemas + index++;
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
SNode* pFunc = NULL;
......@@ -5761,7 +5761,7 @@ typedef struct SProjColPos {
} SProjColPos;
static int32_t projColPosCompar(const void* l, const void* r) {
return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId;
return ((SProjColPos*)l)->colId > ((SProjColPos*)r)->colId;
}
static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); }
......@@ -5856,7 +5856,11 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
}
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
pReq->targetStbUid = 0;
return TSDB_CODE_SUCCESS;
} else {
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
pReq->targetStbUid = pMeta->suid;
}
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
......
......@@ -247,8 +247,8 @@
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
,,y,script,./test.sh -f tsim/trans/create_db.sim
,,y,script,./test.sh -f tsim/tmq/basic1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ===== step1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step2
sql create database result vgroups 1;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int);
sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
sql select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
print $data00, $data01, $data02
print $data10, $data11, $data12
print $data20, $data21, $data22
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result.streamt0 order by ta;
if $rows != 2 then
print =====rows=$rows
print $data00, $data01, $data02
print $data10, $data11, $data12
print $data20, $data21, $data22
goto loop0
endi
if $data01 != 1 then
print =====data01=$data01
goto loop0
endi
if $data02 != 1 then
print =====data02=$data02
goto loop0
endi
if $data11 != 1 then
print =====data11=$data11
goto loop0
endi
if $data12 != 2 then
print =====data12=$data12
goto loop0
endi
print ===== step3
sql create database result1 vgroups 1;
sql create database test1 vgroups 4;
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int);
sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
$loop_count = 0
sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result1.streamt1 order by ta;
if $rows != 2 then
print =====rows=$rows
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
goto loop1
endi
if $data01 != 10 then
print =====data01=$data01
goto loop1
endi
if $data02 != 20 then
print =====data02=$data02
goto loop1
endi
if $data03 != 1 then
print =====data03=$data03
goto loop1
endi
if $data11 != 40 then
print =====data11=$data11
goto loop1
endi
if $data12 != 50 then
print =====data12=$data12
goto loop1
endi
if $data13 != 1 then
print =====data13=$data13
goto loop1
endi
print ===== step4
sql create database result2 vgroups 1;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20));
# tag dest 1, source 2
##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
# column dest 3, source 4
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
# column dest 3, source 4
sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b) as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
# column dest 3, source 2
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
# column dest 3, source 2
sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
print ===== step5
sql create database result3 vgroups 1;
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,2,3);
sql create table t2 using st tags(4,5,6);
sql create stable result3.streamt3(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
sql create stream streams3 trigger at_once into result3.streamt3(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
$loop_count = 0
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
print $data00, $data01, $data02, $data03, $data04
print $data10, $data11, $data12, $data13, $data14
print $data20, $data21, $data22, $data23, $data24
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result3.streamt3;
if $rows != 1 then
print =====rows=$rows
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
goto loop2
endi
if $data01 != 40 then
print =====data01=$data01
goto loop2
endi
if $data02 != 20 then
print =====data02=$data02
goto loop2
endi
if $data03 != 2 then
print =====data03=$data03
goto loop2
endi
if $data04 != NULL then
print =====data04=$data04
goto loop2
endi
print ===== step6
sql create database result4 vgroups 1;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,2,3);
sql create table t2 using st tags(4,5,6);
sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
$loop_count = 0
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4;
if $rows != 2 then
print =====rows=$rows
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
goto loop2
endi
if $data01 != 40 then
print =====data01=$data01
goto loop2
endi
if $data02 != 20 then
print =====data02=$data02
goto loop2
endi
if $data03 != 2 then
print =====data03=$data03
goto loop2
endi
if $data04 != NULL then
print =====data04=$data04
goto loop2
endi
print ======over
system sh/stop_dnodes.sh
......@@ -268,6 +268,105 @@ if $data10 != tbn-t2 then
endi
print ===== step5
print ===== tag name + table name
sql create database result4 vgroups 1;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s);
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop7
endi
if $data00 != tbn-t1 then
print =====data00=$data00
goto loop7
endi
if $data10 != tbn-t2 then
print =====data10=$data10
goto loop7
endi
if $data20 != tbn-t3 then
print =====data20=$data20
goto loop7
endi
$loop_count = 0
loop8:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4 order by 3;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop8
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != tag-t1 then
print =====data02=$data02
goto loop8
endi
if $data11 != 1 then
print =====data11=$data11
goto loop8
endi
if $data12 != tag-t2 then
print =====data12=$data12
goto loop8
endi
if $data21 != 1 then
print =====data21=$data21
goto loop8
endi
if $data22 != tag-t3 then
print =====data22=$data22
goto loop8
endi
print ======over
system sh/stop_dnodes.sh
......@@ -269,6 +269,104 @@ if $data10 != tbn-2 then
goto loop6
endi
print ===== step5
print ===== tag name + table name
sql create database result4 vgroups 1;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st partition by concat("t", cast(a as varchar(10) ) ) as dd interval(10s);
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop7
endi
if $data00 != tbn-t1 then
print =====data00=$data00
goto loop7
endi
if $data10 != tbn-t2 then
print =====data10=$data10
goto loop7
endi
if $data20 != tbn-t3 then
print =====data20=$data20
goto loop7
endi
$loop_count = 0
loop8:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4 order by 3;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop8
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != t1 then
print =====data02=$data02
goto loop8
endi
if $data11 != 1 then
print =====data11=$data11
goto loop8
endi
if $data12 != t2 then
print =====data12=$data12
goto loop8
endi
if $data21 != 1 then
print =====data21=$data21
goto loop8
endi
if $data22 != t3 then
print =====data22=$data22
goto loop8
endi
print ======over
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册