提交 94e7c79c 编写于 作者: L Liu Jicong

fix(tmq): handle adding subtable for subscribing stb

上级 92804d31
......@@ -1268,7 +1268,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) {
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) {
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
continue;
}
......
......@@ -199,6 +199,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
goto END;
}
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset,
TMSG_INFO((*ppCkHead)->head.msgType));
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
......@@ -586,8 +589,39 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
}
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
continue;
}
tDecoderClear(&mr.coder);
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) {
tqDebug("table uid %" PRId64 " does not add to tq handle", *id);
continue;
}
tqDebug("table uid %" PRId64 " add to tq handle", *id);
taosArrayPush(qa, id);
}
metaReaderClear(&mr);
if (taosArrayGetSize(qa) > 0) {
tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa);
}
taosArrayDestroy(qa);
} else {
// TODO handle delete table from stb
}
} else {
// tq update id
ASSERT(0);
}
}
while (1) {
......
......@@ -344,7 +344,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return true;
}
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) {
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
int32_t rows) {
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
......@@ -1878,7 +1879,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
#endif
#if 1
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
......@@ -1914,7 +1914,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL;
}
#endif
size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
......@@ -2296,7 +2295,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext;
pOperator->name = "RawStreamScanOperator";
pOperator->name = "RawScanOperator";
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -4384,8 +4383,9 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS
// currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
int32_t code =
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -4501,8 +4501,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
int32_t code =
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......
......@@ -347,22 +347,46 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code;
int64_t contLen;
bool seeked = false;
wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer);
// TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) {
if (ver > pRead->pWal->vers.appliedVer) {
return -1;
}
if (pRead->curInvalid || pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) return -1;
if (code < 0) {
pRead->curVersion = ver;
pRead->curInvalid = 1;
return -1;
}
seeked = true;
}
ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
return -1;
while (1) {
contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver);
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
ASSERT(0);
pRead->curInvalid = 1;
return -1;
}
}
code = walValidHeadCksum(pHead);
......@@ -373,13 +397,15 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1;
}
pRead->curInvalid = 0;
return 0;
}
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
// ASSERT(pRead->curVersion == pHead->head.version);
ASSERT(pRead->curVersion == pHead->head.version);
ASSERT(pRead->curInvalid == 0);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
......@@ -409,19 +435,32 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
}
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
pRead->pWal->cfg.vgId, pReadHead->version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curInvalid = 1;
ASSERT(0);
return -1;
}
if (pReadHead->version != ver) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pReadHead->version, ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(*ppHead) != 0) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册