提交 2e443c39 编写于 作者: H Hongze Cheng

more code

上级 5656b85a
...@@ -65,7 +65,6 @@ typedef struct SSmaInfo SSmaInfo; ...@@ -65,7 +65,6 @@ typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol; typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange; typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter; typedef struct SLDataIter SLDataIter;
typedef struct SQueryNode SQueryNode;
typedef struct SDiskCol SDiskCol; typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData; typedef struct SDiskData SDiskData;
typedef struct SDiskDataBuilder SDiskDataBuilder; typedef struct SDiskDataBuilder SDiskDataBuilder;
...@@ -209,12 +208,10 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in ...@@ -209,12 +208,10 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
uint8_t **ppBuf); uint8_t **ppBuf);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle);
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode); int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
...@@ -293,7 +290,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); ...@@ -293,7 +290,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ============================================================================================== // tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap); int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb); int32_t tsdbMerge(STsdb *pTsdb);
...@@ -369,13 +366,6 @@ struct STbData { ...@@ -369,13 +366,6 @@ struct STbData {
STbData *next; STbData *next;
}; };
struct SQueryNode {
SQueryNode *pNext;
SQueryNode **ppNext;
void *pQHandle;
_tsdb_reseek_func_t reseek;
};
struct SMemTable { struct SMemTable {
SRWLatch latch; SRWLatch latch;
STsdb *pTsdb; STsdb *pTsdb;
...@@ -392,7 +382,6 @@ struct SMemTable { ...@@ -392,7 +382,6 @@ struct SMemTable {
int32_t nBucket; int32_t nBucket;
STbData **aBucket; STbData **aBucket;
}; };
SQueryNode qList;
}; };
struct TSDBROW { struct TSDBROW {
......
...@@ -65,6 +65,11 @@ struct SVBufPool { ...@@ -65,6 +65,11 @@ struct SVBufPool {
SVBufPool* recycleNext; SVBufPool* recycleNext;
SVBufPool* recyclePrev; SVBufPool* recyclePrev;
// query handle list
TdThreadMutex mutex;
int32_t nQuery;
SQueryNode qList;
SVnode* pVnode; SVnode* pVnode;
int32_t id; int32_t id;
volatile int32_t nRef; volatile int32_t nRef;
......
...@@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; ...@@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SRSmaSnapWriter SRSmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SSnapDataHdr SSnapDataHdr;
typedef struct SCommitInfo SCommitInfo; typedef struct SCommitInfo SCommitInfo;
typedef struct SQueryNode SQueryNode;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb" #define VNODE_TSDB_DIR "tsdb"
...@@ -92,6 +93,13 @@ typedef struct SCommitInfo SCommitInfo; ...@@ -92,6 +93,13 @@ typedef struct SCommitInfo SCommitInfo;
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
// vnd.h // vnd.h
typedef int32_t (*_query_reseek_func_t)(void* pQHandle);
struct SQueryNode {
SQueryNode* pNext;
SQueryNode** ppNext;
void* pQHandle;
_query_reseek_func_t reseek;
};
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size); void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size);
...@@ -100,6 +108,9 @@ void vnodeBufPoolRef(SVBufPool* pPool); ...@@ -100,6 +108,9 @@ void vnodeBufPoolRef(SVBufPool* pPool);
void vnodeBufPoolUnRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool);
int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, void* pQHandle, _query_reseek_func_t reseekFn);
int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool);
// meta // meta
typedef struct SMCtbCursor SMCtbCursor; typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMStbCursor SMStbCursor; typedef struct SMStbCursor SMStbCursor;
......
...@@ -64,8 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { ...@@ -64,8 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosMemoryFree(pMemTable); taosMemoryFree(pMemTable);
goto _err; goto _err;
} }
pMemTable->qList.pNext = &pMemTable->qList; // pMemTable->qList.pNext = &pMemTable->qList;
pMemTable->qList.ppNext = &pMemTable->qList.pNext; // pMemTable->qList.ppNext = &pMemTable->qList.pNext;
vnodeBufPoolRef(pMemTable->pPool); vnodeBufPoolRef(pMemTable->pPool);
*ppMemTable = pMemTable; *ppMemTable = pMemTable;
...@@ -749,39 +749,34 @@ _exit: ...@@ -749,39 +749,34 @@ _exit:
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) { int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode) {
int32_t code = 0; int32_t code = 0;
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
/*
// register handle (todo: take concurrency in consideration) vnodeBufPoolRegisterQuery(pMemTable->pPool, pQHandle, reseek);
*ppNode = taosMemoryMalloc(sizeof(SQueryNode));
if (*ppNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(*ppNode)->pQHandle = pQHandle;
(*ppNode)->reseek = reseek;
(*ppNode)->pNext = pMemTable->qList.pNext;
(*ppNode)->ppNext = &pMemTable->qList.pNext;
pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext;
pMemTable->qList.pNext = *ppNode;
*/
_exit: _exit:
return code; return code;
} }
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) {
int32_t code = 0; int32_t code = 0;
/*
if (pNode) {
vnodeBufPoolDeregisterQuery(pMemTable->pPool);
}
#if 0
// unregister handle (todo: take concurrency in consideration) // unregister handle (todo: take concurrency in consideration)
if (pNode) { if (pNode) {
pNode->pNext->ppNext = pNode->ppNext; pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext; *pNode->ppNext = pNode->pNext;
taosMemoryFree(pNode); taosMemoryFree(pNode);
} }
*/ #endif
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbMemTableDestroy(pMemTable); tsdbMemTableDestroy(pMemTable);
...@@ -828,28 +823,28 @@ _exit: ...@@ -828,28 +823,28 @@ _exit:
return aTbDataP; return aTbDataP;
} }
int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { // int32_t tsdbRecycleMemTable(SMemTable *pMemTable) {
int32_t code = 0; // int32_t code = 0;
SQueryNode *pNode = pMemTable->qList.pNext; // SQueryNode *pNode = pMemTable->qList.pNext;
while (1) { // while (1) {
ASSERT(pNode != &pMemTable->qList); // ASSERT(pNode != &pMemTable->qList);
SQueryNode *pNextNode = pNode->pNext; // SQueryNode *pNextNode = pNode->pNext;
if (pNextNode == &pMemTable->qList) { // if (pNextNode == &pMemTable->qList) {
code = (*pNode->reseek)(pNode->pQHandle); // code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit; // if (code) goto _exit;
break; // break;
} else { // } else {
code = (*pNode->reseek)(pNode->pQHandle); // code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit; // if (code) goto _exit;
pNode = pMemTable->qList.pNext; // pNode = pMemTable->qList.pNext;
ASSERT(pNode == pNextNode); // ASSERT(pNode == pNextNode);
} // }
} // }
// NOTE: Take care here, pMemTable is destroyed // // NOTE: Take care here, pMemTable is destroyed
_exit: // _exit:
return code; // return code;
} // }
...@@ -4614,7 +4614,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -4614,7 +4614,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb; STsdb* pTsdb = pReader->pTsdb;
SVersionRange* pRange = &pReader->verRange; SVersionRange* pRange = &pReader->verRange;
......
...@@ -25,6 +25,13 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPoo ...@@ -25,6 +25,13 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPoo
return -1; return -1;
} }
memset(pPool, 0, sizeof(SVBufPool)); memset(pPool, 0, sizeof(SVBufPool));
// query handle list
taosThreadMutexInit(&pPool->mutex, NULL);
pPool->nQuery = 0;
pPool->qList.pNext = &pPool->qList;
pPool->qList.ppNext = &pPool->qList.pNext;
pPool->pVnode = pVnode; pPool->pVnode = pVnode;
pPool->id = id; pPool->id = id;
pPool->ptr = pPool->node.data; pPool->ptr = pPool->node.data;
...@@ -60,6 +67,7 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { ...@@ -60,6 +67,7 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
taosThreadSpinDestroy(pPool->lock); taosThreadSpinDestroy(pPool->lock);
taosMemoryFree((void *)pPool->lock); taosMemoryFree((void *)pPool->lock);
} }
taosThreadMutexDestroy(&pPool->mutex);
taosMemoryFree(pPool); taosMemoryFree(pPool);
return 0; return 0;
} }
...@@ -97,6 +105,7 @@ int vnodeCloseBufPool(SVnode *pVnode) { ...@@ -97,6 +105,7 @@ int vnodeCloseBufPool(SVnode *pVnode) {
} }
void vnodeBufPoolReset(SVBufPool *pPool) { void vnodeBufPoolReset(SVBufPool *pPool) {
ASSERT(pPool->nQuery == 0);
for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) { for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) {
ASSERT(pNode->pnext == &pPool->pTail); ASSERT(pNode->pnext == &pPool->pTail);
pNode->prev->pnext = &pPool->pTail; pNode->prev->pnext = &pPool->pTail;
...@@ -255,3 +264,41 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { ...@@ -255,3 +264,41 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
} }
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, void *pQHandle, _query_reseek_func_t reseekFn) {
int32_t code = 0;
SQueryNode *pQNode = taosMemoryMalloc(sizeof(*pQNode));
if (pQNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pQNode->pQHandle = pQHandle;
pQNode->reseek = reseekFn;
taosThreadMutexLock(&pPool->mutex);
pQNode->pNext = pPool->qList.pNext;
pQNode->ppNext = &pPool->qList.pNext;
pPool->qList.pNext->ppNext = &pQNode->pNext;
pPool->qList.pNext = pQNode;
pPool->nQuery++;
taosThreadMutexUnlock(&pPool->mutex);
_exit:
return code;
}
int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool) {
int32_t code = 0;
taosThreadMutexLock(&pPool->mutex);
ASSERT(0);
taosThreadMutexUnlock(&pPool->mutex);
_exit:
return code;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册