提交 601e454a 编写于 作者: L Liu Jicong

enh(tmq): put offset store into vnode

上级 f35baac5
......@@ -1494,9 +1494,9 @@ typedef struct {
int32_t code;
} STaskDropRsp;
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......@@ -2297,6 +2297,29 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
// tqOffset
enum {
TMQ_OFFSET__SNAPSHOT = 1,
TMQ_OFFSET__LOG,
};
typedef struct {
int8_t type;
union {
struct {
int64_t uid;
int64_t ts;
};
struct {
int64_t version;
};
};
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
} STqOffset;
int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset);
int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
......
......@@ -140,7 +140,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
......@@ -176,6 +176,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
......
......@@ -205,7 +205,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
}
return code;
......@@ -232,9 +232,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
return TSDB_CODE_SUCCESS;
}
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
return pRequest->pTscObj->pAppInfo;
}
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
......@@ -258,7 +256,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
}
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
}
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
......@@ -401,7 +399,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
SQueryResult res = {.code = 0, .numOfRows = 0};
int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, schdExecCallback, &res);
pRequest->metric.start, schdExecCallback, &res);
pRequest->body.resInfo.execRes = res.res;
......@@ -457,7 +455,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code;
}
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.queryJob != 0) {
......@@ -470,9 +469,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code;
}
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) {
int32_t code = 0;
SArray* pArray = NULL;
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
int32_t code = 0;
SArray* pArray = NULL;
SSubmitRsp* pRsp = (SSubmitRsp*)res;
if (pRsp->nBlocks <= 0) {
return TSDB_CODE_SUCCESS;
......@@ -502,7 +501,7 @@ _return:
return code;
}
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) {
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
int32_t code = 0;
SArray* pArray = NULL;
SArray* pTbArray = (SArray*)res;
......@@ -540,15 +539,15 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return TSDB_CODE_SUCCESS;
}
SCatalog* pCatalog = NULL;
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SCatalog* pCatalog = NULL;
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
if (code) {
return code;
}
SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) {
......@@ -566,8 +565,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
break;
}
default:
tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self,
pRequest->type, pRequest->requestId);
tscError("0x%" PRIx64 ", invalid exec result for request type %d, reqId:0x%" PRIx64, pRequest->self,
pRequest->type, pRequest->requestId);
code = TSDB_CODE_APP_ERROR;
}
......@@ -575,13 +574,13 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
}
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param;
SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code;
STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code),
pRequest->retry, pRequest->requestId);
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
return;
......@@ -589,7 +588,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code;
}
......@@ -697,16 +696,17 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
} else {
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
//todo not to be released here
// todo not to be released here
taosArrayDestroy(pNodeList);
break;
}
case QUERY_EXEC_MODE_EMPTY_RESULT:
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
break;
default:
......@@ -1349,14 +1349,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
p += sizeof(uint64_t);
// check fields
for(int32_t i = 0; i < numOfCols; ++i) {
int16_t type = *(int16_t*) p;
for (int32_t i = 0; i < numOfCols; ++i) {
int16_t type = *(int16_t*)p;
p += sizeof(int16_t);
int32_t bytes = *(int32_t*) p;
int32_t bytes = *(int32_t*)p;
p += sizeof(int32_t);
// ASSERT(type == pFields[i].type && bytes == pFields[i].bytes);
ASSERT(type == pFields[i].type && bytes == pFields[i].bytes);
}
int32_t* colLength = (int32_t*)p;
......
......@@ -132,6 +132,7 @@ typedef struct {
// statistics
int64_t pollCnt;
// offset
int64_t committedOffset;
int64_t currentOffset;
// connection info
int32_t vgId;
......@@ -193,6 +194,26 @@ typedef struct {
void* userParam;
} SMqCommitCbParam;
typedef struct {
tmq_t* tmq;
int8_t automatic;
int8_t async;
int8_t freeOffsets;
int8_t waitingRspNum;
int8_t totalRspNum;
tmq_resp_err_t rspErr;
tmq_commit_cb* userCb;
SArray* successfulOffsets;
SArray* failedOffsets;
void* userParam;
tsem_t rspSem;
} SMqCommitCbParamSet;
typedef struct {
SMqCommitCbParamSet* params;
STqOffset* pOffset;
} SMqCommitCbParam2;
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->withTbName = false;
......@@ -343,6 +364,135 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
if (code == 0) {
taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
} else {
taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
}
// count down waiting rsp
int8_t waitingRspNum = atomic_sub_fetch_8(&pParam->params->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
if (waitingRspNum == 0) {
// if no more waiting rsp
if (pParamSet->async) {
// call async cb func
if (pParamSet->automatic && pParamSet->tmq->commitCb) {
pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) {
// sem post
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam);
}
}
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
return 0;
}
int32_t tmqComitInner2(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) {
int32_t code = -1;
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pParamSet->tmq = tmq;
pParamSet->automatic = automatic;
pParamSet->async = async;
pParamSet->freeOffsets = 1;
pParamSet->userCb = userCb;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i);
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t tlen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, tlen);
pOffset->subKey[tlen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + tlen + 1, pTopic->topicName);
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
ASSERT(0);
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
// TODO
continue;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = len,
.handle = NULL,
};
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
// send msg
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
}
}
if (!async) {
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
} else {
code = 0;
}
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, code, NULL, tmq->commitCbUserParam);
} else {
userCb(tmq, code, NULL, userParam);
}
}
if (!async) {
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
return 0;
}
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) {
SMqCMCommitOffsetReq req;
......@@ -890,12 +1040,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset;
tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
tscDebug("consumer %ld(epoch %d) original offset of vg %d is %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
if (pOffset != NULL) {
offset = *pOffset;
tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
tscDebug("consumer %ld(epoch %d) receive offset of vg %d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
vgKey);
}
tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = offset,
......@@ -1226,9 +1377,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->msg.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->msg.rspOffset;
......@@ -1243,7 +1393,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
/*printf("epoch mismatch\n");*/
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pollRspWrapper->msg.head.epoch,
consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else {
......@@ -1263,10 +1414,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs();
#if 0
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, timeout);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
#endif
// in no topic status also need process delayed task
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
......@@ -1359,8 +1514,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
return name;
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
return NULL;
}
......
......@@ -1756,7 +1756,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = 1,
STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
......@@ -1821,7 +1821,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = 1,
STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
......
......@@ -4746,3 +4746,32 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
taosMemoryFree(pRsp->pMeta);
}
}
int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) {
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
if (pOffset->type == TMQ_OFFSET__SNAPSHOT) {
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1;
} else if (pOffset->type == TMQ_OFFSET__LOG) {
if (tEncodeI64(pEncoder, pOffset->version) < 0) return -1;
} else {
ASSERT(0);
}
if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1;
return 0;
}
int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1;
if (pOffset->type == TMQ_OFFSET__SNAPSHOT) {
if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1;
} else if (pOffset->type == TMQ_OFFSET__LOG) {
if (tDecodeI64(pDecoder, &pOffset->version) < 0) return -1;
} else {
ASSERT(0);
}
if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1;
return 0;
}
......@@ -344,6 +344,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
......@@ -41,7 +41,6 @@ extern "C" {
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct STqOffsetCfg STqOffsetCfg;
typedef struct STqOffsetStore STqOffsetStore;
// tqRead
......@@ -127,14 +126,15 @@ typedef struct {
} STqHandle;
struct STQ {
char* path;
SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle
SHashObj* pStreamTasks; // taksId -> SStreamTask
SVnode* pVnode;
SWal* pWal;
TDB* pMetaStore;
TTB* pExecStore;
char* path;
SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle
SHashObj* pStreamTasks; // taksId -> SStreamTask
STqOffsetStore* pOffsetStore;
SVnode* pVnode;
SWal* pWal;
TDB* pMetaStore;
TTB* pExecStore;
};
typedef struct {
......@@ -157,16 +157,18 @@ int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
// tqSink
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct {
int32_t size;
} STqOffsetHead;
// tqOffset
STqOffsetStore* tqOffsetOpen(STqOffsetCfg*);
STqOffsetStore* tqOffsetOpen();
void tqOffsetClose(STqOffsetStore*);
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
int32_t tqOffsetSnapshot(STqOffsetStore* pStore);
// tqSink
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
#ifdef __cplusplus
}
......
......@@ -137,6 +137,7 @@ int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
......
......@@ -66,19 +66,23 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
ASSERT(0);
}
if (tqOffsetOpen(pTq) < 0) {
ASSERT(0);
}
return pTq;
}
void tqClose(STQ* pTq) {
if (pTq) {
taosMemoryFreeClear(pTq->path);
tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->handles);
taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
taosMemoryFree(pTq);
}
// TODO
}
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) {
......@@ -109,6 +113,33 @@ int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0;
}
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
STqOffset offset = {0};
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
ASSERT(0);
return -1;
}
tDecoderClear(&decoder);
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
tqDebug("receive offset commit msg to %s, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, offset.uid,
offset.ts);
} else if (offset.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s, offset(type:log) version: %ld", offset.subKey, offset.version);
} else {
ASSERT(0);
}
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
ASSERT(0);
return -1;
}
return 0;
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
......
......@@ -16,26 +16,113 @@
#include "tq.h"
enum ETqOffsetPersist {
TQ_OFFSET_PERSIST__LAZY = 1,
TQ_OFFSET_PERSIST__EAGER,
};
struct STqOffsetCfg {
int8_t persistPolicy;
};
struct STqOffsetStore {
STqOffsetCfg cfg;
SHashObj* pHash; // SHashObj<subscribeKey, offset>
STQ* pTq;
SHashObj* pHash; // SHashObj<subscribeKey, offset>
};
STqOffsetStore* tqOffsetOpen(STqOffsetCfg* pCfg) {
STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore));
STqOffsetStore* tqOffsetOpen(STQ* pTq) {
STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
if (pStore == NULL) {
return NULL;
}
memcpy(&pStore->cfg, pCfg, sizeof(STqOffsetCfg));
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
if (pStore->pHash == NULL) {
if (pStore->pHash) taosHashCleanup(pStore->pHash);
return NULL;
}
TdFilePtr pFile = taosOpenFile(pStore->pTq->path, TD_FILE_READ);
if (pFile != NULL) {
STqOffsetHead head = {0};
int64_t code;
while (1) {
if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
if (code < 0) {
break;
} else {
ASSERT(0);
// TODO handle error
}
}
int32_t size = htonl(head.size);
void* memBuf = taosMemoryCalloc(1, size);
if ((code = taosReadFile(pFile, memBuf, size)) != size) {
ASSERT(0);
// TODO handle error
}
STqOffset offset;
SDecoder decoder;
tDecoderInit(&decoder, memBuf, size);
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
ASSERT(0);
}
tDecoderClear(&decoder);
if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
ASSERT(0);
// TODO
}
}
taosCloseFile(&pFile);
}
return pStore;
}
void tqOffsetClose(STqOffsetStore* pStore) {
tqOffsetSnapshot(pStore);
taosHashCleanup(pStore->pHash);
}
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
return (STqOffset*)taosHashGet(pStore->pHash, subscribeKey, strlen(subscribeKey));
}
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}
int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
// open file
// TODO file name should be with a version
TdFilePtr pFile = taosOpenFile(pStore->pTq->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pFile == NULL) {
ASSERT(0);
return -1;
}
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pStore->pHash, pIter);
if (pIter == NULL) break;
STqOffset* pOffset = (STqOffset*)pIter;
int32_t bodyLen;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
ASSERT(code == 0);
if (code < 0) {
ASSERT(0);
taosHashCancelIterate(pStore->pHash, pIter);
return -1;
}
int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
void* buf = taosMemoryCalloc(1, totLen);
void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));
((STqOffsetHead*)buf)->size = htonl(bodyLen);
SEncoder encoder;
tEncoderInit(&encoder, abuf, bodyLen);
tEncodeSTqOffset(&encoder, pOffset);
// write file
int64_t writeLen;
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != bodyLen) {
ASSERT(0);
tqError("write offset incomplete, len %d, write len %ld", bodyLen, writeLen);
taosHashCancelIterate(pStore->pHash, pIter);
return -1;
}
}
// close and rename file
taosCloseFile(&pFile);
return 0;
}
......@@ -148,17 +148,24 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
// TODO: handle error
goto _err;
}
break;
case TDMT_VND_MQ_VG_DELETE:
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
// TODO: handle error
goto _err;
}
break;
case TDMT_VND_MQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
} break;
case TDMT_VND_ALTER_CONFIRM:
......@@ -901,8 +908,8 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
// todo
// 1. stop work
// todo
// 1. stop work
// 2. adjust hash range / compact / remove wals / rename vgroups
// 3. reload sync
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册