未验证 提交 c24a7f64 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21876 from taosdata/feature/TS-3495

fix:add log & init to 0 specific
......@@ -813,7 +813,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->offset = pVg->offsetInfo.currentOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
}
}
// tmq->needReportOffsetRows = false;
......@@ -1489,7 +1489,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
STqOffsetVal offsetNew = {0};
offsetNew.type = tmq->resetOffsetCfg;
SMqClientVg clientVg = {
.pollCnt = 0,
......
......@@ -1190,7 +1190,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
......@@ -468,40 +468,51 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
// if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
if (pSub) {
taosRLockLatch(&pSub->lock);
bool init = false;
if (pOutput->pSub->offsetRows == NULL) {
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
init = true;
}
pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (init) {
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
// mDebug("pSub->offsetRows is init");
} else {
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
}
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
bool jump = false;
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
if(pVgEp->vgId == d1->vgId){
jump = true;
mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
break;
}
}
if(jump) continue;
bool find = false;
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
find = true;
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
break;
}
}
if(!find){
taosArrayPush(pOutput->pSub->offsetRows, d1);
}
}
}
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
// }
}
// 8. generate logs
......@@ -771,8 +782,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
STrans *pTrans = NULL;
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -791,38 +804,40 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
}
taosWLockLatch(&pSub->lock);
if (taosHashGetSize(pSub->consumerHash) != 0) {
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
return -1;
code = -1;
goto end;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}
if (mndTransPrepare(pMnode, pTrans) < 0) {
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}
end:
taosWUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}
void mndCleanupSubscribe(SMnode *pMnode) {}
......
......@@ -388,7 +388,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
......@@ -403,7 +403,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
......@@ -412,11 +412,11 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
}
} else {
pReader->nextBlk += 1;
tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
}
}
qDebug("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->msg.msgStr = NULL;
......@@ -604,7 +604,7 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol
}
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
SSDataBlock* pBlock = pReader->pResBlock;
......
......@@ -1078,6 +1078,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SOperatorInfo* pOperator = pTaskInfo->pRoot;
const char* id = GET_TASKID(pTaskInfo);
if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){
pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
if (pOperator == NULL) {
return -1;
}
SStreamScanInfo* pInfo = pOperator->info;
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
walReaderVerifyOffset(pWalReader, pOffset);
}
// if pOffset equal to current offset, means continue consume
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0;
......
......@@ -780,7 +780,7 @@
,,y,script,./test.sh -f tsim/user/basic.sim
,,y,script,./test.sh -f tsim/user/password.sim
,,y,script,./test.sh -f tsim/user/privilege_db.sim
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
#,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,y,script,./test.sh -f tsim/user/privilege_topic.sim
,,y,script,./test.sh -f tsim/user/privilege_table.sim
,,y,script,./test.sh -f tsim/db/alter_option.sim
......
......@@ -245,7 +245,7 @@ class TDTestCase:
tdSql.query("show consumers")
tdSql.checkRows(1)
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000,reset:earliest")
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000ms,reset:earliest")
time.sleep(2)
tdLog.info("start insert data")
......
......@@ -94,7 +94,7 @@ class TDTestCase:
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
consumer_ret = "earliest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}'
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]}ms,reset:{consumer_ret}'
if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"]
consumer = Consumer(consumer_dict)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册