提交 fe22f0f6 编写于 作者: H Hongze Cheng

use two version in server and refact

上级 c47b7939
...@@ -300,9 +300,7 @@ typedef struct SSchema { ...@@ -300,9 +300,7 @@ typedef struct SSchema {
typedef struct { typedef struct {
int32_t nCols; int32_t nCols;
int32_t sver; int32_t version;
int32_t tagVer;
int32_t colVer;
SSchema* pSchema; SSchema* pSchema;
} SSchemaWrapper; } SSchemaWrapper;
...@@ -310,9 +308,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p ...@@ -310,9 +308,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
if (pSW == NULL) return pSW; if (pSW == NULL) return pSW;
pSW->nCols = pSchemaWrapper->nCols; pSW->nCols = pSchemaWrapper->nCols;
pSW->sver = pSchemaWrapper->sver; pSW->version = pSchemaWrapper->version;
pSW->tagVer = pSchemaWrapper->tagVer;
pSW->colVer = pSchemaWrapper->colVer;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) { if (pSW->pSchema == NULL) {
taosMemoryFree(pSW); taosMemoryFree(pSW);
...@@ -367,9 +363,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) ...@@ -367,9 +363,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema)
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeVariantI32(buf, pSW->nCols); tlen += taosEncodeVariantI32(buf, pSW->nCols);
tlen += taosEncodeVariantI32(buf, pSW->sver); tlen += taosEncodeVariantI32(buf, pSW->version);
tlen += taosEncodeVariantI32(buf, pSW->tagVer);
tlen += taosEncodeVariantI32(buf, pSW->colVer);
for (int32_t i = 0; i < pSW->nCols; i++) { for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
} }
...@@ -378,9 +372,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr ...@@ -378,9 +372,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->nCols);
buf = taosDecodeVariantI32(buf, &pSW->sver); buf = taosDecodeVariantI32(buf, &pSW->version);
buf = taosDecodeVariantI32(buf, &pSW->tagVer);
buf = taosDecodeVariantI32(buf, &pSW->colVer);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) { if (pSW->pSchema == NULL) {
return NULL; return NULL;
...@@ -394,9 +386,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp ...@@ -394,9 +386,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->version) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->tagVer) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->colVer) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) { for (int32_t i = 0; i < pSW->nCols; i++) {
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
} }
...@@ -406,9 +396,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch ...@@ -406,9 +396,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) return -1; if (pSW->pSchema == NULL) return -1;
...@@ -421,9 +409,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra ...@@ -421,9 +409,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema)); pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
if (pSW->pSchema == NULL) return -1; if (pSW->pSchema == NULL) return -1;
...@@ -1713,7 +1699,7 @@ typedef struct SVCreateStbReq { ...@@ -1713,7 +1699,7 @@ typedef struct SVCreateStbReq {
char* name; char* name;
tb_uid_t suid; tb_uid_t suid;
int8_t rollup; int8_t rollup;
SSchemaWrapper schema; SSchemaWrapper schemaRow;
SSchemaWrapper schemaTag; SSchemaWrapper schemaTag;
SRSmaParam pRSmaParam; SRSmaParam pRSmaParam;
} SVCreateStbReq; } SVCreateStbReq;
...@@ -1745,7 +1731,7 @@ typedef struct SVCreateTbReq { ...@@ -1745,7 +1731,7 @@ typedef struct SVCreateTbReq {
uint8_t* pTag; uint8_t* pTag;
} ctb; } ctb;
struct { struct {
SSchemaWrapper schema; SSchemaWrapper schemaRow;
} ntb; } ntb;
}; };
} SVCreateTbReq; } SVCreateTbReq;
......
...@@ -3801,7 +3801,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { ...@@ -3801,7 +3801,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1; if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
if (pReq->rollup) { if (pReq->rollup) {
if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
...@@ -3817,7 +3817,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { ...@@ -3817,7 +3817,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1; if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
if (pReq->rollup) { if (pReq->rollup) {
if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
...@@ -3866,7 +3866,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { ...@@ -3866,7 +3866,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1; if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) { } else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -3892,7 +3892,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { ...@@ -3892,7 +3892,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1; if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) { } else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -388,13 +388,12 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -388,13 +388,12 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.name = (char *)tNameGetTableName(&name); req.name = (char *)tNameGetTableName(&name);
req.suid = pStb->uid; req.suid = pStb->uid;
req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.rollup = pStb->ast1Len > 0 ? 1 : 0;
req.schema.nCols = pStb->numOfColumns; // todo
req.schema.sver = pStb->version; req.schemaRow.nCols = pStb->numOfColumns;
req.schema.tagVer = pStb->tagVer; req.schemaRow.version = pStb->version;
req.schema.colVer = pStb->colVer; req.schemaRow.pSchema = pStb->pColumns;
req.schema.pSchema = pStb->pColumns;
req.schemaTag.nCols = pStb->numOfTags; req.schemaTag.nCols = pStb->numOfTags;
req.schemaTag.sver = 1; req.schemaTag.version = pStb->tagVer;
req.schemaTag.pSchema = pStb->pTags; req.schemaTag.pSchema = pStb->pTags;
if (req.rollup) { if (req.rollup) {
......
...@@ -217,7 +217,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -217,7 +217,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
} }
} else { } else {
pTopic->schema.nCols = 0; pTopic->schema.nCols = 0;
pTopic->schema.sver = 0; pTopic->schema.version = 0;
pTopic->schema.pSchema = NULL; pTopic->schema.pSchema = NULL;
} }
......
...@@ -182,7 +182,7 @@ struct SMetaEntry { ...@@ -182,7 +182,7 @@ struct SMetaEntry {
char *name; char *name;
union { union {
struct { struct {
SSchemaWrapper schema; SSchemaWrapper schemaRow;
SSchemaWrapper schemaTag; SSchemaWrapper schemaTag;
} stbEntry; } stbEntry;
struct { struct {
...@@ -195,7 +195,7 @@ struct SMetaEntry { ...@@ -195,7 +195,7 @@ struct SMetaEntry {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
int32_t ncid; // next column id int32_t ncid; // next column id
SSchemaWrapper schema; SSchemaWrapper schemaRow;
} ntbEntry; } ntbEntry;
struct { struct {
STSma *tsma; STSma *tsma;
......
...@@ -24,7 +24,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { ...@@ -24,7 +24,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeCStr(pCoder, pME->name) < 0) return -1; if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) { if (pME->type == TSDB_SUPER_TABLE) {
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
...@@ -35,7 +35,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { ...@@ -35,7 +35,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1; if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
} else { } else {
...@@ -56,7 +56,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -56,7 +56,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1; if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) { if (pME->type == TSDB_SUPER_TABLE) {
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
...@@ -67,7 +67,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -67,7 +67,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma)); pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
if (!pME->smaEntry.tsma) { if (!pME->smaEntry.tsma) {
......
...@@ -56,7 +56,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -56,7 +56,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
me.type = TSDB_SUPER_TABLE; me.type = TSDB_SUPER_TABLE;
me.uid = pReq->suid; me.uid = pReq->suid;
me.name = pReq->name; me.name = pReq->name;
me.stbEntry.schema = pReq->schema; me.stbEntry.schemaRow = pReq->schemaRow;
me.stbEntry.schemaTag = pReq->schemaTag; me.stbEntry.schemaTag = pReq->schemaTag;
if (metaHandleEntry(pMeta, &me) < 0) goto _err; if (metaHandleEntry(pMeta, &me) < 0) goto _err;
...@@ -182,16 +182,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -182,16 +182,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
nStbEntry.type = TSDB_SUPER_TABLE; nStbEntry.type = TSDB_SUPER_TABLE;
nStbEntry.uid = pReq->suid; nStbEntry.uid = pReq->suid;
nStbEntry.name = pReq->name; nStbEntry.name = pReq->name;
nStbEntry.stbEntry.schema = pReq->schema; nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag; nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
metaWLock(pMeta); metaWLock(pMeta);
// compare two entry // compare two entry
if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) { if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) {
metaSaveToSkmDb(pMeta, &nStbEntry); metaSaveToSkmDb(pMeta, &nStbEntry);
} }
}
// if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) { // if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
// // change tag schema // // change tag schema
...@@ -247,8 +245,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { ...@@ -247,8 +245,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
} else { } else {
me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ctime = pReq->ctime;
me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.schema = pReq->ntb.schema; me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
me.ntbEntry.ncid = me.ntbEntry.schema.pSchema[me.ntbEntry.schema.nCols - 1].colId + 1; me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
} }
if (metaHandleEntry(pMeta, &me) < 0) goto _err; if (metaHandleEntry(pMeta, &me) < 0) goto _err;
...@@ -381,7 +379,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -381,7 +379,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
} }
// search the column to add/drop/update // search the column to add/drop/update
pSchema = &entry.ntbEntry.schema; pSchema = &entry.ntbEntry.schemaRow;
int32_t iCol = 0; int32_t iCol = 0;
for (;;) { for (;;) {
pColumn = NULL; pColumn = NULL;
...@@ -402,16 +400,16 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -402,16 +400,16 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err; goto _err;
} }
pSchema->sver++; pSchema->version++;
pSchema->nCols++; pSchema->nCols++;
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols); pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1)); memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1));
pSchema->pSchema = pNewSchema; pSchema->pSchema = pNewSchema;
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes; pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].bytes = pAlterTbReq->bytes;
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type; pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type = pAlterTbReq->type;
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags; pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].flags = pAlterTbReq->flags;
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].colId = entry.ntbEntry.ncid++; pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId = entry.ntbEntry.ncid++;
strcpy(pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].name, pAlterTbReq->colName); strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName);
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
if (pColumn == NULL) { if (pColumn == NULL) {
...@@ -422,7 +420,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -422,7 +420,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err; goto _err;
} }
pSchema->sver++; pSchema->version++;
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
if (tlen) { if (tlen) {
memmove(pColumn, pColumn + 1, tlen); memmove(pColumn, pColumn + 1, tlen);
...@@ -438,7 +436,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -438,7 +436,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err; goto _err;
} }
pSchema->sver++; pSchema->version++;
pColumn->bytes = pAlterTbReq->colModBytes; pColumn->bytes = pAlterTbReq->colModBytes;
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
...@@ -446,7 +444,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -446,7 +444,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
goto _err; goto _err;
} }
pSchema->sver++; pSchema->version++;
strcpy(pColumn->name, pAlterTbReq->colNewName); strcpy(pColumn->name, pAlterTbReq->colNewName);
break; break;
} }
...@@ -813,15 +811,15 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -813,15 +811,15 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
const SSchemaWrapper *pSW; const SSchemaWrapper *pSW;
if (pME->type == TSDB_SUPER_TABLE) { if (pME->type == TSDB_SUPER_TABLE) {
pSW = &pME->stbEntry.schema; pSW = &pME->stbEntry.schemaRow;
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
pSW = &pME->ntbEntry.schema; pSW = &pME->ntbEntry.schemaRow;
} else { } else {
ASSERT(0); ASSERT(0);
} }
skmDbKey.uid = pME->uid; skmDbKey.uid = pME->uid;
skmDbKey.sver = pSW->sver; skmDbKey.sver = pSW->version;
// encode schema // encode schema
int32_t ret = 0; int32_t ret = 0;
......
...@@ -324,10 +324,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo ...@@ -324,10 +324,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
strcat(pRsp->tblFName, mr.me.name); strcat(pRsp->tblFName, mr.me.name);
if (mr.me.type == TSDB_NORMAL_TABLE) { if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schema.sver; sverNew = mr.me.ntbEntry.schemaRow.version;
} else { } else {
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid); metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
sverNew = mr.me.stbEntry.schema.sver; sverNew = mr.me.stbEntry.schemaRow.version;
} }
metaReaderClear(&mr); metaReaderClear(&mr);
......
...@@ -64,7 +64,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -64,7 +64,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
if (mer1.me.type == TSDB_SUPER_TABLE) { if (mer1.me.type == TSDB_SUPER_TABLE) {
strcpy(metaRsp.stbName, mer1.me.name); strcpy(metaRsp.stbName, mer1.me.name);
schema = mer1.me.stbEntry.schema; schema = mer1.me.stbEntry.schemaRow;
schemaTag = mer1.me.stbEntry.schemaTag; schemaTag = mer1.me.stbEntry.schemaTag;
metaRsp.suid = mer1.me.uid; metaRsp.suid = mer1.me.uid;
} else if (mer1.me.type == TSDB_CHILD_TABLE) { } else if (mer1.me.type == TSDB_CHILD_TABLE) {
...@@ -73,10 +73,10 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -73,10 +73,10 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
strcpy(metaRsp.stbName, mer2.me.name); strcpy(metaRsp.stbName, mer2.me.name);
metaRsp.suid = mer2.me.uid; metaRsp.suid = mer2.me.uid;
schema = mer2.me.stbEntry.schema; schema = mer2.me.stbEntry.schemaRow;
schemaTag = mer2.me.stbEntry.schemaTag; schemaTag = mer2.me.stbEntry.schemaTag;
} else if (mer1.me.type == TSDB_NORMAL_TABLE) { } else if (mer1.me.type == TSDB_NORMAL_TABLE) {
schema = mer1.me.ntbEntry.schema; schema = mer1.me.ntbEntry.schemaRow;
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -84,7 +84,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -84,7 +84,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
metaRsp.numOfTags = schemaTag.nCols; metaRsp.numOfTags = schemaTag.nCols;
metaRsp.numOfColumns = schema.nCols; metaRsp.numOfColumns = schema.nCols;
metaRsp.precision = pVnode->config.tsdbCfg.precision; metaRsp.precision = pVnode->config.tsdbCfg.precision;
metaRsp.sversion = schema.sver; metaRsp.sversion = schema.version;
metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags)); metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols); memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
......
...@@ -124,7 +124,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) ...@@ -124,7 +124,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
void doSetOperatorCompleted(SOperatorInfo* pOperator) { void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0; pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
if (pOperator->pTaskInfo != NULL) { if (pOperator->pTaskInfo != NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
} }
...@@ -3465,7 +3465,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -3465,7 +3465,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3490,10 +3490,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -3490,10 +3490,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL; size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL;
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pInfo->pRes; return (rows == 0) ? NULL : pInfo->pRes;
} }
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
...@@ -3778,10 +3778,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3778,10 +3778,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
return (rows > 0)? pInfo->pRes:NULL; return (rows > 0) ? pInfo->pRes : NULL;
} }
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
...@@ -4455,15 +4455,15 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo ...@@ -4455,15 +4455,15 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
pTaskInfo->schemaVer.tablename = strdup(mr.me.name); pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) { if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver; pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version;
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver; pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
tb_uid_t suid = mr.me.ctbEntry.suid; tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid); metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver; pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version;
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver; pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
} else { } else {
pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver; pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schemaRow.version;
} }
metaReaderClear(&mr); metaReaderClear(&mr);
...@@ -4668,8 +4668,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4668,8 +4668,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
...@@ -5162,8 +5162,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo ...@@ -5162,8 +5162,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) {
const char* pDir) {
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "function.h"
#include "filter.h" #include "filter.h"
#include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "os.h" #include "os.h"
#include "querynodes.h" #include "querynodes.h"
...@@ -142,7 +142,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn ...@@ -142,7 +142,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return true; return true;
} }
while(1) { while (1) {
getNextTimeWindow(pInterval, &w, order); getNextTimeWindow(pInterval, &w, order);
if (w.ekey < pBlockInfo->window.skey) { if (w.ekey < pBlockInfo->window.skey) {
break; break;
...@@ -190,7 +190,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -190,7 +190,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pCost->skipBlocks += 1; pCost->skipBlocks += 1;
// clear all data in pBlock that are set when handing the previous block // clear all data in pBlock that are set when handing the previous block
for(int32_t i = 0; i < pBlockInfo->numOfCols; ++i) { for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) {
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
pcol->pData = NULL; pcol->pData = NULL;
} }
...@@ -304,23 +304,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) ...@@ -304,23 +304,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId); setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags } else { // these are tags
const char* p = NULL; const char* p = NULL;
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){ if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
const uint8_t *tmp = mr.me.ctbEntry.pTags; const uint8_t* tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){ if (data == NULL) {
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1); qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return; return;
} }
*data = TSDB_DATA_TYPE_JSON; *data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp)); memcpy(data + 1, tmp, kvRowLen(tmp));
p = data; p = data;
}else{ } else {
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL)); colDataAppend(pColInfoData, i, p, (p == NULL));
} }
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){ if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
taosMemoryFree((void*)p); taosMemoryFree((void*)p);
} }
} }
...@@ -338,9 +338,8 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p ...@@ -338,9 +338,8 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
infoData.info.bytes = sizeof(uint64_t); infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1); colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid); colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
SScalarParam srcParam = { SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData}; SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param); fpSet.process(&srcParam, 1, &param);
...@@ -372,7 +371,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -372,7 +371,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
} }
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st)/1000.0; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
return pBlock; return pBlock;
...@@ -405,7 +404,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -405,7 +404,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STimeWindow* pWin = &pTableScanInfo->cond.twindow; STimeWindow* pWin = &pTableScanInfo->cond.twindow;
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64 qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
"-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); "-%" PRId64,
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
// do prepare for the next round table scan operation // do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
...@@ -480,7 +480,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -480,7 +480,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -494,7 +495,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -494,7 +495,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); SArray* pColList =
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -507,7 +509,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -507,7 +509,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
...@@ -527,7 +529,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -527,7 +529,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
NULL, NULL, getTableScannerExecInfo);
// for non-blocking operator, the open cost is always 0 // for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
...@@ -645,15 +648,13 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { ...@@ -645,15 +648,13 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
} }
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { return pInfo->sessionSup.pStreamAggSup != NULL; }
return pInfo->sessionSup.pStreamAggSup != NULL;
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes; SSDataBlock* pSDB = pInfo->pUpdateRes;
if (pInfo->updateResIndex < pSDB->info.rows) { if (pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0); SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win; STimeWindow win;
...@@ -661,16 +662,16 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -661,16 +662,16 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup; SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
int64_t gap = pInfo->sessionSup.gap; int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, SResultWindowInfo* pCurWin =
tsCols[pInfo->updateResIndex], gap, &winIndex); getSessionTimeWindow(pAggSup->pResultRows, tsCols[pInfo->updateResIndex], gap, &winIndex);
win = pCurWin->win; win = pCurWin->win;
pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex +=
pInfo->updateResIndex, gap, NULL); updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
&pInfo->interval, pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); binarySearchForKey, NULL, TSDB_ORDER_ASC);
} }
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindow = win; pTableScanInfo->cond.twindow = win;
...@@ -710,7 +711,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti ...@@ -710,7 +711,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
// taosArrayClear(pInfo->tsArray); // taosArrayClear(pInfo->tsArray);
// return p; // return p;
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false); SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false);
SColumnInfoData* pCol = (SColumnInfoData*) taosArrayGet(pDataBlock->pDataBlock, 0); SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
colInfoDataEnsureCapacity(pCol, 0, size); colInfoDataEnsureCapacity(pCol, 0, size);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
...@@ -733,19 +734,17 @@ void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId ...@@ -733,19 +734,17 @@ void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId
pKey[2] = ts; pKey[2] = ts;
} }
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t pageId, int32_t tsIndex,
int32_t pageId, int32_t tsIndex, int64_t childId) { int64_t childId) {
SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (int64_t*)pColDataInfo->pData; TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
for (int32_t i = 0; i < pDataBlock->info.rows; i++) { for (int32_t i = 0; i < pDataBlock->info.rows; i++) {
setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]); setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]);
SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize);
pSup->pKeyBuf, pSup->keySize);
if (p1 == NULL) { if (p1 == NULL) {
SWindowPosition pos = {.pageId = pageId, .rowId = i}; SWindowPosition pos = {.pageId = pageId, .rowId = i};
int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, sizeof(SWindowPosition));
sizeof(SWindowPosition)); if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS ) {
return code; return code;
} }
} else { } else {
...@@ -756,12 +755,11 @@ static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, ...@@ -756,12 +755,11 @@ static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t tsIndex, int64_t childId) {
int32_t tsIndex, int64_t childId) {
int32_t start = 0; int32_t start = 0;
int32_t stop = 0; int32_t stop = 0;
int32_t pageSize = getBufPageSize(pSup->pDataBuf); int32_t pageSize = getBufPageSize(pSup->pDataBuf);
while(start < pDataBlock->info.rows) { while (start < pDataBlock->info.rows) {
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize); blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize);
SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1); SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
if (pDB == NULL) { if (pDB == NULL) {
...@@ -781,7 +779,7 @@ static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, ...@@ -781,7 +779,7 @@ static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
blockDataDestroy(pDB); blockDataDestroy(pDB);
start = stop + 1; start = stop + 1;
int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId); int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId);
if (code != TSDB_CODE_SUCCESS ) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
...@@ -794,14 +792,12 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) { ...@@ -794,14 +792,12 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
SCatchSupporter* pCSup = &pInfo->childAggSup; SCatchSupporter* pCSup = &pInfo->childAggSup;
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
int32_t size = taosArrayGetSize(pInfo->childIds); int32_t size = taosArrayGetSize(pInfo->childIds);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
int64_t id = *(int64_t *)taosArrayGet(pInfo->childIds, i); int64_t id = *(int64_t*)taosArrayGet(pInfo->childIds, i);
setSupKeyBuf(pCSup, pBlock->info.groupId, id, setSupKeyBuf(pCSup, pBlock->info.groupId, id, tsCols[pInfo->updateResIndex]);
tsCols[pInfo->updateResIndex]); SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, pCSup->pKeyBuf, pCSup->keySize);
SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable,
pCSup->pKeyBuf, pCSup->keySize);
void* buf = getBufPage(pCSup->pDataBuf, pos->pageId); void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false); SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
blockDataFromBuf(pDB, buf); blockDataFromBuf(pDB, buf);
...@@ -942,7 +938,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -942,7 +938,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) { if (rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) { } else if (pInfo->pUpdateInfo) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); // TODO(liuyao) get invertible from plan
if (upRes) { if (upRes) {
pInfo->pUpdateRes = upRes; pInfo->pUpdateRes = upRes;
if (upRes->info.type == STREAM_REPROCESS) { if (upRes->info.type == STREAM_REPROCESS) {
...@@ -1019,7 +1015,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR ...@@ -1019,7 +1015,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->interval = pSTInfo->interval; pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval",
"/tmp/"); // TODO(liuyao) get row size from phy plan
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
...@@ -1290,7 +1287,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1290,7 +1287,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// number of columns // number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3); pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schema.nCols, false); colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
// super table name // super table name
STR_TO_VARSTR(str, mr.me.name); STR_TO_VARSTR(str, mr.me.name);
...@@ -1314,7 +1311,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1314,7 +1311,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// number of columns // number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3); pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schema.nCols, false); colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
// super table name // super table name
pColInfoData = taosArrayGet(p->pDataBlock, 4); pColInfoData = taosArrayGet(p->pDataBlock, 4);
......
...@@ -3951,7 +3951,7 @@ typedef struct SVgroupCreateTableBatch { ...@@ -3951,7 +3951,7 @@ typedef struct SVgroupCreateTableBatch {
static void destroyCreateTbReq(SVCreateTbReq* pReq) { static void destroyCreateTbReq(SVCreateTbReq* pReq) {
taosMemoryFreeClear(pReq->name); taosMemoryFreeClear(pReq->name);
taosMemoryFreeClear(pReq->ntb.schema.pSchema); taosMemoryFreeClear(pReq->ntb.schemaRow.pSchema);
} }
static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo, static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo,
...@@ -3964,10 +3964,10 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* ...@@ -3964,10 +3964,10 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
SVCreateTbReq req = {0}; SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE; req.type = TD_NORMAL_TABLE;
req.name = strdup(pStmt->tableName); req.name = strdup(pStmt->tableName);
req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols); req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols);
req.ntb.schema.sver = 1; req.ntb.schemaRow.version = 1;
req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema)); req.ntb.schemaRow.pSchema = taosMemoryCalloc(req.ntb.schemaRow.nCols, sizeof(SSchema));
if (NULL == req.name || NULL == req.ntb.schema.pSchema) { if (NULL == req.name || NULL == req.ntb.schemaRow.pSchema) {
destroyCreateTbReq(&req); destroyCreateTbReq(&req);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -3977,7 +3977,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* ...@@ -3977,7 +3977,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
SNode* pCol; SNode* pCol;
col_id_t index = 0; col_id_t index = 0;
FOREACH(pCol, pStmt->pCols) { FOREACH(pCol, pStmt->pCols) {
toSchema((SColumnDefNode*)pCol, index + 1, req.ntb.schema.pSchema + index); toSchema((SColumnDefNode*)pCol, index + 1, req.ntb.schemaRow.pSchema + index);
++index; ++index;
} }
pBatch->info = *pVgroupInfo; pBatch->info = *pVgroupInfo;
...@@ -4031,7 +4031,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) { ...@@ -4031,7 +4031,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
taosMemoryFreeClear(pTableReq->name); taosMemoryFreeClear(pTableReq->name);
if (pTableReq->type == TSDB_NORMAL_TABLE) { if (pTableReq->type == TSDB_NORMAL_TABLE) {
taosMemoryFreeClear(pTableReq->ntb.schema.pSchema); taosMemoryFreeClear(pTableReq->ntb.schemaRow.pSchema);
} else if (pTableReq->type == TSDB_CHILD_TABLE) { } else if (pTableReq->type == TSDB_CHILD_TABLE) {
taosMemoryFreeClear(pTableReq->ctb.pTag); taosMemoryFreeClear(pTableReq->ctb.pTag);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册