提交 3a427340 编写于 作者: H Hongze Cheng

more code

上级 4a281e32
...@@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo; ...@@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol; typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange; typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter; typedef struct SLDataIter SLDataIter;
typedef struct SQueryNode SQueryNode;
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
...@@ -206,8 +207,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in ...@@ -206,8 +207,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader); int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
...@@ -365,6 +366,12 @@ struct STbData { ...@@ -365,6 +366,12 @@ struct STbData {
STbData *next; STbData *next;
}; };
struct SQueryNode {
SQueryNode *pNext;
SQueryNode **ppNext;
void *pQueryHandle;
};
struct SMemTable { struct SMemTable {
SRWLatch latch; SRWLatch latch;
STsdb *pTsdb; STsdb *pTsdb;
...@@ -381,7 +388,7 @@ struct SMemTable { ...@@ -381,7 +388,7 @@ struct SMemTable {
int32_t nBucket; int32_t nBucket;
STbData **aBucket; STbData **aBucket;
}; };
STsdbReader *pReaderList; SQueryNode *qList;
}; };
struct TSDBROW { struct TSDBROW {
...@@ -593,7 +600,9 @@ struct SDelFWriter { ...@@ -593,7 +600,9 @@ struct SDelFWriter {
struct STsdbReadSnap { struct STsdbReadSnap {
SMemTable *pMem; SMemTable *pMem;
SQueryNode *pNode;
SMemTable *pIMem; SMemTable *pIMem;
SQueryNode *pINode;
STsdbFS fs; STsdbFS fs;
}; };
......
...@@ -629,24 +629,35 @@ _err: ...@@ -629,24 +629,35 @@ _err:
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode) {
int32_t code = 0; int32_t code = 0;
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
// register handle (todo) // register handle
if (pReader) { *ppNode = taosMemoryMalloc(sizeof(SQueryNode));
if (*ppNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
} }
(*ppNode)->pQueryHandle = pQueryHandle;
(*ppNode)->pNext = pMemTable->qList;
(*ppNode)->ppNext = &pMemTable->qList;
pMemTable->qList = *ppNode;
_exit:
return code; return code;
} }
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) {
int32_t code = 0; int32_t code = 0;
// unregister handle (todo) // unregister handle
if (pReader) { if (pNode) {
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
taosMemoryFree(pNode);
} }
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1);
......
...@@ -3879,12 +3879,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { ...@@ -3879,12 +3879,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) {
// take snapshot // take snapshot
if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) {
tsdbRefMemTable(pTsdb->mem, pReader); tsdbRefMemTable(pTsdb->mem, pReader, &(*ppSnap)->pNode);
(*ppSnap)->pMem = pTsdb->mem; (*ppSnap)->pMem = pTsdb->mem;
} }
if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
tsdbRefMemTable(pTsdb->imem, pReader); tsdbRefMemTable(pTsdb->imem, pReader, &(*ppSnap)->pINode);
(*ppSnap)->pIMem = pTsdb->imem; (*ppSnap)->pIMem = pTsdb->imem;
} }
...@@ -3912,11 +3912,11 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { ...@@ -3912,11 +3912,11 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) {
if (pSnap) { if (pSnap) {
if (pSnap->pMem) { if (pSnap->pMem) {
tsdbUnrefMemTable(pSnap->pMem, pReader); tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode);
} }
if (pSnap->pIMem) { if (pSnap->pIMem) {
tsdbUnrefMemTable(pSnap->pIMem, pReader); tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode);
} }
tsdbFSUnref(pTsdb, &pSnap->fs); tsdbFSUnref(pTsdb, &pSnap->fs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册