未验证 提交 c906bc28 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #10601 from taosdata/feature/TD-11463-3.0

[TD-11463-3.0] add SVCreateSmaReq/metaGetSmaTbUids
......@@ -1871,15 +1871,27 @@ typedef struct {
} STSma; // Time-range-wise SMA
typedef struct {
int8_t msgType; // 0 create, 1 recreate
int64_t ver; // use a general definition
STSma tSma;
STimeWindow window;
} SCreateTSmaMsg;
} SVCreateTSmaReq;
typedef struct {
STimeWindow window;
int8_t type; // 0 status report, 1 update data
char indexName[TSDB_INDEX_NAME_LEN + 1]; //
STimeWindow windows;
} STSmaMsg;
typedef struct {
int64_t ver; // use a general definition
char indexName[TSDB_INDEX_NAME_LEN + 1];
} SDropTSmaMsg;
} SVDropTSmaReq;
typedef struct {
} SVCreateTSmaRsp, SVDropTSmaRsp;
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq);
int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
typedef struct {
STimeWindow tsWindow; // [skey, ekey]
......@@ -1901,22 +1913,18 @@ static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) {
}
}
// RSma: Time-range-wise Rollup SMA
// TODO: refactor when rSma grammar defined finally =>
// RSma: Rollup SMA
typedef struct {
int64_t interval;
int32_t retention; // unit: day
uint16_t days; // unit: day
int8_t intervalUnit;
} SSmaParams;
// TODO: refactor when rSma grammar defined finally <=
typedef struct {
// TODO: refactor to use the real schema =>
STSma tsma;
float xFilesFactor;
SArray* smaParams; // SSmaParams
// TODO: refactor to use the real schema <=
} SRSma;
typedef struct {
......
......@@ -2390,3 +2390,36 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq)
tEndDecode(decoder);
return 0;
}
int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += tEncodeTSma(buf, &pReq->tSma);
return tlen;
}
void *tDeserializeSVCreateTSmaReq(void *buf, SVCreateTSmaReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver));
if ((buf = tDecodeTSma(buf, &pReq->tSma)) == NULL) {
tdDestroyTSma(&pReq->tSma);
}
return buf;
}
int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->indexName);
return tlen;
}
void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver));
buf = taosDecodeStringTo(buf, pReq->indexName);
return buf;
}
......@@ -41,23 +41,26 @@ typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMSmaCursor SMSmaCursor;
typedef SVCreateTbReq STbCfg;
typedef STSma SSmaCfg;
typedef SVCreateTSmaReq SSmaCfg;
// SMeta operations
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
SMeta * metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
void metaClose(SMeta *pMeta);
void metaRemove(const char *path);
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta);
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
int32_t metaDropTSma(SMeta *pMeta, char *indexName);
// For Query
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName);
STSma * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName);
STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid);
SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
......
......@@ -33,7 +33,7 @@ int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, SSmaCfg* pTbCfg);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName);
// SMetaCache
......
......@@ -41,55 +41,4 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId,
return len;
}
#if 0
typedef struct {
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
typedef struct {
uint64_t uid;
int64_t offset;
int64_t size;
} SKVRecord;
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
int tsdbApplyRtn(STsdbRepo *pRepo);
#endif
#endif /* _TD_TSDB_SMA_H_ */
\ No newline at end of file
......@@ -226,7 +226,7 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
return 0;
}
int metaSaveSmaToDB(SMeta *pMeta, SSmaCfg *pSmaCfg) {
int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
char buf[512] = {0}; // TODO: may overflow
void *pBuf = NULL;
DBT key1 = {0}, value1 = {0};
......@@ -485,7 +485,7 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
}
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
SSmaCfg *pSmaCfg = (SSmaCfg *)(pValue->app_data);
STSma *pSmaCfg = (STSma *)(pValue->app_data);
memset(pSKey, 0, sizeof(*pSKey));
pSKey->data = &(pSmaCfg->tableUid);
......@@ -609,8 +609,8 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
return pTbCfg;
}
SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
SSmaCfg *pCfg = NULL;
STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
STSma * pCfg = NULL;
SMetaDB *pDB = pMeta->pDB;
DBT key = {0};
DBT value = {0};
......@@ -629,7 +629,7 @@ SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
}
// Decode
pCfg = (SSmaCfg *)malloc(sizeof(SSmaCfg));
pCfg = (STSma *)malloc(sizeof(STSma));
if (pCfg == NULL) {
return NULL;
}
......@@ -885,8 +885,8 @@ STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) {
return NULL;
}
DBT skey = {.data = &(pCur->uid)};
DBT pval = {.size = sizeof(pCur->uid)};
DBT skey = {.data = &(pCur->uid), .size = sizeof(pCur->uid)};
DBT pval = {0};
void *pBuf = NULL;
while (true) {
......@@ -918,6 +918,45 @@ STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) {
return pSW;
}
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
SArray * pUids = NULL;
SMetaDB *pDB = pMeta->pDB;
DBC * pCur = NULL;
DBT pkey = {0}, pval = {0};
int ret;
pUids = taosArrayInit(16, sizeof(tb_uid_t));
if (!pUids) {
return NULL;
}
// TODO: lock?
ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0);
if (ret != 0) {
taosArrayDestroy(pUids);
return NULL;
}
void *pBuf = NULL;
// TODO: lock?
while (true) {
ret = pCur->get(pCur, &pkey, &pval, isDup ? DB_NEXT_DUP : DB_NEXT_NODUP);
if(ret == 0) {
taosArrayPush(pUids, pkey.data);
continue;
}
break;
}
if (pCur) {
pCur->close(pCur);
}
return pUids;
}
static void metaDBWLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
pthread_rwlock_wrlock(&(pDB->rwlock));
......
......@@ -107,19 +107,27 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
return 0;
}
int metaCreateSma(SMeta *pMeta, SSmaCfg *pSmaCfg) {
// Validate the tbOptions
// if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
// // TODO: handle error
// return -1;
// }
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) {
// TODO: Validate the cfg
// The table uid should exists and be super table or common table.
// Check other cfg value
// TODO: add atomicity
if (metaSaveSmaToDB(pMeta, pSmaCfg) < 0) {
if (metaSaveSmaToDB(pMeta, &pCfg->tSma) < 0) {
// TODO: handle error
return -1;
}
return TSDB_CODE_SUCCESS;
}
return 0;
int32_t metaDropTSma(SMeta *pMeta, char* indexName) {
// TODO: Validate the cfg
// TODO: add atomicity
if (metaRemoveSmaFromDb(pMeta, indexName) < 0) {
// TODO: handle error
return -1;
}
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -50,3 +50,4 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid) {
return 0;
}
......@@ -69,7 +69,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error
}
// TODO: maybe need to clear the requst struct
// TODO: maybe need to clear the request struct
free(vCreateTbReq.stbCfg.pSchema);
free(vCreateTbReq.stbCfg.pTagSchema);
free(vCreateTbReq.name);
......@@ -133,13 +133,44 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
// 1. tdCreateSmaMeta(pVnode->pMeta,...);
// 2. tdCreateSmaDataInit();
// 3. tdCreateSmaData
SSmaCfg vCreateSmaReq = {0};
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) {
// TODO: handle error
tdDestroyTSma(&vCreateSmaReq.tSma);
return -1;
}
// TODO: send msg to stream computing to create tSma
// if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq);
// return -1;
// }
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
} break;
case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
} break;
case TDMT_VND_DROP_SMA: { // timeRangeSMA
SVDropTSmaReq vDropSmaReq = {0};
if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexName) < 0) {
// TODO: handle error
return -1;
}
// TODO: send msg to stream computing to drop tSma
// if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq);
// return -1;
// }
// TODO: return directly or go on follow steps?
} break;
default:
ASSERT(0);
......
......@@ -103,6 +103,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
const char *smaIndexName2 = "sma_index_test_2";
const char *smaTestDir = "./smaTest";
const uint64_t tbUid = 1234567890;
const uint32_t nCntTSma = 2;
// encode
STSma tSma = {0};
tSma.version = 0;
......@@ -125,7 +126,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
}
SMeta * pMeta = NULL;
SSmaCfg * pSmaCfg = &tSma;
STSma * pSmaCfg = &tSma;
const SMetaCfg *pMetaCfg = &defaultMetaOptions;
taosRemoveDir(smaTestDir);
......@@ -146,14 +147,14 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
metaSaveSmaToDB(pMeta, pSmaCfg);
// get value by indexName
SSmaCfg *qSmaCfg = NULL;
STSma *qSmaCfg = NULL;
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1);
assert(qSmaCfg != NULL);
printf("name1 = %s\n", qSmaCfg->indexName);
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
tdDestroyTSma(qSmaCfg);
free(qSmaCfg);
tfree(qSmaCfg);
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2);
assert(qSmaCfg != NULL);
......@@ -161,7 +162,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
tdDestroyTSma(qSmaCfg);
free(qSmaCfg);
tfree(qSmaCfg);
// get index name by table uid
SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid);
......@@ -175,18 +176,31 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
printf("indexName = %s\n", indexName);
++indexCnt;
}
EXPECT_EQ(indexCnt, 2);
EXPECT_EQ(indexCnt, nCntTSma);
metaCloseSmaCurosr(pSmaCur);
// get wrapper by table uid
STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid);
assert(pSW != NULL);
EXPECT_EQ(pSW->number, 2);
EXPECT_EQ(pSW->number, nCntTSma);
EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid);
EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid);
tdDestroyTSmaWrapper(pSW);
tfree(pSW);
// get all sma table uids
SArray *pUids = metaGetSmaTbUids(pMeta, false);
assert(pUids != NULL);
for (uint32_t i = 0; i < taosArrayGetSize(pUids); ++i) {
printf("metaGetSmaTbUids: uid[%" PRIu32 "] = %" PRIi64 "\n", i, *(tb_uid_t *)taosArrayGet(pUids, i));
// printf("metaGetSmaTbUids: index[%" PRIu32 "] = %s", i, (char *)taosArrayGet(pUids, i));
}
EXPECT_EQ(taosArrayGetSize(pUids), 1);
taosArrayDestroy(pUids);
// resource release
metaRemoveSmaFromDb(pMeta, smaIndexName1);
metaRemoveSmaFromDb(pMeta, smaIndexName2);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册