diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 1d3319e61dfd2fafda7474cd5f8c30e60bc8b34f..f71f52edc57500cbc37d86db79678d78eb9badf4 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -66,7 +66,7 @@ typedef struct { // ----------------- TSDB SCHEMA DEFINITION typedef struct { - int totalCols; // Total columns allocated + int version; // version int numOfCols; // Number of columns appended int tlen; // maximum length of a SDataRow without the header part int flen; // First part length in a SDataRow after the header part @@ -74,16 +74,13 @@ typedef struct { } STSchema; #define schemaNCols(s) ((s)->numOfCols) -#define schemaTotalCols(s) ((s)->totalCols) +#define schemaVersion(s) ((s)->version) #define schemaTLen(s) ((s)->tlen) #define schemaFLen(s) ((s)->flen) #define schemaColAt(s, i) ((s)->columns + i) #define tdFreeSchema(s) tfree((s)) -STSchema *tdNewSchema(int32_t nCols); -int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes); STSchema *tdDupSchema(STSchema *pSchema); -int tdGetSchemaEncodeSize(STSchema *pSchema); void * tdEncodeSchema(void *dst, STSchema *pSchema); STSchema *tdDecodeSchema(void **psrc); @@ -103,6 +100,22 @@ static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) { return (STColumn *)ptr; } +// ----------------- SCHEMA BUILDER DEFINITION +typedef struct { + int tCols; + int nCols; + int tlen; + int flen; + int version; + STColumn *columns; +} STSchemaBuilder; + +int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); +void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); +void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); +int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes); +STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); + // ----------------- Data row structure /* A data row, the format is like below: diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 7a35d5fb69d53673f64faa29241ddd6348b19c19..9816f0472f8fbfcee335302e5656409bd3d8b5b4 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -16,93 +16,27 @@ #include "talgo.h" #include "wchar.h" -/** - * Create a SSchema object with nCols columns - * ASSUMPTIONS: VALID PARAMETERS - * - * @param nCols number of columns the schema has - * - * @return a STSchema object for success - * NULL for failure - */ -STSchema *tdNewSchema(int32_t nCols) { - int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols; - - STSchema *pSchema = (STSchema *)calloc(1, size); - if (pSchema == NULL) return NULL; - - pSchema->numOfCols = 0; - pSchema->totalCols = nCols; - pSchema->flen = 0; - pSchema->tlen = 0; - - return pSchema; -} - -/** - * Append a column to the schema - */ -int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) { - if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1; - - STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema)); - colSetType(pCol, type); - colSetColId(pCol, colId); - if (schemaNCols(pSchema) == 0) { - colSetOffset(pCol, 0); - } else { - STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema) - 1); - colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); - } - switch (type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - colSetBytes(pCol, bytes); // Set as maximum bytes - pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes); - break; - default: - colSetBytes(pCol, TYPE_BYTES[type]); - pSchema->tlen += TYPE_BYTES[type]; - break; - } - - pSchema->numOfCols++; - pSchema->flen += TYPE_BYTES[type]; - - ASSERT(pCol->offset < pSchema->flen); - - return 0; -} - /** * Duplicate the schema and return a new object */ STSchema *tdDupSchema(STSchema *pSchema) { - STSchema *tSchema = tdNewSchema(schemaNCols(pSchema)); + + int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema); + STSchema *tSchema = (STSchema *)malloc(tlen); if (tSchema == NULL) return NULL; - int32_t size = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema); - memcpy((void *)tSchema, (void *)pSchema, size); + memcpy((void *)tSchema, (void *)pSchema, tlen); return tSchema; } -/** - * Return the size of encoded schema - */ -int tdGetSchemaEncodeSize(STSchema *pSchema) { - return T_MEMBER_SIZE(STSchema, totalCols) + - schemaNCols(pSchema) * - (T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes)); -} - /** * Encode a schema to dst, and return the next pointer */ void *tdEncodeSchema(void *dst, STSchema *pSchema) { - ASSERT(pSchema->numOfCols == pSchema->totalCols); - T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols); + T_APPEND_MEMBER(dst, pSchema, STSchema, version); + T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols); for (int i = 0; i < schemaNCols(pSchema); i++) { STColumn *pCol = schemaColAt(pSchema, i); T_APPEND_MEMBER(dst, pCol, STColumn, type); @@ -118,11 +52,14 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) { */ STSchema *tdDecodeSchema(void **psrc) { int totalCols = 0; + int version = 0; + STSchemaBuilder schemaBuilder = {0}; + T_READ_MEMBER(*psrc, int, version); T_READ_MEMBER(*psrc, int, totalCols); - STSchema *pSchema = tdNewSchema(totalCols); - if (pSchema == NULL) return NULL; + if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL; + for (int i = 0; i < totalCols; i++) { int8_t type = 0; int16_t colId = 0; @@ -131,9 +68,91 @@ STSchema *tdDecodeSchema(void **psrc) { T_READ_MEMBER(*psrc, int16_t, colId); T_READ_MEMBER(*psrc, int32_t, bytes); - tdSchemaAddCol(pSchema, type, colId, bytes); + if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) { + tdDestroyTSchemaBuilder(&schemaBuilder); + return NULL; + } + } + + STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder); + tdDestroyTSchemaBuilder(&schemaBuilder); + return pSchema; +} + +int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { + if (pBuilder == NULL) return -1; + + pBuilder->tCols = 256; + pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols); + if (pBuilder->columns == NULL) return -1; + + tdResetTSchemaBuilder(pBuilder, version); + return 0; +} + +void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) { + if (pBuilder) { + tfree(pBuilder->columns); + } +} + +void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { + pBuilder->nCols = 0; + pBuilder->tlen = 0; + pBuilder->flen = 0; + pBuilder->version = version; +} + +int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) { + if (!isValidDataType(type, 0)) return -1; + + if (pBuilder->nCols >= pBuilder->tCols) { + pBuilder->tCols *= 2; + pBuilder->columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols); + if (pBuilder->columns == NULL) return -1; } + STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]); + colSetType(pCol, type); + colSetColId(pCol, colId); + if (pBuilder->nCols == 0) { + colSetOffset(pCol, 0); + } else { + STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]); + colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); + } + + if (IS_VAR_DATA_TYPE(type)) { + colSetBytes(pCol, bytes); + pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes); + } else { + colSetBytes(pCol, TYPE_BYTES[type]); + pBuilder->tlen += TYPE_BYTES[type]; + } + + pBuilder->nCols++; + pBuilder->flen += TYPE_BYTES[type]; + + ASSERT(pCol->offset < pBuilder->flen); + + return 0; +} + +STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { + if (pBuilder->nCols <= 0) return NULL; + + int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols; + + STSchema *pSchema = (STSchema *)malloc(tlen); + if (pSchema == NULL) return NULL; + + schemaVersion(pSchema) = pBuilder->version; + schemaNCols(pSchema) = pBuilder->nCols; + schemaTLen(pSchema) = pBuilder->tlen; + schemaFLen(pSchema) = pBuilder->flen; + + memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols); + return pSchema; } diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index 3aa649ee34ce84cdc79b9d42dffd8656137a618e..fbe3c95b8642c45dea67daef2fa6fdaa702da314 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -59,9 +59,15 @@ int main(int argc, char *argv[]) { exit(-1); } - STSchema *pSchema = tdNewSchema(2); - tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8); - tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4); + STSchemaBuilder schemaBuilder = {0}; + + tdInitTSchemaBuilder(&schemaBuilder, 0); + tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_TIMESTAMP, 0, 8); + tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_INT, 1, 4); + + STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder); + + tdDestroyTSchemaBuilder(&schemaBuilder); for (int sid =1; sid<10; ++sid) { cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 8d70789b673209763028c2d820f2baaa6062dc31..fa2d47af04cc0c3c97fe1667fcc11c4ea9332518 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -440,41 +440,39 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { if (pMsg == NULL) return NULL; - SSchema *pSchema = (SSchema *)pMsg->data; - int16_t numOfCols = htons(pMsg->numOfColumns); - int16_t numOfTags = htons(pMsg->numOfTags); + SSchema * pSchema = (SSchema *)pMsg->data; + int16_t numOfCols = htons(pMsg->numOfColumns); + int16_t numOfTags = htons(pMsg->numOfTags); + STSchemaBuilder schemaBuilder = {0}; STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg)); if (pCfg == NULL) return NULL; if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err; - STSchema *pDSchema = tdNewSchema(numOfCols); - if (pDSchema == NULL) goto _err; + if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) goto _err; + for (int i = 0; i < numOfCols; i++) { - tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } - if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err; + if (tsdbTableSetSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err; if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err; if (numOfTags > 0) { - STSchema *pTSchema = tdNewSchema(numOfTags); + int accBytes = 0; + char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); + + SKVRowBuilder kvRowBuilder = {0}; + tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion)); + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err; for (int i = numOfCols; i < numOfCols + numOfTags; i++) { - tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes); + accBytes += htons(pSchema[i + numOfCols].bytes); } - if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err; + if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err; if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err; if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err; - char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); - int accBytes = 0; - SKVRowBuilder kvRowBuilder; - - if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err; - for (int i = 0; i < numOfTags; i++) { - STColumn *pCol = schemaColAt(pTSchema, i); - tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes); - accBytes += htons(pSchema[i+numOfCols].bytes); - } tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); tdDestroyKVRowBuilder(&kvRowBuilder); } @@ -484,9 +482,12 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { tsdbTableSetStreamSql(pCfg, sql, true); } + tdDestroyTSchemaBuilder(&schemaBuilder); + return pCfg; _err: + tdDestroyTSchemaBuilder(&schemaBuilder); tsdbClearTableCfg(pCfg); tfree(pCfg); return NULL; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 90e2a482e95ce9a7e851298552f139bb4eaa739d..73d58527b54412d602517c1bf1d055ea06d189ab 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -130,52 +130,11 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet } static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { - SMDCreateTableMsg *pTable = pCont; - int32_t code = 0; - - vTrace("vgId:%d, table:%s, start to alter", pVnode->vgId, pTable->tableId); - int16_t numOfColumns = htons(pTable->numOfColumns); - int16_t numOfTags = htons(pTable->numOfTags); - int32_t sid = htonl(pTable->sid); - uint64_t uid = htobe64(pTable->uid); - SSchema *pSchema = (SSchema *) pTable->data; - - int32_t totalCols = numOfColumns + numOfTags; - - STableCfg tCfg; - tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid); - - STSchema *pDestSchema = tdNewSchema(numOfColumns); - for (int i = 0; i < numOfColumns; i++) { - tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); - } - tsdbTableSetSchema(&tCfg, pDestSchema, false); - - if (numOfTags != 0) { - STSchema *pDestTagSchema = tdNewSchema(numOfTags); - for (int i = numOfColumns; i < totalCols; i++) { - tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); - } - tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); - - char *pTagData = pTable->data + totalCols * sizeof(SSchema); - int accumBytes = 0; - SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); - - for (int i = 0; i < numOfTags; i++) { - STColumn *pTCol = schemaColAt(pDestTagSchema, i); - tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); - accumBytes += htons(pSchema[i + numOfColumns].bytes); - } - tsdbTableSetTagValue(&tCfg, dataRow, false); - } - - code = tsdbAlterTable(pVnode->tsdb, &tCfg); - - tfree(pDestSchema); - - vTrace("vgId:%d, table:%s, alter table result:%d", pVnode->vgId, pTable->tableId, code); - + STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont); + if (pCfg == NULL) return terrno; + int32_t code = tsdbAlterTable(pVnode->tsdb, pCfg); + tsdbClearTableCfg(pCfg); + free(pCfg); return code; }