未验证 提交 097723f4 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20151 from taosdata/feature/3_liaohj

fix(query): expand output buffer.
...@@ -538,7 +538,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -538,7 +538,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1; code = -1;
...@@ -556,7 +556,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -556,7 +556,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return code; return code;
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed", " in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey); pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
...@@ -585,7 +585,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -585,7 +585,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pPushEntry->dataRsp.head.epoch = reqEpoch; pPushEntry->dataRsp.head.epoch = reqEpoch;
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode)); TD_VID(pTq->pVnode));
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->pushLock);
...@@ -660,7 +660,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -660,7 +660,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch); consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64 tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d", ", found new consumer epoch %d, discard req epoch %d",
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break; break;
...@@ -798,7 +798,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -798,7 +798,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer %" PRId64 ", but old consumerId is %" PRId64 "", tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
} }
if (req.newConsumerId == -1) { if (req.newConsumerId == -1) {
...@@ -873,7 +873,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -873,7 +873,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
} }
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1; return -1;
} }
......
...@@ -1184,13 +1184,14 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS ...@@ -1184,13 +1184,14 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
} }
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
releaseBufPage(pBuf, page); // expand the result datablock capacity
if (pRow->numOfRows > pBlock->info.capacity) {
if (pBlock->info.rows <= 0 || pRow->numOfRows > pBlock->info.capacity) { blockDataEnsureCapacity(pBlock, pRow->numOfRows);
qError("error in copy data to ssdatablock, existed rows in block:%d, rows in pRow:%d, capacity:%d, %s", qDebug("datablock capacity not sufficient, expand to requried:%d, current capacity:%d, %s", pRow->numOfRows,
pBlock->info.rows, pRow->numOfRows, pBlock->info.capacity, GET_TASKID(pTaskInfo)); pBlock->info.capacity, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); // todo set the pOperator->resultInfo size
} else { } else {
releaseBufPage(pBuf, page);
break; break;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册