提交 5d5b2bb1 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 ce61b2ab
...@@ -190,8 +190,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int ...@@ -190,8 +190,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetOpen(qTaskInfo_t tinfo);
......
...@@ -142,7 +142,7 @@ typedef struct { ...@@ -142,7 +142,7 @@ typedef struct {
typedef struct SWalReader SWalReader; typedef struct SWalReader SWalReader;
// todo hide this struct // todo hide this struct
typedef struct SWalReader { struct SWalReader {
SWal *pWal; SWal *pWal;
int64_t readerId; int64_t readerId;
TdFilePtr pLogFile; TdFilePtr pLogFile;
...@@ -154,7 +154,7 @@ typedef struct SWalReader { ...@@ -154,7 +154,7 @@ typedef struct SWalReader {
SWalFilterCond cond; SWalFilterCond cond;
// TODO remove it // TODO remove it
SWalCkHead *pHead; SWalCkHead *pHead;
} SWalReader; };
// module initialization // module initialization
int32_t walInit(); int32_t walInit();
...@@ -201,6 +201,7 @@ int32_t walNextValidMsg(SWalReader *pRead); ...@@ -201,6 +201,7 @@ int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader *pReader); int64_t walReaderGetCurrentVer(const SWalReader *pReader);
int64_t walReaderGetValidFirstVer(const SWalReader *pReader); int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever); void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
// only for tq usage // only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
......
...@@ -245,7 +245,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -245,7 +245,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
if (offset->type == TMQ_OFFSET__LOG) { if (offset->type == TMQ_OFFSET__LOG) {
verifyOffset(pHandle->pWalReader, offset); walReaderVerifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1; int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
......
...@@ -1058,17 +1058,6 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { ...@@ -1058,17 +1058,6 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
} }
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
// if offset version is small than first version , let's seek to first version
taosThreadMutexLock(&((SWalReader*)pWalReader)->pWal->mutex);
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
taosThreadMutexUnlock(&((SWalReader*)pWalReader)->pWal->mutex);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
...@@ -1095,7 +1084,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1095,7 +1084,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderClose(pScanBaseInfo->dataReader); tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL; pScanBaseInfo->dataReader = NULL;
verifyOffset(pInfo->tqReader->pWalReader, pOffset); walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset);
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
return -1; return -1;
......
...@@ -1697,13 +1697,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1697,13 +1697,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
int32_t type = tqNextBlockInWal(pInfo->tqReader); bool hasResult = tqNextBlockInWal(pInfo->tqReader, id);
SSDataBlock* pRes = pInfo->tqReader->pResBlock; SSDataBlock* pRes = pInfo->tqReader->pResBlock;
// curVersion move to next, so currentOffset = curVersion - 1 // curVersion move to next, so currentOffset = curVersion - 1
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
if (type == FETCH_TYPE__DATA) { if (hasResult) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version); pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
...@@ -1711,7 +1711,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1711,7 +1711,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
} }
} else if (type == FETCH_TYPE__NONE) { } else {
qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
return NULL; return NULL;
} }
...@@ -2075,65 +2075,42 @@ FETCH_NEXT_BLOCK: ...@@ -2075,65 +2075,42 @@ FETCH_NEXT_BLOCK:
} }
const char* id = GET_TASKID(pTaskInfo); const char* id = GET_TASKID(pTaskInfo);
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SSDataBlock* pBlock = pInfo->pRes;
SDataBlockInfo* pBlockInfo = &pBlock->info;
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
NEXT_SUBMIT_BLK: NEXT_SUBMIT_BLK:
while (1) {
if (pInfo->tqReader->msg.msgStr == NULL) {
if (pInfo->validBlockIndex >= totalBlocks) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
void* buff = NULL;
// int32_t len = streamScanOperatorEncode(pInfo, &buff);
// if (len > 0) {
// streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len);
// }
taosMemoryFreeClear(buff);
return NULL;
}
int32_t current = pInfo->validBlockIndex++;
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); while (1) {
if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { bool hasResult = tqNextBlockInWal(pInfo->tqReader, id);
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); SSDataBlock* pRes = pInfo->tqReader->pResBlock;
continue;
}
}
blockDataCleanup(pInfo->pRes);
while (tqNextBlockImpl(pInfo->tqReader, id)) { blockDataCleanup(pBlock);
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false); if (hasResult) {
qDebug("stream scan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
setBlockIntoRes(pInfo, pRes, true);
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1; pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
} else {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
break; return NULL;
}
} }
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break; break;
} else {
continue;
} }
} }
...@@ -2141,9 +2118,9 @@ FETCH_NEXT_BLOCK: ...@@ -2141,9 +2118,9 @@ FETCH_NEXT_BLOCK:
pInfo->numOfExec++; pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows; pOperator->resultInfo.totalRows += pBlockInfo->rows;
qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); qDebug("stream scan get source rows:%" PRId64 ", %s", pBlockInfo->rows, id);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
return pInfo->pRes; return pBlock;
} }
if (pInfo->pUpdateDataRes->info.rows > 0) { if (pInfo->pUpdateDataRes->info.rows > 0) {
...@@ -2151,10 +2128,84 @@ FETCH_NEXT_BLOCK: ...@@ -2151,10 +2128,84 @@ FETCH_NEXT_BLOCK:
} }
goto NEXT_SUBMIT_BLK; goto NEXT_SUBMIT_BLK;
} else {
ASSERT(0); // } else {
return NULL; // qDebug("stream scan get none from log, return, version:%" PRId64,
// pTaskInfo->streamInfo.currentOffset.version); return NULL;
// }
// while (1) {
// if (pInfo->tqReader->msg.msgStr == NULL) {
// if (pInfo->validBlockIndex >= totalBlocks) {
// updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
// doClearBufferedBlocks(pInfo);
//
// qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
// return NULL;
// }
//
// int32_t current = pInfo->validBlockIndex++;
// SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
//
// qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
// if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
// qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit,
// current, totalBlocks, id); continue;
// }
// }
//
// blockDataCleanup(pInfo->pRes);
//
// while (tqNextBlockImpl(pInfo->tqReader, id)) {
// int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
// if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
// continue;
// }
//
// setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false);
//
// if (pInfo->pCreateTbRes->info.rows > 0) {
// pInfo->scanMode = STREAM_SCAN_FROM_RES;
// return pInfo->pCreateTbRes;
// }
//
// doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
// doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
// pInfo->pRes->info.dataLoad = 1;
// blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
//
// if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
// break;
// }
// }
//
// if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
// break;
// } else {
// continue;
// }
// }
//
// // record the scan action.
// pInfo->numOfExec++;
// pOperator->resultInfo.totalRows += pBlockInfo->rows;
//
// qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
// if (pBlockInfo->rows > 0) {
// return pInfo->pRes;
// }
//
// if (pInfo->pUpdateDataRes->info.rows > 0) {
// goto FETCH_NEXT_BLOCK;
// }
//
// goto NEXT_SUBMIT_BLK;
// } else {
// ASSERT(0);
// return NULL;
// }
} }
return NULL;
} }
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) { static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
......
...@@ -116,6 +116,17 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve ...@@ -116,6 +116,17 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve
*ever = pReader->cond.scanUncommited ? lastVer : committedVer; *ever = pReader->cond.scanUncommited ? lastVer : committedVer;
} }
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){
// if offset version is small than first version , let's seek to first version
taosThreadMutexLock(&pWalReader->pWal->mutex);
int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
taosThreadMutexUnlock(&pWalReader->pWal->mutex);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
}
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0; int64_t ret = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册