From 601e454a242bd511f2e2914f3f946490715cb45f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 14 Jun 2022 13:58:40 +0800 Subject: [PATCH] enh(tmq): put offset store into vnode --- include/common/tmsg.h | 29 +++- include/common/tmsgdef.h | 3 +- source/client/src/clientImpl.c | 54 +++--- source/client/src/tmq.c | 172 +++++++++++++++++++- source/common/src/tdatablock.c | 4 +- source/common/src/tmsg.c | 29 ++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/vnode/src/inc/tq.h | 36 ++-- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 35 +++- source/dnode/vnode/src/tq/tqOffset.c | 115 +++++++++++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 15 +- 12 files changed, 415 insertions(+), 79 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d69849349c..be9eda3825 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1494,9 +1494,9 @@ typedef struct { int32_t code; } STaskDropRsp; -#define STREAM_TRIGGER_AT_ONCE 1 -#define STREAM_TRIGGER_WINDOW_CLOSE 2 -#define STREAM_TRIGGER_MAX_DELAY 3 +#define STREAM_TRIGGER_AT_ONCE 1 +#define STREAM_TRIGGER_WINDOW_CLOSE 2 +#define STREAM_TRIGGER_MAX_DELAY 3 typedef struct { char name[TSDB_TABLE_FNAME_LEN]; @@ -2297,6 +2297,29 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset); int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq); +// tqOffset +enum { + TMQ_OFFSET__SNAPSHOT = 1, + TMQ_OFFSET__LOG, +}; + +typedef struct { + int8_t type; + union { + struct { + int64_t uid; + int64_t ts; + }; + struct { + int64_t version; + }; + }; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; +} STqOffset; + +int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset); +int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 743f10bd55..534194e1cf 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -140,7 +140,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) @@ -176,6 +176,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) + TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index dc83f7bc43..f48383c43a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -205,7 +205,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { SRetrieveTableRsp* pRsp = NULL; int32_t code = qExecCommand(pQuery->pRoot, &pRsp); if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { - code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false); + code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true); } return code; @@ -232,9 +232,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { return TSDB_CODE_SUCCESS; } -static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { - return pRequest->pTscObj->pAppInfo; -} +static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; } void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { SRetrieveTableRsp* pRsp = NULL; @@ -258,7 +256,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { } pRequest->body.queryFp(pRequest->body.param, pRequest, 0); -// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); + // pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); } int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { @@ -401,7 +399,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod SQueryResult res = {.code = 0, .numOfRows = 0}; int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, - pRequest->metric.start, schdExecCallback, &res); + pRequest->metric.start, schdExecCallback, &res); pRequest->body.resInfo.execRes = res.res; @@ -457,7 +455,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList return pRequest->code; } - if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { + if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || + TDMT_VND_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; if (pRequest->body.queryJob != 0) { @@ -470,9 +469,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList return pRequest->code; } -int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) { - int32_t code = 0; - SArray* pArray = NULL; +int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) { + int32_t code = 0; + SArray* pArray = NULL; SSubmitRsp* pRsp = (SSubmitRsp*)res; if (pRsp->nBlocks <= 0) { return TSDB_CODE_SUCCESS; @@ -502,7 +501,7 @@ _return: return code; } -int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) { +int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) { int32_t code = 0; SArray* pArray = NULL; SArray* pTbArray = (SArray*)res; @@ -540,15 +539,15 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { return TSDB_CODE_SUCCESS; } - SCatalog* pCatalog = NULL; - SAppInstInfo* pAppInfo = getAppInfo(pRequest); + SCatalog* pCatalog = NULL; + SAppInstInfo* pAppInfo = getAppInfo(pRequest); int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog); if (code) { return code; } - SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp); + SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp); SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; switch (pRes->msgType) { @@ -566,8 +565,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { break; } default: - tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self, - pRequest->type, pRequest->requestId); + tscError("0x%" PRIx64 ", invalid exec result for request type %d, reqId:0x%" PRIx64, pRequest->self, + pRequest->type, pRequest->requestId); code = TSDB_CODE_APP_ERROR; } @@ -575,13 +574,13 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { } void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { - SRequestObj* pRequest = (SRequestObj*) param; + SRequestObj* pRequest = (SRequestObj*)param; pRequest->code = code; STscObj* pTscObj = pRequest->pTscObj; if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { - tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), - pRequest->retry, pRequest->requestId); + tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, + pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; doAsyncQuery(pRequest, true); return; @@ -589,7 +588,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { if (code == TSDB_CODE_SUCCESS) { code = handleQueryExecRsp(pRequest); - ASSERT(pRequest->code == TSDB_CODE_SUCCESS); + ASSERT(pRequest->code == TSDB_CODE_SUCCESS); pRequest->code = code; } @@ -697,16 +696,17 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest); } else { - tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); + tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); pRequest->body.queryFp(pRequest->body.param, pRequest, code); } - //todo not to be released here + // todo not to be released here taosArrayDestroy(pNodeList); break; } case QUERY_EXEC_MODE_EMPTY_RESULT: - pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pRequest->body.queryFp(pRequest->body.param, pRequest, 0); break; default: @@ -1349,14 +1349,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 p += sizeof(uint64_t); // check fields - for(int32_t i = 0; i < numOfCols; ++i) { - int16_t type = *(int16_t*) p; + for (int32_t i = 0; i < numOfCols; ++i) { + int16_t type = *(int16_t*)p; p += sizeof(int16_t); - int32_t bytes = *(int32_t*) p; + int32_t bytes = *(int32_t*)p; p += sizeof(int32_t); -// ASSERT(type == pFields[i].type && bytes == pFields[i].bytes); + ASSERT(type == pFields[i].type && bytes == pFields[i].bytes); } int32_t* colLength = (int32_t*)p; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 7d49c4206f..48c85cf265 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -132,6 +132,7 @@ typedef struct { // statistics int64_t pollCnt; // offset + int64_t committedOffset; int64_t currentOffset; // connection info int32_t vgId; @@ -193,6 +194,26 @@ typedef struct { void* userParam; } SMqCommitCbParam; +typedef struct { + tmq_t* tmq; + int8_t automatic; + int8_t async; + int8_t freeOffsets; + int8_t waitingRspNum; + int8_t totalRspNum; + tmq_resp_err_t rspErr; + tmq_commit_cb* userCb; + SArray* successfulOffsets; + SArray* failedOffsets; + void* userParam; + tsem_t rspSem; +} SMqCommitCbParamSet; + +typedef struct { + SMqCommitCbParamSet* params; + STqOffset* pOffset; +} SMqCommitCbParam2; + tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); conf->withTbName = false; @@ -343,6 +364,135 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } +int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { + SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; + SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; + // push into array + if (code == 0) { + taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset); + } else { + taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset); + } + // count down waiting rsp + int8_t waitingRspNum = atomic_sub_fetch_8(&pParam->params->waitingRspNum, 1); + ASSERT(waitingRspNum >= 0); + + if (waitingRspNum == 0) { + // if no more waiting rsp + if (pParamSet->async) { + // call async cb func + if (pParamSet->automatic && pParamSet->tmq->commitCb) { + pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->tmq->commitCbUserParam); + } else if (!pParamSet->automatic && pParamSet->userCb) { + // sem post + pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam); + } + } + + taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); + taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); + } + return 0; +} + +int32_t tmqComitInner2(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { + int32_t code = -1; + + SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); + if (pParamSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pParamSet->tmq = tmq; + pParamSet->automatic = automatic; + pParamSet->async = async; + pParamSet->freeOffsets = 1; + pParamSet->userCb = userCb; + pParamSet->userParam = userParam; + tsem_init(&pParamSet->rspSem, 0, 0); + + for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i); + STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); + if (pOffset == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + int32_t tlen = strlen(tmq->groupId); + memcpy(pOffset->subKey, tmq->groupId, tlen); + pOffset->subKey[tlen] = TMQ_SEPARATOR; + strcpy(pOffset->subKey + tlen + 1, pTopic->topicName); + int32_t len; + int32_t code; + tEncodeSize(tEncodeSTqOffset, pOffset, len, code); + if (code < 0) { + ASSERT(0); + } + void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeSTqOffset(&encoder, pOffset); + + // build param + SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2)); + pParam->params = pParamSet; + pParam->pOffset = pOffset; + + // build send info + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (pMsgSendInfo == NULL) { + // TODO + continue; + } + pMsgSendInfo->msgInfo = (SDataBuf){ + .pData = buf, + .len = len, + .handle = NULL, + }; + + pMsgSendInfo->requestId = generateRequestId(); + pMsgSendInfo->requestObjRefId = 0; + pMsgSendInfo->param = pParam; + pMsgSendInfo->fp = tmqCommitCb2; + pMsgSendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET; + // send msg + + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); + } + } + + if (!async) { + tsem_wait(&pParamSet->rspSem); + code = pParamSet->rspErr; + tsem_destroy(&pParamSet->rspSem); + } else { + code = 0; + } + + if (code != 0 && async) { + if (automatic) { + tmq->commitCb(tmq, code, NULL, tmq->commitCbUserParam); + } else { + userCb(tmq, code, NULL, userParam); + } + } + + if (!async) { + taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); + taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); + } + + return 0; +} + int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { SMqCMCommitOffsetReq req; @@ -890,12 +1040,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t offset = pVgEp->offset; - tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); + tscDebug("consumer %ld(epoch %d) original offset of vg %d is %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); if (pOffset != NULL) { offset = *pOffset; - tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey); + tscDebug("consumer %ld(epoch %d) receive offset of vg %d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId, + vgKey); } - tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); + tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); SMqClientVg clientVg = { .pollCnt = 0, .currentOffset = offset, @@ -1226,9 +1377,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ - /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ - if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) { - /*printf("epoch match\n");*/ + int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + if (pollRspWrapper->msg.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ pVg->currentOffset = pollRspWrapper->msg.rspOffset; @@ -1243,7 +1393,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - /*printf("epoch mismatch\n");*/ + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pollRspWrapper->msg.head.epoch, + consumerEpoch); taosFreeQitem(pollRspWrapper); } } else { @@ -1263,10 +1414,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { SMqRspObj* rspObj; int64_t startTime = taosGetTimestampMs(); +#if 0 + tmqHandleAllDelayedTask(tmq); + tmqPollImpl(tmq, timeout); rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { return (TAOS_RES*)rspObj; } +#endif // in no topic status also need process delayed task if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { @@ -1359,8 +1514,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { pRspObj->resIter >= pRspObj->rsp.blockNum) { return NULL; } - const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); - return name; + return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); } return NULL; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b7a9ef88b6..4d1f5a34b1 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1756,7 +1756,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo createTbReq.type = TSDB_CHILD_TABLE; createTbReq.ctb.suid = suid; - STagVal tagVal = {.cid = 1, + STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .pData = (uint8_t*)&pDataBlock->info.groupId, .nData = sizeof(uint64_t)}; @@ -1821,7 +1821,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo createTbReq.type = TSDB_CHILD_TABLE; createTbReq.ctb.suid = suid; - STagVal tagVal = {.cid = 1, + STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .pData = (uint8_t*)&pDataBlock->info.groupId, .nData = sizeof(uint64_t)}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d16ab57ea9..4ebd91b0da 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4746,3 +4746,32 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) { taosMemoryFree(pRsp->pMeta); } } + +int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) { + if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; + if (pOffset->type == TMQ_OFFSET__SNAPSHOT) { + if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; + if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1; + } else if (pOffset->type == TMQ_OFFSET__LOG) { + if (tEncodeI64(pEncoder, pOffset->version) < 0) return -1; + } else { + ASSERT(0); + } + if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1; + return 0; +} + +int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { + if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1; + if (pOffset->type == TMQ_OFFSET__SNAPSHOT) { + if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1; + if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1; + } else if (pOffset->type == TMQ_OFFSET__LOG) { + if (tDecodeI64(pDecoder, &pOffset->version) < 0) return -1; + } else { + ASSERT(0); + } + if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1; + return 0; +} + diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index ee120576c3..750006d05f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -344,6 +344,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 5a8564bfd1..2ee0673ce5 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -41,7 +41,6 @@ extern "C" { #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -typedef struct STqOffsetCfg STqOffsetCfg; typedef struct STqOffsetStore STqOffsetStore; // tqRead @@ -127,14 +126,15 @@ typedef struct { } STqHandle; struct STQ { - char* path; - SHashObj* pushMgr; // consumerId -> STqHandle* - SHashObj* handles; // subKey -> STqHandle - SHashObj* pStreamTasks; // taksId -> SStreamTask - SVnode* pVnode; - SWal* pWal; - TDB* pMetaStore; - TTB* pExecStore; + char* path; + SHashObj* pushMgr; // consumerId -> STqHandle* + SHashObj* handles; // subKey -> STqHandle + SHashObj* pStreamTasks; // taksId -> SStreamTask + STqOffsetStore* pOffsetStore; + SVnode* pVnode; + SWal* pWal; + TDB* pMetaStore; + TTB* pExecStore; }; typedef struct { @@ -157,16 +157,18 @@ int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); -// tqSink -void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +typedef struct { + int32_t size; +} STqOffsetHead; -// tqOffset -STqOffsetStore* tqOffsetOpen(STqOffsetCfg*); +STqOffsetStore* tqOffsetOpen(); void tqOffsetClose(STqOffsetStore*); -int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset); -int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetPersistAll(STqOffsetStore* pStore); +STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); +int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); +int32_t tqOffsetSnapshot(STqOffsetStore* pStore); + +// tqSink +void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0c2b09a493..300a5f890e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -137,6 +137,7 @@ int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a5ea49d79..22685c1e19 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -66,19 +66,23 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { ASSERT(0); } + if (tqOffsetOpen(pTq) < 0) { + ASSERT(0); + } + return pTq; } void tqClose(STQ* pTq) { if (pTq) { - taosMemoryFreeClear(pTq->path); + tqOffsetClose(pTq->pOffsetStore); taosHashCleanup(pTq->handles); taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); + taosMemoryFree(pTq->path); tqMetaClose(pTq); taosMemoryFree(pTq); } - // TODO } int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) { @@ -109,6 +113,33 @@ int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } +int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { + STqOffset offset = {0}; + SDecoder decoder; + tDecoderInit(&decoder, msg, msgLen); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + ASSERT(0); + return -1; + } + tDecoderClear(&decoder); + + if (offset.type == TMQ_OFFSET__SNAPSHOT) { + tqDebug("receive offset commit msg to %s, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, offset.uid, + offset.ts); + } else if (offset.type == TMQ_OFFSET__LOG) { + tqDebug("receive offset commit msg to %s, offset(type:log) version: %ld", offset.subKey, offset.version); + } else { + ASSERT(0); + } + + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { + ASSERT(0); + return -1; + } + + return 0; +} + int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 4d83a67579..2db49d7cf5 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -16,26 +16,113 @@ #include "tq.h" -enum ETqOffsetPersist { - TQ_OFFSET_PERSIST__LAZY = 1, - TQ_OFFSET_PERSIST__EAGER, -}; - -struct STqOffsetCfg { - int8_t persistPolicy; -}; - struct STqOffsetStore { - STqOffsetCfg cfg; - SHashObj* pHash; // SHashObj + STQ* pTq; + SHashObj* pHash; // SHashObj }; -STqOffsetStore* tqOffsetOpen(STqOffsetCfg* pCfg) { - STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore)); +STqOffsetStore* tqOffsetOpen(STQ* pTq) { + STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore)); if (pStore == NULL) { return NULL; } - memcpy(&pStore->cfg, pCfg, sizeof(STqOffsetCfg)); pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); + if (pStore->pHash == NULL) { + if (pStore->pHash) taosHashCleanup(pStore->pHash); + return NULL; + } + TdFilePtr pFile = taosOpenFile(pStore->pTq->path, TD_FILE_READ); + if (pFile != NULL) { + STqOffsetHead head = {0}; + int64_t code; + + while (1) { + if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if (code < 0) { + break; + } else { + ASSERT(0); + // TODO handle error + } + } + int32_t size = htonl(head.size); + void* memBuf = taosMemoryCalloc(1, size); + if ((code = taosReadFile(pFile, memBuf, size)) != size) { + ASSERT(0); + // TODO handle error + } + STqOffset offset; + SDecoder decoder; + tDecoderInit(&decoder, memBuf, size); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + ASSERT(0); + } + tDecoderClear(&decoder); + if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { + ASSERT(0); + // TODO + } + } + + taosCloseFile(&pFile); + } return pStore; } + +void tqOffsetClose(STqOffsetStore* pStore) { + tqOffsetSnapshot(pStore); + taosHashCleanup(pStore->pHash); +} + +STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { + return (STqOffset*)taosHashGet(pStore->pHash, subscribeKey, strlen(subscribeKey)); +} + +int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { + return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); +} + +int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { + // open file + // TODO file name should be with a version + TdFilePtr pFile = taosOpenFile(pStore->pTq->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pFile == NULL) { + ASSERT(0); + return -1; + } + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pStore->pHash, pIter); + if (pIter == NULL) break; + STqOffset* pOffset = (STqOffset*)pIter; + int32_t bodyLen; + int32_t code; + tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code); + ASSERT(code == 0); + if (code < 0) { + ASSERT(0); + taosHashCancelIterate(pStore->pHash, pIter); + return -1; + } + + int32_t totLen = sizeof(STqOffsetHead) + bodyLen; + void* buf = taosMemoryCalloc(1, totLen); + void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead)); + + ((STqOffsetHead*)buf)->size = htonl(bodyLen); + SEncoder encoder; + tEncoderInit(&encoder, abuf, bodyLen); + tEncodeSTqOffset(&encoder, pOffset); + // write file + int64_t writeLen; + if ((writeLen = taosWriteFile(pFile, buf, totLen)) != bodyLen) { + ASSERT(0); + tqError("write offset incomplete, len %d, write len %ld", bodyLen, writeLen); + taosHashCancelIterate(pStore->pHash, pIter); + return -1; + } + } + // close and rename file + taosCloseFile(&pFile); + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ab2efa4791..186ab4b528 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -148,17 +148,24 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { - // TODO: handle error + goto _err; } break; case TDMT_VND_MQ_VG_DELETE: if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { - // TODO: handle error + goto _err; + } + break; + case TDMT_VND_MQ_COMMIT_OFFSET: + if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + goto _err; } break; case TDMT_STREAM_TASK_DEPLOY: { if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { + goto _err; } } break; case TDMT_VND_ALTER_CONFIRM: @@ -901,8 +908,8 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode)); - // todo - // 1. stop work + // todo + // 1. stop work // 2. adjust hash range / compact / remove wals / rename vgroups // 3. reload sync return 0; -- GitLab