未验证 提交 a7c25fad 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2103 from taosdata/feature/2.0tsdb

add version to tsdb schema
...@@ -66,7 +66,7 @@ typedef struct { ...@@ -66,7 +66,7 @@ typedef struct {
// ----------------- TSDB SCHEMA DEFINITION // ----------------- TSDB SCHEMA DEFINITION
typedef struct { typedef struct {
int totalCols; // Total columns allocated int version; // version
int numOfCols; // Number of columns appended int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part int tlen; // maximum length of a SDataRow without the header part
int flen; // First part length in a SDataRow after the header part int flen; // First part length in a SDataRow after the header part
...@@ -74,16 +74,13 @@ typedef struct { ...@@ -74,16 +74,13 @@ typedef struct {
} STSchema; } STSchema;
#define schemaNCols(s) ((s)->numOfCols) #define schemaNCols(s) ((s)->numOfCols)
#define schemaTotalCols(s) ((s)->totalCols) #define schemaVersion(s) ((s)->version)
#define schemaTLen(s) ((s)->tlen) #define schemaTLen(s) ((s)->tlen)
#define schemaFLen(s) ((s)->flen) #define schemaFLen(s) ((s)->flen)
#define schemaColAt(s, i) ((s)->columns + i) #define schemaColAt(s, i) ((s)->columns + i)
#define tdFreeSchema(s) tfree((s)) #define tdFreeSchema(s) tfree((s))
STSchema *tdNewSchema(int32_t nCols);
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdDupSchema(STSchema *pSchema); STSchema *tdDupSchema(STSchema *pSchema);
int tdGetSchemaEncodeSize(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema); void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc); STSchema *tdDecodeSchema(void **psrc);
...@@ -103,6 +100,22 @@ static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) { ...@@ -103,6 +100,22 @@ static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) {
return (STColumn *)ptr; return (STColumn *)ptr;
} }
// ----------------- SCHEMA BUILDER DEFINITION
typedef struct {
int tCols;
int nCols;
int tlen;
int flen;
int version;
STColumn *columns;
} STSchemaBuilder;
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Data row structure // ----------------- Data row structure
/* A data row, the format is like below: /* A data row, the format is like below:
......
...@@ -16,93 +16,27 @@ ...@@ -16,93 +16,27 @@
#include "talgo.h" #include "talgo.h"
#include "wchar.h" #include "wchar.h"
/**
* Create a SSchema object with nCols columns
* ASSUMPTIONS: VALID PARAMETERS
*
* @param nCols number of columns the schema has
*
* @return a STSchema object for success
* NULL for failure
*/
STSchema *tdNewSchema(int32_t nCols) {
int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
STSchema *pSchema = (STSchema *)calloc(1, size);
if (pSchema == NULL) return NULL;
pSchema->numOfCols = 0;
pSchema->totalCols = nCols;
pSchema->flen = 0;
pSchema->tlen = 0;
return pSchema;
}
/**
* Append a column to the schema
*/
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1;
STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
colSetType(pCol, type);
colSetColId(pCol, colId);
if (schemaNCols(pSchema) == 0) {
colSetOffset(pCol, 0);
} else {
STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema) - 1);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
colSetBytes(pCol, bytes); // Set as maximum bytes
pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
break;
default:
colSetBytes(pCol, TYPE_BYTES[type]);
pSchema->tlen += TYPE_BYTES[type];
break;
}
pSchema->numOfCols++;
pSchema->flen += TYPE_BYTES[type];
ASSERT(pCol->offset < pSchema->flen);
return 0;
}
/** /**
* Duplicate the schema and return a new object * Duplicate the schema and return a new object
*/ */
STSchema *tdDupSchema(STSchema *pSchema) { STSchema *tdDupSchema(STSchema *pSchema) {
STSchema *tSchema = tdNewSchema(schemaNCols(pSchema));
int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
STSchema *tSchema = (STSchema *)malloc(tlen);
if (tSchema == NULL) return NULL; if (tSchema == NULL) return NULL;
int32_t size = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema); memcpy((void *)tSchema, (void *)pSchema, tlen);
memcpy((void *)tSchema, (void *)pSchema, size);
return tSchema; return tSchema;
} }
/**
* Return the size of encoded schema
*/
int tdGetSchemaEncodeSize(STSchema *pSchema) {
return T_MEMBER_SIZE(STSchema, totalCols) +
schemaNCols(pSchema) *
(T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes));
}
/** /**
* Encode a schema to dst, and return the next pointer * Encode a schema to dst, and return the next pointer
*/ */
void *tdEncodeSchema(void *dst, STSchema *pSchema) { void *tdEncodeSchema(void *dst, STSchema *pSchema) {
ASSERT(pSchema->numOfCols == pSchema->totalCols);
T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols); T_APPEND_MEMBER(dst, pSchema, STSchema, version);
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
for (int i = 0; i < schemaNCols(pSchema); i++) { for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
T_APPEND_MEMBER(dst, pCol, STColumn, type); T_APPEND_MEMBER(dst, pCol, STColumn, type);
...@@ -118,11 +52,14 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) { ...@@ -118,11 +52,14 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) {
*/ */
STSchema *tdDecodeSchema(void **psrc) { STSchema *tdDecodeSchema(void **psrc) {
int totalCols = 0; int totalCols = 0;
int version = 0;
STSchemaBuilder schemaBuilder = {0};
T_READ_MEMBER(*psrc, int, version);
T_READ_MEMBER(*psrc, int, totalCols); T_READ_MEMBER(*psrc, int, totalCols);
STSchema *pSchema = tdNewSchema(totalCols); if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
if (pSchema == NULL) return NULL;
for (int i = 0; i < totalCols; i++) { for (int i = 0; i < totalCols; i++) {
int8_t type = 0; int8_t type = 0;
int16_t colId = 0; int16_t colId = 0;
...@@ -131,8 +68,90 @@ STSchema *tdDecodeSchema(void **psrc) { ...@@ -131,8 +68,90 @@ STSchema *tdDecodeSchema(void **psrc) {
T_READ_MEMBER(*psrc, int16_t, colId); T_READ_MEMBER(*psrc, int16_t, colId);
T_READ_MEMBER(*psrc, int32_t, bytes); T_READ_MEMBER(*psrc, int32_t, bytes);
tdSchemaAddCol(pSchema, type, colId, bytes); if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
} }
}
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
return pSchema;
}
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
if (pBuilder == NULL) return -1;
pBuilder->tCols = 256;
pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
tdResetTSchemaBuilder(pBuilder, version);
return 0;
}
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder) {
tfree(pBuilder->columns);
}
}
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder->nCols = 0;
pBuilder->tlen = 0;
pBuilder->flen = 0;
pBuilder->version = version;
}
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
if (!isValidDataType(type, 0)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
}
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
colSetType(pCol, type);
colSetColId(pCol, colId);
if (pBuilder->nCols == 0) {
colSetOffset(pCol, 0);
} else {
STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
if (IS_VAR_DATA_TYPE(type)) {
colSetBytes(pCol, bytes);
pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
} else {
colSetBytes(pCol, TYPE_BYTES[type]);
pBuilder->tlen += TYPE_BYTES[type];
}
pBuilder->nCols++;
pBuilder->flen += TYPE_BYTES[type];
ASSERT(pCol->offset < pBuilder->flen);
return 0;
}
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder->nCols <= 0) return NULL;
int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;
STSchema *pSchema = (STSchema *)malloc(tlen);
if (pSchema == NULL) return NULL;
schemaVersion(pSchema) = pBuilder->version;
schemaNCols(pSchema) = pBuilder->nCols;
schemaTLen(pSchema) = pBuilder->tlen;
schemaFLen(pSchema) = pBuilder->flen;
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
return pSchema; return pSchema;
} }
......
...@@ -59,9 +59,15 @@ int main(int argc, char *argv[]) { ...@@ -59,9 +59,15 @@ int main(int argc, char *argv[]) {
exit(-1); exit(-1);
} }
STSchema *pSchema = tdNewSchema(2); STSchemaBuilder schemaBuilder = {0};
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4); tdInitTSchemaBuilder(&schemaBuilder, 0);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_INT, 1, 4);
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) { for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
......
...@@ -440,41 +440,39 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { ...@@ -440,41 +440,39 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (pMsg == NULL) return NULL; if (pMsg == NULL) return NULL;
SSchema *pSchema = (SSchema *)pMsg->data; SSchema * pSchema = (SSchema *)pMsg->data;
int16_t numOfCols = htons(pMsg->numOfColumns); int16_t numOfCols = htons(pMsg->numOfColumns);
int16_t numOfTags = htons(pMsg->numOfTags); int16_t numOfTags = htons(pMsg->numOfTags);
STSchemaBuilder schemaBuilder = {0};
STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg)); STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg));
if (pCfg == NULL) return NULL; if (pCfg == NULL) return NULL;
if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err; if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err;
STSchema *pDSchema = tdNewSchema(numOfCols); if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) goto _err;
if (pDSchema == NULL) goto _err;
for (int i = 0; i < numOfCols; i++) { for (int i = 0; i < numOfCols; i++) {
tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
} }
if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err; if (tsdbTableSetSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err; if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
if (numOfTags > 0) { if (numOfTags > 0) {
STSchema *pTSchema = tdNewSchema(numOfTags); int accBytes = 0;
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
SKVRowBuilder kvRowBuilder = {0};
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
for (int i = numOfCols; i < numOfCols + numOfTags; i++) { for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
accBytes += htons(pSchema[i + numOfCols].bytes);
} }
if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err; if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err; if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err; if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
int accBytes = 0;
SKVRowBuilder kvRowBuilder;
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
for (int i = 0; i < numOfTags; i++) {
STColumn *pCol = schemaColAt(pTSchema, i);
tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes);
accBytes += htons(pSchema[i+numOfCols].bytes);
}
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
} }
...@@ -484,9 +482,12 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { ...@@ -484,9 +482,12 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
tsdbTableSetStreamSql(pCfg, sql, true); tsdbTableSetStreamSql(pCfg, sql, true);
} }
tdDestroyTSchemaBuilder(&schemaBuilder);
return pCfg; return pCfg;
_err: _err:
tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbClearTableCfg(pCfg); tsdbClearTableCfg(pCfg);
tfree(pCfg); tfree(pCfg);
return NULL; return NULL;
......
...@@ -130,52 +130,11 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -130,52 +130,11 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
} }
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont; STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
int32_t code = 0; if (pCfg == NULL) return terrno;
int32_t code = tsdbAlterTable(pVnode->tsdb, pCfg);
vTrace("vgId:%d, table:%s, start to alter", pVnode->vgId, pTable->tableId); tsdbClearTableCfg(pCfg);
int16_t numOfColumns = htons(pTable->numOfColumns); free(pCfg);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid);
SSchema *pSchema = (SSchema *) pTable->data;
int32_t totalCols = numOfColumns + numOfTags;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid);
STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < numOfTags; i++) {
STColumn *pTCol = schemaColAt(pDestTagSchema, i);
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
accumBytes += htons(pSchema[i + numOfColumns].bytes);
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
code = tsdbAlterTable(pVnode->tsdb, &tCfg);
tfree(pDestSchema);
vTrace("vgId:%d, table:%s, alter table result:%d", pVnode->vgId, pTable->tableId, code);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册