提交 edbd19a6 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

......@@ -177,8 +177,8 @@ typedef struct {
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int32_t bytes;
int8_t flags;
int32_t bytes;
} SField;
typedef struct SRetention {
......@@ -1122,13 +1122,13 @@ typedef struct {
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
typedef struct {
STableMetaRsp* pMeta;
} SMAlterStbRsp;
int32_t tEncodeSMAlterStbRsp(SEncoder *pEncoder, const SMAlterStbRsp *pRsp);
int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp);
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tEncodeSMAlterStbRsp(SEncoder* pEncoder, const SMAlterStbRsp* pRsp);
int32_t tDecodeSMAlterStbRsp(SDecoder* pDecoder, SMAlterStbRsp* pRsp);
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
......@@ -2303,23 +2303,23 @@ typedef struct {
} SVgEpSet;
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
int32_t dstVgId;
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
int32_t numOfVgroups;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t offset; // use unit by precision of DB
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
SVgEpSet vgEpSet[];
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
int32_t dstVgId;
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
int32_t numOfVgroups;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t offset; // use unit by precision of DB
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
SVgEpSet* pVgEpSet;
} STSma; // Time-range-wise SMA
typedef STSma SVCreateTSmaReq;
......@@ -2405,7 +2405,7 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
}
typedef struct {
int64_t indexUid;
int64_t indexUid;
STimeWindow queryWindow;
} SVGetTsmaExpWndsReq;
......
......@@ -235,6 +235,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_COLUMN_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AD)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
// mnode-infoSchema
......
......@@ -694,7 +694,6 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
pReq->pFields = NULL;
}
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -3674,12 +3673,12 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
}
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tEncodeI32(pCoder, pSma->vgEpSet[v].vgId) < 0) return -1;
if (tEncodeI8(pCoder, pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
for (int32_t n = 0; n < numOfEps; ++n) {
const SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
}
......@@ -3712,15 +3711,25 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
} else {
pSma->tagsFilter = NULL;
}
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tDecodeI32(pCoder, &pSma->vgEpSet[v].vgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.numOfEps) < 0) return -1;
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
for (int32_t n = 0; n < numOfEps; ++n) {
SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
if (pSma->numOfVgroups > 0) {
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
if (!pSma->pVgEpSet) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memset(pSma->pVgEpSet, 0, pSma->numOfVgroups * sizeof(SVgEpSet));
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tDecodeI32(pCoder, &pSma->pVgEpSet[v].vgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.numOfEps) < 0) return -1;
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
for (int32_t n = 0; n < numOfEps; ++n) {
SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
}
}
}
......@@ -3765,7 +3774,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
return 0;
}
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* pReq) {
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder *pCoder, const SVGetTsmaExpWndsReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
......@@ -3773,10 +3782,10 @@ int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq*
if (tEncodeI64(pCoder, pReq->queryWindow.ekey) < 0) return -1;
tEndEncode(pCoder);
return 0;
return 0;
}
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq) {
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder *pCoder, SVGetTsmaExpWndsReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
......@@ -3787,7 +3796,7 @@ int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq)
return 0;
}
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq) {
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder *pCoder, const SVGetTsmaExpWndsRsp *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
......@@ -3814,7 +3823,7 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq)
return 0;
}
int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
int32_t tEncodeSVDeleteReq(SEncoder *pCoder, const SVDeleteReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1;
......@@ -3832,7 +3841,7 @@ int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
return 0;
}
int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
int32_t tDecodeSVDeleteReq(SDecoder *pCoder, SVDeleteReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1;
......@@ -3850,7 +3859,7 @@ int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
return 0;
}
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
......@@ -3860,7 +3869,7 @@ int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
return 0;
}
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq) {
int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
......@@ -4502,7 +4511,7 @@ int32_t tDecodeSVAlterTbRsp(SDecoder *pDecoder, SVAlterTbRsp *pRsp) {
}
int32_t tDeserializeSVAlterTbRsp(void *buf, int32_t bufLen, SVAlterTbRsp *pRsp) {
int32_t meta = 0;
int32_t meta = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
......@@ -4543,7 +4552,7 @@ int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp) {
}
int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp) {
int32_t meta = 0;
int32_t meta = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
......@@ -4559,7 +4568,7 @@ int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp
return 0;
}
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
if (NULL == pRsp) {
return;
}
......@@ -4569,6 +4578,3 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
taosMemoryFree(pRsp->pMeta);
}
}
......@@ -298,28 +298,30 @@ typedef struct {
} SVgObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t timezone;
int32_t dstVgId; // for stream
int64_t interval;
int64_t offset;
int64_t sliding;
int32_t exprLen; // strlen + 1
int32_t tagsFilterLen;
int32_t sqlLen;
int32_t astLen;
char* expr;
char* tagsFilter;
char* sql;
char* ast;
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t timezone;
int32_t dstVgId; // for stream
int64_t interval;
int64_t offset;
int64_t sliding;
int32_t exprLen; // strlen + 1
int32_t tagsFilterLen;
int32_t sqlLen;
int32_t astLen;
int32_t numOfVgroups;
char* expr;
char* tagsFilter;
char* sql;
char* ast;
SVgEpSet* pVgEpSet;
} SSmaObj;
typedef struct {
......
......@@ -37,7 +37,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colIds);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId);
#ifdef __cplusplus
}
......
......@@ -36,6 +36,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups);
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
......@@ -262,7 +263,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
req.sliding = pSma->sliding;
req.expr = pSma->expr;
req.tagsFilter = pSma->tagsFilter;
req.numOfVgroups = pSma->numOfVgroups;
req.pVgEpSet = pSma->pVgEpSet;
// get length
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
......@@ -420,6 +423,15 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
mndReleaseDnode(pMnode, pDnode);
// todo add sma info here
SVgEpSet *pVgEpSet = NULL;
int32_t numOfVgroups = 0;
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
return -1;
}
pSma->pVgEpSet = pVgEpSet;
pSma->numOfVgroups = numOfVgroups;
int32_t smaContLen = 0;
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
if (pSmaReq == NULL) return -1;
......@@ -964,3 +976,52 @@ static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
SVgEpSet *pVgEpSet = NULL;
int32_t nAllocVgs = 16;
int32_t nVgs = 0;
pVgEpSet = taosMemoryCalloc(nAllocVgs, sizeof(SVgEpSet));
if (!pVgEpSet) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (nVgs >= nAllocVgs) {
void *p = taosMemoryRealloc(pVgEpSet, nAllocVgs * 2 * sizeof(SVgEpSet));
if (!p) {
taosMemoryFree(pVgEpSet);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pVgEpSet = (SVgEpSet *)p;
nAllocVgs *= 2;
}
(pVgEpSet + nVgs)->vgId = pVgroup->vgId;
(pVgEpSet + nVgs)->epSet = mndGetVgroupEpset(pMnode, pVgroup);
++nVgs;
sdbRelease(pSdb, pVgroup);
}
*ppVgEpSet = pVgEpSet;
*numOfVgroups = nVgs;
return 0;
}
......@@ -23,6 +23,7 @@
#include "mndPerfSchema.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
......@@ -394,14 +395,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
req.pRSmaParam.delay = pStb->delay;
if (pStb->ast1Len > 0) {
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) !=
TSDB_CODE_SUCCESS) {
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len,
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL;
}
}
if (pStb->ast2Len > 0) {
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) !=
TSDB_CODE_SUCCESS) {
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len,
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL;
}
}
......@@ -949,13 +950,18 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
return 0;
}
static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
return -1;
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
......@@ -968,7 +974,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
return 0;
}
static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
if ((int32_t)taosArrayGetSize(pFields) != 2) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
......@@ -986,6 +992,11 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
return -1;
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST;
return -1;
......@@ -1008,13 +1019,18 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
return 0;
}
static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name);
if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
return -1;
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
......@@ -1075,7 +1091,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return 0;
}
static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) {
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
if (col < 0) {
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
......@@ -1092,6 +1108,11 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
return -1;
}
col_id_t colId = pOld->pTags[col].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
......@@ -1104,7 +1125,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
return 0;
}
static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name);
if (col < 0) {
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
......@@ -1121,6 +1142,11 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
return -1;
}
col_id_t colId = pOld->pTags[col].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
......@@ -1199,7 +1225,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
taosRLockLatch(&pStb->lock);
......@@ -1269,13 +1294,13 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
return code;
}
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) {
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont,
int32_t *pLen) {
int ret;
SEncoder ec = {0};
uint32_t contLen = 0;
uint32_t contLen = 0;
SMAlterStbRsp alterRsp = {0};
SName name = {0};
SName name = {0};
tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
......@@ -1283,20 +1308,20 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
if (ret) {
tFreeSMAlterStbRsp(&alterRsp);
return ret;
}
tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, ret);
if (ret) {
tFreeSMAlterStbRsp(&alterRsp);
return ret;
}
void* cont = taosMemoryMalloc(contLen);
void *cont = taosMemoryMalloc(contLen);
tEncoderInit(&ec, cont, contLen);
tEncodeSMAlterStbRsp(&ec, &alterRsp);
tEncoderClear(&ec);
......@@ -1305,24 +1330,24 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
*pCont = cont;
*pLen = contLen;
return 0;
}
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
bool needRsp = true;
bool needRsp = true;
int32_t code = -1;
STrans *pTrans = NULL;
SField *pField0 = NULL;
SStbObj stbObj = {0};
taosRLockLatch(&pOld->lock);
memcpy(&stbObj, pOld, sizeof(SStbObj));
taosRUnLockLatch(&pOld->lock);
stbObj.pColumns = NULL;
stbObj.pTags = NULL;
stbObj.updateTime = taosGetTimestampMs();
stbObj.lock = 0;
taosRUnLockLatch(&pOld->lock);
int32_t code = -1;
STrans *pTrans = NULL;
SField *pField0 = NULL;
switch (pAlter->alterType) {
case TSDB_ALTER_TABLE_ADD_TAG:
......@@ -1330,25 +1355,25 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
break;
case TSDB_ALTER_TABLE_DROP_TAG:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndDropSuperTableTag(pOld, &stbObj, pField0->name);
code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
break;
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
code = mndAlterStbTagName(pOld, &stbObj, pAlter->pFields);
code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
break;
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndAlterStbTagBytes(pOld, &stbObj, pField0);
code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
break;
case TSDB_ALTER_TABLE_ADD_COLUMN:
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
break;
case TSDB_ALTER_TABLE_DROP_COLUMN:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndDropSuperTableColumn(pOld, &stbObj, pField0->name);
code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
break;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
needRsp = false;
......@@ -1370,12 +1395,12 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
mndTransSetDbName(pTrans, pDb->name);
if (needRsp) {
void* pCont = NULL;
void *pCont = NULL;
int32_t contLen = 0;
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen)) goto _OVER;
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
......
......@@ -71,7 +71,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
return strchr(topic, '.') + 1;
}
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colAndTagIds) {
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
bool found = false;
......@@ -105,20 +105,21 @@ bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *col
}
}
for (int32_t i = 0; i < taosArrayGetSize(colAndTagIds); i++) {
int16_t *pColId = taosArrayGet(colAndTagIds, i);
if (taosHashGet(pColHash, pColId, sizeof(int16_t)) != NULL) {
found = true;
goto NEXT;
}
if (taosHashGet(pColHash, &colId, sizeof(int16_t)) != NULL) {
found = true;
goto NEXT;
}
NEXT:
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
if (found) return false;
if (found) {
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
return -1;
}
}
return true;
return 0;
}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
......
......@@ -103,7 +103,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexMultiTermDestroy(terms);
#endif
return -1;
return 0;
}
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
......
......@@ -38,7 +38,7 @@ typedef struct SIFParam {
col_id_t colId;
int64_t suid; // add later
char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char colName[TSDB_COL_NAME_LEN * 2 + 4];
SIndexMetaArg arg;
} SIFParam;
......@@ -171,6 +171,7 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
param->colId = l->colId;
param->colValType = l->node.resType.type;
memcpy(param->dbName, l->dbName, sizeof(l->dbName));
sprintf(param->colName, "%s_%s", l->colName, r->literal);
param->colValType = r->typeData;
return 0;
......
......@@ -225,14 +225,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist")
// mnode-stable
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "Stable not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "Stable confilct with topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "STable not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "STable confilct with topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STBS, "Too many stables")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB, "Invalid stable name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_ALTER_OPTION, "Invalid stable alter options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "Stable option unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "STable option unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREADY_EXIST, "Tag already exists")
......@@ -240,6 +240,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
// mnode-infoSchema
......
......@@ -282,7 +282,7 @@ class TDTestCase:
tdSql.execute("create topic %s as select ts, c1, c2, t1, t2 from %s.%s" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(columnTopicFromNtb, parameterDict['dbName'], ntbName))
tsLog.info("======== super table test:")
tdLog.info("======== super table test:")
# alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic
tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName']))
tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName']))
......@@ -316,7 +316,7 @@ class TDTestCase:
tdSql.query("alter table %s.%s add tag t3 int"%(parameterDict['dbName'], parameterDict['stbName']))
tdSql.query("alter table %s.%s add tag t4 float"%(parameterDict['dbName'], parameterDict['stbName']))
tsLog.info("======== normal table test:")
tdLog.info("======== normal table test:")
# alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic
tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], ntbName))
tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], ntbName))
......@@ -419,7 +419,7 @@ class TDTestCase:
tdSql.execute("create topic %s as select ts, c1, c2, t1, t2 from %s.%s where c3 > 3 and c4 like 'abc' and t3 = 5 and t4 = 'beijing'" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s where c3 > 3 and c4 like 'abc'" %(columnTopicFromNtb, parameterDict['dbName'], ntbName))
tsLog.info("======== super table test:")
tdLog.info("======== super table test:")
# alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic
tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName']))
tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName']))
......@@ -457,7 +457,7 @@ class TDTestCase:
tdSql.query("alter table %s.%s add column c5 int"%(parameterDict['dbName'], parameterDict['stbName']))
tdSql.query("alter table %s.%s add tag t5 float"%(parameterDict['dbName'], parameterDict['stbName']))
tsLog.info("======== normal table test:")
tdLog.info("======== normal table test:")
# alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic
tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], ntbName))
tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], ntbName))
......@@ -566,7 +566,7 @@ class TDTestCase:
tdSql.execute("create topic %s as select * from %s.%s" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select * from %s.%s " %(columnTopicFromNtb, parameterDict['dbName'], ntbName))
tsLog.info("======== super table test:")
tdLog.info("======== super table test:")
# alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic
tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName']))
tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName']))
......@@ -601,7 +601,7 @@ class TDTestCase:
tdSql.query("alter table %s.%s add column c6 int"%(parameterDict['dbName'], parameterDict['stbName']))
tdSql.query("alter table %s.%s add tag t6 float"%(parameterDict['dbName'], parameterDict['stbName']))
tsLog.info("======== normal table test:")
tdLog.info("======== normal table test:")
# alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic
tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], ntbName))
tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], ntbName))
......@@ -687,8 +687,8 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath)
self.tmqCase3(cfgPath, buildPath)
# self.tmqCase2(cfgPath, buildPath)
# self.tmqCase3(cfgPath, buildPath)
def stop(self):
tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册