未验证 提交 7de01888 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #11189 from taosdata/feature/TD-11463-3.0

Feature/td 11463 3.0
...@@ -54,6 +54,7 @@ typedef void TAOS_SUB; ...@@ -54,6 +54,7 @@ typedef void TAOS_SUB;
#define TSDB_DATA_TYPE_BLOB 18 // binary #define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 19 #define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string #define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
#define TSDB_DATA_TYPE_MAX 20
typedef enum { typedef enum {
TSDB_OPTION_LOCALE, TSDB_OPTION_LOCALE,
......
...@@ -76,6 +76,13 @@ typedef enum { ...@@ -76,6 +76,13 @@ typedef enum {
TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA
} ETsdbSmaType; } ETsdbSmaType;
typedef enum {
TSDB_BSMA_TYPE_NONE = 0, // no block-wise SMA
TSDB_BSMA_TYPE_I = 1, // sum/min/max(default)
} ETsdbBSmaType;
#define TSDB_BSMA_TYPE_LATEST TSDB_BSMA_TYPE_I
extern char *qtypeStr[]; extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11
......
...@@ -70,11 +70,13 @@ typedef struct { ...@@ -70,11 +70,13 @@ typedef struct {
#pragma pack(pop) #pragma pack(pop)
#define colType(col) ((col)->type) #define colType(col) ((col)->type)
#define colSma(col) ((col)->sma)
#define colColId(col) ((col)->colId) #define colColId(col) ((col)->colId)
#define colBytes(col) ((col)->bytes) #define colBytes(col) ((col)->bytes)
#define colOffset(col) ((col)->offset) #define colOffset(col) ((col)->offset)
#define colSetType(col, t) (colType(col) = (t)) #define colSetType(col, t) (colType(col) = (t))
#define colSetSma(col, s) (colSma(col) = (s))
#define colSetColId(col, id) (colColId(col) = (id)) #define colSetColId(col, id) (colColId(col) = (id))
#define colSetBytes(col, b) (colBytes(col) = (b)) #define colSetBytes(col, b) (colBytes(col) = (b))
#define colSetOffset(col, o) (colOffset(col) = (o)) #define colSetOffset(col, o) (colOffset(col) = (o))
...@@ -139,7 +141,7 @@ typedef struct { ...@@ -139,7 +141,7 @@ typedef struct {
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes); int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t sma, col_id_t colId, col_bytes_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Semantic timestamp key definition // ----------------- Semantic timestamp key definition
......
...@@ -265,6 +265,20 @@ typedef struct SSchema { ...@@ -265,6 +265,20 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SSchema; } SSchema;
typedef struct {
int8_t type;
int8_t sma; // ETsdbBSmaType and default is TSDB_BSMA_TYPE_I
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
} SSchemaEx;
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_SMA(s) ((s)->sma)
#define SSCHMEA_COLID(s) ((s)->colId)
#define SSCHMEA_BYTES(s) ((s)->bytes)
#define SSCHMEA_NAME(s) ((s)->name)
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
...@@ -1403,13 +1417,12 @@ typedef struct SVCreateTbReq { ...@@ -1403,13 +1417,12 @@ typedef struct SVCreateTbReq {
}; };
union { union {
struct { struct {
tb_uid_t suid; tb_uid_t suid;
uint32_t nCols; col_id_t nCols;
SSchema* pSchema; col_id_t nBSmaCols;
uint32_t nTagCols; SSchemaEx* pSchema;
SSchema* pTagSchema; col_id_t nTagCols;
col_id_t nBSmaCols; SSchema* pTagSchema;
col_id_t* pBSmaCols;
SRSmaParam* pRSmaParam; SRSmaParam* pRSmaParam;
} stbCfg; } stbCfg;
struct { struct {
...@@ -1417,10 +1430,9 @@ typedef struct SVCreateTbReq { ...@@ -1417,10 +1430,9 @@ typedef struct SVCreateTbReq {
SKVRow pTag; SKVRow pTag;
} ctbCfg; } ctbCfg;
struct { struct {
uint32_t nCols; col_id_t nCols;
SSchema* pSchema; col_id_t nBSmaCols;
col_id_t nBSmaCols; SSchemaEx* pSchema;
col_id_t* pBSmaCols;
SRSmaParam* pRSmaParam; SRSmaParam* pRSmaParam;
} ntbCfg; } ntbCfg;
}; };
...@@ -1899,7 +1911,10 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq) ...@@ -1899,7 +1911,10 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq)
typedef struct { typedef struct {
uint32_t nCols; uint32_t nCols;
SSchema* pSchema; union {
SSchema* pSchema;
SSchemaEx* pSchemaEx;
};
} SSchemaWrapper; } SSchemaWrapper;
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
......
...@@ -38,8 +38,10 @@ int32_t taosMemorySize(void *ptr); ...@@ -38,8 +38,10 @@ int32_t taosMemorySize(void *ptr);
#define taosMemoryFreeClear(ptr) \ #define taosMemoryFreeClear(ptr) \
do { \ do { \
taosMemoryFree(ptr); \ if (ptr) { \
(ptr)=NULL; \ taosMemoryFree(ptr); \
(ptr) = NULL; \
} \
} while (0) } while (0)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -57,6 +57,8 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) { ...@@ -57,6 +57,8 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
return POINTER_SHIFT(buf, sizeof(*value)); return POINTER_SHIFT(buf, sizeof(*value));
} }
static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return POINTER_SHIFT(buf, len); }
// ---- Fixed U16 // ---- Fixed U16
static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) { static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) {
if (buf != NULL) { if (buf != NULL) {
......
...@@ -85,6 +85,7 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) { ...@@ -85,6 +85,7 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) {
for (int i = 0; i < schemaNCols(pSchema); i++) { for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
tlen += taosEncodeFixedI8(buf, colType(pCol)); tlen += taosEncodeFixedI8(buf, colType(pCol));
tlen += taosEncodeFixedI8(buf, colSma(pCol));
tlen += taosEncodeFixedI16(buf, colColId(pCol)); tlen += taosEncodeFixedI16(buf, colColId(pCol));
tlen += taosEncodeFixedI16(buf, colBytes(pCol)); tlen += taosEncodeFixedI16(buf, colBytes(pCol));
} }
...@@ -107,12 +108,14 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) { ...@@ -107,12 +108,14 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
for (int i = 0; i < numOfCols; i++) { for (int i = 0; i < numOfCols; i++) {
col_type_t type = 0; col_type_t type = 0;
int8_t sma = TSDB_BSMA_TYPE_NONE;
col_id_t colId = 0; col_id_t colId = 0;
col_bytes_t bytes = 0; col_bytes_t bytes = 0;
buf = taosDecodeFixedI8(buf, &type); buf = taosDecodeFixedI8(buf, &type);
buf = taosDecodeFixedI8(buf, &sma);
buf = taosDecodeFixedI16(buf, &colId); buf = taosDecodeFixedI16(buf, &colId);
buf = taosDecodeFixedI32(buf, &bytes); buf = taosDecodeFixedI32(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) { if (tdAddColToSchema(&schemaBuilder, type, sma, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL; return NULL;
} }
...@@ -148,7 +151,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) { ...@@ -148,7 +151,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
pBuilder->version = version; pBuilder->version = version;
} }
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes) { int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t sma, col_id_t colId, col_bytes_t bytes) {
if (!isValidDataType(type)) return -1; if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) { if (pBuilder->nCols >= pBuilder->tCols) {
...@@ -161,6 +164,7 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col ...@@ -161,6 +164,7 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]); STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
colSetType(pCol, type); colSetType(pCol, type);
colSetColId(pCol, colId); colSetColId(pCol, colId);
colSetSma(pCol, sma);
if (pBuilder->nCols == 0) { if (pBuilder->nCols == 0) {
colSetOffset(pCol, 0); colSetOffset(pCol, 0);
} else { } else {
...@@ -168,9 +172,6 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col ...@@ -168,9 +172,6 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
} }
// TODO: set sma value by user input
pCol->sma = 1;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
colSetBytes(pCol, bytes); colSetBytes(pCol, bytes);
pBuilder->tlen += (TYPE_BYTES[type] + bytes); pBuilder->tlen += (TYPE_BYTES[type] + bytes);
......
...@@ -296,24 +296,22 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -296,24 +296,22 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
switch (pReq->type) { switch (pReq->type) {
case TD_SUPER_TABLE: case TD_SUPER_TABLE:
tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid); tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols);
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].sma);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
} }
tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nTagCols); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
} }
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
}
if (pReq->rollup && pReq->stbCfg.pRSmaParam) { if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
...@@ -330,17 +328,15 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -330,17 +328,15 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag);
break; break;
case TD_NORMAL_TABLE: case TD_NORMAL_TABLE:
tlen += taosEncodeFixedU32(buf, pReq->ntbCfg.nCols); tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nCols);
for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].sma);
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
} }
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]);
}
if (pReq->rollup && pReq->ntbCfg.pRSmaParam) { if (pReq->rollup && pReq->ntbCfg.pRSmaParam) {
SRSmaParam *param = pReq->ntbCfg.pRSmaParam; SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
...@@ -370,31 +366,24 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -370,31 +366,24 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
switch (pReq->type) { switch (pReq->type) {
case TD_SUPER_TABLE: case TD_SUPER_TABLE:
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid)); buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
buf = taosDecodeFixedU32(buf, &(pReq->stbCfg.nCols)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { pReq->stbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchemaEx));
for (col_id_t i = 0; i < pReq->stbCfg.nCols; i++) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].sma));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
} }
buf = taosDecodeFixedU32(buf, &pReq->stbCfg.nTagCols); buf = taosDecodeFixedI16(buf, &pReq->stbCfg.nTagCols);
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; i++) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId); buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
} }
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
if (pReq->stbCfg.nBSmaCols > 0) {
pReq->stbCfg.pBSmaCols = (col_id_t *)taosMemoryMalloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
}
} else {
pReq->stbCfg.pBSmaCols = NULL;
}
if (pReq->rollup) { if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
...@@ -418,23 +407,16 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -418,23 +407,16 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag); buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag);
break; break;
case TD_NORMAL_TABLE: case TD_NORMAL_TABLE:
buf = taosDecodeFixedU32(buf, &pReq->ntbCfg.nCols); buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols);
pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema)); buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { pReq->ntbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchemaEx));
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; i++) {
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type); buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].sma);
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId); buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
} }
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
if (pReq->ntbCfg.nBSmaCols > 0) {
pReq->ntbCfg.pBSmaCols = (col_id_t *)taosMemoryMalloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i);
}
} else {
pReq->ntbCfg.pBSmaCols = NULL;
}
if (pReq->rollup) { if (pReq->rollup) {
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->ntbCfg.pRSmaParam; SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
......
...@@ -431,7 +431,7 @@ FORCE_INLINE void *getDataMax(int32_t type) { ...@@ -431,7 +431,7 @@ FORCE_INLINE void *getDataMax(int32_t type) {
} }
} }
bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT; } bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type < TSDB_DATA_TYPE_MAX; }
void setVardataNull(void *val, int32_t type) { void setVardataNull(void *val, int32_t type) {
if (type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_BINARY) {
......
...@@ -164,37 +164,37 @@ TEST_F(DndTestVnode, 03_Create_Stb) { ...@@ -164,37 +164,37 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
req.keep = 0; req.keep = 0;
req.type = TD_SUPER_TABLE; req.type = TD_SUPER_TABLE;
SSchema schemas[5] = {0}; SSchemaEx schemas[2] = {0};
{ {
SSchema* pSchema = &schemas[0]; SSchemaEx* pSchema = &schemas[0];
pSchema->bytes = htonl(8); pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP; pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts"); strcpy(pSchema->name, "ts");
} }
{ {
SSchema* pSchema = &schemas[1]; SSchemaEx* pSchema = &schemas[1];
pSchema->bytes = htonl(4); pSchema->bytes = htonl(4);
pSchema->type = TSDB_DATA_TYPE_INT; pSchema->type = TSDB_DATA_TYPE_INT;
strcpy(pSchema->name, "col1"); strcpy(pSchema->name, "col1");
} }
SSchema tagSchemas[3] = {0};
{ {
SSchema* pSchema = &schemas[2]; SSchema* pSchema = &tagSchemas[0];
pSchema->bytes = htonl(2); pSchema->bytes = htonl(2);
pSchema->type = TSDB_DATA_TYPE_TINYINT; pSchema->type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "tag1"); strcpy(pSchema->name, "tag1");
} }
{ {
SSchema* pSchema = &schemas[3]; SSchema* pSchema = &tagSchemas[1];
pSchema->bytes = htonl(8); pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_BIGINT; pSchema->type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "tag2"); strcpy(pSchema->name, "tag2");
} }
{ {
SSchema* pSchema = &schemas[4]; SSchema* pSchema = &tagSchemas[2];
pSchema->bytes = htonl(16); pSchema->bytes = htonl(16);
pSchema->type = TSDB_DATA_TYPE_BINARY; pSchema->type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "tag3"); strcpy(pSchema->name, "tag3");
...@@ -204,7 +204,7 @@ TEST_F(DndTestVnode, 03_Create_Stb) { ...@@ -204,7 +204,7 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
req.stbCfg.nCols = 2; req.stbCfg.nCols = 2;
req.stbCfg.pSchema = &schemas[0]; req.stbCfg.pSchema = &schemas[0];
req.stbCfg.nTagCols = 3; req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2]; req.stbCfg.pTagSchema = &tagSchemas[0];
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen); SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
...@@ -236,37 +236,37 @@ TEST_F(DndTestVnode, 04_Alter_Stb) { ...@@ -236,37 +236,37 @@ TEST_F(DndTestVnode, 04_Alter_Stb) {
req.keep = 0; req.keep = 0;
req.type = TD_SUPER_TABLE; req.type = TD_SUPER_TABLE;
SSchema schemas[5] = {0}; SSchemaEx schemas[2] = {0};
{ {
SSchema* pSchema = &schemas[0]; SSchemaEx* pSchema = &schemas[0];
pSchema->bytes = htonl(8); pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP; pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts"); strcpy(pSchema->name, "ts");
} }
{ {
SSchema* pSchema = &schemas[1]; SSchemaEx* pSchema = &schemas[1];
pSchema->bytes = htonl(4); pSchema->bytes = htonl(4);
pSchema->type = TSDB_DATA_TYPE_INT; pSchema->type = TSDB_DATA_TYPE_INT;
strcpy(pSchema->name, "col1"); strcpy(pSchema->name, "col1");
} }
SSchema tagSchemas[3] = {0};
{ {
SSchema* pSchema = &schemas[2]; SSchema* pSchema = &tagSchemas[0];
pSchema->bytes = htonl(2); pSchema->bytes = htonl(2);
pSchema->type = TSDB_DATA_TYPE_TINYINT; pSchema->type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "_tag1"); strcpy(pSchema->name, "_tag1");
} }
{ {
SSchema* pSchema = &schemas[3]; SSchema* pSchema = &tagSchemas[1];
pSchema->bytes = htonl(8); pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_BIGINT; pSchema->type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "_tag2"); strcpy(pSchema->name, "_tag2");
} }
{ {
SSchema* pSchema = &schemas[4]; SSchema* pSchema = &tagSchemas[2];
pSchema->bytes = htonl(16); pSchema->bytes = htonl(16);
pSchema->type = TSDB_DATA_TYPE_BINARY; pSchema->type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "_tag3"); strcpy(pSchema->name, "_tag3");
...@@ -276,7 +276,7 @@ TEST_F(DndTestVnode, 04_Alter_Stb) { ...@@ -276,7 +276,7 @@ TEST_F(DndTestVnode, 04_Alter_Stb) {
req.stbCfg.nCols = 2; req.stbCfg.nCols = 2;
req.stbCfg.pSchema = &schemas[0]; req.stbCfg.pSchema = &schemas[0];
req.stbCfg.nTagCols = 3; req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2]; req.stbCfg.pTagSchema = &tagSchemas[0];
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen); SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
......
...@@ -333,6 +333,15 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { ...@@ -333,6 +333,15 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
} }
static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) {
if (*(col_id_t *)colId < ((SSchemaEx *)pSchema)->colId) {
return -1;
} else if (*(col_id_t *)colId > ((SSchemaEx *)pSchema)->colId) {
return 1;
}
return 0;
}
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0}; SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -348,13 +357,58 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -348,13 +357,58 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.type = TD_SUPER_TABLE; req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid; req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns; req.stbCfg.nCols = pStb->numOfColumns;
req.stbCfg.pSchema = pStb->pColumns;
req.stbCfg.nTagCols = pStb->numOfTags; req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pTags; req.stbCfg.pTagSchema = pStb->pTags;
req.stbCfg.nBSmaCols = pStb->numOfSmas;
req.stbCfg.pSchema = (SSchemaEx *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaEx));
if (req.stbCfg.pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int bSmaStat = 0; // no column has bsma
if (pStb->numOfSmas == pStb->numOfColumns) { // assume pColumns > 0
bSmaStat = 1; // all columns have bsma
} else if (pStb->numOfSmas != 0) {
bSmaStat = 2; // partial columns have bsma
TASSERT(pStb->pSmas != NULL); // TODO: remove the assert
}
for (int32_t i = 0; i < req.stbCfg.nCols; ++i) {
SSchemaEx *pSchemaEx = req.stbCfg.pSchema + i;
SSchema *pSchema = pStb->pColumns + i;
pSchemaEx->type = pSchema->type;
pSchemaEx->sma = (bSmaStat == 1) ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE;
pSchemaEx->colId = pSchema->colId;
pSchemaEx->bytes = pSchema->bytes;
memcpy(pSchemaEx->name, pSchema->name, TSDB_COL_NAME_LEN);
}
if (bSmaStat == 2) {
if (pStb->pSmas == NULL) {
mError("stb:%s, sma options is empty", pStb->name);
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return NULL;
}
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
SSchema *pSmaSchema = pStb->pSmas + i;
SSchemaEx *pColSchema = taosbsearch(&pSmaSchema->colId, req.stbCfg.pSchema, req.stbCfg.nCols, sizeof(SSchemaEx),
schemaExColIdCompare, TD_EQ);
if (pColSchema == NULL) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
taosMemoryFreeClear(req.stbCfg.pSchema);
mError("stb:%s, sma col:%s not found in columns", pStb->name, pSmaSchema->name);
return NULL;
}
pColSchema->sma = TSDB_BSMA_TYPE_LATEST;
}
}
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen); SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) { if (pHead == NULL) {
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -366,6 +420,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -366,6 +420,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
tSerializeSVCreateTbReq(&pBuf, &req); tSerializeSVCreateTbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
taosMemoryFreeClear(req.stbCfg.pSchema);
return pHead; return pHead;
} }
...@@ -498,7 +553,6 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -498,7 +553,6 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -559,9 +613,9 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -559,9 +613,9 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
} }
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) { static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
for (int32_t col = 0; col < pStb->numOfColumns; col++) { for (int32_t col = 0; col < pStb->numOfColumns; ++col) {
SSchema *pSchema = &pStb->pColumns[col]; SSchema *pSchema = &pStb->pColumns[col];
if (strcasecmp(pStb->pColumns[col].name, colName) == 0) { if (strncasecmp(pSchema->name, colName, TSDB_COL_NAME_LEN) == 0) {
return pSchema; return pSchema;
} }
} }
...@@ -625,7 +679,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre ...@@ -625,7 +679,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
SSchema *pSchema = &stbObj.pSmas[i]; SSchema *pSchema = &stbObj.pSmas[i];
SSchema *pColSchema = mndFindStbColumns(&stbObj, pField->name); SSchema *pColSchema = mndFindStbColumns(&stbObj, pField->name);
if (pColSchema == NULL) { if (pColSchema == NULL) {
mError("stb:%s, sma:%s not found in columns", stbObj.name, pSchema->name); mError("stb:%s, sma:%s not found in columns", stbObj.name, pField->name);
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
...@@ -1061,7 +1115,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -1061,7 +1115,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
......
...@@ -70,9 +70,12 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); ...@@ -70,9 +70,12 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static void metaClearTbCfg(STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg);
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW); static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx);
static void metaDBWLock(SMetaDB *pDB); static void metaDBWLock(SMetaDB *pDB);
static void metaDBRLock(SMetaDB *pDB); static void metaDBRLock(SMetaDB *pDB);
static void metaDBULock(SMetaDB *pDB); static void metaDBULock(SMetaDB *pDB);
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
...@@ -155,13 +158,13 @@ void metaCloseDB(SMeta *pMeta) { ...@@ -155,13 +158,13 @@ void metaCloseDB(SMeta *pMeta) {
} }
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
tb_uid_t uid; tb_uid_t uid;
char buf[512]; char buf[512];
char buf1[512]; char buf1[512];
void *pBuf; void *pBuf;
DBT key1, value1; DBT key1, value1;
DBT key2, value2; DBT key2, value2;
SSchema *pSchema = NULL; SSchemaEx *pSchema = NULL;
if (pTbCfg->type == META_SUPER_TABLE) { if (pTbCfg->type == META_SUPER_TABLE) {
uid = pTbCfg->stbCfg.suid; uid = pTbCfg->stbCfg.suid;
...@@ -204,8 +207,8 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { ...@@ -204,8 +207,8 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
key2.data = &schemaKey; key2.data = &schemaKey;
key2.size = sizeof(schemaKey); key2.size = sizeof(schemaKey);
SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema}; SSchemaWrapper sw = {.nCols = ncols, .pSchemaEx = pSchema};
metaEncodeSchema(&pBuf, &sw); metaEncodeSchemaEx(&pBuf, &sw);
value2.data = buf1; value2.data = buf1;
value2.size = POINTER_DISTANCE(pBuf, buf1); value2.size = POINTER_DISTANCE(pBuf, buf1);
...@@ -298,6 +301,8 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) { ...@@ -298,6 +301,8 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols); buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols); pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
int8_t dummy;
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type); buf = taosDecodeFixedI8(buf, &pSchema->type);
...@@ -309,6 +314,50 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) { ...@@ -309,6 +314,50 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
return buf; return buf;
} }
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
int tlen = 0;
SSchemaEx *pSchema;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchemaEx + i;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->sma);
tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name);
}
return tlen;
}
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
if (isGetEx) {
pSW->pSchemaEx = (SSchemaEx *)taosMemoryMalloc(sizeof(SSchemaEx) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchemaEx *pSchema = pSW->pSchemaEx + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->sma);
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
} else {
pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchema *pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosSkipFixedLen(buf, sizeof(int8_t));
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
}
return buf;
}
static SMetaDB *metaNewDB() { static SMetaDB *metaNewDB() {
SMetaDB *pDB = NULL; SMetaDB *pDB = NULL;
pDB = (SMetaDB *)taosMemoryCalloc(1, sizeof(*pDB)); pDB = (SMetaDB *)taosMemoryCalloc(1, sizeof(*pDB));
...@@ -652,12 +701,16 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { ...@@ -652,12 +701,16 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
} }
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
return metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false);
}
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) {
uint32_t nCols; uint32_t nCols;
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
SMetaDB *pDB = pMeta->pDB; SMetaDB *pDB = pMeta->pDB;
int ret; int ret;
void *pBuf; void *pBuf;
SSchema *pSchema; // SSchema *pSchema;
SSchemaKey schemaKey = {uid, sver, 0}; SSchemaKey schemaKey = {uid, sver, 0};
DBT key = {0}; DBT key = {0};
DBT value = {0}; DBT value = {0};
...@@ -678,7 +731,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -678,7 +731,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
// Decode the schema // Decode the schema
pBuf = value.data; pBuf = value.data;
pSW = taosMemoryMalloc(sizeof(*pSW)); pSW = taosMemoryMalloc(sizeof(*pSW));
metaDecodeSchema(pBuf, pSW); metaDecodeSchemaEx(pBuf, pSW, isGetEx);
return pSW; return pSW;
} }
...@@ -755,7 +808,7 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { ...@@ -755,7 +808,7 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
STSchemaBuilder sb; STSchemaBuilder sb;
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
SSchema *pSchema; SSchemaEx *pSchema;
SSchemaWrapper *pSW; SSchemaWrapper *pSW;
STbCfg *pTbCfg; STbCfg *pTbCfg;
tb_uid_t quid; tb_uid_t quid;
...@@ -767,16 +820,16 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -767,16 +820,16 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
quid = uid; quid = uid;
} }
pSW = metaGetTableSchema(pMeta, quid, sver, true); pSW = metaGetTableSchemaImpl(pMeta, quid, sver, true, true);
if (pSW == NULL) { if (pSW == NULL) {
return NULL; return NULL;
} }
// Rebuild a schema // Rebuild a schema
tdInitTSchemaBuilder(&sb, 0); tdInitTSchemaBuilder(&sb, 0);
for (int32_t i = 0; i < pSW->nCols; i++) { for (int32_t i = 0; i < pSW->nCols; ++i) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchemaEx + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes); tdAddColToSchema(&sb, pSchema->type, pSchema->sma, pSchema->colId, pSchema->bytes);
} }
pTSchema = tdGetSchemaFromBuilder(&sb); pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb); tdDestroyTSchemaBuilder(&sb);
......
...@@ -46,6 +46,10 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); ...@@ -46,6 +46,10 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW); static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx);
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx);
static inline int metaUidCmpr(const void *arg1, int len1, const void *arg2, int len2) { static inline int metaUidCmpr(const void *arg1, int len1, const void *arg2, int len2) {
tb_uid_t uid1, uid2; tb_uid_t uid1, uid2;
...@@ -228,7 +232,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { ...@@ -228,7 +232,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
schemaWrapper.pSchema = pTbCfg->ntbCfg.pSchema; schemaWrapper.pSchema = pTbCfg->ntbCfg.pSchema;
} }
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeSchema(&pBuf, &schemaWrapper); metaEncodeSchemaEx(&pBuf, &schemaWrapper);
vLen = POINTER_DISTANCE(pBuf, buf); vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen); ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen);
if (ret < 0) { if (ret < 0) {
...@@ -345,6 +349,10 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { ...@@ -345,6 +349,10 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
} }
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
return *metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false);
}
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) {
void *pKey; void *pKey;
void *pVal; void *pVal;
int kLen; int kLen;
...@@ -368,7 +376,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -368,7 +376,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
// decode // decode
pBuf = pVal; pBuf = pVal;
pSchemaWrapper = taosMemoryMalloc(sizeof(*pSchemaWrapper)); pSchemaWrapper = taosMemoryMalloc(sizeof(*pSchemaWrapper));
metaDecodeSchema(pBuf, pSchemaWrapper); metaDecodeSchemaEx(pBuf, pSchemaWrapper, isGetEx);
TDB_FREE(pVal); TDB_FREE(pVal);
...@@ -379,7 +387,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -379,7 +387,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tb_uid_t quid; tb_uid_t quid;
SSchemaWrapper *pSW; SSchemaWrapper *pSW;
STSchemaBuilder sb; STSchemaBuilder sb;
SSchema *pSchema; SSchemaEx *pSchema;
STSchema *pTSchema; STSchema *pTSchema;
STbCfg *pTbCfg; STbCfg *pTbCfg;
...@@ -390,15 +398,15 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -390,15 +398,15 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
quid = uid; quid = uid;
} }
pSW = metaGetTableSchema(pMeta, quid, sver, true); pSW = metaGetTableSchemaImpl(pMeta, quid, sver, true, true);
if (pSW == NULL) { if (pSW == NULL) {
return NULL; return NULL;
} }
tdInitTSchemaBuilder(&sb, 0); tdInitTSchemaBuilder(&sb, 0);
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchemaEx + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes); tdAddColToSchema(&sb, pSchema->type, pSchema->sma, pSchema->colId, pSchema->bytes);
} }
pTSchema = tdGetSchemaFromBuilder(&sb); pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb); tdDestroyTSchemaBuilder(&sb);
...@@ -605,6 +613,50 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) { ...@@ -605,6 +613,50 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
return buf; return buf;
} }
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
int tlen = 0;
SSchemaEx *pSchema;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int i = 0; i < pSW->nCols; ++i) {
pSchema = pSW->pSchemaEx + i;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->sma);
tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name);
}
return tlen;
}
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
if (isGetEx) {
pSW->pSchemaEx = (SSchemaEx *)taosMemoryMalloc(sizeof(SSchemaEx) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchemaEx *pSchema = pSW->pSchemaEx + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->sma);
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
} else {
pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchema *pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosSkipFixedLen(buf, sizeof(int8_t));
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
}
return buf;
}
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
int tsize = 0; int tsize = 0;
......
...@@ -81,7 +81,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -81,7 +81,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: maybe need to clear the request struct // TODO: maybe need to clear the request struct
taosMemoryFree(vCreateTbReq.stbCfg.pSchema); taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pBSmaCols);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
taosMemoryFree(vCreateTbReq.dbFName); taosMemoryFree(vCreateTbReq.dbFName);
taosMemoryFree(vCreateTbReq.name); taosMemoryFree(vCreateTbReq.name);
...@@ -116,13 +115,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -116,13 +115,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (pCreateTbReq->type == TD_SUPER_TABLE) { if (pCreateTbReq->type == TD_SUPER_TABLE) {
taosMemoryFree(pCreateTbReq->stbCfg.pSchema); taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pBSmaCols);
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
} else if (pCreateTbReq->type == TD_CHILD_TABLE) { } else if (pCreateTbReq->type == TD_CHILD_TABLE) {
taosMemoryFree(pCreateTbReq->ctbCfg.pTag); taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
} else { } else {
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
taosMemoryFree(pCreateTbReq->ntbCfg.pBSmaCols);
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam); taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
} }
} }
...@@ -150,7 +147,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -150,7 +147,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
taosMemoryFree(vAlterTbReq.stbCfg.pSchema); taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pBSmaCols);
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
taosMemoryFree(vAlterTbReq.dbFName); taosMemoryFree(vAlterTbReq.dbFName);
taosMemoryFree(vAlterTbReq.name); taosMemoryFree(vAlterTbReq.name);
......
...@@ -2152,10 +2152,11 @@ typedef struct SVgroupTablesBatch { ...@@ -2152,10 +2152,11 @@ typedef struct SVgroupTablesBatch {
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch; } SVgroupTablesBatch;
static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) { static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) {
pSchema->colId = colId; pSchema->colId = colId;
pSchema->type = pCol->dataType.type; pSchema->type = pCol->dataType.type;
pSchema->bytes = calcTypeBytes(pCol->dataType); pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->sma = TSDB_BSMA_TYPE_LATEST; // TODO: use default value currently, and use the real value later.
strcpy(pSchema->name, pCol->colName); strcpy(pSchema->name, pCol->colName);
} }
...@@ -2177,7 +2178,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, con ...@@ -2177,7 +2178,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, con
req.dbFName = strdup(dbFName); req.dbFName = strdup(dbFName);
req.name = strdup(pTableName); req.name = strdup(pTableName);
req.ntbCfg.nCols = LIST_LENGTH(pColumns); req.ntbCfg.nCols = LIST_LENGTH(pColumns);
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema)); req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchemaEx));
if (NULL == req.name || NULL == req.ntbCfg.pSchema) { if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
destroyCreateTbReq(&req); destroyCreateTbReq(&req);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -2188,6 +2189,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, con ...@@ -2188,6 +2189,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, con
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index); toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
++index; ++index;
} }
// TODO: use the real sma for normal table.
pBatch->info = *pVgroupInfo; pBatch->info = *pVgroupInfo;
strcpy(pBatch->dbName, pDbName); strcpy(pBatch->dbName, pDbName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册