提交 152fc5da 编写于 作者: H Haojun Liao

fix(stream):fix the race condition when creating new tsables.

上级 94c6af39
...@@ -68,14 +68,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -68,14 +68,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return 0;
} else { } else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return 0;
} }
......
...@@ -107,6 +107,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ...@@ -107,6 +107,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t
uint64_t tableListGetSize(const STableListInfo* pTableList); uint64_t tableListGetSize(const STableListInfo* pTableList);
uint64_t tableListGetSuid(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
......
...@@ -143,9 +143,6 @@ typedef struct { ...@@ -143,9 +143,6 @@ typedef struct {
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t fillHistoryVer1; int64_t fillHistoryVer1;
int64_t fillHistoryVer2; int64_t fillHistoryVer2;
// int8_t triggerSaved;
// int64_t deleteMarkSaved;
SStreamState* pState; SStreamState* pState;
} SStreamTaskInfo; } SStreamTaskInfo;
...@@ -175,7 +172,6 @@ struct SExecTaskInfo { ...@@ -175,7 +172,6 @@ struct SExecTaskInfo {
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
int32_t qbufQuota; // total available buffer (in KB) during execution query int32_t qbufQuota; // total available buffer (in KB) during execution query
int64_t version; // used for stream to record wal version, why not move to sschemainfo int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo; SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo; SSchemaInfo schemaInfo;
...@@ -188,6 +184,7 @@ struct SExecTaskInfo { ...@@ -188,6 +184,7 @@ struct SExecTaskInfo {
SLocalFetch localFetch; SLocalFetch localFetch;
SArray* pResultBlockList; // result block list SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
SRWLatch lock; // secure the access of STableListInfo
}; };
enum { enum {
......
...@@ -1800,6 +1800,21 @@ STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) ...@@ -1800,6 +1800,21 @@ STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index)
return taosArrayGet(pTableList->pTableList, index); return taosArrayGet(pTableList->pTableList, index);
} }
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
if (startIndex >= numOfTables) {
return -1;
}
for (int32_t i = startIndex; i < numOfTables; ++i) {
STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
if (p->uid == uid) {
return i;
}
}
return -1;
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL); ASSERT(pTableList->map != NULL && slot != NULL);
......
...@@ -410,6 +410,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -410,6 +410,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
taosWLockLatch(&pTaskInfo->lock);
for (int32_t i = 0; i < numOfQualifiedTables; ++i) { for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i); uint64_t* uid = taosArrayGet(qa, i);
...@@ -424,6 +425,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -424,6 +425,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
taosArrayDestroy(qa); taosArrayDestroy(qa);
taosWUnLockLatch(&pTaskInfo->lock);
return code; return code;
} }
} }
...@@ -445,6 +447,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -445,6 +447,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId); tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
} }
taosWUnLockLatch(&pTaskInfo->lock);
if (keyBuf != NULL) { if (keyBuf != NULL) {
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
} }
...@@ -452,7 +455,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -452,7 +455,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
taosArrayDestroy(qa); taosArrayDestroy(qa);
} else { // remove the table id in current list } else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
taosWLockLatch(&pTaskInfo->lock);
code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList); code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
taosWUnLockLatch(&pTaskInfo->lock);
} }
return code; return code;
...@@ -1000,6 +1005,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { ...@@ -1000,6 +1005,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
} }
return 0; return 0;
} }
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished; return pTaskInfo->streamInfo.recoverScanFinished;
...@@ -1082,6 +1088,9 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { ...@@ -1082,6 +1088,9 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* id = GET_TASKID(pTaskInfo);
pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.prepareStatus = *pOffset;
pTaskInfo->streamInfo.returned = 0; pTaskInfo->streamInfo.returned = 0;
...@@ -1095,20 +1104,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1095,20 +1104,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// TODO add more check // TODO add more check
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if(pOperator->numOfDownstream != 1){ if(pOperator->numOfDownstream != 1){
qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream); qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id);
return -1; return -1;
} }
pOperator = pOperator->pDownstream[0]; pOperator = pOperator->pDownstream[0];
} }
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
STableScanBase* pScanBaseInfo = &pScanInfo->base;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pScanBaseInfo->dataReader);
tsdbReaderClose(pTSInfo->base.dataReader); pScanBaseInfo->dataReader = NULL;
pTSInfo->base.dataReader = NULL;
// let's seek to the next version in wal file // let's seek to the next version in wal file
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1); qError("tqSeekVer failed ver:%, %s" PRId64, pOffset->version + 1, id);
return -1; return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
...@@ -1117,68 +1129,66 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1117,68 +1129,66 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int64_t uid = pOffset->uid; int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts; int64_t ts = pOffset->ts;
// this value may be changed if new tables are created
taosRLockLatch(&pTaskInfo->lock);
int32_t numOfTables = tableListGetSize(pTableListInfo);
if (uid == 0) { if (uid == 0) {
if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) { if (numOfTables != 0) {
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
uid = pTableInfo->uid; uid = pTableInfo->uid;
ts = INT64_MIN; ts = INT64_MIN;
} else { } else {
qError("uid == 0 and tablelist size is 0"); taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id);
return -1; return -1;
} }
} }
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0; pInfo->pTableScanOp->resultInfo.totalRows = 0;
bool found = false; // start from current accessed position
for (int32_t i = 0; i < numOfTables; i++) { int32_t index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable);
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); taosRUnLockLatch(&pTaskInfo->lock);
if (pTableInfo->uid == uid) {
found = true;
pTableScanInfo->currentTable = i;
break;
}
}
// TODO after dropping table, table may not found if (index >= 0) {
if(!found){ pScanInfo->currentTable = index;
qError("uid not found in tablelist %" PRId64, uid); } else {
qError("uid:%" PRIu64 " not found in table list, total:%d %s", uid, numOfTables, id);
return -1; return -1;
} }
if (pTableScanInfo->base.dataReader == NULL) { STableKeyInfo keyInfo = {.uid = uid};
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); if (pScanBaseInfo->dataReader == NULL) {
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL);
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, if (code != TSDB_CODE_SUCCESS) {
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 || qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
pTableScanInfo->base.dataReader == NULL) { terrno = code;
qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
return -1; return -1;
} }
} } else {
tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
int64_t oldSkey = pScanBaseInfo->cond.twindows.skey;
// let's start from the next ts that returned to consumer.
pScanBaseInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
STableKeyInfo tki = {.uid = uid}; // restore the key value
tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1); pScanBaseInfo->cond.twindows.skey = oldSkey;
int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey; pScanInfo->scanTimes = 0;
pTableScanInfo->base.cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pTableScanInfo->base.cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
ts, pTableScanInfo->currentTable, numOfTables); uid, ts, pScanInfo->currentTable, numOfTables, id);
}
} else { } else {
qError("invalid pOffset->type:%d", pOffset->type); qError("invalid pOffset->type:%d, %s", pOffset->type, id);
return -1; return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
} else { // subType == TOPIC_SUB_TYPE__TABLE/DB
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
...@@ -1191,7 +1201,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1191,7 +1201,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
tableListClear(pTaskInfo->pTableInfoList); tableListClear(pTableListInfo);
if (mtInfo.uid == 0) { if (mtInfo.uid == 0) {
return 0; // no data return 0; // no data
...@@ -1200,14 +1210,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1200,14 +1210,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
if (pTaskInfo->pTableInfoList == NULL) { // if (pTableListInfo == NULL) {
pTaskInfo->pTableInfoList = tableListCreate(); // pTableListInfo = tableListCreate();
} // }
tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0); tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); int32_t size = tableListGetSize(pTableListInfo);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
...@@ -1231,6 +1241,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1231,6 +1241,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
qDebug("tmqsnap qStreamPrepareScan snapshot log"); qDebug("tmqsnap qStreamPrepareScan snapshot log");
} }
}
return 0; return 0;
} }
......
...@@ -1989,6 +1989,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in ...@@ -1989,6 +1989,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
taosInitRWLatch(&pTaskInfo->lock);
pTaskInfo->id.vgId = vgId; pTaskInfo->id.vgId = vgId;
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
pTaskInfo->id.str = buildTaskId(taskId, queryId); pTaskInfo->id.str = buildTaskId(taskId, queryId);
......
...@@ -766,8 +766,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -766,8 +766,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, pTaskInfo->id.str); pInfo->currentTable, numOfTables, pTaskInfo->id.str);
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
...@@ -1569,19 +1569,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1569,19 +1569,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
const char* id = GET_TASKID(pTaskInfo);
qDebug("start to exec queue scan"); qDebug("start to exec queue scan, %s", id);
if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
/*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/
/*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ if (pInfo->tqReader->msg2.msgStr == NULL) {
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/
SPackedData submit = pTaskInfo->streamInfo.submit; SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr); qError("submit msg messed up when initing stream submit block %p, %s", submit.msgStr, id);
pInfo->tqReader->msg2 = (SPackedData){0}; pInfo->tqReader->msg2 = (SPackedData){0};
pInfo->tqReader->setMsg = 0; pInfo->tqReader->setMsg = 0;
ASSERT(0); ASSERT(0);
...@@ -1615,18 +1612,20 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1615,18 +1612,20 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) { if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id);
pTaskInfo->streamInfo.returned = 1; pTaskInfo->streamInfo.returned = 1;
return pResult; return pResult;
} else { } else {
// no data has return already, try to extract data in the WAL
if (!pTaskInfo->streamInfo.returned) { if (!pTaskInfo->streamInfo.returned) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
qDebug("3");
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
qDebug("queue scan tsdb over, switch to wal ver:%" PRId64 " %s", pTaskInfo->streamInfo.snapshotVer + 1, id);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL; return NULL;
...@@ -1643,7 +1642,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1643,7 +1642,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (tqNextBlock(pInfo->tqReader, &ret) < 0) { if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
// if the end is reached, terrno is 0 // if the end is reached, terrno is 0
if (terrno != 0) { if (terrno != 0) {
qError("failed to get next log block since %s", terrstr()); qError("failed to get next log block since %s, %s", terrstr(), id);
} }
} }
...@@ -1658,9 +1657,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1658,9 +1657,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
qError("unexpected ret.fetchType:%d", ret.fetchType); qError("unexpected ret.fetchType:%d", ret.fetchType);
continue; continue;
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE || } else if (ret.fetchType == FETCH_TYPE__NONE ||
(ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
...@@ -1672,7 +1668,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1672,7 +1668,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} }
} }
} else { } else {
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); qError("unexpected streamInfo prepare type: %d %s", pTaskInfo->streamInfo.prepareStatus.type, id);
return NULL; return NULL;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册