提交 f81120e3 编写于 作者: H Hongze Cheng

TD-90

上级 4de11d83
...@@ -69,11 +69,13 @@ typedef struct { ...@@ -69,11 +69,13 @@ typedef struct {
} SMemTable; } SMemTable;
// ---------- TSDB TABLE DEFINITION // ---------- TSDB TABLE DEFINITION
#define TSDB_MAX_TABLE_SCHEMAS 16
typedef struct STable { typedef struct STable {
int8_t type; int8_t type;
STableId tableId; STableId tableId;
uint64_t superUid; // Super table UID uint64_t superUid; // Super table UID
STSchema * schema; int16_t numOfSchemas;
STSchema ** schema;
STSchema * tagSchema; STSchema * tagSchema;
SKVRow tagVal; SKVRow tagVal;
SMemTable * mem; SMemTable * mem;
......
...@@ -37,18 +37,24 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { ...@@ -37,18 +37,24 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
ptr = (char *)ptr + sizeof(int); ptr = (char *)ptr + sizeof(int);
memcpy(ptr, varDataVal(pTable->name), varDataLen(pTable->name)); memcpy(ptr, varDataVal(pTable->name), varDataLen(pTable->name));
ptr = (char *)ptr + varDataLen(pTable->name); ptr = (char *)ptr + varDataLen(pTable->name);
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid); T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid); T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
T_APPEND_MEMBER(ptr, pTable, STable, superUid); T_APPEND_MEMBER(ptr, pTable, STable, superUid);
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
ptr = tdEncodeSchema(ptr, pTable->schema); T_APPEND_MEMBER(ptr, pTable, STable, numOfSchemas);
for (int i = 0; i < pTable->numOfSchemas; i++) {
ptr = tdEncodeSchema(ptr, pTable->schema[i]);
}
ptr = tdEncodeSchema(ptr, pTable->tagSchema); ptr = tdEncodeSchema(ptr, pTable->tagSchema);
} else if (pTable->type == TSDB_CHILD_TABLE) { } else if (pTable->type == TSDB_CHILD_TABLE) {
ptr = tdEncodeKVRow(ptr, pTable->tagVal); ptr = tdEncodeKVRow(ptr, pTable->tagVal);
} else { } else {
ptr = tdEncodeSchema(ptr, pTable->schema); T_APPEND_MEMBER(ptr, pTable, STable, numOfSchemas);
for (int i = 0; i < pTable->numOfSchemas; i++) {
ptr = tdEncodeSchema(ptr, pTable->schema[i]);
}
} }
if (pTable->type == TSDB_STREAM_TABLE) { if (pTable->type == TSDB_STREAM_TABLE) {
...@@ -71,6 +77,11 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { ...@@ -71,6 +77,11 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
STable *tsdbDecodeTable(void *cont, int contLen) { STable *tsdbDecodeTable(void *cont, int contLen) {
STable *pTable = (STable *)calloc(1, sizeof(STable)); STable *pTable = (STable *)calloc(1, sizeof(STable));
if (pTable == NULL) return NULL; if (pTable == NULL) return NULL;
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
if (pTable->schema == NULL) {
free(pTable);
return NULL;
}
void *ptr = cont; void *ptr = cont;
T_READ_MEMBER(ptr, int8_t, pTable->type); T_READ_MEMBER(ptr, int8_t, pTable->type);
...@@ -78,28 +89,34 @@ STable *tsdbDecodeTable(void *cont, int contLen) { ...@@ -78,28 +89,34 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
ptr = (char *)ptr + sizeof(int); ptr = (char *)ptr + sizeof(int);
pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1); pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1);
if (pTable->name == NULL) return NULL; if (pTable->name == NULL) return NULL;
varDataSetLen(pTable->name, len); varDataSetLen(pTable->name, len);
memcpy(pTable->name->data, ptr, len); memcpy(pTable->name->data, ptr, len);
ptr = (char *)ptr + len; ptr = (char *)ptr + len;
T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid); T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid);
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid); T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
T_READ_MEMBER(ptr, uint64_t, pTable->superUid); T_READ_MEMBER(ptr, uint64_t, pTable->superUid);
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
pTable->schema = tdDecodeSchema(&ptr); T_READ_MEMBER(ptr, int16_t, pTable->numOfSchemas);
for (int i = 0; i < pTable->numOfSchemas; i++) {
pTable->schema[i] = tdDecodeSchema(&ptr);
}
pTable->tagSchema = tdDecodeSchema(&ptr); pTable->tagSchema = tdDecodeSchema(&ptr);
} else if (pTable->type == TSDB_CHILD_TABLE) { } else if (pTable->type == TSDB_CHILD_TABLE) {
ptr = tdDecodeKVRow(ptr, &pTable->tagVal); ptr = tdDecodeKVRow(ptr, &pTable->tagVal);
} else { } else {
pTable->schema = tdDecodeSchema(&ptr); T_READ_MEMBER(ptr, int16_t, pTable->numOfSchemas);
for (int i = 0; i < pTable->numOfSchemas; i++) {
pTable->schema[i] = tdDecodeSchema(&ptr);
}
} }
if (pTable->type == TSDB_STREAM_TABLE) { if (pTable->type == TSDB_STREAM_TABLE) {
ptr = taosDecodeString(ptr, &(pTable->sql)); ptr = taosDecodeString(ptr, &(pTable->sql));
} }
pTable->lastKey = TSKEY_INITIAL_VAL; pTable->lastKey = TSKEY_INITIAL_VAL;
return pTable; return pTable;
} }
...@@ -221,13 +238,14 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) { ...@@ -221,13 +238,14 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
return 0; return 0;
} }
// Get the newest table schema
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) { STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) { if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
return pTable->schema; return pTable->schema[pTable->numOfSchemas - 1];
} else if (pTable->type == TSDB_CHILD_TABLE) { } else if (pTable->type == TSDB_CHILD_TABLE) {
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid); STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
if (pSuper == NULL) return NULL; if (pSuper == NULL) return NULL;
return pSuper->schema; return pSuper->schema[pSuper->numOfSchemas-1];
} else { } else {
return NULL; return NULL;
} }
...@@ -299,13 +317,16 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { ...@@ -299,13 +317,16 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
} }
pTable->type = pCfg->type; pTable->type = pCfg->type;
pTable->numOfSchemas = 0;
if (isSuper) { if (isSuper) {
pTable->type = TSDB_SUPER_TABLE; pTable->type = TSDB_SUPER_TABLE;
pTable->tableId.uid = pCfg->superUid; pTable->tableId.uid = pCfg->superUid;
pTable->tableId.tid = -1; pTable->tableId.tid = -1;
pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID; pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID;
pTable->schema = tdDupSchema(pCfg->schema); pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
pTable->numOfSchemas = 1;
pTable->schema[0] = tdDupSchema(pCfg->schema);
pTable->tagSchema = tdDupSchema(pCfg->tagSchema); pTable->tagSchema = tdDupSchema(pCfg->tagSchema);
tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN); tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN);
...@@ -340,14 +361,18 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { ...@@ -340,14 +361,18 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
if (pCfg->type == TSDB_CHILD_TABLE) { if (pCfg->type == TSDB_CHILD_TABLE) {
pTable->superUid = pCfg->superUid; pTable->superUid = pCfg->superUid;
pTable->tagVal = tdKVRowDup(pCfg->tagValues); pTable->tagVal = tdKVRowDup(pCfg->tagValues);
} else if (pCfg->type == TSDB_NORMAL_TABLE) {
pTable->superUid = -1;
pTable->schema = tdDupSchema(pCfg->schema);
} else { } else {
ASSERT(pCfg->type == TSDB_STREAM_TABLE); pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
pTable->superUid = -1; pTable->numOfSchemas = 1;
pTable->schema = tdDupSchema(pCfg->schema); pTable->schema[0] = tdDupSchema(pCfg->schema);
pTable->sql = strdup(pCfg->sql);
if (pCfg->type == TSDB_NORMAL_TABLE) {
pTable->superUid = -1;
} else {
ASSERT(pCfg->type == TSDB_STREAM_TABLE);
pTable->superUid = -1;
pTable->sql = strdup(pCfg->sql);
}
} }
} }
...@@ -580,7 +605,7 @@ static int tsdbFreeTable(STable *pTable) { ...@@ -580,7 +605,7 @@ static int tsdbFreeTable(STable *pTable) {
if (pTable->type == TSDB_CHILD_TABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
kvRowFree(pTable->tagVal); kvRowFree(pTable->tagVal);
} else { } else {
tdFreeSchema(pTable->schema); for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
} }
if (pTable->type == TSDB_STREAM_TABLE) { if (pTable->type == TSDB_STREAM_TABLE) {
...@@ -642,9 +667,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -642,9 +667,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
} }
// Update the pMeta->maxCols and pMeta->maxRowBytes // Update the pMeta->maxCols and pMeta->maxRowBytes
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) { if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE) {
if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema); if (schemaNCols(pTable->schema[pTable->numOfSchemas - 1]) > pMeta->maxCols)
int bytes = dataRowMaxBytesFromSchema(pTable->schema); pMeta->maxCols = schemaNCols(pTable->schema[pTable->numOfSchemas - 1]);
int bytes = dataRowMaxBytesFromSchema(pTable->schema[pTable->numOfSchemas - 1]);
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes; if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册