提交 e28e92ea 编写于 作者: wmmhello's avatar wmmhello

feat:add ttl

上级 f2a19c98
......@@ -1806,6 +1806,8 @@ typedef struct SVCreateTbReq {
tb_uid_t uid;
int64_t ctime;
int32_t ttl;
int32_t commentLen;
char* comment;
int8_t type;
union {
struct {
......@@ -1823,6 +1825,7 @@ int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear(req->name);
taosMemoryFreeClear(req->comment);
if (req->type == TSDB_CHILD_TABLE) {
taosMemoryFreeClear(req->ctb.pTag);
} else if (req->type == TSDB_NORMAL_TABLE) {
......
......@@ -498,7 +498,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
}
if (pReq->commentLen > 0) {
if (tEncodeCStrWithLen(&encoder, pReq->comment, pReq->commentLen) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1;
}
if (pReq->ast1Len > 0) {
if (tEncodeBinary(&encoder, pReq->pAst1, pReq->ast1Len) < 0) return -1;
......@@ -561,7 +561,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
}
if (pReq->commentLen > 0) {
pReq->comment = taosMemoryCalloc(1, pReq->commentLen + 1);
pReq->comment = taosMemoryMalloc(pReq->commentLen);
if (pReq->comment == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
}
......@@ -4321,6 +4321,10 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (tEncodeI64(pCoder, pReq->ctime) < 0) return -1;
if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1;
if (tEncodeI8(pCoder, pReq->type) < 0) return -1;
if (tEncodeI32(pCoder, pReq->commentLen) < 0) return -1;
if (pReq->commentLen > 0) {
if (tEncodeCStr(pCoder, pReq->comment) < 0) return -1;
}
if (pReq->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
......@@ -4344,6 +4348,12 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (tDecodeI64(pCoder, &pReq->ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->commentLen) < 0) return -1;
if (pReq->commentLen > 0) {
pReq->comment = taosMemoryMalloc(pReq->commentLen);
if (pReq->comment == NULL) return -1;
if (tDecodeCStrTo(pCoder, pReq->comment) < 0) return -1;
}
if (pReq->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
......
......@@ -77,12 +77,52 @@ static void mndPullupTelem(SMnode *pMnode) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
static void mndPushTtlTime(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
mError("ttl time malloc err. contLen:%d", contLen);
sdbRelease(pSdb, pVgroup);
continue;
}
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
int32_t t = taosGetTimestampSec();
*(int32_t*)(pHead + sizeof(SMsgHead)) = htonl(t);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if(code != 0){
mError("ttl time seed err. code:%d", code);
}
mError("ttl time seed succ. time:%d", t);
sdbRelease(pSdb, pVgroup);
taosMemoryFree(pHead);
}
}
static void *mndThreadFp(void *param) {
SMnode *pMnode = param;
int64_t lastTime = 0;
setThreadName("mnode-timer");
while (1) {
if (lastTime % (100) == 0) { // sleep 1 day for ttl
mndPushTtlTime(pMnode);
}
lastTime++;
taosMsleep(100);
if (mndGetStop(pMnode)) break;
......@@ -98,6 +138,8 @@ static void *mndThreadFp(void *param) {
if (lastTime % (tsTelemInterval * 10) == 0) {
mndPullupTelem(pMnode);
}
}
return NULL;
......
......@@ -671,12 +671,12 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->numOfTags = pCreate->numOfTags;
pDst->commentLen = pCreate->commentLen;
if (pDst->commentLen > 0) {
pDst->comment = taosMemoryCalloc(pDst->commentLen + 1, 1);
pDst->comment = taosMemoryCalloc(pDst->commentLen, 1);
if (pDst->comment == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(pDst->comment, pCreate->comment, pDst->commentLen + 1);
memcpy(pDst->comment, pCreate->comment, pDst->commentLen);
}
pDst->ast1Len = pCreate->ast1Len;
......@@ -1208,7 +1208,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
sdbRelease(pSdb, pVgroup);
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pReq;
......
......@@ -210,12 +210,14 @@ struct SMetaEntry {
struct {
int64_t ctime;
int32_t ttlDays;
char *comment;
tb_uid_t suid;
uint8_t *pTags;
} ctbEntry;
struct {
int64_t ctime;
int32_t ttlDays;
char *comment;
int32_t ncid; // next column id
SSchemaWrapper schemaRow;
} ntbEntry;
......
......@@ -29,12 +29,14 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1;
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) {
......@@ -61,12 +63,14 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0) return -1;
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) {
......
......@@ -320,11 +320,13 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
if (me.type == TSDB_CHILD_TABLE) {
me.ctbEntry.ctime = pReq->ctime;
me.ctbEntry.ttlDays = pReq->ttl;
me.ctbEntry.comment = pReq->comment;
me.ctbEntry.suid = pReq->ctb.suid;
me.ctbEntry.pTags = pReq->ctb.pTag;
} else {
me.ntbEntry.ctime = pReq->ctime;
me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.comment = pReq->comment;
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
}
......@@ -412,12 +414,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
void * pData = NULL;
int nData = 0;
int rc = 0;
int64_t version;
SMetaEntry e = {0};
SDecoder dc = {0};
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
version = *(int64_t *)pData;
int64_t version = *(int64_t *)pData;
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
......@@ -439,6 +440,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
// drop schema.db (todo)
}
metaError("ttl drop table:%s", e.name);
tDecoderClear(&dc);
tdbFree(pData);
......@@ -494,6 +496,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
tDecoderInit(&dc, entry.pBuf, nData);
ret = metaDecodeEntry(&dc, &entry);
ASSERT(ret == 0);
tDecoderClear(&dc);
if (entry.type != TSDB_NORMAL_TABLE) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
......@@ -579,6 +582,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
// save to table db
metaSaveToTbDb(pMeta, &entry);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaSaveToSkmDb(pMeta, &entry);
......@@ -587,14 +591,14 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp);
if (entry.pBuf) taosMemoryFree(entry.pBuf);
if (pNewSchema) taosMemoryFree(pNewSchema);
tDecoderClear(&dc);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
return 0;
_err:
tDecoderClear(&dc);
if (entry.pBuf) taosMemoryFree(entry.pBuf);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
return -1;
......@@ -748,18 +752,84 @@ _err:
}
static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
if (commentLen > 0) {
pNew->commentLen = commentLen;
pNew->comment = taosMemoryCalloc(1, commentLen);
if (pNew->comment == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void * pVal = NULL;
int nVal = 0;
const void * pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
SMetaEntry entry = {0};
int c = 0;
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
}
memcpy(pNew->comment, pComment, commentLen);
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
// search uid index
TBC *pUidIdxc = NULL;
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
// search table.db
TBC *pTbDbc = NULL;
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
// get table entry
SDecoder dc = {0};
entry.pBuf = taosMemoryMalloc(nData);
memcpy(entry.pBuf, pData, nData);
tDecoderInit(&dc, entry.pBuf, nData);
ret = metaDecodeEntry(&dc, &entry);
ASSERT(ret == 0);
tDecoderClear(&dc);
entry.version = version;
metaWLock(pMeta);
// build SMetaEntry
if (entry.type == TSDB_CHILD_TABLE) {
if(pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry);
entry.ctbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry);
}
if(pAlterTbReq->updateComment) entry.ctbEntry.comment = pAlterTbReq->newComment;
} else {
if(pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry);
entry.ntbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry);
}
if (ttl >= 0) {
pNew->ttl = ttl;
if(pAlterTbReq->updateComment) entry.ntbEntry.comment = pAlterTbReq->newComment;
}
// save to table db
metaSaveToTbDb(pMeta, &entry);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaULock(pMeta);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
if (entry.pBuf) taosMemoryFree(entry.pBuf);
return 0;
}
......
......@@ -398,7 +398,9 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t ret = metaTtlDropTable(pVnode->pMeta, *(int64_t*)pReq, tbUids);
int32_t t = ntohl(*(int32_t*)pReq);
vError("rec ttl time:%d", t);
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
if(ret != 0){
goto end;
}
......
......@@ -4253,6 +4253,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
req.type = TD_NORMAL_TABLE;
req.name = strdup(pStmt->tableName);
req.ttl = pStmt->pOptions->ttl;
if ('\0' != pStmt->pOptions->comment[0]) {
req.comment = strdup(pStmt->pOptions->comment);
if (NULL == req.comment) {
return TSDB_CODE_OUT_OF_MEMORY;
}
req.commentLen = strlen(pStmt->pOptions->comment) + 1;
}
req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols);
req.ntb.schemaRow.version = 1;
req.ntb.schemaRow.pSchema = taosMemoryCalloc(req.ntb.schemaRow.nCols, sizeof(SSchema));
......@@ -4404,6 +4411,10 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
req.type = TD_CHILD_TABLE;
req.name = strdup(pStmt->tableName);
req.ttl = pStmt->pOptions->ttl;
if ('\0' != pStmt->pOptions->comment[0]) {
req.comment = strdup(pStmt->pOptions->comment);
req.commentLen = strlen(pStmt->pOptions->comment) + 1;
}
req.ctb.suid = suid;
req.ctb.pTag = (uint8_t*)pTag;
if (pStmt->ignoreExists) {
......@@ -4982,7 +4993,7 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p
}
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && '\0' != pStmt->pOptions->comment[0]) {
pReq->updateComment = true;
pReq->newComment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->newComment) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册