diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index bee68b81f567bf0259edea33e944a4aa074ccdaa..341dee14769271103b8e508dab22f287bb329a99 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -92,6 +92,7 @@ typedef struct { STSchema * schema; STSchema * tagSchema; SDataRow tagValues; + char * sql; } STableCfg; int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid); @@ -101,6 +102,7 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup); int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup); int tsdbTableSetName(STableCfg *config, char *name, bool dup); int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); +int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); void tsdbClearTableCfg(STableCfg *config); int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 63d3eb349b75717895441c0e2661b86db3db2c67..0839e0f8ff786d6d72cfdf301ec58cd20401b685 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -85,12 +85,13 @@ typedef struct STable { TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure struct STable *next; // TODO: remove the next struct STable *prev; - tstr * name; // NOTE: there a flexible string here + tstr * name; // NOTE: there a flexible string here + char * sql; } STable; #define TSDB_GET_TABLE_LAST_KEY(tb) ((tb)->lastKey) -void * tsdbEncodeTable(STable *pTable, int *contLen); +void tsdbEncodeTable(STable *pTable, char *buf, int *contLen); STable *tsdbDecodeTable(void *cont, int contLen); void tsdbFreeEncode(void *cont); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 27473e805c1ddc0c06d8c77c3bfebe4efc0faafe..bddb3fcaff997f8589b28bc200c7a33ecf2e8a16 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -446,7 +446,7 @@ int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * p */ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid) { if (config == NULL) return -1; - if (type != TSDB_NORMAL_TABLE && type != TSDB_CHILD_TABLE) return -1; + if (type != TSDB_CHILD_TABLE && type != TSDB_NORMAL_TABLE && type != TSDB_STREAM_TABLE) return -1; memset((void *)config, 0, sizeof(STableCfg)); @@ -455,6 +455,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t t config->tableId.uid = uid; config->tableId.tid = tid; config->name = NULL; + config->sql = NULL; return 0; } @@ -540,12 +541,26 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup) { return 0; } +int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) { + if (config->type != TSDB_STREAM_TABLE) return -1; + + if (dup) { + config->sql = strdup(sql); + if (config->sql == NULL) return -1; + } else { + config->sql = sql; + } + + return 0; +} + void tsdbClearTableCfg(STableCfg *config) { if (config->schema) tdFreeSchema(config->schema); if (config->tagSchema) tdFreeSchema(config->tagSchema); if (config->tagValues) tdFreeDataRow(config->tagValues); tfree(config->name); tfree(config->sname); + tfree(config->sql); } int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 95680f95c4565a500a00b14e8dfec9de91d902fe..427e15de378c9a4d7b026ffb50d138c0f6b55bdc 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -15,7 +15,6 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg); static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); -static int tsdbEstimateTableEncodeSize(STable *pTable); static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx); /** @@ -28,16 +27,10 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rm * @return binary content for success * NULL fro failure */ -void *tsdbEncodeTable(STable *pTable, int *contLen) { - if (pTable == NULL) return NULL; - - *contLen = tsdbEstimateTableEncodeSize(pTable); - if (*contLen < 0) return NULL; - - void *ret = calloc(1, *contLen); - if (ret == NULL) return NULL; +void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { + if (pTable == NULL) return; - void *ptr = ret; + void *ptr = buf; T_APPEND_MEMBER(ptr, pTable, STable, type); // Encode name, todo refactor *(int *)ptr = varDataLen(pTable->name); @@ -59,7 +52,11 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) { ptr = tdEncodeSchema(ptr, pTable->schema); } - return ret; + if (pTable->type == TSDB_STREAM_TABLE) { + ptr = taosEncodeString(ptr, pTable->sql); + } + + *contLen = (char *)ptr - buf; } /** @@ -97,10 +94,15 @@ STable *tsdbDecodeTable(void *cont, int contLen) { pTable->tagSchema = tdDecodeSchema(&ptr); } else if (pTable->type == TSDB_CHILD_TABLE) { pTable->tagVal = tdDataRowDup(ptr); + ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal)); } else { pTable->schema = tdDecodeSchema(&ptr); } + if (pTable->type == TSDB_STREAM_TABLE) { + ptr = taosDecodeString(ptr, &(pTable->sql)); + } + return pTable; } @@ -211,7 +213,7 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) { } STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) { - if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE) { + if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) { return pTable->schema; } else if (pTable->type == TSDB_CHILD_TABLE) { STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid); @@ -283,6 +285,67 @@ char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) { } } +static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { + STable *pTable = NULL; + size_t tsize = 0; + + pTable = (STable *)calloc(1, sizeof(STable)); + if (pTable == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto _err; + } + + pTable->type = pCfg->type; + tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN); + pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1); + if (pTable->name == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto _err; + } + STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->sname, tsize); + + if (isSuper) { + pTable->type = TSDB_SUPER_TABLE; + pTable->tableId.uid = pCfg->superUid; + pTable->tableId.tid = -1; + pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID; + pTable->schema = tdDupSchema(pCfg->schema); + pTable->tagSchema = tdDupSchema(pCfg->tagSchema); + + STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0); + pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 1, + getTagIndexKey); // Allow duplicate key, no lock + if (pTable->pIndex == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto _err; + } + } else { + pTable->type = pCfg->type; + pTable->tableId.uid = pCfg->tableId.uid; + pTable->tableId.tid = pCfg->tableId.tid; + pTable->lastKey = TSKEY_INITIAL_VAL; + + if (pCfg->type == TSDB_CHILD_TABLE) { + pTable->superUid = pCfg->superUid; + pTable->tagVal = tdDataRowDup(pCfg->tagValues); + } else if (pCfg->type == TSDB_NORMAL_TABLE) { + pTable->superUid = -1; + pTable->schema = tdDupSchema(pCfg->schema); + } else { + ASSERT(pCfg->type == TSDB_STREAM_TABLE); + pTable->superUid = -1; + pTable->schema = tdDupSchema(pCfg->schema); + pTable->sql = strdup(pCfg->sql); + } + } + + return pTable; + +_err: + tsdbFreeTable(pTable); + return NULL; +} + int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -303,61 +366,19 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) { super = tsdbGetTableByUid(pMeta, pCfg->superUid); if (super == NULL) { // super table not exists, try to create it newSuper = 1; - // TODO: use function to implement create table object - super = (STable *)calloc(1, sizeof(STable)); + super = tsdbNewTable(pCfg, true); if (super == NULL) return -1; - - super->type = TSDB_SUPER_TABLE; - super->tableId.uid = pCfg->superUid; - super->tableId.tid = -1; - super->superUid = TSDB_INVALID_SUPER_TABLE_ID; - super->schema = tdDupSchema(pCfg->schema); - super->tagSchema = tdDupSchema(pCfg->tagSchema); - super->tagVal = NULL; - - // todo refactor extract method - size_t size = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN); - super->name = calloc(1, size + VARSTR_HEADER_SIZE + 1); - STR_WITH_SIZE_TO_VARSTR(super->name, pCfg->sname, size); - - // index the first tag column - STColumn* pColSchema = schemaColAt(super->tagSchema, 0); - super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, - 1, 0, 1, getTagIndexKey); // Allow duplicate key, no lock - - if (super->pIndex == NULL) { - tdFreeSchema(super->schema); - tdFreeSchema(super->tagSchema); - tdFreeDataRow(super->tagVal); - free(super); - return -1; - } } else { if (super->type != TSDB_SUPER_TABLE) return -1; } } - STable *table = (STable *)calloc(1, sizeof(STable)); + STable *table = tsdbNewTable(pCfg, false); if (table == NULL) { - if (newSuper) tsdbFreeTable(super); - return -1; - } - - table->tableId = pCfg->tableId; - - size_t size = strnlen(pCfg->name, TSDB_TABLE_NAME_LEN); - table->name = calloc(1, size + VARSTR_HEADER_SIZE + 1); - STR_WITH_SIZE_TO_VARSTR(table->name, pCfg->name, size); - - table->lastKey = 0; - if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE - table->type = TSDB_CHILD_TABLE; - table->superUid = pCfg->superUid; - table->tagVal = tdDataRowDup(pCfg->tagValues); - } else { // TSDB_NORMAL_TABLE - table->type = TSDB_NORMAL_TABLE; - table->superUid = -1; - table->schema = tdDupSchema(pCfg->schema); + if (newSuper) { + tsdbFreeTable(super); + return -1; + } } // Register to meta @@ -372,15 +393,15 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) { // Write to meta file int bufLen = 0; + char *buf = malloc(4096); if (newSuper) { - void *buf = tsdbEncodeTable(super, &bufLen); + tsdbEncodeTable(super, buf, &bufLen); tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen); - tsdbFreeEncode(buf); } - void *buf = tsdbEncodeTable(table, &bufLen); + tsdbEncodeTable(table, buf, &bufLen); tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen); - tsdbFreeEncode(buf); + tfree(buf); return 0; } @@ -438,13 +459,18 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { } static int tsdbFreeTable(STable *pTable) { - // TODO: finish this function + if (pTable == NULL) return 0; + if (pTable->type == TSDB_CHILD_TABLE) { tdFreeDataRow(pTable->tagVal); } else { tdFreeSchema(pTable->schema); } + if (pTable->type == TSDB_STREAM_TABLE) { + tfree(pTable->sql); + } + // Free content if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) { tdFreeSchema(pTable->tagSchema); @@ -491,6 +517,9 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { if (pTable->type == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index tsdbAddTableIntoIndex(pMeta, pTable); } + if (pTable->type == TSDB_STREAM_TABLE && addIdx) { + // TODO + } pMeta->nTables++; } @@ -522,7 +551,6 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFrom tSkipListDestroyIter(pIter); - // TODO: Remove the table from the list if (pTable->prev != NULL) { pTable->prev->next = pTable->next; if (pTable->next != NULL) { @@ -536,6 +564,9 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFrom if (pTable->type == TSDB_CHILD_TABLE && rmFromIdx) { tsdbRemoveTableFromIndex(pMeta, pTable); } + if (pTable->type == TSDB_STREAM_TABLE && rmFromIdx) { + // TODO + } pMeta->nTables--; } @@ -598,26 +629,6 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { return 0; } -static int tsdbEstimateTableEncodeSize(STable *pTable) { - int size = 0; - size += T_MEMBER_SIZE(STable, type); - size += sizeof(int) + varDataLen(pTable->name); - size += T_MEMBER_SIZE(STable, tableId); - size += T_MEMBER_SIZE(STable, superUid); - size += T_MEMBER_SIZE(STable, sversion); - - if (pTable->type == TSDB_SUPER_TABLE) { - size += tdGetSchemaEncodeSize(pTable->schema); - size += tdGetSchemaEncodeSize(pTable->tagSchema); - } else if (pTable->type == TSDB_CHILD_TABLE) { - size += dataRowLen(pTable->tagVal); - } else { - size += tdGetSchemaEncodeSize(pTable->schema); - } - - return size; -} - char *getTSTupleKey(const void * data) { SDataRow row = (SDataRow)data; return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE); diff --git a/src/util/inc/tcoding.h b/src/util/inc/tcoding.h index b4f7f596c56ff6bde8242881d1f8e06a7264986a..cc9caf71d04bdceba3dab1916470d8803863992f 100644 --- a/src/util/inc/tcoding.h +++ b/src/util/inc/tcoding.h @@ -217,6 +217,28 @@ static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) { return NULL; // error happened } +static FORCE_INLINE void *taosEncodeString(void *buf, char *value) { + size_t size = strlen(value); + + buf = taosEncodeVariant64(buf, size); + memcpy(buf, value, size); + + return POINTER_SHIFT(buf, size); +} + +static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { + uint64_t size = 0; + + buf = taosDecodeVariant64(buf, &size); + *value = (char *)malloc(size + 1); + if (*value == NULL) return NULL; + memcpy(*value, buf, size); + + (*value)[size] = '\0'; + + return POINTER_SHIFT(buf, size); +} + #ifdef __cplusplus } #endif diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 9c415d6af756d6c9f015ef50055a00b2c48a910b..25cd0983e526cb881335b9369a39d85c50f0f281 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -106,6 +106,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { SMDCreateTableMsg *pTable = pCont; int32_t code = 0; + char sql[1024] = "\0"; vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId); int16_t numOfColumns = htons(pTable->numOfColumns); @@ -149,6 +150,11 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe tsdbTableSetTagValue(&tCfg, dataRow, false); } + if (pTable->tableType == TSDB_STREAM_TABLE) { + // TODO: set sql value + tsdbTableSetStreamSql(&tCfg, sql, false); + } + code = tsdbCreateTable(pVnode->tsdb, &tCfg); tdFreeDataRow(dataRow); tfree(pDestTagSchema);