提交 6f8d4d69 编写于 作者: L Liu Jicong

fix(tmq): push msg

上级 260ca173
......@@ -171,11 +171,11 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
/*if (pRsp->blockNum > 0) {*/
/*ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);*/
/*} else {*/
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
/*}*/
}
int32_t len = 0;
......
......@@ -245,7 +245,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue;
}
if (pPushEntry->dataRsp.reqOffset.version > ver) {
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
tqDebug("vgId:%d push entry req version %ld, while push version %ld, skip", pTq->pVnode->config.vgId,
pPushEntry->dataRsp.reqOffset.version, ver);
continue;
......
......@@ -479,6 +479,7 @@ typedef struct SStreamScanInfo {
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SExprSupp tbnameCalSup;
SExprSupp tagCalSup;
int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
......@@ -775,6 +776,7 @@ typedef struct SStreamPartitionOperatorInfo {
SPartitionBySupporter partitionSup;
SExprSupp scalarSup;
SExprSupp tbnameCalSup;
SExprSupp tagCalSup;
SHashObj* pPartitions;
void* parIte;
SSDataBlock* pInputDataBlock;
......
......@@ -991,6 +991,8 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
taosMemoryFree(pInfo->partitionSup.keyBuf);
cleanupExprSupp(&pInfo->scalarSup);
cleanupExprSupp(&pInfo->tbnameCalSup);
cleanupExprSupp(&pInfo->tagCalSup);
blockDataDestroy(pInfo->pDelRes);
taosMemoryFreeClear(param);
}
......@@ -1037,6 +1039,19 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
}
}
if (pPartNode->pTags != NULL) {
int32_t numOfTags;
SExprInfo* pTagExpr = createExprInfo(pPartNode->pTags, NULL, &numOfTags);
if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
int32_t keyLen = 0;
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
pInfo->partitionSup.pGroupCols);
......
......@@ -1311,10 +1311,15 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, pTagCalSup->numOfExprs, NULL);
ASSERT(pResBlock->info.rows == 1);
// build tagArray
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
/*STagVal tagVal = {*/
/*.cid = 0,*/
/*.type = 0,*/
/*};*/
// build STag
// set STag
......@@ -2110,6 +2115,9 @@ static void destroyStreamScanOperatorInfo(void* param) {
taosMemoryFree(pStreamScan->pPseudoExpr);
}
cleanupExprSupp(&pStreamScan->tbnameCalSup);
cleanupExprSupp(&pStreamScan->tagCalSup);
updateInfoDestroy(pStreamScan->pUpdateInfo);
blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes);
......@@ -2164,6 +2172,19 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
}
if (pTableScanNode->pTags != NULL) {
int32_t numOfTags;
SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -351,7 +351,10 @@ int walCheckAndRepairIdx(SWal* pWal) {
taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
break;
}
taosLSeekFile(pIdxFile, offset, SEEK_SET);
if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d cannot seek offset %ld when repair idx since %s", pWal->cfg.vgId, offset, terrstr());
}
int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -636,6 +639,5 @@ int walRemoveMeta(SWal* pWal) {
if (metaVer == -1) return 0;
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr);
taosRemoveFile(fnameStr);
return 0;
return taosRemoveFile(fnameStr);
}
......@@ -514,7 +514,6 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
pReader->pHead->head.version, ver);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
......@@ -528,7 +527,6 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
......
......@@ -42,10 +42,20 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
SWalFileInfo *pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
return -1;
}
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
return -1;
}
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
}
}
walRemoveMeta(pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册