提交 a63473b1 编写于 作者: H Haojun Liao

refacotor: do some internal refactor.

上级 c66c0c4d
......@@ -34,6 +34,11 @@ extern "C" {
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
#define META_READER_NOLOCK 0x1
typedef struct SMeta SMeta;
typedef TSKEY (*GetTsFun)(void*);
typedef struct SMetaEntry {
int64_t version;
int8_t type;
......@@ -70,6 +75,32 @@ typedef struct SMetaEntry {
uint8_t *pBuf;
} SMetaEntry;
typedef struct SMetaReader {
int32_t flags;
void * pMeta;
SDecoder coder;
SMetaEntry me;
void * pBuf;
int32_t szBuf;
struct SStorageAPI *storageAPI;
} SMetaReader;
typedef struct SMTbCursor {
struct TBC *pDbc;
void * pKey;
void * pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
} SMTbCursor;
typedef struct SRowBuffPos {
void* pRowBuff;
void* pKey;
bool beFlushed;
bool beUsed;
} SRowBuffPos;
// int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
// int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
// SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
......@@ -135,8 +166,6 @@ typedef struct SMetaTableInfo {
char tbName[TSDB_TABLE_NAME_LEN];
} SMetaTableInfo;
typedef struct SMeta SMeta;
typedef struct SSnapContext {
SMeta * pMeta; // todo remove it
int64_t snapVersion;
......@@ -162,23 +191,18 @@ typedef struct {
// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
// bool tqCurrentBlockConsumed(const STqReader* pReader);
//
// int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
// int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
// SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
// int32_t destroySnapContext(SSnapContext *ctx);
// SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
// void metaCloseTbCursor(SMTbCursor *pTbCur);
// int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
// int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#define META_READER_NOLOCK 0x1
/*-------------------------------------------------new api format---------------------------------------------------*/
// typedef int32_t (*__store_reader_(STsdbReader *pReader, const void *pTableList, int32_t num);
......@@ -201,7 +225,7 @@ typedef int32_t (*__store_reader_open_fn_t)(void *pVnode, SQueryTableDataCond *p
int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader,
const char *idstr, bool countOnly, SHashObj **pIgnoreTables);
typedef struct SStoreDataReaderFn {
typedef struct SStoreTSDReader {
__store_reader_open_fn_t storeReaderOpen;
void (*storeReaderClose)();
void (*setReaderId)(void *pReader, const char *pId);
......@@ -216,7 +240,7 @@ typedef struct SStoreDataReaderFn {
void (*storeReaderGetDataBlockDistInfo)();
void (*storeReaderGetNumOfInMemRows)();
void (*storeReaderNotifyClosing)();
} SStoreDataReaderFn;
} SStoreTSDReader;
/**
* int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
......@@ -226,14 +250,14 @@ int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32
SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);
*/
typedef struct SStoreCachedDataReaderFn {
typedef struct SStoreCacheReader {
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
void *(*closeReader)(void *pReader);
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUidList);
void (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
} SStoreCachedDataReaderFn;
} SStoreCacheReader;
/*------------------------------------------------------------------------------------------------------------------*/
/*
......@@ -259,7 +283,7 @@ SWalReader* tqGetWalReader(STqReader* pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
*/
// todo rename
typedef struct SStoreTqReaderFn {
typedef struct SStoreTqReader {
void *(*tqReaderOpen)();
void (*tqReaderClose)();
......@@ -282,7 +306,7 @@ typedef struct SStoreTqReaderFn {
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
void (*tqReaderNextBlockFilterOut)();
} SStoreTqReaderFn;
} SStoreTqReader;
typedef struct SStoreSnapshotFn {
/*
......@@ -291,10 +315,10 @@ typedef struct SStoreSnapshotFn {
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
int32_t destroySnapContext(SSnapContext *ctx);
*/
int32_t (*storeCreateSnapshot)();
void (*storeDestroySnapshot)();
SMetaTableInfo (*storeSSGetTableInfo)();
int32_t (*storeSSGetMetaInfo)();
int32_t (*createSnapshot)();
void (*destroySnapshot)();
SMetaTableInfo (*getTableInfoFromSnapshot)();
int32_t (*getMetaInfoFromSnapshot)();
} SStoreSnapshotFn;
/**
......@@ -322,17 +346,9 @@ int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey,
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen);
*/
typedef struct SMetaReader {
int32_t flags;
void * pMeta;
SDecoder coder;
SMetaEntry me;
void * pBuf;
int32_t szBuf;
struct SStorageAPI *storageAPI;
} SMetaReader;
typedef struct SStoreMetaReaderFn {
typedef struct SStoreMetaReader {
void (*initReader)(void *pReader, void *pMeta, int32_t flags);
void *(*clearReader)();
......@@ -341,9 +357,9 @@ typedef struct SStoreMetaReaderFn {
int32_t (*getTableEntryByUid)();
int32_t (*getTableEntryByName)();
int32_t (*readerGetEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid);
} SStoreMetaReaderFn;
} SStoreMetaReader;
typedef struct SStoreMetaFn {
typedef struct SStoreMeta {
/*
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
......@@ -398,7 +414,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/
} SStoreMetaFn;
} SStoreMeta;
typedef struct STdbState {
void* rocksdb;
......@@ -450,21 +466,14 @@ typedef struct SUpdateInfo {
} SUpdateInfo;
typedef struct {
void* iter;
void* snapshot;
void* readOpt;
void* db;
// rocksdb_iterator_t* iter;
// rocksdb_snapshot_t* snapshot;
// rocksdb_readoptions_t* readOpt;
// rocksdb_t* db;
void* pCur;
void* iter; // rocksdb_iterator_t* iter;
void* snapshot; // rocksdb_snapshot_t* snapshot;
void* readOpt; // rocksdb_readoptions_t* readOpt;
void* db; // rocksdb_t* db;
void* pCur;
int64_t number;
} SStreamStateCur;
typedef TSKEY (*GetTsFun)(void*);
typedef struct SStateStore {
int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal);
......@@ -541,33 +550,15 @@ typedef struct SStateStore {
} SStateStore;
typedef struct SStorageAPI {
SStoreMetaFn metaFn; // todo: refactor
SStoreDataReaderFn storeReader;
SStoreMetaReaderFn metaReaderFn;
SStoreCachedDataReaderFn cacheFn;
SStoreSnapshotFn snapshotFn;
SStoreTqReaderFn tqReaderFn;
SStateStore stateStore;
SStoreMeta metaFn; // todo: refactor
SStoreTSDReader tsdReader;
SStoreMetaReader metaReaderFn;
SStoreCacheReader cacheFn;
SStoreSnapshotFn snapshotFn;
SStoreTqReader tqReaderFn;
SStateStore stateStore;
} SStorageAPI;
typedef struct SMTbCursor {
struct TBC *pDbc;
void * pKey;
void * pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
} SMTbCursor;
typedef struct SRowBuffPos {
void* pRowBuff;
void* pKey;
bool beFlushed;
bool beUsed;
} SRowBuffPos;
#ifdef __cplusplus
}
#endif
......
......@@ -208,7 +208,7 @@ typedef struct STableScanBase {
SLimitInfo limitInfo;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo* pTableListInfo;
SStoreDataReaderFn readerAPI;
SStoreTSDReader readerAPI;
} STableScanBase;
typedef struct STableScanInfo {
......@@ -224,7 +224,7 @@ typedef struct STableScanInfo {
int8_t assignBlockUid;
bool hasGroupByTag;
bool countOnly;
SStoreDataReaderFn readerAPI;
SStoreTSDReader readerAPI;
} STableScanInfo;
typedef struct STableMergeScanInfo {
......@@ -372,7 +372,7 @@ typedef struct SStreamScanInfo {
int8_t igCheckUpdate;
int8_t igExpired;
void* pState; //void
SStoreTqReaderFn readerFn;
SStoreTqReader readerFn;
SStateStore stateStore;
} SStreamScanInfo;
......
......@@ -166,7 +166,7 @@ void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI *pAPI) {
if (pStreamScanInfo->pTableScanOp != NULL) {
STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
if (pScanInfo->base.dataReader != NULL) {
pAPI->storeReader.setReaderId(pScanInfo->base.dataReader, pTaskInfo->id.str);
pAPI->tsdReader.setReaderId(pScanInfo->base.dataReader, pTaskInfo->id.str);
}
}
} else {
......@@ -1087,7 +1087,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pOffset->type == TMQ_OFFSET__LOG) {
// todo refactor: move away
pTaskInfo->storageAPI.storeReader.storeReaderClose(pScanBaseInfo->dataReader);
pTaskInfo->storageAPI.tsdReader.storeReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL;
ASSERT(0);
......@@ -1148,7 +1148,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanInfo->scanTimes = 0;
if (pScanBaseInfo->dataReader == NULL) {
int32_t code = pTaskInfo->storageAPI.storeReader.storeReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
int32_t code = pTaskInfo->storageAPI.tsdReader.storeReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, (void**) &pScanBaseInfo->dataReader, id, false, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
......@@ -1159,8 +1159,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
} else {
pTaskInfo->storageAPI.storeReader.storeReaderSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
pTaskInfo->storageAPI.storeReader.storeReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
pTaskInfo->storageAPI.tsdReader.storeReaderSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
pTaskInfo->storageAPI.tsdReader.storeReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
}
......@@ -1182,14 +1182,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;
if (pAPI->snapshotFn.storeCreateSnapshot(sContext, pOffset->uid) != 0) {
if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.storeSSGetTableInfo(sContext);
pTaskInfo->storageAPI.storeReader.storeReaderClose(pInfo->dataReader);
SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.getTableInfoFromSnapshot(sContext);
pTaskInfo->storageAPI.tsdReader.storeReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
......@@ -1207,7 +1207,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
int32_t size = tableListGetSize(pTableListInfo);
pTaskInfo->storageAPI.storeReader.storeReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL,
pTaskInfo->storageAPI.tsdReader.storeReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL,
false, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
......@@ -1219,7 +1219,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
if (pTaskInfo->storageAPI.snapshotFn.storeCreateSnapshot(sContext, pOffset->uid) != 0) {
if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
......@@ -1228,7 +1228,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
id);
} else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info;
pTaskInfo->storageAPI.storeReader.storeReaderClose(pInfo->dataReader);
pTaskInfo->storageAPI.tsdReader.storeReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
}
......
......@@ -1195,7 +1195,7 @@ void qStreamCloseTsdbReader(void* task) {
qDebug("wait for the reader stopping");
}
pTaskInfo->storageAPI.storeReader.storeReaderClose(pTSInfo->base.dataReader);
pTaskInfo->storageAPI.tsdReader.storeReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
// restore the status, todo refactor.
......
......@@ -239,7 +239,7 @@ static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam,
STableScanInfo* pInfo = pOperator->info;
if (pInfo->base.dataReader != NULL) {
pAPI->storeReader.storeReaderNotifyClosing(pInfo->base.dataReader);
pAPI->tsdReader.storeReaderNotifyClosing(pInfo->base.dataReader);
}
return OPTR_FN_RET_ABORT;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
......@@ -248,7 +248,7 @@ static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam,
if (pInfo->pTableScanOp != NULL) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
if (pTableScanInfo != NULL && pTableScanInfo->base.dataReader != NULL) {
pAPI->storeReader.storeReaderNotifyClosing(pTableScanInfo->base.dataReader);
pAPI->tsdReader.storeReaderNotifyClosing(pTableScanInfo->base.dataReader);
}
}
......
......@@ -267,7 +267,7 @@ static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
bool allColumnsHaveAgg = true;
int32_t code = pAPI->storeReader.storeReaderRetrieveBlockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
int32_t code = pAPI->tsdReader.storeReaderRetrieveBlockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -350,7 +350,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows;
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
......@@ -358,7 +358,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pBlockInfo->id.uid);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
pCost->skipBlocks += 1;
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
pCost->loadBlockStatis += 1;
......@@ -368,7 +368,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else {
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
......@@ -390,7 +390,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->filterOutBlocks += 1;
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
}
}
......@@ -405,7 +405,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader);
STableScanInfo* p1 = pOperator->info;
if (taosHashGetSize(p1->pIgnoreTables) == taosArrayGetSize(p1->base.pTableListInfo->pTableList)) {
......@@ -419,7 +419,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
SSDataBlock* p = pAPI->storeReader.storeReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
SSDataBlock* p = pAPI->tsdReader.storeReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (p == NULL) {
return terrno;
}
......@@ -693,9 +693,9 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
int64_t st = taosGetTimestampUs();
while (true) {
code = pAPI->storeReader.storeReaderNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
code = pAPI->tsdReader.storeReaderNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
if (code) {
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -704,12 +704,12 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
}
if (isTaskKilled(pTaskInfo)) {
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pOperator->status == OP_EXEC_DONE) {
pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
break;
}
......@@ -774,7 +774,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
// do prepare for the next round table scan operation
pAPI->storeReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pAPI->tsdReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
}
}
......@@ -782,7 +782,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < total) {
if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) {
prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0);
pAPI->storeReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pAPI->tsdReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
}
......@@ -800,7 +800,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
pTableScanInfo->base.scanFlag = MAIN_SCAN;
qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
pAPI->storeReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pAPI->tsdReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
}
}
}
......@@ -839,11 +839,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
taosRUnLockLatch(&pTaskInfo->lock);
pAPI->storeReader.storeReaderSetTableList(pInfo->base.dataReader, &tInfo, 1);
pAPI->tsdReader.storeReaderSetTableList(pInfo->base.dataReader, &tInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
pAPI->storeReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pAPI->tsdReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
}
} else { // scan table group by group sequentially
......@@ -858,7 +858,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = pAPI->storeReader.storeReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
int32_t code = pAPI->tsdReader.storeReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
......@@ -887,8 +887,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableKeyInfo* pList = NULL;
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
pAPI->storeReader.storeReaderSetTableList(pInfo->base.dataReader, pList, num);
pAPI->storeReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pAPI->tsdReader.storeReaderSetTableList(pInfo->base.dataReader, pList, num);
pAPI->tsdReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
result = doGroupedTableScan(pOperator);
......@@ -910,7 +910,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
return 0;
}
static void destroyTableScanBase(STableScanBase* pBase, SStoreDataReaderFn* pAPI) {
static void destroyTableScanBase(STableScanBase* pBase, SStoreTSDReader* pAPI) {
cleanupQueryTableDataCond(&pBase->cond);
pAPI->storeReaderClose(pBase->dataReader);
......@@ -1094,7 +1094,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL;
int32_t code = pAPI->storeReader.storeReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
int32_t code = pAPI->tsdReader.storeReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(void**)&pReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
......@@ -1103,7 +1103,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
bool hasNext = false;
code = pAPI->storeReader.storeReaderNextDataBlock(pReader, &hasNext);
code = pAPI->tsdReader.storeReaderNextDataBlock(pReader, &hasNext);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
......@@ -1111,12 +1111,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
if (hasNext) {
/*SSDataBlock* p = */ pAPI->storeReader.storeReaderRetrieveDataBlock(pReader, NULL);
/*SSDataBlock* p = */ pAPI->tsdReader.storeReaderRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
pAPI->storeReader.storeReaderClose(pReader);
pAPI->tsdReader.storeReaderClose(pReader);
qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
", suid:%" PRIu64,
pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
......@@ -1657,7 +1657,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return pResult;
}
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pAPI->storeReader.storeReaderClose(pTSInfo->base.dataReader);
pAPI->tsdReader.storeReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
......@@ -1821,7 +1821,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
}
pAPI->storeReader.storeReaderClose(pTSInfo->base.dataReader);
pAPI->tsdReader.storeReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
pInfo->pTableScanOp->status = OP_OPENED;
......@@ -1901,7 +1901,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pAPI->storeReader.storeReaderClose(pTSInfo->base.dataReader);
pAPI->tsdReader.storeReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
......@@ -2162,20 +2162,20 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
bool hasNext = false;
if (pInfo->dataReader) {
code = pAPI->storeReader.storeReaderNextDataBlock(pInfo->dataReader, &hasNext);
code = pAPI->tsdReader.storeReaderNextDataBlock(pInfo->dataReader, &hasNext);
if (code) {
pAPI->storeReader.storeReaderReleaseDataBlock(pInfo->dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pInfo->dataReader);
T_LONG_JMP(pTaskInfo->env, code);
}
}
if (pInfo->dataReader && hasNext) {
if (isTaskKilled(pTaskInfo)) {
pAPI->storeReader.storeReaderReleaseDataBlock(pInfo->dataReader);
pAPI->tsdReader.storeReaderReleaseDataBlock(pInfo->dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
SSDataBlock* pBlock = pAPI->storeReader.storeReaderRetrieveDataBlock(pInfo->dataReader, NULL);
SSDataBlock* pBlock = pAPI->tsdReader.storeReaderRetrieveDataBlock(pInfo->dataReader, NULL);
if (pBlock == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
......@@ -2185,7 +2185,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
return pBlock;
}
SMetaTableInfo mtInfo = pAPI->snapshotFn.storeSSGetTableInfo(pInfo->sContext);
SMetaTableInfo mtInfo = pAPI->snapshotFn.getTableInfoFromSnapshot(pInfo->sContext);
STqOffsetVal offset = {0};
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal");
......@@ -2203,7 +2203,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
int32_t dataLen = 0;
int16_t type = 0;
int64_t uid = 0;
if (pAPI->snapshotFn.storeSSGetMetaInfo(sContext, &data, &dataLen, &type, &uid) < 0) {
if (pAPI->snapshotFn.getMetaInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) {
qError("tmqsnap getMetafromSnapShot error");
taosMemoryFreeClear(data);
return NULL;
......@@ -2227,8 +2227,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
static void destroyRawScanOperatorInfo(void* param) {
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
pRawScan->pAPI->storeReader.storeReaderClose(pRawScan->dataReader);
pRawScan->pAPI->snapshotFn.storeDestroySnapshot(pRawScan->sContext);
pRawScan->pAPI->tsdReader.storeReaderClose(pRawScan->dataReader);
pRawScan->pAPI->snapshotFn.destroySnapshot(pRawScan->sContext);
tableListDestroy(pRawScan->pTableListInfo);
taosMemoryFree(pRawScan);
}
......@@ -2662,7 +2662,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) {
code = pAPI->storeReader.storeReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
code = pAPI->tsdReader.storeReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2674,9 +2674,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (true) {
code = pAPI->storeReader.storeReaderNextDataBlock(reader, &hasNext);
code = pAPI->tsdReader.storeReaderNextDataBlock(reader, &hasNext);
if (code != 0) {
pAPI->storeReader.storeReaderReleaseDataBlock(reader);
pAPI->tsdReader.storeReaderReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2686,7 +2686,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
if (isTaskKilled(pTaskInfo)) {
pAPI->storeReader.storeReaderReleaseDataBlock(reader);
pAPI->tsdReader.storeReaderReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
......@@ -2726,7 +2726,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
qTrace("tsdb/read-table-data: %p, close reader", reader);
if (!source->multiReader) {
pAPI->storeReader.storeReaderClose(pInfo->base.dataReader);
pAPI->tsdReader.storeReaderClose(pInfo->base.dataReader);
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL;
......@@ -2734,7 +2734,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
if (!source->multiReader) {
pAPI->storeReader.storeReaderClose(pInfo->base.dataReader);
pAPI->tsdReader.storeReaderClose(pInfo->base.dataReader);
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL;
......@@ -2853,7 +2853,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
pAPI->storeReader.storeReaderClose(param->dataReader);
pAPI->tsdReader.storeReaderClose(param->dataReader);
param->dataReader = NULL;
}
taosArrayClear(pInfo->sortSourceParams);
......
......@@ -2198,7 +2198,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
pAPI->storeReader.storeReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
pAPI->tsdReader.storeReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
blockDistInfo.numOfInmemRows = (int32_t) pAPI->metaFn.getNumOfRowsInMem(pBlockScanInfo->pHandle);
SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
......@@ -2229,7 +2229,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
pDistInfo->readHandle.api.storeReader.storeReaderClose(pDistInfo->pHandle);
pDistInfo->readHandle.api.tsdReader.storeReaderClose(pDistInfo->pHandle);
tableListDestroy(pDistInfo->pTableListInfo);
taosMemoryFreeClear(param);
}
......@@ -2284,7 +2284,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = readHandle->api.storeReader.storeReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL);
code = readHandle->api.tsdReader.storeReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册