提交 b45f7cef 编写于 作者: L Liu Jicong

fix

上级 e1631975
......@@ -264,7 +264,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset = pReq->currentOffset + 1;
}
printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
/*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/
SMqPollRsp rsp = {
/*.consumerId = consumerId,*/
......@@ -305,7 +305,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
printf("from topic %s from consumer\n", pTopic->topicName, consumerId);
/*printf("from topic %s from consumer\n", pTopic->topicName, consumerId);*/
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
......@@ -353,8 +353,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType,
pReq->epoch);
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset,
* pHead->msgType,*/
/*pReq->epoch);*/
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
......@@ -384,7 +385,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);
/*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/
/*}*/
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册