提交 9329f5f0 编写于 作者: H Hongze Cheng

refact vnode

上级 1bc0c3cc
...@@ -253,22 +253,16 @@ typedef struct { ...@@ -253,22 +253,16 @@ typedef struct {
SSubmitRspBlock failedBlocks[]; SSubmitRspBlock failedBlocks[];
} SSubmitRsp; } SSubmitRsp;
#define SCHEMA_SMA_ON 0x1
#define SCHEMA_IDX_ON 0x2
typedef struct SSchema { typedef struct SSchema {
int8_t type; int8_t type;
int8_t index; // default is 0, not index created int8_t flags;
col_id_t colId; col_id_t colId;
int32_t bytes; int32_t bytes;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SSchema; } SSchema;
typedef struct {
int8_t type;
int8_t sma; // ETsdbBSmaType and default is TSDB_BSMA_TYPE_I
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
} SSchemaEx;
#define SSCHMEA_TYPE(s) ((s)->type) #define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_SMA(s) ((s)->sma) #define SSCHMEA_SMA(s) ((s)->sma)
#define SSCHMEA_COLID(s) ((s)->colId) #define SSCHMEA_COLID(s) ((s)->colId)
...@@ -1454,7 +1448,7 @@ typedef struct SVCreateTbReq { ...@@ -1454,7 +1448,7 @@ typedef struct SVCreateTbReq {
tb_uid_t suid; tb_uid_t suid;
col_id_t nCols; col_id_t nCols;
col_id_t nBSmaCols; col_id_t nBSmaCols;
SSchemaEx* pSchema; SSchema* pSchema;
col_id_t nTagCols; col_id_t nTagCols;
SSchema* pTagSchema; SSchema* pTagSchema;
SRSmaParam* pRSmaParam; SRSmaParam* pRSmaParam;
...@@ -1466,7 +1460,7 @@ typedef struct SVCreateTbReq { ...@@ -1466,7 +1460,7 @@ typedef struct SVCreateTbReq {
struct { struct {
col_id_t nCols; col_id_t nCols;
col_id_t nBSmaCols; col_id_t nBSmaCols;
SSchemaEx* pSchema; SSchema* pSchema;
SRSmaParam* pRSmaParam; SRSmaParam* pRSmaParam;
} ntbCfg; } ntbCfg;
}; };
...@@ -2031,16 +2025,13 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq) ...@@ -2031,16 +2025,13 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq)
typedef struct { typedef struct {
uint32_t nCols; uint32_t nCols;
union {
SSchema* pSchema; SSchema* pSchema;
SSchemaEx* pSchemaEx;
};
} SSchemaWrapper; } SSchemaWrapper;
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type); tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->index); tlen += taosEncodeFixedI8(buf, pSchema->flags);
tlen += taosEncodeFixedI32(buf, pSchema->bytes); tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeFixedI16(buf, pSchema->colId); tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeString(buf, pSchema->name); tlen += taosEncodeString(buf, pSchema->name);
...@@ -2049,7 +2040,7 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema ...@@ -2049,7 +2040,7 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) { static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
buf = taosDecodeFixedI8(buf, &pSchema->type); buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->index); buf = taosDecodeFixedI8(buf, &pSchema->flags);
buf = taosDecodeFixedI32(buf, &pSchema->bytes); buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeFixedI16(buf, &pSchema->colId); buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeStringTo(buf, pSchema->name); buf = taosDecodeStringTo(buf, pSchema->name);
...@@ -2058,7 +2049,7 @@ static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) { ...@@ -2058,7 +2049,7 @@ static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) { static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->index) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1; if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1;
if (tEncodeI16(pEncoder, pSchema->colId) < 0) return -1; if (tEncodeI16(pEncoder, pSchema->colId) < 0) return -1;
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1; if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
...@@ -2067,7 +2058,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch ...@@ -2067,7 +2058,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) { static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->index) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1; if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1;
if (tDecodeI16(pDecoder, &pSchema->colId) < 0) return -1; if (tDecodeI16(pDecoder, &pSchema->colId) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
......
...@@ -411,7 +411,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -411,7 +411,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].sma); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
...@@ -419,7 +419,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -419,7 +419,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].index); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
...@@ -443,7 +443,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -443,7 +443,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols); tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) { for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].sma); tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
...@@ -478,10 +478,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -478,10 +478,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid)); buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
pReq->stbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchemaEx)); pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].sma)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].flags));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
...@@ -490,7 +490,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -490,7 +490,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].index)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].flags));
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId); buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
...@@ -520,10 +520,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -520,10 +520,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
case TD_NORMAL_TABLE: case TD_NORMAL_TABLE:
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols); buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols);
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols)); buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
pReq->ntbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchemaEx)); pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) { for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type); buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].sma); buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].flags);
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId); buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
......
aux_source_directory(. DND_VNODE_TEST_SRC) # aux_source_directory(. DND_VNODE_TEST_SRC)
add_executable(dvnodeTest ${DND_VNODE_TEST_SRC}) # add_executable(dvnodeTest ${DND_VNODE_TEST_SRC})
target_link_libraries( # target_link_libraries(
dvnodeTest # dvnodeTest
PUBLIC sut # PUBLIC sut
) # )
add_test( # add_test(
NAME dvnodeTest # NAME dvnodeTest
COMMAND dvnodeTest # COMMAND dvnodeTest
) # )
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndInfoSchema.h" #include "mndInfoSchema.h"
#include "mndPerfSchema.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndPerfSchema.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
...@@ -40,7 +40,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp); ...@@ -40,7 +40,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp); static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp); static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessTableMetaReq(SNodeMsg *pReq); static int32_t mndProcessTableMetaReq(SNodeMsg *pReq);
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows); static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
int32_t mndInitStb(SMnode *pMnode) { int32_t mndInitStb(SMnode *pMnode) {
...@@ -333,9 +333,9 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { ...@@ -333,9 +333,9 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
} }
static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) { static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) {
if (*(col_id_t *)colId < ((SSchemaEx *)pSchema)->colId) { if (*(col_id_t *)colId < ((SSchema *)pSchema)->colId) {
return -1; return -1;
} else if (*(col_id_t *)colId > ((SSchemaEx *)pSchema)->colId) { } else if (*(col_id_t *)colId > ((SSchema *)pSchema)->colId) {
return 1; return 1;
} }
return 0; return 0;
...@@ -360,49 +360,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -360,49 +360,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.stbCfg.nTagCols = pStb->numOfTags; req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pTags; req.stbCfg.pTagSchema = pStb->pTags;
req.stbCfg.nBSmaCols = pStb->numOfSmas; req.stbCfg.nBSmaCols = pStb->numOfSmas;
req.stbCfg.pSchema = (SSchemaEx *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaEx)); req.stbCfg.pSchema = (SSchema *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
if (req.stbCfg.pSchema == NULL) { if (req.stbCfg.pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
int bSmaStat = 0; // no column has bsma memcpy(req.stbCfg.pSchema, pStb->pColumns, sizeof(SSchema) * pStb->numOfColumns);
if (pStb->numOfSmas == pStb->numOfColumns) { // assume pColumns > 0 for (int i = 0; i < pStb->numOfColumns; i++) {
bSmaStat = 1; // all columns have bsma req.stbCfg.pSchema[i].flags = SCHEMA_SMA_ON;
} else if (pStb->numOfSmas != 0) {
bSmaStat = 2; // partial columns have bsma
TASSERT(pStb->pSmas != NULL); // TODO: remove the assert
}
for (int32_t i = 0; i < req.stbCfg.nCols; ++i) {
SSchemaEx *pSchemaEx = req.stbCfg.pSchema + i;
SSchema *pSchema = pStb->pColumns + i;
pSchemaEx->type = pSchema->type;
pSchemaEx->sma = (bSmaStat == 1) ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE;
pSchemaEx->colId = pSchema->colId;
pSchemaEx->bytes = pSchema->bytes;
memcpy(pSchemaEx->name, pSchema->name, TSDB_COL_NAME_LEN);
}
if (bSmaStat == 2) {
if (pStb->pSmas == NULL) {
mError("stb:%s, sma options is empty", pStb->name);
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return NULL;
}
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
SSchema *pSmaSchema = pStb->pSmas + i;
SSchemaEx *pColSchema = taosbsearch(&pSmaSchema->colId, req.stbCfg.pSchema, req.stbCfg.nCols, sizeof(SSchemaEx),
schemaExColIdCompare, TD_EQ);
if (pColSchema == NULL) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
taosMemoryFreeClear(req.stbCfg.pSchema);
mError("stb:%s, sma col:%s not found in columns", pStb->name, pSmaSchema->name);
return NULL;
}
pColSchema->sma = TSDB_BSMA_TYPE_LATEST;
}
} }
SRSmaParam *pRSmaParam = NULL; SRSmaParam *pRSmaParam = NULL;
...@@ -1644,14 +1610,14 @@ static void mndExtractTableName(char *tableId, char *name) { ...@@ -1644,14 +1610,14 @@ static void mndExtractTableName(char *tableId, char *name) {
} }
} }
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) { static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
int32_t cols = 0; int32_t cols = 0;
SDbObj* pDb = NULL; SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) { if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db); pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return terrno; if (pDb == NULL) return terrno;
...@@ -1673,16 +1639,16 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo ...@@ -1673,16 +1639,16 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo
mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]); mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]);
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) stbName, false); colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB); tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db)); tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db))); varDataSetLen(db, strlen(varDataVal(db)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) db, false); colDataAppend(pColInfo, numOfRows, (const char *)db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
...@@ -1696,7 +1662,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo ...@@ -1696,7 +1662,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
char* p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures char *p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures
if (p != NULL) { if (p != NULL) {
if (pStb->commentLen != 0) { if (pStb->commentLen != 0) {
STR_TO_VARSTR(p, pStb->comment); STR_TO_VARSTR(p, pStb->comment);
......
...@@ -299,10 +299,10 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { ...@@ -299,10 +299,10 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
if (pTbCfg->type == META_SUPER_TABLE) { if (pTbCfg->type == META_SUPER_TABLE) {
schemaWrapper.nCols = pTbCfg->stbCfg.nCols; schemaWrapper.nCols = pTbCfg->stbCfg.nCols;
schemaWrapper.pSchemaEx = pTbCfg->stbCfg.pSchema; schemaWrapper.pSchema = pTbCfg->stbCfg.pSchema;
} else { } else {
schemaWrapper.nCols = pTbCfg->ntbCfg.nCols; schemaWrapper.nCols = pTbCfg->ntbCfg.nCols;
schemaWrapper.pSchemaEx = pTbCfg->ntbCfg.pSchema; schemaWrapper.pSchema = pTbCfg->ntbCfg.pSchema;
} }
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeSchemaEx(&pBuf, &schemaWrapper); metaEncodeSchemaEx(&pBuf, &schemaWrapper);
...@@ -464,7 +464,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -464,7 +464,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tb_uid_t quid; tb_uid_t quid;
SSchemaWrapper *pSW; SSchemaWrapper *pSW;
STSchemaBuilder sb; STSchemaBuilder sb;
SSchemaEx *pSchema; SSchema *pSchema;
STSchema *pTSchema; STSchema *pTSchema;
STbCfg *pTbCfg; STbCfg *pTbCfg;
...@@ -482,8 +482,8 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -482,8 +482,8 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tdInitTSchemaBuilder(&sb, 0); tdInitTSchemaBuilder(&sb, 0);
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchemaEx + i; pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->sma, pSchema->colId, pSchema->bytes); tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
} }
pTSchema = tdGetSchemaFromBuilder(&sb); pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb); tdDestroyTSchemaBuilder(&sb);
...@@ -939,7 +939,7 @@ static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) { ...@@ -939,7 +939,7 @@ static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchema + i;
tlen += taosEncodeFixedI8(buf, pSchema->type); tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->index); tlen += taosEncodeFixedI8(buf, pSchema->flags);
tlen += taosEncodeFixedI16(buf, pSchema->colId); tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes); tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name); tlen += taosEncodeString(buf, pSchema->name);
...@@ -967,13 +967,13 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) { ...@@ -967,13 +967,13 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) { static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
int tlen = 0; int tlen = 0;
SSchemaEx *pSchema; SSchema *pSchema;
tlen += taosEncodeFixedU32(buf, pSW->nCols); tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int i = 0; i < pSW->nCols; ++i) { for (int i = 0; i < pSW->nCols; ++i) {
pSchema = pSW->pSchemaEx + i; pSchema = pSW->pSchema + i;
tlen += taosEncodeFixedI8(buf, pSchema->type); tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->sma); tlen += taosEncodeFixedI8(buf, pSchema->flags);
tlen += taosEncodeFixedI16(buf, pSchema->colId); tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes); tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name); tlen += taosEncodeString(buf, pSchema->name);
...@@ -985,11 +985,11 @@ static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) { ...@@ -985,11 +985,11 @@ static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) { static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) {
buf = taosDecodeFixedU32(buf, &pSW->nCols); buf = taosDecodeFixedU32(buf, &pSW->nCols);
if (isGetEx) { if (isGetEx) {
pSW->pSchemaEx = (SSchemaEx *)taosMemoryMalloc(sizeof(SSchemaEx) * pSW->nCols); pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
SSchemaEx *pSchema = pSW->pSchemaEx + i; SSchema *pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type); buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->sma); buf = taosDecodeFixedI8(buf, &pSchema->flags);
buf = taosDecodeFixedI16(buf, &pSchema->colId); buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes); buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name); buf = taosDecodeStringTo(buf, pSchema->name);
......
...@@ -495,7 +495,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { ...@@ -495,7 +495,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) || if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) ||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) || (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) ||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) || (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) ||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type) ) { (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type)) {
pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
} else { } else {
...@@ -835,7 +835,8 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) { ...@@ -835,7 +835,8 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
return NULL; return NULL;
} }
pFunc->pParameterList = nodesMakeList(); pFunc->pParameterList = nodesMakeList();
if (NULL == pFunc->pParameterList || TSDB_CODE_SUCCESS != nodesListStrictAppend(pFunc->pParameterList, nodesCloneNode(pExpr))) { if (NULL == pFunc->pParameterList ||
TSDB_CODE_SUCCESS != nodesListStrictAppend(pFunc->pParameterList, nodesCloneNode(pExpr))) {
nodesDestroyNode(pFunc); nodesDestroyNode(pFunc);
return NULL; return NULL;
} }
...@@ -886,11 +887,13 @@ static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SN ...@@ -886,11 +887,13 @@ static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SN
} }
static bool isStar(SNode* pNode) { static bool isStar(SNode* pNode) {
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*")); return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) &&
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
} }
static bool isTableStar(SNode* pNode) { static bool isTableStar(SNode* pNode) {
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*")); return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) &&
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
} }
static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrcParas, SNodeList** pOutput) { static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrcParas, SNodeList** pOutput) {
...@@ -1125,9 +1128,7 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -1125,9 +1128,7 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
return translateExprList(pCxt, pSelect->pGroupByList); return translateExprList(pCxt, pSelect->pGroupByList);
} }
static bool isValTimeUnit(char unit) { static bool isValTimeUnit(char unit) { return ('n' == unit || 'y' == unit); }
return ('n' == unit || 'y' == unit);
}
static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit) { static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit) {
int64_t days = convertTimeFromPrecisionToUnit(val, fromPrecision, 'd'); int64_t days = convertTimeFromPrecisionToUnit(val, fromPrecision, 'd');
...@@ -1171,7 +1172,8 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* ...@@ -1171,7 +1172,8 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
} }
bool fixed = !isValTimeUnit(pOffset->unit) && !valInter; bool fixed = !isValTimeUnit(pOffset->unit) && !valInter;
if ((fixed && pOffset->datum.i >= pInter->datum.i) || if ((fixed && pOffset->datum.i >= pInter->datum.i) ||
(!fixed && getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit) >= getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) { (!fixed && getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit) >=
getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG);
} }
} }
...@@ -1364,7 +1366,8 @@ static int32_t checkRangeOption(STranslateContext* pCxt, const char* pName, SVal ...@@ -1364,7 +1366,8 @@ static int32_t checkRangeOption(STranslateContext* pCxt, const char* pName, SVal
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) { if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
return pCxt->errCode; return pCxt->errCode;
} }
if (pVal->isDuration && (TIME_UNIT_MINUTE != pVal->unit && TIME_UNIT_HOUR != pVal->unit && TIME_UNIT_DAY != pVal->unit)) { if (pVal->isDuration &&
(TIME_UNIT_MINUTE != pVal->unit && TIME_UNIT_HOUR != pVal->unit && TIME_UNIT_DAY != pVal->unit)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_OPTION_UNIT, pName, pVal->unit); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_OPTION_UNIT, pName, pVal->unit);
} }
int64_t val = getBigintFromValueNode(pVal); int64_t val = getBigintFromValueNode(pVal);
...@@ -1459,9 +1462,12 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) { ...@@ -1459,9 +1462,12 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) {
SValueNode* pKeep0 = (SValueNode*)nodesListGetNode(pKeep, 0); SValueNode* pKeep0 = (SValueNode*)nodesListGetNode(pKeep, 0);
SValueNode* pKeep1 = (SValueNode*)nodesListGetNode(pKeep, 1); SValueNode* pKeep1 = (SValueNode*)nodesListGetNode(pKeep, 1);
SValueNode* pKeep2 = (SValueNode*)nodesListGetNode(pKeep, 2); SValueNode* pKeep2 = (SValueNode*)nodesListGetNode(pKeep, 2);
if ((pKeep0->isDuration && (TIME_UNIT_MINUTE != pKeep0->unit && TIME_UNIT_HOUR != pKeep0->unit && TIME_UNIT_DAY != pKeep0->unit)) || if ((pKeep0->isDuration &&
(pKeep1->isDuration && (TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) || (TIME_UNIT_MINUTE != pKeep0->unit && TIME_UNIT_HOUR != pKeep0->unit && TIME_UNIT_DAY != pKeep0->unit)) ||
(pKeep2->isDuration && (TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) { (pKeep1->isDuration &&
(TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) ||
(pKeep2->isDuration &&
(TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, pKeep2->unit); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, pKeep2->unit);
} }
...@@ -1518,7 +1524,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, SDatabaseOptions* p ...@@ -1518,7 +1524,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, SDatabaseOptions* p
code = checkRangeOption(pCxt, "compression", pOptions->pCompressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL); code = checkRangeOption(pCxt, "compression", pOptions->pCompressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "daysPerFile", pOptions->pDaysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); code =
checkRangeOption(pCxt, "daysPerFile", pOptions->pDaysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "fsyncPeriod", pOptions->pFsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD); code = checkRangeOption(pCxt, "fsyncPeriod", pOptions->pFsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
...@@ -1571,7 +1578,7 @@ static int32_t checkCreateDatabase(STranslateContext* pCxt, SCreateDatabaseStmt* ...@@ -1571,7 +1578,7 @@ static int32_t checkCreateDatabase(STranslateContext* pCxt, SCreateDatabaseStmt*
return checkDatabaseOptions(pCxt, pStmt->pOptions); return checkDatabaseOptions(pCxt, pStmt->pOptions);
} }
typedef int32_t (*FSerializeFunc)(void *pBuf, int32_t bufLen, void *pReq); typedef int32_t (*FSerializeFunc)(void* pBuf, int32_t bufLen, void* pReq);
static int32_t buildCmdMsg(STranslateContext* pCxt, int16_t msgType, FSerializeFunc func, void* pReq) { static int32_t buildCmdMsg(STranslateContext* pCxt, int16_t msgType, FSerializeFunc func, void* pReq) {
pCxt->pCmdMsg = taosMemoryMalloc(sizeof(SCmdMsgInfo)); pCxt->pCmdMsg = taosMemoryMalloc(sizeof(SCmdMsgInfo));
...@@ -2176,7 +2183,8 @@ static int16_t getCreateComponentNodeMsgType(ENodeType type) { ...@@ -2176,7 +2183,8 @@ static int16_t getCreateComponentNodeMsgType(ENodeType type) {
static int32_t translateCreateComponentNode(STranslateContext* pCxt, SCreateComponentNodeStmt* pStmt) { static int32_t translateCreateComponentNode(STranslateContext* pCxt, SCreateComponentNodeStmt* pStmt) {
SMCreateQnodeReq createReq = {.dnodeId = pStmt->dnodeId}; SMCreateQnodeReq createReq = {.dnodeId = pStmt->dnodeId};
return buildCmdMsg(pCxt, getCreateComponentNodeMsgType(nodeType(pStmt)), (FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &createReq); return buildCmdMsg(pCxt, getCreateComponentNodeMsgType(nodeType(pStmt)),
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &createReq);
} }
static int16_t getDropComponentNodeMsgType(ENodeType type) { static int16_t getDropComponentNodeMsgType(ENodeType type) {
...@@ -2197,7 +2205,8 @@ static int16_t getDropComponentNodeMsgType(ENodeType type) { ...@@ -2197,7 +2205,8 @@ static int16_t getDropComponentNodeMsgType(ENodeType type) {
static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponentNodeStmt* pStmt) { static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponentNodeStmt* pStmt) {
SDDropQnodeReq dropReq = {.dnodeId = pStmt->dnodeId}; SDDropQnodeReq dropReq = {.dnodeId = pStmt->dnodeId};
return buildCmdMsg(pCxt, getDropComponentNodeMsgType(nodeType(pStmt)), (FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq); return buildCmdMsg(pCxt, getDropComponentNodeMsgType(nodeType(pStmt)),
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
} }
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
...@@ -2301,11 +2310,13 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* ...@@ -2301,11 +2310,13 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
} }
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pWatermark) { if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pWatermark) {
code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode : TSDB_CODE_SUCCESS; code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode
: TSDB_CODE_SUCCESS;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
createReq.triggerType = pStmt->pOptions->triggerType; createReq.triggerType = pStmt->pOptions->triggerType;
createReq.watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); createReq.watermark =
(NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -2457,9 +2468,7 @@ static int32_t extractSelectResultSchema(const SSelectStmt* pSelect, int32_t* nu ...@@ -2457,9 +2468,7 @@ static int32_t extractSelectResultSchema(const SSelectStmt* pSelect, int32_t* nu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int8_t extractResultTsPrecision(const SSelectStmt* pSelect) { static int8_t extractResultTsPrecision(const SSelectStmt* pSelect) { return pSelect->precision; }
return pSelect->precision;
}
static int32_t extractExplainResultSchema(int32_t* numOfCols, SSchema** pSchema) { static int32_t extractExplainResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = 1; *numOfCols = 1;
...@@ -2722,11 +2731,15 @@ typedef struct SVgroupTablesBatch { ...@@ -2722,11 +2731,15 @@ typedef struct SVgroupTablesBatch {
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch; } SVgroupTablesBatch;
static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) { static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
int8_t flags = 0;
if (pCol->sma) {
flags |= SCHEMA_SMA_ON;
}
pSchema->colId = colId; pSchema->colId = colId;
pSchema->type = pCol->dataType.type; pSchema->type = pCol->dataType.type;
pSchema->bytes = calcTypeBytes(pCol->dataType); pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->sma = pCol->sma ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE; pSchema->flags = flags;
strcpy(pSchema->name, pCol->colName); strcpy(pSchema->name, pCol->colName);
} }
...@@ -2771,7 +2784,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* ...@@ -2771,7 +2784,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
req.dbFName = strdup(dbFName); req.dbFName = strdup(dbFName);
req.name = strdup(pStmt->tableName); req.name = strdup(pStmt->tableName);
req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols); req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchemaEx)); req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema));
if (NULL == req.name || NULL == req.ntbCfg.pSchema) { if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
destroyCreateTbReq(&req); destroyCreateTbReq(&req);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -3176,7 +3189,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -3176,7 +3189,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pQuery->precision = extractResultTsPrecision((SSelectStmt*) pQuery->pRoot); pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot);
} }
if (NULL != pCxt->pDbs) { if (NULL != pCxt->pDbs) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册