未验证 提交 11dccf12 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21131 from taosdata/fix/liaohj_main

refactor: do some internal refactor.
...@@ -50,7 +50,6 @@ enum { ...@@ -50,7 +50,6 @@ enum {
TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2, TASK_STATUS__RECOVER2,
TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint
}; };
enum { enum {
...@@ -346,7 +345,7 @@ typedef struct SStreamMeta { ...@@ -346,7 +345,7 @@ typedef struct SStreamMeta {
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
SRWLatch lock; SRWLatch lock;
int32_t walScan; int32_t walScanCounter;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
...@@ -545,8 +544,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); ...@@ -545,8 +544,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
// recover and fill history // recover and fill history
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version); int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version); int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq); int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version); int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
// common // common
int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);
......
...@@ -23,13 +23,12 @@ extern "C" { ...@@ -23,13 +23,12 @@ extern "C" {
#endif #endif
enum { enum {
MQ_CONSUMER_STATUS__MODIFY = 1, MQ_CONSUMER_STATUS_REBALANCE = 1,
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore // MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__READY, MQ_CONSUMER_STATUS__READY,
MQ_CONSUMER_STATUS__LOST, MQ_CONSUMER_STATUS__LOST,
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore // MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__LOST_REBD, MQ_CONSUMER_STATUS__LOST_REBD,
MQ_CONSUMER_STATUS__REMOVED,
}; };
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);
......
...@@ -142,7 +142,7 @@ typedef enum { ...@@ -142,7 +142,7 @@ typedef enum {
CONSUMER_UPDATE__REMOVE, CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST, CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER, CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic
} ECsmUpdateType; } ECsmUpdateType;
typedef struct { typedef struct {
......
...@@ -192,15 +192,18 @@ FAIL: ...@@ -192,15 +192,18 @@ FAIL:
return -1; return -1;
} }
// todo check the clear process
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerClearMsg *pClearMsg = pMsg->pCont; SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
return 0; return 0;
} }
mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId, mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
...@@ -215,6 +218,8 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { ...@@ -215,6 +218,8 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
if (pTrans == NULL) goto FAIL; if (pTrans == NULL) goto FAIL;
// this is the drop action, not the update action
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
...@@ -299,28 +304,36 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -299,28 +304,36 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
if (status == MQ_CONSUMER_STATUS__READY) { if (status == MQ_CONSUMER_STATUS__READY) {
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
if (pLostMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
continue;
}
pLostMsg->consumerId = pConsumer->consumerId; pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_CONSUMER_LOST, .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
.pCont = pLostMsg,
.contLen = sizeof(SMqConsumerLostMsg),
};
mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
MND_CONSUMER_LOST_HB_CNT);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d",
pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
continue;
}
pClearMsg->consumerId = pConsumer->consumerId; pClearMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
.pCont = pClearMsg,
.contLen = sizeof(SMqConsumerClearMsg),
};
mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId,
MND_CONSUMER_LOST_CLEAR_THRESHOLD);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
} else if (status == MQ_CONSUMER_STATUS__LOST) { } else if (status == MQ_CONSUMER_STATUS__LOST) {
...@@ -334,7 +347,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -334,7 +347,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} else { } else { // MQ_CONSUMER_STATUS_REBALANCE
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
...@@ -658,7 +671,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -658,7 +671,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
// set the update type // set the update type
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
taosArrayDestroy(pConsumerNew->assignedTopics); taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
...@@ -671,7 +684,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -671,7 +684,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
} else { } else {
/*taosRLockLatch(&pExistedConsumer->lock);*/
int32_t status = atomic_load_32(&pExistedConsumer->status); int32_t status = atomic_load_32(&pExistedConsumer->status);
mInfo("receive subscribe request from existed consumer:0x%" PRIx64 mInfo("receive subscribe request from existed consumer:0x%" PRIx64
...@@ -689,7 +701,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -689,7 +701,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} }
// set the update type // set the update type
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
taosArrayDestroy(pConsumerNew->assignedTopics); taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
...@@ -868,9 +880,10 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { ...@@ -868,9 +880,10 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
int32_t status = pConsumer->status; int32_t status = pConsumer->status;
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY) { if (status == MQ_CONSUMER_STATUS_REBALANCE) {
pConsumer->status = MQ_CONSUMER_STATUS__READY; pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST) { } else if (status == MQ_CONSUMER_STATUS__LOST) {
ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0);
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
} }
} }
...@@ -879,7 +892,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { ...@@ -879,7 +892,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
// remove from new topic // remove from new topic
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) { for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebNewTopics, i); char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
if (strcmp(pTopic, p) == 0) { if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebNewTopics, i); taosArrayRemove(pConsumer->rebNewTopics, i);
...@@ -900,9 +913,42 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo ...@@ -900,9 +913,42 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo
if (strcmp(pTopic, p) == 0) { if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebRemovedTopics, i); taosArrayRemove(pConsumer->rebRemovedTopics, i);
taosMemoryFree(p); taosMemoryFree(p);
mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
break;
}
}
}
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
if (strcmp(pTopic, topic) == 0) {
taosArrayRemove(pConsumer->currentTopics, i);
taosMemoryFree(topic);
mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
break;
}
}
}
static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
bool existing = false;
int32_t size = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < size; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
if (strcmp(topic, pTopic) == 0) {
existing = true;
break; break;
} }
} }
return existing;
} }
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
...@@ -911,21 +957,13 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -911,21 +957,13 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
taosWLockLatch(&pOldConsumer->lock); taosWLockLatch(&pOldConsumer->lock);
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { if (pNewConsumer->updateType == CONSUMER_UPDATE__REBALANCE) {
SArray *tmp = pOldConsumer->rebNewTopics; TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
pNewConsumer->rebNewTopics = tmp; TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
tmp = pOldConsumer->rebRemovedTopics;
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
pNewConsumer->rebRemovedTopics = tmp;
tmp = pOldConsumer->assignedTopics;
pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
pNewConsumer->assignedTopics = tmp;
pOldConsumer->subscribeTime = pNewConsumer->upTime; pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -935,10 +973,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -935,10 +973,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
int32_t status = pOldConsumer->status; int32_t prevStatus = pOldConsumer->status;
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
...@@ -948,8 +986,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -948,8 +986,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
} }
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
...@@ -958,24 +995,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -958,24 +995,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic // check if exist in current topic
bool existing = false;
int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < numOfExistedTopics; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(topic, pNewTopic) == 0) {
existing = true;
}
}
removeFromNewTopicList(pOldConsumer, pNewTopic); removeFromNewTopicList(pOldConsumer, pNewTopic);
// add to current topic // add to current topic
if (!existing) { bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
if (existing) {
taosMemoryFree(pNewTopic);
} else { // added into current topic list
taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
} else {
taosMemoryFree(pNewTopic);
} }
// set status // set status
...@@ -1000,16 +1029,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -1000,16 +1029,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
removeFromRemoveTopicList(pOldConsumer, removedTopic); removeFromRemoveTopicList(pOldConsumer, removedTopic);
// remove from current topic // remove from current topic
int32_t i = 0; removeFromCurrentTopicList(pOldConsumer, removedTopic);
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
for (i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(removedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->currentTopics, i);
taosMemoryFree(topic);
break;
}
}
// set status // set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
...@@ -1158,7 +1178,7 @@ static const char *mndConsumerStatusName(int status) { ...@@ -1158,7 +1178,7 @@ static const char *mndConsumerStatusName(int status) {
case MQ_CONSUMER_STATUS__LOST: case MQ_CONSUMER_STATUS__LOST:
case MQ_CONSUMER_STATUS__LOST_REBD: case MQ_CONSUMER_STATUS__LOST_REBD:
return "lost"; return "lost";
case MQ_CONSUMER_STATUS__MODIFY: case MQ_CONSUMER_STATUS_REBALANCE:
return "rebalancing"; return "rebalancing";
default: default:
return "unknown"; return "unknown";
......
...@@ -225,7 +225,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L ...@@ -225,7 +225,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN); memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
pConsumer->epoch = 0; pConsumer->epoch = 0;
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY; pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
pConsumer->hbStatus = 0; pConsumer->hbStatus = 0;
taosInitRWLatch(&pConsumer->lock); taosInitRWLatch(&pConsumer->lock);
......
...@@ -214,12 +214,8 @@ static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, ...@@ -214,12 +214,8 @@ static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash,
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) { for (int32_t j = 0; j < consumerVgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SMqRebOutputVg outputVg = {
.oldConsumerId = consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId); mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
} }
...@@ -484,14 +480,16 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -484,14 +480,16 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
for (int32_t i = 0; i < vgNum; i++) { for (int32_t i = 0; i < vgNum; i++) {
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
goto REB_FAIL; mndTransDrop(pTrans);
return -1;
} }
} }
// 2. redo log: subscribe and vg assignment // 2. redo log: subscribe and vg assignment
// subscribe // subscribe
if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
goto REB_FAIL; mndTransDrop(pTrans);
return -1;
} }
// 3. commit log: consumer to update status and epoch // 3. commit log: consumer to update status and epoch
...@@ -506,11 +504,15 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -506,11 +504,15 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
goto REB_FAIL;
mndTransDrop(pTrans);
return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
} }
// 3.2 set new consumer // 3.2 set new consumer
consumerNum = taosArrayGetSize(pOutput->newConsumers); consumerNum = taosArrayGetSize(pOutput->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
...@@ -527,8 +529,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -527,8 +529,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
goto REB_FAIL;
mndTransDrop(pTrans);
return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
} }
...@@ -549,8 +554,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -549,8 +554,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
goto REB_FAIL;
mndTransDrop(pTrans);
return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
} }
...@@ -563,15 +571,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -563,15 +571,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 6. execution // 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepare trans rebalance since %s", terrstr()); mError("failed to prepare trans rebalance since %s", terrstr());
goto REB_FAIL; mndTransDrop(pTrans);
return -1;
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
REB_FAIL:
mndTransDrop(pTrans);
return -1;
} }
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
...@@ -584,16 +589,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -584,16 +589,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
while (1) { while (1) {
// if (rebalanceOnce) {
// break;
// }
pIter = taosHashIterate(pReq->rebSubHash, pIter); pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
// todo handle the malloc failure
SMqRebInputObj rebInput = {0}; SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0}; SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
...@@ -601,6 +601,20 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -601,6 +601,20 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL ||
rebOutput.rebVgs == NULL) {
taosArrayDestroy(rebOutput.newConsumers);
taosArrayDestroy(rebOutput.removedConsumers);
taosArrayDestroy(rebOutput.modifyConsumers);
taosArrayDestroy(rebOutput.rebVgs);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mInfo("mq re-balance failed, due to out of memory");
taosHashCleanup(pReq->rebSubHash);
mndRebEnd();
return -1;
}
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);
...@@ -640,6 +654,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -640,6 +654,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub); rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
...@@ -661,9 +676,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -661,9 +676,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosArrayDestroy(rebOutput.rebVgs); taosArrayDestroy(rebOutput.rebVgs);
tDeleteSubscribeObj(rebOutput.pSub); tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub); taosMemoryFree(rebOutput.pSub);
// taosSsleep(100);
// rebalanceOnce = true;
} }
// reset flag // reset flag
......
...@@ -103,8 +103,13 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -103,8 +103,13 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
tqInitialize(pTq); int32_t code = tqInitialize(pTq);
if (code != TSDB_CODE_SUCCESS) {
tqClose(pTq);
return NULL;
} else {
return pTq; return pTq;
}
} }
int32_t tqInitialize(STQ* pTq) { int32_t tqInitialize(STQ* pTq) {
...@@ -594,11 +599,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -594,11 +599,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->chkInfo.currentVer = ver; pTask->chkInfo.currentVer = ver;
// expand executor // expand executor
if (pTask->fillHistory) { pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL;
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
} else {
pTask->status.taskStatus = TASK_STATUS__RESTORE;
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
...@@ -657,6 +658,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -657,6 +658,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
streamSetupTrigger(pTask); streamSetupTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
...@@ -686,8 +688,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -686,8 +688,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
}; };
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
rsp.status = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) ? 1 : 0; rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("tq recv task check req(reqId:0x%" PRIx64 tqDebug("tq recv task check req(reqId:0x%" PRIx64
...@@ -1168,9 +1171,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1168,9 +1171,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) { if (pTask != NULL) {
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
streamProcessRunReq(pTask);
} else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) {
tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
pTask->id.idStr, pTask->chkInfo.version); pTask->id.idStr, pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
...@@ -1334,10 +1334,10 @@ int32_t tqStartStreamTasks(STQ* pTq) { ...@@ -1334,10 +1334,10 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return 0; return 0;
} }
pMeta->walScan += 1; pMeta->walScanCounter += 1;
if (pMeta->walScan > 1) { if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScan); tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0; return 0;
} }
......
...@@ -31,9 +31,14 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer) { ...@@ -31,9 +31,14 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer) {
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile != NULL) { if (pFile == NULL) {
return TSDB_CODE_SUCCESS;
}
int32_t vgId = TD_VID(pStore->pTq->pVnode);
int64_t code = 0;
STqOffsetHead head = {0}; STqOffsetHead head = {0};
int64_t code;
while (1) { while (1) {
if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
...@@ -43,22 +48,27 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { ...@@ -43,22 +48,27 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
return -1; return -1;
} }
} }
int32_t size = htonl(head.size); int32_t size = htonl(head.size);
void* memBuf = taosMemoryCalloc(1, size); void* pMemBuf = taosMemoryCalloc(1, size);
if (memBuf == NULL) { if (pMemBuf == NULL) {
tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if ((code = taosReadFile(pFile, memBuf, size)) != size) {
taosMemoryFree(memBuf); if ((code = taosReadFile(pFile, pMemBuf, size)) != size) {
taosMemoryFree(pMemBuf);
return -1; return -1;
} }
STqOffset offset; STqOffset offset;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, memBuf, size); tDecoderInit(&decoder, pMemBuf, size);
if (tDecodeSTqOffset(&decoder, &offset) < 0) { if (tDecodeSTqOffset(&decoder, &offset) < 0) {
taosMemoryFree(memBuf); taosMemoryFree(pMemBuf);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return -1; return code;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -66,22 +76,22 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { ...@@ -66,22 +76,22 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
return -1; return -1;
} }
// todo remove this
if (offset.val.type == TMQ_OFFSET__LOG) { if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) { if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, // tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey,
pHandle->subKey, offset.val.version); // offset.val.version);
} }
} }
} }
taosMemoryFree(memBuf); taosMemoryFree(pMemBuf);
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
} return TSDB_CODE_SUCCESS;
return 0;
} }
STqOffsetStore* tqOffsetOpen(STQ* pTq) { STqOffsetStore* tqOffsetOpen(STQ* pTq) {
...@@ -89,6 +99,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { ...@@ -89,6 +99,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
if (pStore == NULL) { if (pStore == NULL) {
return NULL; return NULL;
} }
pStore->pTq = pTq; pStore->pTq = pTq;
pStore->needCommit = 0; pStore->needCommit = 0;
pTq->pOffsetStore = pStore; pTq->pOffsetStore = pStore;
...@@ -98,12 +109,14 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { ...@@ -98,12 +109,14 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
taosMemoryFree(pStore); taosMemoryFree(pStore);
return NULL; return NULL;
} }
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
if (tqOffsetRestoreFromFile(pStore, fname) < 0) { if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
taosMemoryFree(fname); taosMemoryFree(fname);
taosMemoryFree(pStore); taosMemoryFree(pStore);
return NULL; return NULL;
} }
taosMemoryFree(fname); taosMemoryFree(fname);
return pStore; return pStore;
} }
......
...@@ -274,21 +274,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v ...@@ -274,21 +274,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
} }
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
#if 0
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId);
return -1;
}
memcpy(data, pReq, len);
SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
#endif
SPackedData submit = {0}; SPackedData submit = {0};
tqProcessSubmitReq(pTq, submit); tqProcessSubmitReq(pTq, submit);
} }
......
...@@ -18,16 +18,15 @@ ...@@ -18,16 +18,15 @@
static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
// this function should be executed by stream threads. // this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure // extract submit block from WAL, and add them into the input queue for the sources tasks.
// will not stop eventually.
int32_t tqStreamTasksScanWal(STQ* pTq) { int32_t tqStreamTasksScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (1) { while (1) {
int32_t scan = pMeta->walScan; int32_t scan = pMeta->walScanCounter;
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan);
// check all restore tasks // check all restore tasks
bool shouldIdle = true; bool shouldIdle = true;
...@@ -37,12 +36,12 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { ...@@ -37,12 +36,12 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
if (shouldIdle) { if (shouldIdle) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
pMeta->walScan -= 1; pMeta->walScanCounter -= 1;
times = pMeta->walScan; times = pMeta->walScanCounter;
ASSERT(pMeta->walScan >= 0); ASSERT(pMeta->walScanCounter >= 0);
if (pMeta->walScan <= 0) { if (pMeta->walScanCounter <= 0) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
break; break;
} }
......
...@@ -3395,6 +3395,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { ...@@ -3395,6 +3395,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
while (1) { while (1) {
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
return TSDB_CODE_SUCCESS;
}
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
initMemDataIterator(*pBlockScanInfo, pReader); initMemDataIterator(*pBlockScanInfo, pReader);
...@@ -3474,45 +3479,68 @@ static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) { ...@@ -3474,45 +3479,68 @@ static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc))); ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc)));
} }
static int32_t buildBlockFromFiles(STsdbReader* pReader) { typedef enum {
int32_t code = TSDB_CODE_SUCCESS; TSDB_READ_RETURN = 0x1,
bool asc = ASCENDING_TRAVERSE(pReader->order); TSDB_READ_CONTINUE = 0x2,
} ERetrieveType;
static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
if (pBlockIter->numOfBlocks == 0) { while(1) {
_begin: terrno = 0;
code = doLoadLastBlockSequentially(pReader); code = doLoadLastBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
return code; terrno = code;
return TSDB_READ_RETURN;
} }
if (pReader->resBlockInfo.pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS; return TSDB_READ_RETURN;
} }
// all data blocks are checked in this last block file, now let's try the next file // all data blocks are checked in this last block file, now let's try the next file
if (pReader->status.pTableIter == NULL) { ASSERT(pReader->status.pTableIter == NULL);
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked // error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) ||
return code; pReader->flag == READER_STATUS_SHOULD_STOP) {
terrno = code;
return TSDB_READ_RETURN;
} }
// this file does not have data files, let's start check the last block file if exists if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
if (pBlockIter->numOfBlocks == 0) { return TSDB_READ_CONTINUE;
} else { // all blocks in data file are checked, let's check the data in last files
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
goto _begin;
} }
} }
}
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->order);
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pBlockIter->numOfBlocks == 0) {
// let's try to extract data from stt files.
ERetrieveType type = doReadDataFromLastFiles(pReader);
if (type == TSDB_READ_RETURN) {
return terrno;
}
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
return code; return code;
} }
if (pReader->resBlockInfo.pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -3530,7 +3558,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -3530,7 +3558,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (hasNext) { // check for the next block in the block accessed order list if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else { } else {
if (pReader->status.pCurrentFileset->nSttF > 0) { // all data blocks in files are checked, let's check the data in last files.
ASSERT(pReader->status.pCurrentFileset->nSttF > 0);
// data blocks in current file are exhausted, let's try the next file now // data blocks in current file are exhausted, let's try the next file now
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
if (pBlockData->uid != 0) { if (pBlockData->uid != 0) {
...@@ -3540,20 +3570,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -3540,20 +3570,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
resetDataBlockIterator(pBlockIter, pReader->order); resetDataBlockIterator(pBlockIter, pReader->order);
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
goto _begin;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked ERetrieveType type = doReadDataFromLastFiles(pReader);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if (type == TSDB_READ_RETURN) {
return code; return terrno;
}
// this file does not have blocks, let's start check the last block file
if (pBlockIter->numOfBlocks == 0) {
resetTableListIndex(&pReader->status);
goto _begin;
}
} }
} }
} }
...@@ -3561,11 +3581,11 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -3561,11 +3581,11 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
return code; return code;
} }
if (pReader->resBlockInfo.pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
......
...@@ -39,7 +39,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -39,7 +39,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet); SEpSet* pEpSet);
......
...@@ -212,9 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { ...@@ -212,9 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
} }
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
pReq->upstreamTaskId); pReq->upstreamNodeId);
// todo add the input queue buffer limitation
streamTaskEnqueueBlocks(pTask, pReq, pRsp); streamTaskEnqueueBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
...@@ -222,10 +223,6 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -222,10 +223,6 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
if (streamTryExec(pTask) < 0) { if (streamTryExec(pTask) < 0) {
return -1; return -1;
} }
/*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
/*streamDispatch(pTask);*/
/*}*/
} else { } else {
streamSchedExec(pTask); streamSchedExec(pTask);
} }
......
...@@ -208,7 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -208,7 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
return 0; return 0;
} }
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
SRpcMsg msg = {0}; SRpcMsg msg = {0};
...@@ -240,7 +240,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* ...@@ -240,7 +240,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
msg.pCont = buf; msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECK; msg.msgType = TDMT_STREAM_TASK_CHECK;
qDebug("dispatch from s-task:%s to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, qDebug("s-task:%s dispatch check msg to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr,
pReq->streamId, pReq->downstreamTaskId, nodeId); pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
......
...@@ -28,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -28,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
while (pTask->taskLevel == TASK_LEVEL__SOURCE) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus); int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) { if (status != TASK_STATUS__NORMAL) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->status.taskStatus)); atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2); taosMsleep(2);
......
...@@ -287,6 +287,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -287,6 +287,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tdbTbcClose(pCur); tdbTbcClose(pCur);
return -1; return -1;
} }
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeStreamTask(&decoder, pTask); tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -305,7 +306,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -305,7 +306,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1; return -1;
} }
/*pTask->status.taskStatus = TASK_STATUS__NORMAL;*/
if (pTask->fillHistory) { if (pTask->fillHistory) {
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
streamTaskCheckDownstream(pTask, ver); streamTaskCheckDownstream(pTask, ver);
......
...@@ -54,6 +54,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { ...@@ -54,6 +54,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
// checkstatus // checkstatus
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version);
SStreamTaskCheckReq req = { SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId, .upstreamTaskId = pTask->id.taskId,
...@@ -63,16 +65,18 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { ...@@ -63,16 +65,18 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
// serialize // serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId; pTask->checkReqId = req.reqId;
qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId, qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
req.downstreamNodeId); req.downstreamNodeId);
streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo); int32_t vgSz = taosArrayGetSize(vgInfo);
pTask->recoverTryingDownstream = vgSz; pTask->recoverTryingDownstream = vgSz;
pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t));
...@@ -83,14 +87,15 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { ...@@ -83,14 +87,15 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
taosArrayPush(pTask->checkReqIds, &req.reqId); taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId; req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId, qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId); req.downstreamTaskId, req.downstreamNodeId);
streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else { } else {
qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId); qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
streamTaskLaunchRecover(pTask, version); streamTaskLaunchRecover(pTask, version);
} }
return 0; return 0;
} }
...@@ -109,14 +114,14 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp ...@@ -109,14 +114,14 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
req.downstreamTaskId, req.downstreamNodeId); req.downstreamTaskId, req.downstreamNodeId);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo); int32_t vgSz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo->taskId == req.downstreamTaskId) { if (pVgInfo->taskId == req.downstreamTaskId) {
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
} }
} }
} }
...@@ -124,8 +129,8 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp ...@@ -124,8 +129,8 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
return 0; return 0;
} }
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) { int32_t streamTaskCheckStatus(SStreamTask* pTask) {
return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL; return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0;
} }
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
...@@ -135,7 +140,9 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -135,7 +140,9 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
if (pRsp->status == 1) { if (pRsp->status == 1) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
bool found = false; bool found = false;
for (int32_t i = 0; i < taosArrayGetSize(pTask->checkReqIds); i++) {
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
for (int32_t i = 0; i < numOfReqs; i++) {
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i); int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
if (reqId == pRsp->reqId) { if (reqId == pRsp->reqId) {
found = true; found = true;
...@@ -149,9 +156,12 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -149,9 +156,12 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1); int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs);
streamTaskLaunchRecover(pTask, version); streamTaskLaunchRecover(pTask, version);
} }
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
...@@ -163,7 +173,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -163,7 +173,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
} else { } else {
ASSERT(0); ASSERT(0);
} }
} else { // not ready, it should wait for at least 100ms and then retry } else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosMsleep(100);
streamRecheckOneDownstream(pTask, pRsp); streamRecheckOneDownstream(pTask, pRsp);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册