diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 24ed795a021be733a101d4fd4b326af69a1a1eb3..544af9b6eed46cde117815a66e0f99e1e60c0322 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1764,17 +1764,15 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp); typedef struct { const char* tbName; int8_t action; + const char* colName; // TSDB_ALTER_TABLE_ADD_COLUMN - int8_t type; - int32_t bytes; - const char* colAddName; + int8_t type; + int8_t flags; + int32_t bytes; // TSDB_ALTER_TABLE_DROP_COLUMN - const char* colDropName; // TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES - const char* colModName; - int32_t colModBytes; + int32_t colModBytes; // TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME - const char* colOldName; const char* colNewName; // TSDB_ALTER_TABLE_UPDATE_TAG_VAL const char* tagName; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 96fb210e26c1c1bbc88646c4e04a780d3c08b482..32bda084ed5ab1ddac400354264f18545d88e001 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -323,6 +323,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516) #define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0517) #define TSDB_CODE_VND_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0518) +#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) +#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) +#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e539aa34674d36000fd32c9639f140ad99360a47..52edd79f3bacefe10cd02e0b139d87a95fabc827 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4171,19 +4171,20 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { if (tEncodeI8(pEncoder, pReq->action) < 0) return -1; switch (pReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeI8(pEncoder, pReq->type) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->flags) < 0) return -1; if (tEncodeI32v(pEncoder, pReq->bytes) < 0) return -1; - if (tEncodeCStr(pEncoder, pReq->colAddName) < 0) return -1; break; case TSDB_ALTER_TABLE_DROP_COLUMN: - if (tEncodeCStr(pEncoder, pReq->colDropName) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: - if (tEncodeCStr(pEncoder, pReq->colModName) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeI32v(pEncoder, pReq->colModBytes) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: - if (tEncodeCStr(pEncoder, pReq->colOldName) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->colNewName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: @@ -4216,21 +4217,23 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; switch (pReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->type) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->flags) < 0) return -1; if (tDecodeI32v(pDecoder, &pReq->bytes) < 0) return -1; - if (tDecodeCStr(pDecoder, &pReq->colAddName) < 0) return -1; break; case TSDB_ALTER_TABLE_DROP_COLUMN: - if (tDecodeCStr(pDecoder, &pReq->colDropName) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: - if (tDecodeCStr(pDecoder, &pReq->colModName) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeI32v(pDecoder, &pReq->colModBytes) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: - if (tDecodeCStr(pDecoder, &pReq->colOldName) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->colNewName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a2be393eb2b8d9149f863e1f58b5dcfa179f2d5b..4431a2c48bf81ff9f4d46e377774ce44be4b3bc3 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -189,6 +189,7 @@ struct SMetaEntry { struct { int64_t ctime; int32_t ttlDays; + int32_t ncid; // next column id SSchemaWrapper schema; } ntbEntry; struct { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a217a460363689ecd58667b702b3aacc6ac3fe39..2018f0a68ca956937c2cf1ab30034c616dd38624 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -83,6 +83,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); +int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); int metaGetTableEntryByName(SMetaReader* pReader, const char* name); @@ -97,7 +98,7 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg *pKeepCfg); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbClose(STsdb** pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); @@ -182,7 +183,7 @@ typedef enum { TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2 } ETsdbType; -struct STsdbKeepCfg{ +struct STsdbKeepCfg { int8_t precision; // precision always be used with below keep cfgs int32_t days; int32_t keep0; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index dd36906b19862cd00c5b7d1de90b8aa4e2bd4b45..2bc0d7517db42084ef5aeb22785b189427ae8a8c 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -34,6 +34,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { } else if (pME->type == TSDB_NORMAL_TABLE) { if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; + if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; @@ -65,10 +66,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { } else if (pME->type == TSDB_NORMAL_TABLE) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; + if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; - } else { + } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index e5377d543a8a958996ff0310687dbcab7ee50d83..e61064fe6701d9d6abc3665b2ae87d0cb2eda9f3 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -240,6 +240,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.schema = pReq->ntb.schema; + me.ntbEntry.ncid = me.ntbEntry.schema.pSchema[me.ntbEntry.schema.nCols - 1].colId + 1; } if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -374,6 +375,170 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { return 0; } +static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + void *pVal = NULL; + int nVal = 0; + const void *pData = NULL; + int nData = 0; + int ret = 0; + tb_uid_t uid; + int64_t oversion; + SSchema *pColumn = NULL; + SMetaEntry entry = {0}; + SSchemaWrapper *pSchema; + int c; + + // search name index + ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); + if (ret < 0) { + terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; + return -1; + } + + uid = *(tb_uid_t *)pVal; + tdbFree(pVal); + pVal = NULL; + + // search uid index + TDBC *pUidIdxc = NULL; + + tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); + ASSERT(c == 0); + + tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + oversion = *(int64_t *)pData; + + // search table.db + TDBC *pTbDbc = NULL; + + tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); + ASSERT(c == 0); + tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData); + + // get table entry + SDecoder dc = {0}; + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &entry); + + if (entry.type != TSDB_NORMAL_TABLE) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + + // search the column to add/drop/update + pSchema = &entry.ntbEntry.schema; + int32_t iCol = 0; + for (;;) { + pColumn = NULL; + + if (iCol >= pSchema->nCols) break; + pColumn = &pSchema->pSchema[iCol]; + + if (strcmp(pColumn->name, pAlterTbReq->colName) == 0) break; + iCol++; + } + + entry.version = version; + int tlen; + switch (pAlterTbReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + if (pColumn) { + terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; + goto _err; + } + pSchema->sver++; + pSchema->nCols++; + pSchema->pSchema = + taosMemoryRealloc(entry.ntbEntry.schema.pSchema, sizeof(SSchema) * entry.ntbEntry.schema.nCols); + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].colId = entry.ntbEntry.ncid++; + strcpy(pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].name, pAlterTbReq->colName); + break; + case TSDB_ALTER_TABLE_DROP_COLUMN: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + if (pColumn->colId == 0) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + pSchema->sver++; + pSchema->nCols--; + tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); + if (tlen) { + memmove(pColumn, pColumn + 1, tlen); + } + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes <= pAlterTbReq->bytes) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + pSchema->sver++; + pColumn->bytes = pAlterTbReq->bytes; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + pSchema->sver++; + strcpy(pColumn->name, pAlterTbReq->colNewName); + break; + } + + entry.version = version; + + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return 0; + +_err: + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return -1; +} + +static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + // TODO + return 0; +} + +static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + // TODO + ASSERT(0); + return 0; +} + +int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { + switch (pReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + case TSDB_ALTER_TABLE_DROP_COLUMN: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + return metaAlterTableColumn(pMeta, version, pReq); + case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: + return metaUpdateTableTagVal(pMeta, version, pReq); + case TSDB_ALTER_TABLE_UPDATE_OPTIONS: + return metaUpdateTableOptions(pMeta, version, pReq); + default: + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + return -1; + break; + } +} + static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { STbDbKey tbDbKey; void *pKey = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d4363f1ea0dbb84665493ec33c7ee2e03c97db76..ac49a0b0786fe9e5ffc37d501df8b1de5e528bea 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -19,7 +19,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); @@ -82,7 +82,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_TABLE: - if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_TABLE: if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -455,9 +455,32 @@ _exit: return 0; } -static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { - // TODO - ASSERT(0); +static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SVAlterTbReq vAlterTbReq = {0}; + SDecoder dc = {0}; + + pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP; + pRsp->pCont = NULL; + pRsp->contLen = 0; + pRsp->code = TSDB_CODE_SUCCESS; + + tDecoderInit(&dc, pReq, len); + + // decode + if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) { + pRsp->code = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; + } + + // process + if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) { + pRsp->code = terrno; + tDecoderClear(&dc); + return -1; + } + + tDecoderClear(&dc); return 0; } @@ -668,7 +691,7 @@ _exit: static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { SVCreateTSmaReq req = {0}; - SDecoder coder; + SDecoder coder; pRsp->msgType = TDMT_VND_CREATE_SMA_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -682,7 +705,7 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = terrno; goto _err; } - + if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) { pRsp->code = terrno; goto _err; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8a2a60d3e2f451277a59fd19a826b57dac6c6d05..b5e64242e4e50179352debf1989a97f70108eef5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -322,6 +322,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_SMA_NOT_EXIST, "SMA not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") + // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")