提交 619bbdfd 编写于 作者: L Liu Jicong

refactor: support submitreq2

上级 fd5fd114
...@@ -190,7 +190,7 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); ...@@ -190,7 +190,7 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq); int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
......
...@@ -189,12 +189,13 @@ typedef enum ELogicConditionType { ...@@ -189,12 +189,13 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_COLUMNS 4096 #define TSDB_MAX_COLUMNS 4096
#define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns #define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns
#define TSDB_NODE_NAME_LEN 64 #define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string #define TSDB_CGROUP_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65 #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 1024 * 1024 #define TSDB_FUNC_COMMENT_LEN 1024 * 1024
...@@ -280,7 +281,7 @@ typedef enum ELogicConditionType { ...@@ -280,7 +281,7 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_MGMT 1 #define TSDB_DNODE_ROLE_MGMT 1
#define TSDB_DNODE_ROLE_VNODE 2 #define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5 #define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_TBNAME_COLUMN_INDEX (-1)
......
...@@ -558,6 +558,10 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -558,6 +558,10 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto SUBSCRIBE_OVER; goto SUBSCRIBE_OVER;
} }
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
goto SUBSCRIBE_OVER;
}
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
} }
......
...@@ -689,15 +689,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -689,15 +689,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
} }
#if 0
if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
}
#endif
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) { if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
return -1; return -1;
} }
......
...@@ -211,12 +211,24 @@ typedef struct SSnapContext { ...@@ -211,12 +211,24 @@ typedef struct SSnapContext {
bool queryMetaOrData; // true-get meta, false-get data bool queryMetaOrData; // true-get meta, false-get data
} SSnapContext; } SSnapContext;
typedef struct {
void *msgStr;
int32_t msgLen;
int64_t ver;
} SPackedSubmit;
typedef struct STqReader { typedef struct STqReader {
int64_t ver;
const SSubmitReq *pMsg; const SSubmitReq *pMsg;
SSubmitBlk *pBlock; // SSubmitBlk *pBlock;
SSubmitMsgIter msgIter; // SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; // SSubmitBlkIter blkIter;
int64_t ver;
SPackedSubmit msg2;
SSubmitReq2 *pSubmit;
int32_t nextBlk;
int64_t lastBlkUid;
SWalReader *pWalReader; SWalReader *pWalReader;
...@@ -241,11 +253,14 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); ...@@ -241,11 +253,14 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver); int32_t tqSeekVer(STqReader *pReader, int64_t ver);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlock(STqReader *pReader); // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlock2(STqReader *pReader);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas); int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas);
// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
......
...@@ -153,7 +153,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -153,7 +153,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedSubmit submit, STaosxRsp* pRsp);
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
......
...@@ -675,7 +675,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -675,7 +675,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) { SPackedSubmit submit = {
.msgStr = pCont,
.msgLen = pHead->bodyLen,
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
} }
if (taosxRsp.blockNum > 0 /* threshold */) { if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
...@@ -725,9 +730,13 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL ...@@ -725,9 +730,13 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
} }
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->pushLock);
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (pHandle) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); walCloseRef(pHandle->pWalReader->pWal, pHandle->pRef->refId);
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
}
} }
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
......
...@@ -110,14 +110,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -110,14 +110,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
} }
ASSERT(pRsp->rspOffset.type != 0); ASSERT(pRsp->rspOffset.type != 0);
if (pRsp->withTbName) { ASSERT(pRsp->withTbName == false);
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, 1);
} else {
pRsp->withTbName = false;
}
}
ASSERT(pRsp->withSchema == false); ASSERT(pRsp->withSchema == false);
return 0; return 0;
...@@ -154,9 +147,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -154,9 +147,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = 0;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) {
continue; continue;
} }
...@@ -223,7 +215,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -223,7 +215,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0; return 0;
} }
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp) { int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedSubmit submit, STaosxRsp* pRsp) {
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
...@@ -232,8 +224,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -232,8 +224,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); /*tqReaderSetDataMsg(pReader, pReq, 0);*/
while (tqNextDataBlock(pReader)) { tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlock2(pReader)) {
/*SSDataBlock block = {0};*/ /*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
...@@ -241,11 +234,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -241,11 +234,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) { if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; /*int64_t uid = pExec->pExecReader->msgIter.uid;*/
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
...@@ -255,7 +249,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -255,7 +249,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
#if 0
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
int64_t uid = pExec->pExecReader->lastBlkUid;
int32_t schemaLen = htonl(pBlk->schemaLen); int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) { if (schemaLen > 0) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
...@@ -268,6 +264,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -268,6 +264,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
taosArrayPush(pRsp->createTableReq, &createReq); taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++; pRsp->createTableNum++;
} }
#endif
} }
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
...@@ -281,19 +278,20 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -281,19 +278,20 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); /*tqReaderSetDataMsg(pReader, pReq, 0);*/
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) {
/*SSDataBlock block = {0};*/ /*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/ /*}*/
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) { if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
...@@ -303,6 +301,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -303,6 +301,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
#if 0
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
int32_t schemaLen = htonl(pBlk->schemaLen); int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) { if (schemaLen > 0) {
...@@ -316,6 +315,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -316,6 +315,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
taosArrayPush(pRsp->createTableReq, &createReq); taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++; pRsp->createTableNum++;
} }
#endif
} }
/*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
/*pTq->pVnode->config.tsdbCfg.precision);*/ /*pTq->pVnode->config.tsdbCfg.precision);*/
......
...@@ -256,7 +256,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -256,7 +256,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SMqDataRsp* pRsp = &pPushEntry->dataRsp; SMqDataRsp* pRsp = &pPushEntry->dataRsp;
// prepare scan mem data // prepare scan mem data
qStreamScanMemData(task, pReq); qStreamScanMemData(task, pReq, ver);
// exec // exec
while (1) { while (1) {
......
...@@ -264,7 +264,7 @@ STqReader* tqOpenReader(SVnode* pVnode) { ...@@ -264,7 +264,7 @@ STqReader* tqOpenReader(SVnode* pVnode) {
} }
pReader->pVnodeMeta = pVnode->pMeta; pReader->pVnodeMeta = pVnode->pMeta;
pReader->pMsg = NULL; /*pReader->pMsg = NULL;*/
pReader->ver = -1; pReader->ver = -1;
pReader->pColIdList = NULL; pReader->pColIdList = NULL;
pReader->cachedSchemaVer = 0; pReader->cachedSchemaVer = 0;
...@@ -320,7 +320,9 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -320,7 +320,9 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ASSERT(ret->offset.version >= 0); ASSERT(ret->offset.version >= 0);
return -1; return -1;
} }
void* body = pReader->pWalReader->pHead->head.body; void* body = pReader->pWalReader->pHead->head.body;
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen;
int64_t ver = pReader->pWalReader->pHead->head.version;
#if 0 #if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter // TODO do filter
...@@ -329,16 +331,17 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -329,16 +331,17 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
return 0; return 0;
} else { } else {
#endif #endif
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); tqReaderSetSubmitReq2(pReader, body, bodyLen, ver);
/*tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);*/
#if 0 #if 0
} }
#endif #endif
} }
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock2(pReader)) {
// TODO mem free // TODO mem free
memset(&ret->data, 0, sizeof(SSDataBlock)); memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock(&ret->data, pReader); int32_t code = tqRetrieveDataBlock2(&ret->data, pReader);
if (code != 0 || ret->data.info.rows == 0) { if (code != 0 || ret->data.info.rows == 0) {
ASSERT(0); ASSERT(0);
continue; continue;
...@@ -359,6 +362,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -359,6 +362,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
} }
} }
#if 0
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) { int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg; pReader->pMsg = pMsg;
...@@ -373,7 +377,24 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v ...@@ -373,7 +377,24 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v
memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter)); memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
return 0; return 0;
} }
#endif
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
ASSERT(pReader->msg2.msgStr == NULL);
pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver;
if (pReader->pSubmit == NULL) {
SDecoder decoder;
tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen);
tDecodeSSubmitReq2(&decoder, pReader->pSubmit);
tDecoderClear(&decoder);
}
return 0;
}
#if 0
bool tqNextDataBlock(STqReader* pReader) { bool tqNextDataBlock(STqReader* pReader) {
if (pReader->pMsg == NULL) return false; if (pReader->pMsg == NULL) return false;
while (1) { while (1) {
...@@ -397,6 +418,53 @@ bool tqNextDataBlock(STqReader* pReader) { ...@@ -397,6 +418,53 @@ bool tqNextDataBlock(STqReader* pReader) {
} }
return false; return false;
} }
#endif
bool tqNextDataBlock2(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL) return false;
ASSERT(pReader->pSubmit != NULL);
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
return true;
}
}
tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE);
pReader->pSubmit = NULL;
pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL;
return false;
}
bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
if (pReader->msg2.msgStr == NULL) return false;
ASSERT(pReader->pSubmit != NULL);
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret == NULL) {
return true;
}
}
tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE);
pReader->pSubmit = NULL;
pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL;
return false;
}
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) { int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
int32_t code; int32_t code;
...@@ -427,6 +495,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap ...@@ -427,6 +495,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
return 0; return 0;
} }
#if 0
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) { bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
while (1) { while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
...@@ -443,6 +512,225 @@ bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) { ...@@ -443,6 +512,225 @@ bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
return false; return false;
} }
int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) {
//
int32_t sversion = htonl(pReader->pBlock->sversion);
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
pReader->cachedSchemaSuid != pReader->msgIter.suid) {
taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
}
return 0;
}
#endif
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData);
ASSERT(pReader->nextBlk < blockSz);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
pBlock->info.id.uid = uid;
pBlock->info.version = pReader->msg2.ver;
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) {
taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
if (colNumNeed == 0) {
int32_t colMeta = 0;
while (colMeta < pSchemaWrapper->nCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
colMeta++;
}
} else {
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
}
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
if (colIdSchema < colIdNeed) {
colMeta++;
} else if (colIdSchema > colIdNeed) {
colNeed++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
colMeta++;
colNeed++;
}
}
}
int32_t numOfRows = 0;
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
SColData* pCol = taosArrayGet(pCols, 0);
numOfRows = pCol->nVal;
} else {
SArray* pRows = pSubmitTbData->aRowP;
numOfRows = taosArrayGetSize(pRows);
}
if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pBlock->info.rows = numOfRows;
int32_t colActual = blockDataGetNumOfCols(pBlock);
// convert and scan one block
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
int32_t numOfCols = taosArrayGetSize(pCols);
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
while (targetIdx < colActual) {
ASSERT(sourceIdx < numOfCols);
SColData* pCol = taosArrayGet(pCols, sourceIdx);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
ASSERT(pCol->nVal == numOfRows);
if (pCol->cid < pColData->info.colId) {
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
for (int32_t i = 0; i < pCol->nVal; i++) {
tColDataGetValue(pCol, sourceIdx, &colVal);
void* val = NULL;
if (IS_STR_DATA_TYPE(colVal.type)) {
val = colVal.value.pData;
} else {
val = &colVal.value.val;
}
if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) {
goto FAIL;
}
}
sourceIdx++;
targetIdx++;
} else {
ASSERT(0);
}
}
} else {
SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGet(pRows, i);
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
for (int32_t j = 0; j < colActual; j++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
while (1) {
ASSERT(sourceIdx < numOfRows);
SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
sourceIdx++;
continue;
} else if (colVal.cid == pColData->info.colId) {
void* val = NULL;
if (IS_STR_DATA_TYPE(colVal.type)) {
val = colVal.value.pData;
} else {
val = &colVal.value.val;
}
if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) {
goto FAIL;
}
sourceIdx++;
targetIdx++;
break;
} else {
ASSERT(0);
}
}
}
}
}
}
return 0;
FAIL:
blockDataFreeRes(pBlock);
return -1;
}
#if 0
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
// TODO: cache multiple schema // TODO: cache multiple schema
int32_t sversion = htonl(pReader->pBlock->sversion); int32_t sversion = htonl(pReader->pBlock->sversion);
...@@ -690,6 +978,18 @@ FAIL: ...@@ -690,6 +978,18 @@ FAIL:
taosMemoryFree(assigned); taosMemoryFree(assigned);
return -1; return -1;
} }
#endif
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock2(&block, pReader) == 0) {
taosArrayPush(blocks, &block);
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper);
taosArrayPush(schemas, &pSW);
return 0;
}
return -1;
}
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; } void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
......
...@@ -569,9 +569,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -569,9 +569,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
int32_t nColData = TARRAY_SIZE(pSubmitTbData->aCol); int32_t nColData = TARRAY_SIZE(pSubmitTbData->aCol);
SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol); SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
ASSERT(aColData[0].cid = PRIMARYKEY_TIMESTAMP_COL_ID); ASSERT(aColData[0].cid == PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(aColData[0].type = TSDB_DATA_TYPE_TIMESTAMP); ASSERT(aColData[0].type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(aColData[0].flag = HAS_VALUE); ASSERT(aColData[0].flag == HAS_VALUE);
// copy and construct block data // copy and construct block data
SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData)); SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
......
...@@ -132,6 +132,9 @@ typedef struct { ...@@ -132,6 +132,9 @@ typedef struct {
int8_t returned; int8_t returned;
int64_t snapshotVer; int64_t snapshotVer;
const SSubmitReq* pReq; const SSubmitReq* pReq;
int64_t scanVer;
SPackedSubmit submit;
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
...@@ -181,7 +184,7 @@ struct SExecTaskInfo { ...@@ -181,7 +184,7 @@ struct SExecTaskInfo {
SSubplan* pSubplan; SSubplan* pSubplan;
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
SLocalFetch localFetch; SLocalFetch localFetch;
SArray* pResultBlockList;// result block list SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
}; };
...@@ -253,22 +256,22 @@ typedef struct SLimitInfo { ...@@ -253,22 +256,22 @@ typedef struct SLimitInfo {
} SLimitInfo; } SLimitInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator // passed by downstream operator
SArray* pResultBlockList; SArray* pResultBlockList;
SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy. SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default bool seqLoadData; // sequential load data or not, false by default
int32_t current; int32_t current;
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
uint64_t self; uint64_t self;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo; } SExchangeInfo;
typedef struct SScanInfo { typedef struct SScanInfo {
...@@ -303,9 +306,9 @@ typedef struct { ...@@ -303,9 +306,9 @@ typedef struct {
} SAggOptrPushDownInfo; } SAggOptrPushDownInfo;
typedef struct STableMetaCacheInfo { typedef struct STableMetaCacheInfo {
SLRUCache* pTableMetaEntryCache; // 100 by default SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch; uint64_t metaFetch;
uint64_t cacheHit; uint64_t cacheHit;
} STableMetaCacheInfo; } STableMetaCacheInfo;
typedef struct STableScanBase { typedef struct STableScanBase {
...@@ -323,47 +326,47 @@ typedef struct STableScanBase { ...@@ -323,47 +326,47 @@ typedef struct STableScanBase {
} STableScanBase; } STableScanBase;
typedef struct STableScanInfo { typedef struct STableScanInfo {
STableScanBase base; STableScanBase base;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
int8_t scanMode; int8_t scanMode;
int8_t assignBlockUid; int8_t assignBlockUid;
bool hasGroupByTag; bool hasGroupByTag;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
int32_t tableStartIndex; int32_t tableStartIndex;
int32_t tableEndIndex; int32_t tableEndIndex;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond SArray* queryConds; // array of queryTableDataCond
STableScanBase base; STableScanBase base;
int32_t bufPageSize; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams; SArray* sortSourceParams;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo* pCols; SColumnInfo* pCols;
SSDataBlock* pRes; SSDataBlock* pRes;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -657,19 +660,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32 ...@@ -657,19 +660,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32
void* pInfo, SExecTaskInfo* pTaskInfo); void* pInfo, SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator); void destroyOperatorInfo(SOperatorInfo* pOperator);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo); void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup); void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey); const char* pkey);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf); SDiskbasedBuf* pBuf);
...@@ -780,10 +783,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, ...@@ -780,10 +783,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model); char* sql, EOPTR_EXEC_MODEL model);
...@@ -807,8 +810,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); ...@@ -807,8 +810,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName); uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
......
...@@ -539,7 +539,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo ...@@ -539,7 +539,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
taosArrayPush(pTaskInfo->pResultBlockList, &p1); taosArrayPush(pTaskInfo->pResultBlockList, &p1);
p = p1; p = p1;
} else { } else {
p = *(SSDataBlock**) taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
copyDataBlock(p, pRes); copyDataBlock(p, pRes);
} }
...@@ -574,9 +574,9 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo ...@@ -574,9 +574,9 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) { void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SArray* pList = pTaskInfo->pResultBlockList; SArray* pList = pTaskInfo->pResultBlockList;
size_t num = taosArrayGetSize(pList); size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i); SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i);
blockDataDestroy(*p); blockDataDestroy(*p);
} }
...@@ -742,11 +742,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { ...@@ -742,11 +742,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
} }
int32_t nOptrWithVal = 0; int32_t nOptrWithVal = 0;
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); // int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { // if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
// taosMemoryFreeClear(*pOutput); // taosMemoryFreeClear(*pOutput);
// *len = 0; // *len = 0;
// } // }
return 0; return 0;
} }
...@@ -758,7 +758,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le ...@@ -758,7 +758,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
} }
return 0; return 0;
// return decodeOperator(pTaskInfo->pRoot, pInput, len); // return decodeOperator(pTaskInfo->pRoot, pInput, len);
} }
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
...@@ -991,11 +991,20 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s ...@@ -991,11 +991,20 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) { int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
ASSERT(pTaskInfo->streamInfo.pReq == NULL); ASSERT(pTaskInfo->streamInfo.pReq == NULL);
pTaskInfo->streamInfo.pReq = pReq; pTaskInfo->streamInfo.pReq = pReq;
pTaskInfo->streamInfo.scanVer = scanVer;
return 0;
}
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL);
pTaskInfo->streamInfo.submit = submit;
return 0; return 0;
} }
......
...@@ -487,10 +487,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int ...@@ -487,10 +487,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid); code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
tstrerror(terrno), idStr); pBlock->info.id.uid, tstrerror(terrno), idStr);
} else { } else {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
idStr);
} }
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return terrno;
...@@ -1436,8 +1437,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1436,8 +1437,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
NULL); NULL);
if (closedWin && pInfo->partitionSup.needCalc) { if (closedWin && pInfo->partitionSup.needCalc) {
gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId); gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId, appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
NULL); &gpId, NULL);
} }
} }
} }
...@@ -1521,8 +1522,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1521,8 +1522,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.pReq != NULL) { if (pTaskInfo->streamInfo.pReq != NULL) {
if (pInfo->tqReader->pMsg == NULL) { if (pInfo->tqReader->pMsg == NULL) {
pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq; pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
pInfo->tqReader->ver = pTaskInfo->streamInfo.scanVer;
const SSubmitReq* pSubmit = pInfo->tqReader->pMsg; const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/
SPackedSubmit submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", pSubmit); qError("submit msg messed up when initing stream submit block %p", pSubmit);
pInfo->tqReader->pMsg = NULL; pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL; pTaskInfo->streamInfo.pReq = NULL;
...@@ -1533,10 +1539,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1533,10 +1539,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextDataBlock(pInfo->tqReader)) { while (tqNextDataBlock2(pInfo->tqReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue; continue;
...@@ -1925,7 +1931,8 @@ FETCH_NEXT_BLOCK: ...@@ -1925,7 +1931,8 @@ FETCH_NEXT_BLOCK:
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current); SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit, 0, 0) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum); totBlockNum);
pInfo->tqReader->pMsg = NULL; pInfo->tqReader->pMsg = NULL;
...@@ -1935,10 +1942,10 @@ FETCH_NEXT_BLOCK: ...@@ -1935,10 +1942,10 @@ FETCH_NEXT_BLOCK:
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) { while (tqNextDataBlock2(pInfo->tqReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue; continue;
...@@ -2462,7 +2469,8 @@ static void destroyTagScanOperatorInfo(void* param) { ...@@ -2462,7 +2469,8 @@ static void destroyTagScanOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册