提交 8572f4a3 编写于 作者: wmmhello's avatar wmmhello

opti:change push mgr to consume msg for subscribe

上级 9bd74ec5
......@@ -1702,7 +1702,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
continue;
}
......@@ -1710,7 +1710,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
pVg->vgId, vgSkipCnt);
continue;
#if 0
......
......@@ -1274,10 +1274,10 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false);
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
printf("failed to create dir:%s since %s", tsLogDir, terrstr());
cfgCleanup(pCfg);
return -1;
// terrno = TAOS_SYSTEM_ERROR(errno);
// printf("failed to create dir:%s since %s", tsLogDir, terrstr());
// cfgCleanup(pCfg);
// return -1;
}
if (taosInitLog(logname, logFileNum) != 0) {
......
......@@ -5328,9 +5328,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->head.vgId;
pHead->contLen = pReq->head.contLen;
// SMsgHead *pHead = buf;
// pHead->vgId = pReq->head.vgId;
// pHead->contLen = pReq->head.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
......
......@@ -1084,13 +1084,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
taosWLockLatch(&pTq->lock);
for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){
STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i);
if(pHandle->msg == NULL){
if(ASSERT(pHandle->msg != NULL)){
tqError("pHandle->msg should not be null");
break;
}else{
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
taosArrayClear(pTq->pPushArray);
// unlock
......
......@@ -181,15 +181,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// lock
taosWLockLatch(&pTq->lock);
if(pHandle->msg != NULL){
if(ASSERT(pHandle->msg == NULL)){
tqError("pHandle->msg should be null");
taosWUnLockLatch(&pTq->lock);
goto end;
}
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
pHandle->msg->contLen = pMsg->contLen;
tqError("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
taosArrayPush(pTq->pPushArray, &pHandle);
taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册