提交 82a1cc8e 编写于 作者: L Liu Jicong

fix(wal): return correct msg for open wal file

上级 5005ee33
...@@ -92,9 +92,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -92,9 +92,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
blkHead->uid = 0; blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
/*blkHead->dataLen = htonl(rows * maxLen);*/ int32_t dataLen = 0;
blkHead->dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
...@@ -167,9 +166,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -167,9 +166,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
} }
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen); rowData = POINTER_SHIFT(rowData, rowLen);
blkHead->dataLen += rowLen; dataLen += rowLen;
} }
int32_t dataLen = blkHead->dataLen;
blkHead->dataLen = htonl(dataLen); blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
......
...@@ -684,6 +684,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -684,6 +684,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
} }
void printDataBlock(SSDataBlock* pBlock, const char* flag) { void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (pBlock == NULL) return;
SArray* blocks = taosArrayInit(1, sizeof(SSDataBlock)); SArray* blocks = taosArrayInit(1, sizeof(SSDataBlock));
taosArrayPush(blocks, pBlock); taosArrayPush(blocks, pBlock);
blockDebugShowData(blocks, flag); blockDebugShowData(blocks, flag);
...@@ -1262,7 +1263,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { ...@@ -1262,7 +1263,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) { bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0); ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark; return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
} }
...@@ -2402,7 +2403,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2402,7 +2403,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
if (numOfChild > 0) { if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void *)); pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) { for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChildOp) { if (pChildOp) {
...@@ -3027,12 +3028,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3027,12 +3028,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
/*printDataBlock(pBInfo->pRes, "session insert");*/
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
...@@ -3100,9 +3103,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3100,9 +3103,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
/*printDataBlock(pBInfo->pRes, "session insert");*/
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
......
...@@ -100,7 +100,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { ...@@ -100,7 +100,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pLogTFile == NULL) { if (pLogTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
terrno = TSDB_CODE_WAL_INVALID_VER;
wError("cannot open file %s, since %s", fnameStr, terrstr()); wError("cannot open file %s, since %s", fnameStr, terrstr());
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册