提交 866e4c4b 编写于 作者: L Liu Jicong

refactor(stream): internal refactor

上级 4a648d71
......@@ -117,6 +117,20 @@ typedef struct SSDataBlock {
SDataBlockInfo info;
} SSDataBlock;
enum {
FETCH_TYPE__DATA = 1,
FETCH_TYPE__META,
FETCH_TYPE__NONE,
};
typedef struct {
int8_t fetchType;
union {
SSDataBlock data;
void* meta;
};
} SFetchRet;
typedef struct SVarColAttr {
int32_t* offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data
......
......@@ -30,7 +30,7 @@ struct SRpcMsg;
struct SSubplan;
typedef struct SReadHandle {
void* streamReader;
void* tqReader;
void* meta;
void* config;
void* vnode;
......@@ -38,7 +38,7 @@ typedef struct SReadHandle {
SMsgCb* pMsgCb;
bool initMetaReader;
bool initTableReader;
bool initStreamReader;
bool initTqReader;
} SReadHandle;
typedef enum {
......@@ -52,7 +52,7 @@ typedef enum {
* @param streamReadHandle
* @return
*/
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
/**
* Switch the stream scan to snapshot mode
......@@ -176,6 +176,9 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
#ifdef __cplusplus
}
#endif
......
......@@ -235,11 +235,6 @@ typedef struct SStreamTask {
int8_t taskStatus;
int8_t execStatus;
// exec info
int64_t enqueueVer;
int64_t processedVer;
int64_t checkpointVer;
// node info
int32_t selfChildId;
int32_t nodeId;
......
......@@ -64,8 +64,8 @@ int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
void * vnodeGetIdx(SVnode *pVnode);
void * vnodeGetIvtIdx(SVnode *pVnode);
void *vnodeGetIdx(SVnode *pVnode);
void *vnodeGetIvtIdx(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
......@@ -95,7 +95,7 @@ typedef struct SMetaFltParam {
tb_uid_t suid;
int16_t cid;
int16_t type;
char * val;
char *val;
bool reverse;
int (*filterFunc)(void *a, void *b, int16_t type);
......@@ -136,8 +136,8 @@ SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdLis
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void * tsdbGetIdx(SMeta *pMeta);
void * tsdbGetIvtIdx(SMeta *pMeta);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t *colId, int32_t numOfCols,
void **pReader);
......@@ -146,19 +146,37 @@ int32_t tsdbLastrowReaderClose(void *pReader);
// tq
typedef struct STqReadHandle SStreamReader;
typedef struct STqReader {
int64_t ver;
const SSubmitReq *pMsg;
SSubmitBlk *pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SStreamReader *tqInitSubmitMsgScanner(SMeta *pMeta);
SWalReader *pWalReader;
void tqReadHandleSetColIdList(SStreamReader *pReadHandle, SArray *pColIdList);
int32_t tqReadHandleSetTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
int32_t tqReadHandleAddTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
int32_t tqReadHandleRemoveTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
SMeta *pVnodeMeta;
SHashObj *tbIdHash;
SArray *pColIdList; // SArray<int16_t>
int32_t tqReadHandleSetMsg(SStreamReader *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(SStreamReader *pHandle);
bool tqNextDataBlockFilterOut(SStreamReader *pHandle, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, SStreamReader *pHandle);
int32_t cachedSchemaVer;
int64_t cachedSchemaSuid;
SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema;
} STqReader;
STqReader *tqOpenReader(SVnode *pVnode);
void tqCloseReader(STqReader *);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
......@@ -214,7 +232,7 @@ struct SMetaEntry {
int8_t type;
int8_t flags; // TODO: need refactor?
tb_uid_t uid;
char * name;
char *name;
union {
struct {
SSchemaWrapper schemaRow;
......@@ -225,7 +243,7 @@ struct SMetaEntry {
int64_t ctime;
int32_t ttlDays;
int32_t commentLen;
char * comment;
char *comment;
tb_uid_t suid;
uint8_t *pTags;
} ctbEntry;
......@@ -233,7 +251,7 @@ struct SMetaEntry {
int64_t ctime;
int32_t ttlDays;
int32_t commentLen;
char * comment;
char *comment;
int32_t ncid; // next column id
SSchemaWrapper schemaRow;
} ntbEntry;
......@@ -247,17 +265,17 @@ struct SMetaEntry {
struct SMetaReader {
int32_t flags;
SMeta * pMeta;
SMeta *pMeta;
SDecoder coder;
SMetaEntry me;
void * pBuf;
void *pBuf;
int32_t szBuf;
};
struct SMTbCursor {
TBC * pDbc;
void * pKey;
void * pVal;
TBC *pDbc;
void *pKey;
void *pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
......
......@@ -44,25 +44,6 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore;
// tqRead
struct STqReadHandle {
int64_t ver;
const SSubmitReq* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pVnodeMeta;
SHashObj* tbIdHash;
SArray* pColIdList; // SArray<int16_t>
int32_t cachedSchemaVer;
int64_t cachedSchemaSuid;
SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema;
};
// tqPush
typedef struct {
......@@ -102,7 +83,7 @@ typedef struct {
typedef struct {
int8_t subType;
SStreamReader* pExecReader[5];
STqReader* pExecReader[5];
union {
STqExecCol execCol;
STqExecTb execTb;
......@@ -118,7 +99,7 @@ typedef struct {
int32_t epoch;
int8_t fetchMeta;
// reader
// TODO remove
SWalReader* pWalReader;
// push
......
......@@ -325,14 +325,14 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_FAILED;
}
SStreamReader *pReadHandle = tqInitSubmitMsgScanner(pMeta);
if (!pReadHandle) {
STqReader *pReader = tqOpenReader(pVnode);
if (!pReader) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SReadHandle handle = {
.streamReader = pReadHandle,
.tqReader = pReader,
.meta = pMeta,
.pMsgCb = pMsgCb,
.vnode = pVnode,
......@@ -364,7 +364,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_SUCCESS;
_err:
tdFreeRSmaInfo(pRSmaInfo);
taosMemoryFree(pReadHandle);
taosMemoryFree(pReader);
return TSDB_CODE_FAILED;
}
......
......@@ -447,26 +447,38 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
/*for (int32_t i = 0; i < 5; i++) {*/
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg;
req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = {
.streamReader = pHandle->execHandle.pExecReader[i],
.tqReader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
};
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
ASSERT(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader[i]);
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
......@@ -476,7 +488,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid);
}
for (int32_t i = 0; i < 5; i++) {
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
}
taosArrayDestroy(tbUidList);
}
......@@ -532,7 +544,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initStreamReader = 1,
.initTqReader = 1,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
} else {
......
......@@ -135,8 +135,8 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
}
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1;
SStreamReader* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0);
STqReader* pReader = pExec->pExecReader[workerId];
tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
......@@ -153,8 +153,8 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
pRsp->withSchema = 1;
SStreamReader* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0);
STqReader* pReader = pExec->pExecReader[workerId];
tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
......
......@@ -79,12 +79,12 @@ int32_t tqMetaOpen(STQ* pTq) {
tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) {
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = {
.streamReader = handle.execHandle.pExecReader[i],
.tqReader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
......
......@@ -15,6 +15,11 @@
#include "tq.h"
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset) {
/*if ()*/
return 0;
}
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex);
......@@ -73,53 +78,107 @@ END:
return code;
}
SStreamReader* tqInitSubmitMsgScanner(SMeta* pMeta) {
SStreamReader* pReadHandle = taosMemoryMalloc(sizeof(SStreamReader));
if (pReadHandle == NULL) {
STqReader* tqOpenReader(SVnode* pVnode) {
STqReader* pReader = taosMemoryMalloc(sizeof(STqReader));
if (pReader == NULL) {
return NULL;
}
pReadHandle->pVnodeMeta = pMeta;
pReadHandle->pMsg = NULL;
pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL;
pReadHandle->cachedSchemaVer = 0;
pReadHandle->cachedSchemaSuid = 0;
pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL;
pReadHandle->tbIdHash = NULL;
return pReadHandle;
// TODO open
/*pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);*/
pReader->pVnodeMeta = pVnode->pMeta;
pReader->pMsg = NULL;
pReader->ver = -1;
pReader->pColIdList = NULL;
pReader->cachedSchemaVer = 0;
pReader->cachedSchemaSuid = 0;
pReader->pSchema = NULL;
pReader->pSchemaWrapper = NULL;
pReader->tbIdHash = NULL;
return pReader;
}
void tqCloseReader(STqReader* pReader) {
// close wal reader
// free cached schema
// free hash
taosMemoryFree(pReader);
}
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
bool fromProcessedMsg = pReader->pMsg != NULL;
while (1) {
if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) {
ret->fetchType = FETCH_TYPE__NONE;
return -1;
}
void* body = pReader->pWalReader->pHead->head.body;
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter
ret->fetchType = FETCH_TYPE__META;
ret->meta = pReader->pWalReader->pHead->head.body;
return 0;
} else {
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
}
}
while (tqNextDataBlock(pReader)) {
memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
if (code != 0 || ret->data.info.rows == 0) {
if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE;
return 0;
} else {
break;
}
}
ret->fetchType = FETCH_TYPE__DATA;
return 0;
}
if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE;
return 0;
}
}
}
int32_t tqReadHandleSetMsg(SStreamReader* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg;
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break;
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
if (pReader->pBlock == NULL) break;
}
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
pReader->ver = ver;
memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
return 0;
}
bool tqNextDataBlock(SStreamReader* pHandle) {
if (pHandle->pMsg == NULL) return false;
bool tqNextDataBlock(STqReader* pReader) {
if (pReader->pMsg == NULL) return false;
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) {
pHandle->pMsg = NULL;
if (pReader->pBlock == NULL) {
pReader->pMsg = NULL;
return false;
}
if (pHandle->tbIdHash == NULL) {
if (pReader->tbIdHash == NULL) {
return true;
}
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
void* ret = taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t));
/*tqDebug("search uid %ld", pHandle->msgIter.uid);*/
if (ret != NULL) {
/*tqDebug("find uid %ld", pHandle->msgIter.uid);*/
......@@ -129,7 +188,7 @@ bool tqNextDataBlock(SStreamReader* pHandle) {
return false;
}
bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) {
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
......@@ -145,38 +204,38 @@ bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) {
return false;
}
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
// TODO: cache multiple schema
int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
pHandle->cachedSchemaSuid != pHandle->msgIter.suid) {
if (pHandle->pSchema) taosMemoryFree(pHandle->pSchema);
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
if (pHandle->pSchema == NULL) {
int32_t sversion = htonl(pReader->pBlock->sversion);
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
pReader->cachedSchemaSuid != pReader->msgIter.suid) {
if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion);
if (pReader->pSchema == NULL) {
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->cachedSchemaVer);
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
/*ASSERT(0);*/
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper);
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true);
if (pHandle->pSchemaWrapper == NULL) {
if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, true);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
pHandle->msgIter.uid, pHandle->cachedSchemaVer);
pReader->msgIter.uid, pReader->cachedSchemaVer);
/*ASSERT(0);*/
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
pHandle->cachedSchemaVer = sversion;
pHandle->cachedSchemaSuid = pHandle->msgIter.suid;
pReader->cachedSchemaVer = sversion;
pReader->cachedSchemaSuid = pReader->msgIter.suid;
}
STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
if (colNumNeed == 0) {
int32_t colMeta = 0;
......@@ -199,7 +258,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
if (colIdSchema < colIdNeed) {
colMeta++;
} else if (colIdSchema > colIdNeed) {
......@@ -216,7 +275,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
}
}
if (blockDataEnsureCapacity(pBlock, pHandle->msgIter.numOfRows) < 0) {
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
goto FAIL;
}
......@@ -227,13 +286,12 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
STSRow* row;
int32_t curRow = 0;
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
pBlock->info.groupId = 0;
pBlock->info.uid = pHandle->msgIter.uid;
pBlock->info.rows = pHandle->msgIter.numOfRows;
pBlock->info.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows;
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
for (int32_t i = 0; i < colActual; i++) {
......@@ -255,9 +313,9 @@ FAIL:
return -1;
}
void tqReadHandleSetColIdList(SStreamReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) {
if (pHandle->tbIdHash) {
taosHashClear(pHandle->tbIdHash);
}
......@@ -276,7 +334,7 @@ int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
return 0;
}
int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
if (pHandle->tbIdHash == NULL) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
......@@ -293,7 +351,7 @@ int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
return 0;
}
int tqReadHandleRemoveTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) {
ASSERT(pHandle->tbIdHash != NULL);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
......
......@@ -365,7 +365,7 @@ typedef struct SStreamScanInfo {
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
void* streamReader;// stream block reader handle
STqReader* tqReader;
int32_t tsArrayIndex;
SArray* tsArray;
......@@ -383,6 +383,11 @@ typedef struct SStreamScanInfo {
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
// status for tmq
//SSchemaWrapper schema;
STqOffset offset;
} SStreamScanInfo;
typedef struct SSysTableScanInfo {
......
......@@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) {
if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
......@@ -105,7 +105,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code;
}
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
if (msg == NULL) {
return NULL;
}
......@@ -120,7 +120,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
......@@ -174,11 +174,11 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa);
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
taosArrayDestroy(qa);
} else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList);
code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
}
return code;
......
......@@ -236,6 +236,37 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
return decodeOperator(pTaskInfo->pRoot, pInput, len);
}
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
uint8_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
*scanner = pOperator->info;
return 0;
} else {
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
}
}
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
}
const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return pInfo->tqReader->pSchemaWrapper;
}
const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return &pInfo->offset;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
......
......@@ -2842,7 +2842,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t type = pOperator->operatorType;
uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
......@@ -4309,11 +4309,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
// if (code) {
// pTaskInfo->code = code;
// return NULL;
// }
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
// if (code) {
// pTaskInfo->code = code;
// return NULL;
// }
int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4370,8 +4370,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} else {
pOptr =
createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo);
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -4538,7 +4538,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return pList;
}
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) {
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, const char* idstr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -4782,7 +4783,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;
goto _complete;
......
......@@ -1232,38 +1232,33 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->streamReader)) {
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
// todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamReader);
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
uint64_t groupId = block.info.groupId;
uint64_t uid = block.info.uid;
int32_t numOfRows = block.info.rows;
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
pTaskInfo->code = code;
return NULL;
}
pInfo->pRes->info.groupId = groupId;
pInfo->pRes->info.rows = numOfRows;
pInfo->pRes->info.uid = uid;
pInfo->pRes->info.rows = block.info.rows;
pInfo->pRes->info.uid = block.info.uid;
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = numOfRows;
pInfo->pRes->info.capacity = block.info.rows;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = uid;
} else {
pInfo->pRes->info.groupId = groupId;
pInfo->pRes->info.groupId = block.info.uid;
}
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
pInfo->pRes->info.groupId = 0;
}
// todo extract method
......@@ -1413,13 +1408,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
}
if (pHandle->initStreamReader) {
ASSERT(pHandle->streamReader == NULL);
pInfo->streamReader = tqInitSubmitMsgScanner(pHandle->meta);
ASSERT(pInfo->streamReader);
if (pHandle->initTqReader) {
ASSERT(pHandle->tqReader == NULL);
pInfo->tqReader = tqOpenReader(pHandle->vnode);
ASSERT(pInfo->tqReader);
} else {
ASSERT(pHandle->streamReader);
pInfo->streamReader = pHandle->streamReader;
ASSERT(pHandle->tqReader);
pInfo->tqReader = pHandle->tqReader;
}
if (pSTInfo->interval.interval > 0) {
......@@ -1435,9 +1430,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->tableUid = pScanPhyNode->uid;
// set the extract column id to streamHandle
tqReadHandleSetColIdList(pInfo->streamReader, pColIds);
tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pInfo->streamReader, tableIdList);
int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册