提交 d7eface9 编写于 作者: C Cary Xu

feat: rollup sma optimization

上级 500461cf
...@@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
lastKey = rowKey; lastKey = rowKey;
++pCols->numOfRows; if (pCols) {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); ++pCols->numOfRows;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
}
} else { } else {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
} }
...@@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
#endif #endif
} }
if (lastKey != TSKEY_INITIAL_VAL) { if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
++pCols->numOfRows; ++pCols->numOfRows;
} }
......
...@@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { ...@@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
return -1; return -1;
} }
tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb), tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid); vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
...@@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, ...@@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
int8_t level) { int8_t level) {
SArray *pResult = NULL; SArray *pResult = NULL;
if (!taskInfo) {
tsdbDebug("vgId:%d no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, REPO_ID(pTsdb), level, suid);
return TSDB_CODE_SUCCESS;
}
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
suid); suid);
...@@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType ...@@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid); tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pRSmaInfo->taskInfo[0]) {
tsdbDebug("vgId:%d no rsma qTaskInfo for suid:%" PRIu64, REPO_ID(pTsdb), suid);
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache // TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED;
}
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1); tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2); tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
......
...@@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = { ...@@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = {
.isHeap = false, .isHeap = false,
.isWeak = 0, .isWeak = 0,
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI, .tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
.update = 0, .update = 1,
.compression = 2, .compression = 2,
.slLevel = 5, .slLevel = 5,
.days = 10, .days = 10,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册