提交 744a1989 编写于 作者: C Cary Xu

add version for tsma expired window updating

上级 a1147241
...@@ -393,10 +393,11 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); ...@@ -393,10 +393,11 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
* @brief When submit msg received, update the relative expired window synchronously. * @brief When submit msg received, update the relative expired window synchronously.
* *
* @param pTsdb * @param pTsdb
* @param msg * @param pMsg
* @param version
* @return int32_t * @return int32_t
*/ */
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg); int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
/** /**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
......
...@@ -82,7 +82,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi ...@@ -82,7 +82,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
memcpy(data, msg, msgLen); memcpy(data, msg, msgLen);
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) { if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, version) != 0) {
return -1; return -1;
} }
} }
......
...@@ -105,8 +105,8 @@ struct SSmaStat { ...@@ -105,8 +105,8 @@ struct SSmaStat {
// declaration of static functions // declaration of static functions
// expired window // expired window
static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg); static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey); static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
...@@ -544,7 +544,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { ...@@ -544,7 +544,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}; };
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) { static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) {
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (pItem == NULL) { if (pItem == NULL) {
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
...@@ -578,8 +578,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t ...@@ -578,8 +578,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int8_t state = TSDB_SMA_STAT_EXPIRED; if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &state, sizeof(state)) != 0) {
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
// tell query module to query raw TS data. // tell query module to query raw TS data.
// N.B. // N.B.
...@@ -606,7 +605,8 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t ...@@ -606,7 +605,8 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
* @param msg SSubmitReq * @param msg SSubmitReq
* @return int32_t * @return int32_t
*/ */
int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) {
// no time-range-sma, just return success
if (atomic_load_16(&REPO_TSMA_NUM(pTsdb)) <= 0) { if (atomic_load_16(&REPO_TSMA_NUM(pTsdb)) <= 0) {
tsdbTrace("vgId:%d not update expire window since no tSma", REPO_ID(pTsdb)); tsdbTrace("vgId:%d not update expire window since no tSma", REPO_ID(pTsdb));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -621,20 +621,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { ...@@ -621,20 +621,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// TODO: decode the msg from Stream Computing module => start
#ifdef TSDB_SMA_TESTx
int64_t indexUid = SMA_TEST_INDEX_UID;
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
TSKEY skey1 = 1646987196 * 1e3;
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
expiredWindows[i] = skey1 + i;
}
#else
#endif
// TODO: decode the msg <= end
if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED; terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -700,7 +686,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { ...@@ -700,7 +686,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision); TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);
tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey); tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey, version);
// TODO: release only when suid changes. // TODO: release only when suid changes.
tdDestroyTSmaWrapper(pSW); tdDestroyTSmaWrapper(pSW);
...@@ -975,7 +961,7 @@ static int tsdbSmaBeginCommit(SSmaEnv *pEnv) { ...@@ -975,7 +961,7 @@ static int tsdbSmaBeginCommit(SSmaEnv *pEnv) {
// start a new txn // start a new txn
tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (tdbBegin(pEnv->dbEnv, pTxn) != 0) { if (tdbBegin(pEnv->dbEnv, pTxn) != 0) {
tsdbWarn("tsdbSma tdb restart txn fail"); tsdbWarn("tsdbSma tdb begin commit fail");
return -1; return -1;
} }
return 0; return 0;
...@@ -986,7 +972,7 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) { ...@@ -986,7 +972,7 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
// Commit current txn // Commit current txn
if (tdbCommit(pEnv->dbEnv, pTxn) != 0) { if (tdbCommit(pEnv->dbEnv, pTxn) != 0) {
tsdbWarn("tsdbSma tdb commit fail"); tsdbWarn("tsdbSma tdb end commit fail");
return -1; return -1;
} }
tdbTxnClose(pTxn); tdbTxnClose(pTxn);
...@@ -1009,12 +995,12 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) { ...@@ -1009,12 +995,12 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg) { static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg) {
STsdbCfg *pCfg = REPO_CFG(pTsdb); STsdbCfg *pCfg = REPO_CFG(pTsdb);
const SArray *pDataBlocks = (const SArray *)msg; const SArray *pDataBlocks = (const SArray *)msg;
SSmaEnv *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb));
if (pEnv == NULL) { // For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
terrno = TSDB_CODE_INVALID_PTR; // the sma data would arrive ahead of the update-expired-window msg.
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
return terrno; terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
} }
if (pDataBlocks == NULL) { if (pDataBlocks == NULL) {
...@@ -1029,6 +1015,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char ...@@ -1029,6 +1015,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SSmaEnv *pEnv = REPO_TSMA_ENV(pTsdb);
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SSmaStatItem *pItem = NULL; SSmaStatItem *pItem = NULL;
...@@ -1683,9 +1670,9 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) { ...@@ -1683,9 +1670,9 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) {
return code; return code;
} }
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg) { int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, pMsg)) < 0) { if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, pMsg, version)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
return code; return code;
......
...@@ -409,7 +409,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ...@@ -409,7 +409,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS);
EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg), 0); EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg, 0), 0);
// init // init
const int32_t tSmaGroupSize = 4; const int32_t tSmaGroupSize = 4;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册