提交 f1ff5dce 编写于 作者: H Haojun Liao

fix(query): use hashmap to keep the multiple schema.

上级 0ae2034e
......@@ -48,6 +48,13 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn);
*/
int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
/**
* set the free function pointer
* @param pHashObj
* @param freeFp
*/
void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp);
int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
/**
......
......@@ -15,6 +15,7 @@
#include "osDef.h"
#include "tsdb.h"
#include "tsimplehash.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
......@@ -177,7 +178,8 @@ struct STsdbReader {
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
STSchema* pSchema; // the newest version schema
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
// STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
......@@ -1858,28 +1860,23 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return pReader->pSchema;
}
if (pReader->pMemSchema == NULL) {
int32_t code =
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
} else {
return pReader->pMemSchema;
}
void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion));
if (p != NULL) {
return *(STSchema**) p;
}
if (pReader->pMemSchema->version == sversion) {
return pReader->pMemSchema;
}
taosMemoryFreeClear(pReader->pMemSchema);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
STSchema* ptr = NULL;
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
} else {
return pReader->pMemSchema;
code = tSimpleHashPut(pReader->pSchemaMap, &sversion, sizeof(sversion), &ptr, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
return ptr;
}
}
......@@ -4002,6 +3999,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
return code;
}
static void freeSchemaFunc(void* param) {
void* p = *(void**) param;
taosMemoryFree(p);
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
......@@ -4078,6 +4080,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
}
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
if (pReader->pSchemaMap == NULL) {
tsdbError("failed init schema hash for reader", pReader->idStr);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
if (pReader->pSchema != NULL) {
code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4120,7 +4130,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pMemSchema = NULL;
p->pSchemaMap = NULL;
p = pReader->innerReader[1];
......@@ -4128,7 +4138,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pMemSchema = NULL;
p->pSchemaMap = NULL;
tsdbReaderClose(pReader->innerReader[0]);
tsdbReaderClose(pReader->innerReader[1]);
......@@ -4208,10 +4218,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
if (pReader->pMemSchema != pReader->pSchema) {
taosMemoryFree(pReader->pMemSchema);
}
tSimpleHashCleanup(pReader->pSchemaMap);
taosMemoryFreeClear(pReader);
}
......@@ -4371,14 +4378,14 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->status.uidList = pReader->status.uidList;
pPrevReader->pSchema = pReader->pSchema;
pPrevReader->pMemSchema = pReader->pMemSchema;
pPrevReader->pSchemaMap = pReader->pSchemaMap;
pPrevReader->pReadSnap = pReader->pReadSnap;
pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->status.uidList = pReader->status.uidList;
pNextReader->pSchema = pReader->pSchema;
pNextReader->pMemSchema = pReader->pMemSchema;
pNextReader->pSchemaMap = pReader->pSchemaMap;
pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader);
......
......@@ -28,19 +28,23 @@
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define FREE_HASH_NODE(_n) \
do { \
taosMemoryFreeClear(_n); \
#define FREE_HASH_NODE(_n, fp) \
do { \
if (fp) { \
fp((_n)->data); \
} \
taosMemoryFreeClear(_n); \
} while (0);
struct SSHashObj {
SHNode **hashList;
size_t capacity; // number of slots
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
SArray* pHashNodeBuf;// hash node allocation buffer, 1k size of each page by default
int32_t offset; // allocation offset in current page
SHNode **hashList;
size_t capacity; // number of slots
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
_hash_free_fn_t freeFp; // free function
SArray *pHashNodeBuf; // hash node allocation buffer, 1k size of each page by default
int32_t offset; // allocation offset in current page
};
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
......@@ -71,7 +75,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj->capacity = taosHashCapacity((int32_t)capacity);
pHashObj->equalFp = memcmp;
pHashObj->pHashNodeBuf = taosArrayInit(10, sizeof(void*));
pHashObj->offset = 0;
pHashObj->size = 0;
......@@ -92,6 +95,10 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
return (int32_t) pHashObj->size;
}
void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp) {
pHashObj->freeFp = freeFp;
}
static void* doInternalAlloc(SSHashObj* pHashObj, int32_t size) {
#if 0
void** p = taosArrayGetLast(pHashObj->pHashNodeBuf);
......@@ -306,7 +313,8 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
} else {
pPrev->next = pNode->next;
}
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode, pHashObj->freeFp);
pHashObj->size -= 1;
code = TSDB_CODE_SUCCESS;
break;
......@@ -341,7 +349,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
*pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL;
}
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode, pHashObj->freeFp);
pHashObj->size -= 1;
break;
}
......@@ -370,14 +378,13 @@ void tSimpleHashClear(SSHashObj *pHashObj) {
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode, pHashObj->freeFp);
pNode = pNext;
}
pHashObj->hashList[i] = NULL;
}
taosArrayClearEx(pHashObj->pHashNodeBuf, destroyItems);
pHashObj->offset = 0;
pHashObj->size = 0;
}
......@@ -388,7 +395,6 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
}
tSimpleHashClear(pHashObj);
taosArrayDestroy(pHashObj->pHashNodeBuf);
taosMemoryFreeClear(pHashObj->hashList);
taosMemoryFree(pHashObj);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册