未验证 提交 b6af6292 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13817 from taosdata/feature/stream

fix(stream): build table name
......@@ -199,8 +199,8 @@ typedef struct {
int8_t automatic;
int8_t async;
int8_t freeOffsets;
int8_t waitingRspNum;
int8_t totalRspNum;
int32_t waitingRspNum;
int32_t totalRspNum;
tmq_resp_err_t rspErr;
tmq_commit_cb* userCb;
SArray* successfulOffsets;
......@@ -373,8 +373,9 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
} else {
taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
}
// count down waiting rsp
int8_t waitingRspNum = atomic_sub_fetch_8(&pParam->params->waitingRspNum, 1);
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
if (waitingRspNum == 0) {
......@@ -395,7 +396,8 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
return 0;
}
int32_t tmqComitInner2(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) {
int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) {
int32_t code = -1;
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
......@@ -466,6 +468,8 @@ int32_t tmqComitInner2(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
}
}
......
......@@ -1708,6 +1708,7 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
pTag->keyLen = strlen(pTag->key);
pTag->type = TSDB_DATA_TYPE_UBIGINT;
pTag->u = groupId;
pTag->length = sizeof(uint64_t);
taosArrayPush(tags, &pTag);
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
......
......@@ -46,11 +46,12 @@ static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSche
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
STag* pTag = NULL;
STagVal tagVal = {
.cid = pDataBlock->info.numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
STag* pTag = NULL;
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
......@@ -110,10 +111,11 @@ static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSche
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
STagVal tagVal = {
.cid = pDataBlock->info.numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册