提交 de41059d 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

......@@ -20,7 +20,7 @@
extern "C" {
#endif
#include "nodes.h"
#include "querynodes.h"
typedef enum EFunctionType {
// aggregate function
......
......@@ -40,7 +40,11 @@ extern "C" {
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \
cell1 = cell1->pNext, cell2 = cell2->pNext)
#define FOREACH_FOR_REWRITE(node, list) \
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext)
typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, VALUE, OPERATOR, FUNCTION and so on.
QUERY_NODE_COLUMN = 1,
QUERY_NODE_VALUE,
QUERY_NODE_OPERATOR,
......@@ -59,9 +63,10 @@ typedef enum ENodeType {
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
// only for parser
// Only be used in parser module.
QUERY_NODE_RAW_EXPR,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_SHOW_STMT
......@@ -87,248 +92,6 @@ typedef struct SNodeList {
SListCell* pTail;
} SNodeList;
typedef struct SRawExprNode {
ENodeType nodeType;
char* p;
uint32_t n;
SNode* pNode;
} SRawExprNode;
typedef struct SDataType {
uint8_t type;
uint8_t precision;
uint8_t scale;
int32_t bytes;
} SDataType;
typedef struct SExprNode {
ENodeType nodeType;
SDataType resType;
char aliasName[TSDB_COL_NAME_LEN];
SNodeList* pAssociationList;
} SExprNode;
typedef enum EColumnType {
COLUMN_TYPE_COLUMN = 1,
COLUMN_TYPE_TAG
} EColumnType;
typedef struct SColumnNode {
SExprNode node; // QUERY_NODE_COLUMN
int16_t colId;
EColumnType colType; // column or tag
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
} SColumnNode;
typedef struct SValueNode {
SExprNode node; // QUERY_NODE_VALUE
char* literal;
bool isDuration;
union {
bool b;
int64_t i;
uint64_t u;
double d;
char* p;
} datum;
} SValueNode;
typedef enum EOperatorType {
// arithmetic operator
OP_TYPE_ADD = 1,
OP_TYPE_SUB,
OP_TYPE_MULTI,
OP_TYPE_DIV,
OP_TYPE_MOD,
// comparison operator
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
OP_TYPE_LOWER_THAN,
OP_TYPE_LOWER_EQUAL,
OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL,
OP_TYPE_IN,
OP_TYPE_NOT_IN,
OP_TYPE_LIKE,
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
// json operator
OP_TYPE_JSON_GET_VALUE,
OP_TYPE_JSON_CONTAINS
} EOperatorType;
typedef struct SOperatorNode {
SExprNode node; // QUERY_NODE_OPERATOR
EOperatorType opType;
SNode* pLeft;
SNode* pRight;
} SOperatorNode;
typedef enum ELogicConditionType {
LOGIC_COND_TYPE_AND,
LOGIC_COND_TYPE_OR,
LOGIC_COND_TYPE_NOT,
} ELogicConditionType;
typedef struct SLogicConditionNode {
ENodeType type; // QUERY_NODE_LOGIC_CONDITION
ELogicConditionType condType;
SNodeList* pParameterList;
} SLogicConditionNode;
typedef struct SIsNullCondNode {
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
SNode* pExpr;
bool isNull;
} SIsNullCondNode;
typedef struct SNodeListNode {
ENodeType type; // QUERY_NODE_NODE_LIST
SNodeList* pNodeList;
} SNodeListNode;
typedef struct SFunctionNode {
SExprNode node; // QUERY_NODE_FUNCTION
char functionName[TSDB_FUNC_NAME_LEN];
int32_t funcId;
int32_t funcType;
SNodeList* pParameterList;
} SFunctionNode;
typedef struct STableNode {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
} STableNode;
struct STableMeta;
typedef struct SRealTableNode {
STableNode table; // QUERY_NODE_REAL_TABLE
struct STableMeta* pMeta;
} SRealTableNode;
typedef struct STempTableNode {
STableNode table; // QUERY_NODE_TEMP_TABLE
SNode* pSubquery;
} STempTableNode;
typedef enum EJoinType {
JOIN_TYPE_INNER = 1
} EJoinType;
typedef struct SJoinTableNode {
STableNode table; // QUERY_NODE_JOIN_TABLE
EJoinType joinType;
SNode* pLeft;
SNode* pRight;
SNode* pOnCond;
} SJoinTableNode;
typedef enum EGroupingSetType {
GP_TYPE_NORMAL = 1
} EGroupingSetType;
typedef struct SGroupingSetNode {
ENodeType type; // QUERY_NODE_GROUPING_SET
EGroupingSetType groupingSetType;
SNodeList* pParameterList;
} SGroupingSetNode;
typedef enum EOrder {
ORDER_ASC = 1,
ORDER_DESC
} EOrder;
typedef enum ENullOrder {
NULL_ORDER_DEFAULT = 1,
NULL_ORDER_FIRST,
NULL_ORDER_LAST
} ENullOrder;
typedef struct SOrderByExprNode {
ENodeType type; // QUERY_NODE_ORDER_BY_EXPR
SNode* pExpr;
EOrder order;
ENullOrder nullOrder;
} SOrderByExprNode;
typedef struct SLimitNode {
ENodeType type; // QUERY_NODE_LIMIT
uint64_t limit;
uint64_t offset;
} SLimitNode;
typedef struct SStateWindowNode {
ENodeType type; // QUERY_NODE_STATE_WINDOW
SNode* pCol;
} SStateWindowNode;
typedef struct SSessionWindowNode {
ENodeType type; // QUERY_NODE_SESSION_WINDOW
int64_t gap; // gap between two session window(in microseconds)
SNode* pCol;
} SSessionWindowNode;
typedef struct SIntervalWindowNode {
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
SNode* pInterval; // SValueNode
SNode* pOffset; // SValueNode
SNode* pSliding; // SValueNode
SNode* pFill;
} SIntervalWindowNode;
typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
FILL_MODE_PREV,
FILL_MODE_NULL,
FILL_MODE_LINEAR,
FILL_MODE_NEXT
} EFillMode;
typedef struct SFillNode {
ENodeType type; // QUERY_NODE_FILL
EFillMode mode;
SNode* pValues; // SNodeListNode
} SFillNode;
typedef struct SSelectStmt {
ENodeType type; // QUERY_NODE_SELECT_STMT
bool isDistinct;
SNodeList* pProjectionList; // SNode
SNode* pFromTable;
SNode* pWhere;
SNodeList* pPartitionByList; // SNode
SNode* pWindow;
SNodeList* pGroupByList; // SGroupingSetNode
SNode* pHaving;
SNodeList* pOrderByList; // SOrderByExprNode
SNode* pLimit;
SNode* pSlimit;
} SSelectStmt;
typedef enum ESetOperatorType {
SET_OP_TYPE_UNION_ALL = 1
} ESetOperatorType;
typedef struct SSetOperator {
ENodeType type; // QUERY_NODE_SET_OPERATOR
ESetOperatorType opType;
SNode* pLeft;
SNode* pRight;
SNodeList* pOrderByList; // SOrderByExprNode
SNode* pLimit;
} SSetOperator;
SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
......@@ -344,12 +107,18 @@ typedef enum EDealRes {
DEAL_RES_IGNORE_CHILD,
DEAL_RES_ERROR,
} EDealRes;
typedef EDealRes (*FQueryNodeWalker)(SNode* pNode, void* pContext);
void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext);
void nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext);
void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext);
void nodesWalkNode(SNode* pNode, FNodeWalker walker, void* pContext);
void nodesWalkList(SNodeList* pList, FNodeWalker walker, void* pContext);
void nodesWalkNodePostOrder(SNode* pNode, FNodeWalker walker, void* pContext);
void nodesWalkListPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext);
typedef EDealRes (*FNodeRewriter)(SNode** pNode, void* pContext);
void nodesRewriteNode(SNode** pNode, FNodeRewriter rewriter, void* pContext);
void nodesRewriteList(SNodeList* pList, FNodeRewriter rewriter, void* pContext);
void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext);
void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext);
bool nodesEqualNode(const SNode* a, const SNode* b);
......@@ -358,15 +127,6 @@ void nodesCloneNode(const SNode* pNode);
int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen);
int32_t nodesStringToNode(const char* pStr, SNode** pNode);
bool nodesIsExprNode(const SNode* pNode);
bool nodesIsArithmeticOp(const SOperatorNode* pOp);
bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QUERY_NODES_H_
#define _TD_QUERY_NODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "nodes.h"
typedef struct SRawExprNode {
ENodeType nodeType;
char* p;
uint32_t n;
SNode* pNode;
} SRawExprNode;
typedef struct SDataType {
uint8_t type;
uint8_t precision;
uint8_t scale;
int32_t bytes;
} SDataType;
typedef struct SExprNode {
ENodeType nodeType;
SDataType resType;
char aliasName[TSDB_COL_NAME_LEN];
SNodeList* pAssociationList;
} SExprNode;
typedef enum EColumnType {
COLUMN_TYPE_COLUMN = 1,
COLUMN_TYPE_TAG
} EColumnType;
typedef struct SColumnNode {
SExprNode node; // QUERY_NODE_COLUMN
int16_t colId;
EColumnType colType; // column or tag
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
} SColumnNode;
typedef struct SValueNode {
SExprNode node; // QUERY_NODE_VALUE
char* literal;
bool isDuration;
union {
bool b;
int64_t i;
uint64_t u;
double d;
char* p;
} datum;
} SValueNode;
typedef enum EOperatorType {
// arithmetic operator
OP_TYPE_ADD = 1,
OP_TYPE_SUB,
OP_TYPE_MULTI,
OP_TYPE_DIV,
OP_TYPE_MOD,
// comparison operator
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
OP_TYPE_LOWER_THAN,
OP_TYPE_LOWER_EQUAL,
OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL,
OP_TYPE_IN,
OP_TYPE_NOT_IN,
OP_TYPE_LIKE,
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
// json operator
OP_TYPE_JSON_GET_VALUE,
OP_TYPE_JSON_CONTAINS
} EOperatorType;
typedef struct SOperatorNode {
SExprNode node; // QUERY_NODE_OPERATOR
EOperatorType opType;
SNode* pLeft;
SNode* pRight;
} SOperatorNode;
typedef enum ELogicConditionType {
LOGIC_COND_TYPE_AND,
LOGIC_COND_TYPE_OR,
LOGIC_COND_TYPE_NOT,
} ELogicConditionType;
typedef struct SLogicConditionNode {
ENodeType type; // QUERY_NODE_LOGIC_CONDITION
ELogicConditionType condType;
SNodeList* pParameterList;
} SLogicConditionNode;
typedef struct SIsNullCondNode {
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
SNode* pExpr;
bool isNull;
} SIsNullCondNode;
typedef struct SNodeListNode {
ENodeType type; // QUERY_NODE_NODE_LIST
SNodeList* pNodeList;
} SNodeListNode;
typedef struct SFunctionNode {
SExprNode node; // QUERY_NODE_FUNCTION
char functionName[TSDB_FUNC_NAME_LEN];
int32_t funcId;
int32_t funcType;
SNodeList* pParameterList;
} SFunctionNode;
typedef struct STableNode {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
} STableNode;
struct STableMeta;
typedef struct SRealTableNode {
STableNode table; // QUERY_NODE_REAL_TABLE
struct STableMeta* pMeta;
} SRealTableNode;
typedef struct STempTableNode {
STableNode table; // QUERY_NODE_TEMP_TABLE
SNode* pSubquery;
} STempTableNode;
typedef enum EJoinType {
JOIN_TYPE_INNER = 1
} EJoinType;
typedef struct SJoinTableNode {
STableNode table; // QUERY_NODE_JOIN_TABLE
EJoinType joinType;
SNode* pLeft;
SNode* pRight;
SNode* pOnCond;
} SJoinTableNode;
typedef enum EGroupingSetType {
GP_TYPE_NORMAL = 1
} EGroupingSetType;
typedef struct SGroupingSetNode {
ENodeType type; // QUERY_NODE_GROUPING_SET
EGroupingSetType groupingSetType;
SNodeList* pParameterList;
} SGroupingSetNode;
typedef enum EOrder {
ORDER_ASC = 1,
ORDER_DESC
} EOrder;
typedef enum ENullOrder {
NULL_ORDER_DEFAULT = 1,
NULL_ORDER_FIRST,
NULL_ORDER_LAST
} ENullOrder;
typedef struct SOrderByExprNode {
ENodeType type; // QUERY_NODE_ORDER_BY_EXPR
SNode* pExpr;
EOrder order;
ENullOrder nullOrder;
} SOrderByExprNode;
typedef struct SLimitNode {
ENodeType type; // QUERY_NODE_LIMIT
uint64_t limit;
uint64_t offset;
} SLimitNode;
typedef struct SStateWindowNode {
ENodeType type; // QUERY_NODE_STATE_WINDOW
SNode* pCol;
} SStateWindowNode;
typedef struct SSessionWindowNode {
ENodeType type; // QUERY_NODE_SESSION_WINDOW
int64_t gap; // gap between two session window(in microseconds)
SNode* pCol;
} SSessionWindowNode;
typedef struct SIntervalWindowNode {
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
SNode* pInterval; // SValueNode
SNode* pOffset; // SValueNode
SNode* pSliding; // SValueNode
SNode* pFill;
} SIntervalWindowNode;
typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
FILL_MODE_PREV,
FILL_MODE_NULL,
FILL_MODE_LINEAR,
FILL_MODE_NEXT
} EFillMode;
typedef struct SFillNode {
ENodeType type; // QUERY_NODE_FILL
EFillMode mode;
SNode* pValues; // SNodeListNode
} SFillNode;
typedef struct SSelectStmt {
ENodeType type; // QUERY_NODE_SELECT_STMT
bool isDistinct;
SNodeList* pProjectionList; // SNode
SNode* pFromTable;
SNode* pWhere;
SNodeList* pPartitionByList; // SNode
SNode* pWindow;
SNodeList* pGroupByList; // SGroupingSetNode
SNode* pHaving;
SNodeList* pOrderByList; // SOrderByExprNode
SNode* pLimit;
SNode* pSlimit;
} SSelectStmt;
typedef enum ESetOperatorType {
SET_OP_TYPE_UNION_ALL = 1
} ESetOperatorType;
typedef struct SSetOperator {
ENodeType type; // QUERY_NODE_SET_OPERATOR
ESetOperatorType opType;
SNode* pLeft;
SNode* pRight;
SNodeList* pOrderByList; // SOrderByExprNode
SNode* pLimit;
} SSetOperator;
bool nodesIsExprNode(const SNode* pNode);
bool nodesIsArithmeticOp(const SOperatorNode* pOp);
bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
#ifdef __cplusplus
}
#endif
#endif /*_TD_QUERY_NODES_H_*/
\ No newline at end of file
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#include "nodes.h"
#include "querynodes.h"
#include "nodesShowStmts.h"
#include "astCreateContext.h"
#include "ttoken.h"
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#include "querynodes.h"
#include "parser.h"
#ifndef _TD_AST_CREATE_FUNCS_H_
......@@ -25,6 +25,7 @@ extern "C" {
typedef struct SQuery {
SNode* pRoot;
// todo reslut meta
} SQuery;
int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery);
......
......@@ -533,7 +533,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT: {
char* endPtr = NULL;
pVal->datum.i = strtoull(pVal->literal, &endPtr, 10);
pVal->datum.i = strtoll(pVal->literal, &endPtr, 10);
break;
}
case TSDB_DATA_TYPE_UTINYINT:
......@@ -563,7 +563,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
int32_t n = strlen(pVal->literal);
char* tmp = calloc(1, n);
int32_t len = trimStringCopy(pVal->literal, n, tmp);
if (taosParseTime(tmp, &pVal->datum.u, len, pVal->node.resType.precision, tsDaylight) != TSDB_CODE_SUCCESS) {
if (taosParseTime(tmp, &pVal->datum.i, len, pVal->node.resType.precision, tsDaylight) != TSDB_CODE_SUCCESS) {
tfree(tmp);
generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
return DEAL_RES_ERROR;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PLANNER_IMPL_H_
#define _TD_PLANNER_IMPL_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_PLANNER_IMPL_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// int32_t getPlan(SNode* pRoot, SQueryPlanNode** pQueryPlan) {
// }
\ No newline at end of file
......@@ -233,7 +233,7 @@ typedef struct {
uv_async_t* asyncs;
} SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool, queue* mq);
......
......@@ -30,8 +30,11 @@ void* rpcOpen(const SRpcInit* pInit) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
}
pRpc->cfp = pInit->cfp;
// pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->numOfThreads = pInit->numOfThreads;
if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} else {
pRpc->numOfThreads = pInit->numOfThreads;
}
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
......
......@@ -38,6 +38,7 @@ typedef struct SCliConn {
int32_t ref;
// debug and log info
struct sockaddr_in addr;
struct sockaddr_in locaddr;
} SCliConn;
typedef struct SCliMsg {
......@@ -130,8 +131,9 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
ntohs(conn->addr.sin_port));
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType),
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr),
ntohs(conn->locaddr.sin_port));
if (conn->push != NULL && conn->notifyCount != 0) {
(*conn->push->callback)(conn->push->arg, &rpcMsg);
......@@ -417,8 +419,9 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port));
tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port));
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
static void clientConnCb(uv_connect_t* req, int status) {
......@@ -433,6 +436,9 @@ static void clientConnCb(uv_connect_t* req, int status) {
int addrlen = sizeof(pConn->addr);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);
addrlen = sizeof(pConn->locaddr);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("client conn %p create", pConn);
assert(pConn->stream == req->handle);
......@@ -579,7 +585,7 @@ static SCliThrdObj* createThrdObj() {
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb);
pThrd->timer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->timer);
......
......@@ -250,9 +250,7 @@ int transDestroyBuffer(SConnBuffer* buf) {
transClearBuffer(buf);
}
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
static int sz = 10;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
pool->index = 0;
pool->nAsync = sz;
......
......@@ -31,9 +31,11 @@ typedef struct SSrvConn {
void* pTransInst; // rpc init
void* ahandle; //
void* hostThrd;
void* pSrvMsg;
SArray* srvMsgs;
// void* pSrvMsg;
struct sockaddr_in addr;
struct sockaddr_in locaddr;
// SRpcMsg sendMsg;
// del later
......@@ -94,6 +96,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvStartSendRespInternal(SSrvMsg* smsg);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
......@@ -263,8 +266,9 @@ static void uvHandleReq(SSrvConn* pConn) {
transClearBuffer(&pConn->readBuf);
pConn->ref++;
tDebug("server conn %p %s received from %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port));
tDebug("server conn %p %s received from %s:%d, local info: %s:%d", pConn, TMSG_INFO(rpcMsg.msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port));
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
......@@ -310,14 +314,19 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnWriteCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data;
SSrvMsg* smsg = conn->pSrvMsg;
destroySmsg(smsg);
conn->pSrvMsg = NULL;
transClearBuffer(&conn->readBuf);
if (status == 0) {
tTrace("server conn %p data already was written on stream", conn);
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
taosArrayRemove(conn->srvMsgs, 0);
destroySmsg(msg);
// send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) {
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
uvStartSendRespInternal(msg);
}
} else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
//
......@@ -354,27 +363,37 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
if (transCompressMsg(msg, len, NULL)) {
// impl later
}
tDebug("server conn %p %s is sent to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port));
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port));
pHead->msgLen = htonl(len);
wb->base = msg;
wb->len = len;
}
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
static void uvStartSendRespInternal(SSrvMsg* smsg) {
uv_buf_t wb;
uvPrepareSendData(smsg, &wb);
SSrvConn* pConn = smsg->pConn;
uv_timer_stop(pConn->pTimer);
pConn->pSrvMsg = smsg;
// pConn->pSrvMsg = smsg;
// conn->pWriter->data = smsg;
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
// SRpcMsg* rpcMsg = smsg->msg;
}
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
SSrvConn* pConn = smsg->pConn;
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
taosArrayPush(pConn->srvMsgs, &smsg);
return;
}
taosArrayPush(pConn->srvMsgs, &smsg);
uvStartSendRespInternal(smsg);
return;
}
static void destroySmsg(SSrvMsg* smsg) {
......@@ -496,13 +515,23 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tTrace("server conn %p created, fd: %d", pConn, fd);
int addrlen = sizeof(pConn->addr);
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
tError("server conn %p failed to get peer info", pConn);
destroyConn(pConn, true);
} else {
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
return;
}
addrlen = sizeof(pConn->locaddr);
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
tError("server conn %p failed to get local info", pConn);
destroyConn(pConn, true);
return;
}
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else {
tDebug("failed to create new connection");
destroyConn(pConn, true);
......@@ -531,7 +560,7 @@ static bool addHandleToWorkloop(void* arg) {
QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;
}
......@@ -571,6 +600,7 @@ void* workerThread(void* arg) {
static SSrvConn* createConn() {
SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("conn %p created", pConn);
++pConn->ref;
return pConn;
......@@ -585,8 +615,15 @@ static void destroyConn(SSrvConn* conn, bool clear) {
return;
}
transDestroyBuffer(&conn->readBuf);
destroySmsg(conn->pSrvMsg);
conn->pSrvMsg = NULL;
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
destroySmsg(msg);
}
taosArrayDestroy(conn->srvMsgs);
// destroySmsg(conn->pSrvMsg);
// conn->pSrvMsg = NULL;
if (clear) {
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
......
......@@ -77,7 +77,7 @@ void processShellMsg() {
taosFreeQitem(pRpcMsg);
{
sleep(1);
// sleep(1);
SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize;
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#include "querynodes.h"
#define COMPARE_SCALAR_FIELD(fldname) \
do { \
......
......@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#include "querynodes.h"
typedef enum ETraversalOrder {
TRAVERSAL_PREORDER = 1,
TRAVERSAL_POSTORDER
} ETraversalOrder;
static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNodeWalker walker, void* pContext);
static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext);
static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FQueryNodeWalker walker, void* pContext) {
static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) {
if (NULL == pNode) {
return DEAL_RES_CONTINUE;
}
......@@ -119,7 +119,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FQueryNodeWalker w
return res;
}
static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNodeWalker walker, void* pContext) {
static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) {
SNode* node;
FOREACH(node, pNodeList) {
if (DEAL_RES_ERROR == walkNode(node, order, walker, pContext)) {
......@@ -129,18 +129,144 @@ static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNode
return DEAL_RES_CONTINUE;
}
void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
void nodesWalkNode(SNode* pNode, FNodeWalker walker, void* pContext) {
(void)walkNode(pNode, TRAVERSAL_PREORDER, walker, pContext);
}
void nodesWalkList(SNodeList* pNodeList, FQueryNodeWalker walker, void* pContext) {
void nodesWalkList(SNodeList* pNodeList, FNodeWalker walker, void* pContext) {
(void)walkList(pNodeList, TRAVERSAL_PREORDER, walker, pContext);
}
void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
void nodesWalkNodePostOrder(SNode* pNode, FNodeWalker walker, void* pContext) {
(void)walkNode(pNode, TRAVERSAL_POSTORDER, walker, pContext);
}
void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext) {
void nodesWalkListPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext) {
(void)walkList(pList, TRAVERSAL_POSTORDER, walker, pContext);
}
static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext);
static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) {
if (NULL == pRawNode || NULL == *pRawNode) {
return DEAL_RES_CONTINUE;
}
EDealRes res = DEAL_RES_CONTINUE;
if (TRAVERSAL_PREORDER == order) {
res = rewriter(pRawNode, pContext);
if (DEAL_RES_CONTINUE != res) {
return res;
}
}
SNode* pNode = *pRawNode;
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN:
case QUERY_NODE_VALUE:
case QUERY_NODE_LIMIT:
// these node types with no subnodes
break;
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
res = rewriteNode(&(pOpNode->pLeft), order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pOpNode->pRight), order, rewriter, pContext);
}
break;
}
case QUERY_NODE_LOGIC_CONDITION:
res = rewriteList(((SLogicConditionNode*)pNode)->pParameterList, order, rewriter, pContext);
break;
case QUERY_NODE_IS_NULL_CONDITION:
res = rewriteNode(&(((SIsNullCondNode*)pNode)->pExpr), order, rewriter, pContext);
break;
case QUERY_NODE_FUNCTION:
res = rewriteList(((SFunctionNode*)pNode)->pParameterList, order, rewriter, pContext);
break;
case QUERY_NODE_REAL_TABLE:
case QUERY_NODE_TEMP_TABLE:
break; // todo
case QUERY_NODE_JOIN_TABLE: {
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
res = rewriteNode(&(pJoinTableNode->pLeft), order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pJoinTableNode->pRight), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pJoinTableNode->pOnCond), order, rewriter, pContext);
}
break;
}
case QUERY_NODE_GROUPING_SET:
res = rewriteList(((SGroupingSetNode*)pNode)->pParameterList, order, rewriter, pContext);
break;
case QUERY_NODE_ORDER_BY_EXPR:
res = rewriteNode(&(((SOrderByExprNode*)pNode)->pExpr), order, rewriter, pContext);
break;
case QUERY_NODE_STATE_WINDOW:
res = rewriteNode(&(((SStateWindowNode*)pNode)->pCol), order, rewriter, pContext);
break;
case QUERY_NODE_SESSION_WINDOW:
res = rewriteNode(&(((SSessionWindowNode*)pNode)->pCol), order, rewriter, pContext);
break;
case QUERY_NODE_INTERVAL_WINDOW: {
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode;
res = rewriteNode(&(pInterval->pInterval), order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pInterval->pOffset), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pInterval->pSliding), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext);
}
break;
}
case QUERY_NODE_NODE_LIST:
res = rewriteList(((SNodeListNode*)pNode)->pNodeList, order, rewriter, pContext);
break;
case QUERY_NODE_FILL:
res = rewriteNode(&(((SFillNode*)pNode)->pValues), order, rewriter, pContext);
break;
case QUERY_NODE_RAW_EXPR:
res = rewriteNode(&(((SRawExprNode*)pNode)->pNode), order, rewriter, pContext);
break;
default:
break;
}
if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) {
res = rewriter(pRawNode, pContext);
}
return res;
}
static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) {
SNode** pNode;
FOREACH_FOR_REWRITE(pNode, pNodeList) {
if (DEAL_RES_ERROR == rewriteNode(pNode, order, rewriter, pContext)) {
return DEAL_RES_ERROR;
}
}
return DEAL_RES_CONTINUE;
}
void nodesRewriteNode(SNode** pNode, FNodeRewriter rewriter, void* pContext) {
(void)rewriteNode(pNode, TRAVERSAL_PREORDER, rewriter, pContext);
}
void nodesRewriteList(SNodeList* pList, FNodeRewriter rewriter, void* pContext) {
(void)rewriteList(pList, TRAVERSAL_PREORDER, rewriter, pContext);
}
void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext) {
(void)rewriteNode(pNode, TRAVERSAL_POSTORDER, rewriter, pContext);
}
void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext) {
(void)rewriteList(pList, TRAVERSAL_POSTORDER, rewriter, pContext);
}
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#include "querynodes.h"
#include "nodesShowStmts.h"
#include "taoserror.h"
......
......@@ -15,8 +15,44 @@
#include <gtest/gtest.h>
#include "querynodes.h"
using namespace std;
static EDealRes rewriterTest(SNode** pNode, void* pContext) {
EDealRes* pRes = (EDealRes*)pContext;
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
SOperatorNode* pOp = (SOperatorNode*)(*pNode);
if (QUERY_NODE_VALUE != nodeType(pOp->pLeft) || QUERY_NODE_VALUE != nodeType(pOp->pRight)) {
*pRes = DEAL_RES_ERROR;
}
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
string tmp = to_string(stoi(((SValueNode*)(pOp->pLeft))->literal) + stoi(((SValueNode*)(pOp->pRight))->literal));
pVal->literal = strdup(tmp.c_str());
nodesDestroyNode(*pNode);
*pNode = (SNode*)pVal;
}
return DEAL_RES_CONTINUE;
}
TEST(NodesTest, traverseTest) {
// todo
SNode* pRoot = nodesMakeNode(QUERY_NODE_OPERATOR);
SOperatorNode* pOp = (SOperatorNode*)pRoot;
SOperatorNode* pLeft = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
pLeft->pLeft = nodesMakeNode(QUERY_NODE_VALUE);
((SValueNode*)(pLeft->pLeft))->literal = strdup("10");
pLeft->pRight = nodesMakeNode(QUERY_NODE_VALUE);
((SValueNode*)(pLeft->pRight))->literal = strdup("5");
pOp->pLeft = (SNode*)pLeft;
pOp->pRight = nodesMakeNode(QUERY_NODE_VALUE);
((SValueNode*)(pOp->pRight))->literal = strdup("3");
EXPECT_EQ(nodeType(pRoot), QUERY_NODE_OPERATOR);
EDealRes res = DEAL_RES_CONTINUE;
nodesRewriteNodePostOrder(&pRoot, rewriterTest, &res);
EXPECT_EQ(res, DEAL_RES_CONTINUE);
EXPECT_EQ(nodeType(pRoot), QUERY_NODE_VALUE);
EXPECT_EQ(string(((SValueNode*)pRoot)->literal), "18");
}
int main(int argc, char* argv[]) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册