/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"
#include "tq.h"
#include "tsdb.h"

#define SMA_STORAGE_MINUTES_MAX  86400
#define SMA_STORAGE_MINUTES_DAY  1440
#define SMA_STORAGE_SPLIT_FACTOR 14400  // least records in tsma file

static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);

/**
 * @brief Insert tsma result data; logs an error when the implementation returns a negative code.
 *
 * @param pSma
 * @param indexUid uid of the sma index the data belongs to
 * @param msg      passed through to tdProcessTSmaInsertImpl (interpreted there as `const SArray *`)
 * @return int32_t the code returned by tdProcessTSmaInsertImpl
 */
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
  int32_t code = TSDB_CODE_SUCCESS;

  if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
    smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code));
  }

  return code;
}

/**
 * @brief Public entry point that forwards to tdProcessTSmaCreateImpl.
 *
 * @param pSma
 * @param version
 * @param msg     passed through to tdProcessTSmaCreateImpl (interpreted there as `SSmaCfg *`)
 * @return int32_t the code returned by tdProcessTSmaCreateImpl
 */
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
  int32_t code = tdProcessTSmaCreateImpl(pSma, version, msg);

  return code;
}

/**
 * @brief Public entry point that forwards to tdProcessTSmaGetDaysImpl.
 *
 * @param pCfg
 * @param pCont
 * @param contLen
 * @param days    output, unit is minute
 * @return int32_t the code returned by tdProcessTSmaGetDaysImpl
 */
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
  int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);

  return code;
}

/**
 * @brief Judge the tsma file split days
 *
 * Decodes a create-tsma request from pCont and derives the file span from the
 * tsma interval:
 *  - interval (in seconds) <= 0, or at least SMA_STORAGE_SPLIT_FACTOR records
 *    fit into the tsdb `days` setting: reuse pCfg->tsdbCfg.days;
 *  - otherwise: interval-in-minutes * SMA_STORAGE_MINUTES_DAY * 2, capped at
 *    SMA_STORAGE_MINUTES_MAX and never smaller than pCfg->tsdbCfg.days.
 *
 * @param pCfg
 * @param pCont
 * @param contLen
 * @param days unit is minute; not written when decoding fails
 * @return int32_t 0 on success, TSDB_CODE_MSG_DECODE_ERROR when the request cannot be decoded
 */
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder coder = {0};
  tDecoderInit(&coder, pCont, contLen);

  STSma tsma = {0};
  if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
  int64_t   sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND);
  if (sInterval <= 0) {
    *days = pTsdbCfg->days;
    goto _exit;
  }

  // pTsdbCfg->days is multiplied by 60 and divided by the interval in seconds
  int64_t records = pTsdbCfg->days * 60 / sInterval;
  if (records >= SMA_STORAGE_SPLIT_FACTOR) {
    *days = pTsdbCfg->days;
  } else {
    int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE);
    int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;

    if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
      *days = SMA_STORAGE_MINUTES_MAX;
    } else {
      *days = (int32_t)daysPerFile;
    }

    if (*days < pTsdbCfg->days) {
      *days = pTsdbCfg->days;
    }
  }

_exit:
  if (code) {
    smaWarn("vgId:%d, failed at line %d to get tsma days %d since %s", pCfg->vgId, lino, *days, tstrerror(code));
  } else {
    smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
  }
  tDecoderClear(&coder);
  return code;
}

