提交 2b2485d1 编写于 作者: C Cary Xu

feat: tsma exp wnds process

上级 ba103f3c
......@@ -351,9 +351,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x061D)
#define TSDB_CODE_TDB_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x061E)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......@@ -684,6 +681,19 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SML_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x3002)
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
//tsma
#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3100)
#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3101)
#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3102)
#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103)
#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3104)
#define TSDB_CODE_TSMA_RM_SKEY_IN_HASH TAOS_DEF_ERROR_CODE(0, 0x3105)
//rsma
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#ifdef __cplusplus
}
#endif
......
......@@ -4049,7 +4049,7 @@ int32_t tDecodeSVClrTsmaExpWndsReq(SDecoder *pCoder, SVClrTsmaExpWndsReq *pReq)
int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder *pCoder, const SVClrTsmaExpWndsRsp *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
if (tEncodeI32v(pCoder, pReq->code) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
......@@ -4057,7 +4057,7 @@ int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder *pCoder, const SVClrTsmaExpWndsRsp *
int32_t tDecodeSVClrTsmaExpWndsRsp(SDecoder *pCoder, SVClrTsmaExpWndsRsp *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->code) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
......
......@@ -70,7 +70,7 @@ struct SSmaStatItem {
* N.B. only applicable to tsma
*/
int8_t state; // ETsdbSmaStat
SHashObj *expiredWindows; // key: skey of time window, value: version
SHashObj *expireWindows; // key: skey of time window, value: version
STSma *pTSma; // cache schema
};
......@@ -128,7 +128,7 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg);
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck);
int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma);
......@@ -219,7 +219,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version);
int32_t tdUpdateExpireWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version);
int32_t tdClearExpireWindowImpl(SSma *pSma, const SVClrTsmaExpWndsReq *pMsg);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
......
......@@ -151,6 +151,7 @@ int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma);
int32_t tdUpdateExpireWindow(SSma* pSma, const SSubmitReq* pMsg, int64_t version);
int32_t tdClearExpireWindow(SSma* pSma, const SVClrTsmaExpWndsReq* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
......@@ -227,7 +228,7 @@ struct SVnode {
SQHandle* pQuery;
};
#define TD_VID(PVNODE) (PVNODE)->config.vgId
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
#define VND_TSDB(vnd) ((vnd)->pTsdb)
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
......
......@@ -38,7 +38,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, pCfg->indexUid) == 0) {
#if 1
terrno = TSDB_CODE_TDB_TSMA_ALREADY_EXIST;
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
metaReaderClear(&mr);
return -1; // don't goto _err;
#else
......
......@@ -38,8 +38,15 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
int32_t tdUpdateExpireWindow(SSma* pSma, const SSubmitReq* pMsg, int64_t version) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdUpdateExpiredWindowImpl(pSma, pMsg, version)) < 0) {
smaWarn("vgId:%d, update expired sma window failed since %s", SMA_VID(pSma), tstrerror(terrno));
if ((code = tdUpdateExpireWindowImpl(pSma, pMsg, version)) < 0) {
smaWarn("vgId:%d, update expire window failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
int32_t tdClearExpireWindow(SSma* pSma, const SVClrTsmaExpWndsReq* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdClearExpireWindowImpl(pSma, pMsg)) < 0) {
smaWarn("vgId:%d, update expire window failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
......
......@@ -242,7 +242,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
}
/**
* 1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tdNew).
* 1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
* tdInitSmaStat invoked in other multithread environment later.
*/
......@@ -280,7 +280,7 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
if (pSmaStatItem) {
tDestroyTSma(pSmaStatItem->pTSma);
taosMemoryFreeClear(pSmaStatItem->pTSma);
taosHashCleanup(pSmaStatItem->expiredWindows);
taosHashCleanup(pSmaStatItem->expireWindows);
taosMemoryFreeClear(pSmaStatItem);
}
return NULL;
......@@ -341,7 +341,7 @@ int32_t tdUnLockSma(SSma *pSma) {
return 0;
}
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck) {
SSmaEnv *pEnv = NULL;
// return if already init
......
......@@ -65,7 +65,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
terrno = TSDB_CODE_RSMA_INVALID_STAT;
return TSDB_CODE_FAILED;
}
......@@ -132,7 +132,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL;
if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
terrno = TSDB_CODE_RSMA_INVALID_STAT;
return TSDB_CODE_FAILED;
}
......@@ -181,7 +181,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
return TSDB_CODE_SUCCESS;
}
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP, false) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
......
......@@ -69,10 +69,10 @@ static int32_t tdInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
TXN *txn);
// expired window
// expire window
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t tdSetExpireWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
static int32_t tdResetExpireWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
// read data
......@@ -319,7 +319,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
// For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
// the sma data would arrive ahead of the update-expired-window msg.
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, false) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
......@@ -347,7 +347,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
}
if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tdSmaStatIsDropped(pItem)) {
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
terrno = TSDB_CODE_TSMA_INVALID_STAT;
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
}
......@@ -515,7 +515,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
// TODO:tsdbEndTSmaCommit();
// Step 3: reset the SSmaStat
tdResetExpiredWindow(pSma, pStat, indexUid, skey);
tdResetExpireWindow(pSma, pStat, indexUid, skey);
} else {
smaWarn("vgId:%d, invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64,
SMA_VID(pSma), skey, tlen, indexUid);
......@@ -572,7 +572,7 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
}
/**
* @brief When sma data received from stream computing, make the relative expired window valid.
* @brief When sma data received from stream computing, make the relative expire window valid.
*
* @param pSma
* @param pStat
......@@ -580,7 +580,7 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
* @param skey
* @return int32_t
*/
static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
static int32_t tdResetExpireWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
SSmaStatItem *pItem = NULL;
tdRefSmaStat(pSma, pStat);
......@@ -591,14 +591,14 @@ static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUi
if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) {
// pItem resides in hash buffer all the time unless drop sma index
// TODO: multithread protect
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
if (taosHashRemove(pItem->expireWindows, &skey, sizeof(TSKEY)) != 0) {
// error handling
tdUnRefSmaStat(pSma, pStat);
smaWarn("vgId:%d, remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " fail", SMA_VID(pSma), skey,
smaWarn("vgId:%d, remove skey %" PRIi64 " from expire window for sma index %" PRIi64 " fail", SMA_VID(pSma), skey,
indexUid);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " succeed", SMA_VID(pSma),
smaDebug("vgId:%d, remove skey %" PRIi64 " from expire window for sma index %" PRIi64 " succeed", SMA_VID(pSma),
skey, indexUid);
// TODO: use a standalone interface to received state upate notification from stream computing module.
/**
......@@ -612,7 +612,7 @@ static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUi
} else {
// error handling
tdUnRefSmaStat(pSma, pStat);
smaWarn("vgId:%d, expired window %" PRIi64 " not exists for sma index %" PRIi64, SMA_VID(pSma), skey, indexUid);
smaWarn("vgId:%d, expire window %" PRIi64 " not exists for sma index %" PRIi64, SMA_VID(pSma), skey, indexUid);
return TSDB_CODE_FAILED;
}
......@@ -711,7 +711,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
for (int32_t n = 0; n < nQueryWin; ++n) {
TSKEY skey = taosArrayGet(pQuerySKey, n);
if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))) {
if (taosHashGet(pItem->expireWindows, &skey, sizeof(TSKEY))) {
// TODO: mark this window as expired.
}
}
......@@ -721,18 +721,18 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
int8_t smaStat = 0;
if (!tdSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query
tdUnRefSmaStat(pSma, pStat);
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
terrno = TSDB_CODE_TSMA_INVALID_STAT;
smaWarn("vgId:%d, getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, SMA_VID(pSma), indexUid,
tstrerror(terrno), smaStat);
return TSDB_CODE_FAILED;
}
if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY))) {
if (taosHashGet(pItem->expireWindows, &querySKey, sizeof(TSKEY))) {
// TODO: mark this window as expired.
smaDebug("vgId:%d, skey %" PRIi64 " of window exists in expired window for index %" PRIi64, SMA_VID(pSma), querySKey,
smaDebug("vgId:%d, skey %" PRIi64 " of window exists in expire window for index %" PRIi64, SMA_VID(pSma), querySKey,
indexUid);
} else {
smaDebug("vgId:%d, skey %" PRIi64 " of window not in expired window for index %" PRIi64, SMA_VID(pSma), querySKey,
smaDebug("vgId:%d, skey %" PRIi64 " of window not in expire window for index %" PRIi64, SMA_VID(pSma), querySKey,
indexUid);
}
......@@ -747,7 +747,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
tdUnRefSmaStat(pSma, pStat);
tdInitTSmaFile(&tReadH, indexUid, querySKey);
smaDebug("### vgId:%d read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
smaDebug("### vgId:%d, read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey);
if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
smaWarn("vgId:%d, open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
......@@ -860,9 +860,9 @@ static SSmaStatItem *tdNewSmaStatItem(int8_t state) {
}
pItem->state = state;
pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
true, HASH_ENTRY_LOCK);
if (!pItem->expiredWindows) {
pItem->expireWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
true, HASH_ENTRY_LOCK);
if (!pItem->expireWindows) {
taosMemoryFreeClear(pItem);
return NULL;
}
......@@ -870,8 +870,7 @@ static SSmaStatItem *tdNewSmaStatItem(int8_t state) {
return pItem;
}
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version) {
static int32_t tdSetExpireWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) {
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (!pItem) {
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
......@@ -885,8 +884,8 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
// cache smaMeta
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
if (!pTSma) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows);
terrno = TSDB_CODET_TSMA_NO_INDEX_IN_META;
taosHashCleanup(pItem->expireWindows);
taosMemoryFree(pItem);
smaWarn("vgId:%d, set expire window, get tsma meta failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
......@@ -896,7 +895,7 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
// If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup(pItem->expiredWindows);
taosHashCleanup(pItem->expireWindows);
taosMemoryFree(pItem);
return TSDB_CODE_FAILED;
}
......@@ -905,14 +904,14 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
return TSDB_CODE_FAILED;
}
if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
// If error occurs during taosHashPut expired windows, remove the smaIndex from pSma->pSmaStat, thus TSDB would
if (taosHashPut(pItem->expireWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
// If error occurs during taosHashPut expire windows, remove the smaIndex from pSma->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table.
taosHashCleanup(pItem->expiredWindows);
taosHashCleanup(pItem->expireWindows);
taosMemoryFreeClear(pItem->pTSma);
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
smaWarn("vgId:%d, smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", SMA_VID(pSma), indexUid,
......@@ -926,13 +925,13 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
}
/**
* @brief Update expired window according to msg from stream computing module.
* @brief Update expire window according to msg from stream computing module.
*
* @param pSma
* @param msg SSubmitReq
* @return int32_t
*/
int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version) {
int32_t tdUpdateExpireWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version) {
// no time-range-sma, just return success
if (atomic_load_16(&SMA_TSMA_NUM(pSma)) <= 0) {
smaTrace("vgId:%d, not update expire window since no tSma", SMA_VID(pSma));
......@@ -945,7 +944,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t ve
return TSDB_CODE_FAILED;
}
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) < 0) {
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, false) < 0) {
smaError("vgId:%d, init sma env failed since %s", SMA_VID(pSma), terrstr(terrno));
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
......@@ -1019,7 +1018,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t ve
if (lastWinSKey != winSKey) {
lastWinSKey = winSKey;
if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
if (tdSetExpireWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
pSW = tFreeTSmaWrapper(pSW, false);
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
......
......@@ -72,10 +72,10 @@ static int32_t tdInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
TXN *txn);
// expired window
// expire window
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t tdSetExpireWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
static int32_t tdResetExpireWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
/**
......@@ -149,7 +149,7 @@ static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval,
}
/**
* @brief Init of tSma FS
* @brief Init of tsma FS
*
* @param pReadH
* @param indexUid
......@@ -169,7 +169,7 @@ static int32_t tdInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) {
}
/**
* @brief Set and open tSma file if it has key locates in queryWin.
* @brief Set and open tsma file if it has key locates in queryWin.
*
* @param pReadH
* @param param
......@@ -335,7 +335,7 @@ static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel)
}
/**
* @brief Judge the tSma storage level
* @brief Judge the tsma storage level
*
* @param pCfg
* @param interval
......@@ -362,7 +362,7 @@ static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) {
* @return int32_t
*/
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
#if 0
const SArray *pDataBlocks = (const SArray *)msg;
......@@ -370,20 +370,20 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
// For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
// the sma data would arrive ahead of the update-expired-window msg.
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, false) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
if (!pDataBlocks) {
terrno = TSDB_CODE_INVALID_PTR;
smaWarn("vgId:%d insert tSma data failed since pDataBlocks is NULL", SMA_VID(pSma));
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
return terrno;
}
if (taosArrayGetSize(pDataBlocks) <= 0) {
terrno = TSDB_CODE_INVALID_PARA;
smaWarn("vgId:%d insert tSma data failed since pDataBlocks is empty", SMA_VID(pSma));
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma));
return TSDB_CODE_FAILED;
}
#endif
......@@ -399,7 +399,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
}
if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tdSmaStatIsDropped(pItem)) {
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
terrno = TSDB_CODE_TSMA_INVALID_STAT;
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
}
......@@ -414,7 +414,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdDropTSmaDataImpl(pSma, indexUid)) < 0) {
smaWarn("vgId:%d drop tSma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
smaWarn("vgId:%d, drop tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
......@@ -435,11 +435,11 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
// TODO: insert tsma data blocks into B+Tree(TTB)
if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
smaWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
smaWarn("vgId:%d, insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
smaDebug("vgId:%d, insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
......@@ -447,14 +447,14 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
void *data = tdGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
ASSERT(data != NULL);
for (uint32_t v = 0; v < valueSize; v += 8) {
smaWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
smaWarn("vgId:%d, insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
}
#endif
return TSDB_CODE_SUCCESS;
}
/**
* @brief When sma data received from stream computing, make the relative expired window valid.
* @brief When sma data received from stream computing, make the relative expire window valid.
*
* @param pSma
* @param pStat
......@@ -462,7 +462,7 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
* @param skey
* @return int32_t
*/
static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
static int32_t tdResetExpireWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
SSmaStatItem *pItem = NULL;
tdRefSmaStat(pSma, pStat);
......@@ -473,14 +473,14 @@ static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUi
if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) {
// pItem resides in hash buffer all the time unless drop sma index
// TODO: multithread protect
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
if (taosHashRemove(pItem->expireWindows, &skey, sizeof(TSKEY)) != 0) {
// error handling
tdUnRefSmaStat(pSma, pStat);
smaWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " fail", SMA_VID(pSma), skey,
smaWarn("vgId:%d, remove skey %" PRIi64 " from expire window for sma index %" PRIi64 " fail", SMA_VID(pSma), skey,
indexUid);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " succeed", SMA_VID(pSma),
smaDebug("vgId:%d, remove skey %" PRIi64 " from expire window for sma index %" PRIi64 " succeed", SMA_VID(pSma),
skey, indexUid);
// TODO: use a standalone interface to received state upate notification from stream computing module.
/**
......@@ -494,7 +494,7 @@ static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUi
} else {
// error handling
tdUnRefSmaStat(pSma, pStat);
smaWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, SMA_VID(pSma), skey, indexUid);
smaWarn("vgId:%d, expire window %" PRIi64 " not exists for sma index %" PRIi64, SMA_VID(pSma), skey, indexUid);
return TSDB_CODE_FAILED;
}
......@@ -503,7 +503,7 @@ static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUi
}
/**
* @brief Drop tSma data and local cache
* @brief Drop tsma data and local cache
* - insert/query reference
* @param pSma
* @param msg
......@@ -514,19 +514,19 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid) {
// clear local cache
if (pEnv) {
smaDebug("vgId:%d drop tSma local cache for %" PRIi64, SMA_VID(pSma), indexUid);
smaDebug("vgId:%d, drop tsma local cache for %" PRIi64, SMA_VID(pSma), indexUid);
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
if ((pItem) || ((pItem = *(SSmaStatItem **)pItem))) {
if (tdSmaStatIsDropped(pItem)) {
smaDebug("vgId:%d tSma stat is already dropped for %" PRIi64, SMA_VID(pSma), indexUid);
smaDebug("vgId:%d, tsma stat is already dropped for %" PRIi64, SMA_VID(pSma), indexUid);
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
}
tdWLockSmaEnv(pEnv);
if (tdSmaStatIsDropped(pItem)) {
tdUnLockSmaEnv(pEnv);
smaDebug("vgId:%d tSma stat is already dropped for %" PRIi64, SMA_VID(pSma), indexUid);
smaDebug("vgId:%d, tsma stat is already dropped for %" PRIi64, SMA_VID(pSma), indexUid);
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
}
tdSmaStatSetDropped(pItem);
......@@ -536,19 +536,20 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid) {
int32_t refVal = INT32_MAX;
while (true) {
if ((refVal = T_REF_VAL_GET(SMA_ENV_STAT(pEnv))) <= 0) {
smaDebug("vgId:%d drop index %" PRIi64 " since refVal=%d", SMA_VID(pSma), indexUid, refVal);
smaDebug("vgId:%d, drop index %" PRIi64 " since refVal=%d", SMA_VID(pSma), indexUid, refVal);
break;
}
smaDebug("vgId:%d wait 1s to drop index %" PRIi64 " since refVal=%d", SMA_VID(pSma), indexUid, refVal);
smaDebug("vgId:%d, wait 1s to drop index %" PRIi64 " since refVal=%d", SMA_VID(pSma), indexUid, refVal);
taosSsleep(1);
if (++nSleep > SMA_DROP_EXPIRED_TIME) {
smaDebug("vgId:%d drop index %" PRIi64 " after wait %d (refVal=%d)", SMA_VID(pSma), indexUid, nSleep, refVal);
smaDebug("vgId:%d, drop index %" PRIi64 " after wait %d (refVal=%d)", SMA_VID(pSma), indexUid, nSleep,
refVal);
break;
};
}
tdFreeSmaStatItem(pItem);
smaDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", SMA_VID(pSma), indexUid);
smaDebug("vgId:%d, getTSmaDataImpl failed since no index %" PRIi64 " in local cache", SMA_VID(pSma), indexUid);
}
}
// clear sma data files
......@@ -572,7 +573,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
if (!pEnv) {
terrno = TSDB_CODE_INVALID_PTR;
smaWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", SMA_VID(pSma));
smaWarn("vgId:%d, getTSmaDataImpl failed since pTSmaEnv is NULL", SMA_VID(pSma));
return TSDB_CODE_FAILED;
}
......@@ -585,7 +586,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
// it's NULL.
tdUnRefSmaStat(pSma, pStat);
terrno = TSDB_CODE_TDB_INVALID_ACTION;
smaDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, SMA_VID(pSma), indexUid);
smaDebug("vgId:%d, getTSmaDataImpl failed since no index %" PRIi64, SMA_VID(pSma), indexUid);
return TSDB_CODE_FAILED;
}
......@@ -593,7 +594,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
for (int32_t n = 0; n < nQueryWin; ++n) {
TSKEY skey = taosArrayGet(pQuerySKey, n);
if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))) {
if (taosHashGet(pItem->expireWindows, &skey, sizeof(TSKEY))) {
// TODO: mark this window as expired.
}
}
......@@ -603,18 +604,18 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
int8_t smaStat = 0;
if (!tdSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query
tdUnRefSmaStat(pSma, pStat);
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
smaWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, SMA_VID(pSma), indexUid,
terrno = TSDB_CODE_TSMA_INVALID_STAT;
smaWarn("vgId:%d, getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, SMA_VID(pSma), indexUid,
tstrerror(terrno), smaStat);
return TSDB_CODE_FAILED;
}
if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY))) {
if (taosHashGet(pItem->expireWindows, &querySKey, sizeof(TSKEY))) {
// TODO: mark this window as expired.
smaDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, SMA_VID(pSma), querySKey,
smaDebug("vgId:%d, skey %" PRIi64 " of window exists in expire window for index %" PRIi64, SMA_VID(pSma), querySKey,
indexUid);
} else {
smaDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, SMA_VID(pSma), querySKey,
smaDebug("vgId:%d, skey %" PRIi64 " of window not in expire window for index %" PRIi64, SMA_VID(pSma), querySKey,
indexUid);
}
......@@ -629,10 +630,10 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
tdUnRefSmaStat(pSma, pStat);
tdInitTSmaFile(&tReadH, indexUid, querySKey);
smaDebug("### vgId:%d read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
smaDebug("### vgId:%d, read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey);
if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
smaWarn("vgId:%d open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
smaWarn("vgId:%d, open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
return TSDB_CODE_FAILED;
}
......@@ -641,13 +642,13 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
int64_t queryGroupId = 0;
tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma), tReadH.dFile.path,
smaDebug("vgId:%d, get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma), tReadH.dFile.path,
*(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN);
void *result = NULL;
int32_t valueSize = 0;
if (!(result = smaGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize))) {
smaWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s",
smaWarn("vgId:%d, get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s",
SMA_VID(pSma), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno));
smaCloseDBF(&tReadH.dFile);
return TSDB_CODE_FAILED;
......@@ -656,7 +657,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
for (uint32_t v = 0; v < valueSize; v += 8) {
smaWarn("vgId:%d get sma data v[%d]=%" PRIi64, SMA_VID(pSma), v, *(int64_t *)POINTER_SHIFT(result, v));
smaWarn("vgId:%d, get sma data v[%d]=%" PRIi64, SMA_VID(pSma), v, *(int64_t *)POINTER_SHIFT(result, v));
}
#endif
taosMemoryFreeClear(result); // TODO: fill the result to output
......@@ -721,7 +722,7 @@ int32_t tdDropTSma(SSma *pSma, char *pMsg) {
return -1;
}
// TODO: send msg to stream computing to drop tSma
// TODO: send msg to stream computing to drop tsma
// if ((send msg to stream computing) < 0) {
// tDestroyTSma(&vCreateSmaReq);
// return -1;
......@@ -755,9 +756,9 @@ static SSmaStatItem *tdNewSmaStatItem(int8_t state) {
}
pItem->state = state;
pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
true, HASH_ENTRY_LOCK);
if (!pItem->expiredWindows) {
pItem->expireWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
true, HASH_ENTRY_LOCK);
if (!pItem->expireWindows) {
taosMemoryFreeClear(pItem);
return NULL;
}
......@@ -765,8 +766,7 @@ static SSmaStatItem *tdNewSmaStatItem(int8_t state) {
return pItem;
}
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version) {
static int32_t tdSetExpireWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) {
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (!pItem) {
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
......@@ -780,10 +780,10 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
// cache smaMeta
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
if (!pTSma) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows);
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META;
taosHashCleanup(pItem->expireWindows);
taosMemoryFree(pItem);
smaWarn("vgId:%d set expire window, get tsma meta failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
smaWarn("vgId:%d, set expire window, get tsma meta failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
return TSDB_CODE_FAILED;
}
......@@ -791,7 +791,7 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
// If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup(pItem->expiredWindows);
taosHashCleanup(pItem->expireWindows);
taosMemoryFree(pItem);
return TSDB_CODE_FAILED;
}
......@@ -800,53 +800,53 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
return TSDB_CODE_FAILED;
}
if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
// If error occurs during taosHashPut expired windows, remove the smaIndex from pSma->pSmaStat, thus TSDB would
if (taosHashPut(pItem->expireWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
// If error occurs during taosHashPut expire windows, remove the smaIndex from pSma->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table.
taosHashCleanup(pItem->expiredWindows);
taosHashCleanup(pItem->expireWindows);
taosMemoryFreeClear(pItem->pTSma);
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
smaWarn("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", SMA_VID(pSma), indexUid,
smaWarn("vgId:%d, smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", SMA_VID(pSma), indexUid,
winSKey);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", SMA_VID(pSma), indexUid,
smaDebug("vgId:%d, smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", SMA_VID(pSma), indexUid,
winSKey);
return TSDB_CODE_SUCCESS;
}
/**
* @brief Update expired window according to msg from stream computing module.
* @brief Update expire window according to msg from stream computing module.
*
* @param pSma
* @param msg SSubmitReq
* @return int32_t
*/
int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version) {
int32_t tdUpdateExpireWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version) {
// no time-range-sma, just return success
if (atomic_load_16(&SMA_TSMA_NUM(pSma)) <= 0) {
smaTrace("vgId:%d not update expire window since no tSma", SMA_VID(pSma));
smaTrace("vgId:%d, not update expire window since no tsma", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
if (!SMA_META(pSma)) {
terrno = TSDB_CODE_INVALID_PTR;
smaError("vgId:%d update expire window failed since no meta ptr", SMA_VID(pSma));
smaError("vgId:%d, update expire window failed since no meta ptr", SMA_VID(pSma));
return TSDB_CODE_FAILED;
}
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) < 0) {
smaError("vgId:%d init sma env failed since %s", SMA_VID(pSma), terrstr(terrno));
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, false) < 0) {
smaError("vgId:%d, init tsma env failed since %s", SMA_VID(pSma), terrstr(terrno));
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
// Firstly, assume that tSma can only be created on super table/normal table.
// Firstly, assume that tsma can only be created on super table/normal table.
// getActiveTimeWindow
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
......@@ -914,13 +914,13 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t ve
if (lastWinSKey != winSKey) {
lastWinSKey = winSKey;
if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
if (tdSetExpireWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
pSW = tFreeTSmaWrapper(pSW, false);
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
}
} else {
smaDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window ignore as duplicated",
smaDebug("vgId:%d, smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window ignore as duplicated",
SMA_VID(pSma), pTSma->indexUid, winSKey);
}
}
......@@ -928,5 +928,78 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t ve
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_SUCCESS;
}
/**
* @brief Clear skeys from tsma dstVgroups in expire window.
*
* @param pSma
* @param pMsg
* @return int32_t
*/
int32_t tdClearExpireWindowImpl(SSma *pSma, const SVClrTsmaExpWndsReq *pMsg) {
int64_t indexUid = pMsg->indexUid;
if (atomic_load_16(&SMA_TSMA_NUM(pSma)) <= 0) {
smaWarn("vgId:%d, not clear expire window since no tsma for smaIndex %" PRIi64, SMA_VID(pSma), indexUid);
terrno = TSDB_CODE_TSMA_INVALID_ENV;
return TSDB_CODE_FAILED;
}
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, true) < 0) {
smaWarn("vgId:%d, not clear expire window since no tsma env", SMA_VID(pSma));
terrno = TSDB_CODE_TSMA_INVALID_ENV;
return TSDB_CODE_FAILED;
}
// Firstly, assume that tsma can only be created on super table/normal table.
// getActiveTimeWindow
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
ASSERT(pEnv && pStat && pItemsHash);
// basic procedure
// TODO: optimization
tdRefSmaStat(pSma, pStat);
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (!pItem || !(pItem = *(SSmaStatItem **)pItem)) {
smaWarn("vgId:%d, no sma item to clear expire window for smaIndex %" PRIi64, SMA_VID(pSma), indexUid);
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_CACHE;
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
}
for (int64_t i = 0; i < pMsg->nItems; ++i) {
const SVTsmaExpWndItem *pWndItem = &pMsg->items[i];
int64_t winSKey = pWndItem->skey;
for (int64_t j = 0; j < pWndItem->nKeys; ++j) {
winSKey += pItem->pTSma->interval;
if (taosHashRemove(pItem->expireWindows, &winSKey, sizeof(winSKey)) != 0) {
// If error occurs during taosHashRemove expire windows, remove the smaIndex from pSma->pSmaStat, thus TSDB
// would tell query module to query raw TS data. N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table.
taosHashCleanup(pItem->expireWindows);
taosMemoryFreeClear(pItem->pTSma);
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
smaWarn("vgId:%d, rm skey %" PRIi64 " in expire window for smaIndex %" PRIi64 " fail", SMA_VID(pSma), winSKey,
indexUid);
terrno = TSDB_CODE_TSMA_RM_SKEY_IN_HASH;
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, rm skey %" PRIi64 " in expire window for smaIndex %" PRIi64 " success", SMA_VID(pSma), winSKey,
indexUid);
}
}
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -150,7 +150,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
return code;
_err:
tsdbError("vgId:%d failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -176,13 +176,13 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDelOp++;
tsdbError("vgId:%d delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
return code;
_err:
tsdbError("vgId:%d failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
return code;
......
......@@ -918,10 +918,10 @@ static int32_t vnodeProcessExpWndsClrReq(SVnode *pVnode, void *pReq, int32_t len
ASSERT(0);
// if (tdProcess(pVnode->pSma, version, (const char *)&req) < 0) {
// if (pRsp) pRsp->code = terrno;
// goto _err;
// }
if (tdClearExpireWindow(pVnode->pSma, (const SVClrTsmaExpWndsReq *)&req) < 0) {
if (pRsp) pRsp->code = terrno;
goto _err;
}
tDecoderClear(&coder);
vDebug("vgId:%d, success to process expWnds clear for tsma %" PRIi64 " version %" PRIi64, TD_VID(pVnode),
......
......@@ -373,7 +373,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks);
EXPECT_NE(pTsdb->pTfs, nullptr);
// generate SSubmitReq msg and update expired window
// generate SSubmitReq msg and update expire window
int16_t schemaVer = 0;
uint32_t mockRowLen = sizeof(STSRow);
uint32_t mockRowNum = 2;
......
......@@ -352,9 +352,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag valu
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TSMA_ALREADY_EXIST, "TSMA already exists")
// query
......@@ -536,25 +533,38 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_DELETE_WHERE, "The DELETE statemen
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG, "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes")
//planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")
//udf
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_READ_ERR, "udf pipe read error")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect error")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_READ_ERR, "udf pipe read error")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect error")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type")
//schemaless
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DATA, "Invalid data type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DATA, "Invalid data type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config")
//tsma
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_ALREADY_EXIST, "Tsma already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_META, "No tsma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_ENV, "Invalid tsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_STAT, "Invalid tsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in cache")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_RM_SKEY_IN_HASH, "Rm tsma skey in cache")
//rsma
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
#ifdef TAOS_ERROR_C
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册