未验证 提交 2b8f3b8c 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12645 from taosdata/feature/tq

enh(tmq): commit when consumer_close called
......@@ -81,7 +81,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
* @param isAdd
* @return
*/
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd);
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
/**
* Create the exec task object according to task json
......@@ -169,7 +169,7 @@ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes);
#ifdef __cplusplus
}
......
......@@ -830,10 +830,12 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
// init hb timer
if (tmq->hbTimer == NULL) {
tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
}
// init auto commit timer
if (tmq->autoCommit) {
if (tmq->autoCommit && tmq->commitTimer == NULL) {
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
}
......@@ -1456,9 +1458,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
return TMQ_RESP_ERR__FAIL;
}
tmq_list_t* lst = tmq_list_new();
tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
......
......@@ -627,6 +627,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else {
SArray *tmp = pOldConsumer->rebNewTopics;
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
pNewConsumer->rebNewTopics = tmp;
......@@ -642,6 +646,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
}
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
......
......@@ -117,6 +117,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
......
......@@ -101,6 +101,21 @@ static void tdSRowDemo() {
taosMemoryFree(pTSChema);
}
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
void* pIter = NULL;
STqExec* pExec = NULL;
while (1) {
pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break;
pExec = (STqExec*)pIter;
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true);
ASSERT(code == 0);
}
}
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
......
......@@ -333,6 +333,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
SVCreateTbRsp cRsp = {0};
char tbName[TSDB_TABLE_FNAME_LEN];
STbUidStore *pStore = NULL;
SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
......@@ -348,7 +349,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
}
rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
if (rsp.pArray == NULL) {
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
if (rsp.pArray == NULL || tbUids == NULL) {
rcode = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......@@ -376,6 +378,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
} else {
cRsp.code = TSDB_CODE_SUCCESS;
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
taosArrayPush(tbUids, &pCreateReq->uid);
}
taosArrayPush(rsp.pArray, &cRsp);
......@@ -383,6 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tDecoderClear(&decoder);
tqUpdateTbUidList(pVnode->pTq, tbUids);
tdUpdateTbUidList(pVnode->pSma, pStore);
tdUidStoreFree(pStore);
......@@ -402,6 +406,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
_exit:
taosArrayDestroy(rsp.pArray);
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncoderClear(&encoder);
return rcode;
......
......@@ -126,7 +126,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo;
}
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) {
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
// traverse to the stream scanner node to add this table id
......@@ -141,12 +141,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
SMetaReader mr = {0};
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
for(int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
int64_t* id = (int64_t*)taosArrayGet(tableIdList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:%"PRIu64" code:%s", *id, tstrerror(terrno));
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
continue;
}
......@@ -160,7 +160,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
metaReaderClear(&mr);
qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa));
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
if (code != TSDB_CODE_SUCCESS) {
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册