提交 0adbbe08 编写于 作者: D dapan1121

feature/qnode

上级 4728b159
...@@ -796,6 +796,7 @@ typedef struct { ...@@ -796,6 +796,7 @@ typedef struct {
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t uid;
int32_t vgVersion; int32_t vgVersion;
int32_t vgNum; int32_t vgNum;
int8_t hashMethod; int8_t hashMethod;
......
...@@ -48,8 +48,21 @@ typedef struct SMetaData { ...@@ -48,8 +48,21 @@ typedef struct SMetaData {
typedef struct SCatalogCfg { typedef struct SCatalogCfg {
uint32_t maxTblCacheNum; uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum; uint32_t maxDBCacheNum;
uint32_t dbRentSec;
uint32_t stableRentSec;
} SCatalogCfg; } SCatalogCfg;
typedef struct SSTableMetaVersion {
uint64_t suid;
int16_t sversion;
int16_t tversion;
} SSTableMetaVersion;
typedef struct SDbVgVersion {
int64_t dbId;
int32_t vgVersion;
} SDbVgVersion;
int32_t catalogInit(SCatalogCfg *cfg); int32_t catalogInit(SCatalogCfg *cfg);
...@@ -61,6 +74,8 @@ int32_t catalogInit(SCatalogCfg *cfg); ...@@ -61,6 +74,8 @@ int32_t catalogInit(SCatalogCfg *cfg);
*/ */
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle); int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle);
void catalogFreeHandle(struct SCatalog* pCatalog);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
/** /**
...@@ -161,6 +176,9 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p ...@@ -161,6 +176,9 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList); int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
/** /**
......
...@@ -76,6 +76,7 @@ typedef struct STableMeta { ...@@ -76,6 +76,7 @@ typedef struct STableMeta {
typedef struct SDBVgroupInfo { typedef struct SDBVgroupInfo {
SRWLatch lock; SRWLatch lock;
int64_t dbId;
int32_t vgVersion; int32_t vgVersion;
int8_t hashMethod; int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
......
...@@ -124,6 +124,9 @@ int32_t taosHashGetSize(const SHashObj *pHashObj); ...@@ -124,6 +124,9 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
*/ */
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size); int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded);
/** /**
* return the payload data with the specified key * return the payload data with the specified key
* *
......
...@@ -44,7 +44,6 @@ extern int32_t tsdbDebugFlag; ...@@ -44,7 +44,6 @@ extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag; extern int32_t tqDebugFlag;
extern int32_t cqDebugFlag; extern int32_t cqDebugFlag;
extern int32_t debugFlag; extern int32_t debugFlag;
extern int32_t ctgDebugFlag;
#define DEBUG_FATAL 1U #define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL #define DEBUG_ERROR DEBUG_FATAL
......
...@@ -917,6 +917,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { ...@@ -917,6 +917,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
} }
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion); pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vindex); pRsp->vgNum = htonl(vindex);
pRsp->hashMethod = pDb->hashMethod; pRsp->hashMethod = pDb->hashMethod;
......
...@@ -22,12 +22,16 @@ extern "C" { ...@@ -22,12 +22,16 @@ extern "C" {
#include "catalog.h" #include "catalog.h"
#include "common.h" #include "common.h"
#include "tlog.h" #include "query.h"
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20 #define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000 #define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_RENT_SLOT_SECOND 2
#define CTG_DEFAULT_INVALID_VERSION (-1) #define CTG_DEFAULT_INVALID_VERSION (-1)
...@@ -36,6 +40,11 @@ enum { ...@@ -36,6 +40,11 @@ enum {
CTG_WRITE, CTG_WRITE,
}; };
enum {
CTG_RENT_DB = 1,
CTG_RENT_STABLE,
};
typedef struct SVgroupListCache { typedef struct SVgroupListCache {
int32_t vgroupVersion; int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo SHashObj *cache; // key:vgId, value:SVgroupInfo
...@@ -51,14 +60,29 @@ typedef struct STableMetaCache { ...@@ -51,14 +60,29 @@ typedef struct STableMetaCache {
SHashObj *stableCache; //key:suid, value:STableMeta* SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache; } STableMetaCache;
typedef struct SRentSlotInfo {
SRWLatch lock;
bool needSort;
SArray *meta;
} SRentSlotInfo;
typedef struct SMetaRentMgmt {
int8_t type;
uint16_t slotNum;
uint16_t slotRIdx;
int64_t lastReadMsec;
SRentSlotInfo *slots;
} SMetaRentMgmt;
typedef struct SCatalog { typedef struct SCatalog {
SDBVgroupCache dbCache; SDBVgroupCache dbCache;
STableMetaCache tableCache; STableMetaCache tableCache;
SMetaRentMgmt dbRent;
SMetaRentMgmt stableRent;
} SCatalog; } SCatalog;
typedef struct SCatalogMgmt { typedef struct SCatalogMgmt {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata SHashObj *pCluster; //key: clusterId, value: SCatalog*
SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node
SCatalogCfg cfg; SCatalogCfg cfg;
} SCatalogMgmt; } SCatalogMgmt;
...@@ -72,17 +96,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); ...@@ -72,17 +96,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID) #define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID)
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgWarn(param, ...) qWarn("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgInfo(param, ...) qInfo("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
...@@ -90,15 +112,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); ...@@ -90,15 +112,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_LOCK(type, _lock) do { \ #define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \ if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \ taosRLockLatch(_lock); \
ctgDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \ assert(atomic_load_32((_lock)) > 0); \
} else { \ } else { \
assert(atomic_load_32((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \ taosWLockLatch(_lock); \
ctgDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \ } \
} while (0) } while (0)
...@@ -106,15 +128,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); ...@@ -106,15 +128,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_UNLOCK(type, _lock) do { \ #define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \ if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \ assert(atomic_load_32((_lock)) > 0); \
ctgDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \ taosRUnLockLatch(_lock); \
ctgDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} else { \ } else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
ctgDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \ taosWUnLockLatch(_lock); \
ctgDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ qDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} \ } \
} while (0) } while (0)
......
此差异已折叠。
...@@ -46,6 +46,8 @@ void ctgTestSetPrepareSTableMeta(); ...@@ -46,6 +46,8 @@ void ctgTestSetPrepareSTableMeta();
bool ctgTestStop = false; bool ctgTestStop = false;
bool ctgTestEnableSleep = false; bool ctgTestEnableSleep = false;
bool ctgTestDeadLoop = false; bool ctgTestDeadLoop = false;
int32_t ctgTestPrintNum = 200000;
int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestCurrentVgVersion = 0;
int32_t ctgTestVgVersion = 1; int32_t ctgTestVgVersion = 1;
...@@ -101,7 +103,6 @@ void ctgTestInitLogFile() { ...@@ -101,7 +103,6 @@ void ctgTestInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog"; const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10; const int32_t maxLogFileNum = 10;
ctgDebugFlag = 159;
tsAsyncLog = 0; tsAsyncLog = 0;
char temp[128] = {0}; char temp[128] = {0};
...@@ -216,6 +217,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM ...@@ -216,6 +217,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
ctgTestCurrentVgVersion = ctgTestVgVersion; ctgTestCurrentVgVersion = ctgTestVgVersion;
rspMsg->vgNum = htonl(ctgTestVgNum); rspMsg->vgNum = htonl(ctgTestVgNum);
rspMsg->hashMethod = 0; rspMsg->hashMethod = 0;
rspMsg->uid = htobe64(3);
SVgroupInfo *vg = NULL; SVgroupInfo *vg = NULL;
uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; uint32_t hashUnit = UINT32_MAX / ctgTestVgNum;
...@@ -507,7 +509,7 @@ void *ctgTestGetDbVgroupThread(void *param) { ...@@ -507,7 +509,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
if (ctgTestEnableSleep) { if (ctgTestEnableSleep) {
usleep(rand()%5); usleep(rand()%5);
} }
if (++n % 50000 == 0) { if (++n % ctgTestPrintNum == 0) {
printf("Get:%d\n", n); printf("Get:%d\n", n);
} }
} }
...@@ -531,7 +533,7 @@ void *ctgTestSetDbVgroupThread(void *param) { ...@@ -531,7 +533,7 @@ void *ctgTestSetDbVgroupThread(void *param) {
if (ctgTestEnableSleep) { if (ctgTestEnableSleep) {
usleep(rand()%5); usleep(rand()%5);
} }
if (++n % 50000 == 0) { if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n); printf("Set:%d\n", n);
} }
} }
...@@ -563,7 +565,7 @@ void *ctgTestGetCtableMetaThread(void *param) { ...@@ -563,7 +565,7 @@ void *ctgTestGetCtableMetaThread(void *param) {
usleep(rand()%5); usleep(rand()%5);
} }
if (++n % 50000 == 0) { if (++n % ctgTestPrintNum == 0) {
printf("Get:%d\n", n); printf("Get:%d\n", n);
} }
} }
...@@ -589,7 +591,7 @@ void *ctgTestSetCtableMetaThread(void *param) { ...@@ -589,7 +591,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
if (ctgTestEnableSleep) { if (ctgTestEnableSleep) {
usleep(rand()%5); usleep(rand()%5);
} }
if (++n % 50000 == 0) { if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n); printf("Set:%d\n", n);
} }
} }
...@@ -627,6 +629,7 @@ TEST(tableMeta, normalTable) { ...@@ -627,6 +629,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.numOfEps, 3); ASSERT_EQ(vgInfo.numOfEps, 3);
ctgTestSetPrepareTableMeta(); ctgTestSetPrepareTableMeta();
STableMeta *tableMeta = NULL; STableMeta *tableMeta = NULL;
...@@ -653,6 +656,41 @@ TEST(tableMeta, normalTable) { ...@@ -653,6 +656,41 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 0);
catalogDestroy(); catalogDestroy();
} }
...@@ -714,6 +752,42 @@ TEST(tableMeta, childTableCase) { ...@@ -714,6 +752,42 @@ TEST(tableMeta, childTableCase) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 1);
catalogDestroy(); catalogDestroy();
} }
...@@ -778,6 +852,40 @@ TEST(tableMeta, superTableCase) { ...@@ -778,6 +852,40 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 1);
catalogDestroy(); catalogDestroy();
...@@ -947,7 +1055,6 @@ TEST(dbVgroup, getSetDbVgroupCase) { ...@@ -947,7 +1055,6 @@ TEST(dbVgroup, getSetDbVgroupCase) {
catalogDestroy(); catalogDestroy();
} }
TEST(multiThread, getSetDbVgroupCase) { TEST(multiThread, getSetDbVgroupCase) {
struct SCatalog* pCtg = NULL; struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
...@@ -955,6 +1062,7 @@ TEST(multiThread, getSetDbVgroupCase) { ...@@ -955,6 +1062,7 @@ TEST(multiThread, getSetDbVgroupCase) {
SVgroupInfo *pvgInfo = NULL; SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0}; SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL; SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile(); ctgTestInitLogFile();
...@@ -998,7 +1106,6 @@ TEST(multiThread, getSetDbVgroupCase) { ...@@ -998,7 +1106,6 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy(); catalogDestroy();
} }
TEST(multiThread, ctableMeta) { TEST(multiThread, ctableMeta) {
struct SCatalog* pCtg = NULL; struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
...@@ -1006,6 +1113,7 @@ TEST(multiThread, ctableMeta) { ...@@ -1006,6 +1113,7 @@ TEST(multiThread, ctableMeta) {
SVgroupInfo *pvgInfo = NULL; SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0}; SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL; SArray *vgList = NULL;
ctgTestStop = false;
ctgTestSetPrepareDbVgroupsAndChildMeta(); ctgTestSetPrepareDbVgroupsAndChildMeta();
......
...@@ -97,6 +97,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { ...@@ -97,6 +97,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp->vgVersion = ntohl(pRsp->vgVersion); pRsp->vgVersion = ntohl(pRsp->vgVersion);
pRsp->vgNum = ntohl(pRsp->vgNum); pRsp->vgNum = ntohl(pRsp->vgNum);
pRsp->uid = be64toh(pRsp->uid);
if (pRsp->vgNum < 0) { if (pRsp->vgNum < 0) {
qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum);
...@@ -111,6 +112,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { ...@@ -111,6 +112,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pOut->dbVgroup.vgVersion = pRsp->vgVersion; pOut->dbVgroup.vgVersion = pRsp->vgVersion;
pOut->dbVgroup.hashMethod = pRsp->hashMethod; pOut->dbVgroup.hashMethod = pRsp->hashMethod;
pOut->dbVgroup.dbId = pRsp->uid;
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup.vgInfo) { if (NULL == pOut->dbVgroup.vgInfo) {
qError("hash init[%d] failed", pRsp->vgNum); qError("hash init[%d] failed", pRsp->vgNum);
...@@ -149,8 +151,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { ...@@ -149,8 +151,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = ntohl(pMetaMsg->sversion); pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
pMetaMsg->tversion = ntohl(pMetaMsg->tversion); pMetaMsg->tversion = ntohl(pMetaMsg->tversion);
pMetaMsg->tuid = htobe64(pMetaMsg->tuid); pMetaMsg->tuid = be64toh(pMetaMsg->tuid);
pMetaMsg->suid = htobe64(pMetaMsg->suid); pMetaMsg->suid = be64toh(pMetaMsg->suid);
pMetaMsg->vgId = ntohl(pMetaMsg->vgId); pMetaMsg->vgId = ntohl(pMetaMsg->vgId);
if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) { if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
......
...@@ -214,7 +214,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { ...@@ -214,7 +214,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
return taosHashGetSize(pHashObj) == 0; return taosHashGetSize(pHashObj) == 0;
} }
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) {
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) { if (pNewNode == NULL) {
...@@ -273,6 +273,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -273,6 +273,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
__rd_unlock((void*) &pHashObj->lock, pHashObj->type); __rd_unlock((void*) &pHashObj->lock, pHashObj->type);
atomic_add_fetch_32(&pHashObj->size, 1); atomic_add_fetch_32(&pHashObj->size, 1);
if (newAdded) {
*newAdded = true;
}
return 0; return 0;
} else { } else {
// not support the update operation, return error // not support the update operation, return error
...@@ -289,10 +293,23 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -289,10 +293,23 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize // enable resize
__rd_unlock((void*) &pHashObj->lock, pHashObj->type); __rd_unlock((void*) &pHashObj->lock, pHashObj->type);
if (newAdded) {
*newAdded = false;
}
return pHashObj->enableUpdate ? 0 : -2; return pHashObj->enableUpdate ? 0 : -2;
} }
} }
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
return taosHashPutImpl(pHashObj, key, keyLen, data, size, NULL);
}
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) {
return taosHashPutImpl(pHashObj, key, keyLen, data, size, newAdded);
}
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetClone(pHashObj, key, keyLen, NULL); return taosHashGetClone(pHashObj, key, keyLen, NULL);
} }
......
...@@ -95,7 +95,6 @@ int32_t tsdbDebugFlag = 131; ...@@ -95,7 +95,6 @@ int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 131; int32_t tqDebugFlag = 131;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int32_t ctgDebugFlag = 131;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;
int64_t dbgWN = 0; int64_t dbgWN = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册