diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 48c85cf265c8637d33af0a330ba3b0baa12ddeaf..10c6c3623ad219625b688a9b1dce5097d337f6b9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -199,8 +199,8 @@ typedef struct { int8_t automatic; int8_t async; int8_t freeOffsets; - int8_t waitingRspNum; - int8_t totalRspNum; + int32_t waitingRspNum; + int32_t totalRspNum; tmq_resp_err_t rspErr; tmq_commit_cb* userCb; SArray* successfulOffsets; @@ -373,8 +373,9 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { } else { taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset); } + // count down waiting rsp - int8_t waitingRspNum = atomic_sub_fetch_8(&pParam->params->waitingRspNum, 1); + int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); ASSERT(waitingRspNum >= 0); if (waitingRspNum == 0) { @@ -395,7 +396,8 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { return 0; } -int32_t tmqComitInner2(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { +int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, + tmq_commit_cb* userCb, void* userParam) { int32_t code = -1; SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); @@ -466,6 +468,8 @@ int32_t tmqComitInner2(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); + pParamSet->waitingRspNum++; + pParamSet->totalRspNum++; } } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b4ece426b398bff07515343b58ccdcc38ca52b58..5a2aaed74e5e101d97b82388f199607fa646967e 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1708,6 +1708,7 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { pTag->keyLen = strlen(pTag->key); pTag->type = TSDB_DATA_TYPE_UBIGINT; pTag->u = groupId; + pTag->length = sizeof(uint64_t); taosArrayPush(tags, &pTag); void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 391a00844004e97a2dda7b57304706b66d831f75..0cca1d2e10be67d1e3eff83b51d98df35fa33f8a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -46,11 +46,12 @@ static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSche createTbReq.type = TSDB_CHILD_TABLE; createTbReq.ctb.suid = suid; - STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .pData = (uint8_t*)&pDataBlock->info.groupId, - .nData = sizeof(uint64_t)}; - STag* pTag = NULL; + STagVal tagVal = { + .cid = pDataBlock->info.numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.groupId, + }; + STag* pTag = NULL; taosArrayClear(tagArray); taosArrayPush(tagArray, &tagVal); tTagNew(tagArray, 1, false, &pTag); @@ -110,10 +111,11 @@ static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSche createTbReq.type = TSDB_CHILD_TABLE; createTbReq.ctb.suid = suid; - STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .pData = (uint8_t*)&pDataBlock->info.groupId, - .nData = sizeof(uint64_t)}; + STagVal tagVal = { + .cid = pDataBlock->info.numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.groupId, + }; taosArrayClear(tagArray); taosArrayPush(tagArray, &tagVal); STag* pTag = NULL;