提交 eac38475 编写于 作者: M Minglei Jin

fix: new reseek callback to separate tsdb & cache readers

上级 545607cc
...@@ -207,10 +207,12 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in ...@@ -207,10 +207,12 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
uint8_t **ppBuf); uint8_t **ppBuf);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle);
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode); int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
...@@ -290,7 +292,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); ...@@ -290,7 +292,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ============================================================================================== // tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdbReader *pReader, STsdbReadSnap **ppSnap); int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb); int32_t tsdbMerge(STsdb *pTsdb);
...@@ -362,9 +364,10 @@ struct STbData { ...@@ -362,9 +364,10 @@ struct STbData {
}; };
struct SQueryNode { struct SQueryNode {
SQueryNode *pNext; SQueryNode *pNext;
SQueryNode **ppNext; SQueryNode **ppNext;
void *pQHandle; void *pQHandle;
_tsdb_reseek_func_t reseek;
}; };
struct SMemTable { struct SMemTable {
......
...@@ -194,6 +194,12 @@ static void freeItem(void* pItem) { ...@@ -194,6 +194,12 @@ static void freeItem(void* pItem) {
} }
} }
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
int32_t code = 0;
return code;
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) { if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -237,7 +243,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -237,7 +243,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayPush(pLastCols, &p); taosArrayPush(pLastCols, &p);
} }
tsdbTakeReadSnap(NULL, &pr->pReadSnap); tsdbTakeReadSnap(NULL, tsdbCacheQueryReseek, &pr->pReadSnap);
pr->pDataFReader = NULL; pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL; pr->pDataFReaderLast = NULL;
......
...@@ -636,7 +636,7 @@ _err: ...@@ -636,7 +636,7 @@ _err:
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode) { int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) {
int32_t code = 0; int32_t code = 0;
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
...@@ -649,6 +649,7 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNod ...@@ -649,6 +649,7 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNod
goto _exit; goto _exit;
} }
(*ppNode)->pQHandle = pQHandle; (*ppNode)->pQHandle = pQHandle;
(*ppNode)->reseek = reseek;
(*ppNode)->pNext = pMemTable->qList.pNext; (*ppNode)->pNext = pMemTable->qList.pNext;
(*ppNode)->ppNext = &pMemTable->qList.pNext; (*ppNode)->ppNext = &pMemTable->qList.pNext;
pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext;
...@@ -714,8 +715,6 @@ _exit: ...@@ -714,8 +715,6 @@ _exit:
return aTbDataP; return aTbDataP;
} }
extern int32_t tsdbSetQueryReseek(void *pQHandle);
int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { int32_t tsdbRecycleMemTable(SMemTable *pMemTable) {
int32_t code = 0; int32_t code = 0;
...@@ -725,11 +724,11 @@ int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { ...@@ -725,11 +724,11 @@ int32_t tsdbRecycleMemTable(SMemTable *pMemTable) {
SQueryNode *pNextNode = pNode->pNext; SQueryNode *pNextNode = pNode->pNext;
if (pNextNode == &pMemTable->qList) { if (pNextNode == &pMemTable->qList) {
code = tsdbSetQueryReseek(pNode->pQHandle); code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit; if (code) goto _exit;
break; break;
} else { } else {
code = tsdbSetQueryReseek(pNode->pQHandle); code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit; if (code) goto _exit;
pNode = pMemTable->qList.pNext; pNode = pMemTable->qList.pNext;
ASSERT(pNode == pNextNode); ASSERT(pNode == pNextNode);
......
...@@ -3489,6 +3489,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { ...@@ -3489,6 +3489,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
} }
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
static int32_t tsdbSetQueryReseek(void* pQHandle);
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
const char* idstr) { const char* idstr) {
STimeWindow window = pCond->twindows; STimeWindow window = pCond->twindows;
...@@ -3570,7 +3572,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3570,7 +3572,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
if (numOfTables > 0) { if (numOfTables > 0) {
code = tsdbTakeReadSnap(pReader, &pReader->pReadSnap); code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
...@@ -4106,7 +4108,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -4106,7 +4108,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb; STsdb* pTsdb = pReader->pTsdb;
SVersionRange* pRange = &pReader->verRange; SVersionRange* pRange = &pReader->verRange;
...@@ -4127,12 +4129,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { ...@@ -4127,12 +4129,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) {
// take snapshot // take snapshot
if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) {
tsdbRefMemTable(pTsdb->mem, pReader, &(*ppSnap)->pNode); tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode);
(*ppSnap)->pMem = pTsdb->mem; (*ppSnap)->pMem = pTsdb->mem;
} }
if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
tsdbRefMemTable(pTsdb->imem, pReader, &(*ppSnap)->pINode); tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode);
(*ppSnap)->pIMem = pTsdb->imem; (*ppSnap)->pIMem = pTsdb->imem;
} }
...@@ -4173,7 +4175,7 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { ...@@ -4173,7 +4175,7 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) {
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
} }
int32_t tsdbSetQueryReseek(void* pQHandle) { static int32_t tsdbSetQueryReseek(void* pQHandle) {
int32_t code = 0; int32_t code = 0;
// lock handle // lock handle
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册