提交 8bfe913c 编写于 作者: H Hongze Cheng

Merge branch 'feature/vnode' of github.com:taosdata/TDengine into feature/vnode

......@@ -45,7 +45,7 @@ static SKVRow createBasicTag() {
tdInitKVRowBuilder(&rb);
for (int i = 10; i < 12; i++) {
for (int i = 0; i < 2; i++) {
void *pVal = malloc(sizeof(VarDataLenT) + strlen("foo"));
varDataLen(pVal) = strlen("foo");
memcpy(varDataVal(pVal), "foo", strlen("foo"));
......@@ -120,7 +120,7 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
{
// Create some child tables
int ntables = 100000;
int ntables = 1000000;
int batch = 10;
for (int i = 0; i < ntables / batch; i++) {
SArray *pMsgs = (SArray *)taosArrayInit(batch, sizeof(SRpcMsg *));
......
......@@ -44,9 +44,9 @@ static SMetaDB *metaNewDB();
static void metaFreeDB(SMetaDB *pDB);
static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path);
static void metaCloseBDBEnv(DB_ENV *pEnv);
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName);
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
static void metaCloseBDBDb(DB *pDB);
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf);
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup);
static void metaCloseBDBIdx(DB *pIdx);
static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
......@@ -76,33 +76,33 @@ int metaOpenDB(SMeta *pMeta) {
}
// Open DBs
if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db") < 0) {
if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db", false) < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db") < 0) {
if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db", false) < 0) {
metaCloseDB(pMeta);
return -1;
}
// Open Indices
if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNameIdxCb) < 0) {
if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaStbIdxCb) < 0) {
if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "stb.index", pDB->pTbDB, &metaStbIdxCb, false) < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNtbIdxCb) < 0) {
if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "ntb.index", pDB->pTbDB, &metaNtbIdxCb, false) < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaCtbIdxCb) < 0) {
if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "ctb.index", pDB->pTbDB, &metaCtbIdxCb, true) < 0) {
metaCloseDB(pMeta);
return -1;
}
......@@ -150,6 +150,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf);
value.app_data = pTbCfg;
pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key, &value, 0);
}
......@@ -232,7 +233,7 @@ static void metaCloseBDBEnv(DB_ENV *pEnv) {
}
}
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName) {
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) {
int ret;
DB *pDB;
......@@ -242,6 +243,14 @@ static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName) {
return -1;
}
if (isDup) {
ret = pDB->set_flags(pDB, DB_DUPSORT);
if (ret != 0) {
BDB_PERR("Failed to set DB flags", ret);
return -1;
}
}
ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
if (ret) {
BDB_PERR("Failed to open META DB", ret);
......@@ -259,11 +268,11 @@ static void metaCloseBDBDb(DB *pDB) {
}
}
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf) {
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup) {
DB *pIdx;
int ret;
if (metaOpenBDBDb(ppIdx, pEnv, pFName) < 0) {
if (metaOpenBDBDb(ppIdx, pEnv, pFName, isDup) < 0) {
return -1;
}
......@@ -283,30 +292,24 @@ static void metaCloseBDBIdx(DB *pIdx) {
}
static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
char *name;
STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
memset(pSKey, 0, sizeof(*pSKey));
taosDecodeString(pValue->data, &name);
pSKey->data = name;
pSKey->size = strlen(name);
pSKey->flags = DB_DBT_APPMALLOC;
pSKey->data = pTbCfg->name;
pSKey->size = strlen(pTbCfg->name);
return 0;
}
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
STbCfg tbCfg = {0};
STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
metaDecodeTbInfo(pValue->data, &tbCfg);
if (tbCfg.type == META_SUPER_TABLE) {
if (pTbCfg->type == META_SUPER_TABLE) {
memset(pSKey, 0, sizeof(*pSKey));
pSKey->data = pKey->data;
pSKey->size = pKey->size;
metaClearTbCfg(&tbCfg);
return 0;
} else {
return DB_DONOTINDEX;
......@@ -314,17 +317,13 @@ static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
}
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
STbCfg tbCfg = {0};
STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
metaDecodeTbInfo(pValue->data, &tbCfg);
if (tbCfg.type == META_NORMAL_TABLE) {
if (pTbCfg->type == META_NORMAL_TABLE) {
memset(pSKey, 0, sizeof(*pSKey));
pSKey->data = pKey->data;
pSKey->size = pKey->size;
metaClearTbCfg(&tbCfg);
return 0;
} else {
return DB_DONOTINDEX;
......@@ -332,16 +331,26 @@ static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
}
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
STbCfg tbCfg = {0};
STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
DBT * pDbt;
metaDecodeTbInfo(pValue->data, &tbCfg);
if (tbCfg.type == META_CHILD_TABLE) {
memset(pSKey, 0, sizeof(*pSKey));
if (pTbCfg->type == META_CHILD_TABLE) {
pDbt = calloc(2, sizeof(DBT));
pSKey->data = pKey->data;
pSKey->size = pKey->size;
// First key is suid
pDbt[0].data = &(pTbCfg->ctbCfg.suid);
pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid);
// Second key is the first tag
void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, 0);
pDbt[1].data = varDataVal(pTagVal);
pDbt[1].size = varDataLen(pTagVal);
metaClearTbCfg(&tbCfg);
// Set index key
memset(pSKey, 0, sizeof(*pSKey));
pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
pSKey->data = pDbt;
pSKey->size = 2;
return 0;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册