未验证 提交 f6ebba75 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15256 from taosdata/feature/stream

feat(stream): add version in stream block
...@@ -103,6 +103,7 @@ typedef struct SDataBlockInfo { ...@@ -103,6 +103,7 @@ typedef struct SDataBlockInfo {
int16_t hasVarCol; int16_t hasVarCol;
uint32_t capacity; uint32_t capacity;
// TODO: optimize and remove following // TODO: optimize and remove following
int64_t version; // used for stream, and need serialization
int32_t childId; // used for stream, do not serialize int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize
......
...@@ -1359,6 +1359,7 @@ typedef struct { ...@@ -1359,6 +1359,7 @@ typedef struct {
int32_t numOfCols; int32_t numOfCols;
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t version; // for stream
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;
......
...@@ -142,6 +142,7 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { ...@@ -142,6 +142,7 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
ASSERT(queue->qItem != NULL); ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue); return streamQueueCurItem(queue);
} else { } else {
queue->qItem = NULL;
taosGetQitem(queue->qall, &queue->qItem); taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) { if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall); taosReadAllQitems(queue->queue, queue->qall);
......
...@@ -868,7 +868,10 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) ...@@ -868,7 +868,10 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
} }
// iter all vnode to delete handle // iter all vnode to delete handle
ASSERT(taosHashGetSize(pSub->consumerHash) == 0); if (taosHashGetSize(pSub->consumerHash) != 0) {
sdbRelease(pSdb, pSub);
return -1;
}
int32_t sz = taosArrayGetSize(pSub->unassignedVgs); int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
......
...@@ -583,6 +583,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -583,6 +583,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mndTransSetDbName(pTrans, pTopic->db, NULL); mndTransSetDbName(pTrans, pTopic->db, NULL);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
mndReleaseTopic(pMnode, pTopic);
return -1; return -1;
} }
...@@ -590,11 +591,17 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -590,11 +591,17 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0); ASSERT(0);
mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic);
return -1; return -1;
} }
// TODO check if rebalancing
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0); /*ASSERT(0);*/
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic);
return -1; return -1;
} }
......
...@@ -249,6 +249,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -249,6 +249,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
return -1; return -1;
} }
memcpy(data, msg, msgLen); memcpy(data, msg, msgLen);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
tqProcessStreamTrigger(pTq, data); tqProcessStreamTrigger(pTq, data);
} }
......
...@@ -314,6 +314,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { ...@@ -314,6 +314,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
pBlock->info.uid = pReader->msgIter.uid; pBlock->info.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows; pBlock->info.rows = pReader->msgIter.numOfRows;
pBlock->info.version = pReader->pMsg->version;
while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
......
...@@ -34,6 +34,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ...@@ -34,6 +34,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.version = be64toh(pRetrieve->version);
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->upstreamChildId; pDataBlock->info.childId = pReq->upstreamChildId;
...@@ -54,6 +55,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -54,6 +55,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.version = be64toh(pRetrieve->version);
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
......
...@@ -108,6 +108,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -108,6 +108,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = 0; int32_t actualLen = 0;
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false); blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
...@@ -182,6 +183,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -182,6 +183,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册