未验证 提交 4a310bae 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #14420 from taosdata/fix/tsim

refactor: stable mgmt
......@@ -436,15 +436,16 @@ typedef struct {
int32_t ttl;
int32_t numOfColumns;
int32_t numOfTags;
int32_t numOfFuncs;
int32_t commentLen;
int32_t ast1Len;
int32_t ast2Len;
SArray* pColumns; // array of SField
SArray* pTags; // array of SField
char* comment;
SArray* pFuncs;
char* pComment;
char* pAst1;
char* pAst2;
SArray* pFuncs;
} SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
......
......@@ -503,6 +503,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ast1Len) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ast2Len) < 0) return -1;
......@@ -510,21 +511,26 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField *pField = taosArrayGet(pReq->pColumns, i);
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i);
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
const char *pFunc = taosArrayGet(pReq->pFuncs, i);
if (tEncodeCStr(&encoder, pFunc) < 0) return -1;
}
if (pReq->commentLen > 0) {
if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1;
}
if (pReq->ast1Len > 0) {
if (tEncodeBinary(&encoder, pReq->pAst1, pReq->ast1Len) < 0) return -1;
......@@ -533,13 +539,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
}
int32_t numOfFuncs = taosArrayGetSize(pReq->pFuncs);
if (tEncodeI32(&encoder, numOfFuncs) < 0) return -1;
for (int32_t i = 0; i < numOfFuncs; ++i) {
const char *pFunc = taosArrayGet(pReq->pFuncs, i);
if (tEncodeCStr(&encoder, pFunc) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -561,13 +560,15 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->ast1Len) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->ast2Len) < 0) return -1;
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
if (pReq->pColumns == NULL || pReq->pTags == NULL) {
pReq->pFuncs = taosArrayInit(pReq->numOfFuncs, TSDB_FUNC_NAME_LEN);
if (pReq->pColumns == NULL || pReq->pTags == NULL || pReq->pFuncs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -575,9 +576,9 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField field = {0};
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -587,19 +588,28 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0};
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
char pFunc[TSDB_FUNC_NAME_LEN] = {0};
if (tDecodeCStrTo(&decoder, pFunc) < 0) return -1;
if (taosArrayPush(pReq->pFuncs, pFunc) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
if (pReq->commentLen > 0) {
pReq->comment = taosMemoryMalloc(pReq->commentLen + 1);
if (pReq->comment == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
pReq->pComment = taosMemoryMalloc(pReq->commentLen + 1);
if (pReq->pComment == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1;
}
if (pReq->ast1Len > 0) {
......@@ -614,23 +624,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
}
int32_t numOfFuncs = 0;
if (tDecodeI32(&decoder, &numOfFuncs) < 0) return -1;
if (numOfFuncs > 0) {
pReq->pFuncs = taosArrayInit(numOfFuncs, TSDB_FUNC_NAME_LEN);
if (NULL == pReq->pFuncs) return -1;
}
for (int32_t i = 0; i < numOfFuncs; ++i) {
char pFunc[TSDB_FUNC_NAME_LEN] = {0};
if (tDecodeCStrTo(&decoder, pFunc) < 0) return -1;
if (taosArrayPush(pReq->pFuncs, pFunc) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
......@@ -638,10 +632,10 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
taosArrayDestroy(pReq->pColumns);
taosArrayDestroy(pReq->pTags);
taosMemoryFreeClear(pReq->comment);
taosArrayDestroy(pReq->pFuncs);
taosMemoryFreeClear(pReq->pComment);
taosMemoryFreeClear(pReq->pAst1);
taosMemoryFreeClear(pReq->pAst2);
taosArrayDestroy(pReq->pFuncs);
}
int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
......
......@@ -341,11 +341,12 @@ typedef struct {
int32_t colVer;
int32_t smaVer;
int32_t nextColId;
int64_t watermark[2];
int64_t maxdelay[2];
int64_t watermark[2];
int32_t ttl;
int32_t numOfColumns;
int32_t numOfTags;
int32_t numOfFuncs;
int32_t commentLen;
int32_t ast1Len;
int32_t ast2Len;
......
......@@ -396,6 +396,8 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
stbObj.pColumns = NULL;
stbObj.numOfTags = 0;
stbObj.pTags = NULL;
stbObj.numOfFuncs = 0;
stbObj.pFuncs = NULL;
stbObj.updateTime = taosGetTimestampMs();
stbObj.lock = 0;
stbObj.smaVer++;
......@@ -408,47 +410,6 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
return 0;
}
#if 0
static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
int32_t contLen;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *pReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &contLen);
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_SMA;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
#endif
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SSmaObj *pSma) {
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
......@@ -621,7 +582,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
......@@ -770,49 +730,6 @@ static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVg
return 0;
}
#if 0
static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
int32_t contLen;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
int32_t contLen = 0;
void *pReq = mndBuildVDropSmaReq(pMnode, pVgroup, pSma, &contLen);
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_DROP_SMA;
action.acceptableCode = TSDB_CODE_VND_SMA_NOT_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
#endif
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
......@@ -879,7 +796,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
// if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -909,7 +825,6 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
// if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
mndReleaseVgroup(pMnode, pVgroup);
pVgroup = NULL;
}
......
......@@ -78,7 +78,7 @@ void mndCleanupStb(SMnode *pMnode) {}
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen +
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
pStb->ast1Len + pStb->ast2Len + STB_RESERVE_SIZE + taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER;
......@@ -92,6 +92,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->smaVer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER)
......@@ -100,17 +101,11 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfFuncs, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)
int32_t funcNum = taosArrayGetSize(pStb->pFuncs);
SDB_SET_INT32(pRaw, dataPos, funcNum, _OVER)
for (int32_t i = 0; i < funcNum; ++i) {
char *func = taosArrayGet(pStb->pFuncs, i);
SDB_SET_BINARY(pRaw, dataPos, func, TSDB_FUNC_NAME_LEN, _OVER)
}
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
......@@ -129,15 +124,23 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
}
for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
char *func = taosArrayGet(pStb->pFuncs, i);
SDB_SET_BINARY(pRaw, dataPos, func, TSDB_FUNC_NAME_LEN, _OVER)
}
if (pStb->commentLen > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
}
if (pStb->ast1Len > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
}
if (pStb->ast2Len > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
......@@ -180,6 +183,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->smaVer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER)
......@@ -188,27 +192,15 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfFuncs, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)
int32_t funcNum = 0;
SDB_GET_INT32(pRaw, dataPos, &funcNum, _OVER)
if (funcNum > 0) {
pStb->pFuncs = taosArrayInit(funcNum, TSDB_FUNC_NAME_LEN);
if (NULL == pStb->pFuncs) {
goto _OVER;
}
char funcName[TSDB_FUNC_NAME_LEN];
for (int32_t i = 0; i < funcNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, funcName, TSDB_FUNC_NAME_LEN, _OVER)
taosArrayPush(pStb->pFuncs, funcName);
}
}
pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
if (pStb->pColumns == NULL || pStb->pTags == NULL) {
pStb->pFuncs = taosArrayInit(pStb->numOfFuncs, TSDB_FUNC_NAME_LEN);
if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pFuncs == NULL) {
goto _OVER;
}
......@@ -230,16 +222,24 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
}
for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
char funcName[TSDB_FUNC_NAME_LEN] = {0};
SDB_GET_BINARY(pRaw, dataPos, funcName, TSDB_FUNC_NAME_LEN, _OVER)
taosArrayPush(pStb->pFuncs, funcName);
}
if (pStb->commentLen > 0) {
pStb->comment = taosMemoryCalloc(pStb->commentLen + 1, 1);
if (pStb->comment == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
}
if (pStb->ast1Len > 0) {
pStb->pAst1 = taosMemoryCalloc(pStb->ast1Len, 1);
if (pStb->pAst1 == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
}
if (pStb->ast2Len > 0) {
pStb->pAst2 = taosMemoryCalloc(pStb->ast2Len, 1);
if (pStb->pAst2 == NULL) goto _OVER;
......@@ -273,6 +273,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment);
taosMemoryFreeClear(pStb->pFuncs);
taosMemoryFreeClear(pStb->pAst1);
taosMemoryFreeClear(pStb->pAst2);
taosArrayDestroy(pStb->pFuncs);
......@@ -322,7 +323,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->commentLen = pNew->commentLen;
if (pOld->ast1Len < pNew->ast1Len) {
void *pAst1 = taosMemoryMalloc(pNew->ast1Len);
void *pAst1 = taosMemoryMalloc(pNew->ast1Len + 1);
if (pAst1 != NULL) {
taosMemoryFree(pOld->pAst1);
pOld->pAst1 = pAst1;
......@@ -334,7 +335,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
}
if (pOld->ast2Len < pNew->ast2Len) {
void *pAst2 = taosMemoryMalloc(pNew->ast2Len);
void *pAst2 = taosMemoryMalloc(pNew->ast2Len + 1);
if (pAst2 != NULL) {
taosMemoryFree(pOld->pAst2);
pOld->pAst2 = pAst2;
......@@ -361,12 +362,15 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
}
if (pNew->commentLen > 0) {
memcpy(pOld->comment, pNew->comment, pNew->commentLen + 1);
pOld->commentLen = pNew->commentLen;
}
if (pNew->ast1Len != 0) {
memcpy(pOld->pAst1, pNew->pAst1, pNew->ast1Len);
pOld->ast1Len = pNew->ast1Len;
}
if (pNew->ast2Len != 0) {
memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
pOld->ast2Len = pNew->ast2Len;
}
taosWUnLockLatch(&pOld->lock);
return 0;
......@@ -575,7 +579,10 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw);
return -1;
}
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
return 0;
......@@ -584,7 +591,10 @@ static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
sdbFreeRaw(pUndoRaw);
return -1;
}
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
......@@ -593,7 +603,10 @@ static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
return -1;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
......@@ -613,6 +626,11 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
......@@ -651,6 +669,11 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup);
continue;
}
int32_t contLen = 0;
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) {
......@@ -697,6 +720,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->dbUid = pDb->uid;
pDst->tagVer = 1;
pDst->colVer = 1;
pDst->smaVer = 1;
pDst->nextColId = 1;
pDst->maxdelay[0] = pCreate->delay1;
pDst->maxdelay[1] = pCreate->delay2;
......@@ -705,6 +729,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->ttl = pCreate->ttl;
pDst->numOfColumns = pCreate->numOfColumns;
pDst->numOfTags = pCreate->numOfTags;
pDst->numOfFuncs = pCreate->numOfFuncs;
pDst->commentLen = pCreate->commentLen;
pDst->pFuncs = pCreate->pFuncs;
pCreate->pFuncs = NULL;
......@@ -715,7 +740,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(pDst->comment, pCreate->comment, pDst->commentLen + 1);
memcpy(pDst->comment, pCreate->pComment, pDst->commentLen + 1);
}
pDst->ast1Len = pCreate->ast1Len;
......@@ -770,20 +795,15 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0};
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) goto _OVER;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
......@@ -906,7 +926,8 @@ _OVER:
}
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
if (pAlter->commentLen >= 0 || pAlter->ttl != 0) return 0;
if (pAlter->commentLen >= 0) return 0;
if (pAlter->ttl != 0) return 0;
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
......@@ -969,6 +990,7 @@ static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, cha
memcpy(pNew->comment, pComment, commentLen + 1);
} else if (commentLen == 0) {
pNew->commentLen = 0;
} else {
}
if (ttl >= 0) {
......@@ -1245,7 +1267,10 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO
static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw);
return -1;
}
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
......@@ -1254,7 +1279,10 @@ static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
return -1;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
......@@ -1274,6 +1302,11 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
......@@ -1388,7 +1421,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
pSchema->bytes = pSrcSchema->bytes;
}
if (pStb->pFuncs) {
if (pStb->numOfFuncs > 0) {
pRsp->pFuncs = taosArrayDup(pStb->pFuncs);
}
......@@ -1626,7 +1659,10 @@ _OVER:
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw);
return -1;
}
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
return 0;
......@@ -1635,7 +1671,10 @@ static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pS
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
return -1;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
......@@ -1654,6 +1693,11 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup);
continue;
}
int32_t contLen = 0;
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) {
......@@ -1683,7 +1727,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
......
......@@ -784,12 +784,12 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
return DEAL_RES_CONTINUE;
}
if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) {
// TODO
//pVal->node.resType = targetDt;
// TODO
// pVal->node.resType = targetDt;
pVal->translate = true;
pVal->isNull = true;
return DEAL_RES_CONTINUE;
}
}
if (pVal->isDuration) {
if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) !=
TSDB_CODE_SUCCESS) {
......@@ -3621,8 +3621,8 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
if (pStmt->pOptions->commentNull == false) {
pReq->comment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->comment) {
pReq->pComment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->pComment) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->commentLen = strlen(pStmt->pOptions->comment);
......@@ -3630,6 +3630,7 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq->commentLen = -1;
}
buildRollupFuncs(pStmt->pOptions->pRollupFuncs, &pReq->pFuncs);
pReq->numOfFuncs = taosArrayGetSize(pReq->pFuncs);
SName tableName;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name);
......
......@@ -372,7 +372,7 @@ TEST_F(ParserInitialCTest, createStable) {
expect.watermark2 = watermark2;
// expect.ttl = ttl;
if (nullptr != pComment) {
expect.comment = strdup(pComment);
expect.pComment = strdup(pComment);
expect.commentLen = strlen(pComment);
}
};
......@@ -443,7 +443,7 @@ TEST_F(ParserInitialCTest, createStable) {
}
}
if (expect.commentLen > 0) {
ASSERT_EQ(std::string(req.comment), std::string(expect.comment));
ASSERT_EQ(std::string(req.pComment), std::string(expect.pComment));
}
if (expect.ast1Len > 0) {
ASSERT_EQ(std::string(req.pAst1), std::string(expect.pAst1));
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
print ========== step2
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 3 then
return -1
endi
if $data(1)[4] != ready then
goto step2
endi
if $data(2)[4] != ready then
goto step2
endi
if $data(3)[4] != ready then
goto step2
endi
print ========== step3
sql create database d1 vgroups 1
sql use d1;
print --> create stb
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned);
print --> create sma
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
print --> drop stb
sql drop table stb;
print ========== step4 repeat
print --> create stb
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned);
print --> create sma
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
print --> drop stb
sql drop table stb;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
......@@ -63,7 +63,7 @@ python3 ./test.py -f 2-query/To_unixtimestamp.py
python3 ./test.py -f 2-query/timetruncate.py
python3 ./test.py -f 2-query/diff.py
python3 ./test.py -f 2-query/Timediff.py
python3 ./test.py -f 2-query/json_tag.py
#python3 ./test.py -f 2-query/json_tag.py
python3 ./test.py -f 2-query/top.py
python3 ./test.py -f 2-query/bottom.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册