提交 cde4085a 编写于 作者: L Liu Jicong

optimize msg batch

上级 01c94a77
......@@ -177,6 +177,7 @@ typedef struct SSDataBlock {
enum {
FETCH_TYPE__DATA = 1,
FETCH_TYPE__META,
FETCH_TYPE__SEP,
FETCH_TYPE__NONE,
};
......
......@@ -1603,6 +1603,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer %ld actual process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
......@@ -1718,7 +1719,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (1) {
tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) return NULL;
if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("return since poll err");
/*return NULL;*/
}
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
......
......@@ -420,6 +420,8 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return -1;
}
pRsp->withTbName = 0;
#if 0
pRsp->withTbName = pReq->withTbName;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
......@@ -428,6 +430,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return -1;
}
}
#endif
if (subType == TOPIC_SUB_TYPE__COLUMN) {
pRsp->withSchema = false;
......
......@@ -213,6 +213,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
tqDebug("vgId:%d tq push msg ver %ld", pTq->pVnode->config.vgId, ver);
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock);
......@@ -257,6 +259,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
pRsp->blockNum++;
}
tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId,
pPushEntry->pHandle->subKey, pRsp->blockNum);
if (pRsp->blockNum > 0) {
// set offset
tqOffsetResetToLog(&pRsp->rspOffset, ver);
......
......@@ -314,14 +314,18 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
return -1;
}
void* body = pReader->pWalReader->pHead->head.body;
#if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter
ret->fetchType = FETCH_TYPE__META;
ret->meta = pReader->pWalReader->pHead->head.body;
return 0;
} else {
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
#endif
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
#if 0
}
#endif
}
while (tqNextDataBlock(pReader)) {
......@@ -333,6 +337,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
continue;
}
ret->fetchType = FETCH_TYPE__DATA;
tqDebug("return data rows %d", ret->data.info.rows);
return 0;
}
......@@ -340,7 +345,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ASSERT(pReader->ver >= 0);
ret->fetchType = FETCH_TYPE__NONE;
ret->fetchType = FETCH_TYPE__SEP;
tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version);
return 0;
}
......
......@@ -193,6 +193,7 @@ enum {
OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
OP_EXEC_RECV = 0x11,
};
typedef struct SOperatorFpSet {
......
......@@ -241,9 +241,20 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pOperator->status == OP_EXEC_RECV &&
pFinalRes->info.rows == 0) {
pOperator->status = OP_OPENED;
continue;
}
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
pFinalRes->info.rows);
doSetOperatorCompleted(pOperator);
break;
}
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
qDebug("set status recv");
pOperator->status = OP_EXEC_RECV;
}
// for stream interval
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
......@@ -258,7 +269,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && pOperator->status == OP_EXEC_DONE) {
if (pOperator->status == OP_EXEC_DONE) {
break;
}
......@@ -302,6 +313,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status);
break;
}
} else {
......
......@@ -1504,8 +1504,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0);
}
// TODO clean data block
if (pInfo->pRes->info.rows > 0) {
pOperator->status = OP_EXEC_RECV;
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
......@@ -1514,18 +1514,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
} else if (ret.fetchType == FETCH_TYPE__NONE ||
(ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("queue scan log return null, offset %s", formatBuf);
pOperator->status = OP_OPENED;
return NULL;
} else {
ASSERT(0);
}
}
#if 0
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
......@@ -1534,6 +1535,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
qDebug("stream scan tsdb return null");
return NULL;
#endif
} else {
ASSERT(0);
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册