/**
 * @brief create tsma meta and result stable
 *
 * Only acts when this vnode is the destination vgroup of the sma
 * (TD_VID(pSma->pVnode) == pCfg->dstVgId); any other vnode gets
 * TSDB_CODE_TSMA_INVALID_STAT.
 *
 * @param pSma
 * @param version
 * @param pMsg    reinterpreted as `SSmaCfg *`
 * @return int32_t 0 on success, otherwise terrno from the failing meta call or
 *                 TSDB_CODE_TSMA_INVALID_STAT
 */
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SSmaCfg       *pCfg = (SSmaCfg *)pMsg;
  SName          stbFullName = {0};
  SVCreateStbReq pReq = {0};

  if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
    // create tsma meta in dstVgId
    if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // create stable to save tsma result in dstVgId
    tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    pReq.name = (char *)tNameGetTableName(&stbFullName);
    pReq.suid = pCfg->dstTbUid;
    pReq.schemaRow = pCfg->schemaRow;
    pReq.schemaTag = pCfg->schemaTag;

    if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    code = terrno = TSDB_CODE_TSMA_INVALID_STAT;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:;
  // pReq.name is still NULL when we bail out before the stable request is filled in;
  // passing NULL to "%s" is undefined behaviour, so substitute a printable placeholder.
  const char *dstTbName = pReq.name ? pReq.name : "(null)";
  if (code) {
    smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
             " dstTb:%s dstVg:%d",
             SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, dstTbName,
             pCfg->dstVgId);
  } else {
    smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
             " dstTb:%s dstVg:%d",
             SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, dstTbName,
             pCfg->dstVgId);
  }

  return code;
}

