提交 80e9120f 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-14481-3.0

......@@ -24,6 +24,7 @@ static void msg_process(TAOS_RES* msg) {
char buf[1024];
/*memset(buf, 0, 1024);*/
printf("topic: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
......
......@@ -144,8 +144,8 @@ DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *nam
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......@@ -269,6 +269,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
......
......@@ -2203,10 +2203,8 @@ typedef struct {
int64_t newConsumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType;
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
char* qmsg;
char* qmsg;
int64_t suid;
} SMqRebVgReq;
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
......@@ -2217,11 +2215,10 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->subKey);
tlen += taosEncodeFixedI8(buf, pReq->subType);
// tlen += taosEncodeFixedI8(buf, pReq->withTbName);
// tlen += taosEncodeFixedI8(buf, pReq->withSchema);
// tlen += taosEncodeFixedI8(buf, pReq->withTag);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
tlen += taosEncodeString(buf, pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
tlen += taosEncodeFixedI64(buf, pReq->suid);
}
return tlen;
}
......@@ -2233,11 +2230,10 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->subKey);
buf = taosDecodeFixedI8(buf, &pReq->subType);
// buf = taosDecodeFixedI8(buf, &pReq->withTbName);
// buf = taosDecodeFixedI8(buf, &pReq->withSchema);
// buf = taosDecodeFixedI8(buf, &pReq->withTag);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
buf = taosDecodeString(buf, &pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
buf = taosDecodeFixedI64(buf, &pReq->suid);
}
return (void*)buf;
}
......@@ -2471,7 +2467,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int8_t isSchemaAdaptive;
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp>
SSchemaWrapper schema;
} SMqSubTopicEp;
......@@ -2479,7 +2475,7 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
tlen += taosEncodeString(buf, pTopicEp->db);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
......@@ -2492,7 +2488,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
buf = taosDecodeStringTo(buf, pTopicEp->topic);
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
buf = taosDecodeStringTo(buf, pTopicEp->db);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
......
......@@ -156,6 +156,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
bool fmIsMultiResFunc(int32_t funcId);
bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
......
......@@ -189,6 +189,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_MERGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_FILL,
QUERY_NODE_LOGIC_PLAN_SORT,
......@@ -206,6 +207,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
......
......@@ -95,9 +95,15 @@ typedef struct SVnodeModifLogicNode {
typedef struct SExchangeLogicNode {
SLogicNode node;
int32_t srcGroupId;
uint8_t precision;
} SExchangeLogicNode;
typedef struct SMergeLogicNode {
SLogicNode node;
SNodeList* pMergeKeys;
int32_t numOfChannels;
int32_t srcGroupId;
} SMergeLogicNode;
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef struct SWindowLogicNode {
......@@ -268,6 +274,13 @@ typedef struct SExchangePhysiNode {
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
typedef struct SMergePhysiNode {
SPhysiNode node;
SNodeList* pMergeKeys;
int32_t numOfChannels;
int32_t srcGroupId;
} SMergePhysiNode;
typedef struct SWinodwPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
......
......@@ -209,7 +209,7 @@ typedef enum ELogicConditionType {
#define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
......
......@@ -191,6 +191,7 @@ typedef struct SRequestSendRecvBody {
typedef struct {
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
......
......@@ -143,6 +143,7 @@ typedef struct {
typedef struct {
// subscribe info
char* topicName;
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqClientVg>
......@@ -1039,6 +1040,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
topic.schema = pTopicEp->schema;
taosHashClear(pHash);
topic.topicName = strdup(pTopicEp->topic);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
......@@ -1283,7 +1285,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
......@@ -1506,6 +1509,15 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
}
}
const char* tmq_get_db_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
} else {
return NULL;
}
}
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
......
......@@ -168,7 +168,7 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
SDnodeObj* pDnode;
SQnodeLoad load;
SQnodeLoad load;
} SQnodeObj;
typedef struct {
......@@ -403,26 +403,22 @@ int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int8_t subType; // column, db or stable
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int8_t subType; // column, db or stable
SRWLatch lock;
int32_t consumerCnt;
int32_t sqlLen;
int32_t astLen;
char* sql;
char* ast;
char* physicalPlan;
SSchemaWrapper schema;
// int32_t refConsumerCnt;
int64_t stbUid;
} SMqTopicObj;
typedef struct {
......@@ -476,14 +472,12 @@ int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
SRWLatch lock;
int64_t dbUid;
int32_t vgNum;
int8_t subType;
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
char key[TSDB_SUBSCRIBE_KEY_LEN];
SRWLatch lock;
int64_t dbUid;
int32_t vgNum;
int8_t subType;
int64_t stbUid;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*>
} SMqSubscribeObj;
......@@ -535,7 +529,7 @@ typedef struct {
} SMqRebOutputObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char name[TSDB_STREAM_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
......
......@@ -306,6 +306,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic);
taosRLockLatch(&pTopic->lock);
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
topicEp.schema.nCols = pTopic->schema.nCols;
if (topicEp.schema.nCols) {
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
......
......@@ -395,10 +395,8 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
taosInitRWLatch(&pSubNew->lock);
pSubNew->dbUid = pSub->dbUid;
pSubNew->stbUid = pSub->stbUid;
pSubNew->subType = pSub->subType;
/*pSubNew->withTbName = pSub->withTbName;*/
/*pSubNew->withSchema = pSub->withSchema;*/
/*pSubNew->withTag = pSub->withTag;*/
pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......@@ -431,9 +429,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI8(buf, pSub->subType);
/*tlen += taosEncodeFixedI8(buf, pSub->withTbName);*/
/*tlen += taosEncodeFixedI8(buf, pSub->withSchema);*/
/*tlen += taosEncodeFixedI8(buf, pSub->withTag);*/
tlen += taosEncodeFixedI64(buf, pSub->stbUid);
void *pIter = NULL;
int32_t sz = taosHashGetSize(pSub->consumerHash);
......@@ -458,9 +454,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI8(buf, &pSub->subType);
/*buf = taosDecodeFixedI8(buf, &pSub->withTbName);*/
/*buf = taosDecodeFixedI8(buf, &pSub->withSchema);*/
/*buf = taosDecodeFixedI8(buf, &pSub->withTag);*/
buf = taosDecodeFixedI64(buf, &pSub->stbUid);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -93,10 +93,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
return NULL;
}
pSub->dbUid = pTopic->dbUid;
pSub->stbUid = pTopic->stbUid;
pSub->subType = pTopic->subType;
/*pSub->withTbName = pTopic->withTbName;*/
/*pSub->withSchema = pTopic->withSchema;*/
/*pSub->withTag = pTopic->withTag;*/
ASSERT(pSub->unassignedVgs->size == 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
......@@ -121,9 +119,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = pRebVg->pVgEp->qmsg;
req.subType = pSub->subType;
/*req.withTbName = pSub->withTbName;*/
/*req.withSchema = pSub->withSchema;*/
/*req.withTag = pSub->withTag;*/
req.suid = pSub->stbUid;
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
......
......@@ -96,11 +96,8 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);*/
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);*/
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);*/
SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
......@@ -122,8 +119,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
}
/*SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);*/
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
......@@ -168,12 +163,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);*/
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);*/
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);*/
SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
if (pTopic->sql == NULL) {
......@@ -222,8 +213,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
pTopic->schema.pSchema = NULL;
}
/*SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);*/
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
terrno = TSDB_CODE_SUCCESS;
......@@ -254,8 +243,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
/*atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);*/
/*taosWLockLatch(&pOldTopic->lock);*/
// TODO handle update
......@@ -278,18 +265,6 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
sdbRelease(pSdb, pTopic);
}
#if 0
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
SName name = {0};
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TOPIC_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db);
}
#endif
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicReq);
......@@ -341,8 +316,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
/*topicObj.withTbName = pCreate->withTbName;*/
/*topicObj.withSchema = pCreate->withSchema;*/
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
......@@ -375,13 +348,16 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFree(topicObj.sql);
return -1;
}
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
/*topicObj.ast = NULL;*/
/*topicObj.astLen = 0;*/
/*topicObj.physicalPlan = NULL;*/
/*topicObj.withTbName = 1;*/
/*topicObj.withSchema = 1;*/
}
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
topicObj.stbUid = pStb->uid;
}
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
/*topicObj.ast = NULL;*/
/*topicObj.astLen = 0;*/
/*topicObj.physicalPlan = NULL;*/
/*topicObj.withTbName = 1;*/
/*topicObj.withSchema = 1;*/
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
......
......@@ -85,7 +85,7 @@ typedef struct SMetaFltParam {
tb_uid_t suid;
int16_t cid;
int16_t type;
char * val;
char *val;
bool reverse;
int (*filterFunc)(void *a, void *b, int16_t type);
......@@ -119,7 +119,8 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
void * tsdbGetIdx(SMeta *pMeta);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
void *tsdbGetIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
......@@ -195,7 +196,7 @@ struct SMetaEntry {
int64_t version;
int8_t type;
tb_uid_t uid;
char * name;
char *name;
union {
struct {
SSchemaWrapper schemaRow;
......@@ -223,17 +224,17 @@ struct SMetaEntry {
struct SMetaReader {
int32_t flags;
SMeta * pMeta;
SMeta *pMeta;
SDecoder coder;
SMetaEntry me;
void * pBuf;
void *pBuf;
int32_t szBuf;
};
struct SMTbCursor {
TBC * pDbc;
void * pKey;
void * pVal;
TBC *pDbc;
void *pKey;
void *pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
......
......@@ -260,18 +260,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->epoch = -1;
pHandle->execHandle.subType = req.subType;
/*pExec->withTbName = req.withTbName;*/
/*pExec->withSchema = req.withSchema;*/
/*pExec->withTag = req.withTag;*/
pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
req.qmsg = NULL;
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = {
.reader = pHandle->execHandle.pExecReader[i],
......@@ -286,6 +282,18 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.exec.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->execHandle.exec.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList);
tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vg %d, idx %d, uid: %ld", pTq->pVnode->config.vgId, i, tbUid);
}
for (int32_t i = 0; i < 5; i++) {
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
}
taosArrayDestroy(tbUidList);
}
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
} else {
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnode.h"
#include "tsdb.h"
#include "vnode.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
......@@ -327,8 +327,8 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
if (updateTs) {
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pTsdbReadHandle->window.skey,
pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey,
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
}
}
......@@ -586,7 +586,8 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, in
resetCheckInfo(pTsdbReadHandle);
}
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, int32_t tWinIdx) {
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
int32_t tWinIdx) {
STsdbReadHandle* pTsdbReadHandle = queryHandle;
pTsdbReadHandle->order = pCond->order;
......@@ -2845,6 +2846,22 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
return TSDB_CODE_SUCCESS;
}
int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, suid);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
static void destroyHelper(void* param) {
if (param == NULL) {
return;
......
......@@ -26,22 +26,24 @@ typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t l
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
typedef struct SBuiltinFuncDefinition {
char name[FUNCTION_NAME_MAX_LENGTH];
EFunctionType type;
uint64_t classification;
FTranslateFunc translateFunc;
FFuncDataRequired dataRequiredFunc;
FExecGetEnv getEnvFunc;
FExecInit initFunc;
FExecProcess processFunc;
const char* name;
EFunctionType type;
uint64_t classification;
FTranslateFunc translateFunc;
FFuncDataRequired dataRequiredFunc;
FExecGetEnv getEnvFunc;
FExecInit initFunc;
FExecProcess processFunc;
FScalarExecProcess sprocessFunc;
FExecFinalize finalizeFunc;
FExecProcess invertFunc;
FExecCombine combineFunc;
FExecFinalize finalizeFunc;
FExecProcess invertFunc;
FExecCombine combineFunc;
const char* pPartialFunc;
const char* pMergeFunc;
} SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
extern const int funcMgtBuiltinsNum;
extern const int funcMgtBuiltinsNum;
#ifdef __cplusplus
}
......
......@@ -178,14 +178,14 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of PERCENTILE function can only be column");
}
//param1
// param1
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
......@@ -200,7 +200,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//set result type
// set result type
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS;
}
......@@ -219,14 +219,14 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of APERCENTILE function can only be column");
}
//param1
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -245,7 +245,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//param2
// param2
if (3 == numOfParams) {
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
if (!IS_VAR_DATA_TYPE(para3Type)) {
......@@ -285,14 +285,14 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of TOP/BOTTOM function can only be column");
}
//param1
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -309,7 +309,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pValue->notReserved = true;
//set result type
// set result type
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
......@@ -709,8 +709,8 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (pValue->datum.i < ((i > 1) ? 0 : 1) || pValue->datum.i > 100) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TAIL function second parameter should be in range [1, 100], "
"third parameter should be in range [0, 100]");
"TAIL function second parameter should be in range [1, 100], "
"third parameter should be in range [0, 100]");
}
pValue->notReserved = true;
......@@ -771,7 +771,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
......@@ -779,12 +779,11 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) &&
TSDB_DATA_TYPE_BOOL != colType) {
if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//param1
// param1
if (numOfParams == 2) {
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
......@@ -911,7 +910,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
if (3 == numOfParams) {
SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2);
uint8_t para2Type = p2->resType.type;
uint8_t para2Type = p2->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
......@@ -1144,6 +1143,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = functionFinalize,
.invertFunc = countInvertFunction,
.combineFunc = combineFunction,
// .pPartialFunc = "count",
// .pMergeFunc = "sum"
},
{
.name = "sum",
......
......@@ -199,3 +199,81 @@ bool fmIsInvertible(int32_t funcId) {
}
return res;
}
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
return NULL;
}
strcpy(pFunc->functionName, pName);
pFunc->pParameterList = pParameterList;
char msg[64] = {0};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc, msg, sizeof(msg))) {
nodesDestroyNode(pFunc);
return NULL;
}
return pFunc;
}
static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
strcpy(pCol->colName, pFunc->node.aliasName);
pCol->node.resType = pFunc->node.resType;
return pCol;
}
bool fmIsDistExecFunc(int32_t funcId) {
if (!fmIsVectorFunc(funcId)) {
return true;
}
return (NULL != funcMgtBuiltins[funcId].pPartialFunc && NULL != funcMgtBuiltins[funcId].pMergeFunc);
}
static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNode** pPartialFunc) {
SNodeList* pParameterList = nodesCloneList(pSrcFunc->pParameterList);
if (NULL == pParameterList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*pPartialFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList);
if (NULL == *pPartialFunc) {
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
snprintf((*pPartialFunc)->node.aliasName, sizeof((*pPartialFunc)->node.aliasName), "%s.%p",
(*pPartialFunc)->functionName, pSrcFunc);
return TSDB_CODE_SUCCESS;
}
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
SFunctionNode** pMergeFunc) {
SNodeList* pParameterList = NULL;
nodesListMakeStrictAppend(&pParameterList, createColumnByFunc(pPartialFunc));
*pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
if (NULL == *pMergeFunc) {
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
return TSDB_CODE_SUCCESS;
}
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {
if (!fmIsDistExecFunc(pFunc->funcId)) {
return TSDB_CODE_FAILED;
}
int32_t code = createPartialFunction(pFunc, pPartialFunc);
if (TSDB_CODE_SUCCESS == code) {
code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(*pPartialFunc);
nodesDestroyNode(*pMergeFunc);
}
return code;
}
......@@ -373,7 +373,14 @@ static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifL
static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(precision);
return (SNode*)pDst;
}
static SNode* logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pMergeKeys);
COPY_SCALAR_FIELD(numOfChannels);
COPY_SCALAR_FIELD(srcGroupId);
return (SNode*)pDst;
}
......@@ -537,6 +544,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return logicMergeCopy((const SMergeLogicNode*)pNode, (SMergeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_FILL:
......
......@@ -190,6 +190,8 @@ const char* nodesNodeName(ENodeType type) {
return "LogicVnodeModif";
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return "LogicExchange";
case QUERY_NODE_LOGIC_PLAN_MERGE:
return "LogicMerge";
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return "LogicWindow";
case QUERY_NODE_LOGIC_PLAN_FILL:
......@@ -220,6 +222,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiAgg";
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return "PhysiExchange";
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return "PhysiMerge";
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......@@ -596,7 +600,6 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
}
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId";
static const char* jkExchangeLogicPlanSrcPrecision = "Precision";
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
......@@ -605,9 +608,6 @@ static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcPrecision, pNode->precision);
}
return code;
}
......@@ -619,8 +619,144 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId);
}
return code;
}
static const char* jkMergeLogicPlanMergeKeys = "MergeKeys";
static const char* jkMergeLogicPlanNumOfChannels = "NumOfChannels";
static const char* jkMergeLogicPlanSrcGroupId = "SrcGroupId";
static int32_t logicMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergeLogicNode* pNode = (const SMergeLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergeLogicPlanMergeKeys, pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkExchangeLogicPlanSrcPrecision, &pNode->precision);
code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanNumOfChannels, pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanSrcGroupId, pNode->srcGroupId);
}
return code;
}
static int32_t jsonToLogicMergeNode(const SJson* pJson, void* pObj) {
SMergeLogicNode* pNode = (SMergeLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergeLogicPlanMergeKeys, &pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergeLogicPlanNumOfChannels, &pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergeLogicPlanSrcGroupId, &pNode->srcGroupId);
}
return code;
}
static const char* jkWindowLogicPlanWinType = "WinType";
static const char* jkWindowLogicPlanFuncs = "Funcs";
static const char* jkWindowLogicPlanInterval = "Interval";
static const char* jkWindowLogicPlanOffset = "Offset";
static const char* jkWindowLogicPlanSliding = "Sliding";
static const char* jkWindowLogicPlanIntervalUnit = "IntervalUnit";
static const char* jkWindowLogicPlanSlidingUnit = "SlidingUnit";
static const char* jkWindowLogicPlanSessionGap = "SessionGap";
static const char* jkWindowLogicPlanTspk = "Tspk";
static const char* jkWindowLogicPlanStateExpr = "StateExpr";
static const char* jkWindowLogicPlanTriggerType = "TriggerType";
static const char* jkWindowLogicPlanWatermark = "Watermark";
static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWinType, pNode->winType);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkWindowLogicPlanFuncs, pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanInterval, pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanOffset, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanIntervalUnit, pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSlidingUnit, pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSessionGap, pNode->sessionGap);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowLogicPlanTspk, nodeToJson, pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowLogicPlanStateExpr, nodeToJson, pNode->pStateExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanTriggerType, pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark);
}
return code;
}
static int32_t jsonToLogicWindowNode(const SJson* pJson, void* pObj) {
SWindowLogicNode* pNode = (SWindowLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowLogicPlanWinType, pNode->winType, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkWindowLogicPlanFuncs, &pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanInterval, &pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanOffset, &pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanIntervalUnit, &pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanSlidingUnit, &pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSessionGap, &pNode->sessionGap);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowLogicPlanTspk, &pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowLogicPlanStateExpr, &pNode->pStateExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanTriggerType, &pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark);
}
return code;
......@@ -1459,6 +1595,44 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkMergePhysiPlanMergeKeys = "MergeKeys";
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergePhysiPlanMergeKeys, pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanNumOfChannels, pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId);
}
return code;
}
static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
SMergePhysiNode* pNode = (SMergePhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergePhysiPlanMergeKeys, &pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanNumOfChannels, &pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId);
}
return code;
}
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static const char* jkSortPhysiPlanTargets = "Targets";
......@@ -3401,6 +3575,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break;
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return logicMergeNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_FILL:
return logicFillNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_SORT:
......@@ -3427,6 +3605,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return physiExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return physiMergeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return physiSortNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......@@ -3512,6 +3692,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return jsonToLogicExchangeNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return jsonToLogicMergeNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return jsonToLogicWindowNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_FILL:
return jsonToLogicFillNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SORT:
......@@ -3538,6 +3722,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return jsonToPhysiMergeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return jsonToPhysiSortNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......
......@@ -220,6 +220,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode));
case QUERY_NODE_LOGIC_PLAN_MERGE:
return makeNode(type, sizeof(SMergeLogicNode));
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_PLAN_FILL:
......@@ -250,6 +252,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return makeNode(type, sizeof(SMergePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SSortPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......
......@@ -188,8 +188,8 @@ int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* ve
}
int32_t __catalogGetDBVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName,
SArray** vgroupList) {
return 0;
SArray** pVgList) {
return g_mockCatalogService->catalogGetDBVgInfo(dbFName, pVgList);
}
int32_t __catalogGetDBCfg(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
......
......@@ -18,6 +18,7 @@
#include <iomanip>
#include <iostream>
#include <map>
#include <set>
#include "tdatablock.h"
#include "tname.h"
......@@ -120,6 +121,25 @@ class MockCatalogServiceImpl {
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
}
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
std::string dbFName(pDbFName);
DbMetaCache::const_iterator it = meta_.find(dbFName.substr(std::string(pDbFName).find_last_of('.') + 1));
if (meta_.end() == it) {
return TSDB_CODE_FAILED;
}
std::set<int32_t> vgSet;
*pVgList = taosArrayInit(it->second.size(), sizeof(SVgroupInfo));
for (const auto& vgs : it->second) {
for (const auto& vg : vgs.second->vgs) {
if (0 == vgSet.count(vg.vgId)) {
taosArrayPush(*pVgList, &vg);
vgSet.insert(vg.vgId);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
auto it = udf_.find(funcName);
if (udf_.end() == it) {
......@@ -187,8 +207,9 @@ class MockCatalogServiceImpl {
// number of backward fills
#define NOB(n) ((n) % 2 ? (n) / 2 + 1 : (n) / 2)
// center aligned
#define CA(n, s) std::setw(NOF((n) - int((s).length()))) << "" << (s) \
<< std::setw(NOB((n) - int((s).length()))) << "" << "|"
#define CA(n, s) \
std::setw(NOF((n) - int((s).length()))) << "" << (s) << std::setw(NOB((n) - int((s).length()))) << "" \
<< "|"
// string field length
#define SFL 20
// string field header
......@@ -490,6 +511,10 @@ int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, S
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
}
int32_t MockCatalogService::catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
return impl_->catalogGetDBVgInfo(pDbFName, pVgList);
}
int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
return impl_->catalogGetUdfInfo(funcName, pInfo);
}
......
......@@ -61,6 +61,7 @@ class MockCatalogService {
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
......
......@@ -36,6 +36,7 @@ extern "C" {
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList);
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode);
......
......@@ -133,56 +133,56 @@ static int32_t createChildLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelec
return code;
}
typedef struct SCreateColumnCxt {
int32_t errCode;
SNodeList* pList;
} SCreateColumnCxt;
static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: {
SNode* pCol = nodesCloneNode(pNode);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
default:
break;
}
return DEAL_RES_CONTINUE;
}
static int32_t createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pList) {
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
if (NULL == cxt.pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
nodesWalkExprs(pExprs, doCreateColumn, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pList);
return cxt.errCode;
}
if (NULL == *pList) {
*pList = cxt.pList;
}
return cxt.errCode;
}
// typedef struct SCreateColumnCxt {
// int32_t errCode;
// SNodeList* pList;
// } SCreateColumnCxt;
// static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
// SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
// switch (nodeType(pNode)) {
// case QUERY_NODE_COLUMN: {
// SNode* pCol = nodesCloneNode(pNode);
// if (NULL == pCol) {
// return DEAL_RES_ERROR;
// }
// return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
// }
// case QUERY_NODE_OPERATOR:
// case QUERY_NODE_LOGIC_CONDITION:
// case QUERY_NODE_FUNCTION: {
// SExprNode* pExpr = (SExprNode*)pNode;
// SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
// if (NULL == pCol) {
// return DEAL_RES_ERROR;
// }
// pCol->node.resType = pExpr->resType;
// strcpy(pCol->colName, pExpr->aliasName);
// return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
// }
// default:
// break;
// }
// return DEAL_RES_CONTINUE;
// }
// static int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) {
// SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
// if (NULL == cxt.pList) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// nodesWalkExprs(pExprs, doCreateColumn, &cxt);
// if (TSDB_CODE_SUCCESS != cxt.errCode) {
// nodesDestroyList(cxt.pList);
// return cxt.errCode;
// }
// if (NULL == *pList) {
// *pList = cxt.pList;
// }
// return cxt.errCode;
// }
static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
STableMeta* pMeta) {
......@@ -294,10 +294,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pScan->pScanCols, &pScan->node.pTargets);
code = createColumnByRewriteExps(pScan->pScanCols, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pScan->pScanPseudoCols, &pScan->node.pTargets);
code = createColumnByRewriteExps(pScan->pScanPseudoCols, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -463,10 +463,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// set the output
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
code = createColumnByRewriteExps(pCxt, pAgg->pAggFuncs, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pAggFuncs, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -496,7 +496,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pWindow->pFuncs, &pWindow->node.pTargets);
code = createColumnByRewriteExps(pWindow->pFuncs, &pWindow->node.pTargets);
}
pSelect->hasAggFuncs = false;
......@@ -766,7 +766,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -913,7 +913,7 @@ static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pS
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -839,7 +839,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
SPhysiNode** pPhyNode) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(
pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
pCxt, pExchangeLogicNode->node.precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -849,10 +849,11 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
return TSDB_CODE_SUCCESS;
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
SPhysiNode** pPhyNode) {
SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode(
pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
pCxt, pExchangeLogicNode->node.precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -954,7 +955,8 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
: QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -1137,6 +1139,54 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return code;
}
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
SExchangePhysiNode* pExchange = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pMerge->srcGroupId;
pExchange->node.pParent = (SPhysiNode*)pMerge;
pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc);
if (NULL == pExchange->node.pOutputDataBlockDesc) {
nodesDestroyNode(pExchange);
return TSDB_CODE_OUT_OF_MEMORY;
}
return nodesListMakeStrictAppend(&pMerge->node.pChildren, pExchange);
}
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
SMergePhysiNode* pMerge = (SMergePhysiNode*)makePhysiNode(
pCxt, pMergeLogicNode->node.precision, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
code = createExchangePhysiNodeByMerge(pMerge);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
&pMerge->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pMerge;
} else {
nodesDestroyNode(pMerge);
}
return code;
}
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
SNodeList* pChildren, SPhysiNode** pPhyNode) {
switch (nodeType(pLogicNode)) {
......@@ -1158,6 +1208,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_FILL:
return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
default:
break;
}
......@@ -1188,9 +1240,13 @@ static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode,
}
if (TSDB_CODE_SUCCESS == code) {
(*pPhyNode)->pChildren = pChildren;
SNode* pChild;
FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
if (LIST_LENGTH(pChildren) > 0) {
(*pPhyNode)->pChildren = pChildren;
SNode* pChild;
FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
} else {
nodesDestroyList(pChildren);
}
} else {
nodesDestroyList(pChildren);
}
......
......@@ -34,3 +34,54 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...) {
va_end(vArgList);
return errCode;
}
typedef struct SCreateColumnCxt {
int32_t errCode;
SNodeList* pList;
} SCreateColumnCxt;
static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: {
SNode* pCol = nodesCloneNode(pNode);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
default:
break;
}
return DEAL_RES_CONTINUE;
}
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) {
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
if (NULL == cxt.pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
nodesWalkExprs(pExprs, doCreateColumn, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pList);
return cxt.errCode;
}
if (NULL == *pList) {
*pList = cxt.pList;
}
return cxt.errCode;
}
......@@ -58,16 +58,19 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
if (pExchange->srcGroupId == groupId) {
if (NULL == pExchange->pSrcEndPoints) {
pExchange->pSrcEndPoints = nodesMakeList();
if (NULL == pExchange->pSrcEndPoints) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pExchange->pSrcEndPoints, nodesCloneNode(pSource))) {
return TSDB_CODE_OUT_OF_MEMORY;
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode(pSource));
}
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
SMergePhysiNode* pMerge = (SMergePhysiNode*)pNode;
if (pMerge->srcGroupId == groupId) {
SExchangePhysiNode* pExchange =
(SExchangePhysiNode*)nodesListGetNode(pMerge->node.pChildren, pMerge->numOfChannels - 1);
if (1 == pMerge->numOfChannels) {
pMerge->numOfChannels = LIST_LENGTH(pMerge->node.pChildren);
} else {
--(pMerge->numOfChannels);
}
return TSDB_CODE_SUCCESS;
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode(pSource));
}
}
......
......@@ -50,4 +50,10 @@ TEST_F(PlanIntervalTest, selectFunc) {
run("SELECT MAX(c1), MIN(c1) FROM t1 INTERVAL(10s)");
// select function along with the columns of select row, and with INTERVAL clause
run("SELECT MAX(c1), c2 FROM t1 INTERVAL(10s)");
}
\ No newline at end of file
}
TEST_F(PlanIntervalTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1 INTERVAL(10s)");
}
......@@ -58,7 +58,7 @@
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim
#./test.sh -f tsim/mnode/basic4.sim
# ---- show
./test.sh -f tsim/show/basic.sim
......
......@@ -191,4 +191,4 @@ endi
system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode2 -s stop
system sh/exec.sh -n dnode3 -s stop
system sh/exec.sh -n dnode4 -s stop
\ No newline at end of file
system sh/exec.sh -n dnode4 -s stop
......@@ -18,7 +18,7 @@ python3 ./test.py -f 0-others/fsync.py
python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
#python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册