提交 78c3ec87 编写于 作者: H Haojun Liao

[TD-225] refactor: 1. reduce hash node size. 2. function remained.

上级 5708590b
...@@ -38,12 +38,6 @@ typedef struct SLocalDataSource { ...@@ -38,12 +38,6 @@ typedef struct SLocalDataSource {
tFilePage filePage; tFilePage filePage;
} SLocalDataSource; } SLocalDataSource;
enum {
TSC_LOCALREDUCE_READY = 0x0,
TSC_LOCALREDUCE_IN_PROGRESS = 0x1,
TSC_LOCALREDUCE_TOBE_FREED = 0x2,
};
typedef struct SLocalReducer { typedef struct SLocalReducer {
SLocalDataSource ** pLocalDataSrc; SLocalDataSource ** pLocalDataSrc;
int32_t numOfBuffer; int32_t numOfBuffer;
......
...@@ -308,6 +308,7 @@ typedef struct STscObj { ...@@ -308,6 +308,7 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet; SRpcCorEpSet *tscCorMgmtEpSet;
void* pDnodeConn; void* pDnodeConn;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
...@@ -478,14 +479,14 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField ...@@ -478,14 +479,14 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
} }
} }
extern SCacheObj* tscMetaCache; extern SCacheObj *tscMetaCache;
extern int tscObjRef;
extern void * tscTmr; extern int tscObjRef;
extern void * tscQhandle; extern void *tscTmr;
extern int tscKeepConn[]; extern void *tscQhandle;
extern int tscNumOfThreads; extern int tscKeepConn[];
extern int tscRefId; extern int tscRefId;
extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
......
...@@ -31,17 +31,16 @@ ...@@ -31,17 +31,16 @@
#include "tlocale.h" #include "tlocale.h"
// global, not configurable // global, not configurable
SCacheObj* tscMetaCache; SCacheObj *tscMetaCache; // table meta cache
SHashObj *tscHashMap; // hash map to keep the global vgroup info
int tscObjRef = -1; int tscObjRef = -1;
void * tscTmr; void *tscTmr;
void * tscQhandle; void *tscQhandle;
void * tscCheckDiskUsageTmr; void *tscCheckDiskUsageTmr;
int tscRefId = -1; int tscRefId = -1;
int tscNumOfObj = 0; // number of sqlObj in current process.
int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();
...@@ -114,7 +113,7 @@ void taos_init_imp(void) { ...@@ -114,7 +113,7 @@ void taos_init_imp(void) {
int queueSize = tsMaxConnections*2; int queueSize = tsMaxConnections*2;
double factor = (tscEmbedded == 0)? 2.0:4.0; double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); int32_t tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) { if (tscNumOfThreads < 2) {
tscNumOfThreads = 2; tscNumOfThreads = 2;
} }
......
...@@ -458,9 +458,14 @@ void tscFreeRegisteredSqlObj(void *pSql) { ...@@ -458,9 +458,14 @@ void tscFreeRegisteredSqlObj(void *pSql) {
SSqlObj* p = *(SSqlObj**)pSql; SSqlObj* p = *(SSqlObj**)pSql;
STscObj* pTscObj = p->pTscObj; STscObj* pTscObj = p->pTscObj;
assert(p->self != 0); assert(RID_VALID(p->self));
tscFreeSqlObj(p); tscFreeSqlObj(p);
taosReleaseRef(tscRefId, pTscObj->rid); taosReleaseRef(tscRefId, pTscObj->rid);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1);
int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1);
tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total);
} }
void tscFreeTableMetaHelper(void *pTableMeta) { void tscFreeTableMetaHelper(void *pTableMeta) {
...@@ -1905,6 +1910,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -1905,6 +1910,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
void registerSqlObj(SSqlObj* pSql) { void registerSqlObj(SSqlObj* pSql) {
taosAcquireRef(tscRefId, pSql->pTscObj->rid); taosAcquireRef(tscRefId, pSql->pTscObj->rid);
pSql->self = taosAddRef(tscObjRef, pSql); pSql->self = taosAddRef(tscObjRef, pSql);
int32_t num = atomic_add_fetch_32(&pSql->pTscObj->numOfObj, 1);
int32_t total = atomic_add_fetch_32(&tscNumOfObj, 1);
tscDebug("%p new SqlObj, total in tscObj:%d, total:%d", pSql, num, total);
} }
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
......
...@@ -32,11 +32,11 @@ typedef void (*_hash_free_fn_t)(void *param); ...@@ -32,11 +32,11 @@ typedef void (*_hash_free_fn_t)(void *param);
typedef struct SHashNode { typedef struct SHashNode {
struct SHashNode *next; struct SHashNode *next;
uint32_t hashVal; // the hash value of key uint32_t hashVal; // the hash value of key
uint32_t keyLen; // length of the key uint32_t dataLen; // length of data
size_t dataLen; // length of data uint32_t keyLen; // length of the key
int8_t count; // reference count int8_t removed; // flag to indicate removed
int8_t removed; // flag to indicate removed int8_t count; // reference count
char data[]; char data[];
} SHashNode; } SHashNode;
...@@ -115,7 +115,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); ...@@ -115,7 +115,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
* @param dsize * @param dsize
* @return * @return
*/ */
void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize); void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize);
/** /**
* remove item with the specified key * remove item with the specified key
......
...@@ -271,10 +271,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -271,10 +271,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
} }
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetCB(pHashObj, key, keyLen, NULL, NULL, 0); return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL, 0);
} }
void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) { void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) {
if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) { if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) {
return NULL; return NULL;
} }
......
...@@ -278,7 +278,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -278,7 +278,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
} }
SCacheDataNode* ptNode = NULL; SCacheDataNode* ptNode = NULL;
taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*)); taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*));
void* pData = (ptNode != NULL)? ptNode->data:NULL; void* pData = (ptNode != NULL)? ptNode->data:NULL;
......
...@@ -91,7 +91,7 @@ static void vnodeIncRef(void *ptNode) { ...@@ -91,7 +91,7 @@ static void vnodeIncRef(void *ptNode) {
void *vnodeAcquire(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = NULL; SVnodeObj **ppVnode = NULL;
if (tsVnodesHash != NULL) { if (tsVnodesHash != NULL) {
ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); ppVnode = taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
} }
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册