提交 f9d162ba 编写于 作者: C Cary Xu

tsma use TDB

上级 8b8ca3f2
......@@ -30,6 +30,7 @@ target_sources(
# tsdb
# "src/tsdb/tsdbBDBImpl.c"
"src/tsdb/tsdbTDBImpl.c"
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbFile.c"
......@@ -40,7 +41,7 @@ target_sources(
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbScan.c"
# "src/tsdb/tsdbSma.c"
"src/tsdb/tsdbSma.c"
"src/tsdb/tsdbWrite.c"
# tq
......
......@@ -357,7 +357,7 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid);
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
int metaGetTbNum(SMeta *pMeta);
......@@ -369,8 +369,8 @@ void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseSmaCurosr(SMSmaCursor *pSmaCur);
const char *metaSmaCursorNext(SMSmaCursor *pSmaCur);
void metaCloseSmaCursor(SMSmaCursor *pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur);
// Options
void metaOptionsInit(SMetaCfg *pMetaCfg);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_TSDB_SMA_H_
#define _TD_VNODE_TSDB_SMA_H_
#include "tdbInt.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SSmaKey SSmaKey;
struct SSmaKey {
TSKEY skey;
int64_t groupId;
};
typedef struct SDBFile SDBFile;
struct SDBFile {
int32_t fid;
TDB *pDB;
char *path;
};
int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path);
int32_t tsdbCloseDBEnv(TENV *pEnv);
int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF);
int32_t tsdbCloseDBF(SDBFile *pDBF);
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn);
void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen);
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
#if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif
// internal func
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) {
int32_t len = 0;
len += taosEncodeFixedI64(pData, tsKey);
len += taosEncodeFixedI64(pData, groupId);
return len;
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_TSDB_SMA_H_*/
\ No newline at end of file
......@@ -220,6 +220,8 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
#include "tq.h"
#include "tsdbSma.h"
#ifdef __cplusplus
}
#endif
......
......@@ -667,7 +667,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
return pTbCfg;
}
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) {
STSma * pCfg = NULL;
SMetaDB *pDB = pMeta->pDB;
DBT key = {0};
......@@ -920,7 +920,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
return pCur;
}
void metaCloseSmaCurosr(SMSmaCursor *pCur) {
void metaCloseSmaCursor(SMSmaCursor *pCur) {
if (pCur) {
if (pCur->pCur) {
pCur->pCur->close(pCur->pCur);
......@@ -930,7 +930,8 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) {
}
}
const char *metaSmaCursorNext(SMSmaCursor *pCur) {
int64_t metaSmaCursorNext(SMSmaCursor *pCur) {
#if 0
DBT skey = {0};
DBT pkey = {0};
DBT pval = {0};
......@@ -946,6 +947,8 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) {
} else {
return NULL;
}
#endif
return 0;
}
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
......@@ -972,7 +975,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
++pSW->number;
STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma));
if (tptr == NULL) {
metaCloseSmaCurosr(pCur);
metaCloseSmaCursor(pCur);
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
return NULL;
......@@ -980,7 +983,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
pSW->tSma = tptr;
pBuf = pval.data;
if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) {
metaCloseSmaCurosr(pCur);
metaCloseSmaCursor(pCur);
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
return NULL;
......@@ -990,7 +993,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
break;
}
metaCloseSmaCurosr(pCur);
metaCloseSmaCursor(pCur);
return pSW;
}
......@@ -1004,7 +1007,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
int ret;
// TODO: lock?
ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0);
ret = pDB->pSmaIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0);
if (ret != 0) {
return NULL;
}
......
......@@ -22,6 +22,8 @@ typedef struct SPoolMem {
struct SPoolMem *next;
} SPoolMem;
#define META_TDB_SMA_TEST
static SPoolMem *openPool();
static void clearPool(SPoolMem *pPool);
static void closePool(SPoolMem *pPool);
......@@ -38,6 +40,10 @@ struct SMetaDB {
TDB * pNtbIdx;
TDB * pCtbIdx;
SPoolMem *pPool;
#ifdef META_TDB_SMA_TEST
TDB *pSmaDB;
TDB *pSmaIdx;
#endif
};
typedef struct __attribute__((__packed__)) {
......@@ -55,6 +61,11 @@ typedef struct {
tb_uid_t uid;
} SCtbIdxKey;
typedef struct {
tb_uid_t uid;
int64_t smaUid;
} SSmaIdxKey;
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
......@@ -115,6 +126,17 @@ static inline int metaCtbIdxCmpr(const void *arg1, int len1, const void *arg2, i
return metaUidCmpr(&pKey1->uid, sizeof(tb_uid_t), &pKey2->uid, sizeof(tb_uid_t));
}
static inline int metaSmaIdxCmpr(const void *arg1, int len1, const void *arg2, int len2) {
int c;
SSmaIdxKey *pKey1 = (SSmaIdxKey *)arg1;
SSmaIdxKey *pKey2 = (SSmaIdxKey *)arg2;
c = metaUidCmpr(arg1, sizeof(tb_uid_t), arg2, sizeof(tb_uid_t));
if (c) return c;
return metaUidCmpr(&pKey1->smaUid, sizeof(int64_t), &pKey2->smaUid, sizeof(int64_t));
}
int metaOpenDB(SMeta *pMeta) {
SMetaDB *pMetaDb;
int ret;
......@@ -143,6 +165,15 @@ int metaOpenDB(SMeta *pMeta) {
return -1;
}
#ifdef META_TDB_SMA_TEST
ret = tdbDbOpen("sma.db", sizeof(int64_t), TDB_VARIANT_LEN, metaUidCmpr, pMetaDb->pEnv, &(pMetaDb->pSmaDB));
if (ret < 0) {
// TODO
ASSERT(0);
return -1;
}
#endif
// open schema DB
ret = tdbDbOpen("schema.db", sizeof(SSchemaDbKey), TDB_VARIANT_LEN, metaSchemaKeyCmpr, pMetaDb->pEnv,
&(pMetaDb->pSchemaDB));
......@@ -180,6 +211,15 @@ int metaOpenDB(SMeta *pMeta) {
return -1;
}
#ifdef META_TDB_SMA_TEST
ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, metaSmaIdxCmpr, pMetaDb->pEnv, &(pMetaDb->pSmaIdx));
if (ret < 0) {
// TODO
ASSERT(0);
return -1;
}
#endif
pMetaDb->pPool = openPool();
tdbTxnOpen(&pMetaDb->txn, 0, poolMalloc, poolFree, pMetaDb->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pMetaDb->pEnv, NULL);
......@@ -193,10 +233,16 @@ void metaCloseDB(SMeta *pMeta) {
tdbCommit(pMeta->pDB->pEnv, &pMeta->pDB->txn);
tdbTxnClose(&pMeta->pDB->txn);
clearPool(pMeta->pDB->pPool);
#ifdef META_TDB_SMA_TEST
tdbDbClose(pMeta->pDB->pSmaIdx);
#endif
tdbDbClose(pMeta->pDB->pCtbIdx);
tdbDbClose(pMeta->pDB->pNtbIdx);
tdbDbClose(pMeta->pDB->pStbIdx);
tdbDbClose(pMeta->pDB->pNameIdx);
#ifdef META_TDB_SMA_TEST
tdbDbClose(pMeta->pDB->pSmaDB);
#endif
tdbDbClose(pMeta->pDB->pSchemaDB);
tdbDbClose(pMeta->pDB->pTbDB);
taosMemoryFree(pMeta->pDB);
......@@ -491,7 +537,6 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
taosMemoryFree(tbCfg.name);
taosMemoryFree(tbCfg.stbCfg.pTagSchema);
continue;
;
} else if (tbCfg.type == META_CHILD_TABLE) {
kvRowFree(tbCfg.ctbCfg.pTag);
}
......@@ -566,51 +611,326 @@ int metaGetTbNum(SMeta *pMeta) {
return 0;
}
struct SMSmaCursor {
TDBC *pCur;
tb_uid_t uid;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
// TODO
ASSERT(0);
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
STSmaWrapper *pSW = NULL;
pSW = taosMemoryCalloc(1, sizeof(*pSW));
if (pSW == NULL) {
return NULL;
}
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (pCur == NULL) {
taosMemoryFree(pSW);
return NULL;
}
void *pBuf = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
while (true) {
// TODO: lock during iterate?
if (tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
pSmaIdxKey = pCur->pKey;
ASSERT(pSmaIdxKey != NULL);
void *pSmaVal = metaGetSmaInfoByIndex(pMeta, pSmaIdxKey->smaUid, false);
if (pSmaVal == NULL) {
tsdbWarn("no tsma exists for indexUid: %" PRIi64, pSmaIdxKey->smaUid);
continue;
}
++pSW->number;
STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma));
if (tptr == NULL) {
TDB_FREE(pSmaVal);
metaCloseSmaCursor(pCur);
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
return NULL;
}
pSW->tSma = tptr;
pBuf = pSmaVal;
if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) {
TDB_FREE(pSmaVal);
metaCloseSmaCursor(pCur);
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
return NULL;
}
TDB_FREE(pSmaVal);
continue;
}
break;
}
metaCloseSmaCursor(pCur);
return pSW;
#endif
}
int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) {
// TODO
ASSERT(0);
#ifndef META_TDB_SMA_TEST
DBT key = {0};
key.data = (void *)indexName;
key.size = strlen(indexName);
metaDBWLock(pMeta->pDB);
// TODO: No guarantee of consistence.
// Use transaction or DB->sync() for some guarantee.
pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0);
metaDBULock(pMeta->pDB);
#endif
return 0;
}
int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
// TODO
ASSERT(0);
// ASSERT(0);
#ifdef META_TDB_SMA_TEST
int32_t ret = 0;
SMetaDB *pMetaDb = pMeta->pDB;
void *pBuf = NULL, *qBuf = NULL;
void *key = {0}, *val = {0};
// save sma info
int32_t len = tEncodeTSma(NULL, pSmaCfg);
pBuf = taosMemoryCalloc(1, len);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
key = (void *)&pSmaCfg->indexUid;
qBuf = pBuf;
tEncodeTSma(&qBuf, pSmaCfg);
val = pBuf;
int32_t kLen = sizeof(pSmaCfg->indexUid);
int32_t vLen = POINTER_DISTANCE(qBuf, pBuf);
ret = tdbDbInsert(pMeta->pDB->pSmaDB, key, kLen, val, vLen, &pMetaDb->txn);
if (ret < 0) {
taosMemoryFreeClear(pBuf);
return -1;
}
// add sma idx
SSmaIdxKey smaIdxKey;
smaIdxKey.uid = pSmaCfg->tableUid;
smaIdxKey.smaUid = pSmaCfg->indexUid;
key = &smaIdxKey;
kLen = sizeof(smaIdxKey);
val = NULL;
vLen = 0;
ret = tdbDbInsert(pMeta->pDB->pSmaIdx, key, kLen, val, vLen, &pMetaDb->txn);
if (ret < 0) {
taosMemoryFreeClear(pBuf);
return -1;
}
// release
taosMemoryFreeClear(pBuf);
if (pMeta->pDB->pPool->size > 0) {
metaCommit(pMeta);
}
#endif
return 0;
}
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) {
// TODO
ASSERT(0);
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
SMetaDB *pDB = pMeta->pDB;
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
int ret = -1;
// Set key
pKey = (void *)&indexUid;
kLen = sizeof(indexUid);
// Query
ret = tdbDbGet(pDB->pSmaDB, pKey, kLen, &pVal, &vLen);
if (ret != 0 || !pVal) {
return NULL;
}
if (!isDecode) {
// return raw value
return pVal;
}
// Decode
STSma *pCfg = (STSma *)taosMemoryCalloc(1, sizeof(STSma));
if (pCfg == NULL) {
taosMemoryFree(pVal);
return NULL;
}
void *pBuf = pVal;
if (tDecodeTSma(pBuf, pCfg) == NULL) {
tdDestroyTSma(pCfg);
taosMemoryFree(pCfg);
TDB_FREE(pVal);
return NULL;
}
TDB_FREE(pVal);
return pCfg;
#endif
}
const char *metaSmaCursorNext(SMSmaCursor *pCur) {
/**
* @brief
*
* @param pMeta
* @param uid 0 means iterate all uids.
* @return SMSmaCursor*
*/
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
// TODO
ASSERT(0);
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
SMSmaCursor *pCur = NULL;
SMetaDB *pDB = pMeta->pDB;
int ret;
pCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pCur));
if (pCur == NULL) {
return NULL;
}
pCur->uid = uid;
ret = tdbDbcOpen(pDB->pSmaIdx, &(pCur->pCur));
if ((ret != 0) || (pCur->pCur == NULL)) {
taosMemoryFree(pCur);
return NULL;
}
if (uid != 0) {
// TODO: move to the specific uid
}
return pCur;
#endif
}
void metaCloseSmaCurosr(SMSmaCursor *pCur) {
/**
* @brief
*
* @param pCur
* @return int64_t smaIndexUid
*/
int64_t metaSmaCursorNext(SMSmaCursor *pCur) {
// TODO
ASSERT(0);
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
int ret;
void *pBuf;
SSmaIdxKey *smaIdxKey;
ret = tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, &pCur->pVal, &pCur->vLen);
if (ret < 0) {
return 0;
}
smaIdxKey = pCur->pKey;
return smaIdxKey->smaUid;
#endif
}
void metaCloseSmaCursor(SMSmaCursor *pCur) {
// TODO
// ASSERT(0);
#ifdef META_TDB_SMA_TEST
if (pCur) {
if (pCur->pCur) {
tdbDbcClose(pCur->pCur);
}
taosMemoryFree(pCur);
}
#endif
}
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
// TODO
// ASSERT(0); // comment this line to pass CI
// return NULL:
#ifdef META_TDB_SMA_TEST
SArray *pUids = NULL;
SMetaDB *pDB = pMeta->pDB;
void *pKey;
// TODO: lock?
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
if (pCur == NULL) {
return NULL;
}
}
// TODO: lock?
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
// TODO
ASSERT(0);
SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t uid = 0;
while (true) {
// TODO: lock during iterate?
if (tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
ASSERT(pSmaIdxKey != NULL);
pSmaIdxKey = pCur->pKey;
if (pSmaIdxKey->uid == 0 || pSmaIdxKey->uid == uid) {
continue;
}
uid = pSmaIdxKey->uid;
if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t));
if (!pUids) {
metaCloseSmaCursor(pCur);
return NULL;
}
}
taosArrayPush(pUids, &uid);
continue;
}
break;
}
metaCloseSmaCursor(pCur);
return pUids;
#endif
}
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
......
......@@ -82,9 +82,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
memcpy(data, msg, msgLen);
if (msgType == TDMT_VND_SUBMIT) {
// if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) {
// return -1;
// }
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) {
return -1;
}
}
SRpcMsg req = {
......
......@@ -38,6 +38,29 @@ typedef enum {
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
} ESmaStorageLevel;
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
struct SSmaEnv {
TdThreadRwlock lock;
TXN txn;
SPoolMem *pPool;
SDiskID did;
TENV *dbEnv; // TODO: If it's better to put it in smaIndex level?
char *path; // relative path
SSmaStat *pStat;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_DID(env) ((env)->did)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
typedef struct {
STsdb *pTsdb;
SDBFile dFile;
......@@ -104,7 +127,8 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid);
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
TXN *txn);
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid);
......@@ -117,9 +141,121 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
// Pool Memory
static SPoolMem *openPool();
static void clearPool(SPoolMem *pPool);
static void closePool(SPoolMem *pPool);
static void *poolMalloc(void *arg, size_t size);
static void poolFree(void *arg, void *ptr);
static int tsdbSmaBeginCommit(SSmaEnv *pEnv);
static int tsdbSmaEndCommit(SSmaEnv *pEnv);
// implementation
static FORCE_INLINE int16_t tsdbTSmaAdd(STsdb *pTsdb, int16_t n) { return atomic_add_fetch_16(&REPO_TSMA_NUM(pTsdb), n); }
static FORCE_INLINE int16_t tsdbTSmaSub(STsdb *pTsdb, int16_t n) { return atomic_sub_fetch_16(&REPO_TSMA_NUM(pTsdb), n); }
static FORCE_INLINE int16_t tsdbTSmaAdd(STsdb *pTsdb, int16_t n) {
return atomic_add_fetch_16(&REPO_TSMA_NUM(pTsdb), n);
}
static FORCE_INLINE int16_t tsdbTSmaSub(STsdb *pTsdb, int16_t n) {
return atomic_sub_fetch_16(&REPO_TSMA_NUM(pTsdb), n);
}
static FORCE_INLINE int32_t tsdbRLockSma(SSmaEnv *pEnv) {
int code = taosThreadRwlockRdlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int32_t tsdbWLockSma(SSmaEnv *pEnv) {
int code = taosThreadRwlockWrlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int32_t tsdbUnLockSma(SSmaEnv *pEnv) {
int code = taosThreadRwlockUnlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)tdbOsMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void clearPool(SPoolMem *pPool) {
if (!pPool) return;
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
tdbOsFree(pMem);
} while (1);
assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
if (pPool) {
clearPool(pPool);
tdbOsFree(pPool);
}
}
static void *poolMalloc(void *arg, size_t size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size);
if (pMem == NULL) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
tdbOsFree(pMem);
}
int32_t tsdbInitSma(STsdb *pTsdb) {
// tSma
......@@ -213,7 +349,12 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
char aname[TSDB_FILENAME_LEN] = {0};
tfsAbsoluteName(pTsdb->pTfs, did, path, aname);
if (tsdbOpenBDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
if (tsdbOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
tsdbFreeSmaEnv(pEnv);
return NULL;
}
if ((pEnv->pPool = openPool()) == NULL) {
tsdbFreeSmaEnv(pEnv);
return NULL;
}
......@@ -248,7 +389,8 @@ void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
taosMemoryFreeClear(pSmaEnv->pStat);
taosMemoryFreeClear(pSmaEnv->path);
taosThreadRwlockDestroy(&(pSmaEnv->lock));
tsdbCloseBDBEnv(pSmaEnv->dbEnv);
tsdbCloseDBEnv(pSmaEnv->dbEnv);
closePool(pSmaEnv->pPool);
}
}
......@@ -414,7 +556,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
}
// cache smaMeta
STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid);
STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid, true);
if (pSma == NULL) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows);
......@@ -498,10 +640,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
return TSDB_CODE_FAILED;
}
#ifndef TSDB_SMA_TEST
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
#endif
// Firstly, assume that tSma can only be created on super table/normal table.
// getActiveTimeWindow
......@@ -563,6 +701,10 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);
tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey);
// TODO: release only when suid changes.
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
}
}
......@@ -676,10 +818,12 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
* @param dataLen
* @return int32_t
*/
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
TXN *txn) {
SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert sma data blocks into B+Tree(TDB)
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
return TSDB_CODE_FAILED;
......@@ -826,6 +970,30 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
return daysPerFile;
}
static int tsdbSmaBeginCommit(SSmaEnv *pEnv) {
TXN *pTxn = &pEnv->txn;
// start a new txn
tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (tdbBegin(pEnv->dbEnv, pTxn) != 0) {
tsdbWarn("tsdbSma tdb restart txn fail");
return -1;
}
return 0;
}
static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
TXN *pTxn = &pEnv->txn;
// Commit current txn
if (tdbCommit(pEnv->dbEnv, pTxn) != 0) {
tsdbWarn("tsdbSma tdb commit fail");
return -1;
}
tdbTxnClose(pTxn);
clearPool(pEnv->pPool);
return 0;
}
/**
* @brief Insert/Update Time-range-wise SMA data.
* - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
......@@ -911,14 +1079,10 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
int64_t groupId = pDataBlock->info.groupId;
for (int32_t j = 0; j < rows; ++j) {
printf("|");
TSKEY skey = 1649295200000; // TSKEY_INITIAL_VAL; // the start key of TS window by interval
TSKEY skey = TSKEY_INITIAL_VAL; // the start key of TS window by interval
void *pSmaKey = &smaKey;
bool isStartKey = false;
{
// just for debugging
isStartKey = true;
tsdbEncodeTSmaKey(groupId, skey, &pSmaKey);
}
int32_t tlen = 0; // reset the len
pDataBuf = &dataBuf; // reset the buf
for (int32_t k = 0; k < colNum; ++k) {
......@@ -929,7 +1093,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
if (!isStartKey) {
isStartKey = true;
skey = *(TSKEY *)var;
printf("==> skey = %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
tsdbEncodeTSmaKey(groupId, skey, &pSmaKey);
} else {
printf(" %" PRIi64 " |", *(int64_t *)var);
......@@ -1010,6 +1174,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
// TODO: tsdbStartTSmaCommit();
if (fid != tSmaH.dFile.fid) {
if (tSmaH.dFile.fid != TSDB_IVLD_FID) {
tsdbSmaEndCommit(pEnv);
tsdbCloseDBF(&tSmaH.dFile);
}
tsdbSetTSmaDataFile(&tSmaH, indexUid, fid);
......@@ -1020,12 +1185,14 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
tsdbSmaBeginCommit(pEnv);
}
if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen) != 0) {
if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
" since %s",
REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno));
tsdbSmaEndCommit(pEnv);
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
......@@ -1044,9 +1211,10 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
printf("\n");
}
}
tsdbSmaEndCommit(pEnv); // TODO: not commit for every insert
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_SUCCESS;
}
......@@ -1371,7 +1539,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
tReadH.dFile.path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN);
void *result = NULL;
uint32_t valueSize = 0;
int32_t valueSize = 0;
if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) {
tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s",
REPO_ID(pTsdb), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno));
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#include "vnodeInt.h"
int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path) {
int ret = 0;
if (path == NULL) return -1;
ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param
if (ret != 0) {
tsdbError("Failed to create tsdb db env, ret = %d", ret);
return -1;
}
return 0;
}
int32_t tsdbCloseDBEnv(TENV *pEnv) { return tdbEnvClose(pEnv); }
static inline int tsdbSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) {
const SSmaKey *pKey1 = (const SSmaKey *)arg1;
const SSmaKey *pKey2 = (const SSmaKey *)arg2;
ASSERT(len1 == len2 && len1 == sizeof(SSmaKey));
if (pKey1->skey < pKey2->skey) {
return -1;
} else if (pKey1->skey > pKey2->skey) {
return 1;
}
if (pKey1->groupId < pKey2->groupId) {
return -1;
} else if (pKey1->groupId > pKey2->groupId) {
return 1;
}
return 0;
}
static int32_t tsdbOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
int ret;
FKeyComparator compFunc;
// Create a database
compFunc = tsdbSmaKeyCmpr;
ret = tdbDbOpen(pFName, TDB_VARIANT_LEN, TDB_VARIANT_LEN, compFunc, pEnv, ppDB);
return 0;
}
static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); }
int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
if (!pEnv || !pDBF) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
// Open DBF
if (tsdbOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
tsdbCloseDBDb(pDBF->pDB);
return -1;
}
return 0;
}
int32_t tsdbCloseDBF(SDBFile *pDBF) {
int32_t ret = 0;
if (pDBF->pDB) {
ret = tsdbCloseDBDb(pDBF->pDB);
pDBF->pDB = NULL;
}
taosMemoryFreeClear(pDBF->path);
return ret;
}
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret;
ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
tsdbError("Failed to create insert sma data into db, ret = %d", ret);
return -1;
}
return 0;
}
void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen) {
void *result;
void *pVal;
int ret;
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
tsdbError("Failed to get sma data from db, ret = %d", ret);
return NULL;
}
ASSERT(*valLen >= 0);
result = taosMemoryMalloc(*valLen);
if (result == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// TODO: lock?
// TODO: Would the key/value be destoryed during return the data?
// TODO: How about the key is updated while value length is changed? The original value buffer would be freed
// automatically?
memcpy(result, pVal, *valLen);
return result;
}
\ No newline at end of file
......@@ -19,7 +19,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
// blockDebugShowData(data);
// tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
}
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
......@@ -232,9 +232,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
// if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO
}
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
......
......@@ -210,7 +210,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
// get value by indexName
STSma *qSmaCfg = NULL;
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1);
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1, true);
assert(qSmaCfg != NULL);
printf("name1 = %s\n", qSmaCfg->indexName);
printf("timezone1 = %" PRIi8 "\n", qSmaCfg->timezoneInt);
......@@ -221,7 +221,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
tdDestroyTSma(qSmaCfg);
taosMemoryFreeClear(qSmaCfg);
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2);
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2, true);
assert(qSmaCfg != NULL);
printf("name2 = %s\n", qSmaCfg->indexName);
printf("timezone2 = %" PRIi8 "\n", qSmaCfg->timezoneInt);
......@@ -233,11 +233,12 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
taosMemoryFreeClear(qSmaCfg);
// get index name by table uid
#if 0
SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid);
assert(pSmaCur != NULL);
uint32_t indexCnt = 0;
while (1) {
const char *indexName = metaSmaCursorNext(pSmaCur);
const char *indexName = (const char *)metaSmaCursorNext(pSmaCur);
if (indexName == NULL) {
break;
}
......@@ -245,8 +246,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
++indexCnt;
}
EXPECT_EQ(indexCnt, nCntTSma);
metaCloseSmaCurosr(pSmaCur);
metaCloseSmaCursor(pSmaCur);
#endif
// get wrapper by table uid
STSmaWrapper *pSW = metaGetSmaInfoByTable(pMeta, tbUid);
assert(pSW != NULL);
......
......@@ -1196,6 +1196,11 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return -1;
}
*ppKey = pKey;
*kLen = cd.kLen;
memcpy(pKey, cd.pKey, cd.kLen);
if (ppVal) {
// TODO: vLen may be zero
pVal = TDB_REALLOC(*ppVal, cd.vLen);
if (pVal == NULL) {
......@@ -1203,14 +1208,10 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return -1;
}
*ppKey = pKey;
*ppVal = pVal;
*kLen = cd.kLen;
*vLen = cd.vLen;
memcpy(pKey, cd.pKey, cd.kLen);
memcpy(pVal, cd.pVal, cd.vLen);
}
ret = tdbBtcMoveToNext(pBtc);
if (ret < 0) {
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d1
sql show databases
if $rows != 2 then
return -1
endi
print $data00 $data01 $data02
sql use d1
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags(1000)
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data, mode1: one row one table in sql
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
print =============== create sma index from super table
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(5m,10s) sliding(2m)
print $data00 $data01 $data02 $data03
sleep 300
print =============== trigger stream to execute sma aggr task and insert sma data into sma store
sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
#===================================================================
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册