提交 1309c489 编写于 作者: wmmhello's avatar wmmhello

fix:error in tmq for snapshot

上级 f2815571
......@@ -84,7 +84,7 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 4");
pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -98,7 +98,7 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists abc1 vgroups 3");
pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -43,7 +43,6 @@ typedef struct SReadHandle {
int32_t numOfVgroups;
void* sContext; // SSnapContext*
void* pWalReader;
SHashObj *pFilterOutTbUid;
} SReadHandle;
......
......@@ -1228,6 +1228,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
}
taosArrayDestroy(pBlock->pDataBlock);
pBlock->pDataBlock = NULL;
taosMemoryFreeClear(pBlock->pBlockAgg);
memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
}
......
......@@ -385,11 +385,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
}
taosMemoryFree(metaRsp.metaRsp);
goto OVER;
}
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, no data", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode));
tqOffsetResetToLog(&dataRsp.rspOffset, metaRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
OVER:
// TODO wrap in destroy func
......@@ -404,8 +410,6 @@ OVER:
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
}
taosMemoryFreeClear(metaRsp.metaRsp);
return code;
}
......@@ -497,7 +501,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
handle.tqReader = pHandle->execHandle.pExecReader;
handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader;
handle.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid;
pHandle->execHandle.task =
......@@ -516,7 +519,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
handle.tqReader = pHandle->execHandle.pExecReader;
handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
}
......
......@@ -124,6 +124,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
if (pRsp->blockNum > 0){
qStreamExtractOffset(task, &pRsp->rspOffset);
tqDebug("task exec exited, get data");
break;
}
......@@ -131,11 +132,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){
qStreamPrepareScan(task, &tmp->rspOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
tqDebug("task exec change to get meta");
continue;
}
*pMetaRsp = *tmp;
tqDebug("task exec exited");
tqDebug("task exec exited, get meta");
break;
}
......
......@@ -92,7 +92,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
reader.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
reader.tqReader = handle.execHandle.pExecReader;
reader.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid;
......
......@@ -498,13 +498,15 @@ typedef struct SStreamRawScanInfo{
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode* vnode;
SWalCkHead* pCkHead;
SReadHandle* readHandle;
bool needFetchLog;
bool hasDataInOneFetchVer;
SSDataBlock pRes; // result SSDataBlock
uint64_t groupId;
STsdbReader* dataReader;
SSnapContext* sContext;
STqReader* tqReader;
SHashObj* pFilterOutTbUid;
}SStreamRawScanInfo;
typedef struct SSysTableScanInfo {
......
......@@ -768,7 +768,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
tsdbReaderOpen(pInfo->readHandle->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL);
qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts);
}else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){
......
......@@ -1464,6 +1464,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamRawScanInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
qDebug("stream scan called");
if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){
......@@ -1529,29 +1530,49 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
return NULL;
}else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
if(pInfo->pCkHead == NULL){
pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048);
if (pInfo->pCkHead == NULL) {
int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version;
while(1){
if(pInfo->needFetchLog){
fetchVer++;
if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
qDebug("tmq poll: consumer log end. offset %" PRId64, fetchVer);
pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
return NULL;
}
SWalCont* pHead = &pInfo->pCkHead->head;
qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.lastStatus.version = fetchVer;
pInfo->hasDataInOneFetchVer = false;
}
}
walSetReaderCapacity(pInfo->readHandle->pWalReader, 2048);
}
int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version;
SWalCont* pHead = &pInfo->pCkHead->head;
SWalCont* pHead = &pInfo->pCkHead->head;
if(pHead->msgType != TDMT_VND_SUBMIT){
fetchVer++;
if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
return NULL;
}
qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
pHead = &pInfo->pCkHead->head;
if (pHead->msgType == TDMT_VND_SUBMIT) {
blockDataFreeRes(&pInfo->pRes);
SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes);
if(block){
qDebug("fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
pInfo->needFetchLog = false;
pInfo->hasDataInOneFetchVer = true;
return block;
}else{
pInfo->needFetchLog = true;
if(pHead->msgType == TDMT_VND_SUBMIT){
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0);
}else if(pInfo->sContext->withMeta){
if(pInfo->hasDataInOneFetchVer){
return block;
}else{
continue;
}
}
} else if(pInfo->sContext->withMeta){
ASSERT(IS_META_MSG(pHead->msgType));
qDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
......@@ -1562,23 +1583,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
return NULL;
}
}
if (pHead->msgType == TDMT_VND_SUBMIT) {
while(1){
blockDataFreeRes(&pInfo->pRes);
SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->readHandle->tqReader, pInfo->readHandle->pFilterOutTbUid, &pInfo->pRes);
if(!block){
fetchVer++;
if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
return NULL;
}
pHead = &pInfo->pCkHead->head;
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0);
}
return block;
}
pInfo->needFetchLog = true;
}
}
return NULL;
......@@ -1587,13 +1593,13 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
static void destroyRawScanOperatorInfo(void* param, int32_t numOfOutput) {
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
taosMemoryFreeClear(pRawScan->pCkHead);
if (pRawScan->readHandle->tqReader) {
tqCloseReader(pRawScan->readHandle->tqReader);
if (pRawScan->tqReader) {
tqCloseReader(pRawScan->tqReader);
}
blockDataFreeRes(&pRawScan->pRes);
tsdbReaderClose(pRawScan->dataReader);
destroySnapContext(pRawScan->sContext);
taosHashCleanup(pRawScan->readHandle->pFilterOutTbUid);
taosHashCleanup(pRawScan->pFilterOutTbUid);
taosMemoryFree(pRawScan);
}
......@@ -1613,7 +1619,19 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
return NULL;
}
pInfo->readHandle = pHandle;
pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048);
if (pInfo->pCkHead == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->needFetchLog = true;
pInfo->hasDataInOneFetchVer = false;
pInfo->vnode = pHandle->vnode;
pInfo->pFilterOutTbUid = pHandle->pFilterOutTbUid;
pInfo->tqReader = pHandle->tqReader;
walSetReaderCapacity(pInfo->tqReader->pWalReader, 2048);
pInfo->sContext = pHandle->sContext;
pOperator->name = "RawStreamScanOperator";
// pOperator->blocking = false;
......
......@@ -501,7 +501,8 @@ int main(int argc, char* argv[]) {
if(argc == 3 && strcmp(argv[1], "-c") == 0) {
strcpy(dir, argv[2]);
}else{
strcpy(dir, "../../../sim/psim/cfg");
// strcpy(dir, "../../../sim/psim/cfg");
strcpy(dir, "/var/log");
}
printf("env init\n");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册