提交 dd309177 编写于 作者: C Cary Xu

sma: refactor

上级 babef48c
......@@ -78,13 +78,6 @@ typedef enum {
TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA
} ETsdbSmaType;
typedef enum {
TSDB_BSMA_TYPE_NONE = 0, // no block-wise SMA
TSDB_BSMA_TYPE_I = 1, // sum/min/max(default)
} ETsdbBSmaType;
#define TSDB_BSMA_TYPE_LATEST TSDB_BSMA_TYPE_I
extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11
......
......@@ -1473,6 +1473,8 @@ typedef struct {
int32_t delay;
int8_t nFuncIds;
func_id_t* pFuncIds;
char* qmsg1; // not null: pAst1:qmsg1:SRetention1 => trigger aggr task1
char* qmsg2; // not null: pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam;
typedef struct SVCreateTbReq {
......@@ -2298,9 +2300,10 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
}
}
static FORCE_INLINE void tdFreeTSmaWrapper(STSmaWrapper* pSW) {
static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW) {
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
taosMemoryFree(pSW);
return NULL;
}
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
......
......@@ -110,7 +110,7 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
for (int i = 0; i < numOfCols; i++) {
col_type_t type = 0;
int8_t sma = TSDB_BSMA_TYPE_NONE;
int8_t sma = 0;
col_id_t colId = 0;
col_bytes_t bytes = 0;
buf = taosDecodeFixedI8(buf, &type);
......
......@@ -434,6 +434,8 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
}
tlen += taosEncodeString(buf, param->qmsg1);
tlen += taosEncodeString(buf, param->qmsg2);
}
break;
case TD_CHILD_TABLE:
......@@ -496,19 +498,19 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
}
if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryCalloc(1, sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeBinaryTo(buf, (void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
buf = taosDecodeFixedI32(buf, &param->delay);
buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) {
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
param->pFuncIds = (func_id_t *)taosMemoryCalloc(param->nFuncIds, sizeof(func_id_t));
for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
}
} else {
param->pFuncIds = NULL;
}
buf = taosDecodeString(buf, &param->qmsg1);
buf = taosDecodeString(buf, &param->qmsg2);
} else {
pReq->stbCfg.pRSmaParam = NULL;
}
......
......@@ -851,6 +851,8 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
cfgRsp.streamMode = pDb->cfg.streamMode;
cfgRsp.singleSTable = pDb->cfg.singleSTable;
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions;
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
void *pRsp = rpcMallocCont(contLen);
......
......@@ -443,6 +443,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
for (int32_t f = 0; f < pRSmaParam->nFuncIds; ++f) {
*(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod;
}
if (pStb->ast1Len > 0) {
pRSmaParam->qmsg1 = strdup(pStb->pAst1);
}
if (pStb->ast2Len > 0) {
pRSmaParam->qmsg2 = strdup(pStb->pAst2);
}
TASSERT(pRSmaParam->qmsg1 && pRSmaParam->qmsg2);
req.stbCfg.pRSmaParam = pRSmaParam;
}
......@@ -451,6 +460,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
if (pHead == NULL) {
if (pRSmaParam) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam->qmsg1);
taosMemoryFreeClear(pRSmaParam->qmsg2);
taosMemoryFreeClear(pRSmaParam);
}
taosMemoryFreeClear(req.stbCfg.pSchema);
......@@ -467,6 +478,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
*pContLen = contLen;
if (pRSmaParam) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam->qmsg1);
taosMemoryFreeClear(pRSmaParam->qmsg2);
taosMemoryFreeClear(pRSmaParam);
}
taosMemoryFreeClear(req.stbCfg.pSchema);
......
......@@ -391,6 +391,7 @@ struct SReadH {
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_TABLE_UID(rh) ((rh)->pTable->uid)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
......
......@@ -626,14 +626,8 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
#ifdef META_TDB_SMA_TEST
STSmaWrapper *pSW = NULL;
pSW = taosMemoryCalloc(1, sizeof(*pSW));
if (pSW == NULL) {
return NULL;
}
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (pCur == NULL) {
taosMemoryFree(pSW);
return NULL;
}
......@@ -653,6 +647,12 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
continue;
}
if ((pSW == NULL) && ((pSW = taosMemoryCalloc(1, sizeof(*pSW))) == NULL)) {
TDB_FREE(pSmaVal);
metaCloseSmaCursor(pCur);
return NULL;
}
++pSW->number;
STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma));
if (tptr == NULL) {
......
......@@ -3260,6 +3260,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat
return TSDB_CODE_SUCCESS;
}
tsdbDebug("vgId:%d succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb),
TSDB_READ_TABLE_UID(&pHandle->rhelper));
int16_t* colIds = pHandle->defaultLoadColumn->pData;
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
......
......@@ -322,15 +322,18 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
if (!pBlock->aggrStat) {
tsdbDebug("vgId:%d no need to load block statis part for uid %" PRIu64 " since not exist", REPO_ID(pReadh->pRepo),
TSDB_READ_TABLE_UID(pReadh));
return TSDB_STATIS_NONE;
}
SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRIu64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset,
tstrerror(terrno));
tsdbError("vgId:%d failed to load block statis part for uid %" PRIu64 " while seek file %s to offset %" PRIu64
" since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
(uint64_t)pBlock->aggrOffset, tstrerror(terrno));
return -1;
}
......@@ -339,25 +342,28 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(uint64_t)pBlock->aggrOffset, sizeAggr);
tsdbError("vgId:%d failed to load block statis part for uid %" PRIu64
" while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
tstrerror(terrno), (uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRIu64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr);
tsdbError("vgId:%d block statis part for uid %" PRIu64 " in file %s is corrupted, offset:%" PRIu64
" expected bytes:%" PRIzu " read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
(uint64_t)pBlock->aggrOffset, sizeAggr, nreadAggr);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr);
tsdbError("vgId:%d block statis part for uid %" PRIu64
"in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
(uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
return 0;
......@@ -367,7 +373,7 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
tsdbError("vgId:%d failed to load block head part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
......@@ -377,14 +383,14 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
if (nread < 0) {
tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
tsdbError("vgId:%d failed to load block head part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size);
return -1;
}
if (nread < size) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
tsdbError("vgId:%d block head part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread);
return -1;
......@@ -392,7 +398,7 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
tsdbError("vgId:%d block head part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
}
......@@ -546,7 +552,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
tsdbError("vgId:%d block head part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tsize);
return -1;
}
......
......@@ -643,6 +643,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SInterval interval = {0};
TSKEY lastWinSKey = INT64_MIN;
if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
......@@ -657,7 +658,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
tdFreeTSmaWrapper(pSW);
pSW = tdFreeTSmaWrapper(pSW);
break;
}
......@@ -667,31 +668,37 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
tdFreeTSmaWrapper(pSW);
break;
}
if (pSW == NULL) {
if (!pSW || (pTSma->tableUid != pBlock->suid)) {
if (pSW) {
pSW = tdFreeTSmaWrapper(pSW);
}
if ((pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) {
break;
}
if ((pSW->number) <= 0 || (pSW->tSma == NULL)) {
tdFreeTSmaWrapper(pSW);
pSW = tdFreeTSmaWrapper(pSW);
break;
}
pTSma = pSW->tSma;
}
interval.interval = pTSma->interval;
interval.intervalUnit = pTSma->intervalUnit;
interval.offset = pTSma->offset;
interval.precision = REPO_CFG(pTsdb)->precision;
interval.sliding = pTSma->sliding;
interval.slidingUnit = pTSma->slidingUnit;
interval.interval = pTSma->interval;
interval.intervalUnit = pTSma->intervalUnit;
interval.offset = pTSma->offset;
interval.precision = REPO_CFG(pTsdb)->precision;
interval.sliding = pTSma->sliding;
interval.slidingUnit = pTSma->slidingUnit;
}
TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);
tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey, version);
// TODO: release only when suid changes.
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
if (lastWinSKey != winSKey) {
lastWinSKey = winSKey;
tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey, version);
} else {
tsdbDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window ignore as duplicated",
REPO_ID(pTsdb), pTSma->indexUid, winSKey);
}
}
}
......
......@@ -215,10 +215,20 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
return -1;
}
// TODO: remove the log
if (vCreateTbReq.stbCfg.pRSmaParam) {
printf("qmsg1 len = %d, body = %s\n", (int32_t)strlen(vCreateTbReq.stbCfg.pRSmaParam->qmsg1),
vCreateTbReq.stbCfg.pRSmaParam->qmsg1);
printf("qmsg2 len = %d, body = %s\n", (int32_t)strlen(vCreateTbReq.stbCfg.pRSmaParam->qmsg2),
vCreateTbReq.stbCfg.pRSmaParam->qmsg2);
}
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
if (vCreateTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->qmsg1);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->qmsg2);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
}
taosMemoryFree(vCreateTbReq.name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册