提交 f7dc9497 编写于 作者: Y yihaoDeng

add stream verion

上级 3a336bbb
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include "parser.h" #include "parser.h"
#include "tname.h" #include "tname.h"
#define MND_STREAM_VER_NUMBER 2 #define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60 #define MND_STREAM_MAX_NUM 60
...@@ -142,8 +142,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { ...@@ -142,8 +142,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
if (sver != 1 && sver != 2) { if (sver != MND_STREAM_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = 0;
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
...@@ -1264,7 +1264,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock ...@@ -1264,7 +1264,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// task id // task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char idstr[128] = {0}; char idstr[128] = {0};
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
idstr[2] = '0'; idstr[2] = '0';
idstr[3] = 'x'; idstr[3] = 'x';
...@@ -1304,7 +1304,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock ...@@ -1304,7 +1304,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false); colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus); int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
if (taskStatus == TASK_STATUS__NORMAL) { if (taskStatus == TASK_STATUS__NORMAL) {
memcpy(varDataVal(status), "normal", 6); memcpy(varDataVal(status), "normal", 6);
...@@ -1370,7 +1370,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { ...@@ -1370,7 +1370,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0; return 0;
} }
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) { int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) {
int32_t size = taosArrayGetSize(tasks); int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(tasks, i); SArray *pTasks = taosArrayGetP(tasks, i);
...@@ -1491,7 +1491,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { ...@@ -1491,7 +1491,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册