diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e20b51aa6afa9fe41a85037256f751d587f1e6fe..88c66df52e80cb285243bf77c17f34586ed55fa3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -300,9 +300,7 @@ typedef struct SSchema { typedef struct { int32_t nCols; - int32_t sver; - int32_t tagVer; - int32_t colVer; + int32_t version; SSchema* pSchema; } SSchemaWrapper; @@ -310,9 +308,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); if (pSW == NULL) return pSW; pSW->nCols = pSchemaWrapper->nCols; - pSW->sver = pSchemaWrapper->sver; - pSW->tagVer = pSchemaWrapper->tagVer; - pSW->colVer = pSchemaWrapper->colVer; + pSW->version = pSchemaWrapper->version; pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { taosMemoryFree(pSW); @@ -367,9 +363,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { int32_t tlen = 0; tlen += taosEncodeVariantI32(buf, pSW->nCols); - tlen += taosEncodeVariantI32(buf, pSW->sver); - tlen += taosEncodeVariantI32(buf, pSW->tagVer); - tlen += taosEncodeVariantI32(buf, pSW->colVer); + tlen += taosEncodeVariantI32(buf, pSW->version); for (int32_t i = 0; i < pSW->nCols; i++) { tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); } @@ -378,9 +372,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeVariantI32(buf, &pSW->nCols); - buf = taosDecodeVariantI32(buf, &pSW->sver); - buf = taosDecodeVariantI32(buf, &pSW->tagVer); - buf = taosDecodeVariantI32(buf, &pSW->colVer); + buf = taosDecodeVariantI32(buf, &pSW->version); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { return NULL; @@ -394,9 +386,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; - if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; - if (tEncodeI32v(pEncoder, pSW->tagVer) < 0) return -1; - if (tEncodeI32v(pEncoder, pSW->colVer) < 0) return -1; + if (tEncodeI32v(pEncoder, pSW->version) < 0) return -1; for (int32_t i = 0; i < pSW->nCols; i++) { if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; } @@ -406,9 +396,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1; pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; @@ -421,9 +409,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) { if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1; pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; @@ -1713,7 +1699,7 @@ typedef struct SVCreateStbReq { char* name; tb_uid_t suid; int8_t rollup; - SSchemaWrapper schema; + SSchemaWrapper schemaRow; SSchemaWrapper schemaTag; SRSmaParam pRSmaParam; } SVCreateStbReq; @@ -1745,7 +1731,7 @@ typedef struct SVCreateTbReq { uint8_t* pTag; } ctb; struct { - SSchemaWrapper schema; + SSchemaWrapper schemaRow; } ntb; }; } SVCreateTbReq; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 19ba42332e80cc08aaedc966f38b506072fc9bd6..23efb8ef9d6be13e5cca52303c59bf6da68243ba 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3801,7 +3801,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1; - if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; + if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (pReq->rollup) { if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; @@ -3817,7 +3817,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1; - if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; + if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (pReq->rollup) { if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; @@ -3866,7 +3866,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1; } else if (pReq->type == TSDB_NORMAL_TABLE) { - if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1; + if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1; } else { ASSERT(0); } @@ -3892,7 +3892,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1; } else if (pReq->type == TSDB_NORMAL_TABLE) { - if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1; + if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1; } else { ASSERT(0); } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 61f115e2bab32b64ee6a57e967fb0c8e5c287d0f..e5c464e0b0c7b9d3676a8e495846b82eb16b6130 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -388,13 +388,12 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.name = (char *)tNameGetTableName(&name); req.suid = pStb->uid; req.rollup = pStb->ast1Len > 0 ? 1 : 0; - req.schema.nCols = pStb->numOfColumns; - req.schema.sver = pStb->version; - req.schema.tagVer = pStb->tagVer; - req.schema.colVer = pStb->colVer; - req.schema.pSchema = pStb->pColumns; + // todo + req.schemaRow.nCols = pStb->numOfColumns; + req.schemaRow.version = pStb->version; + req.schemaRow.pSchema = pStb->pColumns; req.schemaTag.nCols = pStb->numOfTags; - req.schemaTag.sver = 1; + req.schemaTag.version = pStb->tagVer; req.schemaTag.pSchema = pStb->pTags; if (req.rollup) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index ec3d30ff07b2eeb169cc214cc24fe43be4fe9f15..2048c798475062055520fe25e0249f411615b81f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -217,7 +217,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { } } else { pTopic->schema.nCols = 0; - pTopic->schema.sver = 0; + pTopic->schema.version = 0; pTopic->schema.pSchema = NULL; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 60262451745b4cc5b49b00a1a02386cd060f460d..bfc275c74553da4652d6892e31b98e9e1eb0053d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -182,7 +182,7 @@ struct SMetaEntry { char *name; union { struct { - SSchemaWrapper schema; + SSchemaWrapper schemaRow; SSchemaWrapper schemaTag; } stbEntry; struct { @@ -195,7 +195,7 @@ struct SMetaEntry { int64_t ctime; int32_t ttlDays; int32_t ncid; // next column id - SSchemaWrapper schema; + SSchemaWrapper schemaRow; } ntbEntry; struct { STSma *tsma; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index 8a4db3100d31695af2889367088c9a0e16bb6236..be2ddfc32f83fcf0d6b5500fb21cdec632c27aa8 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -24,7 +24,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { if (tEncodeCStr(pCoder, pME->name) < 0) return -1; if (pME->type == TSDB_SUPER_TABLE) { - if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1; + if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; } else if (pME->type == TSDB_CHILD_TABLE) { if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1; @@ -35,7 +35,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1; - if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; + if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; } else { @@ -56,7 +56,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (tDecodeCStr(pCoder, &pME->name) < 0) return -1; if (pME->type == TSDB_SUPER_TABLE) { - if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1; + if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; } else if (pME->type == TSDB_CHILD_TABLE) { if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; @@ -67,7 +67,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; - if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1; + if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma)); if (!pME->smaEntry.tsma) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 462d461a8a23fec805a9739f59a841572ef0bcee..7182f496c4d6410a705a82dba1c92ff6561a5faf 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -56,7 +56,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { me.type = TSDB_SUPER_TABLE; me.uid = pReq->suid; me.name = pReq->name; - me.stbEntry.schema = pReq->schema; + me.stbEntry.schemaRow = pReq->schemaRow; me.stbEntry.schemaTag = pReq->schemaTag; if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -182,15 +182,13 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { nStbEntry.type = TSDB_SUPER_TABLE; nStbEntry.uid = pReq->suid; nStbEntry.name = pReq->name; - nStbEntry.stbEntry.schema = pReq->schema; + nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaTag = pReq->schemaTag; metaWLock(pMeta); // compare two entry - if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) { - if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) { - metaSaveToSkmDb(pMeta, &nStbEntry); - } + if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) { + metaSaveToSkmDb(pMeta, &nStbEntry); } // if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) { @@ -247,8 +245,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { } else { me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ttlDays = pReq->ttl; - me.ntbEntry.schema = pReq->ntb.schema; - me.ntbEntry.ncid = me.ntbEntry.schema.pSchema[me.ntbEntry.schema.nCols - 1].colId + 1; + me.ntbEntry.schemaRow = pReq->ntb.schemaRow; + me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1; } if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -381,7 +379,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl } // search the column to add/drop/update - pSchema = &entry.ntbEntry.schema; + pSchema = &entry.ntbEntry.schemaRow; int32_t iCol = 0; for (;;) { pColumn = NULL; @@ -402,16 +400,16 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; goto _err; } - pSchema->sver++; + pSchema->version++; pSchema->nCols++; pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols); memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1)); pSchema->pSchema = pNewSchema; - pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes; - pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type; - pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags; - pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].colId = entry.ntbEntry.ncid++; - strcpy(pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].name, pAlterTbReq->colName); + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].bytes = pAlterTbReq->bytes; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type = pAlterTbReq->type; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].flags = pAlterTbReq->flags; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId = entry.ntbEntry.ncid++; + strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName); break; case TSDB_ALTER_TABLE_DROP_COLUMN: if (pColumn == NULL) { @@ -422,7 +420,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; goto _err; } - pSchema->sver++; + pSchema->version++; tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); if (tlen) { memmove(pColumn, pColumn + 1, tlen); @@ -438,7 +436,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; goto _err; } - pSchema->sver++; + pSchema->version++; pColumn->bytes = pAlterTbReq->colModBytes; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: @@ -446,7 +444,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; goto _err; } - pSchema->sver++; + pSchema->version++; strcpy(pColumn->name, pAlterTbReq->colNewName); break; } @@ -813,15 +811,15 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { const SSchemaWrapper *pSW; if (pME->type == TSDB_SUPER_TABLE) { - pSW = &pME->stbEntry.schema; + pSW = &pME->stbEntry.schemaRow; } else if (pME->type == TSDB_NORMAL_TABLE) { - pSW = &pME->ntbEntry.schema; + pSW = &pME->ntbEntry.schemaRow; } else { ASSERT(0); } skmDbKey.uid = pME->uid; - skmDbKey.sver = pSW->sver; + skmDbKey.sver = pSW->version; // encode schema int32_t ret = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 0a77274a21a559870ca2a30f11378b2338d0c653..0c8664f36c83d8375fd7cb378af19432189c68dd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -324,10 +324,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo strcat(pRsp->tblFName, mr.me.name); if (mr.me.type == TSDB_NORMAL_TABLE) { - sverNew = mr.me.ntbEntry.schema.sver; + sverNew = mr.me.ntbEntry.schemaRow.version; } else { metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid); - sverNew = mr.me.stbEntry.schema.sver; + sverNew = mr.me.stbEntry.schemaRow.version; } metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 3b47b9025492a7ad53514750b9e88dbf01f52d49..b3aab7788daddf7278754025369ae605688b8411 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -64,7 +64,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { if (mer1.me.type == TSDB_SUPER_TABLE) { strcpy(metaRsp.stbName, mer1.me.name); - schema = mer1.me.stbEntry.schema; + schema = mer1.me.stbEntry.schemaRow; schemaTag = mer1.me.stbEntry.schemaTag; metaRsp.suid = mer1.me.uid; } else if (mer1.me.type == TSDB_CHILD_TABLE) { @@ -73,10 +73,10 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { strcpy(metaRsp.stbName, mer2.me.name); metaRsp.suid = mer2.me.uid; - schema = mer2.me.stbEntry.schema; + schema = mer2.me.stbEntry.schemaRow; schemaTag = mer2.me.stbEntry.schemaTag; } else if (mer1.me.type == TSDB_NORMAL_TABLE) { - schema = mer1.me.ntbEntry.schema; + schema = mer1.me.ntbEntry.schemaRow; } else { ASSERT(0); } @@ -84,7 +84,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { metaRsp.numOfTags = schemaTag.nCols; metaRsp.numOfColumns = schema.nCols; metaRsp.precision = pVnode->config.tsdbCfg.precision; - metaRsp.sversion = schema.sver; + metaRsp.sversion = schema.version; metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags)); memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 593b79ecc84a3d4af7965a6e2aee37ebc6778a65..fbcb47363636a1feaab716f87448552d2ab97982 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -124,7 +124,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; - pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0; + pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0; if (pOperator->pTaskInfo != NULL) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } @@ -2717,7 +2717,7 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int64_t el = taosGetTimestampUs() - startTs; + int64_t el = taosGetTimestampUs() - startTs; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; pLoadInfo->totalElapsed += el; @@ -3023,13 +3023,13 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p tsem_init(&pInfo->ready, 0, 0); - pOperator->name = "ExchangeOperator"; + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pBlock->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pBlock->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); @@ -3465,7 +3465,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); - pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; return TSDB_CODE_SUCCESS; } @@ -3490,10 +3490,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL; + size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL; pOperator->resultInfo.totalRows += rows; - return (rows == 0)? NULL:pInfo->pRes; + return (rows == 0) ? NULL : pInfo->pRes; } void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, @@ -3778,10 +3778,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += rows; if (pOperator->cost.openCost == 0) { - pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - return (rows > 0)? pInfo->pRes:NULL; + return (rows > 0) ? pInfo->pRes : NULL; } static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, @@ -4455,15 +4455,15 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo pTaskInfo->schemaVer.tablename = strdup(mr.me.name); if (mr.me.type == TSDB_SUPER_TABLE) { - pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver; - pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver; + pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version; + pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { tb_uid_t suid = mr.me.ctbEntry.suid; metaGetTableEntryByUid(&mr, suid); - pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver; - pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver; + pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version; + pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; } else { - pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver; + pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schemaRow.version; } metaReaderClear(&mr); @@ -4668,8 +4668,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - pOptr = - createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); + pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, + pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; @@ -5162,8 +5162,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, - const char* pDir) { +int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) { pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 17238bbd9be4b28a534cb073de45065568a0014c..c4b3274ae16332b30f3c45f22fa8b3931fa8879e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "function.h" #include "filter.h" +#include "function.h" #include "functionMgt.h" #include "os.h" #include "querynodes.h" @@ -142,7 +142,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return true; } - while(1) { + while (1) { getNextTimeWindow(pInterval, &w, order); if (w.ekey < pBlockInfo->window.skey) { break; @@ -190,7 +190,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca pCost->skipBlocks += 1; // clear all data in pBlock that are set when handing the previous block - for(int32_t i = 0; i < pBlockInfo->numOfCols; ++i) { + for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) { SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); pcol->pData = NULL; } @@ -304,23 +304,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId); } else { // these are tags const char* p = NULL; - if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){ - const uint8_t *tmp = mr.me.ctbEntry.pTags; - char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); - if(data == NULL){ + if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { + const uint8_t* tmp = mr.me.ctbEntry.pTags; + char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); + if (data == NULL) { qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1); return; } *data = TSDB_DATA_TYPE_JSON; - memcpy(data+1, tmp, kvRowLen(tmp)); + memcpy(data + 1, tmp, kvRowLen(tmp)); p = data; - }else{ + } else { p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); } for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, p, (p == NULL)); } - if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){ + if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { taosMemoryFree((void*)p); } } @@ -338,9 +338,8 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p infoData.info.bytes = sizeof(uint64_t); colInfoDataEnsureCapacity(&infoData, 0, 1); - colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid); - SScalarParam srcParam = { - .numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData}; + colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid); + SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData}; SScalarParam param = {.columnData = pColInfoData}; fpSet.process(&srcParam, 1, ¶m); @@ -372,7 +371,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st)/1000.0; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; return pBlock; @@ -405,7 +404,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STimeWindow* pWin = &pTableScanInfo->cond.twindow; qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64 - "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + "-%" PRId64, + GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); // do prepare for the next round table scan operation tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -463,7 +463,7 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); - STableScanInfo* pTableScanInfo = pOptr->info; + STableScanInfo* pTableScanInfo = pOptr->info; *pRecorder = pTableScanInfo->readRecorder; *pOptrExplain = pRecorder; *len = sizeof(SFileBlockLoadRecorder); @@ -480,7 +480,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { } } -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, + SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -494,7 +495,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + SArray* pColList = + extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { @@ -503,31 +505,32 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, if (pTableScanNode->scan.pScanPseudoCols != NULL) { pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); - pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); + pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; -// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose + // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose - pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); - pInfo->sampleRatio = pTableScanNode->ratio; + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->sampleRatio = pTableScanNode->ratio; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pResBlock = createResDataBlock(pDescNode); - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; - pInfo->dataReader = pDataReader; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; + pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; - pOperator->name = "TableScanOperator"; // for debug purpose + pOperator->name = "TableScanOperator"; // for debug purpose pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, + NULL, NULL, getTableScannerExecInfo); // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; @@ -645,32 +648,30 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); } -static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { - return pInfo->sessionSup.pStreamAggSup != NULL; -} +static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { return pInfo->sessionSup.pStreamAggSup != NULL; } static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { SSDataBlock* pSDB = pInfo->pUpdateRes; if (pInfo->updateResIndex < pSDB->info.rows) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0); - TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; - SResultRowInfo dumyInfo; + TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; + SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win; if (isSessionWindow(pInfo)) { SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup; - int64_t gap = pInfo->sessionSup.gap; - int32_t winIndex = 0; - SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, - tsCols[pInfo->updateResIndex], gap, &winIndex); + int64_t gap = pInfo->sessionSup.gap; + int32_t winIndex = 0; + SResultWindowInfo* pCurWin = + getSessionTimeWindow(pAggSup->pResultRows, tsCols[pInfo->updateResIndex], gap, &winIndex); win = pCurWin->win; - pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, - pInfo->updateResIndex, gap, NULL); + pInfo->updateResIndex += + updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL); } else { - win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], - &pInfo->interval, pInfo->interval.precision, NULL); - pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, - win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval, + pInfo->interval.precision, NULL); + pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey, + binarySearchForKey, NULL, TSDB_ORDER_ASC); } STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; pTableScanInfo->cond.twindow = win; @@ -709,8 +710,8 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti // p->info.type = STREAM_INVERT; // taosArrayClear(pInfo->tsArray); // return p; - SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false); - SColumnInfoData* pCol = (SColumnInfoData*) taosArrayGet(pDataBlock->pDataBlock, 0); + SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false); + SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, 0); ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); colInfoDataEnsureCapacity(pCol, 0, size); for (int32_t i = 0; i < size; i++) { @@ -733,19 +734,17 @@ void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId pKey[2] = ts; } -static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, - int32_t pageId, int32_t tsIndex, int64_t childId) { +static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t pageId, int32_t tsIndex, + int64_t childId) { SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex); - TSKEY* tsCols = (int64_t*)pColDataInfo->pData; + TSKEY* tsCols = (int64_t*)pColDataInfo->pData; for (int32_t i = 0; i < pDataBlock->info.rows; i++) { setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]); - SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, - pSup->pKeyBuf, pSup->keySize); + SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize); if (p1 == NULL) { SWindowPosition pos = {.pageId = pageId, .rowId = i}; - int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, - sizeof(SWindowPosition)); - if (code != TSDB_CODE_SUCCESS ) { + int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, sizeof(SWindowPosition)); + if (code != TSDB_CODE_SUCCESS) { return code; } } else { @@ -756,24 +755,23 @@ static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, return TSDB_CODE_SUCCESS; } -static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, - int32_t tsIndex, int64_t childId) { +static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t tsIndex, int64_t childId) { int32_t start = 0; int32_t stop = 0; int32_t pageSize = getBufPageSize(pSup->pDataBuf); - while(start < pDataBlock->info.rows) { + while (start < pDataBlock->info.rows) { blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize); SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1); if (pDB == NULL) { return terrno; } int32_t pageId = -1; - void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId); + void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId); if (pPage == NULL) { blockDataDestroy(pDB); return terrno; } - int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t); + int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t); assert(size <= pageSize); blockDataToBuf(pPage, pDB); setBufPageDirty(pPage, true); @@ -781,7 +779,7 @@ static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, blockDataDestroy(pDB); start = stop + 1; int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId); - if (code != TSDB_CODE_SUCCESS ) { + if (code != TSDB_CODE_SUCCESS) { return code; } } @@ -794,16 +792,14 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) { blockDataCleanup(pInfo->pRes); SCatchSupporter* pCSup = &pInfo->childAggSup; SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0); - TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; - int32_t size = taosArrayGetSize(pInfo->childIds); + TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; + int32_t size = taosArrayGetSize(pInfo->childIds); for (int32_t i = 0; i < size; i++) { - int64_t id = *(int64_t *)taosArrayGet(pInfo->childIds, i); - setSupKeyBuf(pCSup, pBlock->info.groupId, id, - tsCols[pInfo->updateResIndex]); - SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, - pCSup->pKeyBuf, pCSup->keySize); - void* buf = getBufPage(pCSup->pDataBuf, pos->pageId); - SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false); + int64_t id = *(int64_t*)taosArrayGet(pInfo->childIds, i); + setSupKeyBuf(pCSup, pBlock->info.groupId, id, tsCols[pInfo->updateResIndex]); + SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, pCSup->pKeyBuf, pCSup->keySize); + void* buf = getBufPage(pCSup->pDataBuf, pos->pageId); + SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false); blockDataFromBuf(pDB, buf); SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1); blockDataMerge(pInfo->pRes, pSub); @@ -834,7 +830,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (pDB != NULL) { return pDB; } else { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } } @@ -844,7 +840,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return NULL; } - int32_t current = pInfo->validBlockIndex++; + int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); if (pBlock->info.type == STREAM_REPROCESS) { pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; @@ -942,7 +938,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (rows == 0) { pOperator->status = OP_EXEC_DONE; } else if (pInfo->pUpdateInfo) { - SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan + SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); // TODO(liuyao) get invertible from plan if (upRes) { pInfo->pUpdateRes = upRes; if (upRes->info.type == STREAM_REPROCESS) { @@ -977,7 +973,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { SColMatchInfo* id = taosArrayGet(pColList, i); - int16_t colId = id->colId; + int16_t colId = id->colId; taosArrayPush(pColIds, &colId); } @@ -1001,33 +997,34 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR goto _error; } - pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan + pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan if (pSTInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan + pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan } else { pInfo->pUpdateInfo = NULL; } - pInfo->readHandle = *pHandle; - pInfo->tableUid = uid; + pInfo->readHandle = *pHandle; + pInfo->tableUid = uid; pInfo->streamBlockReader = streamReadHandle; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - pInfo->pDataReader = pDataReader; - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - pInfo->pOperatorDumy = pOperatorDumy; - pInfo->interval = pSTInfo->interval; - pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; + pInfo->pDataReader = pDataReader; + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + pInfo->pOperatorDumy = pOperatorDumy; + pInfo->interval = pSTInfo->interval; + pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; - initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan + initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", + "/tmp/"); // TODO(liuyao) get row size from phy plan - pOperator->name = "StreamBlockScanOperator"; + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); @@ -1290,7 +1287,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // number of columns pColInfoData = taosArrayGet(p->pDataBlock, 3); - colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schema.nCols, false); + colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false); // super table name STR_TO_VARSTR(str, mr.me.name); @@ -1314,7 +1311,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // number of columns pColInfoData = taosArrayGet(p->pDataBlock, 3); - colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schema.nCols, false); + colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false); // super table name pColInfoData = taosArrayGet(p->pDataBlock, 4); @@ -1633,7 +1630,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { STR_TO_VARSTR(str, mr.me.name); colDataAppend(pDst, count, str, false); - } else { // it is a tag value + } else { // it is a tag value if (pDst->info.type == TSDB_DATA_TYPE_JSON) { const uint8_t* tmp = mr.me.ctbEntry.pTags; // TODO opt perf by realloc memory diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 340153f5f0559a14997cd56bdb5a55b8cf674e56..383f4f8c39712800f05e193712bc1780758cff68 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3951,7 +3951,7 @@ typedef struct SVgroupCreateTableBatch { static void destroyCreateTbReq(SVCreateTbReq* pReq) { taosMemoryFreeClear(pReq->name); - taosMemoryFreeClear(pReq->ntb.schema.pSchema); + taosMemoryFreeClear(pReq->ntb.schemaRow.pSchema); } static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo, @@ -3964,10 +3964,10 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* SVCreateTbReq req = {0}; req.type = TD_NORMAL_TABLE; req.name = strdup(pStmt->tableName); - req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols); - req.ntb.schema.sver = 1; - req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema)); - if (NULL == req.name || NULL == req.ntb.schema.pSchema) { + req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); + req.ntb.schemaRow.version = 1; + req.ntb.schemaRow.pSchema = taosMemoryCalloc(req.ntb.schemaRow.nCols, sizeof(SSchema)); + if (NULL == req.name || NULL == req.ntb.schemaRow.pSchema) { destroyCreateTbReq(&req); return TSDB_CODE_OUT_OF_MEMORY; } @@ -3977,7 +3977,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* SNode* pCol; col_id_t index = 0; FOREACH(pCol, pStmt->pCols) { - toSchema((SColumnDefNode*)pCol, index + 1, req.ntb.schema.pSchema + index); + toSchema((SColumnDefNode*)pCol, index + 1, req.ntb.schemaRow.pSchema + index); ++index; } pBatch->info = *pVgroupInfo; @@ -4031,7 +4031,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) { taosMemoryFreeClear(pTableReq->name); if (pTableReq->type == TSDB_NORMAL_TABLE) { - taosMemoryFreeClear(pTableReq->ntb.schema.pSchema); + taosMemoryFreeClear(pTableReq->ntb.schemaRow.pSchema); } else if (pTableReq->type == TSDB_CHILD_TABLE) { taosMemoryFreeClear(pTableReq->ctb.pTag); }