未验证 提交 5478360b 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14734 from taosdata/feature/stream

feat(tmq): support consume from tsdb then wal
...@@ -36,6 +36,7 @@ typedef struct SReadHandle { ...@@ -36,6 +36,7 @@ typedef struct SReadHandle {
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
int64_t version;
bool initMetaReader; bool initMetaReader;
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
......
...@@ -89,6 +89,8 @@ typedef struct { ...@@ -89,6 +89,8 @@ typedef struct {
STqExecTb execTb; STqExecTb execTb;
STqExecDb execDb; STqExecDb execDb;
}; };
// TODO remove it
int64_t tsdbEndVer;
} STqExecHandle; } STqExecHandle;
......
...@@ -483,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -483,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
/*for (int32_t i = 0; i < 5; i++) {*/ /*for (int32_t i = 0; i < 5; i++) {*/
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ /*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/ /*}*/
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
req.qmsg = NULL; req.qmsg = NULL;
...@@ -493,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -493,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initTableReader = true, .initTableReader = true,
.initTqReader = true, .initTqReader = true,
.version = ver,
}; };
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
...@@ -501,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -501,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(scanner); ASSERT(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader[i]); ASSERT(pHandle->execHandle.pExecReader[i]);
pHandle->execHandle.tsdbEndVer = ver;
} }
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
......
...@@ -96,6 +96,12 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset ...@@ -96,6 +96,12 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
} }
} }
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1);
qStreamPrepareScan(task, pOffset);
continue;
}
void* meta = qStreamExtractMetaMsg(task); void* meta = qStreamExtractMetaMsg(task);
if (meta != NULL) { if (meta != NULL) {
// tq add meta to rsp // tq add meta to rsp
...@@ -107,7 +113,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset ...@@ -107,7 +113,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
ASSERT(pRsp->rspOffset.type != 0); ASSERT(pRsp->rspOffset.type != 0);
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version); ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
} }
......
...@@ -1493,6 +1493,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1493,6 +1493,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle) { if (pHandle) {
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info; STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
if (pHandle->version > 0) {
pSTInfo->cond.endVersion = pHandle->version;
}
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册