未验证 提交 f6b38f37 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14213 from taosdata/feature/stream

test(stream): partition by tbname
......@@ -321,6 +321,10 @@ FAIL:
}
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask);
......
......@@ -455,6 +455,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
}
tDecoderClear(&decoder);
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
pTask->execStatus = TASK_EXEC_STATUS__IDLE;
......
......@@ -145,10 +145,10 @@ static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo,
continue;
}
ASSERT(mr.me.type == TSDB_CHILD_TABLE);
if (mr.me.ctbEntry.suid != pScanInfo->tableUid) {
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
continue;
}
// TODO handle ntb case
taosArrayPush(qa, id);
}
......
......@@ -4365,6 +4365,26 @@ _error:
return NULL;
}
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator");
return TSDB_CODE_QRY_APP_ERROR;
}
if (pOperator->numOfDownstream > 1) {
qError("join not supported for stream block scan");
return TSDB_CODE_QRY_APP_ERROR;
}
return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pSnapshotReadOp->info;
return 0;
}
}
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
if (pNode->pChildren == NULL || LIST_LENGTH(pNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == pNode->type) {
......@@ -4387,37 +4407,27 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
return -1;
}
int32_t doRebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator");
return TSDB_CODE_QRY_APP_ERROR;
}
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
STableScanInfo* pTableScanInfo = NULL;
if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
return -1;
}
if (pOperator->numOfDownstream > 1) {
qError("join not supported for stream block scan");
return TSDB_CODE_QRY_APP_ERROR;
}
return doRebuildReader(pOperator->pDownstream[0], plan, pHandle);
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
STableScanPhysiNode* pNode = NULL;
if (extractTableScanNode(plan->pNode, &pNode) < 0) {
ASSERT(0);
}
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
STableScanPhysiNode* pNode = NULL;
if (extractTableScanNode(plan->pNode, &pNode) < 0) {
ASSERT(0);
}
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
STableListInfo info = {0};
pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, 0, 0);
if (pTableScanInfo->dataReader == NULL) {
ASSERT(0);
qError("failed to create data reader");
return TSDB_CODE_QRY_APP_ERROR;
}
STableListInfo info = {0};
pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, 0, 0);
if (pTableScanInfo->dataReader == NULL) {
ASSERT(0);
qError("failed to create data reader");
return TSDB_CODE_QRY_APP_ERROR;
}
// TODO: set uid and ts to data reader
return 0;
}
......
......@@ -70,20 +70,21 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t*)&pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
int32_t tlen = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
if (tDecodeBinary(pDecoder, (uint8_t**)&pReq->pRetrieve, &pReq->retrieveLen) < 0) return -1;
uint64_t len = 0;
if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1;
pReq->retrieveLen = len;
tEndDecode(pDecoder);
return 0;
}
......
......@@ -23,7 +23,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
void *pIter = NULL;
while (1) {
taosHashIterate(pWal->pRefHash, pIter);
pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
SWalRef *pRef = (SWalRef *)pIter;
if (pRef->ver != -1) {
......@@ -309,7 +309,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
/*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate
......
......@@ -90,6 +90,7 @@
./test.sh -f tsim/stream/triggerInterval0.sim
# ./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/windowClose.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream stream_t1 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st partition by tbname interval(10s);
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
$loop_count = 0
loop0:
sleep 300
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop0
endi
print =====loop0
sql create database test1 vgroups 1;
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,2,3);
sql create table ts2 using st tags(1,3,4);
sql create table ts3 using st tags(1,4,5);
sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, count(a) c2 from st partition by tbname interval(10s);
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop1
endi
print =====loop1
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by tbname interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
$loop_count = 0
loop2:
sleep 300
sql select * from streamtST;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 1 then
print =====data02=$data02
goto loop2
endi
if $data03 != 1 then
print =====data03=$data03
goto loop2
endi
if $data04 != 2 then
print =====data04=$data04
goto loop2
endi
print =====loop2
system sh/stop_dnodes.sh
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册