From d4c33fba2f3433932e72f17233d8de129910b526 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 7 Jun 2022 20:49:27 +0800 Subject: [PATCH] feat: create tsma result stable --- include/common/taosdef.h | 1 + include/common/tmsg.h | 61 +++---- source/common/src/tdatablock.c | 40 +++-- source/common/src/tmsg.c | 36 ++-- source/dnode/mnode/impl/inc/mndDef.h | 53 +++--- source/dnode/mnode/impl/src/mndSma.c | 87 +++++++++- source/dnode/mnode/impl/src/mndStream.c | 4 + source/dnode/vnode/src/meta/metaSma.c | 4 +- source/dnode/vnode/src/sma/smaRollup.c | 4 +- source/dnode/vnode/src/sma/smaTimeRange2.c | 188 ++------------------- source/dnode/vnode/src/tq/tqPush.c | 12 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +- 12 files changed, 229 insertions(+), 269 deletions(-) diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 516df71b0b..60b1dc6c10 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -98,6 +98,7 @@ extern char *qtypeStr[]; #undef TD_DEBUG_PRINT_ROW #undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS #undef TD_DEBUG_PRINT_TAG +#define TD_DEBUG_SMA_ID 123456 #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4a3c4b0c3f..72f36a6995 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2329,24 +2329,28 @@ typedef struct { } SVgEpSet; typedef struct { - int8_t version; // for compatibility(default 0) - int8_t intervalUnit; // MACRO: TIME_UNIT_XXX - int8_t slidingUnit; // MACRO: TIME_UNIT_XXX - int8_t timezoneInt; // sma data expired if timezone changes. - int32_t dstVgId; - char indexName[TSDB_INDEX_NAME_LEN]; - int32_t exprLen; - int32_t tagsFilterLen; - int32_t numOfVgroups; - int64_t indexUid; - tb_uid_t tableUid; // super/child/common table uid - int64_t interval; - int64_t offset; // use unit by precision of DB - int64_t sliding; - char* expr; // sma expression - char* tagsFilter; - SVgEpSet* pVgEpSet; -} STSma; // Time-range-wise SMA + int8_t version; // for compatibility(default 0) + int8_t intervalUnit; // MACRO: TIME_UNIT_XXX + int8_t slidingUnit; // MACRO: TIME_UNIT_XXX + int8_t timezoneInt; // sma data expired if timezone changes. + int32_t dstVgId; + char indexName[TSDB_INDEX_NAME_LEN]; + int32_t exprLen; + int32_t tagsFilterLen; + int32_t numOfVgroups; // for dstVgroup + int64_t indexUid; + tb_uid_t tableUid; // super/child/common table uid + tb_uid_t dstTbUid; // for dstVgroup + int64_t interval; + int64_t offset; // use unit by precision of DB + int64_t sliding; + char* dstTbName; // for dstVgroup + char* expr; // sma expression + char* tagsFilter; + SVgEpSet* pVgEpSet; // for dstVgroup + SSchemaWrapper schemaRow; // for dstVgroup + SSchemaWrapper schemaTag; // for dstVgroup +} STSma; // Time-range-wise SMA typedef STSma SVCreateTSmaReq; @@ -2493,14 +2497,14 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq); typedef struct { - int8_t intervalUnit; - int8_t slidingUnit; - int64_t interval; - int64_t offset; - int64_t sliding; - int64_t dstTbUid; - int32_t dstVgId; // for stream - char* expr; + int8_t intervalUnit; + int8_t slidingUnit; + int64_t interval; + int64_t offset; + int64_t sliding; + int64_t dstTbUid; + int32_t dstVgId; // for stream + char* expr; } STableIndexInfo; typedef struct { @@ -2510,7 +2514,6 @@ typedef struct { int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp); - typedef struct { int8_t mqMsgType; int32_t code; @@ -2751,8 +2754,8 @@ typedef struct { char* msg; } SVDeleteReq; -int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); -int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); +int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); +int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); typedef struct { int64_t affectedRows; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9caa9a73a5..2202b9579d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1624,25 +1624,31 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks break; default: if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { - char tv[8] = {0}; - if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); + if (pCol->type == pColInfoData->info.type) { + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset, + k); } else { - uint64_t v = 0; - GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); + char tv[8] = {0}; + if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else { + uint64_t v = 0; + GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, + k); } - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, k); } else { uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); TASSERT(0); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 51c40827b5..bbd7c2e7a0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3844,6 +3844,8 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { if (tEncodeI32(pCoder, pSma->numOfVgroups) < 0) return -1; if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1; if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1; + if (tEncodeI64(pCoder, pSma->dstTbUid) < 0) return -1; + if (tEncodeCStr(pCoder, pSma->dstTbName) < 0) return -1; if (tEncodeI64(pCoder, pSma->interval) < 0) return -1; if (tEncodeI64(pCoder, pSma->offset) < 0) return -1; if (tEncodeI64(pCoder, pSma->sliding) < 0) return -1; @@ -3853,17 +3855,24 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { if (pSma->tagsFilterLen > 0) { if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1; } - for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { - if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1; - if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1; - int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps; - if (tEncodeI8(pCoder, numOfEps) < 0) return -1; - for (int32_t n = 0; n < numOfEps; ++n) { - const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n]; - if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1; - if (tEncodeU16(pCoder, pEp->port) < 0) return -1; + + if (pSma->numOfVgroups) { // only needed in dstVgroup + for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { + if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1; + if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1; + int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps; + if (tEncodeI8(pCoder, numOfEps) < 0) return -1; + for (int32_t n = 0; n < numOfEps; ++n) { + const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n]; + if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1; + if (tEncodeU16(pCoder, pEp->port) < 0) return -1; + } } + + tEncodeSSchemaWrapper(pCoder, &pSma->schemaRow); + tEncodeSSchemaWrapper(pCoder, &pSma->schemaTag); } + return 0; } @@ -3871,14 +3880,16 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { if (tDecodeI8(pCoder, &pSma->version) < 0) return -1; if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1; if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1; - if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1; if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1; + if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1; if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1; if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1; if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1; if (tDecodeI32(pCoder, &pSma->numOfVgroups) < 0) return -1; if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1; if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1; + if (tDecodeI64(pCoder, &pSma->dstTbUid) < 0) return -1; + if (tDecodeCStr(pCoder, &pSma->dstTbName) < 0) return -1; if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1; if (tDecodeI64(pCoder, &pSma->offset) < 0) return -1; if (tDecodeI64(pCoder, &pSma->sliding) < 0) return -1; @@ -3892,7 +3903,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { } else { pSma->tagsFilter = NULL; } - if (pSma->numOfVgroups > 0) { + if (pSma->numOfVgroups > 0) { // only needed in dstVgroup pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet)); if (!pSma->pVgEpSet) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -3912,6 +3923,9 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { if (tDecodeU16(pCoder, &pEp->port) < 0) return -1; } } + + tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaRow); + tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaTag); } return 0; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 382f8dd55f..d21af87067 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -298,31 +298,34 @@ typedef struct { } SVgObj; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; - char stb[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createdTime; - int64_t uid; - int64_t stbUid; - int64_t dbUid; - int8_t intervalUnit; - int8_t slidingUnit; - int8_t timezone; - int32_t dstVgId; // for stream - int64_t dstTbUid; - int64_t interval; - int64_t offset; - int64_t sliding; - int32_t exprLen; // strlen + 1 - int32_t tagsFilterLen; - int32_t sqlLen; - int32_t astLen; - int32_t numOfVgroups; - char* expr; - char* tagsFilter; - char* sql; - char* ast; - SVgEpSet* pVgEpSet; + char name[TSDB_TABLE_FNAME_LEN]; + char stb[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + char dstTbName[TSDB_TABLE_FNAME_LEN]; + int64_t createdTime; + int64_t uid; + int64_t stbUid; + int64_t dbUid; + int64_t dstTbUid; + int8_t intervalUnit; + int8_t slidingUnit; + int8_t timezone; + int32_t dstVgId; // for stream + int64_t interval; + int64_t offset; + int64_t sliding; + int32_t exprLen; // strlen + 1 + int32_t tagsFilterLen; + int32_t sqlLen; + int32_t astLen; + int32_t numOfVgroups; // for dstVgroup + char* expr; + char* tagsFilter; + char* sql; + char* ast; + SVgEpSet* pVgEpSet; // for dstVgroup + SSchemaWrapper schemaRow; // for dstVgroup + SSchemaWrapper schemaTag; // for dstVgroup } SSmaObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 1d47f8fc7a..ef9f5c3352 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -26,6 +26,7 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "parser.h" #include "tname.h" #define TSDB_SMA_VER_NUMBER 1 @@ -82,10 +83,12 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER) SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER) SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER) SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER) SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER) SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER) @@ -147,10 +150,12 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER) SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER) SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER) SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER) SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER) SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER) @@ -260,6 +265,8 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm req.tagsFilterLen = pSma->tagsFilterLen; req.indexUid = pSma->uid; req.tableUid = pSma->stbUid; + req.dstVgId = pSma->dstVgId; + req.dstTbUid = pSma->dstTbUid; req.interval = pSma->interval; req.offset = pSma->offset; req.sliding = pSma->sliding; @@ -267,7 +274,10 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm req.tagsFilter = pSma->tagsFilter; req.numOfVgroups = pSma->numOfVgroups; req.pVgEpSet = pSma->pVgEpSet; - + req.schemaRow = pSma->schemaRow; + req.schemaTag = pSma->schemaTag; + req.dstTbName = pSma->dstTbName; + // get length int32_t ret = 0; tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret); @@ -425,15 +435,43 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, mndReleaseDnode(pMnode, pDnode); // todo add sma info here +#if 1 + SNode *pAst = NULL; + if (nodesStringToNode(pSma->ast, &pAst) < 0) { + return -1; + } + if (qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema) != 0) { + nodesDestroyNode(pAst); + return -1; + } + nodesDestroyNode(pAst); + pSma->schemaRow.version = 1; + + // TODO: the schemaTag generated by qExtractResultXXX later. + pSma->schemaTag.nCols = 1; + pSma->schemaTag.version = 1; + pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema)); + if (!pSma->schemaTag.pSchema) { + nodesDestroyNode(pAst); + return -1; + } + pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT; + pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]; + pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID; + pSma->schemaTag.pSchema[0].flags = 0; + snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId"); + SVgEpSet *pVgEpSet = NULL; int32_t numOfVgroups = 0; if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) { + nodesDestroyNode(pAst); return -1; } + nodesDestroyNode(pAst); pSma->pVgEpSet = pVgEpSet; pSma->numOfVgroups = numOfVgroups; - +#endif int32_t smaContLen = 0; void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen); if (pSmaReq == NULL) return -1; @@ -463,13 +501,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN); memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN); smaObj.createdTime = taosGetTimestampMs(); +#if 0 smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); +#endif + smaObj.uid = TD_DEBUG_SMA_ID; + char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; + snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name); + memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); + smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN); smaObj.stbUid = pStb->uid; smaObj.dbUid = pStb->dbUid; smaObj.intervalUnit = pCreate->intervalUnit; smaObj.slidingUnit = pCreate->slidingUnit; smaObj.timezone = pCreate->timezone; - smaObj.dstVgId = pCreate->dstVgId; + // smaObj.dstVgId = pCreate->dstVgId; smaObj.interval = pCreate->interval; smaObj.offset = pCreate->offset; smaObj.sliding = pCreate->sliding; @@ -502,6 +547,42 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); } +#if 1 // only for debugging, not needed in common vgroups, only needed in dstVgroup. + SNode *pAst = NULL; + if (nodesStringToNode(smaObj.ast, &pAst) < 0) { + return -1; + } + if (qExtractResultSchema(pAst, &smaObj.schemaRow.nCols, &smaObj.schemaRow.pSchema) != 0) { + nodesDestroyNode(pAst); + return -1; + } + smaObj.schemaRow.version = 1; + + smaObj.schemaTag.nCols = 1; + smaObj.schemaTag.version = 1; + smaObj.schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema)); + if (!smaObj.schemaTag.pSchema) { + nodesDestroyNode(pAst); + return -1; + } + smaObj.schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT; + smaObj.schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]; + smaObj.schemaTag.pSchema[0].colId = smaObj.schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID; + smaObj.schemaTag.pSchema[0].flags = 0; + snprintf(smaObj.schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId"); + + nodesDestroyNode(pAst); + + SVgEpSet *pVgEpSet = NULL; + int32_t numOfVgroups = 0; + if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) { + return -1; + } + + smaObj.pVgEpSet = pVgEpSet; + smaObj.numOfVgroups = numOfVgroups; +#endif + SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7abe9e3c0d..a331534a93 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -252,8 +252,12 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast } if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) { + nodesDestroyNode(pAst); return -1; } + // free + nodesDestroyNode(pAst); + #if 0 printf("|"); diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 689cd511c4..910a0835bb 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -34,13 +34,13 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) { SMetaReader mr = {0}; // validate req + // save smaIndex metaReaderInit(&mr, pMeta, 0); if (metaGetTableEntryByUid(&mr, pCfg->indexUid) == 0) { -// TODO: just for pass case #if 1 terrno = TSDB_CODE_TDB_TSMA_ALREADY_EXIST; metaReaderClear(&mr); - return -1; + return -1; // don't goto _err; #else metaReaderClear(&mr); return 0; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e738d3a408..05ad5c1cb2 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -167,13 +167,13 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui */ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { SSma *pSma = pVnode->pSma; - SMeta *pMeta = pVnode->pMeta; - SMsgCb *pMsgCb = &pVnode->msgCb; if (!pReq->rollup) { smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } + SMeta *pMeta = pVnode->pMeta; + SMsgCb *pMsgCb = &pVnode->msgCb; SRSmaParam *param = &pReq->pRSmaParam; if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) { diff --git a/source/dnode/vnode/src/sma/smaTimeRange2.c b/source/dnode/vnode/src/sma/smaTimeRange2.c index 09adc1a6a2..df89ec1795 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange2.c +++ b/source/dnode/vnode/src/sma/smaTimeRange2.c @@ -363,8 +363,8 @@ static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) { */ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { STsdbCfg *pCfg = SMA_TSDB_CFG(pSma); +#if 0 const SArray *pDataBlocks = (const SArray *)msg; - int64_t testSkey = TSKEY_INITIAL_VAL; // TODO: destroy SSDataBlocks(msg) @@ -386,6 +386,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { smaWarn("vgId:%d insert tSma data failed since pDataBlocks is empty", SMA_VID(pSma)); return TSDB_CODE_FAILED; } +#endif SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); SSmaStat *pStat = SMA_ENV_STAT(pEnv); @@ -403,178 +404,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { return TSDB_CODE_FAILED; } - STSma *pTSma = pItem->pTSma; - STSmaWriteH tSmaH = {0}; - - if (tdInitTSmaWriteH(&tSmaH, pSma, pDataBlocks, pTSma->interval, pTSma->intervalUnit) != 0) { - return TSDB_CODE_FAILED; - } - - char rPath[TSDB_FILENAME_LEN] = {0}; - char aPath[TSDB_FILENAME_LEN] = {0}; - snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid); - tfsAbsoluteName(SMA_TFS(pSma), SMA_ENV_DID(pEnv), rPath, aPath); - if (!taosCheckExistFile(aPath)) { - if (tfsMkdirRecurAt(SMA_TFS(pSma), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) { - tdUnRefSmaStat(pSma, pStat); - return TSDB_CODE_FAILED; - } - } - - // Step 1: Judge the storage level and days - int32_t storageLevel = tdGetSmaStorageLevel(pCfg, tSmaH.interval); - int32_t minutePerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel); - - char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId - char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer? - void *pDataBuf = NULL; - int32_t sz = taosArrayGetSize(pDataBlocks); - for (int32_t i = 0; i < sz; ++i) { - SSDataBlock *pDataBlock = taosArrayGet(pDataBlocks, i); - int32_t colNum = pDataBlock->info.numOfCols; - int32_t rows = pDataBlock->info.rows; - int32_t rowSize = pDataBlock->info.rowSize; - int64_t groupId = pDataBlock->info.groupId; - for (int32_t j = 0; j < rows; ++j) { - printf("|"); - TSKEY skey = TSKEY_INITIAL_VAL; // the start key of TS window by interval - void *pSmaKey = &smaKey; - bool isStartKey = false; - - int32_t tlen = 0; // reset the len - pDataBuf = &dataBuf; // reset the buf - for (int32_t k = 0; k < colNum; ++k) { - SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - void *var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - switch (pColInfoData->info.type) { - case TSDB_DATA_TYPE_TIMESTAMP: - if (!isStartKey) { - isStartKey = true; - skey = *(TSKEY *)var; - testSkey = skey; - printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId); - tdEncodeTSmaKey(groupId, skey, &pSmaKey); - } else { - printf(" %" PRIi64 " |", *(int64_t *)var); - tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var); - break; - } - break; - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_UTINYINT: - printf(" %15d |", *(uint8_t *)var); - tlen += taosEncodeFixedU8(&pDataBuf, *(uint8_t *)var); - break; - case TSDB_DATA_TYPE_TINYINT: - printf(" %15d |", *(int8_t *)var); - tlen += taosEncodeFixedI8(&pDataBuf, *(int8_t *)var); - break; - case TSDB_DATA_TYPE_SMALLINT: - printf(" %15d |", *(int16_t *)var); - tlen += taosEncodeFixedI16(&pDataBuf, *(int16_t *)var); - break; - case TSDB_DATA_TYPE_USMALLINT: - printf(" %15d |", *(uint16_t *)var); - tlen += taosEncodeFixedU16(&pDataBuf, *(uint16_t *)var); - break; - case TSDB_DATA_TYPE_INT: - printf(" %15d |", *(int32_t *)var); - tlen += taosEncodeFixedI32(&pDataBuf, *(int32_t *)var); - break; - case TSDB_DATA_TYPE_FLOAT: - printf(" %15f |", *(float *)var); - tlen += taosEncodeBinary(&pDataBuf, var, sizeof(float)); - break; - case TSDB_DATA_TYPE_UINT: - printf(" %15u |", *(uint32_t *)var); - tlen += taosEncodeFixedU32(&pDataBuf, *(uint32_t *)var); - break; - case TSDB_DATA_TYPE_BIGINT: - printf(" %15ld |", *(int64_t *)var); - tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var); - break; - case TSDB_DATA_TYPE_DOUBLE: - printf(" %15lf |", *(double *)var); - tlen += taosEncodeBinary(&pDataBuf, var, sizeof(double)); - case TSDB_DATA_TYPE_UBIGINT: - printf(" %15lu |", *(uint64_t *)var); - tlen += taosEncodeFixedU64(&pDataBuf, *(uint64_t *)var); - break; - case TSDB_DATA_TYPE_NCHAR: { - char tmpChar[100] = {0}; - strncpy(tmpChar, varDataVal(var), varDataLen(var)); - printf(" %s |", tmpChar); - tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var)); - break; - } - case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - char tmpChar[100] = {0}; - strncpy(tmpChar, varDataVal(var), varDataLen(var)); - printf(" %s |", tmpChar); - tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var)); - break; - } - case TSDB_DATA_TYPE_VARBINARY: - // TODO: add binary/varbinary - TASSERT(0); - default: - printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); - TASSERT(0); - break; - } - } - printf("\n"); - // if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) { - if (tlen > 0) { - int32_t fid = (int32_t)(TSDB_KEY_FID(skey, minutePerFile, pCfg->precision)); - - // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index - // file - // - Set and open the DFile or the B+Tree file - // TODO: tsdbStartTSmaCommit(); - if (fid != tSmaH.dFile.fid) { - if (tSmaH.dFile.fid != SMA_IVLD_FID) { - tdSmaEndCommit(pEnv); - smaCloseDBF(&tSmaH.dFile); - } - tdSetTSmaDataFile(&tSmaH, indexUid, fid); - smaDebug("@@@ vgId:%d write to DBF %s, days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi32 - " queryKey:%" PRIi64, - SMA_VID(pSma), tSmaH.dFile.path, minutePerFile, tSmaH.interval, storageLevel, testSkey); - if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) { - smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma), - tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); - tdDestroyTSmaWriteH(&tSmaH); - tdUnRefSmaStat(pSma, pStat); - return TSDB_CODE_FAILED; - } - tdSmaBeginCommit(pEnv); - } - - if (tdInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) { - smaWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 - " since %s", - SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno)); - tdSmaEndCommit(pEnv); - tdDestroyTSmaWriteH(&tSmaH); - tdUnRefSmaStat(pSma, pStat); - return TSDB_CODE_FAILED; - } - - smaDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, - SMA_VID(pSma), indexUid, skey, groupId); - // TODO:tsdbEndTSmaCommit(); + STSma *pTSma = pItem->pTSma; - // Step 3: reset the SSmaStat - tdResetExpiredWindow(pSma, pStat, indexUid, skey); - } else { - smaWarn("vgId:%d invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64, - SMA_VID(pSma), skey, tlen, indexUid); - } - } - } - tdSmaEndCommit(pEnv); // TODO: not commit for every insert - tdDestroyTSmaWriteH(&tSmaH); tdUnRefSmaStat(pSma, pStat); return TSDB_CODE_SUCCESS; @@ -865,6 +696,19 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { return -1; } + if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { + // create stable to save tsma result in dstVgId + SVCreateStbReq pReq = {0}; + pReq.name = pCfg->dstTbName; + pReq.suid = pCfg->dstTbUid; + pReq.schemaRow = pCfg->schemaRow; + pReq.schemaTag = pCfg->schemaTag; + + if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { + return -1; + } + } + tdTSmaAdd(pSma, 1); return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 9be94eb5b6..2f26fba50a 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -241,13 +241,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) { // TODO handle sma error } - void* data = taosMemoryMalloc(msgLen); - if (data == NULL) { - return -1; - } - memcpy(data, msg, msgLen); + // void* data = taosMemoryMalloc(msgLen); + // if (data == NULL) { + // return -1; + // } + // memcpy(data, msg, msgLen); - tqProcessStreamTrigger(pTq, data); + // tqProcessStreamTrigger(pTq, data); } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7a9c9ef393..dd705011c4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -174,11 +174,12 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } vTrace("vgId:%d, process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); - +#if 1 if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } +#endif // commit if need if (vnodeShouldCommit(pVnode)) { @@ -278,8 +279,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO - // blockDebugShowData(data); + // blockDebugShowData(data, __func__); +#if 0 tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); +#endif + tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, TD_DEBUG_SMA_ID, NULL); } void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { -- GitLab