diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 17cf5d43b575dbe2840bec24a29e97dc399ccd7d..3f3f4f5b5d70dbb70f88f395b86d84833010c873 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -940,7 +940,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } // do not show for cleared subscription -#if 0 +#if 1 int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 192016166a4d386aa6873955d9411efe32df2412..96ce6e8eeeeaf17243d8e29baa733c369437c931 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -235,6 +235,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } } + while (1) { + pIter = taosHashIterate(pTq->pStreamTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = (SStreamTask*)pIter; + if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd); + ASSERT(code == 0); + } + } return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 40f75804dd36e23c06f4bcc189f355aea6b71a56..621ba1187205a0dea417e02aa0f832e772605133 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -678,6 +678,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in int32_t nRows; int32_t tsize, ret; SEncoder encoder = {0}; + SArray *newTbUids = NULL; terrno = TSDB_CODE_SUCCESS; pRsp->code = 0; @@ -698,6 +699,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in } submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp)); + newTbUids = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(int64_t)); if (!submitRsp.pArray) { pRsp->code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -727,6 +729,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in goto _exit; } } + taosArrayPush(newTbUids, &createTbReq.uid); submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); @@ -754,8 +757,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in submitRsp.affectedRows += submitBlkRsp.affectedRows; taosArrayPush(submitRsp.pArray, &submitBlkRsp); } + tqUpdateTbUidList(pVnode->pTq, newTbUids, true); _exit: + taosArrayDestroy(newTbUids); tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); pRsp->pCont = rpcMallocCont(tsize); pRsp->contLen = tsize; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 7e4f83a693cf9301da29493ea984828c2731552a..cc9fc8bd803ae68e909c69634e0077e1e5507c90 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -35,6 +35,14 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { return (void*)buf; } +static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { + SStreamDispatchReq req = { + .streamId = pTask->streamId, + .data = data, + }; + return 0; +} + static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { SStreamTaskExecReq req = { .streamId = pTask->streamId, @@ -407,6 +415,26 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) return 0; } +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->streamId); diff --git a/tests/pytest/stream/test3.py b/tests/pytest/stream/test3.py new file mode 100644 index 0000000000000000000000000000000000000000..b45521a9476961394c1cf4b2454d6fb9e2368c68 --- /dev/null +++ b/tests/pytest/stream/test3.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.common import tdCom +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + #for i in range(100): + tdSql.prepare() + dbname = tdCom.getLongName(10, "letters") + tdSql.execute('create database if not exists djnhawvlgq vgroups 1') + tdSql.execute('use djnhawvlgq') + tdSql.execute('create table if not exists downsampling_stb (ts timestamp, c1 int, c2 double, c3 varchar(100), c4 bool) tags (t1 int, t2 double, t3 varchar(100), t4 bool);') + tdSql.execute('create table downsampling_ct1 using downsampling_stb tags(10, 10.1, "Beijing", True);') + tdSql.execute('create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 nchar(20), c5 nchar(20)) tags (t1 int);') + tdSql.execute('create table scalar_ct1 using scalar_stb tags(10);') + tdSql.execute('create table if not exists data_filter_stb (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 float, c6 double, c7 binary(100), c8 nchar(200), c9 bool, c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 float, t6 double, t7 binary(100), t8 nchar(200), t9 bool, t10 tinyint unsigned, t11 smallint unsigned, t12 int unsigned, t13 bigint unsigned)') + tdSql.execute('create table if not exists data_filter_ct1 using data_filter_stb tags (1, 2, 3, 4, 5.5, 6.6, "binary7", "nchar8", true, 11, 12, 13, 14)') + tdSql.execute('create stream data_filter_stream into output_data_filter_stb as select * from data_filter_stb where ts >= 1653648072973+1s and c1 = 1 or c2 > 1 and c3 != 4 or c4 <= 3 and c5 <> 0 or c6 is not Null or c7 is Null or c8 between "na" and "nchar4" and c8 not between "bi" and "binary" and c8 match "nchar[19]" and c8 nmatch "nchar[25]" or c9 in (1, 2, 3) or c10 not in (6, 7) and c8 like "nch%" and c7 not like "bina_" and c11 <= 10 or c12 is Null or c13 >= 4;') + tdSql.execute('insert into data_filter_ct1 values (1653648072973, 1, 1, 1, 3, 1.1, 1.1, "binary1", "nchar1", true, 1, 2, 3, 4);') + tdSql.execute('insert into data_filter_ct1 values (1653648072973+1s, 2, 2, 1, 3, 1.1, 1.1, "binary2", "nchar2", true, 2, 3, 4, 5);') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())