提交 f660db70 编写于 作者: H Hongze Cheng

more

上级 25e569e1
......@@ -38,7 +38,7 @@ int metaCreate(const char *path);
void metaDestroy(const char *path);
SMeta *metaOpen(SMetaOpts *);
void metaClose(SMeta *);
int metaCreateTable(SMeta *, STableOpts *);
int metaCreateTable(SMeta *, const STableOpts *);
int metaDropTable(SMeta *, uint64_t tuid_t);
int metaAlterTable(SMeta *, void *);
int metaCommit(SMeta *);
......@@ -59,13 +59,6 @@ void metaQueryOptionsDestroy(SMetaQueryOpts *);
// STableOpts
void metaTableOptsInit(STableOpts *, int8_t type, const char *name, const STSchema *pSchema);
/* -------------------------------- Hided implementations -------------------------------- */
struct STableOpts {
int8_t type;
char * name;
STSchema *pSchema;
};
#ifdef __cplusplus
}
#endif
......
......@@ -330,7 +330,7 @@ static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) {
}
// ---- string
static FORCE_INLINE int taosEncodeString(void **buf, char *value) {
static FORCE_INLINE int taosEncodeString(void **buf, const char *value) {
int tlen = 0;
size_t size = strlen(value);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_DEF_H_
#define _TD_META_DEF_H_
#include "metaUid.h"
#include "tkv.h"
#ifdef __cplusplus
extern "C" {
#endif
struct SMeta {
STableUidGenerator uidGenerator;
STkvDb* tableDb; // uid->table obj
STkvDb* tbnameDb; // tbname --> uid
STkvDb* schemaDb; // uid+version --> schema
STkvDb* tagDb; // uid --> tag
STkvDb* tagIdx; // TODO: need to integrate lucene or our own
// STkvCache* metaCache; // TODO: add a global cache here
};
/* ------------------------ TEST CODE ------------------------ */
typedef enum { META_SUPER_TABLE = 0, META_CHILD_TABLE = 1, META_NORMAL_TABLE = 2 } EMetaTableT;
typedef struct SSuperTableOpts {
tb_uid_t uid;
STSchema* pSchema; // (ts timestamp, a int)
STSchema* pTagSchema; // (tag1 binary(10), tag2 int)
} SSuperTableOpts;
typedef struct SChildTableOpts {
tb_uid_t suid; // super table uid
SKVRow tags; // tag value of the child table
} SChildTableOpts;
typedef struct SNormalTableOpts {
STSchema* pSchema;
} SNormalTableOpts;
struct STableOpts {
EMetaTableT type;
char* name;
union {
SSuperTableOpts superOpts;
SChildTableOpts childOpts;
SNormalTableOpts normalOpts;
};
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_META_DEF_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "thash.h"
#include "tkv.h"
#include "tlist.h"
#include "tlockfree.h"
#include "ttypes.h"
#include "meta.h"
#include "metaUid.h"
/* -------------------- Structures -------------------- */
typedef struct STable {
tb_uid_t uid;
char * name;
tb_uid_t suid;
SArray * schema;
} STable;
typedef struct STableObj {
bool pin;
uint64_t ref;
SRWLatch latch;
uint64_t offset;
SList * ctbList; // child table list
STable * pTable;
} STableObj;
struct SMeta {
pthread_rwlock_t rwLock;
STableUidGenerator uidGenerator;
SHashObj * pTableObjHash; // uid --> STableObj
SList * stbList; // super table list
STkvDb * tbnameDb; // tbname --> uid
STkvDb * tagDb; // uid --> tag
STkvDb * schemaDb;
STkvDb * tagIdx;
size_t totalUsed;
};
static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver);
static STableObj *metaTableObjNew();
/* -------------------- Methods -------------------- */
SMeta *metaOpen(SMetaOpts *options) {
SMeta *pMeta = NULL;
char * err = NULL;
pMeta = (SMeta *)calloc(1, sizeof(*pMeta));
if (pMeta == NULL) {
return NULL;
}
pthread_rwlock_init(&(pMeta->rwLock), NULL);
tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID);
pMeta->pTableObjHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
pMeta->stbList = tdListNew(sizeof(STableObj *));
// Options
STkvOpts *dbOptions = tkvOptsCreate();
tkvOptsSetCreateIfMissing(dbOptions, 1);
taosMkDir("meta");
// Open tbname DB
pMeta->tbnameDb = tkvOpen(dbOptions, "meta/tbname_uid_db");
// Open tag DB
pMeta->tagDb = tkvOpen(dbOptions, "meta/uid_tag_db");
// Open schema DB
pMeta->schemaDb = tkvOpen(dbOptions, "meta/schema_db");
// Open tag index
pMeta->tagIdx = tkvOpen(dbOptions, "meta/tag_idx_db");
tkvOptsDestroy(dbOptions);
return pMeta;
}
void metaClose(SMeta *pMeta) {
if (pMeta) {
tkvClose(pMeta->tagIdx);
tkvClose(pMeta->schemaDb);
tkvClose(pMeta->tagDb);
tkvClose(pMeta->tbnameDb);
tdListFree(pMeta->stbList);
taosHashCleanup(pMeta->pTableObjHash);
pthread_rwlock_destroy(&(pMeta->rwLock));
}
}
int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) {
size_t vallen;
STkvReadOpts * ropt;
STableObj * pTableObj = NULL;
STkvWriteOpts *wopt;
// Check if table already exists
ropt = tkvReadOptsCreate();
char *uidStr = tkvGet(pMeta->tbnameDb, ropt, pTableOpts->name, strlen(pTableOpts->name), &vallen);
if (uidStr != NULL) {
// Has duplicate named table
return -1;
}
tkvReadOptsDestroy(ropt);
// Create table obj
pTableObj = metaTableObjNew();
if (pTableObj == NULL) {
// TODO
return -1;
}
// Create table object
pTableObj->pTable =
metaTableNew(generateUid(&(pMeta->uidGenerator)), pTableOpts->name, schemaVersion(pTableOpts->pSchema));
if (pTableObj->pTable == NULL) {
// TODO
}
pthread_rwlock_rdlock(&pMeta->rwLock);
taosHashPut(pMeta->pTableObjHash, &(pTableObj->pTable->uid), sizeof(tb_uid_t), &pTableObj, sizeof(pTableObj));
wopt = tkvWriteOptsCreate();
// rocksdb_writeoptions_disable_WAL(wopt, 1);
// Add to tbname db
tkvPut(pMeta->tbnameDb, wopt, pTableOpts->name, strlen(pTableOpts->name), (char *)&pTableObj->pTable->uid,
sizeof(tb_uid_t));
// Add to schema db
char id[12];
char buf[256];
void *pBuf = buf;
*(tb_uid_t *)id = pTableObj->pTable->uid;
*(int32_t *)(id + sizeof(tb_uid_t)) = schemaVersion(pTableOpts->pSchema);
int size = tdEncodeSchema(&pBuf, pTableOpts->pSchema);
tkvPut(pMeta->schemaDb, wopt, id, 12, buf, size);
tkvWriteOptsDestroy(wopt);
pthread_rwlock_unlock(&pMeta->rwLock);
return 0;
}
void metaDestroy(const char *path) { taosRemoveDir(path); }
int metaCommit(SMeta *meta) { return 0; }
void metaTableOptsInit(STableOpts *pTableOpts, int8_t type, const char *name, const STSchema *pSchema) {
pTableOpts->type = type;
pTableOpts->name = strdup(name);
pTableOpts->pSchema = tdDupSchema(pSchema);
}
/* -------------------- Static Methods -------------------- */
static STable *metaTableNew(tb_uid_t uid, const char *name, int32_t sver) {
STable *pTable = NULL;
pTable = (STable *)malloc(sizeof(*pTable));
if (pTable == NULL) {
// TODO
return NULL;
}
pTable->schema = taosArrayInit(0, sizeof(int32_t));
if (pTable->schema == NULL) {
// TODO
return NULL;
}
pTable->uid = uid;
pTable->name = strdup(name);
pTable->suid = IVLD_TB_UID;
taosArrayPush(pTable->schema, &sver);
return pTable;
}
static STableObj *metaTableObjNew() {
STableObj *pTableObj = NULL;
pTableObj = (STableObj *)malloc(sizeof(*pTableObj));
if (pTableObj == NULL) {
return NULL;
}
pTableObj->pin = true;
pTableObj->ref = 1;
taosInitRWLatch(&(pTableObj->latch));
pTableObj->offset = UINT64_MAX;
pTableObj->ctbList = NULL;
pTableObj->pTable = NULL;
return pTableObj;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "metaDef.h"
#include "tcoding.h"
static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts);
static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts);
static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts);
SMeta *metaOpen(SMetaOpts *pMetaOpts) {
SMeta *pMeta = NULL;
pMeta = (SMeta *)calloc(1, sizeof(*pMeta));
if (pMeta == NULL) {
return NULL;
}
// TODO: check if file exists and handle the error
taosMkDir("meta");
// Open tableDb
STkvOpts *tableDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tableDbOpts, 1);
pMeta->tableDb = tkvOpen(tableDbOpts, "meta/table_db");
tkvOptsDestroy(tableDbOpts);
// Open tbnameDb
STkvOpts *tbnameDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tbnameDbOpts, 1);
pMeta->tbnameDb = tkvOpen(tbnameDbOpts, "meta/tbname_db");
tkvOptsDestroy(tbnameDbOpts);
// Open schemaDb
STkvOpts *schemaDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(schemaDbOpts, 1);
pMeta->schemaDb = tkvOpen(schemaDbOpts, "meta/schema_db");
tkvOptsDestroy(schemaDbOpts);
// Open tagDb
STkvOpts *tagDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tagDbOpts, 1);
pMeta->tagDb = tkvOpen(tagDbOpts, "meta/tag_db");
tkvOptsDestroy(tagDbOpts);
// Open tagIdx
STkvOpts *tagIdxDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tagIdxDbOpts, 1);
pMeta->tagIdx = tkvOpen(tagIdxDbOpts, "meta/tag_idx_db");
tkvOptsDestroy(tagIdxDbOpts);
// TODO: need to figure out how to persist the START UID
tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID);
}
void metaClose(SMeta *pMeta) {
if (pMeta) {
tableUidGeneratorClear(&pMeta->uidGenerator);
tkvClose(pMeta->tagIdx);
tkvClose(pMeta->tagDb);
tkvClose(pMeta->schemaDb);
tkvClose(pMeta->tbnameDb);
tkvClose(pMeta->tableDb);
free(pMeta);
}
}
int metaCreateTable(SMeta *pMeta, const STableOpts *pTableOpts) {
size_t vallen;
char * pUid;
// Check if table already exists
pUid = tkvGet(pMeta->tbnameDb, NULL, pTableOpts->name, strlen(pTableOpts->name), &vallen);
if (pUid) {
free(pUid);
// Table already exists, return error code
return -1;
}
switch (pTableOpts->type) {
case META_SUPER_TABLE:
return metaCreateSuperTable(pMeta, pTableOpts->name, &(pTableOpts->superOpts));
case META_CHILD_TABLE:
return metaCreateChildTable(pMeta, pTableOpts->name, &(pTableOpts->childOpts));
case META_NORMAL_TABLE:
return metaCreateNormalTable(pMeta, pTableOpts->name, &(pTableOpts->normalOpts));
default:
ASSERT(0);
}
return 0;
}
/* ------------------------ STATIC METHODS ------------------------ */
static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts) {
size_t vallen;
size_t keylen;
char * pVal = NULL;
char schemaKey[sizeof(tb_uid_t) * 2];
char buffer[1024]; /* TODO */
void * pBuf = NULL;
pVal = tkvGet(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), &vallen);
if (pVal) {
free(pVal);
// TODO: table with same uid exists, just return error
return -1;
}
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname); // ENCODE TABLE NAME
vallen += taosEncodeFixedI32(&pBuf, 1); // ENCODE SCHEMA, SCHEMA HERE IS AN ARRAY OF VERSIONS
vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pSuperTableOpts->pSchema));
vallen += tdEncodeSchema(&pBuf, pSuperTableOpts->pTagSchema); // ENCODE TAG SCHEMA
tkvPut(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(pSuperTableOpts->uid)),
sizeof(pSuperTableOpts->uid));
// Put uid+sversion -> schema
*(tb_uid_t *)schemaKey = pSuperTableOpts->uid;
*(int32_t *)(POINTER_SHIFT(schemaKey, sizeof(tb_uid_t))) = schemaVersion(pSuperTableOpts->pSchema);
keylen = sizeof(tb_uid_t) + sizeof(int32_t);
pBuf = (void *)buffer;
vallen = tdEncodeSchema(&pBuf, pSuperTableOpts->pSchema);
tkvPut(pMeta->schemaDb, NULL, schemaKey, keylen, buffer, vallen);
return 0;
}
static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts) {
size_t vallen;
char buffer[1024]; /* TODO */
void * pBuf = NULL;
char * pTable;
tb_uid_t uid;
// Check if super table exists
pTable = tkvGet(pMeta->tableDb, NULL, (char *)(&(pChildTableOpts->suid)), sizeof(pChildTableOpts->suid), &vallen);
if (pTable == NULL) {
// Super table not exists, just return error
return -1;
}
// Generate a uid to the new table
uid = generateUid(&pMeta->uidGenerator);
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname);
vallen += taosEncodeFixedU64(pBuf, pChildTableOpts->suid);
tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&uid), sizeof(uid));
// Put uid-> tags
tkvPut(pMeta->tagDb, NULL, (char *)(&uid), sizeof(uid), (char *)pChildTableOpts->tags,
(size_t)kvRowLen(pChildTableOpts->tags));
// TODO: Put tagIdx
return 0;
}
static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts) {
size_t vallen;
char keyBuf[sizeof(tb_uid_t) + sizeof(int32_t)];
char buffer[1024]; /* TODO */
void * pBuf = NULL;
tb_uid_t uid;
// Generate a uid to the new table
uid = generateUid(&pMeta->uidGenerator);
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname);
vallen += taosEncodeFixedI32(&pBuf, 1);
vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pNormalTableOpts->pSchema));
tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(uid)), sizeof(uid));
// Put uid+sversion -> schema
vallen = 0;
pBuf = (void *)buffer;
vallen += tdEncodeSchema(&pBuf, pNormalTableOpts->pSchema);
tkvPut(pMeta->schemaDb, NULL, keyBuf, sizeof(tb_uid_t) + sizeof(int32_t), buffer, vallen);
return 0;
}
#if 0
/* -------------------- Structures -------------------- */
static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver);
void metaDestroy(const char *path) { taosRemoveDir(path); }
int metaCommit(SMeta *meta) { return 0; }
void metaTableOptsInit(STableOpts *pTableOpts, int8_t type, const char *name, const STSchema *pSchema) {
pTableOpts->type = type;
pTableOpts->name = strdup(name);
pTableOpts->pSchema = tdDupSchema(pSchema);
}
#endif
\ No newline at end of file
add_executable(metaTest "")
target_sources(metaTest
PRIVATE
"../src/meta.c"
"../src/metaMain.c"
"../src/metaUid.c"
"metaTests.cpp"
)
......
......@@ -9,29 +9,29 @@ TEST(MetaTest, meta_open_test) {
SMeta *meta = metaOpen(NULL);
std::cout << "Meta is opened!" << std::endl;
// Create tables
STableOpts tbOpts;
char tbname[128];
STSchema * pSchema;
STSchemaBuilder sb;
tdInitTSchemaBuilder(&sb, 0);
for (size_t i = 0; i < 10; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
for (size_t i = 0; i < 1000000; i++) {
sprintf(tbname, "tb%ld", i);
metaTableOptsInit(&tbOpts, 0, tbname, pSchema);
// // Create tables
// STableOpts tbOpts;
// char tbname[128];
// STSchema * pSchema;
// STSchemaBuilder sb;
// tdInitTSchemaBuilder(&sb, 0);
// for (size_t i = 0; i < 10; i++) {
// tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8);
// }
// pSchema = tdGetSchemaFromBuilder(&sb);
// tdDestroyTSchemaBuilder(&sb);
// for (size_t i = 0; i < 1000000; i++) {
// sprintf(tbname, "tb%ld", i);
// metaTableOptsInit(&tbOpts, 0, tbname, pSchema);
metaCreateTable(meta, &tbOpts);
}
// metaCreateTable(meta, &tbOpts);
// }
// Close Meta
metaClose(meta);
std::cout << "Meta is closed!" << std::endl;
// Destroy Meta
metaDestroy("meta");
std::cout << "Meta is destroyed!" << std::endl;
// // Destroy Meta
// metaDestroy("meta");
// std::cout << "Meta is destroyed!" << std::endl;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册