未验证 提交 e8b91539 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #10957 from taosdata/feature/TD-11463-3.0

Feature/td 11463 3.0
......@@ -63,9 +63,12 @@ typedef enum {
} ETsdbStatisStatus;
typedef enum {
TSDB_SMA_STAT_OK = 0, // ready to provide service
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
} ETsdbSmaStat;
TSDB_SMA_STAT_UNKNOWN = -1, // unknown
TSDB_SMA_STAT_OK = 0, // ready to provide service
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
TSDB_SMA_STAT_DROPPED = 2, // sma dropped
} ETsdbSmaStat; // bit operation
typedef enum {
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
......
......@@ -342,8 +342,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......
......@@ -313,7 +313,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
}
if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
if(pReq->rollup && pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
tlen += taosEncodeFixedI8(buf, param->delayUnit);
......@@ -336,12 +336,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
}
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]);
}
if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
if(pReq->rollup && pReq->ntbCfg.pRSmaParam) {
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
tlen += taosEncodeFixedI8(buf, param->delayUnit);
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
......@@ -424,19 +424,19 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
}
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
if (pReq->stbCfg.nBSmaCols > 0) {
pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
if(pReq->ntbCfg.nBSmaCols > 0) {
pReq->ntbCfg.pBSmaCols = (col_id_t *)malloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i);
}
} else {
pReq->stbCfg.pBSmaCols = NULL;
pReq->ntbCfg.pBSmaCols = NULL;
}
if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeFixedU32(buf, (uint32_t *)&param->xFilesFactor);
if(pReq->rollup) {
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
buf = taosDecodeFixedU32(buf, (uint32_t*)&param->xFilesFactor);
buf = taosDecodeFixedI8(buf, &param->delayUnit);
buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) {
......@@ -448,7 +448,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
}
buf = taosDecodeFixedI64(buf, &param->delay);
} else {
pReq->stbCfg.pRSmaParam = NULL;
pReq->ntbCfg.pRSmaParam = NULL;
}
break;
default:
......
......@@ -51,7 +51,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta);
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
int32_t metaDropTSma(SMeta *pMeta, char *indexName);
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid);
// For Query
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
......
......@@ -96,6 +96,7 @@ int tsdbCommit(STsdb *pTsdb);
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
/**
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
......
......@@ -34,7 +34,7 @@ void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
// SMetaCache
int metaOpenCache(SMeta* pMeta);
......
......@@ -16,15 +16,15 @@
#ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_
typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv;
struct SSmaEnv {
TdThreadRwlock lock;
SDiskID did;
TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
char * path; // relative path
SSmaStat * pStat;
SDiskID did;
TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
char *path; // relative path
SSmaStat *pStat;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
......
......@@ -259,7 +259,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
return 0;
}
int metaRemoveSmaFromDb(SMeta *pMeta, const char *indexName) {
int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) {
// TODO
#if 0
DBT key = {0};
......
......@@ -121,11 +121,11 @@ int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) {
return TSDB_CODE_SUCCESS;
}
int32_t metaDropTSma(SMeta *pMeta, char* indexName) {
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
// TODO: Validate the cfg
// TODO: add atomicity
if (metaRemoveSmaFromDb(pMeta, indexName) < 0) {
if (metaRemoveSmaFromDb(pMeta, indexUid) < 0) {
// TODO: handle error
return -1;
}
......
......@@ -25,6 +25,7 @@ static const char *TSDB_SMA_DNAME[] = {
#define SMA_STORAGE_TSDB_TIMES 10
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
#define SMA_STATE_HASH_SLOT 4
#define SMA_STATE_ITEM_HASH_SLOT 32
......@@ -60,10 +61,11 @@ typedef struct {
typedef struct {
/**
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
* Streaming Module or TSDB local persistence.
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
*/
int8_t state; // ETsdbSmaStat
SHashObj *expiredWindows; // key: skey of time window, value: N/A
......@@ -80,6 +82,7 @@ struct SSmaStat {
// expired window
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static void * tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
......@@ -109,7 +112,55 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
// implementation
static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
if (pStatItem) {
return atomic_load_8(&pStatItem->state);
}
return TSDB_SMA_STAT_UNKNOWN;
}
static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
if(!pStatItem) {
return false;
}
if (state) {
*state = atomic_load_8(&pStatItem->state);
return *state == TSDB_SMA_STAT_OK;
}
return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK;
}
static FORCE_INLINE bool tsdbSmaStatIsExpired(SSmaStatItem *pStatItem) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true;
}
static FORCE_INLINE bool tsdbSmaStatIsDropped(SSmaStatItem *pStatItem) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true;
}
static FORCE_INLINE void tsdbSmaStatSetOK(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
}
}
static FORCE_INLINE void tsdbSmaStatSetExpired(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
}
}
static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
}
}
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP,
TSDB_SMA_DNAME[smaType]);
......@@ -252,6 +303,16 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
return pItem;
}
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
if (pSmaStatItem != NULL) {
tdDestroyTSma(pSmaStatItem->pSma);
tfree(pSmaStatItem->pSma);
taosHashCleanup(pSmaStatItem->expiredWindows);
tfree(pSmaStatItem);
}
return NULL;
}
/**
* @brief Release resources allocated for its member fields, not including itself.
*
......@@ -264,12 +325,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
void *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
while (item != NULL) {
SSmaStatItem *pItem = *(SSmaStatItem **)item;
if (pItem != NULL) {
tdDestroyTSma(pItem->pSma);
tfree(pItem->pSma);
taosHashCleanup(pItem->expiredWindows);
tfree(pItem);
}
tsdbFreeSmaStatItem(pItem);
item = taosHashIterate(pSmaStat->smaStatItems, item);
}
taosHashCleanup(pSmaStat->smaStatItems);
......@@ -437,6 +493,15 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind
skey, indexUid);
return TSDB_CODE_FAILED;
}
// TODO: use a standalone interface to received state upate notification from stream computing module.
/**
* @brief state
* - When SMA env init in TSDB, its status is TSDB_SMA_STAT_OK.
* - In startup phase of stream computing module, it should notify the SMA env in TSDB to expired if needed(e.g.
* when batch data caculation not finised)
* - When TSDB_SMA_STAT_OK, the stream computing module should also notify that to the SMA env in TSDB.
*/
pItem->state = TSDB_SMA_STAT_OK;
} else {
// error handling
tsdbUnRefSmaStat(pTsdb, pStat);
......@@ -711,6 +776,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
int64_t indexUid = SMA_TEST_INDEX_UID;
if (pEnv == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
......@@ -730,13 +797,28 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
return TSDB_CODE_FAILED;
}
int64_t indexUid = SMA_TEST_INDEX_UID;
SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv);
SSmaStatItem *pItem = NULL;
tsdbRefSmaStat(pTsdb, pStat);
if (pStat && pStat->smaStatItems) {
pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
}
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) {
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
char rPath[TSDB_FILENAME_LEN] = {0};
char aPath[TSDB_FILENAME_LEN] = {0};
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
if (!taosCheckExistFile(aPath)) {
if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
}
......@@ -754,12 +836,14 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
// TODO:tsdbEndTSmaCommit();
......@@ -768,9 +852,60 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_SUCCESS;
}
/**
* @brief Drop tSma data and local cache
* - insert/query reference
* @param pTsdb
* @param msg
* @return int32_t
*/
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
// clear local cache
if (pEnv) {
tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid);
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
if ((pItem != NULL) || ((pItem = *(SSmaStatItem **)pItem) != NULL)) {
if (tsdbSmaStatIsDropped(pItem)) {
tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
}
tsdbWLockSma(pEnv);
if (tsdbSmaStatIsDropped(pItem)) {
tsdbUnLockSma(pEnv);
tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
}
tsdbSmaStatSetDropped(pItem);
tsdbUnLockSma(pEnv);
int32_t nSleep = 0;
while (true) {
if (T_REF_VAL_GET(SMA_ENV_STAT(pEnv)) <= 0) {
break;
}
taosSsleep(1);
if (++nSleep > SMA_DROP_EXPIRED_TIME) {
break;
};
}
tsdbFreeSmaStatItem(pItem);
tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid);
}
}
// clear sma data files
// TODO:
}
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
STsdb *pTsdb = pSmaH->pTsdb;
......@@ -784,35 +919,64 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0};
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv);
tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData);
if (pEnv == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return terrno;
}
if (pData->dataLen <= 0) {
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
return TSDB_CODE_FAILED;
}
// Step 1: Judge the storage level
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
STSmaWriteH tSmaH = {0};
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
return TSDB_CODE_FAILED;
}
int64_t indexUid = SMA_TEST_INDEX_UID;
char rPath[TSDB_FILENAME_LEN] = {0};
char aPath[TSDB_FILENAME_LEN] = {0};
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
if (!taosCheckExistFile(aPath)) {
if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
}
// Step 1: Judge the storage level and days
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Save all the TSma data to one file
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_FAILED;
}
tsdbInsertTSmaDataSection(&tSmaH, pData);
if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_FAILED;
}
// TODO:tsdbEndTSmaCommit();
// reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);
// Step 3: reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_SUCCESS;
}
......@@ -934,6 +1098,15 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
#endif
#if 1
int8_t smaStat = 0;
if (!tsdbSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
tsdbWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid,
tstrerror(terrno), smaStat);
return TSDB_CODE_FAILED;
}
if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired.
tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
......@@ -1086,6 +1259,20 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
return code;
}
/**
* @brief Get tSma data
*
* @param pTsdb
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -1094,4 +1281,19 @@ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid,
tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
/**
* @brief Drop tSma Data and caches
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) {
tsdbWarn("vgId:%d drop tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
\ No newline at end of file
......@@ -202,15 +202,23 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return -1;
}
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexName) < 0) {
// TODO: handle error
return -1;
}
// TODO: send msg to stream computing to drop tSma
// if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq);
// return -1;
// }
//
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
// TODO: return directly or go on follow steps?
#endif
} break;
......
......@@ -272,8 +272,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
taosArrayDestroy(pUids);
// resource release
metaRemoveSmaFromDb(pMeta, smaIndexName1);
metaRemoveSmaFromDb(pMeta, smaIndexName2);
metaRemoveSmaFromDb(pMeta, indexUid1);
metaRemoveSmaFromDb(pMeta, indexUid2);
tdDestroyTSma(&tSma);
metaClose(pMeta);
......
......@@ -338,8 +338,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册