/**
 * @brief Convert an array of data blocks into an encoded submit request.
 *
 * Blocks of type STREAM_DELETE_RESULT are not turned into rows; they are handed to
 * tqBuildDeleteReq and collected in pDeleteReq instead. Every other block becomes one
 * SSubmitTbData whose rows are built with tRowBuild against pTSchema. When createTb is
 * true each such block also carries an auto-create child-table request with a single
 * UBIGINT tag "group_id" taken from the block's groupId.
 *
 * @param pVnode            vnode whose id is written into the message header
 * @param pBlocks           array of SSDataBlock
 * @param pTSchema          schema used to build rows
 * @param pTagSchemaWrapper not used by this function
 * @param createTb          attach create-table requests to the submitted data
 * @param suid              uid of the destination super table
 * @param stbFullName       full name of the destination super table
 * @param pDeleteReq        in/out; must be zero-initialised by the caller. deleteReqs is
 *                          allocated here on demand and destroyed (and reset to NULL) on failure
 * @param ppData            output (optional): buffer from rpcMallocCont holding the encoded request
 * @param pLen              output (optional): length of *ppData including the SSubmitReq2Msg header
 * @return int32_t 0 on success, otherwise an error code
 */
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema,
                         SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName,
                         SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void        *pBuf = NULL;
  int32_t      len = 0;
  SSubmitReq2 *pReq = NULL;
  SArray      *tagArray = NULL;
  SArray      *createTbArray = NULL;
  SArray      *pVals = NULL;

  int32_t sz = taosArrayGetSize(pBlocks);

  tagArray = taosArrayInit(1, sizeof(STagVal));
  createTbArray = taosArrayInit(sz, POINTER_BYTES);
  pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
  // only touch pReq once we know the allocation succeeded
  if (pReq) {
    pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
  }

  if (!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) {
    code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // create table req
  if (createTb) {
    for (int32_t i = 0; i < sz; ++i) {
      SSDataBlock   *pDataBlock = taosArrayGet(pBlocks, i);
      SVCreateTbReq *pCreateTbReq = NULL;
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        // keep createTbArray index-aligned with pBlocks by pushing a NULL placeholder
        taosArrayPush(createTbArray, &pCreateTbReq);
        continue;
      }

      // size the allocation from the pointee so it cannot drift from the pointer's type
      if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(*pCreateTbReq)))) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // don't move to the end of loop as to destroy in the end of func when error occur
      // NOTE(review): _exit only destroys the array itself; requests that were not yet
      // attached to pReq look like they are leaked on the error path — confirm and free
      // them with the proper SVCreateTbReq destructor.
      taosArrayPush(createTbArray, &pCreateTbReq);

      // set const
      pCreateTbReq->flags = 0;
      pCreateTbReq->type = TSDB_CHILD_TABLE;
      pCreateTbReq->ctb.suid = suid;

      // set super table name
      SName name = {0};
      tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
      pCreateTbReq->ctb.stbName = taosStrdup((char *)tNameGetTableName(&name));  // taosStrdup(stbFullName);

      // set tag content
      taosArrayClear(tagArray);
      STagVal tagVal = {
          .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
          .type = TSDB_DATA_TYPE_UBIGINT,
          .i64 = (int64_t)pDataBlock->info.id.groupId,
      };
      taosArrayPush(tagArray, &tagVal);
      pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);

      STag *pTag = NULL;
      tTagNew(tagArray, 1, false, &pTag);
      if (pTag == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      pCreateTbReq->ctb.pTag = (uint8_t *)pTag;

      // set tag name
      SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
      if (tagName == NULL) {
        code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";  // remainder of the buffer is zero-filled
      taosArrayPush(tagName, tagNameStr);
      pCreateTbReq->ctb.tagName = tagName;

      // set table name
      if (pDataBlock->info.parTbName[0]) {
        pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
      } else {
        pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
      }
    }
  }

  // SSubmitTbData req
  for (int32_t i = 0; i < sz; ++i) {
    SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      pDeleteReq->suid = suid;
      // allocate once: re-initialising per delete block would leak the previous array
      // and drop the delete requests already collected in it
      if (!pDeleteReq->deleteReqs && !(pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)))) {
        code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, "");
      continue;
    }

    int32_t rows = pDataBlock->info.rows;

    SSubmitTbData tbData = {0};

    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    tbData.suid = suid;
    tbData.uid = 0;  // uid is assigned by vnode
    tbData.sver = pTSchema->version;

    if (createTb) {
      tbData.pCreateTbReq = taosArrayGetP(createTbArray, i);
      if (tbData.pCreateTbReq) tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
    }

    // pVals is created lazily and reused for every row of every block
    if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
      taosArrayDestroy(tbData.aRowP);
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    for (int32_t j = 0; j < rows; ++j) {
      taosArrayClear(pVals);
      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn  *pCol = &pTSchema->columns[k];
        SColumnInfoData *pColData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (colDataIsNull_s(pColData, j)) {
          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
          taosArrayPush(pVals, &cv);
        } else {
          void *data = colDataGetData(pColData, j);
          if (IS_STR_DATA_TYPE(pCol->type)) {
            SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
            SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
            taosArrayPush(pVals, &cv);
          } else {
            // zero first: memcpy only fills the leading bytes of the value
            SValue sv = {0};
            memcpy(&sv.val, data, tDataTypes[pCol->type].bytes);
            SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
            taosArrayPush(pVals, &cv);
          }
        }
      }
      SRow *pRow = NULL;
      if ((code = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
        // tbData has not been pushed into pReq yet, so it must be released here
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      taosArrayPush(tbData.aRowP, &pRow);
    }

    taosArrayPush(pReq->aSubmitTbData, &tbData);
  }

  // encode
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder;
    len += sizeof(SSubmitReq2Msg);
    if (!(pBuf = rpcMallocCont(len))) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
    ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
    ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);

    // the payload is encoded right after the fixed-size message header
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
    if (tEncodeSubmitReq(&encoder, pReq) < 0) {
      tEncoderClear(&encoder);
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    tEncoderClear(&encoder);
  }

_exit:
  taosArrayDestroy(createTbArray);
  taosArrayDestroy(tagArray);
  taosArrayDestroy(pVals);
  if (pReq) {
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pReq);
  }

  if (code) {
    rpcFreeCont(pBuf);
    taosArrayDestroy(pDeleteReq->deleteReqs);
    pDeleteReq->deleteReqs = NULL;  // do not leave a dangling pointer in the caller's struct
    smaWarn("vgId:%d, failed at line %d since %s", TD_VID(pVnode), lino, tstrerror(code));
  } else {
    if (ppData) *ppData = pBuf;
    if (pLen) *pLen = len;
  }
  return code;
}

