提交 6996aabe 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/config

add_executable(simulate_vnode "simulate_vnode.c")
target_link_libraries(simulate_vnode craft lz4 uv_a)
\ No newline at end of file
target_link_libraries(simulate_vnode PUBLIC craft lz4 uv_a)
\ No newline at end of file
......@@ -3,4 +3,4 @@ target_sources(singleNode
PRIVATE
"singleNode.c"
)
target_link_libraries(singleNode traft lz4 uv_a)
target_link_libraries(singleNode PUBLIC traft lz4 uv_a)
......@@ -44,6 +44,7 @@ extern "C" {
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/utsname.h>
#include <sys/param.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>
......
......@@ -40,55 +40,9 @@ typedef struct SCacheStatis {
int64_t refreshCount;
} SCacheStatis;
struct STrashElem;
typedef struct SCacheDataNode {
uint64_t addedTime; // the added time when this element is added or updated into cache
uint64_t lifespan; // life duration when this element should be remove from cache
uint64_t expireTime; // expire time
uint64_t signature;
struct STrashElem *pTNodeHeader; // point to trash node head
uint16_t keySize : 15; // max key size: 32kb
bool inTrashcan : 1; // denote if it is in trash or not
uint32_t size; // allocated size for current SCacheDataNode
T_REF_DECLARE()
char *key;
char data[];
} SCacheDataNode;
typedef struct STrashElem {
struct STrashElem *prev;
struct STrashElem *next;
SCacheDataNode *pData;
} STrashElem;
/*
* to accommodate the old data which has the same key value of new one in hashList
* when an new node is put into cache, if an existed one with the same key:
* 1. if the old one does not be referenced, update it.
* 2. otherwise, move the old one to pTrash, addedTime the new one.
*
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
*/
typedef struct {
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime;
STrashElem *pTrash;
char *name;
SCacheStatis statistics;
SHashObj *pHashTable;
__cache_free_fn_t freeFp;
uint32_t numOfElemsInTrash; // number of element in trash
uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
pthread_t refreshWorker;
bool extendLifespan; // auto extend life span when one item is accessed.
int64_t checkTick; // tick used to record the check times of the refresh threads
#if defined(LINUX)
pthread_rwlock_t lock;
#else
pthread_mutex_t lock;
#endif
} SCacheObj;
typedef struct SCacheObj SCacheObj;
typedef struct SCacheIter SCacheIter;
typedef struct STrashElem STrashElem;
/**
* initialize the cache object
......@@ -141,7 +95,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data);
* @param data
* @return
*/
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data);
void *taosCacheTransferData(SCacheObj *pCacheObj, void **data);
/**
* remove data in cache, the data will not be removed immediately.
......@@ -152,6 +106,13 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data);
*/
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
/**
*
* @param pCacheObj
* @return
*/
size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj);
/**
* move all data node into trash, clear node in trash can if it is not referenced by any clients
* @param handle
......@@ -184,6 +145,12 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1);
*/
void taosStopCacheRefreshWorker();
SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj);
bool taosCacheIterNext(SCacheIter* pIter);
void* taosCacheIterGetData(const SCacheIter* pIter, size_t* dataLen);
void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* keyLen);
void taosCacheDestroyIter(SCacheIter* pIter);
#ifdef __cplusplus
}
#endif
......
......@@ -28,11 +28,6 @@ typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len);
typedef void (*_hash_before_fn_t)(void *);
typedef void (*_hash_free_fn_t)(void *);
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define HASH_NODE_EXIST(code) (code == -2)
/**
......@@ -62,41 +57,17 @@ typedef struct SHashNode {
uint32_t hashVal; // the hash value of key
uint32_t dataLen; // length of data
uint32_t keyLen; // length of the key
uint16_t count; // reference count
uint16_t refCount; // reference count
int8_t removed; // flag to indicate removed
char data[];
} SHashNode;
#define GET_HASH_NODE_KEY(_n) ((char *)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((SHashNode *)((char *)(_n) - sizeof(SHashNode)))
typedef enum SHashLockTypeE {
HASH_NO_LOCK = 0,
HASH_ENTRY_LOCK = 1,
} SHashLockTypeE;
typedef struct SHashEntry {
int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch
SHashNode *next;
} SHashEntry;
typedef struct SHashObj {
SHashEntry **hashList;
uint32_t capacity; // number of slots
uint32_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function
_equal_fn_t equalFp; // equal function
_hash_before_fn_t callbackFp; // function invoked before return the value to caller
SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
} SHashObj;
typedef struct SHashObj SHashObj;
/**
* init the hash table
......@@ -126,8 +97,6 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
*/
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded);
/**
* return the payload data with the specified key
*
......@@ -146,17 +115,18 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
* @param destBuf
* @return
*/
void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf);
int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf);
/**
* Clone the result to interval allocated buffer
*
* @param pHashObj
* @param key
* @param keyLen
* @param destBuf
* @param size
* @return
*/
void *taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void **d, size_t *sz);
int32_t taosHashGetDup_m(SHashObj* pHashObj, const void* key, size_t keyLen, void** destBuf, int32_t* size);
/**
* remove item with the specified key
......@@ -207,37 +177,13 @@ void *taosHashIterate(SHashObj *pHashObj, void *p);
*/
void taosHashCancelIterate(SHashObj *pHashObj, void *p);
/**
* Get the corresponding key information for a given data in hash table
* @param data
* @return
*/
int32_t taosHashGetKey(void *data, void **key, size_t *keyLen);
/**
* Get the corresponding key information for a given data in hash table, using memcpy
* @param data
* @param dst
* @return
*/
static FORCE_INLINE int32_t taosHashCopyKey(void *data, void *dst) {
if (NULL == data || NULL == dst) {
return -1;
}
SHashNode *node = GET_HASH_PNODE(data);
void *key = GET_HASH_NODE_KEY(node);
memcpy(dst, key, node->keyLen);
return 0;
}
/**
* Get the corresponding data length for a given data in hash table
* @param data
* @return
*/
int32_t taosHashGetDataLen(void *data);
/**
* Get the corresponding key information for a given data in hash table
* @param data
* @param keyLen
* @return
*/
void *taosHashGetKey(void *data, size_t* keyLen);
/**
* return the payload data with the specified key(reference number added)
......@@ -258,8 +204,20 @@ void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen);
*/
void taosHashRelease(SHashObj *pHashObj, void *p);
/**
*
* @param pHashObj
* @param fp
*/
void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
/**
*
* @param pHashObj
* @param fp
*/
void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp);
#ifdef __cplusplus
}
#endif
......
......@@ -53,7 +53,7 @@ typedef struct SDiskbasedBufStatis {
* @param handle
* @return
*/
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, const char* dir);
/**
*
......
......@@ -473,7 +473,8 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
free(pAppHbMgr);
return NULL;
}
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
// init getInfoFunc
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
......
......@@ -85,7 +85,7 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
taosHashGetClone(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
......
......@@ -48,7 +48,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid,
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t mndProcessConnectReq(SMnodeMsg *pReq);
......@@ -158,27 +158,23 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
*pConn = NULL;
pIter = taosHashIterate(pMgmt->cache->pHashTable, pIter);
if (pIter == NULL) return NULL;
SCacheDataNode **pNode = pIter;
if (pNode == NULL || *pNode == NULL) {
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
return NULL;
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
SConnObj* pConn = NULL;
bool hasNext = taosCacheIterNext(pIter);
if (hasNext) {
size_t dataLen = 0;
pConn = taosCacheIterGetData(pIter, &dataLen);
} else {
taosCacheDestroyIter(pIter);
}
*pConn = (SConnObj *)((*pNode)->data);
return pIter;
return pConn;
}
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
if (pIter != NULL) {
taosCacheDestroyIter(pIter);
}
}
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
......@@ -376,8 +372,8 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
int32_t rspLen = 0;
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
......@@ -386,8 +382,8 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
......@@ -638,7 +634,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
pShow->numOfRows = taosCacheGetNumOfObj(pMgmt->cache);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
......@@ -653,8 +649,13 @@ static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
char *pWrite;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
}
while (numOfRows < rows) {
pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
pConn = mndGetNextConn(pMnode, pShow->pIter);
if (pConn == NULL) break;
cols = 0;
......@@ -823,19 +824,24 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data,
void *pIter;
char str[TSDB_IPv4ADDR_LEN + 6] = {0};
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
}
while (numOfRows < rows) {
pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
pConn = mndGetNextConn(pMnode, pShow->pIter);
if (pConn == NULL) {
pShow->pIter = pIter;
pShow->pIter = NULL;
break;
}
if (numOfRows + pConn->numOfQueries >= rows) {
mndCancelGetNextConn(pMnode, pIter);
taosCacheDestroyIter(pShow->pIter);
pShow->pIter = NULL;
break;
}
pShow->pIter = pIter;
for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
SQueryDesc *pDesc = pConn->pQueries + i;
cols = 0;
......@@ -913,8 +919,9 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data,
}
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
if (pIter != NULL) {
taosCacheDestroyIter(pIter);
}
}
int32_t mndGetNumOfConnections(SMnode *pMnode) { return taosHashGetSize(pMnode->profileMgmt.cache->pHashTable); }
\ No newline at end of file
......@@ -168,7 +168,7 @@ void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
tbName, p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision, c->numOfColumns, c->rowSize);
}
int32_t colNum = c->numOfColumns + c->numOfTags;
for (int32_t i = 0; i < colNum; ++i) {
SSchema *s = &p->schema[i];
......@@ -190,7 +190,7 @@ void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
dbCache = (SCtgDBCache *)pIter;
taosHashGetKey(dbCache, (void **)&dbFName, &len);
taosHashGetKey((void **)&dbFName, &len);
int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0;
int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0;
......@@ -204,9 +204,9 @@ void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
if (dbCache->vgInfo->vgHash) {
vgNum = taosHashGetSize(dbCache->vgInfo->vgHash);
}
}
}
ctgDebug("[%d] db [%.*s][%"PRIx64"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d",
ctgDebug("[%d] db [%.*s][%"PRIx64"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d",
i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted?"deleted":"", metaNum, stbNum, vgVersion, hashMethod, vgNum);
pIter = taosHashIterate(dbHash, pIter);
......@@ -222,7 +222,7 @@ void ctgDbgShowClusterCache(SCatalog* pCtg) {
}
ctgDebug("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg);
ctgDebug("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM),
ctgDebug("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM),
ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));
ctgDbgShowDBCache(pCtg, pCtg->dbCache);
......@@ -306,9 +306,9 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
return TSDB_CODE_SUCCESS;
_return:
tfree(action.data);
CTG_RET(code);
CTG_RET(code);
}
......@@ -336,9 +336,9 @@ int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
return TSDB_CODE_SUCCESS;
_return:
tfree(action.data);
CTG_RET(code);
CTG_RET(code);
}
......@@ -366,9 +366,9 @@ int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
return TSDB_CODE_SUCCESS;
_return:
tfree(action.data);
CTG_RET(code);
CTG_RET(code);
}
......@@ -657,9 +657,9 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
return TSDB_CODE_SUCCESS;
}
size_t sz = 0;
int32_t sz = 0;
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz);
int32_t code = taosHashGetDup_m(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), (void **)pTableMeta, &sz);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (NULL == *pTableMeta) {
......@@ -673,8 +673,8 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
if (dbId) {
*dbId = dbCache->dbId;
}
tbMeta = *pTableMeta;
STableMeta* tbMeta = *pTableMeta;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache);
......@@ -1076,7 +1076,7 @@ _return:
}
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
int16_t widx = abs(id % mgmt->slotNum);
int16_t widx = labs(id % mgmt->slotNum);
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
......@@ -1238,7 +1238,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
void *pIter = taosHashIterate(cache->stbCache, NULL);
while (pIter) {
uint64_t *suid = NULL;
taosHashGetKey(pIter, (void **)&suid, NULL);
suid = taosHashGetKey(pIter, NULL);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) {
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
......@@ -1397,7 +1397,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare);
}
......@@ -1437,7 +1437,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
ctgError("taosHashPut stable to stable cache failed, suid:%"PRIx64, meta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -1475,7 +1475,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
int32_t *vgId = NULL;
void *pIter = taosHashIterate(src->vgHash, NULL);
while (pIter) {
taosHashGetKey(pIter, (void **)&vgId, NULL);
vgId = taosHashGetKey(pIter, NULL);
if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
......@@ -1635,7 +1635,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist));
}
if (0 == exist) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, output->dbFName, output->tbName, &moutput));
......@@ -1723,9 +1723,9 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
tbType = (*pTableMeta)->tableType;
suid = (*pTableMeta)->suid;
tfree(*pTableMeta);
tfree(*pTableMeta);
}
if (CTG_FLAG_IS_UNKNOWN_STB(flag)) {
CTG_FLAG_SET_STB(flag, tbType);
}
......@@ -1950,21 +1950,21 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", tbName:%s", msg->dbFName, dbCache->dbId, msg->dbId, msg->tbName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (taosHashRemove(dbCache->tbCache.metaCache, msg->tbName, strlen(msg->tbName))) {
if (taosHashRemove(dbCache->tbCache.metaCache, msg->tbName, strlen(msg->tbName))) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
ctgError("stb not exist in cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
ctgInfo("table removed from cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName);
_return:
tfree(msg);
CTG_RET(code);
}
......@@ -2458,7 +2458,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
} else {
int32_t vgId = tbMeta->vgId;
if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) {
ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......
......@@ -68,7 +68,7 @@ typedef struct SResultRow {
} SResultRow;
typedef struct SResultRowInfo {
SResultRow *pCurResult; // current active result row info
SList* pRows;
SResultRow** pResult; // result list
// int16_t type:8; // data type for hash key
int32_t size; // number of result set
......
......@@ -233,9 +233,9 @@ typedef struct STaskAttr {
SArray* pUdfInfo; // no need to free
} STaskAttr;
typedef int32_t (*__optr_prepare_fn_t)(void* param);
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
typedef int32_t (*__optr_open_fn_t)(void* param);
typedef SSDataBlock* (*__optr_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
struct SOperatorInfo;
......@@ -306,21 +306,21 @@ enum {
};
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results
char* name; // name, used to show the query execution plan
void* info; // extension attribution
SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo;
uint8_t operatorType;
bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results
char* name; // name, used to show the query execution plan
void* info; // extension attribution
SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_prepare_fn_t prepareFn;
__operator_fn_t exec;
__optr_cleanup_fn_t cleanupFn;
__optr_open_fn_t openFn;
__optr_fn_t nextDataFn;
__optr_close_fn_t closeFn;
} SOperatorInfo;
typedef struct {
......@@ -479,9 +479,6 @@ typedef struct SAggOperatorInfo {
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
int32_t bufCapacity;
uint32_t seed;
SSDataBlock* existDataBlock;
} SProjectOperatorInfo;
......@@ -615,10 +612,10 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
......@@ -654,8 +651,6 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOf
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
// SSDataBlock* doSLimit(void* param, bool* newgroup);
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSIMPLEHASH_H
#define TDENGINE_TSIMPLEHASH_H
#include "tarray.h"
#include "tlockfree.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len);
typedef void (*_hash_free_fn_t)(void *);
typedef struct SSHashObj SSHashObj;
/**
* init the hash table
*
* @param capacity initial capacity of the hash table
* @param fn hash function to generate the hash value
* @return
*/
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen);
/**
* return the size of hash table
* @param pHashObj
* @return
*/
int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
/**
* put element into hash table, if the element with the same key exists, update it
* @param pHashObj
* @param key
* @param data
* @return
*/
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data);
/**
* return the payload data with the specified key
*
* @param pHashObj
* @param key
* @return
*/
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key);
/**
* remove item with the specified key
* @param pHashObj
* @param key
* @param keyLen
*/
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key);
/**
* Clear the hash table.
* @param pHashObj
*/
void tSimpleHashClear(SSHashObj *pHashObj);
/**
* Clean up hash table and release all allocated resources.
* @param handle
*/
void tSimpleHashCleanup(SSHashObj *pHashObj);
/**
* Get the hash table size
* @param pHashObj
* @return
*/
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
/**
* Get the corresponding key information for a given data in hash table
* @param data
* @param keyLen
* @return
*/
void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSIMPLEHASH_H
......@@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
*pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
......
......@@ -29,8 +29,8 @@ typedef struct SLHashBucket {
typedef struct SLHashObj {
SDiskbasedBuf *pBuf;
_hash_fn_t hashFn;
int32_t tuplesPerPage;
SLHashBucket **pBucket; // entry list
int32_t tuplesPerPage;
int32_t numOfAlloc; // number of allocated bucket ptr slot
int32_t bits; // the number of bits used in hash
int32_t numOfBuckets; // the number of buckets
......@@ -142,7 +142,7 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket
pBucket->size -= 1;
}
static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
static void doTrimBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList);
if (numOfPages <= 1) {
return;
......@@ -253,6 +253,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
return NULL;
}
// disable compress when flushing to disk
setBufPageCompressOnDisk(pHashObj->pBuf, false);
/**
......@@ -367,7 +368,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
releaseBufPage(pHashObj->pBuf, p);
}
doCompressBucketPages(pHashObj, pBucket);
doTrimBucketPages(pHashObj, pBucket);
}
return TSDB_CODE_SUCCESS;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tsimplehash.h"
#include "taoserror.h"
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
#define HASH_MAX_CAPACITY (1024*1024*16)
#define SHASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
#define GET_SHASH_NODE_KEY(_n, _dl) ((char*)(_n) + sizeof(SHNode) + (_dl))
#define GET_SHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHNode))
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
#define FREE_HASH_NODE(_n) \
do { \
tfree(_n); \
} while (0);
typedef struct SHNode {
struct SHNode *next;
char data[];
} SHNode;
typedef struct SSHashObj {
SHNode **hashList;
size_t capacity; // number of slots
size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
int32_t keyLen;
int32_t dataLen;
} SSHashObj;
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
int32_t len = MIN(length, HASH_MAX_CAPACITY);
int32_t i = 4;
while (i < len) i = (i << 1u);
return i;
}
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen) {
ASSERT(fn != NULL);
if (capacity == 0) {
capacity = 4;
}
SSHashObj* pHashObj = (SSHashObj*) calloc(1, sizeof(SSHashObj));
if (pHashObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// the max slots is not defined by user
pHashObj->capacity = taosHashCapacity((int32_t)capacity);
pHashObj->equalFp = memcmp;
pHashObj->hashFp = fn;
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->keyLen = keyLen;
pHashObj->dataLen = dataLen;
pHashObj->hashList = (SHNode **)calloc(pHashObj->capacity, sizeof(void *));
if (pHashObj->hashList == NULL) {
free(pHashObj);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
return pHashObj;
}
int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
if (pHashObj == NULL) {
return 0;
}
return (int32_t)atomic_load_64(&pHashObj->size);
}
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
SHNode *pNewNode = malloc(sizeof(SHNode) + keyLen + dsize);
if (pNewNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pNewNode->next = NULL;
memcpy(GET_SHASH_NODE_DATA(pNewNode), pData, dsize);
memcpy(GET_SHASH_NODE_KEY(pNewNode, dsize), key, keyLen);
return pNewNode;
}
void taosHashTableResize(SSHashObj *pHashObj) {
if (!HASH_NEED_RESIZE(pHashObj)) {
return;
}
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (pNewEntryList == NULL) {
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return;
}
size_t inc = newCapacity - pHashObj->capacity;
memset(pNewEntryList + pHashObj->capacity * sizeof(void*), 0, inc);
pHashObj->hashList = pNewEntryList;
pHashObj->capacity = newCapacity;
for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) {
SHNode* pNode = pHashObj->hashList[idx];
SHNode *pNext;
SHNode *pPrev = NULL;
if (pNode == NULL) {
continue;
}
while (pNode != NULL) {
void* key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->dataLen);
int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity);
pNext = pNode->next;
if (newIdx != idx) {
if (pPrev == NULL) {
pHashObj->hashList[idx] = pNext;
} else {
pPrev->next = pNext;
}
pNode->next = pHashObj->hashList[newIdx];
pHashObj->hashList[newIdx] = pNode;
} else {
pPrev = pNode;
}
pNode = pNext;
}
}
int64_t et = taosGetTimestampUs();
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
if (pHashObj == NULL || key == NULL) {
return -1;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
// need the resize process, write lock applied
if (SHASH_NEED_RESIZE(pHashObj)) {
taosHashTableResize(pHashObj);
}
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot];
if (pNode == NULL) {
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->size, hashVal);
if (pNewNode == NULL) {
return -1;
}
pHashObj->hashList[slot] = pNewNode;
return 0;
}
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
break;
}
pNode = pNode->next;
}
if (pNode == NULL) {
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->size, hashVal);
if (pNewNode == NULL) {
return -1;
}
pNewNode->next = pHashObj->hashList[slot];
pHashObj->hashList[slot] = pNewNode;
atomic_add_fetch_64(&pHashObj->size, 1);
} else { //update data
memcpy(GET_SHASH_NODE_DATA(pNode), data, pHashObj->dataLen);
}
return 0;
}
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, int32_t index) {
SHNode *pNode = pHashObj->hashList[index];
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
break;
}
pNode = pNode->next;
}
return pNode;
}
static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) {
return tSimpleHashGetSize(pHashObj) == 0;
}
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || key == NULL) {
return NULL;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot];
if (pNode == NULL) {
return NULL;
}
char *data = NULL;
pNode = doSearchInEntryList(pHashObj, key, slot);
if (pNode != NULL) {
data = GET_SHASH_NODE_DATA(pNode);
}
return data;
}
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) {
// todo
}
void tSimpleHashClear(SSHashObj *pHashObj) {
if (pHashObj == NULL) {
return;
}
SHNode *pNode, *pNext;
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (pNode == NULL) {
continue;
}
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pNode);
pNode = pNext;
}
}
pHashObj->size = 0;
}
void tSimpleHashCleanup(SSHashObj *pHashObj) {
if (pHashObj == NULL) {
return;
}
tSimpleHashClear(pHashObj);
tfree(pHashObj->hashList);
}
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
if (pHashObj == NULL) {
return 0;
}
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
}
void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen) {
int32_t offset = offsetof(SHNode, data);
SHNode *node = data - offset;
if (keyLen != NULL) {
*keyLen = pHashObj->keyLen;
}
return GET_SHASH_NODE_KEY(node, pHashObj->dataLen);
}
\ No newline at end of file
......@@ -255,7 +255,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket);
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, "/tmp");
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", "/tmp");
if (ret != 0) {
tMemBucketDestroy(pBucket);
return NULL;
......
......@@ -153,7 +153,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) {
if (NULL == dst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
taosHashGetClone(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
dst->numOfTables = src->numOfTables;
dst->size = src->size;
TSWAP(dst->pData, src->pData, char*);
......
此差异已折叠。
此差异已折叠。
......@@ -42,8 +42,8 @@ struct SDiskbasedBuf {
bool comp; // compressed before flushed to disk
uint64_t nextPos; // next page flush position
uint64_t qId; // for debug purpose
bool printStatis; // Print statistics info when closing this buffer.
char* id; // for debug purpose
bool printStatis; // Print statistics info when closing this buffer.
SDiskbasedBufStatis statis;
};
......@@ -269,11 +269,12 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
SPageInfo* ppi = malloc(sizeof(SPageInfo));
ppi->pageId = pageId;
ppi->pData = NULL;
ppi->pData = NULL;
ppi->offset = -1;
ppi->length = -1;
ppi->used = true;
ppi->pn = NULL;
ppi->used = true;
ppi->pn = NULL;
ppi->dirty = false;
return *(SPageInfo**)taosArrayPush(list, &ppi);
}
......@@ -356,7 +357,7 @@ static SPageInfo* getPageInfoFromPayload(void* page) {
return ppi;
}
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId,
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf));
......@@ -366,13 +367,13 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
}
pPBuf->pageSize = pagesize;
pPBuf->numOfPages = 0; // all pages are in buffer in the first place
pPBuf->numOfPages = 0; // all pages are in buffer in the first place
pPBuf->totalBufSize = 0;
pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
pPBuf->allocateId = -1;
pPBuf->comp = true;
pPBuf->pFile = NULL;
pPBuf->qId = qId;
pPBuf->comp = true;
pPBuf->pFile = NULL;
pPBuf->id = strdup(id);
pPBuf->fileSize = 0;
pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
pPBuf->freePgList = tdListNew(POINTER_BYTES);
......@@ -471,7 +472,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
return (void*)(GET_DATA_PAYLOAD(*pi));
} else { // not in memory
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0);
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
......@@ -493,9 +494,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
lruListPushFront(pBuf->lruList, *pi);
(*pi)->used = true;
int32_t code = loadPageFromDisk(pBuf, *pi);
if (code != 0) {
return NULL;
// some data has been flushed to disk, and needs to be loaded into buffer again.
if ((*pi)->length > 0 && (*pi)->offset >= 0) {
int32_t code = loadPageFromDisk(pBuf, *pi);
if (code != 0) {
return NULL;
}
}
return (void*)(GET_DATA_PAYLOAD(*pi));
......@@ -540,13 +544,13 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
if (pBuf->pFile != NULL) {
uDebug(
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
"size:%.2f Kb, %" PRIx64 "\n",
"size:%.2f Kb, %s\n",
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
taosCloseFile(&pBuf->pFile);
} else {
uDebug("Paged buffer closed, total:%.2f Kb, no file created, %" PRIx64, pBuf->totalBufSize / 1024.0, pBuf->qId);
uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
}
// print the statistics information
......@@ -584,6 +588,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
taosHashCleanup(pBuf->groupSet);
taosHashCleanup(pBuf->all);
tfree(pBuf->id);
tfree(pBuf->assistBuf);
tfree(pBuf);
}
......@@ -639,9 +644,9 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
printf(
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
"Kb, %" PRIx64 "\n",
"Kb, %s\n",
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
printf(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
......
......@@ -43,6 +43,9 @@ static void remove_batch_test() {
taosArrayPush(delList, &a);
taosArrayRemoveBatch(pa, (const int32_t*) TARRAY_GET_START(delList), taosArrayGetSize(delList));
EXPECT_EQ(taosArrayGetSize(pa), 17);
taosArrayDestroy(pa);
taosArrayDestroy(delList);
}
} // namespace
......@@ -79,4 +82,6 @@ TEST(arrayTest, array_search_test) {
}
}
taosArrayDestroy(pa);
}
......@@ -6,7 +6,7 @@
#include "tcache.h"
// test cache
TEST(testCase, client_cache_test) {
TEST(cacheTest, client_cache_test) {
const int32_t REFRESH_TIME_IN_SEC = 2;
SCacheObj* tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL, "test");
......@@ -92,7 +92,7 @@ TEST(testCase, client_cache_test) {
taosCacheCleanup(tscMetaCache);
}
TEST(testCase, cache_resize_test) {
TEST(cacheTest, cache_iter_test) {
const int32_t REFRESH_TIME_IN_SEC = 2;
auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL, "test");
......@@ -107,6 +107,7 @@ TEST(testCase, cache_resize_test) {
int32_t len = sprintf(key, "abc_%7d", i);
taosCachePut(pCache, key, strlen(key), data, len, 3600);
}
uint64_t endTime = taosGetTimestampUs();
printf("add %d object cost:%" PRIu64 " us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num);
......@@ -120,5 +121,22 @@ TEST(testCase, cache_resize_test) {
endTime = taosGetTimestampUs();
printf("retrieve %d object cost:%" PRIu64 " us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num);
int32_t count = 0;
SCacheIter* pIter = taosCacheCreateIter(pCache);
while(taosCacheIterNext(pIter)) {
size_t keyLen = 0;
size_t dataLen = 0;
char* key1 = static_cast<char*>(taosCacheIterGetKey(pIter, &keyLen));
char* data1 = static_cast<char*>(taosCacheIterGetData(pIter, &dataLen));
// char d[256] = {0};
// memcpy(d, data1, dataLen);
// char k[256] = {0};
// memcpy(k, key1, keyLen);
}
ASSERT_EQ(count, num);
taosCacheCleanup(pCache);
}
\ No newline at end of file
}
......@@ -201,8 +201,8 @@ TEST(td_encode_test, encode_decode_cstr) {
}
}
delete buf;
delete cstr;
delete[] buf;
delete[] cstr;
}
typedef struct {
......@@ -354,7 +354,7 @@ static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) {
tEndDecode(pCoder);
return 0;
}
#if 0
TEST(td_encode_test, compound_struct_encode_test) {
SCoder encoder, decoder;
uint8_t * buf1;
......@@ -436,5 +436,5 @@ TEST(td_encode_test, compound_struct_encode_test) {
GTEST_ASSERT_EQ(dreq21.v_b, req2.v_b);
tCoderClear(&decoder);
}
#endif
#pragma GCC diagnostic pop
\ No newline at end of file
......@@ -106,7 +106,7 @@ void noLockPerformanceTest() {
ASSERT_EQ(taosHashGetSize(hashTable), 0);
char key[128] = {0};
int32_t num = 5000000;
int32_t num = 5000;
int64_t st = taosGetTimestampUs();
......@@ -186,10 +186,15 @@ void acquireRleaseTest() {
printf("%s,expect:%s", pdata->p, str3);
ASSERT_TRUE(strcmp(pdata->p, str3) == 0);
tfree(pdata->p);
taosHashRelease(hashTable, pdata);
num = taosHashGetSize(hashTable);
ASSERT_EQ(num, 1);
taosHashCleanup(hashTable);
tfree(data.p);
}
}
......
......@@ -12,145 +12,150 @@
namespace {
// simple test
void simpleTest() {
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, 1, "/tmp/");
SDiskbasedBuf* pBuf = NULL;
int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4096, "", "/tmp/");
int32_t pageId = 0;
int32_t groupId = 0;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
ASSERT_EQ(getTotalBufSize(pBuf), 1024);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
SIDList list = getDataBufPagesIdList(pBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfBufGroupId(pResultBuf), 1);
ASSERT_EQ(getNumOfBufGroupId(pBuf), 1);
releaseBufPage(pResultBuf, pBufPage);
releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* t = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t == pBufPage1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage4);
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
releaseBufPage(pBuf, pBufPage2);
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage5);
destroyDiskbasedBuf(pResultBuf);
destroyDiskbasedBuf(pBuf);
}
void writeDownTest() {
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
SDiskbasedBuf* pBuf = NULL;
int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4*1024, "1", "/tmp/");
int32_t pageId = 0;
int32_t writePageId = 0;
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
ASSERT_TRUE(pBufPage != NULL);
*(int32_t*)(pBufPage->data) = nx;
writePageId = pageId;
releaseBufPage(pResultBuf, pBufPage);
setBufPageDirty(pBufPage, true);
releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseBufPage(pResultBuf, t4);
releaseBufPage(pBuf, t4);
// flush the written page to disk, and read it out again
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pResultBuf, writePageId));
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pBuf, writePageId));
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
SArray* pa = getDataBufPagesIdList(pBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 5);
destroyDiskbasedBuf(pResultBuf);
destroyDiskbasedBuf(pBuf);
}
void recyclePageTest() {
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
SDiskbasedBuf* pBuf = NULL;
int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4*1024, "1", "/tmp/");
int32_t pageId = 0;
int32_t writePageId = 0;
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
ASSERT_TRUE(pBufPage != NULL);
releaseBufPage(pResultBuf, pBufPage);
releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseBufPage(pResultBuf, t4);
releaseBufPage(pBuf, t4);
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5);
releaseBufPage(pBuf, t5);
// flush the written page to disk, and read it out again
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pResultBuf, writePageId));
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pBuf, writePageId));
*(int32_t*)(pBufPagex->data) = nx;
writePageId = pageId; // update the data
releaseBufPage(pResultBuf, pBufPagex);
releaseBufPage(pBuf, pBufPagex);
SFilePage* pBufPagex1 = static_cast<SFilePage*>(getBufPage(pResultBuf, 1));
SFilePage* pBufPagex1 = static_cast<SFilePage*>(getBufPage(pBuf, 1));
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
SArray* pa = getDataBufPagesIdList(pBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6);
destroyDiskbasedBuf(pResultBuf);
destroyDiskbasedBuf(pBuf);
}
} // namespace
......
Subproject commit 08ed39f0a5fcbbfb5a630b945ab3d1998d4b4136
Subproject commit 904e6f0e152e8fe61edfe0a0a9ae497cfde2a72c
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册