提交 0bdbf866 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -1519,7 +1519,7 @@ typedef struct { ...@@ -1519,7 +1519,7 @@ typedef struct {
#define STREAM_TRIGGER_MAX_DELAY 3 #define STREAM_TRIGGER_MAX_DELAY 3
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
char sourceDB[TSDB_DB_FNAME_LEN]; char sourceDB[TSDB_DB_FNAME_LEN];
char targetStbFullName[TSDB_TABLE_FNAME_LEN]; char targetStbFullName[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
...@@ -1539,7 +1539,7 @@ int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStrea ...@@ -1539,7 +1539,7 @@ int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStrea
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
int64_t streamId; int64_t streamId;
char* sql; char* sql;
char* executorMsg; char* executorMsg;
...@@ -2008,7 +2008,7 @@ typedef struct { ...@@ -2008,7 +2008,7 @@ typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
uint64_t queryId; uint64_t queryId;
int64_t useconds; int64_t useconds;
int64_t stime; // timestamp precision ms int64_t stime; // timestamp precision ms
int64_t reqRid; int64_t reqRid;
int32_t pid; int32_t pid;
bool stableQuery; bool stableQuery;
...@@ -2257,13 +2257,13 @@ typedef struct { ...@@ -2257,13 +2257,13 @@ typedef struct {
} SMqVDeleteRsp; } SMqVDeleteRsp;
typedef struct { typedef struct {
char name[TSDB_STREAM_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
int64_t streamId; int8_t igNotExists;
} SMDropStreamTaskReq; } SMDropStreamReq;
typedef struct { typedef struct {
int8_t reserved; int8_t reserved;
} SMDropStreamTaskRsp; } SMDropStreamRsp;
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
......
...@@ -411,10 +411,10 @@ int32_t* taosGetErrno(); ...@@ -411,10 +411,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908) #define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908)
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909)
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
#define TSDB_CODE_SYN_IS_LEADER TAOS_DEF_ERROR_CODE(0, 0x090B)
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910) #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911) #define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912) #define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq // tq
......
...@@ -210,7 +210,7 @@ typedef enum ELogicConditionType { ...@@ -210,7 +210,7 @@ typedef enum ELogicConditionType {
#define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_STREAM_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20) #define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
......
...@@ -1584,6 +1584,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1584,6 +1584,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t rowSize = pDataBlock->info.rowSize; int32_t rowSize = pDataBlock->info.rowSize;
int64_t groupId = pDataBlock->info.groupId; int64_t groupId = pDataBlock->info.groupId;
if (colNum <= 1) {
// invalid if only with TS col
continue;
}
if (rb.nCols != colNum) { if (rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
} }
...@@ -1680,23 +1685,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1680,23 +1685,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
msgLen += pSubmitBlk->dataLen; msgLen += pSubmitBlk->dataLen;
} }
(*pReq)->length = msgLen; if (numOfBlks > 0) {
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen); (*pReq)->header.vgId = htonl(vgId);
(*pReq)->length = (*pReq)->header.contLen; (*pReq)->header.contLen = htonl(msgLen);
(*pReq)->numOfBlocks = htonl(numOfBlks); (*pReq)->length = (*pReq)->header.contLen;
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); (*pReq)->numOfBlocks = htonl(numOfBlks);
while (numOfBlks--) { SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
int32_t dataLen = blk->dataLen; while (numOfBlks--) {
blk->uid = htobe64(blk->uid); int32_t dataLen = blk->dataLen;
blk->suid = htobe64(blk->suid); blk->uid = htobe64(blk->uid);
blk->padding = htonl(blk->padding); blk->suid = htobe64(blk->suid);
blk->sversion = htonl(blk->sversion); blk->padding = htonl(blk->padding);
blk->dataLen = htonl(blk->dataLen); blk->sversion = htonl(blk->sversion);
blk->schemaLen = htonl(blk->schemaLen); blk->dataLen = htonl(blk->dataLen);
blk->numOfRows = htons(blk->numOfRows); blk->schemaLen = htonl(blk->schemaLen);
blk = (SSubmitBlk*)(blk->data + dataLen); blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
} else {
// no valid rows
taosMemoryFreeClear(*pReq);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -539,27 +539,36 @@ typedef struct { ...@@ -539,27 +539,36 @@ typedef struct {
} SMqRebOutputObj; } SMqRebOutputObj;
typedef struct { typedef struct {
char name[TSDB_STREAM_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN]; // ctl
char targetDb[TSDB_DB_FNAME_LEN]; SRWLatch lock;
char targetSTbName[TSDB_TABLE_FNAME_LEN]; // create info
int64_t targetStbUid; int64_t createTime;
int64_t createTime; int64_t updateTime;
int64_t updateTime; int32_t version;
int64_t uid; int64_t smaId; // 0 for unused
int64_t dbUid; // info
int32_t version; int64_t uid;
int32_t vgNum; int8_t status;
SRWLatch lock; // config
int8_t status; int8_t dropPolicy;
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA int8_t trigger;
int32_t fixedSinkVgId; // 0 for shuffle int64_t triggerParam;
SVgObj fixedSinkVg; int64_t watermark;
int64_t smaId; // 0 for unused // source and target
int8_t trigger; int64_t sourceDbUid;
int64_t triggerParam; int64_t targetDbUid;
int64_t watermark; char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int32_t fixedSinkVgId; // 0 for shuffle
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
// transformation
char* sql; char* sql;
char* ast;
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>> SArray* tasks; // SArray<SArray<SStreamTask>>
SSchemaWrapper outputSchema; SSchemaWrapper outputSchema;
......
...@@ -32,6 +32,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); ...@@ -32,6 +32,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, double filesFactor); int64_t watermark, double filesFactor);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -27,8 +27,7 @@ void mndCleanupStb(SMnode *pMnode); ...@@ -27,8 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
SSdbRaw *mndStbActionEncode(SStbObj *pStb); SSdbRaw *mndStbActionEncode(SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t *pRspLen);
int32_t *pRspLen);
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs); int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate); int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
...@@ -36,6 +35,9 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName); ...@@ -36,6 +35,9 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb); int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -31,7 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); ...@@ -31,7 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -18,34 +18,35 @@ ...@@ -18,34 +18,35 @@
#include "mndConsumer.h" #include "mndConsumer.h"
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/ if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
// TODO encode tasks
if (pObj->tasks) {
sz = taosArrayGetSize(pObj->tasks);
}
if (tEncodeI32(pEncoder, sz) < 0) return -1;
int32_t sz = taosArrayGetSize(pObj->tasks);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGetP(pObj->tasks, i); SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray); int32_t innerSz = taosArrayGetSize(pArray);
...@@ -58,40 +59,37 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { ...@@ -58,40 +59,37 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
#if 0
if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->ColAlias);
}
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
for (int32_t i = 0; i < outputNameSz; i++) {
char *name = taosArrayGetP(pObj->ColAlias, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1;
}
#endif
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/ if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL; pObj->tasks = NULL;
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
...@@ -112,21 +110,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { ...@@ -112,21 +110,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
} }
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
#if 0
int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
if (outputNameSz != 0) {
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->ColAlias == NULL) {
return -1;
}
}
for (int32_t i = 0; i < outputNameSz; i++) {
char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->ColAlias, &name);
}
#endif
return 0; return 0;
} }
......
...@@ -397,7 +397,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -397,7 +397,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
char logBuf[512] = {0}; char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pMgmt->sync); char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t mndTick = 0; static int64_t mndTick = 0;
if (++mndTick % 10 == 1) { if (++mndTick % 10 == 1) {
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
......
...@@ -131,7 +131,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet ...@@ -131,7 +131,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
// sink // sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) { if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA; pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
...@@ -275,7 +275,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb ...@@ -275,7 +275,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
pTask->execType = TASK_EXEC__NONE; pTask->execType = TASK_EXEC__NONE;
// sink // sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) { if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA; pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
...@@ -321,7 +321,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* ...@@ -321,7 +321,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
pTask->execType = TASK_EXEC__NONE; pTask->execType = TASK_EXEC__NONE;
// sink // sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) { if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA; pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
...@@ -346,8 +346,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -346,8 +346,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
ASSERT(pStream->vgNum == 0);
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
ASSERT(totLevel <= 2); ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
...@@ -399,7 +397,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -399,7 +397,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec // exec
pFinalTask->execType = TASK_EXEC__PIPE; pFinalTask->execType = TASK_EXEC__PIPE;
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid); SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
...@@ -420,7 +418,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -420,7 +418,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) { if (pVgroup->dbUid != pStream->sourceDbUid) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
...@@ -463,7 +461,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -463,7 +461,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) { if (pVgroup->dbUid != pStream->sourceDbUid) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mndDnode.h" #include "mndDnode.h"
#include "mndInfoSchema.h" #include "mndInfoSchema.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndStream.h" #include "mndStream.h"
...@@ -520,6 +521,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -520,6 +521,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN); memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
smaObj.createdTime = taosGetTimestampMs(); smaObj.createdTime = taosGetTimestampMs();
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
ASSERT(smaObj.uid != 0);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name); snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
...@@ -556,13 +558,13 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -556,13 +558,13 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.createTime = taosGetTimestampMs(); streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime; streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.dbUid = pDb->uid; streamObj.sourceDbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__SMA;
streamObj.smaId = smaObj.uid; streamObj.smaId = smaObj.uid;
streamObj.watermark = 0; streamObj.watermark = 0;
streamObj.trigger = STREAM_TRIGGER_AT_ONCE; streamObj.trigger = STREAM_TRIGGER_AT_ONCE;
streamObj.ast = strdup(smaObj.ast);
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
...@@ -571,6 +573,39 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -571,6 +573,39 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj.dstVgId = streamObj.fixedSinkVg.vgId; smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
streamObj.fixedSinkVgId = smaObj.dstVgId; streamObj.fixedSinkVgId = smaObj.dstVgId;
SNode *pAst = NULL;
if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
ASSERT(0);
return -1;
}
// extract output schema from ast
if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
ASSERT(0);
return -1;
}
SQueryPlan *pPlan = NULL;
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = streamObj.trigger,
.watermark = streamObj.watermark,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
ASSERT(0);
return -1;
}
// save physcial plan
if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
ASSERT(0);
return -1;
}
if (pAst != NULL) nodesDestroyNode(pAst);
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
...@@ -585,7 +620,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -585,7 +620,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; // if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndScheduleStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "mndSma.h"
#include "tname.h" #include "tname.h"
#define STB_VER_NUMBER 1 #define STB_VER_NUMBER 1
...@@ -1271,7 +1270,8 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa ...@@ -1271,7 +1270,8 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
return 0; return 0;
} }
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) { static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp,
int32_t *smaVer) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
...@@ -1672,7 +1672,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t ...@@ -1672,7 +1672,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
for (int32_t i = 0; i < numOfStbs; ++i) { for (int32_t i = 0; i < numOfStbs; ++i) {
SSTableVersion *pStbVersion = &pStbVersions[i]; SSTableVersion *pStbVersion = &pStbVersions[i];
pStbVersion->suid = be64toh(pStbVersion->suid); pStbVersion->suid = be64toh(pStbVersion->suid);
...@@ -1681,7 +1681,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t ...@@ -1681,7 +1681,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
pStbVersion->smaVer = ntohl(pStbVersion->smaVer); pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
STableMetaRsp metaRsp = {0}; STableMetaRsp metaRsp = {0};
int32_t smaVer = 0; int32_t smaVer = 0;
mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName); mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) { if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
metaRsp.numOfColumns = -1; metaRsp.numOfColumns = -1;
...@@ -1697,15 +1697,15 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t ...@@ -1697,15 +1697,15 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
} }
if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) { if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) {
bool exist = false; bool exist = false;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
STableIndexRsp indexRsp = {0}; STableIndexRsp indexRsp = {0};
indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo)); indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
if (NULL == indexRsp.pIndex) { if (NULL == indexRsp.pIndex) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName); sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist); int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
if (code || !exist) { if (code || !exist) {
...@@ -1769,16 +1769,23 @@ int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) { ...@@ -1769,16 +1769,23 @@ int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
return 0; return 0;
} }
static void mndExtractTableName(char *tableId, char *name) { void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetFullDbName(&name, dst);
}
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
int32_t pos = -1; int32_t pos = -1;
int32_t num = 0; int32_t num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) { for (pos = 0; stbFullName[pos] != 0; ++pos) {
if (tableId[pos] == TS_PATH_DELIMITER[0]) num++; if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
if (num == 2) break; if (num == 2) break;
} }
if (num == 2) { if (num == 2) {
strcpy(name, tableId + pos + 1); tstrncpy(dst, stbFullName + pos + 1, dstSize);
} }
} }
...@@ -1808,7 +1815,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc ...@@ -1808,7 +1815,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
SName name = {0}; SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]); mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
...@@ -196,7 +196,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { ...@@ -196,7 +196,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
} }
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
pCreate->targetStbFullName[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
return -1; return -1;
} }
...@@ -232,6 +233,114 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 ...@@ -232,6 +233,114 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return code; return code;
} }
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
mDebug("stream:%s to create", pCreate->name);
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
pObj->createTime = taosGetTimestampMs();
pObj->updateTime = pObj->createTime;
pObj->version = 1;
pObj->smaId = 0;
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
// TODO
pObj->dropPolicy = 0;
pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
if (pSourceDb == NULL) {
/*ASSERT(0);*/
mDebug("stream:%s failed to create, source db %s not exist", pCreate->name, pObj->sourceDb);
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
return -1;
}
pObj->sourceDbUid = pSourceDb->uid;
memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
if (pTargetDb == NULL) {
mDebug("stream:%s failed to create, target db %s not exist", pCreate->name, pObj->targetDb);
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
return -1;
}
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
pObj->targetDbUid = pTargetDb->uid;
pObj->sql = pCreate->sql;
pObj->ast = pCreate->ast;
pCreate->sql = NULL;
pCreate->ast = NULL;
// deserialize ast
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
/*ASSERT(0);*/
goto FAIL;
}
// extract output schema from ast
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
/*ASSERT(0);*/
goto FAIL;
}
SQueryPlan *pPlan = NULL;
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
};
// using ast and param to build physical plan
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
/*ASSERT(0);*/
goto FAIL;
}
// save physcial plan
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
/*ASSERT(0);*/
goto FAIL;
}
FAIL:
if (pAst != NULL) nodesDestroyNode(pAst);
return 0;
}
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
return 0;
}
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
SNode *pAst = NULL; SNode *pAst = NULL;
...@@ -278,8 +387,8 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast ...@@ -278,8 +387,8 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
} }
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
...@@ -321,7 +430,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre ...@@ -321,7 +430,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
goto _OVER; goto _OVER;
} }
if (mndCheckDbAuth(pMnode, user, MND_OPER_WRITE_DB, pDb) != 0) { if (mndCheckDbAuth(pMnode, user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER; goto _OVER;
} }
...@@ -357,24 +465,24 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -357,24 +465,24 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
ASSERT(pTask->nodeId != 0); ASSERT(pTask->nodeId != 0);
// vnode // vnode
if (pTask->nodeId > 0) { /*if (pTask->nodeId > 0) {*/
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_VND_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} }
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_VND_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
/*}*/
return 0; return 0;
} }
...@@ -404,10 +512,9 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq ...@@ -404,10 +512,9 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
streamObj.updateTime = streamObj.createTime; streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.dbUid = pDb->uid; streamObj.sourceDbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__USER;
// TODO // TODO
streamObj.fixedSinkVgId = 0; streamObj.fixedSinkVgId = 0;
streamObj.smaId = 0; streamObj.smaId = 0;
...@@ -487,7 +594,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -487,7 +594,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
// TODO check auth // TODO check read auth for source and write auth for target
#if 0
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB); pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
...@@ -497,9 +605,60 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -497,9 +605,60 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER; goto _OVER;
} }
#endif
code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mndTransSetDbName(pTrans, createStreamReq.sourceDB);
// TODO
/*mndTransSetDbName(pTrans, streamObj.targetDb);*/
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// build stream obj from request
SStreamObj streamObj = {0};
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
ASSERT(0);
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
// create stb for stream
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// schedule stream task for stream obj
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// add stream to trans
if (mndPersistStream(pMnode, pTrans, &streamObj) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// execute creation
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
mndTransDrop(pTrans);
/*code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);*/
/*if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;*/
code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
...@@ -520,13 +679,19 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -520,13 +679,19 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
/*SDbObj *pDb = NULL;*/ /*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/ /*SUserObj *pUser = NULL;*/
SMDropStreamTaskReq dropStreamReq = *(SMDropStreamTaskReq *)pReq->pCont; SMDropStreamReq dropReq = *(SMDropStreamReq *)pReq->pCont;
pStream = mndAcquireStream(pMnode, dropStreamReq.name); pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) { if (pStream == NULL) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; if (dropReq.igNotExists) {
return -1; mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
code = 0;
goto DROP_STREAM_OVER;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
} }
#if 0 #if 0
...@@ -539,18 +704,62 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -539,18 +704,62 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropStreamReq.name, terrstr()); mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
return -1; return code;
} }
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropStreamReq.name); mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
// drop all tasks // drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropStreamReq.name, terrstr()); mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
return code;
}
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1; return -1;
} }
DROP_STREAM_OVER: DROP_STREAM_OVER:
return code;
}
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
if (pStream->sourceDbUid != pStream->targetDbUid) {
sdbRelease(pSdb, pStream);
return -1;
} else {
// TODO drop all task on snode
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
return -1;
}
}
} else {
sdbRelease(pSdb, pStream);
continue;
}
#if 0
if (mndSetDropOffsetStreamLogs(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
goto END;
}
#endif
sdbRelease(pSdb, pStream);
}
return 0; return 0;
} }
...@@ -569,7 +778,7 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS ...@@ -569,7 +778,7 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pStream->dbUid == pDb->uid) { if (pStream->sourceDbUid == pDb->uid) {
numOfStreams++; numOfStreams++;
} }
......
...@@ -256,14 +256,6 @@ void mndSyncStart(SMnode *pMnode) { ...@@ -256,14 +256,6 @@ void mndSyncStart(SMnode *pMnode) {
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync); syncStart(pMgmt->sync);
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
/*
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
*/
} }
void mndSyncStop(SMnode *pMnode) {} void mndSyncStop(SMnode *pMnode) {}
......
...@@ -258,6 +258,7 @@ struct SSma { ...@@ -258,6 +258,7 @@ struct SSma {
#define SMA_CFG(s) (&(s)->pVnode->config) #define SMA_CFG(s) (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) #define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions)
#define SMA_LOCKED(s) ((s)->locked) #define SMA_LOCKED(s) ((s)->locked)
#define SMA_META(s) ((s)->pVnode->pMeta) #define SMA_META(s) ((s)->pVnode->pMeta)
#define SMA_VID(s) TD_VID((s)->pVnode) #define SMA_VID(s) TD_VID((s)->pVnode)
......
...@@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 ...@@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
} }
if (taosArrayGetSize(pResult) > 0) { if (taosArrayGetSize(pResult) > 0) {
#if 1 #if 0
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, level); snprintf(flag, 10, "level %" PRIi8, level);
blockDebugShowData(pResult, flag); blockDebugShowData(pResult, flag);
#endif #endif
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) { if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} else { } else {
smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
...@@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRetention *pRetention = SMA_RETENTION(pSma);
if (!RETENTION_VALID(pRetention + 1)) {
// return directly if retention level 1 is invalid
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
STbUidStore uidStore = {0}; STbUidStore uidStore = {0};
tdFetchSubmitReqSuids(pMsg, &uidStore); tdFetchSubmitReqSuids(pMsg, &uidStore);
......
...@@ -300,108 +300,6 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { ...@@ -300,108 +300,6 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision; pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
} }
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = 0;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pVnode->sync);
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
SMsgHead *pHead = pMsg->pCont;
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
}
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
ret = syncSetStandby(pVnode->sync);
vInfo("vgId:%d, set standby result:0x%x rid:%" PRId64, pVnode->config.vgId, ret, pVnode->sync);
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = -1;
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = -1;
}
if (ret != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return ret;
}
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
......
...@@ -50,6 +50,33 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) { ...@@ -50,6 +50,33 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
} }
} }
static int32_t vnodeSetStandBy(SVnode *pVnode) {
vInfo("vgId:%d, start to set standby", TD_VID(pVnode));
if (syncSetStandby(pVnode->sync) == 0) {
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
return 0;
} else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
return -1;
}
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
return -1;
} else {
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
}
if (syncSetStandby(pVnode->sync) == 0) {
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
return 0;
} else {
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
return -1;
}
}
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
SAlterVnodeReq req = {0}; SAlterVnodeReq req = {0};
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) { if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
...@@ -74,14 +101,19 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -74,14 +101,19 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = syncPropose(pVnode->sync, &rpcMsg, false); int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
if (code != 0) { if (code != 0) {
vDebug("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr()); if (terrno != 0) code = terrno;
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr()); vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
} else { if (terrno == TSDB_CODE_SYN_IS_LEADER) {
vDebug("vgId:%d, transfer leader success, propose reconfig config again", TD_VID(pVnode)); if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
} else {
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
}
} }
} }
terrno = code;
return code; return code;
} }
...@@ -108,7 +140,6 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -108,7 +140,6 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if (code == 0) { if (code == 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType); vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
SEpSet newEpSet = {0}; SEpSet newEpSet = {0};
syncGetEpSet(pVnode->sync, &newEpSet); syncGetEpSet(pVnode->sync, &newEpSet);
...@@ -170,6 +201,108 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -170,6 +201,108 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
} }
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = 0;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pVnode->sync);
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
SMsgHead *pHead = pMsg->pCont;
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
}
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
ret = vnodeSetStandBy(pVnode);
if (ret != 0 && terrno != 0) ret = terrno;
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = -1;
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = -1;
}
if (ret != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return ret;
}
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) { if (code != 0) {
...@@ -258,17 +391,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta ...@@ -258,17 +391,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; } static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; } static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; } static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; } static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; } static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
...@@ -279,7 +412,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -279,7 +412,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL; pFsm->FpRestoreFinishCb = NULL;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead; pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
...@@ -292,7 +424,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -292,7 +424,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
.isStandBy = false,
.snapshotEnable = false, .snapshotEnable = false,
.vgId = pVnode->config.vgId, .vgId = pVnode->config.vgId,
.isStandBy = pVnode->config.standby, .isStandBy = pVnode->config.standby,
...@@ -321,13 +452,6 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -321,13 +452,6 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
syncSetMsgCb(pVnode->sync, &pVnode->msgCb); syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync); syncStart(pVnode->sync);
/*
if (pVnode->config.standby) {
syncStartStandBy(pVnode->sync);
} else {
syncStart(pVnode->sync);
}
*/
} }
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
...@@ -4533,12 +4533,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4533,12 +4533,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STsdbReader* pDataReader = STsdbReader* pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
pTaskInfo->code = terrno;
return NULL; return NULL;
} }
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) { if (code) {
tsdbCleanupReadHandle(pDataReader); tsdbCleanupReadHandle(pDataReader);
pTaskInfo->code = terrno;
return NULL; return NULL;
} }
...@@ -4547,6 +4549,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4547,6 +4549,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
taosArrayDestroy(groupKeys); taosArrayDestroy(groupKeys);
if (code) { if (code) {
tsdbCleanupReadHandle(pDataReader); tsdbCleanupReadHandle(pDataReader);
pTaskInfo->code = terrno;
return NULL; return NULL;
} }
......
...@@ -156,7 +156,7 @@ int32_t syncSetStandby(int64_t rid) { ...@@ -156,7 +156,7 @@ int32_t syncSetStandby(int64_t rid) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_IS_LEADER;
sError("failed to set standby since it is leader, rid:%" PRId64, rid); sError("failed to set standby since it is leader, rid:%" PRId64, rid);
return -1; return -1;
} }
......
...@@ -324,9 +324,9 @@ void cliHandleResp(SCliConn* conn) { ...@@ -324,9 +324,9 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, tDebug("%s cli conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pTransInst->label, conn,
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen); taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tTrace("except, server continue send while cli ignore it"); tTrace("except, server continue send while cli ignore it");
......
...@@ -285,13 +285,14 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -285,13 +285,14 @@ static void uvHandleReq(SSvrConn* pConn) {
} }
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pConn,
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
ntohs(pConn->localAddr.sin_port), transMsg.contLen); taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, transMsg.code);
} else { } else {
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d code:0x%x", pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp); taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
transMsg.code);
// no ref here // no ref here
} }
......
...@@ -413,7 +413,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Mismatched signature" ...@@ -413,7 +413,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Mismatched signature"
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
......
...@@ -4,6 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2 ...@@ -4,6 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5 system sh/deploy.sh -n dnode5 -i 5
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode5 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
...@@ -128,9 +133,9 @@ if $rows != 1 then ...@@ -128,9 +133,9 @@ if $rows != 1 then
endi endi
if $data(2)[4] == leader then if $data(2)[4] == leader then
$leaderExist = 1 $leaderExist = 1
$leaderVnode = 4 $leaderVnode = 2
$follower1 = 2 $follower1 = 3
$follower2 = 3 $follower2 = 4
endi endi
if $data(2)[6] == leader then if $data(2)[6] == leader then
$leaderExist = 1 $leaderExist = 1
...@@ -140,9 +145,9 @@ if $data(2)[6] == leader then ...@@ -140,9 +145,9 @@ if $data(2)[6] == leader then
endi endi
if $data(2)[8] == leader then if $data(2)[8] == leader then
$leaderExist = 1 $leaderExist = 1
$leaderVnode = 2 $leaderVnode = 4
$follower1 = 3 $follower1 = 2
$follower2 = 4 $follower2 = 3
endi endi
if $leaderExist != 1 then if $leaderExist != 1 then
goto step3 goto step3
...@@ -160,110 +165,55 @@ if $rows != 1 then ...@@ -160,110 +165,55 @@ if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD
print =============== step32: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
=======
print =============== step32: move leader print =============== step32: move leader
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5 print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5 sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql show d1.vgroups sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables sql show d1.tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD return
print =============== step33: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim print =============== step33: move follower1
======= print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
print =============== step33: move follower2 sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql show d1.vgroups sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables sql show d1.tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD
print =============== step34: move follower2 print =============== step34: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2 print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2 sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
=======
print =============== step34: move follower1
print redistribute vgroup 2 dnode $follower2 dnode 5 dnode $leaderVnode
sql redistribute vgroup 2 dnode $follower2 dnode 5 dnode $leaderVnode
sql show d1.vgroups sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables sql show d1.tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD print =============== step35: move leader
print =============== step35: move follower1 print redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1 sql redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
sql show d1.tables
if $rows != 1 then
return -1
endi
=======
sql show d1.vgroups sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
=======
print =============== step35: move 5
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables sql show d1.tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD print =============== step36: move follower1
print =============== step36: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5 print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5 sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
=======
print =============== step36: move follower1
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
>>>>>>> origin/3.0
sql show d1.vgroups sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
...@@ -272,11 +222,7 @@ if $rows != 1 then ...@@ -272,11 +222,7 @@ if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD
print =============== step37: move follower1
=======
print =============== step37: move follower2 print =============== step37: move follower2
>>>>>>> origin/3.0
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5 print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5 sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql show d1.vgroups sql show d1.vgroups
...@@ -287,17 +233,9 @@ if $rows != 1 then ...@@ -287,17 +233,9 @@ if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
print =============== step38: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
=======
print =============== step38: move leader print =============== step38: move leader
print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2 print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2 sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
>>>>>>> origin/3.0
sql show d1.vgroups sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
...@@ -305,18 +243,10 @@ sql show d1.tables ...@@ -305,18 +243,10 @@ sql show d1.tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
<<<<<<< HEAD
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
print =============== step39: move follower1 print =============== step39: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1 print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1 sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
=======
print =============== step39: move 5
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
>>>>>>> origin/3.0
sql show d1.vgroups sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
......
...@@ -101,7 +101,7 @@ python3 ./test.py -f 2-query/tail.py ...@@ -101,7 +101,7 @@ python3 ./test.py -f 2-query/tail.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py #python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
......
...@@ -5,7 +5,12 @@ set /a a=0 ...@@ -5,7 +5,12 @@ set /a a=0
if %1 == full ( if %1 == full (
echo Windows Taosd Full Test echo Windows Taosd Full Test
set /a exitNum=0 set /a exitNum=0
for /F "usebackq tokens=*" %%i in (fulltest.bat) do ( del /Q /F failed.txt
set caseFile="fulltest.bat"
if not "%2" == "" (
set caseFile="%2"
)
for /F "usebackq tokens=*" %%i in (!caseFile!) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" ( for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
set /a a+=1 set /a a+=1
echo !a! Processing %%i echo !a! Processing %%i
...@@ -13,7 +18,7 @@ if %1 == full ( ...@@ -13,7 +18,7 @@ if %1 == full (
set time1=!_timeTemp! set time1=!_timeTemp!
echo Start at !time! echo Start at !time!
call %%i ARG1 > result_!a!.txt 2>error_!a!.txt call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 ) else ( call :colorEcho 0a "Success" &echo. ) if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. )
) )
) )
exit !exitNum! exit !exitNum!
...@@ -77,4 +82,4 @@ for %%a in (%tt%) do ( ...@@ -77,4 +82,4 @@ for %%a in (%tt%) do (
set /a index=index+1 set /a index=index+1
) )
set /a _timeTemp=(%hh%*60+%mm%)*60+%ss% set /a _timeTemp=(%hh%*60+%mm%)*60+%ss%
goto :eof goto :eof
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册