提交 87785b1d 编写于 作者: H Haojun Liao

refactor: add some logs.

上级 a18989df
...@@ -29,9 +29,10 @@ ...@@ -29,9 +29,10 @@
#include "tname.h" #include "tname.h"
#include "tmisce.h" #include "tmisce.h"
#define MND_STREAM_VER_NUMBER 3 #define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60 #define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
typedef struct SNodeEntry { typedef struct SNodeEntry {
int32_t nodeId; int32_t nodeId;
...@@ -52,7 +53,6 @@ typedef struct SVgroupChangeInfo { ...@@ -52,7 +53,6 @@ typedef struct SVgroupChangeInfo {
static int32_t mndNodeCheckSentinel = 0; static int32_t mndNodeCheckSentinel = 0;
static SStreamVnodeRevertIndex execNodeList; static SStreamVnodeRevertIndex execNodeList;
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
...@@ -1827,7 +1827,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_ ...@@ -1827,7 +1827,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_
// todo extract method: traverse stream tasks // todo extract method: traverse stream tasks
// build trans to update the epset // build trans to update the epset
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) { static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, "stream-task-update");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1; return -1;
......
...@@ -1902,6 +1902,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1902,6 +1902,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
_end: _end:
tDecoderClear(&decoder); tDecoderClear(&decoder);
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
return code; return code;
} }
......
...@@ -89,7 +89,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -89,7 +89,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
resetTaskInfo(pExecutor); resetTaskInfo(pExecutor);
} }
qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr()); qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code));
continue; continue;
} }
...@@ -547,7 +547,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -547,7 +547,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL; SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s stream task stopped, abort", id); qDebug("s-task:%s stream task is stopped", id);
break; break;
} }
......
...@@ -254,7 +254,9 @@ static void freeUpstreamItem(void* p) { ...@@ -254,7 +254,9 @@ static void freeUpstreamItem(void* p) {
} }
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:0x%x, %p", pTask->id.taskId, pTask); int32_t taskId = pTask->id.taskId;
qDebug("free s-task:0x%x, %p", taskId, pTask);
// remove the ref by timer // remove the ref by timer
while(pTask->status.timerActive > 0) { while(pTask->status.timerActive > 0) {
...@@ -304,6 +306,7 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -304,6 +306,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
if (pTask->pState) { if (pTask->pState) {
qDebug("s-task:0x%x start to free task state", taskId);
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
} }
...@@ -330,6 +333,8 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -330,6 +333,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosThreadMutexDestroy(&pTask->lock); taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask); taosMemoryFree(pTask);
qDebug("s-task:0x%x free task completed", taskId);
} }
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册