未验证 提交 d4d9ce20 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11291 from taosdata/feature/tq

add multi topic test
...@@ -67,7 +67,7 @@ extern "C" { ...@@ -67,7 +67,7 @@ extern "C" {
} \ } \
} }
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 4
#define TQ_BUCKET_MASK 0xFF #define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256 #define TQ_BUCKET_SIZE 256
......
...@@ -421,22 +421,62 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -421,22 +421,62 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t tqProcessRebReq(STQ* pTq, char* msg) { int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0}; SMqMVRebReq req = {0};
terrno = TSDB_CODE_SUCCESS;
tDecodeSMqMVRebReq(msg, &req); tDecodeSMqMVRebReq(msg, &req);
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId);
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
ASSERT(pConsumer); ASSERT(pConsumer);
pConsumer->consumerId = req.newConsumerId; ASSERT(pConsumer->consumerId == req.oldConsumerId);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); int32_t numOfTopics = taosArrayGetSize(pConsumer->topics);
tqHandleCommit(pTq->tqMeta, req.newConsumerId); if (numOfTopics == 1) {
tqHandlePurge(pTq->tqMeta, req.oldConsumerId); STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
terrno = TSDB_CODE_SUCCESS; ASSERT(strcmp(pTopic->topicName, req.topic) == 0);
STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId);
if (pNewConsumer == NULL) {
pConsumer->consumerId = req.newConsumerId;
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
return 0;
} else {
taosArrayPush(pNewConsumer->topics, pTopic);
}
} else {
for (int32_t i = 0; i < numOfTopics; i++) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
if (strcmp(pTopic->topicName, req.topic) == 0) {
STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId);
if (pNewConsumer == NULL) {
pNewConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
if (pNewConsumer == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
strcpy(pNewConsumer->cgroup, pConsumer->cgroup);
pNewConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
pNewConsumer->consumerId = req.newConsumerId;
pNewConsumer->epoch = 0;
taosArrayPush(pNewConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
return 0;
}
ASSERT(pNewConsumer->consumerId == req.newConsumerId);
taosArrayPush(pNewConsumer->topics, pTopic);
break;
}
}
//
}
return 0; return 0;
} }
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq req = {0}; SMqSetCVgReq req = {0};
tDecodeSMqSetCVgReq(msg, &req); tDecodeSMqSetCVgReq(msg, &req);
bool create = false;
vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId); vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId);
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId);
...@@ -450,6 +490,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -450,6 +490,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
pConsumer->consumerId = req.consumerId; pConsumer->consumerId = req.consumerId;
pConsumer->epoch = 0; pConsumer->epoch = 0;
create = true;
} }
STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic)); STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic));
...@@ -483,10 +524,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -483,10 +524,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task); ASSERT(pTopic->buffer.output[i].task);
} }
/*printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);*/ vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); if (create) {
tqHandleCommit(pTq->tqMeta, req.consumerId); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.consumerId);
}
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
} }
......
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
./test.sh -f tsim/tmq/basic.sim ./test.sh -f tsim/tmq/basic.sim
./test.sh -f tsim/tmq/basic1.sim ./test.sh -f tsim/tmq/basic1.sim
./test.sh -f tsim/tmq/oneTopic.sim ./test.sh -f tsim/tmq/oneTopic.sim
#./test.sh -f tsim/tmq/multiTopic.sim ./test.sh -f tsim/tmq/multiTopic.sim
# --- stable # --- stable
./test.sh -f tsim/stable/disk.sim ./test.sh -f tsim/stable/disk.sim
......
...@@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) { ...@@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) {
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
while (running) { while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 3000); tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 4000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++; totalMsgs++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册