提交 74a253ea 编写于 作者: L Liu Jicong

refactor

上级 2c13c8b3
...@@ -3274,10 +3274,15 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); ...@@ -3274,10 +3274,15 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
#define TSDB_MSG_FLG_DECODE 0x2 #define TSDB_MSG_FLG_DECODE 0x2
typedef struct { typedef struct {
void* msgStr; union {
int32_t msgLen; struct {
int64_t ver; void* msgStr;
} SPackedSubmit; int32_t msgLen;
int64_t ver;
};
void* pDataBlock;
};
} SPackedData;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -192,7 +192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -192,7 +192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver); // int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver);
// //
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit); int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
......
...@@ -120,10 +120,10 @@ typedef struct { ...@@ -120,10 +120,10 @@ typedef struct {
#endif #endif
typedef struct { typedef struct {
int8_t type; int8_t type;
int64_t ver; int64_t ver;
int32_t* dataRef; int32_t* dataRef;
SPackedSubmit submit; SPackedData submit;
} SStreamDataSubmit2; } SStreamDataSubmit2;
typedef struct { typedef struct {
...@@ -235,7 +235,7 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { ...@@ -235,7 +235,7 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
} }
} }
SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit); SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit);
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit); void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit);
......
...@@ -226,8 +226,8 @@ typedef struct STqReader { ...@@ -226,8 +226,8 @@ typedef struct STqReader {
// SSubmitMsgIter msgIter; // SSubmitMsgIter msgIter;
// SSubmitBlkIter blkIter; // SSubmitBlkIter blkIter;
int64_t ver; int64_t ver;
SPackedSubmit msg2; SPackedData msg2;
int8_t setMsg; int8_t setMsg;
SSubmitReq2 submit; SSubmitReq2 submit;
......
...@@ -153,7 +153,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -153,7 +153,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedSubmit submit, STaosxRsp* pRsp); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); // int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
......
...@@ -180,7 +180,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg ...@@ -180,7 +180,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit); int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
......
...@@ -673,7 +673,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -673,7 +673,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SPackedSubmit submit = { SPackedData submit = {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)), .msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)),
.msgLen = pHead->bodyLen - sizeof(SMsgHead), .msgLen = pHead->bodyLen - sizeof(SMsgHead),
.ver = pHead->version, .ver = pHead->version,
...@@ -1332,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1332,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
return 0; return 0;
} }
int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit) { int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
void* pIter = NULL; void* pIter = NULL;
bool failed = false; bool failed = false;
SStreamDataSubmit2* pSubmit = NULL; SStreamDataSubmit2* pSubmit = NULL;
......
...@@ -215,7 +215,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -215,7 +215,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0; return 0;
} }
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedSubmit submit, STaosxRsp* pRsp) { int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) {
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
......
...@@ -258,7 +258,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -258,7 +258,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SMqDataRsp* pRsp = &pPushEntry->dataRsp; SMqDataRsp* pRsp = &pPushEntry->dataRsp;
// prepare scan mem data // prepare scan mem data
SPackedSubmit submit = { SPackedData submit = {
.msgStr = data, .msgStr = data,
.msgLen = len, .msgLen = len,
.ver = ver, .ver = ver,
...@@ -324,7 +324,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -324,7 +324,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
return -1; return -1;
} }
memcpy(data, pReq, len); memcpy(data, pReq, len);
SPackedSubmit submit = { SPackedData submit = {
.msgStr = data, .msgStr = data,
.msgLen = len, .msgLen = len,
.ver = ver, .ver = ver,
......
...@@ -133,7 +133,7 @@ typedef struct { ...@@ -133,7 +133,7 @@ typedef struct {
int64_t snapshotVer; int64_t snapshotVer;
// const SSubmitReq* pReq; // const SSubmitReq* pReq;
SPackedSubmit submit; SPackedData submit;
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
......
...@@ -115,7 +115,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -115,7 +115,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1); // ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SPackedSubmit* pReq = POINTER_SHIFT(input, i * sizeof(SPackedSubmit)); SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
taosArrayPush(pInfo->pBlockLists, pReq); taosArrayPush(pInfo->pBlockLists, pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
...@@ -125,9 +125,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -125,9 +125,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SPackedSubmit tmp = { SPackedData tmp = {
.msgStr = pDataBlock, .pDataBlock = pDataBlock,
}; };
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }
...@@ -1016,7 +1016,7 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc ...@@ -1016,7 +1016,7 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc
} }
#endif #endif
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit) { int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL); ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL);
......
...@@ -1532,10 +1532,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1532,10 +1532,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
/*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/ /*void* msgStr = pTaskInfo->streamInfo.*/
SPackedSubmit submit = pTaskInfo->streamInfo.submit; SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr); qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
pInfo->tqReader->msg2 = (SPackedSubmit){0}; pInfo->tqReader->msg2 = (SPackedData){0};
pInfo->tqReader->setMsg = 0; pInfo->tqReader->setMsg = 0;
ASSERT(0); ASSERT(0);
} }
...@@ -1560,9 +1560,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1560,9 +1560,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} }
} }
pInfo->tqReader->msg2 = (SPackedSubmit){0}; pInfo->tqReader->msg2 = (SPackedData){0};
pInfo->tqReader->setMsg = 0; pInfo->tqReader->setMsg = 0;
pTaskInfo->streamInfo.submit = (SPackedSubmit){0}; pTaskInfo->streamInfo.submit = (SPackedData){0};
return NULL; return NULL;
} }
...@@ -1791,7 +1791,8 @@ FETCH_NEXT_BLOCK: ...@@ -1791,7 +1791,8 @@ FETCH_NEXT_BLOCK:
} }
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = pPacked->pDataBlock;
if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
} }
...@@ -1928,8 +1929,8 @@ FETCH_NEXT_BLOCK: ...@@ -1928,8 +1929,8 @@ FETCH_NEXT_BLOCK:
return NULL; return NULL;
} }
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SPackedSubmit* pSubmit = taosArrayGet(pInfo->pBlockLists, current); SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
...@@ -2264,7 +2265,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2264,7 +2265,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} }
} }
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedSubmit)); pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
if (pInfo->pBlockLists == NULL) { if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
......
...@@ -66,7 +66,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -66,7 +66,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return 0; return 0;
} }
SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit) { SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM); SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM);
if (pDataSubmit == NULL) return NULL; if (pDataSubmit == NULL) return NULL;
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
...@@ -83,7 +83,7 @@ FAIL: ...@@ -83,7 +83,7 @@ FAIL:
SStreamMergedSubmit2* streamMergedSubmitNew() { SStreamMergedSubmit2* streamMergedSubmitNew() {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM); SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM);
if (pMerged == NULL) return NULL; if (pMerged == NULL) return NULL;
pMerged->submits = taosArrayInit(0, sizeof(SPackedSubmit)); pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL; if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL;
pMerged->type = STREAM_INPUT__MERGED_SUBMIT; pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
...@@ -172,7 +172,7 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -172,7 +172,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
int32_t ref = atomic_sub_fetch_32(pRef, 1); int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0); ASSERT(ref >= 0);
if (ref == 0) { if (ref == 0) {
SPackedSubmit* pSubmit = (SPackedSubmit*)taosArrayGet(pMerge->submits, i); SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
taosMemoryFree(pSubmit->msgStr); taosMemoryFree(pSubmit->msgStr);
taosMemoryFree(pRef); taosMemoryFree(pRef);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册