diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index a4fc30ff65d63d435eb533f1f2624e0694e1d6b0..f5cb7f112073e4861bbc53f2bb50bdc161003a0f 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -99,8 +99,8 @@ int32_t create_stream() { /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query(pConn, - "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 " - "partition by tbname interval(10s) "); + "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 partition " + "by tbname interval(10s) "); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8c4d43d5d3d8438e8775073cfeb66f3c90c77f19..29eebc332ba569e5f2d9ca5cb3af449e56f72abd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -851,7 +851,6 @@ typedef struct { int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp); int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp); - typedef struct SQueryNodeAddr { int32_t nodeId; // vgId or qnodeId SEpSet epSet; @@ -878,7 +877,6 @@ int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp); int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp); void tFreeSDnodeListRsp(SDnodeListRsp* pRsp); - typedef struct { SArray* pArray; // Array of SUseDbRsp } SUseDbBatchRsp; @@ -1245,12 +1243,12 @@ int32_t tSerializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq); typedef struct { - char name[TSDB_CONFIG_OPTION_LEN + 1]; - char value[TSDB_CONFIG_VALUE_LEN + 1]; + char name[TSDB_CONFIG_OPTION_LEN + 1]; + char value[TSDB_CONFIG_VALUE_LEN + 1]; } SVariablesInfo; typedef struct { - SArray *variables; //SArray + SArray* variables; // SArray } SShowVariablesRsp; int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq); @@ -1258,7 +1256,6 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp); - /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' @@ -1308,6 +1305,8 @@ typedef struct { int32_t compLen; int32_t numOfRows; int32_t numOfCols; + int64_t skey; + int64_t ekey; char data[]; } SRetrieveTableRsp; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5e2f5bc2dd067f9c61393607f0f93fdd68c80d34..21158bb0a29ddc1b85d75426297dc56c86925373 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -634,7 +634,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (dropReq.igNotExists) { mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name); sdbRelease(pMnode->pSdb, pStream); - return -1; + return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ef328ecf848de7d0fdde0ab8780c9cefd62943bc..3efe3e6c9c7511d140c26a94231e3858df7b4203 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -27,11 +27,14 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); for (int32_t i = 0; i < blockNum; i++) { - int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); + /*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/ SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); // TODO: refactor + pDataBlock->info.window.skey = be64toh(pRetrieve->skey); + pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); + pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.childId = pReq->upstreamChildId; } @@ -46,8 +49,14 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock } taosArraySetSize(pArray, 1); SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; - SSDataBlock* pBlock = taosArrayGet(pArray, 0); - blockCompressDecode(pBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); + SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); + blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); + // TODO: refactor + pDataBlock->info.window.skey = be64toh(pRetrieve->skey); + pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); + + pDataBlock->info.type = pRetrieve->streamBlockType; + pData->blocks = pArray; return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a8b18210dd51f0baff3b649e372adeb8d4783ac0..1f51a927e7a7d091c1018e104f49d31f57dde4cf 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -104,6 +104,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfCols = htonl(numOfCols); + pRetrieve->skey = htobe64(pBlock->info.window.skey); + pRetrieve->ekey = htobe64(pBlock->info.window.ekey); int32_t actualLen = 0; blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false); @@ -171,6 +173,8 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis pRetrieve->completed = 1; pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->numOfRows = htonl(pBlock->info.rows); + pRetrieve->skey = htobe64(pBlock->info.window.skey); + pRetrieve->ekey = htobe64(pBlock->info.window.ekey); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols);