提交 82c141ed 编写于 作者: L Liu Jicong

merge from 3.0

上级 2cb89fe8
......@@ -160,7 +160,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
}
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
......
......@@ -49,6 +49,11 @@ enum {
TMQ_CONF__RESET_OFFSET__NONE = -3,
};
enum {
TMQ_MSG_TYPE__POLL_RSP = 0,
TMQ_MSG_TYPE__EP_RSP,
};
typedef struct {
uint32_t numOfTables;
SArray* pGroupList;
......
......@@ -1901,6 +1901,7 @@ struct tmq_message_t {
SMqConsumeRsp consumeRsp;
SMqCMGetSubEpRsp getEpRsp;
};
void* extra;
};
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
......@@ -1955,7 +1956,6 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI32(buf, pRsp->epoch);
tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics);
tlen += taosEncodeFixedI32(buf, sz);
......@@ -1968,7 +1968,6 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI32(buf, &pRsp->epoch);
buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -69,16 +69,10 @@ struct tmq_t {
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
SRWLatch pollLock;
// stat
int64_t pollCnt;
};
enum {
TMQ_MSG_TYPE__POLL_RSP = 0,
TMQ_MSG_TYPE__EP_RSP,
};
enum {
TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT,
......@@ -125,7 +119,7 @@ typedef struct {
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
tmq_message_t* rspMsg;
int32_t epoch;
tsem_t rspSem;
} SMqPollCbParam;
......@@ -249,7 +243,6 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
taosInitRWLatch(&pTmq->pollLock);
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
......@@ -632,31 +625,50 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
}
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
printf("recv poll\n");
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("msg discard\n");
if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
return 0;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
return 0;
}
SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));
if (msgEpoch != tmqEpoch) {
printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
}
/*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) {
taosWUnLockLatch(&pParam->tmq->pollLock);
return -1;
}
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->numOfTopics == 0) {
if (pRsp->consumeRsp.numOfTopics == 0) {
/*printf("no data\n");*/
free(pRsp);
taosWUnLockLatch(&pParam->tmq->pollLock);
taosFreeQitem(pRsp);
return 0;
}
pParam->rspMsg = (tmq_message_t*)pRsp;
pVg->currentOffset = pRsp->rspOffset;
pRsp->extra = pParam->pVg;
taosWriteQitem(tmq->mqueue, pRsp);
printf("poll in queue\n");
/*pParam->rspMsg = (tmq_message_t*)pRsp;*/
/*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/
taosWUnLockLatch(&pParam->tmq->pollLock);
/*printf("\n-----msg end------\n");*/
return 0;
}
......@@ -715,6 +727,10 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tDeleteSMqCMGetSubEpRsp(&rsp);
} else {
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp);
taosWriteQitem(tmq->mqueue, pRsp);
......@@ -723,6 +739,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
printf("ask ep sync %d\n", sync);
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) {
......@@ -842,6 +859,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
printf("call poll\n");
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
......@@ -864,6 +882,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
}
param->tmq = tmq;
param->pVg = pVg;
param->epoch = tmq->epoch;
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){
.pData = pReq,
......@@ -877,6 +896,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
sendInfo->fp = tmqPollCb;
int64_t transporterId = 0;
printf("send poll\n");
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
......@@ -885,17 +905,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
return 0;
}
void tmqFetchLeftRes(tmq_t* tmq, tmq_message_t** pRspMsg) {
taosGetQitem(tmq->qall, (void**)pRspMsg);
if (pRspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)pRspMsg);
}
}
// return
int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);
if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) {
tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp);
tmqClearUnhandleMsg(tmq);
......@@ -904,57 +917,68 @@ int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
*pReset = false;
}
} else {
*pReset = false;
return -1;
}
return 0;
}
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
while (1) {
tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
tmqFetchLeftRes(tmq, &rspMsg);
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
break;
}
while (1) {
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) break;
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);
if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) {
printf("epoch match\n");
SMqClientVg* pVg = rspMsg->extra;
pVg->currentOffset = rspMsg->consumeRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
return rspMsg;
} else {
printf("epoch mismatch\n");
taosFreeQitem(rspMsg);
}
} else {
printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);
bool reset = false;
tmqHandleRes(tmq, rspMsg, &reset);
taosFreeQitem(rspMsg);
if (pollIfReset && reset) {
printf("reset and repoll\n");
tmqPollImpl(tmq, blockingTime);
}
}
}
return NULL;
}
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
}
tmqHandleAllRsp(tmq, blocking_time, false);
tmqPollImpl(tmq, blocking_time);
while (1) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) break;
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
return rspMsg;
} else {
bool reset = false;
tmqHandleRes(tmq, rspMsg, &reset);
taosFreeQitem(rspMsg);
if (reset) tmqPollImpl(tmq, blocking_time);
}
}
tmqHandleAllRsp(tmq, blocking_time, true);
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
printf("cycle end\n");
usleep(1000 * 1000);
return NULL;
}
}
......@@ -1092,9 +1116,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp;
tDeleteSMqConsumeRsp(pRsp);
free(tmq_message);
taosFreeQitem(tmq_message);
}
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
......
......@@ -270,9 +270,8 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy(rsp.cgroup, pReq->cgroup);
rsp.consumerId = consumerId;
rsp.epoch = pConsumer->epoch;
if (epoch != rsp.epoch) {
mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch);
if (epoch != pConsumer->epoch) {
mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch);
SArray *pTopics = pConsumer->currentTopics;
int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
......@@ -308,13 +307,16 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
mndReleaseSubscribe(pMnode, pSub);
}
}
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *abuf = buf;
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = pConsumer->epoch;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
tDeleteSMqCMGetSubEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
......
......@@ -438,6 +438,33 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
}
#endif
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
int32_t numOfTopics = 0;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
if (pTopic->dbUid == pDb->uid) {
numOfTopics++;
}
sdbRelease(pSdb, pTopic);
}
*pNumOfTopics = numOfTopics;
mndReleaseDb(pMnode, pDb);
return 0;
}
static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -220,7 +220,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffset = pReq->currentOffset + 1;
}
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
SMqConsumeRsp rsp = {
.consumerId = consumerId,
.numOfTopics = 0,
.pBlockData = NULL,
};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
......@@ -296,14 +300,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
void* abuf = buf;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqConsumeRsp(&abuf, &rsp);
rsp.pBlockData = NULL;
pMsg->pCont = buf;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册