diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index db910c9caea9331c888fc5181823682a91441fe7..474eeb4f8663581b7e45b4c5c32539e65c9c2fe6 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -172,6 +172,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 0x0500, "invalid file // TSDB TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONFIG, 0, 0x0580, "invalid TSDB configuration") +TAOS_DEFINE_ERROR(TSDB_CODE_TAG_VER_OUT_OF_DATE, 0, 0x0581, "tag version is out of date") #ifdef TAOS_ERROR_C diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 3680a46d854ceab325f40c3ec845d089f6464893..a2b5b44878ac6a421cc0f1141276d706cb16cd24 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -115,6 +115,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg); +int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg); TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid); uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *size); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index e79873d1b91a53eb9d0857699e577c8c5728a56a..abce4e9a62801e82d591c4f2c4470e32bed28750 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -410,6 +410,36 @@ int tsdbAlterTable(TsdbRepoT *pRepo, STableCfg *pCfg) { return 0; } +int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + STsdbMeta *pMeta = pRepo->tsdbMeta; + int16_t tversion = htons(pMsg->tversion); + + STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid)); + if (pTable == NULL) return TSDB_CODE_INVALID_TABLE_ID; + if (pTable->tableId.tid != htonl(pMsg->tid)) return TSDB_CODE_INVALID_TABLE_ID; + + if (pTable->type != TSDB_CHILD_TABLE) { + tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", pRepo->config.tsdbId, + varDataVal(pTable->name), pTable->type); + return TSDB_CODE_INVALID_TABLE_TYPE; + } + + if (schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)) > tversion) { + // TODO: Need to update + } + + if (schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)) > tversion) { + tsdbError( + "vgId:%d failed to update tag value of table %s since version out of date, client tag version:%d server tag " + "version:%d", + pRepo->config.tsdbId, varDataVal(pTable->name), tversion, schemaVersion(pTable->tagSchema)); + return TSDB_CODE_TAG_VER_OUT_OF_DATE; + } + tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data); + return TSDB_CODE_SUCCESS; +} + TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) { STsdbRepo *pRepo = (STsdbRepo *)repo; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 73d58527b54412d602517c1bf1d055ea06d189ab..a605d20cac203943ed6a30ad1d556ef3d1053599 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -29,11 +29,12 @@ #include "tcq.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); -static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet); void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; @@ -41,6 +42,7 @@ void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg; + vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; } int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { @@ -156,6 +158,10 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet return code; } +static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { + return tsdbUpdateTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont); +} + int vnodeWriteToQueue(void *param, void *data, int type) { SVnodeObj *pVnode = param; SWalHead *pHead = data;