提交 7a1e2bed 编写于 作者: H Hongze Cheng

add version to tsdb schema

上级 bddcc6fa
......@@ -66,7 +66,7 @@ typedef struct {
// ----------------- TSDB SCHEMA DEFINITION
typedef struct {
int totalCols; // Total columns allocated
int version; // version
int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part
int flen; // First part length in a SDataRow after the header part
......@@ -74,16 +74,13 @@ typedef struct {
} STSchema;
#define schemaNCols(s) ((s)->numOfCols)
#define schemaTotalCols(s) ((s)->totalCols)
#define schemaVersion(s) ((s)->version)
#define schemaTLen(s) ((s)->tlen)
#define schemaFLen(s) ((s)->flen)
#define schemaColAt(s, i) ((s)->columns + i)
#define tdFreeSchema(s) tfree((s))
STSchema *tdNewSchema(int32_t nCols);
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdDupSchema(STSchema *pSchema);
int tdGetSchemaEncodeSize(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc);
......@@ -103,6 +100,22 @@ static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) {
return (STColumn *)ptr;
}
// ----------------- SCHEMA BUILDER DEFINITION
typedef struct {
int tCols;
int nCols;
int tlen;
int flen;
int version;
STColumn *columns;
} STSchemaBuilder;
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Data row structure
/* A data row, the format is like below:
......
......@@ -16,93 +16,27 @@
#include "talgo.h"
#include "wchar.h"
/**
* Create a SSchema object with nCols columns
* ASSUMPTIONS: VALID PARAMETERS
*
* @param nCols number of columns the schema has
*
* @return a STSchema object for success
* NULL for failure
*/
STSchema *tdNewSchema(int32_t nCols) {
int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
STSchema *pSchema = (STSchema *)calloc(1, size);
if (pSchema == NULL) return NULL;
pSchema->numOfCols = 0;
pSchema->totalCols = nCols;
pSchema->flen = 0;
pSchema->tlen = 0;
return pSchema;
}
/**
* Append a column to the schema
*/
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1;
STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
colSetType(pCol, type);
colSetColId(pCol, colId);
if (schemaNCols(pSchema) == 0) {
colSetOffset(pCol, 0);
} else {
STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema) - 1);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
colSetBytes(pCol, bytes); // Set as maximum bytes
pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
break;
default:
colSetBytes(pCol, TYPE_BYTES[type]);
pSchema->tlen += TYPE_BYTES[type];
break;
}
pSchema->numOfCols++;
pSchema->flen += TYPE_BYTES[type];
ASSERT(pCol->offset < pSchema->flen);
return 0;
}
/**
* Duplicate the schema and return a new object
*/
STSchema *tdDupSchema(STSchema *pSchema) {
STSchema *tSchema = tdNewSchema(schemaNCols(pSchema));
int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
STSchema *tSchema = (STSchema *)malloc(tlen);
if (tSchema == NULL) return NULL;
int32_t size = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
memcpy((void *)tSchema, (void *)pSchema, size);
memcpy((void *)tSchema, (void *)pSchema, tlen);
return tSchema;
}
/**
* Return the size of encoded schema
*/
int tdGetSchemaEncodeSize(STSchema *pSchema) {
return T_MEMBER_SIZE(STSchema, totalCols) +
schemaNCols(pSchema) *
(T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes));
}
/**
* Encode a schema to dst, and return the next pointer
*/
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
ASSERT(pSchema->numOfCols == pSchema->totalCols);
T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols);
T_APPEND_MEMBER(dst, pSchema, STSchema, version);
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i);
T_APPEND_MEMBER(dst, pCol, STColumn, type);
......@@ -118,11 +52,14 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) {
*/
STSchema *tdDecodeSchema(void **psrc) {
int totalCols = 0;
int version = 0;
STSchemaBuilder schemaBuilder = {0};
T_READ_MEMBER(*psrc, int, version);
T_READ_MEMBER(*psrc, int, totalCols);
STSchema *pSchema = tdNewSchema(totalCols);
if (pSchema == NULL) return NULL;
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
for (int i = 0; i < totalCols; i++) {
int8_t type = 0;
int16_t colId = 0;
......@@ -131,9 +68,91 @@ STSchema *tdDecodeSchema(void **psrc) {
T_READ_MEMBER(*psrc, int16_t, colId);
T_READ_MEMBER(*psrc, int32_t, bytes);
tdSchemaAddCol(pSchema, type, colId, bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
}
}
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
return pSchema;
}
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
if (pBuilder == NULL) return -1;
pBuilder->tCols = 256;
pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
tdResetTSchemaBuilder(pBuilder, version);
return 0;
}
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder) {
tfree(pBuilder->columns);
}
}
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder->nCols = 0;
pBuilder->tlen = 0;
pBuilder->flen = 0;
pBuilder->version = version;
}
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
if (!isValidDataType(type, 0)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
}
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
colSetType(pCol, type);
colSetColId(pCol, colId);
if (pBuilder->nCols == 0) {
colSetOffset(pCol, 0);
} else {
STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
if (IS_VAR_DATA_TYPE(type)) {
colSetBytes(pCol, bytes);
pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
} else {
colSetBytes(pCol, TYPE_BYTES[type]);
pBuilder->tlen += TYPE_BYTES[type];
}
pBuilder->nCols++;
pBuilder->flen += TYPE_BYTES[type];
ASSERT(pCol->offset < pBuilder->flen);
return 0;
}
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder->nCols <= 0) return NULL;
int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;
STSchema *pSchema = (STSchema *)malloc(tlen);
if (pSchema == NULL) return NULL;
schemaVersion(pSchema) = pBuilder->version;
schemaNCols(pSchema) = pBuilder->nCols;
schemaTLen(pSchema) = pBuilder->tlen;
schemaFLen(pSchema) = pBuilder->flen;
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
return pSchema;
}
......
......@@ -59,9 +59,15 @@ int main(int argc, char *argv[]) {
exit(-1);
}
STSchema *pSchema = tdNewSchema(2);
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4);
STSchemaBuilder schemaBuilder = {0};
tdInitTSchemaBuilder(&schemaBuilder, 0);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_INT, 1, 4);
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
......
......@@ -440,41 +440,39 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (pMsg == NULL) return NULL;
SSchema *pSchema = (SSchema *)pMsg->data;
int16_t numOfCols = htons(pMsg->numOfColumns);
int16_t numOfTags = htons(pMsg->numOfTags);
SSchema * pSchema = (SSchema *)pMsg->data;
int16_t numOfCols = htons(pMsg->numOfColumns);
int16_t numOfTags = htons(pMsg->numOfTags);
STSchemaBuilder schemaBuilder = {0};
STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg));
if (pCfg == NULL) return NULL;
if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err;
STSchema *pDSchema = tdNewSchema(numOfCols);
if (pDSchema == NULL) goto _err;
if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) goto _err;
for (int i = 0; i < numOfCols; i++) {
tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err;
if (tsdbTableSetSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
if (numOfTags > 0) {
STSchema *pTSchema = tdNewSchema(numOfTags);
int accBytes = 0;
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
SKVRowBuilder kvRowBuilder = {0};
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
accBytes += htons(pSchema[i + numOfCols].bytes);
}
if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err;
if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
int accBytes = 0;
SKVRowBuilder kvRowBuilder;
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
for (int i = 0; i < numOfTags; i++) {
STColumn *pCol = schemaColAt(pTSchema, i);
tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes);
accBytes += htons(pSchema[i+numOfCols].bytes);
}
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
tdDestroyKVRowBuilder(&kvRowBuilder);
}
......@@ -484,9 +482,12 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
tsdbTableSetStreamSql(pCfg, sql, true);
}
tdDestroyTSchemaBuilder(&schemaBuilder);
return pCfg;
_err:
tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbClearTableCfg(pCfg);
tfree(pCfg);
return NULL;
......
......@@ -130,52 +130,11 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
}
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
vTrace("vgId:%d, table:%s, start to alter", pVnode->vgId, pTable->tableId);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid);
SSchema *pSchema = (SSchema *) pTable->data;
int32_t totalCols = numOfColumns + numOfTags;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid);
STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < numOfTags; i++) {
STColumn *pTCol = schemaColAt(pDestTagSchema, i);
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
accumBytes += htons(pSchema[i + numOfColumns].bytes);
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
code = tsdbAlterTable(pVnode->tsdb, &tCfg);
tfree(pDestSchema);
vTrace("vgId:%d, table:%s, alter table result:%d", pVnode->vgId, pTable->tableId, code);
STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
if (pCfg == NULL) return terrno;
int32_t code = tsdbAlterTable(pVnode->tsdb, pCfg);
tsdbClearTableCfg(pCfg);
free(pCfg);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册