提交 d2141ea5 编写于 作者: H Hongze Cheng

more code

上级 87b59d21
...@@ -206,8 +206,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in ...@@ -206,8 +206,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
void tsdbRefMemTable(SMemTable *pMemTable); int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader);
void tsdbUnrefMemTable(SMemTable *pMemTable); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
...@@ -285,8 +285,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); ...@@ -285,8 +285,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ============================================================================================== // tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap); int32_t tsdbTakeReadSnap(STsdbReader *pReader, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb); int32_t tsdbMerge(STsdb *pTsdb);
......
...@@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
tb_uid_t suid = getTableSuidByUid(uid, pTsdb); tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap); tsdbTakeReadSnap(NULL /*pTsdb (todo)*/, &pIter->pReadSnap);
STbData *pMem = NULL; STbData *pMem = NULL;
if (pIter->pReadSnap->pMem) { if (pIter->pReadSnap->pMem) {
...@@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { ...@@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
taosArrayDestroy(pIter->pSkyline); taosArrayDestroy(pIter->pSkyline);
} }
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap); tsdbUntakeReadSnap(NULL /*pIter->pTsdb (todo)*/, pIter->pReadSnap);
_err: _err:
return code; return code;
......
...@@ -158,7 +158,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { ...@@ -158,7 +158,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
pTsdb->mem = NULL; pTsdb->mem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable); tsdbUnrefMemTable(pMemTable, NULL);
goto _exit; goto _exit;
} }
...@@ -983,7 +983,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { ...@@ -983,7 +983,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
// unlock // unlock
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable); tsdbUnrefMemTable(pMemTable, NULL);
tsdbFSDestroy(&pCommitter->fs); tsdbFSDestroy(&pCommitter->fs);
taosArrayDestroy(pCommitter->aTbDataP); taosArrayDestroy(pCommitter->aTbDataP);
......
...@@ -629,16 +629,32 @@ _err: ...@@ -629,16 +629,32 @@ _err:
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
void tsdbRefMemTable(SMemTable *pMemTable) { int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader) {
int32_t code = 0;
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
// register handle (todo)
if (pReader) {
}
return code;
} }
void tsdbUnrefMemTable(SMemTable *pMemTable) { int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader) {
int32_t code = 0;
// unregister handle (todo)
if (pReader) {
}
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbMemTableDestroy(pMemTable); tsdbMemTableDestroy(pMemTable);
} }
return code;
} }
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
......
...@@ -3354,7 +3354,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3354,7 +3354,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto _err; goto _err;
} }
code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap); code = tsdbTakeReadSnap(pReader, &pReader->pReadSnap);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
...@@ -3378,7 +3378,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3378,7 +3378,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
STsdbReader* pPrevReader = pReader->innerReader[0]; STsdbReader* pPrevReader = pReader->innerReader[0];
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap); code = tsdbTakeReadSnap(pPrevReader, &pPrevReader->pReadSnap);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
...@@ -3435,7 +3435,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3435,7 +3435,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbDataFReaderClose(&pReader->pFileReader); tsdbDataFReaderClose(&pReader->pFileReader);
} }
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); tsdbUntakeReadSnap(pReader, pReader->pReadSnap);
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
SIOCostSummary* pCost = &pReader->cost; SIOCostSummary* pCost = &pReader->cost;
...@@ -3858,8 +3858,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -3858,8 +3858,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
SVersionRange* pRange = &pReader->verRange;
// alloc // alloc
*ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
...@@ -3876,15 +3878,14 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { ...@@ -3876,15 +3878,14 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
} }
// take snapshot // take snapshot
(*ppSnap)->pMem = pTsdb->mem; if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) {
(*ppSnap)->pIMem = pTsdb->imem; tsdbRefMemTable(pTsdb->mem, pReader);
(*ppSnap)->pMem = pTsdb->mem;
if ((*ppSnap)->pMem) {
tsdbRefMemTable((*ppSnap)->pMem);
} }
if ((*ppSnap)->pIMem) { if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
tsdbRefMemTable((*ppSnap)->pIMem); tsdbRefMemTable(pTsdb->imem, pReader);
(*ppSnap)->pIMem = pTsdb->imem;
} }
// fs // fs
...@@ -3906,14 +3907,16 @@ _exit: ...@@ -3906,14 +3907,16 @@ _exit:
return code; return code;
} }
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) { void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) {
STsdb* pTsdb = pReader->pTsdb;
if (pSnap) { if (pSnap) {
if (pSnap->pMem) { if (pSnap->pMem) {
tsdbUnrefMemTable(pSnap->pMem); tsdbUnrefMemTable(pSnap->pMem, pReader);
} }
if (pSnap->pIMem) { if (pSnap->pIMem) {
tsdbUnrefMemTable(pSnap->pIMem); tsdbUnrefMemTable(pSnap->pIMem, pReader);
} }
tsdbFSUnref(pTsdb, &pSnap->fs); tsdbFSUnref(pTsdb, &pSnap->fs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册