提交 4f107908 编写于 作者: L Liu Jicong

fix(tmq): reset window

上级 94f07836
...@@ -1547,7 +1547,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1547,7 +1547,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
} }
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("call poll\n");*/ /*tscDebug("call poll");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
...@@ -1708,6 +1708,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1708,6 +1708,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} }
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
/*tscDebug("call poll1");*/
void* rspObj; void* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
......
...@@ -5398,9 +5398,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { ...@@ -5398,9 +5398,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} else if (pVal->type == TMQ_OFFSET__LOG) { } else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version); snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts); snprintf(buf, maxLen, "offset(ss data) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts); snprintf(buf, maxLen, "offset(ss meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -154,10 +154,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -154,10 +154,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
char buf1[50]; char buf1[80];
char buf2[50]; char buf2[80];
tFormatOffset(buf1, 50, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 50, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s", tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
...@@ -238,8 +238,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -238,8 +238,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
// 1.find handle // 1.find handle
char buf[50]; char buf[80];
tFormatOffset(buf, 50, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), buf); TD_VID(pTq->pVnode), buf);
...@@ -360,7 +360,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -360,7 +360,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version; metaRsp.reqOffset = pReq->reqOffset.version;
/*tqOffsetResetToLog(&metaR)*/
metaRsp.rspOffset = fetchVer; metaRsp.rspOffset = fetchVer;
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRspLen = pHead->bodyLen;
...@@ -380,18 +379,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -380,18 +379,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// 2. get data (rebuild reader if needed) // 2. get data (rebuild reader if needed)
// 3. get new uid and ts // 3. get new uid and ts
char formatBuf[50]; tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts);
tFormatOffset(formatBuf, 50, &dataRsp.reqOffset);
tqInfo("retrieve using snapshot req offset %s", formatBuf);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0); ASSERT(0);
} }
// 4. send rsp // 4. send rsp
if (dataRsp.blockNum != 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1;
code = -1;
}
} }
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0); ASSERT(0);
......
...@@ -2836,12 +2836,10 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { ...@@ -2836,12 +2836,10 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
tsdbSetTableId(pInfo->dataReader, uid); tsdbSetTableId(pInfo->dataReader, uid);
SQueryTableDataCond tmpCond = pInfo->cond; int64_t oldSkey = pInfo->cond.twindows[0].skey;
tmpCond.twindows[0] = (STimeWindow){ pInfo->cond.twindows[0].skey = ts;
.skey = ts, tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
.ekey = INT64_MAX, pInfo->cond.twindows[0].skey = oldSkey;
};
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0; pInfo->curTWinIdx = 0;
} }
......
...@@ -518,7 +518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -518,7 +518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table // if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
// check status
while (1) { while (1) {
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
if (result) { if (result) {
...@@ -530,7 +529,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -530,7 +529,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
/*pTableInfo->uid */
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
......
...@@ -111,7 +111,7 @@ endi ...@@ -111,7 +111,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = 1 $expectmsgcnt = 1000000
$expectrowcnt = 100 $expectrowcnt = 100
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
...@@ -131,9 +131,6 @@ endi ...@@ -131,9 +131,6 @@ endi
if $data[0][1] != $consumerId then if $data[0][1] != $consumerId then
return -1 return -1
endi endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectrowcnt then if $data[0][3] != $expectrowcnt then
return -1 return -1
endi endi
...@@ -183,7 +180,7 @@ endi ...@@ -183,7 +180,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb $totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = 1 $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
...@@ -254,7 +251,7 @@ endi ...@@ -254,7 +251,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb $totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
......
...@@ -80,7 +80,7 @@ $topicList = $topicList . ' ...@@ -80,7 +80,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb $totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum $totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
...@@ -168,7 +168,7 @@ $consumerId = 0 ...@@ -168,7 +168,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ctb_function $topicList = ' . topic_ctb_function
...@@ -245,7 +245,7 @@ $topicList = $topicList . ' ...@@ -245,7 +245,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册