/**
 * @brief Encode a batch delete request and put it on the vnode write queue.
 *
 * Nothing is sent when pDelReq->deleteReqs is empty. pDelReq->deleteReqs is destroyed
 * before returning, on success and on failure.
 *
 * @param pSma
 * @param indexUid only used for logging
 * @param pDelReq
 * @return int32_t 0 on success, otherwise an error code
 */
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
    int32_t len = 0;
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
    TSDB_CHECK_CODE(code, lino, _exit);

    void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
    if (!pBuf) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SEncoder encoder;
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
    if (tEncodeSBatchDeleteReq(&encoder, pDelReq) < 0) {
      // never enqueue a half-encoded message; same error code as the submit encoder path
      tEncoderClear(&encoder);
      rpcFreeCont(pBuf);
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    tEncoderClear(&encoder);

    ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);

    // NOTE(review): pBuf is not released here after the enqueue attempt; presumably
    // tmsgPutToQueue takes care of pCont in both outcomes — confirm.
    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  taosArrayDestroy(pDelReq->deleteReqs);
  if (code) {
    smaError("vgId:%d, failed at line %d to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), lino,
             indexUid, tstrerror(code));
  }

  return code;
}

/**
 * @brief Insert/Update Time-range-wise SMA data.
 *
 * Lazily loads the tsma definition and the destination table schema into the tsma
 * stat on first use, converts the data blocks into a submit request, forwards any
 * collected delete requests, and finally puts the submit request on the write queue.
 *
 * @param pSma
 * @param indexUid uid of the sma index; must match the cached tsma
 * @param msg      reinterpreted as `const SArray *` of data blocks
 * @return int32_t 0 on success, otherwise an error code
 */
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
  int32_t       code = 0;
  int32_t       lino = 0;
  const SArray *pDataBlocks = (const SArray *)msg;

  if (taosArrayGetSize(pDataBlocks) <= 0) {
    code = TSDB_CODE_TSMA_INVALID_PARA;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
    code = TSDB_CODE_TSMA_INIT_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSmaEnv   *pEnv = SMA_TSMA_ENV(pSma);
  SSmaStat  *pStat = NULL;
  STSmaStat *pTsmaStat = NULL;

  if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
    code = TSDB_CODE_TSMA_INVALID_ENV;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pTsmaStat = SMA_STAT_TSMA(pStat);

  if (!pTsmaStat->pTSma) {
    // reset terrno so a stale value is not mistaken for the reason of a NULL result
    terrno = 0;
    STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
    if (!pTSma) {
      code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    pTsmaStat->pTSma = pTSma;
    pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1, 1);
    if (!pTsmaStat->pTSchema) {
      code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  if (ASSERTS(pTsmaStat->pTSma->indexUid == indexUid, "indexUid:%" PRIi64 " != %" PRIi64, pTsmaStat->pTSma->indexUid,
              indexUid)) {
    code = TSDB_CODE_APP_ERROR;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SBatchDeleteReq deleteReq = {0};
  void           *pSubmitReq = NULL;
  int32_t         contLen = 0;

  code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
                          pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsmaProcessDelReq(pSma, indexUid, &deleteReq);
  if (code != 0) {
    // report the failure to the caller instead of returning success, and release the
    // encoded submit buffer that will now never be queued
    terrno = code;
    rpcFreeCont(pSubmitReq);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

#if 0
  if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
    terrno = TSDB_CODE_APP_ERROR;
    smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
             pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName);
    goto _err;
  }
#endif

  // NOTE(review): pSubmitReq is not released here after the enqueue attempt; presumably
  // tmsgPutToQueue takes care of pCont in both outcomes — confirm.
  SRpcMsg submitReqMsg = {
      .msgType = TDMT_VND_SUBMIT,
      .pCont = pSubmitReq,
      .contLen = contLen,
  };

  code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino,
             tstrerror(code), indexUid);
  }
  return code;
}