diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ed13196bd8d953f8ea0b9ec96baea06cf79b58b3..5dd14842c276d053f14e87d42a5f99ca6de68554 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -538,7 +538,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, + tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { code = -1; @@ -556,7 +556,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return code; } } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { - tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 + tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; @@ -585,7 +585,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pPushEntry->dataRsp.head.epoch = reqEpoch; pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, TD_VID(pTq->pVnode)); // unlock taosWUnLockLatch(&pTq->pushLock); @@ -660,7 +660,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { while (1) { consumerEpoch = atomic_load_32(&pHandle->epoch); if (consumerEpoch > reqEpoch) { - tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64 + tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); break; @@ -798,7 +798,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { - tqError("vgId:%d, build new consumer handle %s for consumer %" PRId64 ", but old consumerId is %" PRId64 "", + tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "", req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); } if (req.newConsumerId == -1) { @@ -873,7 +873,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); + tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { return -1; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b1b14a9569ee07fe2dcf8e6ddd882e94e41a70ff..cfbbaf2e7ee227a6d7fd3dfb096eec5c2bea721c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1184,13 +1184,14 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - releaseBufPage(pBuf, page); - - if (pBlock->info.rows <= 0 || pRow->numOfRows > pBlock->info.capacity) { - qError("error in copy data to ssdatablock, existed rows in block:%d, rows in pRow:%d, capacity:%d, %s", - pBlock->info.rows, pRow->numOfRows, pBlock->info.capacity, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + // expand the result datablock capacity + if (pRow->numOfRows > pBlock->info.capacity) { + blockDataEnsureCapacity(pBlock, pRow->numOfRows); + qDebug("datablock capacity not sufficient, expand to requried:%d, current capacity:%d, %s", pRow->numOfRows, + pBlock->info.capacity, GET_TASKID(pTaskInfo)); + // todo set the pOperator->resultInfo size } else { + releaseBufPage(pBuf, page); break; } }