提交 cf68440a 编写于 作者: C Cary Xu

bSma grammar integrate

上级 02f26d14
......@@ -54,6 +54,7 @@ typedef void TAOS_SUB;
#define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
#define TSDB_DATA_TYPE_MAX 20
typedef enum {
TSDB_OPTION_LOCALE,
......
......@@ -76,6 +76,13 @@ typedef enum {
TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA
} ETsdbSmaType;
typedef enum {
TSDB_BSMA_TYPE_NONE = 0, // no block-wise SMA
TSDB_BSMA_TYPE_I = 1, // sum/min/max(default)
} ETsdbBSmaType;
#define TSDB_BSMA_TYPE_LATEST TSDB_BSMA_TYPE_I
extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11
......
......@@ -70,11 +70,13 @@ typedef struct {
#pragma pack(pop)
#define colType(col) ((col)->type)
#define colSma(col) ((col)->sma)
#define colColId(col) ((col)->colId)
#define colBytes(col) ((col)->bytes)
#define colOffset(col) ((col)->offset)
#define colSetType(col, t) (colType(col) = (t))
#define colSetSma(col, s) (colSma(col) = (s))
#define colSetColId(col, id) (colColId(col) = (id))
#define colSetBytes(col, b) (colBytes(col) = (b))
#define colSetOffset(col, o) (colOffset(col) = (o))
......@@ -139,7 +141,7 @@ typedef struct {
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t sma, col_id_t colId, col_bytes_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Semantic timestamp key definition
......
......@@ -265,6 +265,20 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
typedef struct {
int8_t type;
int8_t sma; // ETsdbBSmaType and default is TSDB_BSMA_TYPE_I
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
} SSchemaEx;
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_SMA(s) ((s)->sma)
#define SSCHMEA_COLID(s) ((s)->colId)
#define SSCHMEA_BYTES(s) ((s)->bytes)
#define SSCHMEA_NAME(s) ((s)->name)
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
......@@ -1403,13 +1417,12 @@ typedef struct SVCreateTbReq {
};
union {
struct {
tb_uid_t suid;
uint32_t nCols;
SSchema* pSchema;
uint32_t nTagCols;
SSchema* pTagSchema;
col_id_t nBSmaCols;
col_id_t* pBSmaCols;
tb_uid_t suid;
col_id_t nCols;
col_id_t nBSmaCols;
SSchemaEx* pSchema;
col_id_t nTagCols;
SSchema* pTagSchema;
SRSmaParam* pRSmaParam;
} stbCfg;
struct {
......@@ -1417,10 +1430,9 @@ typedef struct SVCreateTbReq {
SKVRow pTag;
} ctbCfg;
struct {
uint32_t nCols;
SSchema* pSchema;
col_id_t nBSmaCols;
col_id_t* pBSmaCols;
col_id_t nCols;
col_id_t nBSmaCols;
SSchemaEx* pSchema;
SRSmaParam* pRSmaParam;
} ntbCfg;
};
......@@ -1899,7 +1911,10 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq)
typedef struct {
uint32_t nCols;
SSchema* pSchema;
union {
SSchema* pSchema;
SSchemaEx* pSchemaEx;
};
} SSchemaWrapper;
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
......
......@@ -37,8 +37,10 @@ int32_t taosMemorySize(void *ptr);
#define taosMemoryFreeClear(ptr) \
do { \
taosMemoryFree(ptr); \
(ptr)=NULL; \
if (ptr) { \
taosMemoryFree(ptr); \
(ptr) = NULL; \
} \
} while (0)
#ifdef __cplusplus
......
......@@ -57,6 +57,8 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return POINTER_SHIFT(buf, len); }
// ---- Fixed U16
static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) {
if (buf != NULL) {
......
......@@ -85,6 +85,7 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) {
for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i);
tlen += taosEncodeFixedI8(buf, colType(pCol));
tlen += taosEncodeFixedI8(buf, colSma(pCol));
tlen += taosEncodeFixedI16(buf, colColId(pCol));
tlen += taosEncodeFixedI16(buf, colBytes(pCol));
}
......@@ -107,12 +108,14 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
for (int i = 0; i < numOfCols; i++) {
col_type_t type = 0;
int8_t sma = TSDB_BSMA_TYPE_NONE;
col_id_t colId = 0;
col_bytes_t bytes = 0;
buf = taosDecodeFixedI8(buf, &type);
buf = taosDecodeFixedI8(buf, &sma);
buf = taosDecodeFixedI16(buf, &colId);
buf = taosDecodeFixedI32(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
if (tdAddColToSchema(&schemaBuilder, type, sma, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
}
......@@ -148,7 +151,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
pBuilder->version = version;
}
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes) {
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t sma, col_id_t colId, col_bytes_t bytes) {
if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) {
......@@ -161,6 +164,7 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
colSetType(pCol, type);
colSetColId(pCol, colId);
colSetSma(pCol, sma);
if (pBuilder->nCols == 0) {
colSetOffset(pCol, 0);
} else {
......@@ -168,9 +172,6 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
// TODO: set sma value by user input
pCol->sma = 1;
if (IS_VAR_DATA_TYPE(type)) {
colSetBytes(pCol, bytes);
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
......
......@@ -296,24 +296,22 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
switch (pReq->type) {
case TD_SUPER_TABLE:
tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols);
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].sma);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
}
tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nTagCols);
for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
}
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
}
if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
......@@ -330,17 +328,15 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag);
break;
case TD_NORMAL_TABLE:
tlen += taosEncodeFixedU32(buf, pReq->ntbCfg.nCols);
for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) {
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nCols);
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].sma);
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
}
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]);
}
if (pReq->rollup && pReq->ntbCfg.pRSmaParam) {
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
......@@ -370,31 +366,24 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
switch (pReq->type) {
case TD_SUPER_TABLE:
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
buf = taosDecodeFixedU32(buf, &(pReq->stbCfg.nCols));
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) {
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
pReq->stbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchemaEx));
for (col_id_t i = 0; i < pReq->stbCfg.nCols; i++) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].sma));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
}
buf = taosDecodeFixedU32(buf, &pReq->stbCfg.nTagCols);
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.nTagCols);
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) {
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; i++) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
}
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
if (pReq->stbCfg.nBSmaCols > 0) {
pReq->stbCfg.pBSmaCols = (col_id_t *)taosMemoryMalloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
}
} else {
pReq->stbCfg.pBSmaCols = NULL;
}
if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
......@@ -418,23 +407,16 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag);
break;
case TD_NORMAL_TABLE:
buf = taosDecodeFixedU32(buf, &pReq->ntbCfg.nCols);
pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) {
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols);
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
pReq->ntbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchemaEx));
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; i++) {
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].sma);
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
}
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
if (pReq->ntbCfg.nBSmaCols > 0) {
pReq->ntbCfg.pBSmaCols = (col_id_t *)taosMemoryMalloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i);
}
} else {
pReq->ntbCfg.pBSmaCols = NULL;
}
if (pReq->rollup) {
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
......
......@@ -431,7 +431,7 @@ FORCE_INLINE void *getDataMax(int32_t type) {
}
}
bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT; }
bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type < TSDB_DATA_TYPE_MAX; }
void setVardataNull(void *val, int32_t type) {
if (type == TSDB_DATA_TYPE_BINARY) {
......
......@@ -164,37 +164,37 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
req.keep = 0;
req.type = TD_SUPER_TABLE;
SSchema schemas[5] = {0};
SSchemaEx schemas[2] = {0};
{
SSchema* pSchema = &schemas[0];
SSchemaEx* pSchema = &schemas[0];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts");
}
{
SSchema* pSchema = &schemas[1];
SSchemaEx* pSchema = &schemas[1];
pSchema->bytes = htonl(4);
pSchema->type = TSDB_DATA_TYPE_INT;
strcpy(pSchema->name, "col1");
}
SSchema tagSchemas[3] = {0};
{
SSchema* pSchema = &schemas[2];
SSchema* pSchema = &tagSchemas[0];
pSchema->bytes = htonl(2);
pSchema->type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "tag1");
}
{
SSchema* pSchema = &schemas[3];
SSchema* pSchema = &tagSchemas[1];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "tag2");
}
{
SSchema* pSchema = &schemas[4];
SSchema* pSchema = &tagSchemas[2];
pSchema->bytes = htonl(16);
pSchema->type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "tag3");
......@@ -204,7 +204,7 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
req.stbCfg.nCols = 2;
req.stbCfg.pSchema = &schemas[0];
req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2];
req.stbCfg.pTagSchema = &tagSchemas[0];
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
......@@ -236,37 +236,37 @@ TEST_F(DndTestVnode, 04_Alter_Stb) {
req.keep = 0;
req.type = TD_SUPER_TABLE;
SSchema schemas[5] = {0};
SSchemaEx schemas[2] = {0};
{
SSchema* pSchema = &schemas[0];
SSchemaEx* pSchema = &schemas[0];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts");
}
{
SSchema* pSchema = &schemas[1];
SSchemaEx* pSchema = &schemas[1];
pSchema->bytes = htonl(4);
pSchema->type = TSDB_DATA_TYPE_INT;
strcpy(pSchema->name, "col1");
}
SSchema tagSchemas[3] = {0};
{
SSchema* pSchema = &schemas[2];
SSchema* pSchema = &tagSchemas[0];
pSchema->bytes = htonl(2);
pSchema->type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "_tag1");
}
{
SSchema* pSchema = &schemas[3];
SSchema* pSchema = &tagSchemas[1];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "_tag2");
}
{
SSchema* pSchema = &schemas[4];
SSchema* pSchema = &tagSchemas[2];
pSchema->bytes = htonl(16);
pSchema->type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "_tag3");
......@@ -276,7 +276,7 @@ TEST_F(DndTestVnode, 04_Alter_Stb) {
req.stbCfg.nCols = 2;
req.stbCfg.pSchema = &schemas[0];
req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2];
req.stbCfg.pTagSchema = &tagSchemas[0];
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
......
......@@ -333,6 +333,15 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
return mndAcquireDb(pMnode, db);
}
static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) {
if (*(col_id_t *)colId < ((SSchemaEx *)pSchema)->colId) {
return -1;
} else if (*(col_id_t *)colId > ((SSchemaEx *)pSchema)->colId) {
return 1;
}
return 0;
}
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
......@@ -348,13 +357,60 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns;
req.stbCfg.pSchema = pStb->pColumns;
req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pTags;
req.stbCfg.nBSmaCols = pStb->numOfSmas;
req.stbCfg.pSchema = (SSchemaEx *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaEx));
if (req.stbCfg.pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int bSmaStat = 0; // no column has bsma
if (pStb->numOfSmas == pStb->numOfColumns) { // assume pColumns > 0
bSmaStat = 1; // all columns have bsma
TASSERT(pStb->pSmas == NULL); // TODO: remove the assert
} else if (pStb->numOfSmas != 0) {
bSmaStat = 2; // partial columns have bsma
TASSERT(pStb->pSmas != NULL); // TODO: remove the assert
} else {
TASSERT(pStb->pSmas == NULL); // TODO: remove the assert
}
for (int32_t i = 0; i < req.stbCfg.nCols; ++i) {
SSchemaEx *pSchemaEx = req.stbCfg.pSchema + i;
SSchema *pSchema = pStb->pColumns + i;
pSchemaEx->type = pSchema->type;
pSchemaEx->sma = (bSmaStat == 1) ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE;
pSchemaEx->colId = pSchema->colId;
pSchemaEx->bytes = pSchema->bytes;
memcpy(pSchemaEx->name, pSchema->name, TSDB_COL_NAME_LEN);
}
if (bSmaStat == 2) {
if (pStb->pSmas == NULL) {
mError("stb:%s, sma options is empty", pStb->name);
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return NULL;
}
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
SSchema *pSmaSchema = pStb->pSmas + i;
SSchemaEx *pColSchema = taosbsearch(&pSmaSchema->colId, req.stbCfg.pSchema, req.stbCfg.nCols, sizeof(SSchemaEx),
schemaExColIdCompare, TD_EQ);
if (pColSchema == NULL) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
taosMemoryFreeClear(req.stbCfg.pSchema);
mError("stb:%s, sma col:%s not found in columns", pStb->name, pSmaSchema->name);
return NULL;
}
pColSchema->sma = TSDB_BSMA_TYPE_LATEST;
}
}
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -366,6 +422,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
tSerializeSVCreateTbReq(&pBuf, &req);
*pContLen = contLen;
taosMemoryFreeClear(req.stbCfg.pSchema);
return pHead;
}
......@@ -498,7 +555,6 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -559,9 +615,9 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
}
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
for (int32_t col = 0; col < pStb->numOfColumns; col++) {
for (int32_t col = 0; col < pStb->numOfColumns; ++col) {
SSchema *pSchema = &pStb->pColumns[col];
if (strcasecmp(pStb->pColumns[col].name, colName) == 0) {
if (strncasecmp(pSchema->name, colName, TSDB_COL_NAME_LEN) == 0) {
return pSchema;
}
}
......@@ -625,7 +681,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
SSchema *pSchema = &stbObj.pSmas[i];
SSchema *pColSchema = mndFindStbColumns(&stbObj, pField->name);
if (pColSchema == NULL) {
mError("stb:%s, sma:%s not found in columns", stbObj.name, pSchema->name);
mError("stb:%s, sma:%s not found in columns", stbObj.name, pField->name);
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
......@@ -1061,7 +1117,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......
......@@ -70,9 +70,12 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static void metaClearTbCfg(STbCfg *pTbCfg);
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx);
static void metaDBWLock(SMetaDB *pDB);
static void metaDBRLock(SMetaDB *pDB);
static void metaDBULock(SMetaDB *pDB);
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
......@@ -155,13 +158,13 @@ void metaCloseDB(SMeta *pMeta) {
}
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
tb_uid_t uid;
char buf[512];
char buf1[512];
void *pBuf;
DBT key1, value1;
DBT key2, value2;
SSchema *pSchema = NULL;
tb_uid_t uid;
char buf[512];
char buf1[512];
void *pBuf;
DBT key1, value1;
DBT key2, value2;
SSchemaEx *pSchema = NULL;
if (pTbCfg->type == META_SUPER_TABLE) {
uid = pTbCfg->stbCfg.suid;
......@@ -204,8 +207,8 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
key2.data = &schemaKey;
key2.size = sizeof(schemaKey);
SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema};
metaEncodeSchema(&pBuf, &sw);
SSchemaWrapper sw = {.nCols = ncols, .pSchemaEx = pSchema};
metaEncodeSchemaEx(&pBuf, &sw);
value2.data = buf1;
value2.size = POINTER_DISTANCE(pBuf, buf1);
......@@ -298,6 +301,8 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
int8_t dummy;
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
......@@ -309,6 +314,50 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
return buf;
}
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
int tlen = 0;
SSchemaEx *pSchema;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchemaEx + i;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->sma);
tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name);
}
return tlen;
}
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
if (isGetEx) {
pSW->pSchemaEx = (SSchemaEx *)taosMemoryMalloc(sizeof(SSchemaEx) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchemaEx *pSchema = pSW->pSchemaEx + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->sma);
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
} else {
pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchema *pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosSkipFixedLen(buf, sizeof(int8_t));
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
}
return buf;
}
static SMetaDB *metaNewDB() {
SMetaDB *pDB = NULL;
pDB = (SMetaDB *)taosMemoryCalloc(1, sizeof(*pDB));
......@@ -652,12 +701,16 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
return metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false);
}
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) {
uint32_t nCols;
SSchemaWrapper *pSW = NULL;
SMetaDB *pDB = pMeta->pDB;
int ret;
void *pBuf;
SSchema *pSchema;
// SSchema *pSchema;
SSchemaKey schemaKey = {uid, sver, 0};
DBT key = {0};
DBT value = {0};
......@@ -678,7 +731,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
// Decode the schema
pBuf = value.data;
pSW = taosMemoryMalloc(sizeof(*pSW));
metaDecodeSchema(pBuf, pSW);
metaDecodeSchemaEx(pBuf, pSW, isGetEx);
return pSW;
}
......@@ -755,7 +808,7 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
STSchemaBuilder sb;
STSchema *pTSchema = NULL;
SSchema *pSchema;
SSchemaEx *pSchema;
SSchemaWrapper *pSW;
STbCfg *pTbCfg;
tb_uid_t quid;
......@@ -767,16 +820,16 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
quid = uid;
}
pSW = metaGetTableSchema(pMeta, quid, sver, true);
pSW = metaGetTableSchemaImpl(pMeta, quid, sver, true, true);
if (pSW == NULL) {
return NULL;
}
// Rebuild a schema
tdInitTSchemaBuilder(&sb, 0);
for (int32_t i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes);
for (int32_t i = 0; i < pSW->nCols; ++i) {
pSchema = pSW->pSchemaEx + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->sma, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
......
......@@ -46,6 +46,10 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW);
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx);
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx);
static inline int metaUidCmpr(const void *arg1, int len1, const void *arg2, int len2) {
tb_uid_t uid1, uid2;
......@@ -228,7 +232,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
schemaWrapper.pSchema = pTbCfg->ntbCfg.pSchema;
}
pVal = pBuf = buf;
metaEncodeSchema(&pBuf, &schemaWrapper);
metaEncodeSchemaEx(&pBuf, &schemaWrapper);
vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen);
if (ret < 0) {
......@@ -345,6 +349,10 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
return *metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false);
}
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) {
void *pKey;
void *pVal;
int kLen;
......@@ -368,7 +376,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
// decode
pBuf = pVal;
pSchemaWrapper = taosMemoryMalloc(sizeof(*pSchemaWrapper));
metaDecodeSchema(pBuf, pSchemaWrapper);
metaDecodeSchemaEx(pBuf, pSchemaWrapper, isGetEx);
TDB_FREE(pVal);
......@@ -379,7 +387,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tb_uid_t quid;
SSchemaWrapper *pSW;
STSchemaBuilder sb;
SSchema *pSchema;
SSchemaEx *pSchema;
STSchema *pTSchema;
STbCfg *pTbCfg;
......@@ -390,15 +398,15 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
quid = uid;
}
pSW = metaGetTableSchema(pMeta, quid, sver, true);
pSW = metaGetTableSchemaImpl(pMeta, quid, sver, true, true);
if (pSW == NULL) {
return NULL;
}
tdInitTSchemaBuilder(&sb, 0);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes);
pSchema = pSW->pSchemaEx + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->sma, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
......@@ -605,6 +613,50 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
return buf;
}
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
int tlen = 0;
SSchemaEx *pSchema;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int i = 0; i < pSW->nCols; ++i) {
pSchema = pSW->pSchemaEx + i;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->sma);
tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name);
}
return tlen;
}
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
if (isGetEx) {
pSW->pSchemaEx = (SSchemaEx *)taosMemoryMalloc(sizeof(SSchemaEx) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchemaEx *pSchema = pSW->pSchemaEx + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->sma);
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
} else {
pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
SSchema *pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosSkipFixedLen(buf, sizeof(int8_t));
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
}
return buf;
}
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
int tsize = 0;
......
......@@ -81,7 +81,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: maybe need to clear the request struct
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pBSmaCols);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
taosMemoryFree(vCreateTbReq.dbFName);
taosMemoryFree(vCreateTbReq.name);
......@@ -116,13 +115,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (pCreateTbReq->type == TD_SUPER_TABLE) {
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pBSmaCols);
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
} else if (pCreateTbReq->type == TD_CHILD_TABLE) {
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
} else {
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
taosMemoryFree(pCreateTbReq->ntbCfg.pBSmaCols);
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
}
}
......@@ -150,7 +147,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pBSmaCols);
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
taosMemoryFree(vAlterTbReq.dbFName);
taosMemoryFree(vAlterTbReq.name);
......
......@@ -2169,10 +2169,11 @@ typedef struct SVgroupTablesBatch {
char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch;
static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) {
pSchema->colId = colId;
pSchema->type = pCol->dataType.type;
pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->sma = TSDB_BSMA_TYPE_LATEST; // TODO: use default value currently, and use the real value later.
strcpy(pSchema->name, pCol->colName);
}
......@@ -2194,7 +2195,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, con
req.dbFName = strdup(dbFName);
req.name = strdup(pTableName);
req.ntbCfg.nCols = LIST_LENGTH(pColumns);
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema));
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchemaEx));
if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
destroyCreateTbReq(&req);
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -2205,6 +2206,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, con
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
++index;
}
// TODO: use the real sma for normal table.
pBatch->info = *pVgroupInfo;
strcpy(pBatch->dbName, pDbName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册