提交 00acf452 编写于 作者: L Liu Jicong

refactor(stream): remove option

上级 4c49d99a
...@@ -270,7 +270,7 @@ typedef struct SStreamTask { ...@@ -270,7 +270,7 @@ typedef struct SStreamTask {
int64_t startVer; int64_t startVer;
int64_t checkpointVer; int64_t checkpointVer;
int64_t processedVer; int64_t processedVer;
int32_t numOfVgroups; // int32_t numOfVgroups;
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......
...@@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to stream since %s", pVnode->vgId, pMsg, terrstr()); dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
......
...@@ -391,10 +391,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -391,10 +391,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
// exec // exec
pInnerTask->execType = TASK_EXEC__PIPE; pInnerTask->execType = TASK_EXEC__PIPE;
#if 0
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb); SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL); ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pSourceDb); sdbRelease(pSdb, pSourceDb);
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups; pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
#endif
if (tsSchedStreamToSnode) { if (tsSchedStreamToSnode) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
......
...@@ -653,7 +653,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -653,7 +653,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
} else { } else {
SReadHandle mgHandle = { SReadHandle mgHandle = {
.vnode = NULL, .vnode = NULL,
.numOfVgroups = pTask->numOfVgroups, .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
} }
......
...@@ -64,7 +64,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -64,7 +64,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1; /*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/
int32_t epSz = taosArrayGetSize(pTask->childEpInfo); int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1; if (tEncodeI32(pEncoder, epSz) < 0) return -1;
...@@ -119,7 +119,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -119,7 +119,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1; /*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/
int32_t epSz; int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1; if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
......
...@@ -61,26 +61,31 @@ static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) ...@@ -61,26 +61,31 @@ static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight)
} }
static inline int64_t walGetLastFileSize(SWal* pWal) { static inline int64_t walGetLastFileSize(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return pInfo->fileSize; return pInfo->fileSize;
} }
static inline int64_t walGetLastFileFirstVer(SWal* pWal) { static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileFirstVer(SWal* pWal) { static inline int64_t walGetCurFileFirstVer(SWal* pWal) {
if (pWal->writeCur == -1) return -1;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileLastVer(SWal* pWal) { static inline int64_t walGetCurFileLastVer(SWal* pWal) {
if (pWal->writeCur == -1) return -1;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileOffset(SWal* pWal) { static inline int64_t walGetCurFileOffset(SWal* pWal) {
if (pWal->writeCur == -1) return -1;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->fileSize; return pInfo->fileSize;
} }
...@@ -88,6 +93,7 @@ static inline int64_t walGetCurFileOffset(SWal* pWal) { ...@@ -88,6 +93,7 @@ static inline int64_t walGetCurFileOffset(SWal* pWal) {
static inline bool walCurFileClosed(SWal* pWal) { return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; } static inline bool walCurFileClosed(SWal* pWal) { return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; }
static inline SWalFileInfo* walGetCurFileInfo(SWal* pWal) { static inline SWalFileInfo* walGetCurFileInfo(SWal* pWal) {
if (pWal->writeCur == -1) return NULL;
return (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册