提交 0e1328ae 编写于 作者: L Liu Jicong

fix(stream): set child id

上级 7ab3a1e4
...@@ -1205,6 +1205,7 @@ typedef struct { ...@@ -1205,6 +1205,7 @@ typedef struct {
int8_t completed; // all results are returned to client int8_t completed; // all results are returned to client
int8_t precision; int8_t precision;
int8_t compressed; int8_t compressed;
int8_t streamBlockType;
int32_t compLen; int32_t compLen;
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols; int32_t numOfCols;
...@@ -2493,14 +2494,14 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq ...@@ -2493,14 +2494,14 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq); int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
typedef struct { typedef struct {
int8_t intervalUnit; int8_t intervalUnit;
int8_t slidingUnit; int8_t slidingUnit;
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;
int64_t dstTbUid; int64_t dstTbUid;
int32_t dstVgId; // for stream int32_t dstVgId; // for stream
char* expr; char* expr;
} STableIndexInfo; } STableIndexInfo;
typedef struct { typedef struct {
...@@ -2510,7 +2511,6 @@ typedef struct { ...@@ -2510,7 +2511,6 @@ typedef struct {
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp); int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
typedef struct { typedef struct {
int8_t mqMsgType; int8_t mqMsgType;
int32_t code; int32_t code;
...@@ -2751,8 +2751,8 @@ typedef struct { ...@@ -2751,8 +2751,8 @@ typedef struct {
char* msg; char* msg;
} SVDeleteReq; } SVDeleteReq;
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
typedef struct { typedef struct {
int64_t affectedRows; int64_t affectedRows;
......
...@@ -35,6 +35,13 @@ ...@@ -35,6 +35,13 @@
extern bool tsStreamSchedV; extern bool tsStreamSchedV;
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->childId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr, int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
int32_t* pLen, double filesFactor) { int32_t* pLen, double filesFactor) {
SNode* pAst = NULL; SNode* pAst = NULL;
...@@ -195,7 +202,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p ...@@ -195,7 +202,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
taosArrayPush(tasks, &pTask); mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pVgroup->vgId; pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
...@@ -235,7 +242,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr ...@@ -235,7 +242,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
taosArrayPush(tasks, &pTask); mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pStream->fixedSinkVgId; pTask->nodeId = pStream->fixedSinkVgId;
#if 0 #if 0
...@@ -378,7 +385,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -378,7 +385,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return -1; return -1;
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
taosArrayPush(taskOneLevel, &pTask); mndAddTaskToTaskSet(taskOneLevel, pTask);
} }
} else { } else {
// merge plan // merge plan
......
...@@ -53,6 +53,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ...@@ -53,6 +53,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
SSDataBlock* pDataBlock = taosArrayGet(pArray, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor // TODO: refactor
pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->sourceChildId; pDataBlock->info.childId = pReq->sourceChildId;
} }
pData->blocks = pArray; pData->blocks = pArray;
......
...@@ -72,6 +72,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -72,6 +72,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->precision = TSDB_DEFAULT_PRECISION; pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->numOfCols = htonl(pBlock->info.numOfCols); pRetrieve->numOfCols = htonl(pBlock->info.numOfCols);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册