提交 dbb1e9bd 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 24c38d30
...@@ -824,7 +824,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { ...@@ -824,7 +824,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" next retrieve ep from mnode in 1s", pTmq->consumerId); tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
...@@ -832,7 +832,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { ...@@ -832,7 +832,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" next commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); tscDebug("consumer:0x%"PRIx64" commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
// do nothing // do nothing
...@@ -1578,25 +1578,20 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1578,25 +1578,20 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
} }
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
/*strcpy(pReq->topic, pTopic->topicName);*/
/*strcpy(pReq->cgroup, tmq->groupId);*/
int32_t groupLen = strlen(tmq->groupId); int32_t groupLen = strlen(tmq->groupId);
memcpy(pReq->subKey, tmq->groupId, groupLen); memcpy(pReq->subKey, tmq->groupId, groupLen);
pReq->subKey[groupLen] = TMQ_SEPARATOR; pReq->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pReq->subKey + groupLen + 1, pTopic->topicName); strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
pReq->withTbName = tmq->withTbName; pReq->withTbName = tmq->withTbName;
pReq->timeout = timeout;
pReq->consumerId = tmq->consumerId; pReq->consumerId = tmq->consumerId;
pReq->timeout = timeout;
pReq->epoch = tmq->epoch; pReq->epoch = tmq->epoch;
/*pReq->currentOffset = reqOffset;*/ /*pReq->currentOffset = reqOffset;*/
pReq->reqOffset = pVg->currentOffset; pReq->reqOffset = pVg->currentOffset;
pReq->reqId = generateRequestId();
pReq->useSnapshot = tmq->useSnapshot;
pReq->head.vgId = pVg->vgId; pReq->head.vgId = pVg->vgId;
pReq->useSnapshot = tmq->useSnapshot;
pReq->reqId = generateRequestId();
} }
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
...@@ -1646,79 +1641,48 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1646,79 +1641,48 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj; return pRspObj;
} }
// broadcast the poll request to all related vnodes static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tsem_post(&pTmq->rspSem);
tscDebug("consumer:0x%" PRIx64" start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); return -1;
}
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt);
continue;
/*if (vgSkipCnt < 10000) continue;*/
#if 0
if (skipCnt < 30000) {
continue;
} else {
tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
}
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
SMqPollReq req = {0}; SMqPollReq req = {0};
tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg); tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) { if (msgSize < 0) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return handleErrorBeforePoll(pVg, pTmq);
tsem_post(&tmq->rspSem);
return -1;
} }
char* msg = taosMemoryCalloc(1, msgSize); char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return handleErrorBeforePoll(pVg, pTmq);
tsem_post(&tmq->rspSem);
return -1;
} }
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg); taosMemoryFree(msg);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return handleErrorBeforePoll(pVg, pTmq);
tsem_post(&tmq->rspSem);
return -1;
} }
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) { if (pParam == NULL) {
taosMemoryFree(msg); taosMemoryFree(msg);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return handleErrorBeforePoll(pVg, pTmq);
tsem_post(&tmq->rspSem);
return -1;
} }
pParam->refId = tmq->refId; pParam->refId = pTmq->refId;
pParam->epoch = tmq->epoch; pParam->epoch = pTmq->epoch;
pParam->pVg = pVg; // pVg may be released,fix it
pParam->pVg = pVg;
pParam->pTopic = pTopic; pParam->pTopic = pTopic;
pParam->vgId = pVg->vgId; pParam->vgId = pVg->vgId;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) { if (sendInfo == NULL) {
taosMemoryFree(msg);
taosMemoryFree(pParam); taosMemoryFree(pParam);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); taosMemoryFree(msg);
tsem_post(&tmq->rspSem); return handleErrorBeforePoll(pVg, pTmq);
return -1;
} }
sendInfo->msgInfo = (SDataBuf){ sendInfo->msgInfo = (SDataBuf){
...@@ -1734,16 +1698,52 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1734,16 +1698,52 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
sendInfo->msgType = TDMT_VND_TMQ_CONSUME; sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0; int64_t transporterId = 0;
char offsetFormatBuf[80]; char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset);
tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
tmq->pollCnt++; pTmq->pollCnt++;
return TSDB_CODE_SUCCESS;
}
// broadcast the poll request to all related vnodes
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64" start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt);
continue;
/*if (vgSkipCnt < 10000) continue;*/
#if 0
if (skipCnt < 30000) {
continue;
} else {
tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
}
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} }
......
...@@ -27,13 +27,8 @@ void mndCleanupTopic(SMnode *pMnode); ...@@ -27,13 +27,8 @@ void mndCleanupTopic(SMnode *pMnode);
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName); SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName);
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic); void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb); bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
......
...@@ -1035,13 +1035,23 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo ...@@ -1035,13 +1035,23 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-db"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-db");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) {
goto _OVER;
}
mInfo("trans:%d start to drop db:%s", pTrans->id, pDb->name);
mInfo("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER; goto _OVER;
}
if (mndTopicExistsForDb(pMnode, pDb)) {
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
goto _OVER;
}
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
...@@ -1092,10 +1102,12 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) { ...@@ -1092,10 +1102,12 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
} }
code = mndDropDb(pMnode, pReq, pDb); code = mndDropDb(pMnode, pReq, pDb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop since %s", dropReq.db, terrstr()); mError("db:%s, failed to drop since %s", dropReq.db, terrstr());
} }
......
...@@ -31,6 +31,9 @@ ...@@ -31,6 +31,9 @@
#define MND_TOPIC_VER_NUMBER 2 #define MND_TOPIC_VER_NUMBER 2
#define MND_TOPIC_RESERVE_SIZE 64 #define MND_TOPIC_RESERVE_SIZE 64
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
...@@ -79,6 +82,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -79,6 +82,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
if (pTopic->physicalPlan) { if (pTopic->physicalPlan) {
physicalPlanLen = strlen(pTopic->physicalPlan) + 1; physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
} }
int32_t schemaLen = 0; int32_t schemaLen = 0;
if (pTopic->schema.nCols) { if (pTopic->schema.nCols) {
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema); schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
...@@ -88,7 +92,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -88,7 +92,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen + int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
MND_TOPIC_RESERVE_SIZE; MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER; if (pRaw == NULL) {
goto TOPIC_ENCODE_OVER;
}
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
...@@ -106,6 +112,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -106,6 +112,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
if (pTopic->astLen) { if (pTopic->astLen) {
SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
} }
...@@ -123,6 +130,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -123,6 +130,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
} }
SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
if (pTopic->ntbUid != 0) { if (pTopic->ntbUid != 0) {
int32_t sz = taosArrayGetSize(pTopic->ntbColIds); int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
...@@ -132,6 +140,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -132,6 +140,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER); SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
} }
} }
SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
...@@ -247,10 +256,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -247,10 +256,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER); SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
taosArrayPush(pTopic->ntbColIds, &colId); taosArrayPush(pTopic->ntbColIds, &colId);
} }
SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
TOPIC_DECODE_OVER: TOPIC_DECODE_OVER:
...@@ -266,12 +274,12 @@ TOPIC_DECODE_OVER: ...@@ -266,12 +274,12 @@ TOPIC_DECODE_OVER:
} }
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) { static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
mTrace("topic:%s, perform insert action", pTopic->name); mTrace("topic:%s perform insert action", pTopic->name);
return 0; return 0;
} }
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) { static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
mTrace("topic:%s, perform delete action", pTopic->name); mTrace("topic:%s perform delete action", pTopic->name);
taosMemoryFreeClear(pTopic->sql); taosMemoryFreeClear(pTopic->sql);
taosMemoryFreeClear(pTopic->ast); taosMemoryFreeClear(pTopic->ast);
taosMemoryFreeClear(pTopic->physicalPlan); taosMemoryFreeClear(pTopic->physicalPlan);
...@@ -281,7 +289,7 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) { ...@@ -281,7 +289,7 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
} }
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) { static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
mTrace("topic:%s, perform update action", pOldTopic->name); mTrace("topic:%s perform update action", pOldTopic->name);
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version); atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
...@@ -364,7 +372,7 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) { ...@@ -364,7 +372,7 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
const char *userName) { const char *userName) {
mInfo("topic:%s created", pCreate->name); mInfo("start to create topic:%s", pCreate->name);
SMqTopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
...@@ -385,13 +393,13 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -385,13 +393,13 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.subType = pCreate->subType; topicObj.subType = pCreate->subType;
topicObj.withMeta = pCreate->withMeta; topicObj.withMeta = pCreate->withMeta;
if (topicObj.withMeta && topicObj.subType == TOPIC_SUB_TYPE__COLUMN) { if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
if (pCreate->withMeta) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
} }
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
topicObj.ast = strdup(pCreate->ast); topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.astLen = strlen(pCreate->ast) + 1;
...@@ -451,6 +459,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -451,6 +459,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
terrno = TSDB_CODE_MND_STB_NOT_EXIST; terrno = TSDB_CODE_MND_STB_NOT_EXIST;
return -1; return -1;
} }
topicObj.stbUid = pStb->uid; topicObj.stbUid = pStb->uid;
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
} }
...@@ -889,24 +898,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) { ...@@ -889,24 +898,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) { bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break; if (pIter == NULL) {
break;
}
if (pTopic->dbUid == pDb->uid) { if (pTopic->dbUid == pDb->uid) {
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED; return true;
return -1;
} }
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
return 0;
return false;
} }
#if 0 #if 0
......
...@@ -114,7 +114,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -114,7 +114,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
} }
void tqClose(STQ* pTq) { void tqClose(STQ* pTq) {
if (pTq) { if (pTq == NULL) {
return;
}
tqOffsetClose(pTq->pOffsetStore); tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->pHandle); taosHashCleanup(pTq->pHandle);
taosHashCleanup(pTq->pPushMgr); taosHashCleanup(pTq->pPushMgr);
...@@ -123,7 +126,6 @@ void tqClose(STQ* pTq) { ...@@ -123,7 +126,6 @@ void tqClose(STQ* pTq) {
tqMetaClose(pTq); tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta); streamMetaClose(pTq->pStreamMeta);
taosMemoryFree(pTq); taosMemoryFree(pTq);
}
} }
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
...@@ -158,7 +160,7 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, ...@@ -158,7 +160,7 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
}; };
tmsgSendRsp(&resp); tmsgSendRsp(&resp);
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d", tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
return 0; return 0;
...@@ -217,7 +219,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { ...@@ -217,7 +219,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
...@@ -273,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -273,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
...@@ -332,8 +334,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co ...@@ -332,8 +334,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64 tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
...@@ -344,10 +345,10 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf ...@@ -344,10 +345,10 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf
pLeft->val.version <= pRight->val.version; pLeft->val.version <= pRight->val.version;
} }
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqOffset offset = {0}; STqOffset offset = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
if (tDecodeSTqOffset(&decoder, &offset) < 0) { if (tDecodeSTqOffset(&decoder, &offset) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -360,7 +361,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -360,7 +361,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
} else if (offset.val.type == TMQ_OFFSET__LOG) { } else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version); TD_VID(pTq->pVnode), offset.val.version);
if (offset.val.version + 1 == version) { if (offset.val.version + 1 == sversion) {
offset.val.version += 1; offset.val.version += 1;
} }
} else { } else {
...@@ -472,19 +473,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -472,19 +473,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t reqEpoch = req.epoch; int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset; STqOffsetVal reqOffset = req.reqOffset;
// 1.find handle // 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
/*ASSERT(pHandle);*/
if (pHandle == NULL) { if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode),
TD_VID(pTq->pVnode), req.subKey); req.subKey);
return -1; return -1;
} }
// check rebalance // 2. check rebalance
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64 tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
", in vgId:%d, subkey %s, handle consumer id %" PRId64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
return -1; return -1;
...@@ -498,7 +497,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -498,7 +497,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
// 2.reset offset if needed // 2.reset offset if needed
...@@ -510,7 +509,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -510,7 +509,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffsetNew = pOffset->val; fetchOffsetNew = pOffset->val;
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, &fetchOffsetNew); tFormatOffset(formatBuf, 80, &fetchOffsetNew);
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey, tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode), formatBuf); TD_VID(pTq->pVnode), formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
...@@ -675,7 +674,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -675,7 +674,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SWalCont* pHead = &pCkHead->head; SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
...@@ -721,7 +720,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -721,7 +720,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey); tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey);
...@@ -755,10 +754,10 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL ...@@ -755,10 +754,10 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
return 0; return 0;
} }
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqCheckInfo info = {0}; STqCheckInfo info = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -775,7 +774,7 @@ int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -775,7 +774,7 @@ int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
return 0; return 0;
} }
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) { if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -787,7 +786,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -787,7 +786,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
return 0; return 0;
} }
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock // todo lock
...@@ -975,7 +974,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -975,7 +974,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req; SStreamTaskCheckReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*) msgBody, msgLen);
tDecodeSStreamTaskCheckReq(&decoder, &req); tDecodeSStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId; int32_t taskId = req.downstreamTaskId;
...@@ -1028,7 +1027,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1028,7 +1027,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code; int32_t code;
SStreamTaskCheckRsp rsp; SStreamTaskCheckRsp rsp;
...@@ -1049,12 +1048,12 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_ ...@@ -1049,12 +1048,12 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_
return -1; return -1;
} }
code = streamProcessTaskCheckRsp(pTask, &rsp, version); code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code; return code;
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code; int32_t code;
#if 0 #if 0
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen); code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
...@@ -1077,14 +1076,14 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg ...@@ -1077,14 +1076,14 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
tDecoderClear(&decoder); tDecoderClear(&decoder);
// 2.save task // 2.save task
code = streamMetaAddTask(pTq->pStreamMeta, version, pTask); code = streamMetaAddTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
// 3.go through recover steps to fill history // 3.go through recover steps to fill history
if (pTask->fillHistory) { if (pTask->fillHistory) {
streamTaskCheckDownstream(pTask, version); streamTaskCheckDownstream(pTask, sversion);
} }
return 0; return 0;
...@@ -1155,7 +1154,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1155,7 +1154,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code; int32_t code;
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
...@@ -1164,7 +1163,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -1164,7 +1163,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
} }
// do recovery step 2 // do recovery step 2
code = streamSourceRecoverScanStep2(pTask, version); code = streamSourceRecoverScanStep2(pTask, sversion);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
...@@ -1212,7 +1211,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1212,7 +1211,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamRecoverFinishReq req; SStreamRecoverFinishReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
tDecodeSStreamRecoverFinishReq(&decoder, &req); tDecodeSStreamRecoverFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -1450,7 +1449,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1450,7 +1449,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
return 0; return 0;
...@@ -1462,7 +1461,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1462,7 +1461,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req; SStreamRetrieveReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
...@@ -1495,7 +1494,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { ...@@ -1495,7 +1494,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
SStreamDispatchReq req; SStreamDispatchReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR; code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -1558,4 +1557,4 @@ FAIL: ...@@ -1558,4 +1557,4 @@ FAIL:
return -1; return -1;
} }
int32_t tqCheckLogInWal(STQ* pTq, int64_t version) { return version <= pTq->walLogLastVer; } int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册