提交 94f07836 编写于 作者: L Liu Jicong

fix(tmq): set scan rsp offset correctly

上级 ca1c961f
...@@ -172,7 +172,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le ...@@ -172,7 +172,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
*/ */
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -389,7 +389,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -389,7 +389,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// 4. send rsp // 4. send rsp
if (dataRsp.blockNum != 0) { if (dataRsp.blockNum != 0) {
tqOffsetResetToData(&dataRsp.rspOffset, 0, 0);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
} }
......
...@@ -63,10 +63,11 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -63,10 +63,11 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId]; qTaskInfo_t task = pExec->execCol.task[workerId];
if (qStreamScanSnapshot(task) < 0) { /*if (qStreamScanSnapshot(task) < 0) {*/
ASSERT(0); /*ASSERT(0);*/
} /*}*/
if (qPrepareScan(task, offset.uid, offset.ts) < 0) {
if (qStreamPrepareScan(task, offset.uid, offset.ts) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -86,7 +87,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -86,7 +87,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
if (pRsp->withTbName) { if (pRsp->withTbName) {
pRsp->withTbName = 0; pRsp->withTbName = 0;
#if 1 #if 0
int64_t uid; int64_t uid;
int64_t ts; int64_t ts;
if (qGetStreamScanStatus(task, &uid, &ts) < 0) { if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
......
...@@ -250,7 +250,7 @@ typedef struct SSampleExecInfo { ...@@ -250,7 +250,7 @@ typedef struct SSampleExecInfo {
enum { enum {
TABLE_SCAN__TABLE_ORDER = 1, TABLE_SCAN__TABLE_ORDER = 1,
TABLE_SCAN__BLOCK_ORDER = 1, TABLE_SCAN__BLOCK_ORDER = 2,
}; };
typedef struct STableScanInfo { typedef struct STableScanInfo {
...@@ -286,11 +286,6 @@ typedef struct STableScanInfo { ...@@ -286,11 +286,6 @@ typedef struct STableScanInfo {
int64_t ts; int64_t ts;
} lastStatus; } lastStatus;
struct {
uint64_t uid;
int64_t ts;
} expStatus;
int8_t scanMode; int8_t scanMode;
} STableScanInfo; } STableScanInfo;
......
...@@ -231,9 +231,15 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le ...@@ -231,9 +231,15 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
return decodeOperator(pTaskInfo->pRoot, pInput, len); return decodeOperator(pTaskInfo->pRoot, pInput, len);
} }
int32_t qPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
}
return doPrepareScan(pTaskInfo->pRoot, uid, ts); return doPrepareScan(pTaskInfo->pRoot, uid, ts);
} }
......
...@@ -2825,18 +2825,36 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { ...@@ -2825,18 +2825,36 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info; SStreamBlockScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info; pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
/**uid = pSnapShotScanInfo->scanStatus.uid;*/
/**ts = pSnapShotScanInfo->scanStatus.t;*/ STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
if (pSnapShotScanInfo->lastStatus.uid != uid || pSnapShotScanInfo->lastStatus.ts != ts) {
// rebuild scan /*if (pSnapShotScanInfo->dataReader == NULL) {*/
// /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
/*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
/*}*/
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
tsdbSetTableId(pInfo->dataReader, uid);
SQueryTableDataCond tmpCond = pInfo->cond;
tmpCond.twindows[0] = (STimeWindow){
.skey = ts,
.ekey = INT64_MAX,
};
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
} }
} else { } else {
if (pOperator->pDownstream[0] == NULL) { if (pOperator->numOfDownstream == 1) {
return TSDB_CODE_INVALID_PARA; return doPrepareScan(pOperator->pDownstream[0], uid, ts);
} else if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block");
return TSDB_CODE_QRY_APP_ERROR;
} else { } else {
doPrepareScan(pOperator->pDownstream[0], uid, ts); qError("join not supported for stream block scan");
return TSDB_CODE_QRY_APP_ERROR;
} }
} }
......
...@@ -519,35 +519,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -519,35 +519,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table // if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
// check status // check status
if (pInfo->lastStatus.uid == pInfo->expStatus.uid && pInfo->lastStatus.ts == pInfo->expStatus.ts) {
while (1) {
SSDataBlock* result = doTableScanGroup(pOperator);
if (result) {
return result;
}
// if no data, switch to next table and continue scan
pInfo->currentTable++;
if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) {
return NULL;
}
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
/*pTableInfo->uid */
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
}
}
// reset to exp table and window start from ts
tsdbSetTableId(pInfo->dataReader, pInfo->expStatus.uid);
SQueryTableDataCond tmpCond = pInfo->cond;
tmpCond.twindows[0] = (STimeWindow){
.skey = pInfo->expStatus.ts,
.ekey = INT64_MAX,
};
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
while (1) { while (1) {
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
if (result) { if (result) {
......
...@@ -133,6 +133,8 @@ endi ...@@ -133,6 +133,8 @@ endi
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb $totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
$sumOfRows = $data[0][3] + $data[1][3] $sumOfRows = $data[0][3] + $data[1][3]
if $sumOfRows != $totalMsgCons then if $sumOfRows != $totalMsgCons then
print actual: $sumOfRows
print expect: $totalMsgCons
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册