提交 9e7bbf62 编写于 作者: L Liu Jicong

refactor: support submitreq2

上级 45b93280
...@@ -674,8 +674,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -674,8 +674,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SPackedSubmit submit = { SPackedSubmit submit = {
.msgStr = pHead->body, .msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)),
.msgLen = pHead->bodyLen, .msgLen = pHead->bodyLen - sizeof(SMsgHead),
.ver = pHead->version, .ver = pHead->version,
}; };
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
......
...@@ -320,8 +320,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -320,8 +320,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ASSERT(ret->offset.version >= 0); ASSERT(ret->offset.version >= 0);
return -1; return -1;
} }
void* body = pReader->pWalReader->pHead->head.body; void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SMsgHead));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen; int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SMsgHead);
int64_t ver = pReader->pWalReader->pHead->head.version; int64_t ver = pReader->pWalReader->pHead->head.version;
#if 0 #if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
...@@ -383,9 +383,11 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, ...@@ -383,9 +383,11 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen,
ASSERT(pReader->msg2.msgStr == NULL); ASSERT(pReader->msg2.msgStr == NULL);
ASSERT(msgStr); ASSERT(msgStr);
ASSERT(msgLen); ASSERT(msgLen);
ASSERT(ver >= 0);
pReader->msg2.msgStr = msgStr; pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen; pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver; pReader->msg2.ver = ver;
pReader->ver = ver;
tqDebug("tq reader set msg %p", msgStr); tqDebug("tq reader set msg %p", msgStr);
......
...@@ -131,8 +131,7 @@ typedef struct { ...@@ -131,8 +131,7 @@ typedef struct {
SMqMetaRsp metaRsp; // for tmq fetching meta SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned; int8_t returned;
int64_t snapshotVer; int64_t snapshotVer;
const SSubmitReq* pReq; // const SSubmitReq* pReq;
int64_t scanVer;
SPackedSubmit submit; SPackedSubmit submit;
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "filter.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "filter.h"
#include "functionMgt.h" #include "functionMgt.h"
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
...@@ -90,7 +90,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -90,7 +90,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock; pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock;
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
...@@ -117,9 +117,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -117,9 +117,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, pTaskInfo);
destroyProjectOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -220,7 +221,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -220,7 +221,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
blockDataCleanup(pFinalRes); blockDataCleanup(pFinalRes);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pTaskInfo->streamInfo.pReq) { if (pTaskInfo->streamInfo.submit.msgStr) {
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
} }
...@@ -414,8 +415,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -414,8 +415,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL); pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -1525,10 +1525,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1525,10 +1525,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("queue scan called"); qDebug("queue scan called");
if (pTaskInfo->streamInfo.pReq != NULL) { if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) {
/*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/ /*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/
pInfo->tqReader->ver = pTaskInfo->streamInfo.scanVer;
/*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
...@@ -1538,7 +1537,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1538,7 +1537,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr); qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
pInfo->tqReader->msg2 = (SPackedSubmit){0}; pInfo->tqReader->msg2 = (SPackedSubmit){0};
pInfo->tqReader->setMsg = 0; pInfo->tqReader->setMsg = 0;
pTaskInfo->streamInfo.pReq = NULL;
ASSERT(0); ASSERT(0);
} }
} }
...@@ -1562,7 +1560,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1562,7 +1560,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} }
} }
pTaskInfo->streamInfo.pReq = NULL; pInfo->tqReader->msg2 = (SPackedSubmit){0};
pInfo->tqReader->setMsg = 0;
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册