提交 9db55464 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

......@@ -1519,7 +1519,7 @@ typedef struct {
#define STREAM_TRIGGER_MAX_DELAY 3
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char name[TSDB_STREAM_FNAME_LEN];
char sourceDB[TSDB_DB_FNAME_LEN];
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
......@@ -1539,7 +1539,7 @@ int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStrea
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char name[TSDB_STREAM_FNAME_LEN];
int64_t streamId;
char* sql;
char* executorMsg;
......@@ -2008,7 +2008,7 @@ typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
uint64_t queryId;
int64_t useconds;
int64_t stime; // timestamp precision ms
int64_t stime; // timestamp precision ms
int64_t reqRid;
int32_t pid;
bool stableQuery;
......@@ -2257,13 +2257,13 @@ typedef struct {
} SMqVDeleteRsp;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int64_t streamId;
} SMDropStreamTaskReq;
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
} SMDropStreamReq;
typedef struct {
int8_t reserved;
} SMDropStreamTaskRsp;
} SMDropStreamRsp;
typedef struct {
SMsgHead head;
......
......@@ -95,7 +95,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
* @return
*/
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model);
/**
*
......@@ -159,13 +159,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
*/
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
/**
* Update the table id list of a given query.
* @param uid child table uid
* @param type operation type: ADD|DROP
* @return
*/
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
......
......@@ -150,12 +150,8 @@ typedef struct SqlFunctionCtx {
} SqlFunctionCtx;
enum {
TEXPR_NODE_DUMMY = 0x0,
TEXPR_BINARYEXPR_NODE= 0x1,
TEXPR_UNARYEXPR_NODE = 0x2,
TEXPR_FUNCTION_NODE = 0x3,
TEXPR_COL_NODE = 0x4,
TEXPR_VALUE_NODE = 0x8,
};
typedef struct tExprNode {
......
......@@ -410,10 +410,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908)
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909)
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_IS_LEADER TAOS_DEF_ERROR_CODE(0, 0x090B)
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
......
......@@ -210,7 +210,7 @@ typedef enum ELogicConditionType {
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_STREAM_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
#define TSDB_COL_NAME_LEN 65
......
......@@ -44,6 +44,7 @@ void showDB(TAOS* pConn) {
}
void fetchCallback(void* param, void* res, int32_t numOfRow) {
#if 0
printf("numOfRow = %d \n", numOfRow);
int numFields = taos_num_fields(res);
TAOS_FIELD *fields = taos_fetch_fields(res);
......@@ -63,6 +64,13 @@ void fetchCallback(void* param, void* res, int32_t numOfRow) {
// taos_close(_taos);
// taos_cleanup();
}
#endif
if (numOfRow == 0) {
printf("completed\n");
return;
}
taos_fetch_raw_block_a(res, fetchCallback, param);
}
void queryCallback(void* param, void* res, int32_t code) {
......@@ -70,7 +78,7 @@ void queryCallback(void* param, void* res, int32_t code) {
printf("failed to execute, reason:%s\n", taos_errstr(res));
}
printf("start to fetch data\n");
taos_fetch_rows_a(res, fetchCallback, param);
taos_fetch_raw_block_a(res, fetchCallback, param);
}
void queryCallback1(void* param, void* res, int32_t code) {
......@@ -778,9 +786,9 @@ TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use nest");
TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;");
taos_query(pConn, "use table_alltype_hyperloglog");
#if 0
TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
if (taos_errno(pRes) != 0) {
printf("failed, reason:%s\n", taos_errstr(pRes));
}
......@@ -802,8 +810,9 @@ TEST(testCase, async_api_test) {
printf("%s\n", str);
memset(str, 0, sizeof(str));
}
#endif
taos_query_a(pConn, "alter table test.m1 comment 'abcde' ", queryCallback, pConn);
taos_query_a(pConn, "select HYPERLOGLOG(q_ts) from stable_1_2 where ts between 1630000001000 and 1630100001000 interval(19d) Fill(NONE);", queryCallback, pConn);
getchar();
taos_close(pConn);
}
......
......@@ -1226,6 +1226,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
SColumnInfoData colInfo = {0};
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfo.info = p->info;
colInfo.hasNull = true;
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
......@@ -1583,6 +1584,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t rowSize = pDataBlock->info.rowSize;
int64_t groupId = pDataBlock->info.groupId;
if (colNum <= 1) {
// invalid if only with TS col
continue;
}
if (rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
}
......@@ -1679,23 +1685,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
msgLen += pSubmitBlk->dataLen;
}
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen);
(*pReq)->length = (*pReq)->header.contLen;
(*pReq)->numOfBlocks = htonl(numOfBlks);
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
while (numOfBlks--) {
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
if (numOfBlks > 0) {
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen);
(*pReq)->length = (*pReq)->header.contLen;
(*pReq)->numOfBlocks = htonl(numOfBlks);
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
while (numOfBlks--) {
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
} else {
// no valid rows
taosMemoryFreeClear(*pReq);
}
return TSDB_CODE_SUCCESS;
......
......@@ -539,27 +539,36 @@ typedef struct {
} SMqRebOutputObj;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int32_t vgNum;
SRWLatch lock;
int8_t status;
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle
SVgObj fixedSinkVg;
int64_t smaId; // 0 for unused
int8_t trigger;
int64_t triggerParam;
int64_t watermark;
char name[TSDB_STREAM_FNAME_LEN];
// ctl
SRWLatch lock;
// create info
int64_t createTime;
int64_t updateTime;
int32_t version;
int64_t smaId; // 0 for unused
// info
int64_t uid;
int8_t status;
// config
int8_t dropPolicy;
int8_t trigger;
int64_t triggerParam;
int64_t watermark;
// source and target
int64_t sourceDbUid;
int64_t targetDbUid;
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int32_t fixedSinkVgId; // 0 for shuffle
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
// transformation
char* sql;
char* ast;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SSchemaWrapper outputSchema;
......
......@@ -32,6 +32,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, double filesFactor);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
#ifdef __cplusplus
}
#endif
......
......@@ -27,8 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t *pRspLen);
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
......@@ -36,6 +35,9 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
#ifdef __cplusplus
}
#endif
......
......@@ -31,7 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
#ifdef __cplusplus
}
......
......@@ -18,34 +18,35 @@
#include "mndConsumer.h"
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
// TODO encode tasks
if (pObj->tasks) {
sz = taosArrayGetSize(pObj->tasks);
}
if (tEncodeI32(pEncoder, sz) < 0) return -1;
int32_t sz = taosArrayGetSize(pObj->tasks);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray);
......@@ -58,40 +59,37 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
#if 0
if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->ColAlias);
}
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
for (int32_t i = 0; i < outputNameSz; i++) {
char *name = taosArrayGetP(pObj->ColAlias, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1;
}
#endif
return pEncoder->pos;
}
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
......@@ -112,21 +110,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
}
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
#if 0
int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
if (outputNameSz != 0) {
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->ColAlias == NULL) {
return -1;
}
}
for (int32_t i = 0; i < outputNameSz; i++) {
char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->ColAlias, &name);
}
#endif
return 0;
}
......
......@@ -397,7 +397,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t mndTick = 0;
if (++mndTick % 10 == 1) {
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
......
......@@ -131,7 +131,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
pTask->dispatchType = TASK_DISPATCH__NONE;
// sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
......@@ -275,7 +275,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
pTask->execType = TASK_EXEC__NONE;
// sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
......@@ -321,7 +321,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
pTask->execType = TASK_EXEC__NONE;
// sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
......@@ -346,8 +346,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pStream->vgNum == 0);
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
......@@ -399,7 +397,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec
pFinalTask->execType = TASK_EXEC__PIPE;
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
......@@ -420,7 +418,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
if (pVgroup->dbUid != pStream->sourceDbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
......@@ -463,7 +461,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
if (pVgroup->dbUid != pStream->sourceDbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
......
......@@ -20,6 +20,7 @@
#include "mndDnode.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndStream.h"
......@@ -520,6 +521,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
smaObj.createdTime = taosGetTimestampMs();
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
ASSERT(smaObj.uid != 0);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
......@@ -556,13 +558,13 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.dbUid = pDb->uid;
streamObj.sourceDbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__SMA;
streamObj.smaId = smaObj.uid;
streamObj.watermark = 0;
streamObj.trigger = STREAM_TRIGGER_AT_ONCE;
streamObj.ast = strdup(smaObj.ast);
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
......@@ -571,6 +573,39 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
streamObj.fixedSinkVgId = smaObj.dstVgId;
SNode *pAst = NULL;
if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
ASSERT(0);
return -1;
}
// extract output schema from ast
if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
ASSERT(0);
return -1;
}
SQueryPlan *pPlan = NULL;
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = streamObj.trigger,
.watermark = streamObj.watermark,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
ASSERT(0);
return -1;
}
// save physcial plan
if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
ASSERT(0);
return -1;
}
if (pAst != NULL) nodesDestroyNode(pAst);
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
......@@ -585,7 +620,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
if (mndScheduleStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......
......@@ -28,7 +28,6 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndSma.h"
#include "tname.h"
#define STB_VER_NUMBER 1
......@@ -1271,7 +1270,8 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
return 0;
}
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) {
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp,
int32_t *smaVer) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
......@@ -1672,7 +1672,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfStbs; ++i) {
SSTableVersion *pStbVersion = &pStbVersions[i];
pStbVersion->suid = be64toh(pStbVersion->suid);
......@@ -1681,7 +1681,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
STableMetaRsp metaRsp = {0};
int32_t smaVer = 0;
int32_t smaVer = 0;
mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
metaRsp.numOfColumns = -1;
......@@ -1697,15 +1697,15 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
}
if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) {
bool exist = false;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool exist = false;
char tbFName[TSDB_TABLE_FNAME_LEN];
STableIndexRsp indexRsp = {0};
indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
if (NULL == indexRsp.pIndex) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
if (code || !exist) {
......@@ -1769,16 +1769,23 @@ int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
return 0;
}
static void mndExtractTableName(char *tableId, char *name) {
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetFullDbName(&name, dst);
}
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
int32_t pos = -1;
int32_t num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) {
if (tableId[pos] == TS_PATH_DELIMITER[0]) num++;
for (pos = 0; stbFullName[pos] != 0; ++pos) {
if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
if (num == 2) break;
}
if (num == 2) {
strcpy(name, tableId + pos + 1);
tstrncpy(dst, stbFullName + pos + 1, dstSize);
}
}
......@@ -1808,7 +1815,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]);
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
......@@ -196,7 +196,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
}
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
pCreate->targetStbFullName[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
return -1;
}
......@@ -232,6 +233,114 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return code;
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
mDebug("stream:%s to create", pCreate->name);
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
pObj->createTime = taosGetTimestampMs();
pObj->updateTime = pObj->createTime;
pObj->version = 1;
pObj->smaId = 0;
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
// TODO
pObj->dropPolicy = 0;
pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
if (pSourceDb == NULL) {
/*ASSERT(0);*/
mDebug("stream:%s failed to create, source db %s not exist", pCreate->name, pObj->sourceDb);
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
return -1;
}
pObj->sourceDbUid = pSourceDb->uid;
memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
if (pTargetDb == NULL) {
mDebug("stream:%s failed to create, target db %s not exist", pCreate->name, pObj->targetDb);
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
return -1;
}
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
pObj->targetDbUid = pTargetDb->uid;
pObj->sql = pCreate->sql;
pObj->ast = pCreate->ast;
pCreate->sql = NULL;
pCreate->ast = NULL;
// deserialize ast
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
/*ASSERT(0);*/
goto FAIL;
}
// extract output schema from ast
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
/*ASSERT(0);*/
goto FAIL;
}
SQueryPlan *pPlan = NULL;
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
};
// using ast and param to build physical plan
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
/*ASSERT(0);*/
goto FAIL;
}
// save physcial plan
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
/*ASSERT(0);*/
goto FAIL;
}
FAIL:
if (pAst != NULL) nodesDestroyNode(pAst);
return 0;
}
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
return 0;
}
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
SNode *pAst = NULL;
......@@ -278,8 +387,8 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
}
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
......@@ -321,7 +430,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
goto _OVER;
}
if (mndCheckDbAuth(pMnode, user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
......@@ -357,24 +465,24 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
ASSERT(pTask->nodeId != 0);
// vnode
if (pTask->nodeId > 0) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_VND_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
/*if (pTask->nodeId > 0) {*/
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_VND_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
/*}*/
return 0;
}
......@@ -404,10 +512,9 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.dbUid = pDb->uid;
streamObj.sourceDbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__USER;
// TODO
streamObj.fixedSinkVgId = 0;
streamObj.smaId = 0;
......@@ -487,7 +594,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
// TODO check auth
// TODO check read auth for source and write auth for target
#if 0
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
......@@ -497,9 +605,60 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
#endif
code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mndTransSetDbName(pTrans, createStreamReq.sourceDB);
// TODO
/*mndTransSetDbName(pTrans, streamObj.targetDb);*/
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// build stream obj from request
SStreamObj streamObj = {0};
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
ASSERT(0);
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
// create stb for stream
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// schedule stream task for stream obj
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// add stream to trans
if (mndPersistStream(pMnode, pTrans, &streamObj) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// execute creation
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
mndTransDrop(pTrans);
/*code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);*/
/*if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;*/
code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
......@@ -520,13 +679,19 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMDropStreamTaskReq dropStreamReq = *(SMDropStreamTaskReq *)pReq->pCont;
SMDropStreamReq dropReq = *(SMDropStreamReq *)pReq->pCont;
pStream = mndAcquireStream(pMnode, dropStreamReq.name);
pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
if (dropReq.igNotExists) {
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
code = 0;
goto DROP_STREAM_OVER;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
}
#if 0
......@@ -539,18 +704,62 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropStreamReq.name, terrstr());
return -1;
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
return code;
}
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropStreamReq.name);
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropStreamReq.name, terrstr());
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
return code;
}
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
DROP_STREAM_OVER:
return code;
}
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
if (pStream->sourceDbUid != pStream->targetDbUid) {
sdbRelease(pSdb, pStream);
return -1;
} else {
// TODO drop all task on snode
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
return -1;
}
}
} else {
sdbRelease(pSdb, pStream);
continue;
}
#if 0
if (mndSetDropOffsetStreamLogs(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
goto END;
}
#endif
sdbRelease(pSdb, pStream);
}
return 0;
}
......@@ -569,7 +778,7 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->dbUid == pDb->uid) {
if (pStream->sourceDbUid == pDb->uid) {
numOfStreams++;
}
......
......@@ -256,14 +256,6 @@ void mndSyncStart(SMnode *pMnode) {
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync);
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
/*
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
*/
}
void mndSyncStop(SMnode *pMnode) {}
......
......@@ -260,6 +260,7 @@ struct SSma {
#define SMA_CFG(s) (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions)
#define SMA_LOCKED(s) ((s)->locked)
#define SMA_META(s) ((s)->pVnode->pMeta)
#define SMA_VID(s) TD_VID((s)->pVnode)
......
......@@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
}
if (taosArrayGetSize(pResult) > 0) {
#if 1
#if 0
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, level);
blockDebugShowData(pResult, flag);
#endif
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) {
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
taosArrayDestroy(pResult);
return TSDB_CODE_FAILED;
}
if (tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED;
}
taosMemoryFreeClear(pReq);
} else {
smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
......@@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS;
}
SRetention *pRetention = SMA_RETENTION(pSma);
if (!RETENTION_VALID(pRetention + 1)) {
// return directly if retention level 1 is invalid
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
STbUidStore uidStore = {0};
tdFetchSubmitReqSuids(pMsg, &uidStore);
......
......@@ -300,108 +300,6 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = 0;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pVnode->sync);
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
SMsgHead *pHead = pMsg->pCont;
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
}
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
ret = syncSetStandby(pVnode->sync);
vInfo("vgId:%d, set standby result:0x%x rid:%" PRId64, pVnode->config.vgId, ret, pVnode->sync);
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = -1;
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = -1;
}
if (ret != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return ret;
}
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SDecoder coder;
......
......@@ -50,6 +50,33 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
}
}
static int32_t vnodeSetStandBy(SVnode *pVnode) {
vInfo("vgId:%d, start to set standby", TD_VID(pVnode));
if (syncSetStandby(pVnode->sync) == 0) {
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
return 0;
} else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
return -1;
}
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
return -1;
} else {
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
}
if (syncSetStandby(pVnode->sync) == 0) {
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
return 0;
} else {
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
return -1;
}
}
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
SAlterVnodeReq req = {0};
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
......@@ -74,14 +101,19 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
if (code != 0) {
vDebug("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
} else {
vDebug("vgId:%d, transfer leader success, propose reconfig config again", TD_VID(pVnode));
if (terrno != 0) code = terrno;
vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
if (terrno == TSDB_CODE_SYN_IS_LEADER) {
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
} else {
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
}
}
}
terrno = code;
return code;
}
......@@ -108,7 +140,6 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if (code == 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
SEpSet newEpSet = {0};
syncGetEpSet(pVnode->sync, &newEpSet);
......@@ -170,6 +201,108 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
}
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = 0;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pVnode->sync);
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
SMsgHead *pHead = pMsg->pCont;
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
}
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
ret = vnodeSetStandBy(pVnode);
if (ret != 0 && terrno != 0) ret = terrno;
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = -1;
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = -1;
}
if (ret != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return ret;
}
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) {
......@@ -258,17 +391,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
......@@ -279,7 +412,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL;
pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
......@@ -292,7 +424,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo = {
.isStandBy = false,
.snapshotEnable = false,
.vgId = pVnode->config.vgId,
.isStandBy = pVnode->config.standby,
......@@ -321,13 +452,6 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
void vnodeSyncStart(SVnode *pVnode) {
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync);
/*
if (pVnode->config.standby) {
syncStartStandBy(pVnode->sync);
} else {
syncStart(pVnode->sync);
}
*/
}
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
......@@ -55,7 +55,6 @@ typedef struct SResultRow {
uint32_t numOfRows; // number of rows of current time window
STimeWindow win;
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
// char *key; // start key of current result row
} SResultRow;
typedef struct SResultRowPosition {
......
......@@ -189,7 +189,7 @@ typedef struct SExecTaskInfo {
} schemaVer;
STableListInfo tableqinfoList; // this is a table list
char* sql; // query sql string
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot;
......@@ -544,6 +544,13 @@ typedef struct SFillOperatorInfo {
bool multigroupResult;
} SFillOperatorInfo;
typedef struct SScalarSupp {
SExprInfo* pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression in group operator
SqlFunctionCtx* pScalarFuncCtx;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
} SScalarSupp;
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
......@@ -556,10 +563,7 @@ typedef struct SGroupbyOperatorInfo {
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprInfo* pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression in group operator
SqlFunctionCtx* pScalarFuncCtx;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
SScalarSupp scalarSup;
} SGroupbyOperatorInfo;
typedef struct SDataGroupInfo {
......@@ -583,6 +587,7 @@ typedef struct SPartitionOperatorInfo {
void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group
SSDataBlock* pUpdateRes;
SScalarSupp scalarSupp;
} SPartitionOperatorInfo;
typedef struct SWindowRowsSup {
......@@ -760,6 +765,8 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
SArray* extractPartitionColInfo(SNodeList* pNodeList);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
......@@ -839,20 +846,17 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
......@@ -890,7 +894,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model);
const char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum);
......
......@@ -151,6 +151,13 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
*/
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
/**
* get proper sort buffer pages according to the row size
* @param rowSize
* @return
*/
int32_t getProperSortPageSize(size_t rowSize);
#ifdef __cplusplus
}
#endif
......
......@@ -28,17 +28,6 @@ typedef struct SCompSupporter {
int32_t order;
} SCompSupporter;
int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t size = 0;
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
// size += pQueryAttr->pExpr1[i].base.interBytes;
}
assert(size >= 0);
return size;
}
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
pResultRowInfo->size = 0;
pResultRowInfo->cur.pageId = -1;
......
......@@ -121,7 +121,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, OPTR_EXEC_MODEL_STREAM);
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
......
......@@ -31,13 +31,13 @@ static void initRefPool() {
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -40,9 +40,6 @@
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define SDATA_BLOCK_INITIALIZER \
(SDataBlockInfo) { {0}, 0 }
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#if 0
......@@ -95,8 +92,6 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
static void releaseQueryBuf(size_t numOfTables);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
......@@ -454,75 +449,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
}
static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey, bool ascQuery,
bool timeWindowInterpo) {
int64_t skey = TSKEY_INITIAL_VAL;
#if 0
int32_t i = 0;
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
break;
}
// new closed result rows
if (timeWindowInterpo) {
if (pResult->endInterp &&
((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) {
if (i > 0) { // the first time window, the startInterp is false.
assert(pResult->startInterp);
}
closeResultRow(pResultRowInfo, i);
} else {
skey = pResult->win.skey;
}
} else {
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeResultRow(pResultRowInfo, i);
} else {
skey = pResult->win.skey;
}
}
}
// all result rows are closed, set the last one to be the skey
if (skey == TSKEY_INITIAL_VAL) {
if (pResultRowInfo->size == 0) {
// assert(pResultRowInfo->current == NULL);
assert(pResultRowInfo->curPos == -1);
pResultRowInfo->curPos = -1;
} else {
pResultRowInfo->curPos = pResultRowInfo->size - 1;
}
} else {
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
break;
}
}
if (i == pResultRowInfo->size - 1) {
pResultRowInfo->curPos = i;
} else {
pResultRowInfo->curPos = i + 1; // current not closed result object
}
}
#endif
}
//
// static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey,
// bool ascQuery, bool interp) {
// if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
// closeAllResultRows(pResultRowInfo);
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
// } else {
// int32_t step = ascQuery ? 1 : -1;
// doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
// }
//}
// query_range_start, query_range_end, window_duration, window_start, window_end
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
......@@ -886,7 +812,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
assert(pResultRow != NULL);
setResultRowKey(pResultRow, pData, type);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
......@@ -908,21 +833,6 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return false;
}
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// in the reverse table scan, only the following functions need to be executed
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
// (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) {
// return false;
// }
return true;
}
......@@ -3935,27 +3845,6 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
}
}
// static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) {
// int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
// if (size == 0) {
// return NULL;
// }
//
// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo));
// if (pTableQueryInfo == NULL) {
// return NULL;
// }
//
// for (int32_t j = 0; j < size; ++j) {
// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j);
// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j];
// pTQueryInfo->lastKey = pk->lastKey;
// }
//
// pTableQueryInfo->lastKey = 0;
// return pTableQueryInfo;
//}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
......@@ -4525,7 +4414,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
......@@ -4645,12 +4533,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
tsdbReaderT pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) {
pTaskInfo->code = terrno;
return NULL;
}
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
tsdbCleanupReadHandle(pDataReader);
pTaskInfo->code = terrno;
return NULL;
}
......@@ -4659,6 +4549,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
taosArrayDestroy(groupKeys);
if (code) {
tsdbCleanupReadHandle(pDataReader);
pTaskInfo->code = terrno;
return NULL;
}
......@@ -4871,12 +4762,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
......@@ -5314,7 +5200,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model) {
const char* sql, EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -5324,6 +5210,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete;
}
(*pTaskInfo)->sql = sql;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) {
......
......@@ -320,8 +320,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->pScalarExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
if (pInfo->scalarSup.pScalarExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
......@@ -385,9 +385,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr;
pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo;
pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr;
pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset);
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
......@@ -624,7 +624,9 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
return NULL;
}
SGroupbyOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
......@@ -641,6 +643,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
break;
}
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSupp.pScalarExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
}
doHashPartition(pOperator, pBlock);
}
......@@ -665,15 +675,26 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree(pInfo->columnOffset);
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pGroupCols = pGroupColList;
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
pInfo->scalarSupp.numOfScalarExpr = 0;
pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr);
pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx(
pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset);
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
......@@ -683,16 +704,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -701,7 +722,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResultBlock;
pInfo->binfo.pRes = pResBlock;
pOperator->numOfExprs = numOfCols;
pOperator->pExpr = pExprInfo;
pOperator->info = pInfo;
......
......@@ -1234,6 +1234,8 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
doFilter(pInfo->pCondition, pInfo->pRes);
#if 0
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
......@@ -1279,6 +1281,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
px->info.rows = numOfRow;
pInfo->pRes = px;
#endif
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
......@@ -1457,6 +1460,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
doFilterResult(pInfo);
blockDataDestroy(p);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
......@@ -1545,10 +1550,10 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
// blockDataDestroy(p); todo handle memory leak
pInfo->pRes->info.rows = p->info.rows;
return p->info.rows;
blockDataDestroy(p);
return pInfo->pRes->info.rows;
}
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
......
......@@ -420,24 +420,29 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
goto _error;
}
pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = pSortInfo;
pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock;
pOperator->name = "MultiwaySortMerge";
pInfo->pInputBlock = pInputBlock;
pOperator->name = "MultiwaySortMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
pInfo->sortBufSize = pInfo->bufPageSize * 16;
pInfo->hasGroupId = false;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->pTaskInfo = pTaskInfo;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
numOfSources = TMAX(2, numOfSources);
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
......
......@@ -333,11 +333,6 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
continue;
}
// if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) {
// pCtx[k].start.key = INT64_MIN;
// continue;
// }
SFunctParam* pParam = &pCtx[k].param[0];
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
......
......@@ -204,7 +204,12 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
if (taosArrayGetSize(pSource->pageIdList) == 0) {
return TSDB_CODE_SUCCESS;
}
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
code = blockDataFromBuf(pSource->src.pBlock, pPage);
......@@ -532,6 +537,19 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return 0;
}
int32_t getProperSortPageSize(size_t rowSize) {
uint32_t defaultPageSize = 4096;
uint32_t pgSize = 0;
if (rowSize * 4 > defaultPageSize) {
pgSize = rowSize * 4;
} else {
pgSize = defaultPageSize;
}
return pgSize;
}
static int32_t createInitialSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
......@@ -557,14 +575,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (!hasGroupId) {
// calculate the buffer pages according to the total available buffers.
int32_t rowSize = blockDataGetRowSize(pBlock);
if (rowSize * 4 > 4096) {
pHandle->pageSize = rowSize * 4;
} else {
pHandle->pageSize = 4096;
}
// todo!!
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
// todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
......@@ -577,7 +590,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
// todo relocate the columns
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) {
return code;
......
......@@ -945,7 +945,7 @@ TEST(testCase, build_executor_tree_Test) {
int32_t code = qStringToSubplan(msg, &plan);
ASSERT_EQ(code, 0);
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
ASSERT_EQ(code, 0);
}
#if 0
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_EXPR_H_
#define _TD_COMMON_EXPR_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "tmsg.h"
#include "taosdef.h"
#include "tskiplist.h"
#include "function.h"
struct tExprNode;
struct SSchema;
#define QUERY_COND_REL_PREFIX_IN "IN|"
#define QUERY_COND_REL_PREFIX_LIKE "LIKE|"
#define QUERY_COND_REL_PREFIX_MATCH "MATCH|"
#define QUERY_COND_REL_PREFIX_NMATCH "NMATCH|"
#define QUERY_COND_REL_PREFIX_IN_LEN 3
#define QUERY_COND_REL_PREFIX_LIKE_LEN 5
#define QUERY_COND_REL_PREFIX_MATCH_LEN 6
#define QUERY_COND_REL_PREFIX_NMATCH_LEN 7
typedef bool (*__result_filter_fn_t)(const void *, void *);
typedef void (*__do_filter_suppl_fn_t)(void *, void *);
/**
* this structure is used to filter data in tags, so the offset of filtered tag column in tagdata string is required
*/
typedef struct tQueryInfo {
uint8_t optr; // expression operator
SSchema sch; // schema of tags
char* q;
__compar_fn_t compare; // filter function
bool indexed; // indexed columns
} tQueryInfo;
typedef struct SExprTraverseSupp {
__result_filter_fn_t nodeFilterFn;
__do_filter_suppl_fn_t setupInfoFn;
void *pExtInfo;
} SExprTraverseSupp;
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_EXPR_H_*/
......@@ -17,15 +17,7 @@
#include "os.h"
#include "texception.h"
#include "taosdef.h"
#include "tmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "thash.h"
#include "texpr.h"
#include "tvariant.h"
#include "tdef.h"
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
......
......@@ -24,7 +24,7 @@ extern "C" {
#include "dataSinkMgt.h"
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
......
......@@ -308,10 +308,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
taosMemoryFreeClear(sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
return TSDB_CODE_SUCCESS;
......
......@@ -510,7 +510,7 @@ _return:
}
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
......@@ -537,7 +537,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
ctx->plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
......@@ -938,7 +938,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes
ctx.plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
......
......@@ -1488,13 +1488,15 @@ int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bo
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
*job = pJob->refId;
if (!sync) {
pJob->userCb = SCH_EXEC_CB;
}
SCH_ERR_JRET(schLaunchJob(pJob));
if (sync) {
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
} else {
pJob->userCb = SCH_EXEC_CB;
}
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%"PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
......
......@@ -156,7 +156,7 @@ int32_t syncSetStandby(int64_t rid) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
terrno = TSDB_CODE_SYN_IS_LEADER;
sError("failed to set standby since it is leader, rid:%" PRId64, rid);
return -1;
}
......
......@@ -324,9 +324,9 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
}
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
tDebug("%s cli conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pTransInst->label, conn,
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen);
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tTrace("except, server continue send while cli ignore it");
......
......@@ -285,13 +285,14 @@ static void uvHandleReq(SSvrConn* pConn) {
}
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
ntohs(pConn->localAddr.sin_port), transMsg.contLen);
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, transMsg.code);
} else {
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d code:0x%x", pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp);
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
transMsg.code);
// no ref here
}
......
......@@ -412,7 +412,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Mismatched signature"
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
......
......@@ -4,6 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode5 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
......@@ -128,9 +133,9 @@ if $rows != 1 then
endi
if $data(2)[4] == leader then
$leaderExist = 1
$leaderVnode = 4
$follower1 = 2
$follower2 = 3
$leaderVnode = 2
$follower1 = 3
$follower2 = 4
endi
if $data(2)[6] == leader then
$leaderExist = 1
......@@ -140,9 +145,9 @@ if $data(2)[6] == leader then
endi
if $data(2)[8] == leader then
$leaderExist = 1
$leaderVnode = 2
$follower1 = 3
$follower2 = 4
$leaderVnode = 4
$follower1 = 2
$follower2 = 3
endi
if $leaderExist != 1 then
goto step3
......@@ -160,110 +165,55 @@ if $rows != 1 then
return -1
endi
<<<<<<< HEAD
print =============== step32: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
=======
print =============== step32: move leader
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables
if $rows != 1 then
return -1
endi
<<<<<<< HEAD
print =============== step33: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
return
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
=======
print =============== step33: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
print =============== step33: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables
if $rows != 1 then
return -1
endi
<<<<<<< HEAD
print =============== step34: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
=======
print =============== step34: move follower1
print redistribute vgroup 2 dnode $follower2 dnode 5 dnode $leaderVnode
sql redistribute vgroup 2 dnode $follower2 dnode 5 dnode $leaderVnode
print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables
if $rows != 1 then
return -1
endi
<<<<<<< HEAD
print =============== step35: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
sql show d1.tables
if $rows != 1 then
return -1
endi
=======
print =============== step35: move leader
print redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
=======
print =============== step35: move 5
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0
sql show d1.tables
if $rows != 1 then
return -1
endi
<<<<<<< HEAD
print =============== step36: move follower2
print =============== step36: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
=======
print =============== step36: move follower1
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
>>>>>>> origin/3.0
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
......@@ -272,11 +222,7 @@ if $rows != 1 then
return -1
endi
<<<<<<< HEAD
print =============== step37: move follower1
=======
print =============== step37: move follower2
>>>>>>> origin/3.0
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql show d1.vgroups
......@@ -287,17 +233,9 @@ if $rows != 1 then
return -1
endi
<<<<<<< HEAD
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
print =============== step38: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
=======
print =============== step38: move leader
print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
>>>>>>> origin/3.0
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
......@@ -305,18 +243,10 @@ sql show d1.tables
if $rows != 1 then
return -1
endi
<<<<<<< HEAD
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
print =============== step39: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
=======
print =============== step39: move 5
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
>>>>>>> origin/3.0
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
......
......@@ -101,7 +101,7 @@ python3 ./test.py -f 2-query/tail.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
......
......@@ -5,7 +5,12 @@ set /a a=0
if %1 == full (
echo Windows Taosd Full Test
set /a exitNum=0
for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
del /Q /F failed.txt
set caseFile="fulltest.bat"
if not "%2" == "" (
set caseFile="%2"
)
for /F "usebackq tokens=*" %%i in (!caseFile!) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
set /a a+=1
echo !a! Processing %%i
......@@ -13,7 +18,7 @@ if %1 == full (
set time1=!_timeTemp!
echo Start at !time!
call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 ) else ( call :colorEcho 0a "Success" &echo. )
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. )
)
)
exit !exitNum!
......@@ -77,4 +82,4 @@ for %%a in (%tt%) do (
set /a index=index+1
)
set /a _timeTemp=(%hh%*60+%mm%)*60+%ss%
goto :eof
\ No newline at end of file
goto :eof
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册