提交 8cfba403 编写于 作者: M Minglei Jin

Merge branch 'refact/submit_req' of https://github.com/taosdata/TDengine into refact/submit_req

......@@ -3273,6 +3273,12 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
#define TSDB_MSG_FLG_ENCODE 0x1
#define TSDB_MSG_FLG_DECODE 0x2
typedef struct {
void* msgStr;
int32_t msgLen;
int64_t ver;
} SPackedSubmit;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -190,7 +190,9 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver);
// int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver);
//
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
......
......@@ -103,6 +103,7 @@ typedef struct {
int8_t type;
} SStreamQueueItem;
#if 0
typedef struct {
int8_t type;
int64_t ver;
......@@ -116,6 +117,21 @@ typedef struct {
SArray* dataRefs; // SArray<int32_t*>
SArray* reqs; // SArray<SSubmitReq*>
} SStreamMergedSubmit;
#endif
typedef struct {
int8_t type;
int64_t ver;
int32_t* dataRef;
SPackedSubmit submit;
} SStreamDataSubmit2;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* submits; // SArray<SPackedSubmit>
} SStreamMergedSubmit2;
typedef struct {
int8_t type;
......@@ -219,11 +235,11 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
}
}
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);
SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit);
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit);
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);
SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit);
typedef struct {
char* qmsg;
......@@ -355,14 +371,15 @@ void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone,
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE ||
......
......@@ -220,12 +220,6 @@ typedef struct SSnapContext {
bool queryMetaOrData; // true-get meta, false-get data
} SSnapContext;
typedef struct {
void *msgStr;
int32_t msgLen;
int64_t ver;
} SPackedSubmit;
typedef struct STqReader {
const SSubmitReq *pMsg;
// SSubmitBlk *pBlock;
......@@ -234,8 +228,10 @@ typedef struct STqReader {
int64_t ver;
SPackedSubmit msg2;
SSubmitReq2 *pSubmit;
int32_t nextBlk;
int8_t setMsg;
SSubmitReq2 submit;
int32_t nextBlk;
int64_t lastBlkUid;
......
......@@ -182,7 +182,8 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink
// void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset
char* tqOffsetBuildFName(const char* path, int32_t fVer);
......
......@@ -180,7 +180,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
......
......@@ -953,7 +953,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
......@@ -1334,12 +1334,12 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
return 0;
}
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
void* pIter = NULL;
bool failed = false;
SStreamDataSubmit* pSubmit = NULL;
int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit) {
void* pIter = NULL;
bool failed = false;
SStreamDataSubmit2* pSubmit = NULL;
pSubmit = streamDataSubmitNew(pReq);
pSubmit = streamDataSubmitNew(submit);
if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory");
......@@ -1356,7 +1356,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
continue;
}
tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver);
if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
......
......@@ -213,7 +213,11 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType));
void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
TMSG_INFO(msgType), msg, pReq, len);
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
......@@ -222,7 +226,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (taosHashGetSize(pTq->pPushMgr) != 0) {
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(msgLen);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
......@@ -230,9 +234,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosArrayDestroy(cachedKeyLens);
return -1;
}
memcpy(data, msg, msgLen);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
memcpy(data, pReq, len);
void* pIter = NULL;
while (1) {
......@@ -256,7 +258,12 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
// prepare scan mem data
qStreamScanMemData(task, pReq, ver);
SPackedSubmit submit = {
.msgStr = data,
.msgLen = len,
.ver = ver,
};
qStreamSetScanMemData(task, submit);
// exec
while (1) {
......@@ -310,17 +317,22 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (vnodeIsRoleLeader(pTq->pVnode)) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
if (msgType == TDMT_VND_SUBMIT) {
void* data = taosMemoryMalloc(msgLen);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
return -1;
}
memcpy(data, msg, msgLen);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
memcpy(data, pReq, len);
SPackedSubmit submit = {
.msgStr = data,
.msgLen = len,
.ver = ver,
};
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
tqProcessSubmitReq(pTq, data, ver);
tqProcessSubmitReq(pTq, submit);
}
if (msgType == TDMT_VND_DELETE) {
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
......
......@@ -306,7 +306,7 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
}
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
bool fromProcessedMsg = pReader->pMsg != NULL;
bool fromProcessedMsg = pReader->msg2.msgStr != NULL;
while (1) {
if (!fromProcessedMsg) {
......@@ -381,15 +381,22 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
ASSERT(pReader->msg2.msgStr == NULL);
ASSERT(msgStr);
ASSERT(msgLen);
pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver;
if (pReader->pSubmit == NULL) {
tqDebug("tq reader set msg %p", msgStr);
if (pReader->setMsg == 0) {
SDecoder decoder;
tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen);
tDecodeSSubmitReq2(&decoder, pReader->pSubmit);
if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) {
ASSERT(0);
}
tDecoderClear(&decoder);
pReader->setMsg = 1;
}
return 0;
}
......@@ -422,11 +429,14 @@ bool tqNextDataBlock(STqReader* pReader) {
bool tqNextDataBlock2(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL) return false;
ASSERT(pReader->pSubmit != NULL);
ASSERT(pReader->setMsg == 1);
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData);
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
pReader->msg2.ver, pReader->nextBlk);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
......@@ -435,8 +445,8 @@ bool tqNextDataBlock2(STqReader* pReader) {
}
}
tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE);
pReader->pSubmit = NULL;
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->setMsg = 0;
pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL;
......@@ -445,11 +455,11 @@ bool tqNextDataBlock2(STqReader* pReader) {
bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
if (pReader->msg2.msgStr == NULL) return false;
ASSERT(pReader->pSubmit != NULL);
ASSERT(pReader->setMsg == 1);
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
......@@ -458,8 +468,8 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
}
}
tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE);
pReader->pSubmit = NULL;
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->setMsg = 0;
pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL;
......@@ -548,10 +558,12 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
#endif
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
ASSERT(pReader->nextBlk < blockSz);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk);
tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
int32_t sversion = pSubmitTbData->sver;
......@@ -672,7 +684,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
} else {
val = &colVal.value.val;
}
if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) {
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
......@@ -686,14 +698,14 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGet(pRows, i);
SRow* pRow = taosArrayGetP(pRows, i);
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
for (int32_t j = 0; j < colActual; j++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
while (1) {
ASSERT(sourceIdx < numOfRows);
ASSERT(sourceIdx < pTschema->numOfCols);
SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal);
......@@ -707,7 +719,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
} else {
val = &colVal.value.val;
}
if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) {
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
......
......@@ -957,7 +957,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (NULL == pBuf) {
goto _end;
}
((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode));
((SMsgHead*)pBuf)->vgId = TD_VID(pVnode);
((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
......@@ -969,7 +969,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = pBuf,
.contLen = ntohl(len),
.contLen = len,
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
......
......@@ -101,6 +101,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
// SVCreateTbReq
if (tStartDecode(&dc) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
......@@ -126,6 +127,15 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
tEndDecode(&dc);
// SSubmitTbData
int64_t suid;
if (tDecodeI64(&dc, &suid) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
}
*(int64_t *)(dc.data + dc.pos) = uid;
}
tEndDecode(&dc);
......@@ -871,6 +881,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
pRsp->code = TSDB_CODE_SUCCESS;
vDebug("vvvvvvvvvvvv %p, %d", pReq, len);
// decode
SDecoder dc = {0};
tDecoderInit(&dc, pReq, len);
......
......@@ -202,7 +202,7 @@ typedef struct SOperatorFpSet {
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
......@@ -503,8 +503,8 @@ typedef struct STableCountScanOperatorInfo {
STableCountScanSupp supp;
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;
typedef struct SOptrBasicInfo {
......
......@@ -115,7 +115,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
SPackedSubmit* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(SPackedSubmit));
taosArrayPush(pInfo->pBlockLists, &pReq);
}
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
......@@ -125,8 +125,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SPackedSubmit tmp = {
.msgStr = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp);
}
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else {
......@@ -1002,6 +1005,7 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS;
}
#if 0
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
......@@ -1010,6 +1014,7 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc
pTaskInfo->streamInfo.scanVer = scanVer;
return 0;
}
#endif
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
......
......@@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader,
GET_TASKID(pTaskInfo));
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
......@@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
if (tsdbNextDataBlock(pReader)) {
/*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL);
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
......@@ -1224,7 +1224,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
......@@ -1526,17 +1526,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("queue scan called");
if (pTaskInfo->streamInfo.pReq != NULL) {
if (pInfo->tqReader->pMsg == NULL) {
pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
if (pInfo->tqReader->msg2.msgStr == NULL) {
/*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/
pInfo->tqReader->ver = pTaskInfo->streamInfo.scanVer;
const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
/*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/
SPackedSubmit submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", pSubmit);
pInfo->tqReader->pMsg = NULL;
qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
pInfo->tqReader->msg2 = (SPackedSubmit){0};
pInfo->tqReader->setMsg = 0;
pTaskInfo->streamInfo.pReq = NULL;
ASSERT(0);
}
......@@ -1561,7 +1562,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
}
pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL;
return NULL;
}
......@@ -1920,7 +1920,7 @@ FETCH_NEXT_BLOCK:
NEXT_SUBMIT_BLK:
while (1) {
if (pInfo->tqReader->pMsg == NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
......@@ -1928,13 +1928,12 @@ FETCH_NEXT_BLOCK:
return NULL;
}
int32_t current = pInfo->validBlockIndex++;
SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
int32_t current = pInfo->validBlockIndex++;
SPackedSubmit* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit, 0, 0) < 0) {
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum);
pInfo->tqReader->pMsg = NULL;
continue;
}
}
......@@ -1985,7 +1984,6 @@ FETCH_NEXT_BLOCK:
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break;
} else {
pInfo->tqReader->pMsg = NULL;
continue;
}
/*blockDataCleanup(pInfo->pRes);*/
......@@ -2266,7 +2264,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
}
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedSubmit));
if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
......@@ -2286,7 +2284,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->base.dataReader = NULL;
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL);
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock,
&pTSInfo->base.dataReader, NULL);
if (code != 0) {
terrno = code;
destroyTableScanOperatorInfo(pTableScanOp);
......@@ -2356,7 +2355,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator;
......@@ -2493,7 +2493,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator;
......@@ -2514,11 +2515,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
int32_t code =
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2916,8 +2918,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo*
SSDataBlock* pRes, char* dbName);
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
......@@ -3042,8 +3044,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
optrDefaultBufFn, NULL);
return pOperator;
_error:
......
......@@ -66,12 +66,12 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return 0;
}
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM);
if (pDataSubmit == NULL) return NULL;
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
if (pDataSubmit->dataRef == NULL) goto FAIL;
pDataSubmit->data = pReq;
pDataSubmit->submit = submit;
*pDataSubmit->dataRef = 1;
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
return pDataSubmit;
......@@ -80,47 +80,47 @@ FAIL:
return NULL;
}
SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
SStreamMergedSubmit2* streamMergedSubmitNew() {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM);
if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->submits = taosArrayInit(0, sizeof(SPackedSubmit));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL;
pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged;
FAIL:
if (pMerged->reqs) taosArrayDestroy(pMerged->reqs);
if (pMerged->submits) taosArrayDestroy(pMerged->submits);
if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
}
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->reqs, &pSubmit->data);
taosArrayPush(pMerged->submits, &pSubmit->submit);
pMerged->ver = pSubmit->ver;
return 0;
}
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit) {
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM);
if (pSubmitClone == NULL) {
return NULL;
}
streamDataSubmitRefInc(pSubmit);
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2));
return pSubmitClone;
}
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->submit.msgStr);
taosMemoryFree(pDataSubmit->dataRef);
}
}
......@@ -135,16 +135,16 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosFreeQitem(elem);
return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem;
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst;
SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)elem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(elem);
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
SStreamMergedSubmit2* pMerged = streamMergedSubmitNew();
ASSERT(pMerged);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)elem);
taosFreeQitem(dst);
taosFreeQitem(elem);
return (SStreamQueueItem*)pMerged;
......@@ -162,22 +162,22 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
streamDataSubmitRefDec((SStreamDataSubmit2*)data);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs);
SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data;
int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
void* dataStr = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(dataStr);
SPackedSubmit* pSubmit = (SPackedSubmit*)taosArrayGet(pMerge->submits, i);
taosMemoryFree(pSubmit->msgStr);
taosMemoryFree(pRef);
}
}
taosArrayDestroy(pMerge->reqs);
taosArrayDestroy(pMerge->submits);
taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge);
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
......
......@@ -26,17 +26,18 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qDebug("task %d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr,
pSubmit->submit.msgLen, pSubmit->submit.ver);
qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs;
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
SArray* blocks = pMerged->submits;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
......@@ -244,11 +245,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qRes->blocks = pRes;
if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)input;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)input;
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)input;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册