提交 83c75d87 编写于 作者: H Hongze Cheng

TD-354

上级 13760c7e
......@@ -92,6 +92,7 @@ typedef struct {
STSchema * schema;
STSchema * tagSchema;
SDataRow tagValues;
char * sql;
} STableCfg;
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
......@@ -101,6 +102,7 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
int tsdbTableSetName(STableCfg *config, char *name, bool dup);
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
void tsdbClearTableCfg(STableCfg *config);
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
......
......@@ -85,12 +85,13 @@ typedef struct STable {
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
struct STable *next; // TODO: remove the next
struct STable *prev;
tstr * name; // NOTE: there a flexible string here
tstr * name; // NOTE: there a flexible string here
char * sql;
} STable;
#define TSDB_GET_TABLE_LAST_KEY(tb) ((tb)->lastKey)
void * tsdbEncodeTable(STable *pTable, int *contLen);
void tsdbEncodeTable(STable *pTable, char *buf, int *contLen);
STable *tsdbDecodeTable(void *cont, int contLen);
void tsdbFreeEncode(void *cont);
......
......@@ -446,7 +446,7 @@ int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * p
*/
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid) {
if (config == NULL) return -1;
if (type != TSDB_NORMAL_TABLE && type != TSDB_CHILD_TABLE) return -1;
if (type != TSDB_CHILD_TABLE && type != TSDB_NORMAL_TABLE && type != TSDB_STREAM_TABLE) return -1;
memset((void *)config, 0, sizeof(STableCfg));
......@@ -455,6 +455,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t t
config->tableId.uid = uid;
config->tableId.tid = tid;
config->name = NULL;
config->sql = NULL;
return 0;
}
......@@ -540,12 +541,26 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup) {
return 0;
}
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
if (config->type != TSDB_STREAM_TABLE) return -1;
if (dup) {
config->sql = strdup(sql);
if (config->sql == NULL) return -1;
} else {
config->sql = sql;
}
return 0;
}
void tsdbClearTableCfg(STableCfg *config) {
if (config->schema) tdFreeSchema(config->schema);
if (config->tagSchema) tdFreeSchema(config->tagSchema);
if (config->tagValues) tdFreeDataRow(config->tagValues);
tfree(config->name);
tfree(config->sname);
tfree(config->sql);
}
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
......
......@@ -15,7 +15,6 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbEstimateTableEncodeSize(STable *pTable);
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);
/**
......@@ -28,16 +27,10 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rm
* @return binary content for success
* NULL fro failure
*/
void *tsdbEncodeTable(STable *pTable, int *contLen) {
if (pTable == NULL) return NULL;
*contLen = tsdbEstimateTableEncodeSize(pTable);
if (*contLen < 0) return NULL;
void *ret = calloc(1, *contLen);
if (ret == NULL) return NULL;
void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
if (pTable == NULL) return;
void *ptr = ret;
void *ptr = buf;
T_APPEND_MEMBER(ptr, pTable, STable, type);
// Encode name, todo refactor
*(int *)ptr = varDataLen(pTable->name);
......@@ -59,7 +52,11 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) {
ptr = tdEncodeSchema(ptr, pTable->schema);
}
return ret;
if (pTable->type == TSDB_STREAM_TABLE) {
ptr = taosEncodeString(ptr, pTable->sql);
}
*contLen = (char *)ptr - buf;
}
/**
......@@ -97,10 +94,15 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
pTable->tagSchema = tdDecodeSchema(&ptr);
} else if (pTable->type == TSDB_CHILD_TABLE) {
pTable->tagVal = tdDataRowDup(ptr);
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal));
} else {
pTable->schema = tdDecodeSchema(&ptr);
}
if (pTable->type == TSDB_STREAM_TABLE) {
ptr = taosDecodeString(ptr, &(pTable->sql));
}
return pTable;
}
......@@ -211,7 +213,7 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
}
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE) {
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
return pTable->schema;
} else if (pTable->type == TSDB_CHILD_TABLE) {
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
......@@ -283,6 +285,67 @@ char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) {
}
}
static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
STable *pTable = NULL;
size_t tsize = 0;
pTable = (STable *)calloc(1, sizeof(STable));
if (pTable == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _err;
}
pTable->type = pCfg->type;
tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN);
pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1);
if (pTable->name == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _err;
}
STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->sname, tsize);
if (isSuper) {
pTable->type = TSDB_SUPER_TABLE;
pTable->tableId.uid = pCfg->superUid;
pTable->tableId.tid = -1;
pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID;
pTable->schema = tdDupSchema(pCfg->schema);
pTable->tagSchema = tdDupSchema(pCfg->tagSchema);
STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 1,
getTagIndexKey); // Allow duplicate key, no lock
if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _err;
}
} else {
pTable->type = pCfg->type;
pTable->tableId.uid = pCfg->tableId.uid;
pTable->tableId.tid = pCfg->tableId.tid;
pTable->lastKey = TSKEY_INITIAL_VAL;
if (pCfg->type == TSDB_CHILD_TABLE) {
pTable->superUid = pCfg->superUid;
pTable->tagVal = tdDataRowDup(pCfg->tagValues);
} else if (pCfg->type == TSDB_NORMAL_TABLE) {
pTable->superUid = -1;
pTable->schema = tdDupSchema(pCfg->schema);
} else {
ASSERT(pCfg->type == TSDB_STREAM_TABLE);
pTable->superUid = -1;
pTable->schema = tdDupSchema(pCfg->schema);
pTable->sql = strdup(pCfg->sql);
}
}
return pTable;
_err:
tsdbFreeTable(pTable);
return NULL;
}
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -303,61 +366,19 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
super = tsdbGetTableByUid(pMeta, pCfg->superUid);
if (super == NULL) { // super table not exists, try to create it
newSuper = 1;
// TODO: use function to implement create table object
super = (STable *)calloc(1, sizeof(STable));
super = tsdbNewTable(pCfg, true);
if (super == NULL) return -1;
super->type = TSDB_SUPER_TABLE;
super->tableId.uid = pCfg->superUid;
super->tableId.tid = -1;
super->superUid = TSDB_INVALID_SUPER_TABLE_ID;
super->schema = tdDupSchema(pCfg->schema);
super->tagSchema = tdDupSchema(pCfg->tagSchema);
super->tagVal = NULL;
// todo refactor extract method
size_t size = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN);
super->name = calloc(1, size + VARSTR_HEADER_SIZE + 1);
STR_WITH_SIZE_TO_VARSTR(super->name, pCfg->sname, size);
// index the first tag column
STColumn* pColSchema = schemaColAt(super->tagSchema, 0);
super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes,
1, 0, 1, getTagIndexKey); // Allow duplicate key, no lock
if (super->pIndex == NULL) {
tdFreeSchema(super->schema);
tdFreeSchema(super->tagSchema);
tdFreeDataRow(super->tagVal);
free(super);
return -1;
}
} else {
if (super->type != TSDB_SUPER_TABLE) return -1;
}
}
STable *table = (STable *)calloc(1, sizeof(STable));
STable *table = tsdbNewTable(pCfg, false);
if (table == NULL) {
if (newSuper) tsdbFreeTable(super);
return -1;
}
table->tableId = pCfg->tableId;
size_t size = strnlen(pCfg->name, TSDB_TABLE_NAME_LEN);
table->name = calloc(1, size + VARSTR_HEADER_SIZE + 1);
STR_WITH_SIZE_TO_VARSTR(table->name, pCfg->name, size);
table->lastKey = 0;
if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE
table->type = TSDB_CHILD_TABLE;
table->superUid = pCfg->superUid;
table->tagVal = tdDataRowDup(pCfg->tagValues);
} else { // TSDB_NORMAL_TABLE
table->type = TSDB_NORMAL_TABLE;
table->superUid = -1;
table->schema = tdDupSchema(pCfg->schema);
if (newSuper) {
tsdbFreeTable(super);
return -1;
}
}
// Register to meta
......@@ -372,15 +393,15 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
// Write to meta file
int bufLen = 0;
char *buf = malloc(4096);
if (newSuper) {
void *buf = tsdbEncodeTable(super, &bufLen);
tsdbEncodeTable(super, buf, &bufLen);
tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen);
tsdbFreeEncode(buf);
}
void *buf = tsdbEncodeTable(table, &bufLen);
tsdbEncodeTable(table, buf, &bufLen);
tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen);
tsdbFreeEncode(buf);
tfree(buf);
return 0;
}
......@@ -438,13 +459,18 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) {
}
static int tsdbFreeTable(STable *pTable) {
// TODO: finish this function
if (pTable == NULL) return 0;
if (pTable->type == TSDB_CHILD_TABLE) {
tdFreeDataRow(pTable->tagVal);
} else {
tdFreeSchema(pTable->schema);
}
if (pTable->type == TSDB_STREAM_TABLE) {
tfree(pTable->sql);
}
// Free content
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
tdFreeSchema(pTable->tagSchema);
......@@ -491,6 +517,9 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
if (pTable->type == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
tsdbAddTableIntoIndex(pMeta, pTable);
}
if (pTable->type == TSDB_STREAM_TABLE && addIdx) {
// TODO
}
pMeta->nTables++;
}
......@@ -522,7 +551,6 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFrom
tSkipListDestroyIter(pIter);
// TODO: Remove the table from the list
if (pTable->prev != NULL) {
pTable->prev->next = pTable->next;
if (pTable->next != NULL) {
......@@ -536,6 +564,9 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFrom
if (pTable->type == TSDB_CHILD_TABLE && rmFromIdx) {
tsdbRemoveTableFromIndex(pMeta, pTable);
}
if (pTable->type == TSDB_STREAM_TABLE && rmFromIdx) {
// TODO
}
pMeta->nTables--;
}
......@@ -598,26 +629,6 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
return 0;
}
static int tsdbEstimateTableEncodeSize(STable *pTable) {
int size = 0;
size += T_MEMBER_SIZE(STable, type);
size += sizeof(int) + varDataLen(pTable->name);
size += T_MEMBER_SIZE(STable, tableId);
size += T_MEMBER_SIZE(STable, superUid);
size += T_MEMBER_SIZE(STable, sversion);
if (pTable->type == TSDB_SUPER_TABLE) {
size += tdGetSchemaEncodeSize(pTable->schema);
size += tdGetSchemaEncodeSize(pTable->tagSchema);
} else if (pTable->type == TSDB_CHILD_TABLE) {
size += dataRowLen(pTable->tagVal);
} else {
size += tdGetSchemaEncodeSize(pTable->schema);
}
return size;
}
char *getTSTupleKey(const void * data) {
SDataRow row = (SDataRow)data;
return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE);
......
......@@ -217,6 +217,28 @@ static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) {
return NULL; // error happened
}
static FORCE_INLINE void *taosEncodeString(void *buf, char *value) {
size_t size = strlen(value);
buf = taosEncodeVariant64(buf, size);
memcpy(buf, value, size);
return POINTER_SHIFT(buf, size);
}
static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
uint64_t size = 0;
buf = taosDecodeVariant64(buf, &size);
*value = (char *)malloc(size + 1);
if (*value == NULL) return NULL;
memcpy(*value, buf, size);
(*value)[size] = '\0';
return POINTER_SHIFT(buf, size);
}
#ifdef __cplusplus
}
#endif
......
......@@ -106,6 +106,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
char sql[1024] = "\0";
vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId);
int16_t numOfColumns = htons(pTable->numOfColumns);
......@@ -149,6 +150,11 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
if (pTable->tableType == TSDB_STREAM_TABLE) {
// TODO: set sql value
tsdbTableSetStreamSql(&tCfg, sql, false);
}
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
tdFreeDataRow(dataRow);
tfree(pDestTagSchema);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册