提交 76edc16c 编写于 作者: M Minglei Jin

operator/cache-scan: pass col list down to tsdb cache

上级 dee2ca34
...@@ -180,7 +180,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n ...@@ -180,7 +180,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
...@@ -194,7 +194,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); ...@@ -194,7 +194,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader, const char *idstr); SArray *pCidList, uint64_t suid, void **pReader, const char *idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
...@@ -260,7 +260,7 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList); ...@@ -260,7 +260,7 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
void tqNextBlock(STqReader *pReader, SFetchRet *ret); void tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
......
...@@ -787,6 +787,7 @@ typedef struct SCacheRowsReader { ...@@ -787,6 +787,7 @@ typedef struct SCacheRowsReader {
uint64_t suid; uint64_t suid;
char **transferBuf; // todo remove it soon char **transferBuf; // todo remove it soon
int32_t numOfCols; int32_t numOfCols;
SArray *pCidList;
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list STableKeyInfo *pTableList; // table id list
......
...@@ -283,6 +283,14 @@ _exit: ...@@ -283,6 +283,14 @@ _exit:
return code; return code;
} }
int32_t tsdbCacheGetLast(STsdb *pTsdb, tb_uid_t uid, SArray **ppLastArray, SCacheRowsReader *pr) {
int32_t code = 0;
SArray *pCidList = pr->pCidList;
return code;
}
int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
SLRUCache *pCache = NULL; SLRUCache *pCache = NULL;
......
...@@ -143,7 +143,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id ...@@ -143,7 +143,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
} }
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader, const char* idstr) { SArray* pCidList, uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL; *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) { if (p == NULL) {
...@@ -155,6 +155,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -155,6 +155,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->pTsdb = p->pVnode->pTsdb; p->pTsdb = p->pVnode->pTsdb;
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->pCidList = pCidList;
p->suid = suid; p->suid = suid;
if (numOfTables == 0) { if (numOfTables == 0) {
......
...@@ -36,6 +36,7 @@ typedef struct SCacheRowsScanInfo { ...@@ -36,6 +36,7 @@ typedef struct SCacheRowsScanInfo {
int32_t currentGroupIndex; int32_t currentGroupIndex;
SSDataBlock* pBufferredRes; SSDataBlock* pBufferredRes;
SArray* pUidList; SArray* pUidList;
SArray* pCidList;
int32_t indexOfBufferedRes; int32_t indexOfBufferedRes;
STableListInfo* pTableList; STableListInfo* pTableList;
} SCacheRowsScanInfo; } SCacheRowsScanInfo;
...@@ -45,13 +46,13 @@ static void destroyCacheScanOperator(void* param); ...@@ -45,13 +46,13 @@ static void destroyCacheScanOperator(void* param);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) #define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo)); SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
...@@ -71,6 +72,13 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -71,6 +72,13 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error; goto _error;
} }
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
taosArrayPush(pCidList, &pColInfo->colId);
}
pInfo->pCidList = pCidList;
removeRedundantTsCol(pScanNode, &pInfo->matchInfo); removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
...@@ -91,7 +99,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -91,7 +99,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo); uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); taosArrayGetSize(pInfo->matchInfo.pList), pCidList, suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -114,7 +123,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -114,7 +123,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
} }
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = pOperator->fpSet =
...@@ -123,7 +133,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -123,7 +133,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
destroyCacheScanOperator(pInfo); destroyCacheScanOperator(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
...@@ -136,8 +146,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -136,8 +146,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
SCacheRowsScanInfo* pInfo = pOperator->info; SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList; STableListInfo* pTableList = pInfo->pTableList;
uint64_t suid = tableListGetSuid(pTableList); uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList); int32_t size = tableListGetSize(pTableList);
...@@ -194,8 +204,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -194,8 +204,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pRes->info.rows = 1; pRes->info.rows = 1;
SExprSupp* pSup = &pInfo->pseudoExprSup; SExprSupp* pSup = &pInfo->pseudoExprSup;
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, GET_TASKID(pTaskInfo), NULL); pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -217,7 +227,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -217,7 +227,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
int32_t num = 0; int32_t num = 0;
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num); int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -225,8 +235,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -225,8 +235,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, suid,
pTaskInfo->id.str); &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1; pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
...@@ -254,8 +264,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -254,8 +264,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo), NULL); pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -280,6 +290,7 @@ void destroyCacheScanOperator(void* param) { ...@@ -280,6 +290,7 @@ void destroyCacheScanOperator(void* param) {
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferredRes); blockDataDestroy(pInfo->pBufferredRes);
taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pSlotIds);
taosArrayDestroy(pInfo->pCidList);
taosArrayDestroy(pInfo->pUidList); taosArrayDestroy(pInfo->pUidList);
taosArrayDestroy(pInfo->matchInfo.pList); taosArrayDestroy(pInfo->matchInfo.pList);
tableListDestroy(pInfo->pTableList); tableListDestroy(pInfo->pTableList);
...@@ -325,7 +336,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC ...@@ -325,7 +336,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
size_t size = taosArrayGetSize(pColMatchInfo->pList); size_t size = taosArrayGetSize(pColMatchInfo->pList);
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem)); SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册