提交 8305f7e4 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/tq

...@@ -14,3 +14,6 @@ ...@@ -14,3 +14,6 @@
path = tests path = tests
url = https://github.com/taosdata/tests url = https://github.com/taosdata/tests
branch = 3.0 branch = 3.0
[submodule "examples/rust"]
path = examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
Subproject commit 1c8924dc668e6aa848214c2fc54e3ace3f5bf8df
...@@ -100,7 +100,7 @@ typedef enum _mgmt_table { ...@@ -100,7 +100,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_STREAMS, TSDB_MGMT_TABLE_STREAMS,
TSDB_MGMT_TABLE_VARIABLES, TSDB_MGMT_TABLE_VARIABLES,
TSDB_MGMT_TABLE_CONNS, TSDB_MGMT_TABLE_CONNS,
TSDB_MGMT_TABLE_SCORES, TSDB_MGMT_TABLE_TRANS,
TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_CLUSTER, TSDB_MGMT_TABLE_CLUSTER,
...@@ -524,6 +524,7 @@ typedef struct { ...@@ -524,6 +524,7 @@ typedef struct {
int8_t update; int8_t update;
int8_t cacheLastRow; int8_t cacheLastRow;
int8_t ignoreExist; int8_t ignoreExist;
int8_t streamMode;
} SCreateDbReq; } SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq); int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
...@@ -977,6 +978,13 @@ typedef struct { ...@@ -977,6 +978,13 @@ typedef struct {
int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
typedef struct {
int32_t transId;
} SKillTransReq;
int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char spi; char spi;
......
...@@ -136,6 +136,7 @@ enum { ...@@ -136,6 +136,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
......
...@@ -126,86 +126,87 @@ ...@@ -126,86 +126,87 @@
#define TK_PRECISION 108 #define TK_PRECISION 108
#define TK_UPDATE 109 #define TK_UPDATE 109
#define TK_CACHELAST 110 #define TK_CACHELAST 110
#define TK_UNSIGNED 111 #define TK_STREAM 111
#define TK_TAGS 112 #define TK_MODE 112
#define TK_USING 113 #define TK_UNSIGNED 113
#define TK_NULL 114 #define TK_TAGS 114
#define TK_NOW 115 #define TK_USING 115
#define TK_SELECT 116 #define TK_NULL 116
#define TK_UNION 117 #define TK_NOW 117
#define TK_ALL 118 #define TK_SELECT 118
#define TK_DISTINCT 119 #define TK_UNION 119
#define TK_FROM 120 #define TK_ALL 120
#define TK_VARIABLE 121 #define TK_DISTINCT 121
#define TK_INTERVAL 122 #define TK_FROM 122
#define TK_EVERY 123 #define TK_VARIABLE 123
#define TK_SESSION 124 #define TK_INTERVAL 124
#define TK_STATE_WINDOW 125 #define TK_EVERY 125
#define TK_FILL 126 #define TK_SESSION 126
#define TK_SLIDING 127 #define TK_STATE_WINDOW 127
#define TK_ORDER 128 #define TK_FILL 128
#define TK_BY 129 #define TK_SLIDING 129
#define TK_ASC 130 #define TK_ORDER 130
#define TK_GROUP 131 #define TK_BY 131
#define TK_HAVING 132 #define TK_ASC 132
#define TK_LIMIT 133 #define TK_GROUP 133
#define TK_OFFSET 134 #define TK_HAVING 134
#define TK_SLIMIT 135 #define TK_LIMIT 135
#define TK_SOFFSET 136 #define TK_OFFSET 136
#define TK_WHERE 137 #define TK_SLIMIT 137
#define TK_RESET 138 #define TK_SOFFSET 138
#define TK_QUERY 139 #define TK_WHERE 139
#define TK_SYNCDB 140 #define TK_RESET 140
#define TK_ADD 141 #define TK_QUERY 141
#define TK_COLUMN 142 #define TK_SYNCDB 142
#define TK_MODIFY 143 #define TK_ADD 143
#define TK_TAG 144 #define TK_COLUMN 144
#define TK_CHANGE 145 #define TK_MODIFY 145
#define TK_SET 146 #define TK_TAG 146
#define TK_KILL 147 #define TK_CHANGE 147
#define TK_CONNECTION 148 #define TK_SET 148
#define TK_STREAM 149 #define TK_KILL 149
#define TK_COLON 150 #define TK_CONNECTION 150
#define TK_ABORT 151 #define TK_COLON 151
#define TK_AFTER 152 #define TK_ABORT 152
#define TK_ATTACH 153 #define TK_AFTER 153
#define TK_BEFORE 154 #define TK_ATTACH 154
#define TK_BEGIN 155 #define TK_BEFORE 155
#define TK_CASCADE 156 #define TK_BEGIN 156
#define TK_CLUSTER 157 #define TK_CASCADE 157
#define TK_CONFLICT 158 #define TK_CLUSTER 158
#define TK_COPY 159 #define TK_CONFLICT 159
#define TK_DEFERRED 160 #define TK_COPY 160
#define TK_DELIMITERS 161 #define TK_DEFERRED 161
#define TK_DETACH 162 #define TK_DELIMITERS 162
#define TK_EACH 163 #define TK_DETACH 163
#define TK_END 164 #define TK_EACH 164
#define TK_EXPLAIN 165 #define TK_END 165
#define TK_FAIL 166 #define TK_EXPLAIN 166
#define TK_FOR 167 #define TK_FAIL 167
#define TK_IGNORE 168 #define TK_FOR 168
#define TK_IMMEDIATE 169 #define TK_IGNORE 169
#define TK_INITIALLY 170 #define TK_IMMEDIATE 170
#define TK_INSTEAD 171 #define TK_INITIALLY 171
#define TK_KEY 172 #define TK_INSTEAD 172
#define TK_OF 173 #define TK_KEY 173
#define TK_RAISE 174 #define TK_OF 174
#define TK_REPLACE 175 #define TK_RAISE 175
#define TK_RESTRICT 176 #define TK_REPLACE 176
#define TK_ROW 177 #define TK_RESTRICT 177
#define TK_STATEMENT 178 #define TK_ROW 178
#define TK_TRIGGER 179 #define TK_STATEMENT 179
#define TK_VIEW 180 #define TK_TRIGGER 180
#define TK_SEMI 181 #define TK_VIEW 181
#define TK_NONE 182 #define TK_SEMI 182
#define TK_PREV 183 #define TK_NONE 183
#define TK_LINEAR 184 #define TK_PREV 184
#define TK_IMPORT 185 #define TK_LINEAR 185
#define TK_TBNAME 186 #define TK_IMPORT 186
#define TK_JOIN 187 #define TK_TBNAME 187
#define TK_INSERT 188 #define TK_JOIN 188
#define TK_INTO 189 #define TK_INSERT 189
#define TK_VALUES 190 #define TK_INTO 190
#define TK_VALUES 191
#define NEW_TK_OR 1 #define NEW_TK_OR 1
#define NEW_TK_AND 2 #define NEW_TK_AND 2
......
...@@ -72,8 +72,10 @@ typedef enum ENodeType { ...@@ -72,8 +72,10 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_STMT, QUERY_NODE_SHOW_STMT,
QUERY_NODE_LOGIC_PLAN_SCAN, QUERY_NODE_LOGIC_PLAN_SCAN,
QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_FILTER, QUERY_NODE_LOGIC_PLAN_FILTER,
QUERY_NODE_LOGIC_PLAN_AGG QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT
} ENodeType; } ENodeType;
/** /**
...@@ -100,7 +102,7 @@ SNode* nodesMakeNode(ENodeType type); ...@@ -100,7 +102,7 @@ SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode); void nodesDestroyNode(SNode* pNode);
SNodeList* nodesMakeList(); SNodeList* nodesMakeList();
SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode); int32_t nodesListAppend(SNodeList* pList, SNode* pNode);
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell); SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
SNode* nodesListGetNode(SNodeList* pList, int32_t index); SNode* nodesListGetNode(SNodeList* pList, int32_t index);
void nodesDestroyList(SNodeList* pList); void nodesDestroyList(SNodeList* pList);
......
...@@ -50,6 +50,7 @@ typedef enum EColumnType { ...@@ -50,6 +50,7 @@ typedef enum EColumnType {
typedef struct SColumnNode { typedef struct SColumnNode {
SExprNode node; // QUERY_NODE_COLUMN SExprNode node; // QUERY_NODE_COLUMN
uint64_t tableId;
int16_t colId; int16_t colId;
EColumnType colType; // column or tag EColumnType colType; // column or tag
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
...@@ -59,10 +60,11 @@ typedef struct SColumnNode { ...@@ -59,10 +60,11 @@ typedef struct SColumnNode {
SNode* pProjectRef; SNode* pProjectRef;
} SColumnNode; } SColumnNode;
typedef struct SColumnRef { typedef struct SColumnRefNode {
ENodeType type; ENodeType type;
int32_t tupleId;
int32_t slotId; int32_t slotId;
} SColumnRef; } SColumnRefNode;
typedef struct SValueNode { typedef struct SValueNode {
SExprNode node; // QUERY_NODE_VALUE SExprNode node; // QUERY_NODE_VALUE
...@@ -269,6 +271,25 @@ typedef struct SSetOperator { ...@@ -269,6 +271,25 @@ typedef struct SSetOperator {
SNode* pLimit; SNode* pLimit;
} SSetOperator; } SSetOperator;
typedef enum ESqlClause {
SQL_CLAUSE_FROM = 1,
SQL_CLAUSE_WHERE,
SQL_CLAUSE_PARTITION_BY,
SQL_CLAUSE_WINDOW,
SQL_CLAUSE_GROUP_BY,
SQL_CLAUSE_HAVING,
SQL_CLAUSE_SELECT,
SQL_CLAUSE_ORDER_BY
} ESqlClause;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols);
typedef bool (*FFuncClassifier)(int32_t funcId);
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs);
bool nodesIsExprNode(const SNode* pNode); bool nodesIsExprNode(const SNode* pNode);
bool nodesIsArithmeticOp(const SOperatorNode* pOp); bool nodesIsArithmeticOp(const SOperatorNode* pOp);
......
...@@ -113,6 +113,7 @@ typedef struct STableMetaOutput { ...@@ -113,6 +113,7 @@ typedef struct STableMetaOutput {
typedef struct SDataBuf { typedef struct SDataBuf {
void *pData; void *pData;
uint32_t len; uint32_t len;
void *handle;
} SDataBuf; } SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
......
...@@ -246,6 +246,8 @@ int32_t* taosGetErrno(); ...@@ -246,6 +246,8 @@ int32_t* taosGetErrno();
// mnode-trans // mnode-trans
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
#define TSDB_CODE_MND_TRANS_CANT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4)
// mnode-mq // mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
...@@ -459,7 +461,6 @@ int32_t* taosGetErrno(); ...@@ -459,7 +461,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260A) //Not a GROUP BY expression #define TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260A) //Not a GROUP BY expression
#define TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260B) //Not SELECTed expression #define TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260B) //Not SELECTed expression
#define TSDB_CODE_PAR_NOT_SINGLE_GROUP TAOS_DEF_ERROR_CODE(0, 0x260C) //Not a single-group group function #define TSDB_CODE_PAR_NOT_SINGLE_GROUP TAOS_DEF_ERROR_CODE(0, 0x260C) //Not a single-group group function
#define TSDB_CODE_PAR_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x260D) //Out of memory
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -214,6 +214,10 @@ do { \ ...@@ -214,6 +214,10 @@ do { \
#define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_DESC_LEN 16
#define TSDB_TRANS_ERROR_LEN 128
#define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128 #define TSDB_STEP_DESC_LEN 128
......
...@@ -395,8 +395,10 @@ static void* hbThreadFunc(void* param) { ...@@ -395,8 +395,10 @@ static void* hbThreadFunc(void* param) {
hbClearReqInfo(pAppHbMgr); hbClearReqInfo(pAppHbMgr);
break; break;
} }
tSerializeSClientHbBatchReq(buf, tlen, pReq); tSerializeSClientHbBatchReq(buf, tlen, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false); tFreeClientHbBatchReq(pReq, false);
......
此差异已折叠。
...@@ -97,9 +97,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { ...@@ -97,9 +97,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq);
void* pReq = malloc(contLen); void* pReq = malloc(contLen);
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq); tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
pMsgSendInfo->msgInfo.pData = pReq; pMsgSendInfo->msgInfo.pData = pReq;
pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgInfo.handle = NULL;
} else { } else {
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq)); SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
if (pFetchMsg == NULL) { if (pFetchMsg == NULL) {
...@@ -111,6 +111,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { ...@@ -111,6 +111,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
pMsgSendInfo->msgInfo.pData = pFetchMsg; pMsgSendInfo->msgInfo.pData = pFetchMsg;
pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq); pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
pMsgSendInfo->msgInfo.handle = NULL;
} }
} else { } else {
assert(pRequest != NULL); assert(pRequest != NULL);
......
...@@ -328,7 +328,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -328,7 +328,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq};
tsem_init(&param.rspSem, 0, 0); tsem_init(&param.rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = &param; sendInfo->param = &param;
...@@ -453,7 +453,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -453,7 +453,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tSerializeMCreateTopicReq(buf, tlen, &req); tSerializeMCreateTopicReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/ /*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
pRequest->type = TDMT_MND_CREATE_TOPIC; pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
...@@ -779,8 +779,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -779,8 +779,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
param->pVg = pVg; param->pVg = pVg;
tsem_init(&param->rspSem, 0, 0); tsem_init(&param->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
......
...@@ -1275,6 +1275,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { ...@@ -1275,6 +1275,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI8(&encoder, pReq->update) < 0) return -1; if (tEncodeI8(&encoder, pReq->update) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1; if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -1307,6 +1308,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) ...@@ -1307,6 +1308,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI8(&decoder, &pReq->update) < 0) return -1; if (tDecodeI8(&decoder, &pReq->update) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1; if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tCoderClear(&decoder);
...@@ -2239,6 +2241,31 @@ int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) ...@@ -2239,6 +2241,31 @@ int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq)
return 0; return 0;
} }
int32_t tSerializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->transId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->transId) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) { int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......
...@@ -27,6 +27,7 @@ void mndCleanupDb(SMnode *pMnode); ...@@ -27,6 +27,7 @@ void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db); SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen); int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
char *mnGetDbStr(char *src);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -104,6 +104,44 @@ typedef enum { ...@@ -104,6 +104,44 @@ typedef enum {
TRN_STAGE_FINISHED = 8 TRN_STAGE_FINISHED = 8
} ETrnStage; } ETrnStage;
typedef enum {
TRN_TYPE_BASIC_SCOPE = 1000,
TRN_TYPE_CREATE_USER = 1001,
TRN_TYPE_ALTER_USER = 1002,
TRN_TYPE_DROP_USER = 1003,
TRN_TYPE_CREATE_FUNC = 1004,
TRN_TYPE_DROP_FUNC = 1005,
TRN_TYPE_CREATE_SNODE = 1006,
TRN_TYPE_DROP_SNODE = 1007,
TRN_TYPE_CREATE_QNODE = 1008,
TRN_TYPE_DROP_QNODE = 1009,
TRN_TYPE_CREATE_BNODE = 1010,
TRN_TYPE_DROP_BNODE = 1011,
TRN_TYPE_CREATE_MNODE = 1012,
TRN_TYPE_DROP_MNODE = 1013,
TRN_TYPE_CREATE_TOPIC = 1014,
TRN_TYPE_DROP_TOPIC = 1015,
TRN_TYPE_SUBSCRIBE = 1016,
TRN_TYPE_REBALANCE = 1017,
TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001,
TRN_TYPE_DROP_DNODE = 2002,
TRN_TYPE_GLOBAL_SCOPE_END,
TRN_TYPE_DB_SCOPE = 3000,
TRN_TYPE_CREATE_DB = 3001,
TRN_TYPE_ALTER_DB = 3002,
TRN_TYPE_DROP_DB = 3003,
TRN_TYPE_SPLIT_VGROUP = 3004,
TRN_TYPE_MERGE_VGROUP = 3015,
TRN_TYPE_DB_SCOPE_END,
TRN_TYPE_STB_SCOPE = 4000,
TRN_TYPE_CREATE_STB = 4001,
TRN_TYPE_ALTER_STB = 4002,
TRN_TYPE_DROP_STB = 4003,
TRN_TYPE_STB_SCOPE_END,
} ETrnType;
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
typedef enum { typedef enum {
...@@ -124,6 +162,7 @@ typedef struct { ...@@ -124,6 +162,7 @@ typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
ETrnType transType;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
void* rpcHandle; void* rpcHandle;
...@@ -135,6 +174,11 @@ typedef struct { ...@@ -135,6 +174,11 @@ typedef struct {
SArray* commitLogs; SArray* commitLogs;
SArray* redoActions; SArray* redoActions;
SArray* undoActions; SArray* undoActions;
int64_t createdTime;
int64_t lastExecTime;
uint64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];
} STrans; } STrans;
typedef struct { typedef struct {
......
...@@ -36,7 +36,7 @@ typedef struct { ...@@ -36,7 +36,7 @@ typedef struct {
int32_t mndInitTrans(SMnode *pMnode); int32_t mndInitTrans(SMnode *pMnode);
void mndCleanupTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq); STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq);
void mndTransDrop(STrans *pTrans); void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
...@@ -44,6 +44,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); ...@@ -44,6 +44,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SMnodeMsg *pRsp); void mndTransProcessRsp(SMnodeMsg *pRsp);
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define TSDB_BNODE_VER_NUMBER 1 #define TSDB_BNODE_VER_NUMBER 1
#define TSDB_BNODE_RESERVE_SIZE 64 #define TSDB_BNODE_RESERVE_SIZE 64
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj); static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj);
...@@ -248,7 +248,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode ...@@ -248,7 +248,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
bnodeObj.createdTime = taosGetTimestampMs(); bnodeObj.createdTime = taosGetTimestampMs();
bnodeObj.updateTime = bnodeObj.createdTime; bnodeObj.updateTime = bnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_BNODE_OVER; if (pTrans == NULL) goto CREATE_BNODE_OVER;
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
...@@ -366,7 +366,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn ...@@ -366,7 +366,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) { static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_BNODE_OVER; if (pTrans == NULL) goto DROP_BNODE_OVER;
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#define TSDB_DB_VER_NUMBER 1 #define TSDB_DB_VER_NUMBER 1
#define TSDB_DB_RESERVE_SIZE 64 #define TSDB_DB_RESERVE_SIZE 64
static SSdbRaw *mndDbActionEncode(SDbObj *pDb); static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
...@@ -434,11 +434,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat ...@@ -434,11 +434,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat
} }
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DB, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_DB_OVER; if (pTrans == NULL) goto CREATE_DB_OVER;
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbInfo(pTrans, &dbObj);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
...@@ -620,11 +621,12 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -620,11 +621,12 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg);
if (pTrans == NULL) goto UPDATE_DB_OVER; if (pTrans == NULL) goto UPDATE_DB_OVER;
mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name); mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
mndTransSetDbInfo(pTrans, pOld);
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
...@@ -799,10 +801,11 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p ...@@ -799,10 +801,11 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_DB_OVER; if (pTrans == NULL) goto DROP_DB_OVER;
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbInfo(pTrans, pDb);
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
...@@ -914,6 +917,8 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { ...@@ -914,6 +917,8 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
sdbCancelFetch(pSdb, pIter);
} }
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
......
...@@ -440,7 +440,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq * ...@@ -440,7 +440,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq *
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DNODE, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr()); mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1; return -1;
...@@ -516,7 +516,7 @@ CREATE_DNODE_OVER: ...@@ -516,7 +516,7 @@ CREATE_DNODE_OVER:
} }
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) { static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
return -1; return -1;
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define SDB_FUNC_VER 1 #define SDB_FUNC_VER 1
#define SDB_FUNC_RESERVE_SIZE 64 #define SDB_FUNC_RESERVE_SIZE 64
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc); static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
...@@ -206,7 +206,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pC ...@@ -206,7 +206,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pC
memcpy(func.pComment, pCreate->pComment, pCreate->commentSize); memcpy(func.pComment, pCreate->pComment, pCreate->commentSize);
memcpy(func.pCode, pCreate->pCode, func.codeSize); memcpy(func.pCode, pCreate->pCode, func.codeSize);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_FUNC_OVER; if (pTrans == NULL) goto CREATE_FUNC_OVER;
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
...@@ -236,7 +236,7 @@ CREATE_FUNC_OVER: ...@@ -236,7 +236,7 @@ CREATE_FUNC_OVER:
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) { static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_FUNC_OVER; if (pTrans == NULL) goto DROP_FUNC_OVER;
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name); mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define TSDB_MNODE_VER_NUMBER 1 #define TSDB_MNODE_VER_NUMBER 1
#define TSDB_MNODE_RESERVE_SIZE 64 #define TSDB_MNODE_RESERVE_SIZE 64
static int32_t mndCreateDefaultMnode(SMnode *pMnode); static int32_t mndCreateDefaultMnode(SMnode *pMnode);
...@@ -359,7 +359,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode ...@@ -359,7 +359,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime; mnodeObj.updateTime = mnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_MNODE_OVER; if (pTrans == NULL) goto CREATE_MNODE_OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
...@@ -526,7 +526,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -526,7 +526,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) { static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_MNODE_OVER; if (pTrans == NULL) goto DROP_MNODE_OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
......
...@@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode ...@@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
qnodeObj.createdTime = taosGetTimestampMs(); qnodeObj.createdTime = taosGetTimestampMs();
qnodeObj.updateTime = qnodeObj.createdTime; qnodeObj.updateTime = qnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_QNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_QNODE_OVER; if (pTrans == NULL) goto CREATE_QNODE_OVER;
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
...@@ -366,7 +366,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn ...@@ -366,7 +366,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) { static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_QNODE_OVER; if (pTrans == NULL) goto DROP_QNODE_OVER;
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
......
...@@ -295,8 +295,8 @@ char *mndShowStr(int32_t showType) { ...@@ -295,8 +295,8 @@ char *mndShowStr(int32_t showType) {
return "show configs"; return "show configs";
case TSDB_MGMT_TABLE_CONNS: case TSDB_MGMT_TABLE_CONNS:
return "show connections"; return "show connections";
case TSDB_MGMT_TABLE_SCORES: case TSDB_MGMT_TABLE_TRANS:
return "show scores"; return "show trans";
case TSDB_MGMT_TABLE_GRANTS: case TSDB_MGMT_TABLE_GRANTS:
return "show grants"; return "show grants";
case TSDB_MGMT_TABLE_VNODES: case TSDB_MGMT_TABLE_VNODES:
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define TSDB_SNODE_VER_NUMBER 1 #define TSDB_SNODE_VER_NUMBER 1
#define TSDB_SNODE_RESERVE_SIZE 64 #define TSDB_SNODE_RESERVE_SIZE 64
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj); static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
...@@ -248,7 +248,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode ...@@ -248,7 +248,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
snodeObj.createdTime = taosGetTimestampMs(); snodeObj.createdTime = taosGetTimestampMs();
snodeObj.updateTime = snodeObj.createdTime; snodeObj.updateTime = snodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_SNODE_OVER; if (pTrans == NULL) goto CREATE_SNODE_OVER;
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
...@@ -368,7 +368,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn ...@@ -368,7 +368,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pReq, SSnodeObj *pObj) { static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pReq, SSnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_SNODE_OVER; if (pTrans == NULL) goto DROP_SNODE_OVER;
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
......
...@@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
...@@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
...@@ -524,10 +530,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr ...@@ -524,10 +530,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
} }
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_STB_OVER; if (pTrans == NULL) goto CREATE_STB_OVER;
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
...@@ -942,7 +949,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -942,7 +949,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
...@@ -1012,10 +1022,11 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq * ...@@ -1012,10 +1022,11 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *
if (code != 0) goto ALTER_STB_OVER; if (code != 0) goto ALTER_STB_OVER;
code = -1; code = -1;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_STB, &pReq->rpcMsg);
if (pTrans == NULL) goto ALTER_STB_OVER; if (pTrans == NULL) goto ALTER_STB_OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name); mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
mndTransSetDbInfo(pTrans, pDb);
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER; if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
...@@ -1116,7 +1127,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -1116,7 +1127,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
...@@ -1147,10 +1161,11 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -1147,10 +1161,11 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) { static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_STB_OVER; if (pTrans == NULL) goto DROP_STB_OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbInfo(pTrans, pDb);
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
......
...@@ -135,8 +135,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume ...@@ -135,8 +135,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId != -1); ASSERT(pConsumerEp->oldConsumerId != -1);
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
void *buf; void *buf;
int32_t tlen; int32_t tlen;
...@@ -144,6 +142,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC ...@@ -144,6 +142,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
return -1; return -1;
} }
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; action.pCont = buf;
...@@ -181,15 +182,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum ...@@ -181,15 +182,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum
} }
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) { if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
return -1; return -1;
} }
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; action.pCont = buf;
...@@ -408,8 +409,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -408,8 +409,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont; SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
void *pIter = NULL; void *pIter = NULL;
mInfo("mq rebalance start"); mInfo("mq rebalance start");
...@@ -762,7 +763,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM ...@@ -762,7 +763,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) continue; if (pVgroup->dbUid != pTopic->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
pSub->vgNum++; pSub->vgNum++;
plan->execNode.nodeId = pVgroup->vgId; plan->execNode.nodeId = pVgroup->vgId;
...@@ -796,7 +800,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT ...@@ -796,7 +800,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
const SMqConsumerEp *pConsumerEp) { const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId == -1); ASSERT(pConsumerEp->oldConsumerId == -1);
int32_t vgId = pConsumerEp->vgId; int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = { SMqSetCVgReq req = {
.vgId = vgId, .vgId = vgId,
...@@ -824,6 +827,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT ...@@ -824,6 +827,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req); tEncodeSMqSetCVgReq(&abuf, &req);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; action.pCont = buf;
...@@ -1004,7 +1009,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -1004,7 +1009,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
oldTopicNum = taosArrayGetSize(oldSub); oldTopicNum = taosArrayGetSize(oldSub);
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
// TODO: free memory // TODO: free memory
return -1; return -1;
......
...@@ -252,7 +252,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq ...@@ -252,7 +252,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq
topicObj.logicalPlan = pCreate->logicalPlan; topicObj.logicalPlan = pCreate->logicalPlan;
topicObj.sqlLen = strlen(pCreate->sql); topicObj.sqlLen = strlen(pCreate->sql);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
...@@ -343,7 +343,7 @@ CREATE_TOPIC_OVER: ...@@ -343,7 +343,7 @@ CREATE_TOPIC_OVER:
} }
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1; return -1;
......
...@@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate ...@@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
userObj.updateTime = userObj.createdTime; userObj.updateTime = userObj.createdTime;
userObj.superUser = pCreate->superUser; userObj.superUser = pCreate->superUser;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_CREATE_USER, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("user:%s, failed to create since %s", pCreate->user, terrstr()); mError("user:%s, failed to create since %s", pCreate->user, terrstr());
return -1; return -1;
...@@ -350,7 +350,7 @@ CREATE_USER_OVER: ...@@ -350,7 +350,7 @@ CREATE_USER_OVER:
} }
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMnodeMsg *pReq) { static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMnodeMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("user:%s, failed to update since %s", pOld->user, terrstr()); mError("user:%s, failed to update since %s", pOld->user, terrstr());
return -1; return -1;
...@@ -511,7 +511,7 @@ ALTER_USER_OVER: ...@@ -511,7 +511,7 @@ ALTER_USER_OVER:
} }
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) { static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("user:%s, failed to drop since %s", pUser->user, terrstr()); mError("user:%s, failed to drop since %s", pUser->user, terrstr());
return -1; return -1;
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow);
const char *sdbTableName(ESdbType type) { const char *sdbTableName(ESdbType type) {
switch (type) { switch (type) {
case SDB_TRANS: case SDB_TRANS:
...@@ -221,6 +223,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -221,6 +223,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow);
sdbCheck(pSdb, pOldRow);
// sdbRelease(pSdb, pOldRow->pObj); // sdbRelease(pSdb, pOldRow->pObj);
return 0; return 0;
} }
...@@ -305,6 +309,19 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -305,6 +309,19 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
return pRet; return pRet;
} }
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock);
int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "checkRow");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pRow);
}
taosRUnLockLatch(pLock);
}
void sdbRelease(SSdb *pSdb, void *pObj) { void sdbRelease(SSdb *pSdb, void *pObj) {
if (pObj == NULL) return; if (pObj == NULL) return;
...@@ -332,6 +349,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -332,6 +349,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SRWLatch *pLock = &pSdb->locks[type]; SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
#if 0
if (pIter != NULL) { if (pIter != NULL) {
SSdbRow *pLastRow = *(SSdbRow **)pIter; SSdbRow *pLastRow = *(SSdbRow **)pIter;
int32_t ref = atomic_load_32(&pLastRow->refCount); int32_t ref = atomic_load_32(&pLastRow->refCount);
...@@ -339,6 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -339,6 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
sdbFreeRow(pSdb, pLastRow); sdbFreeRow(pSdb, pLastRow);
} }
} }
#endif
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) { while (ppRow != NULL) {
......
...@@ -264,3 +264,60 @@ void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pCon ...@@ -264,3 +264,60 @@ void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pCon
void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext) { void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext) {
(void)rewriteList(pList, TRAVERSAL_POSTORDER, rewriter, pContext); (void)rewriteList(pList, TRAVERSAL_POSTORDER, rewriter, pContext);
} }
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) {
if (NULL == pSelect) {
return;
}
switch (clause) {
case SQL_CLAUSE_FROM:
nodesWalkNode(pSelect->pFromTable, walker, pContext);
nodesWalkNode(pSelect->pWhere, walker, pContext);
case SQL_CLAUSE_WHERE:
nodesWalkList(pSelect->pPartitionByList, walker, pContext);
case SQL_CLAUSE_PARTITION_BY:
nodesWalkNode(pSelect->pWindow, walker, pContext);
case SQL_CLAUSE_WINDOW:
nodesWalkList(pSelect->pGroupByList, walker, pContext);
case SQL_CLAUSE_GROUP_BY:
nodesWalkNode(pSelect->pHaving, walker, pContext);
case SQL_CLAUSE_HAVING:
nodesWalkList(pSelect->pProjectionList, walker, pContext);
case SQL_CLAUSE_SELECT:
nodesWalkList(pSelect->pOrderByList, walker, pContext);
case SQL_CLAUSE_ORDER_BY:
default:
break;
}
return;
}
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext) {
if (NULL == pSelect) {
return;
}
switch (clause) {
case SQL_CLAUSE_FROM:
nodesRewriteNode(&(pSelect->pFromTable), rewriter, pContext);
nodesRewriteNode(&(pSelect->pWhere), rewriter, pContext);
case SQL_CLAUSE_WHERE:
nodesRewriteList(pSelect->pPartitionByList, rewriter, pContext);
case SQL_CLAUSE_PARTITION_BY:
nodesRewriteNode(&(pSelect->pWindow), rewriter, pContext);
case SQL_CLAUSE_WINDOW:
nodesRewriteList(pSelect->pGroupByList, rewriter, pContext);
case SQL_CLAUSE_GROUP_BY:
nodesRewriteNode(&(pSelect->pHaving), rewriter, pContext);
case SQL_CLAUSE_HAVING:
nodesRewriteList(pSelect->pProjectionList, rewriter, pContext);
case SQL_CLAUSE_SELECT:
nodesRewriteList(pSelect->pOrderByList, rewriter, pContext);
default:
break;
}
return;
}
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
#include "querynodes.h" #include "querynodes.h"
#include "nodesShowStmts.h" #include "nodesShowStmts.h"
#include "taos.h"
#include "taoserror.h" #include "taoserror.h"
#include "thash.h"
static SNode* makeNode(ENodeType type, size_t size) { static SNode* makeNode(ENodeType type, size_t size) {
SNode* p = calloc(1, size); SNode* p = calloc(1, size);
...@@ -98,14 +100,14 @@ SNodeList* nodesMakeList() { ...@@ -98,14 +100,14 @@ SNodeList* nodesMakeList() {
return p; return p;
} }
SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) { int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return NULL; return TSDB_CODE_SUCCESS;
} }
SListCell* p = calloc(1, sizeof(SListCell)); SListCell* p = calloc(1, sizeof(SListCell));
if (NULL == p) { if (NULL == p) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return pList; return TSDB_CODE_OUT_OF_MEMORY;
} }
p->pNode = pNode; p->pNode = pNode;
if (NULL == pList->pHead) { if (NULL == pList->pHead) {
...@@ -116,7 +118,7 @@ SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) { ...@@ -116,7 +118,7 @@ SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) {
} }
pList->pTail = p; pList->pTail = p;
++(pList->length); ++(pList->length);
return pList; return TSDB_CODE_SUCCESS;
} }
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) { SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
...@@ -207,4 +209,100 @@ bool nodesIsTimeorderQuery(const SNode* pQuery) { ...@@ -207,4 +209,100 @@ bool nodesIsTimeorderQuery(const SNode* pQuery) {
bool nodesIsTimelineQuery(const SNode* pQuery) { bool nodesIsTimelineQuery(const SNode* pQuery) {
return false; return false;
} }
\ No newline at end of file
typedef struct SCollectColumnsCxt {
int32_t errCode;
uint64_t tableId;
bool realCol;
SNodeList* pCols;
SHashObj* pColIdHash;
} SCollectColumnsCxt;
static EDealRes doCollect(SCollectColumnsCxt* pCxt, int32_t id, SNode* pNode) {
if (NULL == taosHashGet(pCxt->pColIdHash, &id, sizeof(id))) {
pCxt->errCode = taosHashPut(pCxt->pColIdHash, &id, sizeof(id), NULL, 0);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = nodesListAppend(pCxt->pCols, pNode);
}
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
return DEAL_RES_CONTINUE;
}
static EDealRes collectColumns(SNode* pNode, void* pContext) {
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
if (pCxt->realCol && QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
int32_t colId = pCol->colId;
if (pCxt->tableId == pCol->tableId && colId > 0) {
return doCollect(pCxt, colId, pNode);
}
} else if (!pCxt->realCol && QUERY_NODE_COLUMN_REF == nodeType(pNode)) {
return doCollect(pCxt, ((SColumnRefNode*)pNode)->slotId, pNode);
}
return DEAL_RES_CONTINUE;
}
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols) {
if (NULL == pSelect || NULL == pCols) {
return TSDB_CODE_SUCCESS;
}
SCollectColumnsCxt cxt = {
.errCode = TSDB_CODE_SUCCESS,
.realCol = realCol,
.pCols = nodesMakeList(),
.pColIdHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK)
};
if (NULL == cxt.pCols || NULL == cxt.pColIdHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
nodesWalkSelectStmt(pSelect, clause, collectColumns, &cxt);
taosHashCleanup(cxt.pColIdHash);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pCols);
return cxt.errCode;
}
*pCols = cxt.pCols;
return TSDB_CODE_SUCCESS;
}
typedef struct SCollectFuncsCxt {
int32_t errCode;
FFuncClassifier classifier;
SNodeList* pFuncs;
} SCollectFuncsCxt;
static EDealRes collectFuncs(SNode* pNode, void* pContext) {
SCollectFuncsCxt* pCxt = (SCollectFuncsCxt*)pContext;
if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId)) {
pCxt->errCode = nodesListAppend(pCxt->pFuncs, pNode);
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
return DEAL_RES_CONTINUE;
}
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs) {
if (NULL == pSelect || NULL == pFuncs) {
return TSDB_CODE_SUCCESS;
}
SCollectFuncsCxt cxt = {
.errCode = TSDB_CODE_SUCCESS,
.classifier = classifier,
.pFuncs = nodesMakeList()
};
if (NULL == cxt.pFuncs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
nodesWalkSelectStmt(pSelect, SQL_CLAUSE_GROUP_BY, collectFuncs, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pFuncs);
return cxt.errCode;
}
*pFuncs = cxt.pFuncs;
return TSDB_CODE_SUCCESS;
}
...@@ -170,6 +170,7 @@ typedef struct SCreateDbInfo { ...@@ -170,6 +170,7 @@ typedef struct SCreateDbInfo {
int8_t update; int8_t update;
int8_t cachelast; int8_t cachelast;
SArray *keep; SArray *keep;
int8_t streamMode;
} SCreateDbInfo; } SCreateDbInfo;
typedef struct SCreateFuncInfo { typedef struct SCreateFuncInfo {
......
...@@ -76,7 +76,7 @@ cmd ::= SHOW QUERIES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_QUERIES, 0, 0); ...@@ -76,7 +76,7 @@ cmd ::= SHOW QUERIES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_QUERIES, 0, 0);
cmd ::= SHOW CONNECTIONS.{ setShowOptions(pInfo, TSDB_MGMT_TABLE_CONNS, 0, 0);} cmd ::= SHOW CONNECTIONS.{ setShowOptions(pInfo, TSDB_MGMT_TABLE_CONNS, 0, 0);}
cmd ::= SHOW STREAMS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_STREAMS, 0, 0); } cmd ::= SHOW STREAMS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_STREAMS, 0, 0); }
cmd ::= SHOW VARIABLES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); } cmd ::= SHOW VARIABLES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); }
cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_SCORES, 0, 0); } cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TRANS, 0, 0); }
cmd ::= SHOW GRANTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); } cmd ::= SHOW GRANTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); }
cmd ::= SHOW VNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VNODES, 0, 0); } cmd ::= SHOW VNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VNODES, 0, 0); }
...@@ -282,6 +282,7 @@ update(Y) ::= UPDATE INTEGER(X). { Y = X; } ...@@ -282,6 +282,7 @@ update(Y) ::= UPDATE INTEGER(X). { Y = X; }
cachelast(Y) ::= CACHELAST INTEGER(X). { Y = X; } cachelast(Y) ::= CACHELAST INTEGER(X). { Y = X; }
vgroups(Y) ::= VGROUPS INTEGER(X). { Y = X; } vgroups(Y) ::= VGROUPS INTEGER(X). { Y = X; }
//partitions(Y) ::= PARTITIONS INTEGER(X). { Y = X; } //partitions(Y) ::= PARTITIONS INTEGER(X). { Y = X; }
stream_mode(Y) ::= STREAM MODE INTEGER(X). { Y = X; }
%type db_optr {SCreateDbInfo} %type db_optr {SCreateDbInfo}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);} db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);}
...@@ -302,6 +303,7 @@ db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; } ...@@ -302,6 +303,7 @@ db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) vgroups(X). { Y = Z; Y.numOfVgroups = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) vgroups(X). { Y = Z; Y.numOfVgroups = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) stream_mode(X). { Y = Z; Y.streamMode = strtol(X.z, NULL, 10); }
//%type topic_optr {SCreateDbInfo} //%type topic_optr {SCreateDbInfo}
// //
......
...@@ -96,11 +96,17 @@ SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode) { ...@@ -96,11 +96,17 @@ SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode) { SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode) {
SNodeList* list = nodesMakeList(); SNodeList* list = nodesMakeList();
CHECK_OUT_OF_MEM(list); CHECK_OUT_OF_MEM(list);
return nodesListAppend(list, pNode); if (TSDB_CODE_SUCCESS != nodesListAppend(list, pNode)) {
pCxt->valid = false;
}
return list;
} }
SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode) { SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode) {
return nodesListAppend(pList, pNode); if (TSDB_CODE_SUCCESS != nodesListAppend(pList, pNode)) {
pCxt->valid = false;
}
return pList;
} }
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName) { SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName) {
......
...@@ -242,6 +242,7 @@ static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) { ...@@ -242,6 +242,7 @@ static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->update = pCreateDb->update; pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast; pMsg->cacheLastRow = pCreateDb->cachelast;
pMsg->numOfVgroups = pCreateDb->numOfVgroups; pMsg->numOfVgroups = pCreateDb->numOfVgroups;
pMsg->streamMode = pCreateDb->streamMode;
} }
int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) { int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
......
...@@ -241,17 +241,6 @@ abort_parse: ...@@ -241,17 +241,6 @@ abort_parse:
return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
} }
typedef enum ESqlClause {
SQL_CLAUSE_FROM = 1,
SQL_CLAUSE_WHERE,
SQL_CLAUSE_PARTITION_BY,
SQL_CLAUSE_WINDOW,
SQL_CLAUSE_GROUP_BY,
SQL_CLAUSE_HAVING,
SQL_CLAUSE_SELECT,
SQL_CLAUSE_ORDER_BY
} ESqlClause;
static bool afterGroupBy(ESqlClause clause) { static bool afterGroupBy(ESqlClause clause) {
return clause > SQL_CLAUSE_GROUP_BY; return clause > SQL_CLAUSE_GROUP_BY;
} }
...@@ -298,7 +287,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -298,7 +287,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Not SELECTed expression"; return "Not SELECTed expression";
case TSDB_CODE_PAR_NOT_SINGLE_GROUP: case TSDB_CODE_PAR_NOT_SINGLE_GROUP:
return "Not a single-group group function"; return "Not a single-group group function";
case TSDB_CODE_PAR_OUT_OF_MEMORY: case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory"; return "Out of memory";
default: default:
return "Unknown error"; return "Unknown error";
...@@ -360,14 +349,15 @@ static SNodeList* getProjectList(SNode* pNode) { ...@@ -360,14 +349,15 @@ static SNodeList* getProjectList(SNode* pNode) {
return NULL; return NULL;
} }
static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColSchema, SColumnNode* pCol) { static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* pColSchema, SColumnNode* pCol) {
strcpy(pCol->dbName, pTable->dbName); strcpy(pCol->dbName, pTable->table.dbName);
strcpy(pCol->tableAlias, pTable->tableAlias); strcpy(pCol->tableAlias, pTable->table.tableAlias);
strcpy(pCol->tableName, pTable->tableName); strcpy(pCol->tableName, pTable->table.tableName);
strcpy(pCol->colName, pColSchema->name); strcpy(pCol->colName, pColSchema->name);
if ('\0' == pCol->node.aliasName[0]) { if ('\0' == pCol->node.aliasName[0]) {
strcpy(pCol->node.aliasName, pColSchema->name); strcpy(pCol->node.aliasName, pColSchema->name);
} }
pCol->tableId = pTable->pMeta->uid;
pCol->colId = pColSchema->colId; pCol->colId = pColSchema->colId;
// pCol->colType = pColSchema->type; // pCol->colType = pColSchema->type;
pCol->node.resType.type = pColSchema->type; pCol->node.resType.type = pColSchema->type;
...@@ -376,7 +366,7 @@ static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColS ...@@ -376,7 +366,7 @@ static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColS
static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SColumnNode* pCol) { static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SColumnNode* pCol) {
pCol->pProjectRef = (SNode*)pExpr; pCol->pProjectRef = (SNode*)pExpr;
pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol); nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
if (NULL != pTable) { if (NULL != pTable) {
strcpy(pCol->tableAlias, pTable->tableAlias); strcpy(pCol->tableAlias, pTable->tableAlias);
} }
...@@ -391,9 +381,9 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode ...@@ -391,9 +381,9 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode
for (int32_t i = 0; i < nums; ++i) { for (int32_t i = 0; i < nums; ++i) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
} }
setColumnInfoBySchema(pTable, pMeta->schema + i, pCol); setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, pCol);
nodesListAppend(pList, (SNode*)pCol); nodesListAppend(pList, (SNode*)pCol);
} }
} else { } else {
...@@ -402,7 +392,7 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode ...@@ -402,7 +392,7 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode
FOREACH(pNode, pProjectList) { FOREACH(pNode, pProjectList) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
} }
setColumnInfoByExpr(pTable, (SExprNode*)pNode, pCol); setColumnInfoByExpr(pTable, (SExprNode*)pNode, pCol);
nodesListAppend(pList, (SNode*)pCol); nodesListAppend(pList, (SNode*)pCol);
...@@ -418,7 +408,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) { ...@@ -418,7 +408,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns; int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) { for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) { if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) {
setColumnInfoBySchema(pTable, pMeta->schema + i, pCol); setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, pCol);
found = true; found = true;
break; break;
} }
...@@ -572,7 +562,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { ...@@ -572,7 +562,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
int32_t n = strlen(pVal->literal); int32_t n = strlen(pVal->literal);
pVal->datum.p = calloc(1, n); pVal->datum.p = calloc(1, n);
if (NULL == pVal->datum.p) { if (NULL == pVal->datum.p) {
generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
trimStringCopy(pVal->literal, n, pVal->datum.p); trimStringCopy(pVal->literal, n, pVal->datum.p);
...@@ -582,7 +572,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { ...@@ -582,7 +572,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
int32_t n = strlen(pVal->literal); int32_t n = strlen(pVal->literal);
char* tmp = calloc(1, n); char* tmp = calloc(1, n);
if (NULL == tmp) { if (NULL == tmp) {
generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
int32_t len = trimStringCopy(pVal->literal, n, tmp); int32_t len = trimStringCopy(pVal->literal, n, tmp);
...@@ -830,7 +820,7 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool ...@@ -830,7 +820,7 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool
size_t nums = taosArrayGetSize(pTables); size_t nums = taosArrayGetSize(pTables);
pSelect->pProjectionList = nodesMakeList(); pSelect->pProjectionList = nodesMakeList();
if (NULL == pSelect->pProjectionList) { if (NULL == pSelect->pProjectionList) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
} }
for (size_t i = 0; i < nums; ++i) { for (size_t i = 0; i < nums; ++i) {
STableNode* pTable = taosArrayGetP(pTables, i); STableNode* pTable = taosArrayGetP(pTables, i);
...@@ -897,7 +887,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro ...@@ -897,7 +887,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
} else { } else {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
} }
setColumnInfoByExpr(NULL, (SExprNode*)nodesListGetNode(pProjectionList, pos - 1), pCol); setColumnInfoByExpr(NULL, (SExprNode*)nodesListGetNode(pProjectionList, pos - 1), pCol);
((SOrderByExprNode*)pNode)->pExpr = (SNode*)pCol; ((SOrderByExprNode*)pNode)->pExpr = (SNode*)pCol;
...@@ -1036,7 +1026,7 @@ int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -1036,7 +1026,7 @@ int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList); pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList);
pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema)); pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema));
if (NULL == pQuery->pResSchema) { if (NULL == pQuery->pResSchema) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
} }
SNode* pNode; SNode* pNode;
int32_t index = 0; int32_t index = 0;
......
此差异已折叠。
...@@ -230,6 +230,7 @@ static SKeyword keywordTable[] = { ...@@ -230,6 +230,7 @@ static SKeyword keywordTable[] = {
{"PORT", TK_PORT}, {"PORT", TK_PORT},
{"INNER", NEW_TK_INNER}, {"INNER", NEW_TK_INNER},
{"ON", NEW_TK_ON}, {"ON", NEW_TK_ON},
{"MODE", TK_MODE},
}; };
static const char isIdChar[] = { static const char isIdChar[] = {
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
typedef struct SLogicNode { typedef struct SLogicNode {
ENodeType type; ENodeType type;
int32_t id;
SNodeList* pTargets; SNodeList* pTargets;
SNode* pConditions; SNode* pConditions;
SNodeList* pChildren; SNodeList* pChildren;
...@@ -37,6 +38,12 @@ typedef struct SScanLogicNode { ...@@ -37,6 +38,12 @@ typedef struct SScanLogicNode {
struct STableMeta* pMeta; struct STableMeta* pMeta;
} SScanLogicNode; } SScanLogicNode;
typedef struct SJoinLogicNode {
SLogicNode node;
EJoinType joinType;
SNode* pOnConditions;
} SJoinLogicNode;
typedef struct SFilterLogicNode { typedef struct SFilterLogicNode {
SLogicNode node; SLogicNode node;
} SFilterLogicNode; } SFilterLogicNode;
...@@ -47,6 +54,10 @@ typedef struct SAggLogicNode { ...@@ -47,6 +54,10 @@ typedef struct SAggLogicNode {
SNodeList* pAggFuncs; SNodeList* pAggFuncs;
} SAggLogicNode; } SAggLogicNode;
typedef struct SProjectLogicNode {
SLogicNode node;
} SProjectLogicNode;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,83 +16,62 @@ ...@@ -16,83 +16,62 @@
#include "plannerImpl.h" #include "plannerImpl.h"
#include "functionMgt.h" #include "functionMgt.h"
static SLogicNode* createQueryLogicNode(SNode* pStmt); #define CHECK_ALLOC(p, res) \
do { \
typedef struct SCollectColumnsCxt { if (NULL == p) { \
SNodeList* pCols; pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
SHashObj* pColIdHash; return res; \
} SCollectColumnsCxt; } \
} while (0)
static EDealRes doCollectColumns(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
int16_t colId = ((SColumnNode*)pNode)->colId;
if (colId > 0) {
if (NULL == taosHashGet(pCxt->pColIdHash, &colId, sizeof(colId))) {
taosHashPut(pCxt->pColIdHash, &colId, sizeof(colId), NULL, 0);
nodesListAppend(pCxt->pCols, pNode);
}
}
}
return DEAL_RES_CONTINUE;
}
static SNodeList* collectColumns(SSelectStmt* pSelect) { #define CHECK_CODE(exec, res) \
SCollectColumnsCxt cxt = { .pCols = nodesMakeList(), .pColIdHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK) }; do { \
if (NULL == cxt.pCols || NULL == cxt.pColIdHash) { int32_t code = exec; \
return NULL; if (TSDB_CODE_SUCCESS != code) { \
} pCxt->errCode = code; \
nodesWalkNode(pSelect->pFromTable, doCollectColumns, &cxt); return res; \
nodesWalkNode(pSelect->pWhere, doCollectColumns, &cxt); } \
nodesWalkList(pSelect->pPartitionByList, doCollectColumns, &cxt); } while (0)
nodesWalkNode(pSelect->pWindow, doCollectColumns, &cxt);
nodesWalkList(pSelect->pGroupByList, doCollectColumns, &cxt);
nodesWalkNode(pSelect->pHaving, doCollectColumns, &cxt);
nodesWalkList(pSelect->pProjectionList, doCollectColumns, &cxt);
nodesWalkList(pSelect->pOrderByList, doCollectColumns, &cxt);
taosHashCleanup(cxt.pColIdHash);
return cxt.pCols;
}
typedef struct SCollectAggFuncsCxt { typedef struct SPlanContext {
SNodeList* pAggFuncs; int32_t errCode;
} SCollectAggFuncsCxt; int32_t planNodeId;
SNodeList* pResource;
} SPlanContext;
static EDealRes doCollectAggFuncs(SNode* pNode, void* pContext) { static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt);
if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)) { static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable);
SCollectAggFuncsCxt* pCxt = (SCollectAggFuncsCxt*)pContext;
nodesListAppend(pCxt->pAggFuncs, pNode);
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static SNodeList* collectAggFuncs(SSelectStmt* pSelect) {
SCollectAggFuncsCxt cxt = { .pAggFuncs = nodesMakeList() };
if (NULL == cxt.pAggFuncs) {
return NULL;
}
nodesWalkNode(pSelect->pHaving, doCollectAggFuncs, &cxt);
nodesWalkList(pSelect->pProjectionList, doCollectAggFuncs, &cxt);
if (!pSelect->isDistinct) {
nodesWalkList(pSelect->pOrderByList, doCollectAggFuncs, &cxt);
}
return cxt.pAggFuncs;
}
typedef struct SRewriteExprCxt { typedef struct SRewriteExprCxt {
int32_t errCode;
int32_t planNodeId;
SNodeList* pTargets; SNodeList* pTargets;
} SRewriteExprCxt; } SRewriteExprCxt;
static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
switch (nodeType(*pNode)) {
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
break;
}
default:
break;
}
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext; SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
SNode* pTarget; SNode* pTarget;
int32_t index = 0; int32_t index = 0;
FOREACH(pTarget, pCxt->pTargets) { FOREACH(pTarget, pCxt->pTargets) {
if (nodesEqualNode(pTarget, *pNode)) { if (nodesEqualNode(pTarget, *pNode)) {
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
pCol->tupleId = pCxt->planNodeId;
pCol->slotId = index;
nodesDestroyNode(*pNode); nodesDestroyNode(*pNode);
*pNode = nodesMakeNode(QUERY_NODE_COLUMN_REF); *pNode = (SNode*)pCol;
((SColumnRef*)*pNode)->slotId = index;
return DEAL_RES_IGNORE_CHILD; return DEAL_RES_IGNORE_CHILD;
} }
++index; ++index;
...@@ -100,105 +79,223 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { ...@@ -100,105 +79,223 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static int32_t rewriteExpr(SNodeList* pTargets, SSelectStmt* pSelect) { static int32_t rewriteExpr(int32_t planNodeId, SNodeList* pTargets, SSelectStmt* pSelect, ESqlClause clause) {
SRewriteExprCxt cxt = { .pTargets = pTargets }; SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = planNodeId, .pTargets = pTargets };
nodesRewriteNode(&(pSelect->pFromTable), doRewriteExpr, &cxt); nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
nodesRewriteNode(&(pSelect->pWhere), doRewriteExpr, &cxt); return cxt.errCode;
nodesRewriteList(pSelect->pPartitionByList, doRewriteExpr, &cxt);
nodesRewriteNode(&(pSelect->pWindow), doRewriteExpr, &cxt);
nodesRewriteList(pSelect->pGroupByList, doRewriteExpr, &cxt);
nodesRewriteNode(&(pSelect->pHaving), doRewriteExpr, &cxt);
nodesRewriteList(pSelect->pProjectionList, doRewriteExpr, &cxt);
nodesRewriteList(pSelect->pOrderByList, doRewriteExpr, &cxt);
return TSDB_CODE_SUCCESS;
} }
static SLogicNode* pushLogicNode(SLogicNode* pRoot, SLogicNode* pNode) { static SLogicNode* pushLogicNode(SPlanContext* pCxt, SLogicNode* pRoot, SLogicNode* pNode) {
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
goto error;
}
if (NULL == pRoot) { if (NULL == pRoot) {
return pNode; return pNode;
} }
if (NULL == pNode) { if (NULL == pNode) {
return pRoot; return pRoot;
} }
pRoot->pParent = pNode;
if (NULL == pNode->pChildren) { if (NULL == pNode->pChildren) {
pNode->pChildren = nodesMakeList(); pNode->pChildren = nodesMakeList();
if (NULL == pNode->pChildren) {
goto error;
}
} }
nodesListAppend(pNode->pChildren, (SNode*)pRoot); if (TSDB_CODE_SUCCESS != nodesListAppend(pNode->pChildren, (SNode*)pRoot)) {
goto error;
}
pRoot->pParent = pNode;
return pNode; return pNode;
error:
nodesDestroyNode((SNode*)pNode);
return pRoot;
} }
static SNodeList* createScanTargets(SNodeList* pCols) { static SNodeList* createScanTargets(int32_t planNodeId, int32_t numOfScanCols) {
SNodeList* pTargets = nodesMakeList();
if (NULL == pTargets) {
return NULL;
}
for (int32_t i = 0; i < numOfScanCols; ++i) {
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol || TSDB_CODE_SUCCESS != nodesListAppend(pTargets, (SNode*)pCol)) {
nodesDestroyList(pTargets);
return NULL;
}
pCol->tupleId = planNodeId;
pCol->slotId = i;
}
return pTargets;
} }
static SLogicNode* createScanLogicNode(SSelectStmt* pSelect, SRealTableNode* pRealTable) { static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
SNodeList* pCols = collectColumns(pSelect); CHECK_ALLOC(pScan, NULL);
pScan->pScanCols = nodesCloneList(pCols); pScan->node.id = pCxt->planNodeId++;
//
rewriteExpr(pScan->pScanCols, pSelect);
pScan->node.pTargets = createScanTargets(pCols);
pScan->pMeta = pRealTable->pMeta; pScan->pMeta = pRealTable->pMeta;
// set columns to scan
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pScan->pMeta->uid, true, &pCols), (SLogicNode*)pScan);
pScan->pScanCols = nodesCloneList(pCols);
CHECK_ALLOC(pScan->pScanCols, (SLogicNode*)pScan);
// pScanCols of SScanLogicNode is equivalent to pTargets of other logic nodes
CHECK_CODE(rewriteExpr(pScan->node.id, pScan->pScanCols, pSelect, SQL_CLAUSE_FROM), (SLogicNode*)pScan);
// set output
pScan->node.pTargets = createScanTargets(pScan->node.id, LIST_LENGTH(pScan->pScanCols));
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
return (SLogicNode*)pScan; return (SLogicNode*)pScan;
} }
static SLogicNode* createSubqueryLogicNode(SSelectStmt* pSelect, STempTableNode* pTable) { static SLogicNode* createSubqueryLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable) {
return createQueryLogicNode(pTable->pSubquery); return createQueryLogicNode(pCxt, pTable->pSubquery);
}
static SLogicNode* createJoinLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable) {
SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_JOIN);
CHECK_ALLOC(pJoin, NULL);
pJoin->node.id = pCxt->planNodeId++;
pJoin->joinType = pJoinTable->joinType;
// set left and right node
pJoin->node.pChildren = nodesMakeList();
CHECK_ALLOC(pJoin->node.pChildren, (SLogicNode*)pJoin);
SLogicNode* pLeft = createLogicNodeByTable(pCxt, pSelect, pJoinTable->pLeft);
CHECK_ALLOC(pLeft, (SLogicNode*)pJoin);
CHECK_CODE(nodesListAppend(pJoin->node.pChildren, (SNode*)pLeft), (SLogicNode*)pJoin);
SLogicNode* pRight = createLogicNodeByTable(pCxt, pSelect, pJoinTable->pRight);
CHECK_ALLOC(pRight, (SLogicNode*)pJoin);
CHECK_CODE(nodesListAppend(pJoin->node.pChildren, (SNode*)pRight), (SLogicNode*)pJoin);
// set on conditions
pJoin->pOnConditions = nodesCloneNode(pJoinTable->pOnCond);
CHECK_ALLOC(pJoin->pOnConditions, (SLogicNode*)pJoin);
// set the output and rewrite the expression in subsequent clauses with the output
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, 0, false, &pCols), (SLogicNode*)pJoin);
pJoin->node.pTargets = nodesCloneList(pCols);
CHECK_ALLOC(pJoin->node.pTargets, (SLogicNode*)pJoin);
CHECK_CODE(rewriteExpr(pJoin->node.id, pJoin->node.pTargets, pSelect, SQL_CLAUSE_FROM), (SLogicNode*)pJoin);
return (SLogicNode*)pJoin;
} }
static SLogicNode* createLogicNodeByTable(SSelectStmt* pSelect, SNode* pTable) { static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable) {
switch (nodeType(pTable)) { switch (nodeType(pTable)) {
case QUERY_NODE_REAL_TABLE: case QUERY_NODE_REAL_TABLE:
return createScanLogicNode(pSelect, (SRealTableNode*)pTable); return createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable);
case QUERY_NODE_TEMP_TABLE: case QUERY_NODE_TEMP_TABLE:
return createSubqueryLogicNode(pSelect, (STempTableNode*)pTable); return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable);
case QUERY_NODE_JOIN_TABLE: case QUERY_NODE_JOIN_TABLE:
return createJoinLogicNode(pCxt, pSelect, (SJoinTableNode*)pTable);
default: default:
break; break;
} }
return NULL; return NULL;
} }
static SLogicNode* createFilterLogicNode(SNode* pWhere) { static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pWhere) { if (NULL == pSelect->pWhere) {
return NULL; return NULL;
} }
SFilterLogicNode* pFilter = (SFilterLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILTER); SFilterLogicNode* pFilter = (SFilterLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILTER);
pFilter->node.pConditions = nodesCloneNode(pWhere); CHECK_ALLOC(pFilter, NULL);
pFilter->node.id = pCxt->planNodeId++;
// set filter conditions
pFilter->node.pConditions = nodesCloneNode(pSelect->pWhere);
CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter);
// set the output and rewrite the expression in subsequent clauses with the output
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, 0, false, &pCols), (SLogicNode*)pFilter);
pFilter->node.pTargets = nodesCloneList(pCols);
CHECK_ALLOC(pFilter->node.pTargets, (SLogicNode*)pFilter);
CHECK_CODE(rewriteExpr(pFilter->node.id, pFilter->node.pTargets, pSelect, SQL_CLAUSE_WHERE), (SLogicNode*)pFilter);
return (SLogicNode*)pFilter; return (SLogicNode*)pFilter;
} }
static SLogicNode* createAggLogicNode(SSelectStmt* pSelect, SNodeList* pGroupByList, SNode* pHaving) { static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SNodeList* pAggFuncs = collectAggFuncs(pSelect); SNodeList* pAggFuncs = NULL;
if (NULL == pAggFuncs && NULL == pGroupByList) { CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL);
if (NULL == pAggFuncs && NULL == pSelect->pGroupByList) {
return NULL; return NULL;
} }
SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG); SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
pAgg->pGroupKeys = nodesCloneList(pGroupByList); CHECK_ALLOC(pAgg, NULL);
pAgg->node.id = pCxt->planNodeId++;
// set grouyp keys, agg funcs and having conditions
pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
CHECK_ALLOC(pAgg->pGroupKeys, (SLogicNode*)pAgg);
pAgg->pAggFuncs = nodesCloneList(pAggFuncs); pAgg->pAggFuncs = nodesCloneList(pAggFuncs);
pAgg->node.pConditions = nodesCloneNode(pHaving); CHECK_ALLOC(pAgg->pAggFuncs, (SLogicNode*)pAgg);
pAgg->node.pConditions = nodesCloneNode(pSelect->pHaving);
CHECK_ALLOC(pAgg->node.pConditions, (SLogicNode*)pAgg);
// set the output and rewrite the expression in subsequent clauses with the output
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_HAVING, 0, false, &pCols), (SLogicNode*)pAgg);
pAgg->node.pTargets = nodesCloneList(pCols);
CHECK_ALLOC(pAgg->node.pTargets, (SLogicNode*)pAgg);
CHECK_CODE(rewriteExpr(pAgg->node.id, pAgg->node.pTargets, pSelect, SQL_CLAUSE_HAVING), (SLogicNode*)pAgg);
return (SLogicNode*)pAgg; return (SLogicNode*)pAgg;
} }
static SLogicNode* createSelectLogicNode(SSelectStmt* pSelect) { static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SLogicNode* pRoot = createLogicNodeByTable(pSelect, pSelect->pFromTable); SProjectLogicNode* pProject = (SProjectLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PROJECT);
pRoot = pushLogicNode(pRoot, createFilterLogicNode(pSelect->pWhere)); CHECK_ALLOC(pProject, NULL);
pRoot = pushLogicNode(pRoot, createAggLogicNode(pSelect, pSelect->pGroupByList, pSelect->pHaving)); pProject->node.id = pCxt->planNodeId++;
// pRoot = pushLogicNode(pRoot, createProjectLogicNode(pSelect, pSelect->pProjectionList));
pProject->node.pTargets = nodesCloneList(pSelect->pProjectionList);
CHECK_ALLOC(pProject->node.pTargets, (SLogicNode*)pProject);
return (SLogicNode*)pProject;
}
static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createWhereFilterLogicNode(pCxt, pSelect));
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createProjectLogicNode(pCxt, pSelect));
}
return pRoot; return pRoot;
} }
static SLogicNode* createQueryLogicNode(SNode* pStmt) { static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) { switch (nodeType(pStmt)) {
case QUERY_NODE_SELECT_STMT: case QUERY_NODE_SELECT_STMT:
return createSelectLogicNode((SSelectStmt*)pStmt); return createSelectLogicNode(pCxt, (SSelectStmt*)pStmt);
default: default:
break; break;
} }
} }
int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) { int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) {
*pLogicNode = createQueryLogicNode(pNode); SPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = 0 };
SLogicNode* pRoot = createQueryLogicNode(&cxt, pNode);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyNode((SNode*)pRoot);
return cxt.errCode;
}
*pLogicNode = pRoot;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp ...@@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp
.pCont = pMsg, .pCont = pMsg,
.contLen = pInfo->msgInfo.len, .contLen = pInfo->msgInfo.len,
.ahandle = (void*) pInfo, .ahandle = (void*) pInfo,
.handle = NULL, .handle = pInfo->msgInfo.handle,
.code = 0 .code = 0
}; };
......
...@@ -95,6 +95,7 @@ typedef struct SSchTask { ...@@ -95,6 +95,7 @@ typedef struct SSchTask {
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
void* handle; // task send handle
} SSchTask; } SSchTask;
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
......
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
#include "query.h" #include "query.h"
#include "catalog.h" #include "catalog.h"
typedef struct SSchTrans {
void *transInst;
void *transHandle;
}SSchTrans;
static SSchedulerMgmt schMgmt = {0}; static SSchedulerMgmt schMgmt = {0};
uint64_t schGenTaskId(void) { uint64_t schGenTaskId(void) {
...@@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in ...@@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
pTask = *task; pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode)); SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode));
pTask->handle = pMsg->handle;
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
_return: _return:
...@@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { ...@@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SSchTrans *trans = (SSchTrans *)transport;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) { if (NULL == pMsgSendInfo) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo)); qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
...@@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t ...@@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
param->queryId = qId; param->queryId = qId;
param->taskId = tId; param->taskId = tId;
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize; pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = trans->transHandle;
pMsgSendInfo->msgType = msgType; pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo); code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo);
if (code) { if (code) {
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
...@@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
atomic_store_32(&pTask->lastMsgType, msgType); atomic_store_32(&pTask->lastMsgType, msgType);
SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle};
SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
if (isCandidateAddr) { if (isCandidateAddr) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
......
...@@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP) { if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP ||
rpcMsg.msgType == TDMT_VND_RES_READY) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
conn->persist = 1; conn->persist = 1;
tDebug("client conn %p persist by app", conn);
} }
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType),
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr),
ntohs(conn->locaddr.sin_port)); ntohs(conn->locaddr.sin_port));
conn->secured = pHead->secured;
if (conn->push != NULL && conn->ctnRdCnt != 0) { if (conn->push != NULL && conn->ctnRdCnt != 0) {
(*conn->push->callback)(conn->push->arg, &rpcMsg); (*conn->push->callback)(conn->push->arg, &rpcMsg);
conn->push = NULL; conn->push = NULL;
...@@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) {
} }
} }
conn->ctnRdCnt += 1; conn->ctnRdCnt += 1;
conn->secured = pHead->secured;
// buf's mem alread translated to rpcMsg.pCont // buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
...@@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->push == NULL || conn->persist == 0) { if (conn->push == NULL && conn->persist == 0) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
destroyCmsg(conn->data);
conn->data = NULL;
} }
destroyCmsg(conn->data);
conn->data = NULL;
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void clientHandleExcept(SCliConn* pConn) { static void clientHandleExcept(SCliConn* pConn) {
...@@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b ...@@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
} }
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later
if (handle->data == NULL) {
return;
}
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
......
...@@ -256,6 +256,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev ...@@ -256,6 +256,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev
// mnode-trans // mnode-trans
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
// mnode-topic // mnode-topic
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet") TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册