提交 91a2d7c2 编写于 作者: L Liu Jicong

merge from 3.0

[submodule "src/connector/go"]
path = src/connector/go
url = git@github.com:taosdata/driver-go.git
[submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin
url = git@github.com:taosdata/grafanaplugin.git
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = git@github.com:taosdata/hivemq-tdengine-extension.git
......
......@@ -111,15 +111,15 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_MAX,
} EShowType;
#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1
#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2
#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3
#define TSDB_ALTER_TABLE_ADD_TAG 1
#define TSDB_ALTER_TABLE_DROP_TAG 2
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
#define TSDB_ALTER_TABLE_ADD_COLUMN 5
#define TSDB_ALTER_TABLE_DROP_COLUMN 6
#define TSDB_ALTER_TABLE_CHANGE_COLUMN 7
#define TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN 8
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
......@@ -147,10 +147,15 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
#define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
typedef struct {
int32_t vgId;
char* dbName;
char* tableFullName;
char* dbFName;
char* tbName;
} SBuildTableMetaInput;
typedef struct {
......@@ -158,6 +163,12 @@ typedef struct {
int32_t vgVersion;
} SBuildUseDBInput;
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int32_t bytes;
} SField;
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
......@@ -247,21 +258,32 @@ typedef struct SSchema {
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
int32_t numOfTags;
int32_t numOfColumns;
SSchema pSchema[];
int32_t numOfTags;
SArray* pColumns;
SArray* pTags;
} SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void** buf, SMCreateStbReq* pReq);
void* tDeserializeSMCreateStbReq(void* buf, SMCreateStbReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SMDropStbReq;
int32_t tSerializeSMDropStbReq(void** buf, SMDropStbReq* pReq);
void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
} SMAlterStbReq;
int32_t numOfFields;
SArray* pFields;
} SMAltertbReq;
int32_t tSerializeSMAlterStbReq(void** buf, SMAltertbReq* pReq);
void* tDeserializeSMAlterStbReq(void* buf, SMAltertbReq* pReq);
typedef struct {
int32_t pid;
......@@ -691,8 +713,8 @@ typedef struct {
typedef struct {
SMsgHead header;
char dbFname[TSDB_DB_FNAME_LEN];
char tableFname[TSDB_TABLE_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
} STableInfoReq;
typedef struct {
......@@ -717,9 +739,10 @@ typedef struct {
} SVgroupsInfo;
typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
char stbFname[TSDB_TABLE_FNAME_LEN];
char dbFname[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
......@@ -1158,10 +1181,7 @@ typedef struct SVCreateTbReq {
char* name;
uint32_t ttl;
uint32_t keep;
#define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
uint8_t type;
uint8_t type;
union {
struct {
tb_uid_t suid;
......@@ -1179,42 +1199,44 @@ typedef struct SVCreateTbReq {
SSchema* pSchema;
} ntbCfg;
};
} SVCreateTbReq;
} SVCreateTbReq, SVUpdateTbReq;
typedef struct {
} SVCreateTbRsp, SVUpdateTbRsp;
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int32_t tSerializeSVCreateTbRsp(void** buf, SVCreateTbRsp* pRsp);
void* tDeserializeSVCreateTbRsp(void* buf, SVCreateTbRsp* pRsp);
typedef struct {
uint64_t ver; // use a general definition
SArray* pArray;
} SVCreateTbBatchReq;
int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
typedef struct {
SMsgHead head;
} SVCreateTbRsp;
} SVCreateTbBatchRsp;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SVAlterTbReq;
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
int32_t tSerializeSVCreateTbBatchRsp(void** buf, SVCreateTbBatchRsp* pRsp);
void* tDeserializeSVCreateTbBatchRsp(void* buf, SVCreateTbBatchRsp* pRsp);
typedef struct {
SMsgHead head;
} SVAlterTbRsp;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int64_t suid;
uint64_t ver;
char* name;
uint8_t type;
tb_uid_t suid;
} SVDropTbReq;
typedef struct {
SMsgHead head;
} SVDropTbRsp;
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);
int32_t tSerializeSVDropTbRsp(void** buf, SVDropTbRsp* pRsp);
void* tDeserializeSVDropTbRsp(void* buf, SVDropTbRsp* pRsp);
typedef struct {
SMsgHead head;
int64_t uid;
......
......@@ -207,36 +207,77 @@
#define TK_INTO 189
#define TK_VALUES 190
#define NEW_TK_UNION 1
#define NEW_TK_ALL 2
#define NEW_TK_MINUS 3
#define NEW_TK_EXCEPT 4
#define NEW_TK_INTERSECT 5
#define NEW_TK_NK_PLUS 6
#define NEW_TK_NK_MINUS 7
#define NEW_TK_NK_STAR 8
#define NEW_TK_NK_SLASH 9
#define NEW_TK_SHOW 10
#define NEW_TK_DATABASES 11
#define NEW_TK_NK_ID 12
#define NEW_TK_NK_LP 13
#define NEW_TK_NK_RP 14
#define NEW_TK_NK_COMMA 15
#define NEW_TK_NK_LITERAL 16
#define NEW_TK_NK_DOT 17
#define NEW_TK_SELECT 18
#define NEW_TK_DISTINCT 19
#define NEW_TK_AS 20
#define NEW_TK_FROM 21
#define NEW_TK_WITH 22
#define NEW_TK_RECURSIVE 23
#define NEW_TK_ORDER 24
#define NEW_TK_BY 25
#define NEW_TK_ASC 26
#define NEW_TK_DESC 27
#define NEW_TK_NULLS 28
#define NEW_TK_FIRST 29
#define NEW_TK_LAST 30
#define NEW_TK_OR 1
#define NEW_TK_AND 2
#define NEW_TK_UNION 3
#define NEW_TK_ALL 4
#define NEW_TK_MINUS 5
#define NEW_TK_EXCEPT 6
#define NEW_TK_INTERSECT 7
#define NEW_TK_NK_PLUS 8
#define NEW_TK_NK_MINUS 9
#define NEW_TK_NK_STAR 10
#define NEW_TK_NK_SLASH 11
#define NEW_TK_NK_REM 12
#define NEW_TK_SHOW 13
#define NEW_TK_DATABASES 14
#define NEW_TK_NK_INTEGER 15
#define NEW_TK_NK_FLOAT 16
#define NEW_TK_NK_STRING 17
#define NEW_TK_NK_BOOL 18
#define NEW_TK_TIMESTAMP 19
#define NEW_TK_NK_VARIABLE 20
#define NEW_TK_NK_COMMA 21
#define NEW_TK_NK_ID 22
#define NEW_TK_NK_LP 23
#define NEW_TK_NK_RP 24
#define NEW_TK_NK_DOT 25
#define NEW_TK_BETWEEN 26
#define NEW_TK_NOT 27
#define NEW_TK_IS 28
#define NEW_TK_NULL 29
#define NEW_TK_NK_LT 30
#define NEW_TK_NK_GT 31
#define NEW_TK_NK_LE 32
#define NEW_TK_NK_GE 33
#define NEW_TK_NK_NE 34
#define NEW_TK_NK_EQ 35
#define NEW_TK_LIKE 36
#define NEW_TK_MATCH 37
#define NEW_TK_NMATCH 38
#define NEW_TK_IN 39
#define NEW_TK_FROM 40
#define NEW_TK_AS 41
#define NEW_TK_JOIN 42
#define NEW_TK_ON 43
#define NEW_TK_INNER 44
#define NEW_TK_SELECT 45
#define NEW_TK_DISTINCT 46
#define NEW_TK_WHERE 47
#define NEW_TK_PARTITION 48
#define NEW_TK_BY 49
#define NEW_TK_SESSION 50
#define NEW_TK_STATE_WINDOW 51
#define NEW_TK_INTERVAL 52
#define NEW_TK_SLIDING 53
#define NEW_TK_FILL 54
#define NEW_TK_VALUE 55
#define NEW_TK_NONE 56
#define NEW_TK_PREV 57
#define NEW_TK_LINEAR 58
#define NEW_TK_NEXT 59
#define NEW_TK_GROUP 60
#define NEW_TK_HAVING 61
#define NEW_TK_ORDER 62
#define NEW_TK_SLIMIT 63
#define NEW_TK_SOFFSET 64
#define NEW_TK_LIMIT 65
#define NEW_TK_OFFSET 66
#define NEW_TK_ASC 67
#define NEW_TK_DESC 68
#define NEW_TK_NULLS 69
#define NEW_TK_FIRST 70
#define NEW_TK_LAST 71
#define TK_SPACE 300
#define TK_COMMENT 301
......@@ -247,6 +288,8 @@
#define TK_FILE 306
#define TK_QUESTION 307 // denoting the placeholder of "?",when invoking statement bind query
#define TK_NIL 65535
#endif
......@@ -32,6 +32,15 @@ extern "C" {
struct SCatalog;
enum {
CTG_DBG_DB_NUM = 1,
CTG_DBG_META_NUM,
CTG_DBG_STB_NUM,
CTG_DBG_DB_RENT_NUM,
CTG_DBG_STB_RENT_NUM,
};
typedef struct SCatalogReq {
SArray *pTableName; // element is SNAME
SArray *pUdf; // udf name
......@@ -49,17 +58,19 @@ typedef struct SCatalogCfg {
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
uint32_t dbRentSec;
uint32_t stableRentSec;
uint32_t stbRentSec;
} SCatalogCfg;
typedef struct SSTableMetaVersion {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t suid;
int16_t sversion;
int16_t tversion;
} SSTableMetaVersion;
typedef struct SDbVgVersion {
char dbName[TSDB_DB_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion;
} SDbVgVersion;
......@@ -97,9 +108,11 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
*/
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgroupInfo* dbInfo);
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo);
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid);
/**
* Get a table's meta data.
......@@ -123,6 +136,8 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons
*/
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg);
/**
* Force renew a table's local cached meta data.
......
......@@ -37,12 +37,6 @@ typedef struct SQueryNode {
#define queryNodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int32_t bytes;
} SField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
......
......@@ -81,16 +81,15 @@ typedef struct STableMeta {
} STableMeta;
typedef struct SDBVgroupInfo {
SRWLatch lock;
uint64_t dbId;
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
} SDBVgroupInfo;
typedef struct SUseDbOutput {
char db[TSDB_DB_FNAME_LEN];
SDBVgroupInfo dbVgroup;
char db[TSDB_DB_FNAME_LEN];
uint64_t dbId;
SDBVgroupInfo *dbVgroup;
} SUseDbOutput;
enum {
......@@ -103,8 +102,10 @@ enum {
typedef struct STableMetaOutput {
int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN];
uint64_t dbId;
char dbFName[TSDB_DB_FNAME_LEN];
char ctbName[TSDB_TABLE_NAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
SCTableMeta ctbMeta;
STableMeta *tbMeta;
} STableMetaOutput;
......@@ -159,6 +160,8 @@ void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
......
......@@ -22,6 +22,19 @@ extern "C" {
#include "tdef.h"
#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type)
#define setNodeType(nodeptr, type) (((SNode*)(nodeptr))->type = (type))
#define LIST_LENGTH(l) (NULL != (l) ? (l)->length : 0)
#define FOREACH(node, list) \
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext)
#define FORBOTH(node1, list1, node2, list2) \
for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHead : NULL), *cell2 = (NULL != (list2) ? (list2)->pHead : NULL); \
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \
cell1 = cell1->pNext, cell2 = cell2->pNext)
typedef enum ENodeType {
QUERY_NODE_COLUMN = 1,
QUERY_NODE_VALUE,
......@@ -38,6 +51,11 @@ typedef enum ENodeType {
QUERY_NODE_STATE_WINDOW,
QUERY_NODE_SESSION_WINDOW,
QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
// only for parser
QUERY_NODE_RAW_EXPR,
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
......@@ -52,9 +70,6 @@ typedef struct SNode {
ENodeType type;
} SNode;
#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type)
#define setNodeType(nodeptr, type) (((SNode*)(nodeptr))->type = (type))
typedef struct SListCell {
SNode* pNode;
struct SListCell* pNext;
......@@ -62,18 +77,16 @@ typedef struct SListCell {
typedef struct SNodeList {
int16_t length;
SListCell* pHeader;
SListCell* pHead;
SListCell* pTail;
} SNodeList;
#define LIST_LENGTH(l) (NULL != (l) ? (l)->length : 0)
#define FOREACH(node, list) \
for (SListCell* cell = (NULL != (list) ? (list)->pHeader : NULL); (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext)
#define FORBOTH(node1, list1, node2, list2) \
for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHeader : NULL), *cell2 = (NULL != (list2) ? (list2)->pHeader : NULL); \
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \
cell1 = cell1->pNext, cell2 = cell2->pNext)
typedef struct SRawExprNode {
ENodeType nodeType;
char* p;
uint32_t n;
SNode* pNode;
} SRawExprNode;
typedef struct SDataType {
uint8_t type;
......@@ -86,6 +99,7 @@ typedef struct SExprNode {
ENodeType nodeType;
SDataType resType;
char aliasName[TSDB_COL_NAME_LEN];
SNodeList* pAssociationList;
} SExprNode;
typedef enum EColumnType {
......@@ -99,12 +113,22 @@ typedef struct SColumnNode {
EColumnType colType; // column or tag
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
} SColumnNode;
typedef struct SValueNode {
SExprNode type; // QUERY_NODE_VALUE
SExprNode node; // QUERY_NODE_VALUE
char* literal;
bool isDuration;
union {
bool b;
int64_t i;
uint64_t u;
double d;
char* p;
} datum;
} SValueNode;
typedef enum EOperatorType {
......@@ -135,7 +159,7 @@ typedef enum EOperatorType {
} EOperatorType;
typedef struct SOperatorNode {
SExprNode type; // QUERY_NODE_OPERATOR
SExprNode node; // QUERY_NODE_OPERATOR
EOperatorType opType;
SNode* pLeft;
SNode* pRight;
......@@ -156,25 +180,33 @@ typedef struct SLogicConditionNode {
typedef struct SIsNullCondNode {
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
SNode* pExpr;
bool isNot;
bool isNull;
} SIsNullCondNode;
typedef struct SNodeListNode {
ENodeType type; // QUERY_NODE_NODE_LIST
SNodeList* pNodeList;
} SNodeListNode;
typedef struct SFunctionNode {
SExprNode type; // QUERY_NODE_FUNCTION
SExprNode node; // QUERY_NODE_FUNCTION
char functionName[TSDB_FUNC_NAME_LEN];
int32_t funcId;
SNodeList* pParameterList; // SNode
SNodeList* pParameterList;
} SFunctionNode;
typedef struct STableNode {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableAliasName[TSDB_COL_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
} STableNode;
struct STableMeta;
typedef struct SRealTableNode {
STableNode table; // QUERY_NODE_REAL_TABLE
char dbName[TSDB_DB_NAME_LEN];
struct STableMeta* pMeta;
} SRealTableNode;
typedef struct STempTableNode {
......@@ -241,24 +273,40 @@ typedef struct SSessionWindowNode {
typedef struct SIntervalWindowNode {
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
int64_t interval;
int64_t sliding;
int64_t offset;
SNode* pInterval; // SValueNode
SNode* pOffset; // SValueNode
SNode* pSliding; // SValueNode
SNode* pFill;
} SIntervalWindowNode;
typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
FILL_MODE_PREV,
FILL_MODE_NULL,
FILL_MODE_LINEAR,
FILL_MODE_NEXT
} EFillMode;
typedef struct SFillNode {
ENodeType type; // QUERY_NODE_FILL
EFillMode mode;
SNode* pValues; // SNodeListNode
} SFillNode;
typedef struct SSelectStmt {
ENodeType type; // QUERY_NODE_SELECT_STMT
bool isDistinct;
bool isStar;
SNodeList* pProjectionList; // SNode
SNode* pFromTable;
SNode* pWhereCond;
SNode* pWhere;
SNodeList* pPartitionByList; // SNode
SNode* pWindowClause;
SNode* pWindow;
SNodeList* pGroupByList; // SGroupingSetNode
SNode* pHaving;
SNodeList* pOrderByList; // SOrderByExprNode
SLimitNode limit;
SLimitNode slimit;
SNode* pLimit;
SNode* pSlimit;
} SSelectStmt;
typedef enum ESetOperatorType {
......@@ -270,12 +318,23 @@ typedef struct SSetOperator {
ESetOperatorType opType;
SNode* pLeft;
SNode* pRight;
SNodeList* pOrderByList; // SOrderByExprNode
SNode* pLimit;
} SSetOperator;
SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
SNodeList* nodesMakeList();
SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode);
void nodesDestroyList(SNodeList* pList);
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
bool nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext);
bool nodesWalkNodeList(SNodeList* pNodeList, FQueryNodeWalker walker, void* pContext);
void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext);
void nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext);
void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
bool nodesWalkStmt(SNode* pNode, FQueryNodeWalker walker, void* pContext);
......@@ -286,13 +345,13 @@ void nodesCloneNode(const SNode* pNode);
int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen);
int32_t nodesStringToNode(const char* pStr, SNode** pNode);
bool nodesIsArithmeticOp(const SOperatorNode* pOp);
bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
void nodesDestroyNodeList(SNodeList* pList);
#ifdef __cplusplus
}
#endif
......
......@@ -218,18 +218,19 @@ int32_t* taosGetErrno();
// mnode-stable
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
#define TSDB_CODE_MND_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A1)
#define TSDB_CODE_MND_TOO_MANY_STBS TAOS_DEF_ERROR_CODE(0, 0x03A2)
#define TSDB_CODE_MND_INVALID_STB TAOS_DEF_ERROR_CODE(0, 0x03A3)
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x03A4)
#define TSDB_CODE_MND_STB_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03A5)
#define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x03A6)
#define TSDB_CODE_MND_TAG_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A7)
#define TSDB_CODE_MND_TAG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A8)
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03A9)
#define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
#define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AD)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03A2)
#define TSDB_CODE_MND_TOO_MANY_STBS TAOS_DEF_ERROR_CODE(0, 0x03A3)
#define TSDB_CODE_MND_INVALID_STB TAOS_DEF_ERROR_CODE(0, 0x03A4)
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x03A5)
#define TSDB_CODE_MND_INVALID_STB_ALTER_OPTION TAOS_DEF_ERROR_CODE(0, 0x03A6)
#define TSDB_CODE_MND_STB_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03A7)
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03A8)
#define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x03A9)
#define TSDB_CODE_MND_TAG_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
#define TSDB_CODE_MND_TAG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_COLUMN_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AD)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
// mnode-func
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C0)
......@@ -435,12 +436,21 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405) //Database is dropped
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) //catalog is out of service
//scheduler
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error
//parser
#define TSDB_CODE_PAR_INVALID_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2601) //invalid column name
#define TSDB_CODE_PAR_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2602) //table not exist
#define TSDB_CODE_PAR_AMBIGUOUS_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2603) //ambiguous column
#define TSDB_CODE_PAR_WRONG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2604) //wrong value type
#define TSDB_CODE_PAR_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2605) //invalid number of arguments
#define TSDB_CODE_PAR_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2606) //inconsistent datatypes
#define TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION TAOS_DEF_ERROR_CODE(0, 0x2607) //there mustn't be aggregation
#ifdef __cplusplus
}
......
......@@ -242,7 +242,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
void appHbMgrCleanup(void);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType);
......
......@@ -41,19 +41,13 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) {
SDbVgVersion dbInfo;
strcpy(dbInfo.dbName, rsp->db);
dbInfo.dbId = rsp->uid;
dbInfo.vgVersion = rsp->vgVersion;
code = catalogRemoveDBVgroup(pCatalog, &dbInfo);
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else {
SDBVgroupInfo vgInfo = {0};
vgInfo.dbId = rsp->uid;
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgInfo) {
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgHash) {
tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -67,16 +61,16 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgInfo);
taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgInfo);
taosHashCleanup(vgInfo.vgHash);
}
}
......@@ -90,6 +84,58 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
return TSDB_CODE_SUCCESS;
}
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0;
int32_t schemaNum = 0;
while (msgLen < valueLen) {
STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen);
rsp->numOfColumns = ntohl(rsp->numOfColumns);
rsp->suid = be64toh(rsp->suid);
if (rsp->numOfColumns < 0) {
schemaNum = 0;
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
} else {
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
rsp->numOfTags = ntohl(rsp->numOfTags);
rsp->sversion = ntohl(rsp->sversion);
rsp->tversion = ntohl(rsp->tversion);
rsp->tuid = be64toh(rsp->tuid);
rsp->vgId = ntohl(rsp->vgId);
SSchema* pSchema = rsp->pSchema;
schemaNum = rsp->numOfColumns + rsp->numOfTags;
for (int i = 0; i < schemaNum; ++i) {
pSchema->bytes = ntohl(pSchema->bytes);
pSchema->colId = ntohl(pSchema->colId);
pSchema++;
}
if (rsp->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchema[0].colId);
return TSDB_CODE_TSC_INVALID_VALUE;
}
catalogUpdateSTableMeta(pCatalog, rsp);
}
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
}
return TSDB_CODE_SUCCESS;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
......@@ -122,9 +168,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_STBINFO:
case HEARTBEAT_KEY_STBINFO:{
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
break;
}
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
default:
tscError("invalid hb key type:%d", kv->key);
break;
......@@ -157,7 +218,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
tfree(param);
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
} else {
atomic_add_fetch_32(&emptyRspNum, 1);
}
......@@ -204,6 +265,37 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
return TSDB_CODE_SUCCESS;
}
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SSTableMetaVersion *stbs = NULL;
uint32_t stbNum = 0;
int32_t code = 0;
code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (stbNum <= 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < stbNum; ++i) {
SSTableMetaVersion *stb = &stbs[i];
stb->suid = htobe64(stb->suid);
stb->sversion = htons(stb->sversion);
stb->tversion = htons(stb->tversion);
}
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL;
......@@ -219,6 +311,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req
return code;
}
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
return TSDB_CODE_SUCCESS;
}
......@@ -389,7 +486,6 @@ static void hbStopThread() {
}
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
return NULL;
hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
......@@ -431,29 +527,22 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
return pAppHbMgr;
}
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
if (NULL == pAppHbMgr) {
return;
}
void appHbMgrCleanup(void) {
pthread_mutex_lock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == pTarget) {
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
}
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
}
pthread_mutex_unlock(&clientHbMgr.lock);
}
int hbMgrInit() {
return 0;
// init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
......@@ -479,6 +568,7 @@ void hbMgrCleanUp() {
if (old == 0) return;
pthread_mutex_lock(&clientHbMgr.lock);
appHbMgrCleanup();
taosArrayDestroy(clientHbMgr.appHbMgrs);
pthread_mutex_unlock(&clientHbMgr.lock);
......@@ -509,7 +599,6 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
}
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
return 0;
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0};
......@@ -539,9 +628,6 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
return;
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
if (atomic_load_32(&pAppHbMgr->connKeyCnt) <= 0) {
appHbMgrCleanup(pAppHbMgr);
}
}
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
......
......@@ -110,7 +110,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
/*p->pAppHbMgr = appHbMgrInit(p, key);*/
p->pAppHbMgr = appHbMgrInit(p, key);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p;
......@@ -540,8 +540,12 @@ void* doFetchRow(SRequestObj* pRequest) {
tsem_wait(&pRequest->body.rspSem);
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
} else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
return NULL;
} else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
if (pResultInfo->completed) {
return NULL;
}
}
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
......
......@@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connType = HEARTBEAT_TYPE_QUERY;
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);*/
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
......@@ -201,6 +201,7 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pRetrieve->numOfRows;
pResInfo->pData = pRetrieve->data;
pResInfo->completed = pRetrieve->completed;
pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
......@@ -301,15 +302,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData;
SDbVgVersion dbVer = {0};
struct SCatalog *pCatalog = NULL;
strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName));
dbVer.dbId = be64toh(rsp->uid);
rsp->uid = be64toh(rsp->uid);
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDBVgroup(pCatalog, &dbVer);
catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
tsem_post(&pRequest->body.rspSem);
return code;
......
......@@ -27,7 +27,7 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
int tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
int32_t tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
......@@ -44,7 +44,7 @@ int tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
return 0;
}
int tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += sizeof(SSubmitMsg);
} else {
......@@ -63,7 +63,7 @@ int tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
return 0;
}
int tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
......@@ -85,14 +85,14 @@ SMemRow tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
}
}
int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0;
int32_t tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
int32_t kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum);
SKv *kv;
void* pIter = taosHashIterate(pReq->info, NULL);
SKv *kv;
void *pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) {
kv = pIter;
tlen += taosEncodeSKv(buf, kv);
......@@ -111,7 +111,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
if (pReq->info == NULL) {
pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
for(int i = 0; i < kvNum; i++) {
for (int32_t i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
......@@ -120,54 +120,55 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
return buf;
}
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp) {
int tlen = 0;
int32_t tSerializeSClientHbRsp(void **buf, const SClientHbRsp *pRsp) {
int32_t tlen = 0;
int32_t kvNum = taosArrayGetSize(pRsp->info);
tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey);
tlen += taosEncodeFixedI32(buf, pRsp->status);
tlen += taosEncodeFixedI32(buf, kvNum);
for (int i = 0; i < kvNum; i++) {
for (int32_t i = 0; i < kvNum; i++) {
SKv *kv = (SKv *)taosArrayGet(pRsp->info, i);
tlen += taosEncodeSKv(buf, kv);
}
return tlen;
}
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp) {
void *tDeserializeSClientHbRsp(void *buf, SClientHbRsp *pRsp) {
int32_t kvNum = 0;
buf = taosDecodeSClientHbKey(buf, &pRsp->connKey);
buf = taosDecodeFixedI32(buf, &pRsp->status);
buf = taosDecodeFixedI32(buf, &kvNum);
pRsp->info = taosArrayInit(kvNum, sizeof(SKv));
for (int i = 0; i < kvNum; i++) {
for (int32_t i = 0; i < kvNum; i++) {
SKv kv = {0};
buf = taosDecodeSKv(buf, &kv);
taosArrayPush(pRsp->info, &kv);
}
return buf;
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pBatchReq) {
int tlen = 0;
int32_t tSerializeSClientHbBatchReq(void **buf, const SClientHbBatchReq *pBatchReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pBatchReq->reqId);
int32_t reqNum = taosArrayGetSize(pBatchReq->reqs);
tlen += taosEncodeFixedI32(buf, reqNum);
for (int i = 0; i < reqNum; i++) {
SClientHbReq* pReq = taosArrayGet(pBatchReq->reqs, i);
tlen += taosEncodeFixedI32(buf, reqNum);
for (int32_t i = 0; i < reqNum; i++) {
SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i);
tlen += tSerializeSClientHbReq(buf, pReq);
}
return tlen;
}
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) {
void *tDeserializeSClientHbBatchReq(void *buf, SClientHbBatchReq *pBatchReq) {
buf = taosDecodeFixedI64(buf, &pBatchReq->reqId);
if (pBatchReq->reqs == NULL) {
pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq));
}
int32_t reqNum;
buf = taosDecodeFixedI32(buf, &reqNum);
for (int i = 0; i < reqNum; i++) {
for (int32_t i = 0; i < reqNum; i++) {
SClientHbReq req = {0};
buf = tDeserializeSClientHbReq(buf, &req);
taosArrayPush(pBatchReq->reqs, &req);
......@@ -175,31 +176,31 @@ void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) {
return buf;
}
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp) {
int tlen = 0;
int32_t tSerializeSClientHbBatchRsp(void **buf, const SClientHbBatchRsp *pBatchRsp) {
int32_t tlen = 0;
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
tlen += taosEncodeFixedI32(buf, sz);
for (int i = 0; i < sz; i++) {
SClientHbRsp* pRsp = taosArrayGet(pBatchRsp->rsps, i);
for (int32_t i = 0; i < sz; i++) {
SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i);
tlen += tSerializeSClientHbRsp(buf, pRsp);
}
return tlen;
}
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp) {
void *tDeserializeSClientHbBatchRsp(void *buf, SClientHbBatchRsp *pBatchRsp) {
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) {
for (int32_t i = 0; i < sz; i++) {
SClientHbRsp rsp = {0};
buf = tDeserializeSClientHbRsp(buf, &rsp);
buf = tDeserializeSClientHbRsp(buf, &rsp);
taosArrayPush(pBatchRsp->rsps, &rsp);
}
return buf;
}
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0;
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->name);
......@@ -293,8 +294,8 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
return buf;
}
int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
int tlen = 0;
int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray));
......@@ -306,7 +307,7 @@ int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
return tlen;
}
void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
uint32_t nsize = 0;
buf = taosDecodeFixedU64(buf, &pReq->ver);
......@@ -320,3 +321,139 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
return buf;
}
int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU8(buf, pReq->type);
return tlen;
}
void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
buf = taosDecodeFixedU64(buf, &pReq->ver);
buf = taosDecodeString(buf, &pReq->name);
buf = taosDecodeFixedU8(buf, &pReq->type);
return buf;
}
int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeFixedI32(buf, pReq->numOfColumns);
tlen += taosEncodeFixedI32(buf, pReq->numOfTags);
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField *pField = taosArrayGet(pReq->pColumns, i);
tlen += taosEncodeFixedI8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i);
tlen += taosEncodeFixedI8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
return tlen;
}
void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name);
buf = taosDecodeFixedI8(buf, &pReq->igExists);
buf = taosDecodeFixedI32(buf, &pReq->numOfColumns);
buf = taosDecodeFixedI32(buf, &pReq->numOfTags);
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
if (pReq->pColumns == NULL || pReq->pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
return buf;
}
int32_t tSerializeSMDropStbReq(void **buf, SMDropStbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->igNotExists);
return tlen;
}
void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name);
buf = taosDecodeFixedI8(buf, &pReq->igNotExists);
return buf;
}
int32_t tSerializeSMAlterStbReq(void **buf, SMAltertbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->alterType);
tlen += taosEncodeFixedI32(buf, pReq->numOfFields);
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
SField *pField = taosArrayGet(pReq->pFields, i);
tlen += taosEncodeFixedU8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
return tlen;
}
void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name);
buf = taosDecodeFixedI8(buf, &pReq->alterType);
buf = taosDecodeFixedI32(buf, &pReq->numOfFields);
pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField));
if (pReq->pFields == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
SField field = {0};
buf = taosDecodeFixedU8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pFields, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
return buf;
}
......@@ -125,7 +125,7 @@ const char* Testbase::GetMetaName(int32_t index) {
int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
const char* Testbase::GetMetaTbName() { return pMeta->tbFname; }
const char* Testbase::GetMetaTbName() { return pMeta->tbName; }
void Testbase::SendShowRetrieveReq() {
int32_t contLen = sizeof(SRetrieveTableReq);
......@@ -144,7 +144,7 @@ void Testbase::SendShowRetrieveReq() {
pos = 0;
}
const char* Testbase::GetShowName() { return pMeta->tbFname; }
const char* Testbase::GetShowName() { return pMeta->tbName; }
int8_t Testbase::GetShowInt8() {
int8_t data = *((int8_t*)(pData + pos));
......
......@@ -108,7 +108,7 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
}
}
TEST_F(DndTestVnode, 02_ALTER_Vnode) {
TEST_F(DndTestVnode, 02_Alter_Vnode) {
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SAlterVnodeReq);
......@@ -199,19 +199,16 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2];
int32_t bsize = tSerializeSVCreateTbReq(NULL, &req);
void* buf = rpcMallocCont(sizeof(SMsgHead) + bsize);
SMsgHead* pMsgHead = (SMsgHead*)buf;
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
pMsgHead->contLen = htonl(sizeof(SMsgHead) + bsize);
pMsgHead->vgId = htonl(2);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req);
int32_t contLen = sizeof(SMsgHead) + bsize;
SRpcMsg* pRsp = test.SendReq(TDMT_VND_CREATE_STB, buf, contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_CREATE_STB, (void*)pHead, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
......@@ -222,36 +219,98 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
}
}
TEST_F(DndTestVnode, 04_ALTER_Stb) {
#if 0
{
for (int i = 0; i < 3; ++i) {
SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
TEST_F(DndTestVnode, 04_Alter_Stb) {
for (int i = 0; i < 1; ++i) {
SVCreateTbReq req = {0};
req.ver = 0;
req.name = (char*)"stb1";
req.ttl = 0;
req.keep = 0;
req.type = TD_SUPER_TABLE;
SSchema schemas[5] = {0};
{
SSchema* pSchema = &schemas[0];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts");
}
{
SSchema* pSchema = &schemas[1];
pSchema->bytes = htonl(4);
pSchema->type = TSDB_DATA_TYPE_INT;
strcpy(pSchema->name, "col1");
}
{
SSchema* pSchema = &schemas[2];
pSchema->bytes = htonl(2);
pSchema->type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "_tag1");
}
{
SSchema* pSchema = &schemas[3];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "_tag2");
}
{
SSchema* pSchema = &schemas[4];
pSchema->bytes = htonl(16);
pSchema->type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "_tag3");
}
req.stbCfg.suid = 9527;
req.stbCfg.nCols = 2;
req.stbCfg.pSchema = &schemas[0];
req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2];
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, (void*)pHead, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
#endif
}
TEST_F(DndTestVnode, 05_DROP_Stb) {
#if 0
{
for (int i = 0; i < 3; ++i) {
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, pReq, contLen);
SVDropTbReq req = {0};
req.ver = 0;
req.name = (char*)"stb1";
req.suid = 9599;
req.type = TD_SUPER_TABLE;
int32_t contLen = tSerializeSVDropTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVDropTbReq(&pBuf, &req);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, (void*)pHead, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_TDB_INVALID_TABLE_ID);
}
ASSERT_EQ(pRsp->code, 0);
}
}
#endif
}
TEST_F(DndTestVnode, 06_DROP_Vnode) {
TEST_F(DndTestVnode, 06_Drop_Vnode) {
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SDropVnodeReq);
......
......@@ -301,10 +301,12 @@ typedef struct {
uint64_t uid;
uint64_t dbUid;
int32_t version;
int32_t nextColId;
int32_t numOfColumns;
int32_t numOfTags;
SSchema* pColumns;
SSchema* pTags;
SRWLatch lock;
SSchema* pSchema;
} SStbObj;
typedef struct {
......
......@@ -28,6 +28,9 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen);
#ifdef __cplusplus
}
#endif
......
......@@ -426,7 +426,7 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -186,7 +186,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
cols++;
pMeta->numOfColumns = htonl(cols);
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......@@ -196,7 +196,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
pShow->numOfRows = 1;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -907,9 +907,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
len = 0;
SDbObj *pDb = mndAcquireDb(pMnode, db->dbName);
SDbObj *pDb = mndAcquireDb(pMnode, db->dbFName);
if (pDb == NULL) {
mInfo("db %s not exist", db->dbName);
mInfo("db %s not exist", db->dbFName);
len = sizeof(SUseDbRsp);
} else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) {
......@@ -929,7 +929,7 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
}
pRsp = (SUseDbRsp *)((char *)buf + bufOffset);
memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN);
memcpy(pRsp->db, db->dbFName, TSDB_DB_FNAME_LEN);
if (pDb) {
int32_t vgNum = 0;
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
......@@ -1113,7 +1113,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -608,7 +608,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
pShow->numOfRows = TSDB_CONFIG_NUMBER;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......@@ -715,7 +715,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -482,7 +482,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -624,7 +624,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
mndUpdateMnodeRole(pMnode);
return 0;
......
......@@ -18,6 +18,7 @@
#include "mndProfile.h"
//#include "mndConsumer.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndMnode.h"
#include "mndShow.h"
//#include "mndTopic.h"
......@@ -376,9 +377,16 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
}
break;
}
case HEARTBEAT_KEY_STBINFO:
case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
}
break;
}
default:
mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_MND_APP_ERROR;
......@@ -623,7 +631,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......@@ -792,7 +800,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = 1000000;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -426,7 +426,7 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -428,7 +428,7 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -338,7 +338,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("topic:%s, start to retrieve meta", pInfo->tableFname);
mDebug("topic:%s, start to retrieve meta", pInfo->tbName);
#if 0
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
......@@ -469,7 +469,7 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -474,7 +474,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -525,7 +525,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......@@ -638,7 +638,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->replica = dnodeId;
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -64,7 +64,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
STableMetaRsp* pMeta = test.GetShowMeta();
EXPECT_STREQ(pMeta->tbFname, "show connections");
EXPECT_STREQ(pMeta->tbName, "show connections");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, 7);
EXPECT_EQ(pMeta->precision, 0);
......
......@@ -84,7 +84,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int msgLen = 0;
int32_t code = TSDB_CODE_VND_APP_ERROR;
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tbName, &uid);
if (pTbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST;
goto _exit;
......@@ -119,13 +119,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit;
}
memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname));
strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
strcpy(pTbMetaMsg->tbName, pReq->tbName);
if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
strcpy(pTbMetaMsg->stbName, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
} else if (pTbCfg->type == META_SUPER_TABLE) {
strcpy(pTbMetaMsg->stbFname, pTbCfg->name);
strcpy(pTbMetaMsg->stbName, pTbCfg->name);
pTbMetaMsg->suid = htobe64(uid);
}
pTbMetaMsg->numOfTags = htonl(nTagCols);
......
......@@ -83,7 +83,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
free(vCreateTbReq.name);
break;
case TDMT_VND_CREATE_TABLE:
tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
......@@ -105,7 +105,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
taosArrayDestroy(vCreateTbBatchReq.pArray);
break;
case TDMT_VND_ALTER_STB:
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
free(vCreateTbReq.stbCfg.pSchema);
free(vCreateTbReq.stbCfg.pTagSchema);
free(vCreateTbReq.name);
break;
case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
break;
case TDMT_VND_DROP_TABLE:
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// // TODO: handle error
......
......@@ -27,7 +27,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
......@@ -47,55 +47,57 @@ enum {
CTG_RENT_STABLE,
};
typedef struct SCTGDebug {
int32_t lockDebug;
} SCTGDebug;
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo
} SVgroupListCache;
typedef struct SDBVgroupCache {
SHashObj *cache; //key:dbname, value:SDBVgroupInfo
} SDBVgroupCache;
typedef struct STableMetaCache {
SRWLatch stableLock;
SHashObj *cache; //key:fulltablename, value:STableMeta
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache;
typedef struct SRentSlotInfo {
typedef struct SCtgDebug {
bool lockDebug;
bool cacheDebug;
uint32_t showCachePeriodSec;
} SCtgDebug;
typedef struct SCtgTbMetaCache {
SRWLatch stbLock;
SRWLatch metaLock; // RC between cache destroy and all other operations
SHashObj *metaCache; //key:tbname, value:STableMeta
SHashObj *stbCache; //key:suid, value:STableMeta*
} SCtgTbMetaCache;
typedef struct SCtgDBCache {
SRWLatch vgLock;
uint64_t dbId;
int8_t deleted;
SDBVgroupInfo *vgInfo;
SCtgTbMetaCache tbCache;
} SCtgDBCache;
typedef struct SCtgRentSlot {
SRWLatch lock;
bool needSort;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
} SRentSlotInfo;
} SCtgRentSlot;
typedef struct SMetaRentMgmt {
typedef struct SCtgRentMgmt {
int8_t type;
uint16_t slotNum;
uint16_t slotRIdx;
int64_t lastReadMsec;
SRentSlotInfo *slots;
} SMetaRentMgmt;
SCtgRentSlot *slots;
} SCtgRentMgmt;
typedef struct SCatalog {
uint64_t clusterId;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
SMetaRentMgmt dbRent;
SMetaRentMgmt stableRent;
uint64_t clusterId;
SRWLatch dbLock;
SHashObj *dbCache; //key:dbname, value:SCtgDBCache
SCtgRentMgmt dbRent;
SCtgRentMgmt stbRent;
} SCatalog;
typedef struct SCtgApiStat {
} SCtgApiStat;
typedef struct SCtgResourceStat {
typedef struct SCtgRuntimeStat {
} SCtgResourceStat;
} SCtgRuntimeStat;
typedef struct SCtgCacheStat {
......@@ -103,11 +105,13 @@ typedef struct SCtgCacheStat {
typedef struct SCatalogStat {
SCtgApiStat api;
SCtgResourceStat resource;
SCtgRuntimeStat runtime;
SCtgCacheStat cache;
} SCatalogStat;
typedef struct SCatalogMgmt {
bool exit;
SRWLatch lock;
SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat;
SCatalogCfg cfg;
......@@ -135,11 +139,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
......@@ -175,6 +176,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
} \
} while (0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_API_ENTER() do { CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(__code); } while (0)
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -39,6 +39,7 @@ namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
int32_t *exist);
extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output);
extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
void ctgTestSetPrepareTableMeta();
void ctgTestSetPrepareCTableMeta();
......@@ -49,7 +50,7 @@ bool ctgTestStop = false;
bool ctgTestEnableSleep = false;
bool ctgTestDeadLoop = false;
int32_t ctgTestPrintNum = 200000;
int32_t ctgTestMTRunSec = 30;
int32_t ctgTestMTRunSec = 5;
int32_t ctgTestCurrentVgVersion = 0;
int32_t ctgTestVgVersion = 1;
......@@ -107,6 +108,7 @@ void ctgTestInitLogFile() {
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
qDebugFlag = 159;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
......@@ -128,15 +130,14 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(sn.dbname, "db1");
strcpy(sn.tname, ctgTestSTablename);
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&cn, tbFullName);
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&cn, db);
strcpy(output->dbFName, db);
SET_META_TYPE_BOTH_TABLE(output->metaType);
strcpy(output->ctbFname, tbFullName);
tNameExtractFullName(&cn, tbFullName);
strcpy(output->tbFname, tbFullName);
strcpy(output->ctbName, cn.tname);
strcpy(output->tbName, sn.tname);
output->ctbMeta.vgId = 9;
output->ctbMeta.tableType = TSDB_CHILD_TABLE;
......@@ -175,18 +176,18 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(s->name, "tag1s");
}
void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
static int32_t vgVersion = ctgTestVgVersion + 1;
int32_t vgNum = 0;
SVgroupInfo vgInfo = {0};
SDBVgroupInfo *dbVgroup = (SDBVgroupInfo *)calloc(1, sizeof(SDBVgroupInfo));
dbVgroup->vgVersion = vgVersion++;
ctgTestCurrentVgVersion = dbVgroup->vgVersion;
dbVgroup->hashMethod = 0;
dbVgroup->dbId = ctgTestDbId;
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
uint32_t hashUnit = UINT32_MAX / vgNum;
......@@ -203,10 +204,51 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
addr->port = htons(n + 22);
}
taosHashPut(dbVgroup->vgInfo, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
taosHashPut(dbVgroup->vgHash, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
}
*pdbVgroup = dbVgroup;
}
void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) {
strcpy(rspMsg->dbFName, ctgTestDbname);
sprintf(rspMsg->tbName, "%s", ctgTestSTablename);
sprintf(rspMsg->stbName, "%s", ctgTestSTablename);
rspMsg->numOfTags = ctgTestTagNum;
rspMsg->numOfColumns = ctgTestColNum;
rspMsg->precision = 1 + 1;
rspMsg->tableType = TSDB_SUPER_TABLE;
rspMsg->update = 1 + 1;
rspMsg->sversion = ctgTestSVersion + 1;
rspMsg->tversion = ctgTestTVersion + 1;
rspMsg->suid = ctgTestSuid + 1;
rspMsg->tuid = ctgTestSuid + 1;
rspMsg->vgId = 1;
SSchema *s = NULL;
s = &rspMsg->pSchema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = 1;
s->bytes = 8;
strcpy(s->name, "ts");
s = &rspMsg->pSchema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = 2;
s->bytes = 4;
strcpy(s->name, "col1s");
s = &rspMsg->pSchema[2];
s->type = TSDB_DATA_TYPE_BINARY;
s->colId = 3;
s->bytes = 12 + 1;
strcpy(s->name, "tag1s");
return;
}
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; // todo
......@@ -250,7 +292,8 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename);
strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestTablename);
rspMsg->numOfTags = 0;
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -285,8 +328,9 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestCTablename);
strcpy(rspMsg->stbName, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -327,8 +371,9 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestSTablename);
strcpy(rspMsg->stbName, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -370,8 +415,9 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg,
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
strcpy(rspMsg->dbFName, ctgTestDbname);
sprintf(rspMsg->tbName, "%s_%d", ctgTestSTablename, idx);
sprintf(rspMsg->stbName, "%s_%d", ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -586,15 +632,40 @@ void *ctgTestGetDbVgroupThread(void *param) {
return NULL;
}
void *ctgTestSetDbVgroupThread(void *param) {
void *ctgTestSetSameDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo dbVgroup = {0};
SDBVgroupInfo *dbVgroup = NULL;
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup);
if (code) {
assert(0);
}
if (ctgTestEnableSleep) {
usleep(rand() % 5);
}
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
}
}
return NULL;
}
void *ctgTestSetDiffDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo *dbVgroup = NULL;
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup);
if (code) {
assert(0);
}
......@@ -610,6 +681,7 @@ void *ctgTestSetDbVgroupThread(void *param) {
return NULL;
}
void *ctgTestGetCtableMetaThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
......@@ -669,11 +741,14 @@ void *ctgTestSetCtableMetaThread(void *param) {
return NULL;
}
TEST(tableMeta, normalTable) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroups();
initQueryModuleMsgHandle();
......@@ -741,7 +816,7 @@ TEST(tableMeta, normalTable) {
}
if (stbNum) {
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
free(stb);
stb = NULL;
} else {
......@@ -764,6 +839,8 @@ TEST(tableMeta, childTableCase) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndChildMeta();
initQueryModuleMsgHandle();
......@@ -837,7 +914,7 @@ TEST(tableMeta, childTableCase) {
}
if (stbNum) {
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
free(stb);
stb = NULL;
} else {
......@@ -938,7 +1015,8 @@ TEST(tableMeta, superTableCase) {
}
if (stbNum) {
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
free(stb);
stb = NULL;
} else {
......@@ -956,6 +1034,124 @@ TEST(tableMeta, superTableCase) {
catalogDestroy();
}
TEST(tableMeta, rmStbMeta) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndSuperMeta();
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
// sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestSTablename);
STableMeta *tableMeta = NULL;
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 0);
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid);
ASSERT_EQ(tableMeta->suid, ctgTestSuid);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
code = catalogRemoveSTableMeta(pCtg, "1.db1", ctgTestSTablename, ctgTestSuid);
ASSERT_EQ(code, 0);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 0);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 0);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0);
catalogDestroy();
}
TEST(tableMeta, updateStbMeta) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndSuperMeta();
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
// sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestSTablename);
STableMeta *tableMeta = NULL;
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 0);
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid);
ASSERT_EQ(tableMeta->suid, ctgTestSuid);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
tfree(tableMeta);
STableMetaRsp rsp = {0};
ctgTestBuildSTableMetaRsp(&rsp);
code = catalogUpdateSTableMeta(pCtg, &rsp);
ASSERT_EQ(code, 0);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 1);
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 0);
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion + 1);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion + 1);
ASSERT_EQ(tableMeta->uid, ctgTestSuid + 1);
ASSERT_EQ(tableMeta->suid, ctgTestSuid + 1);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1 + 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
tfree(tableMeta);
catalogDestroy();
}
TEST(tableDistVgroup, normalTable) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1062,9 +1258,11 @@ TEST(dbVgroup, getSetDbVgroupCase) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SDBVgroupInfo *dbVgroup = NULL;
SArray *vgList = NULL;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndNormalMeta();
initQueryModuleMsgHandle();
......@@ -1099,7 +1297,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup);
ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
......@@ -1118,7 +1316,58 @@ TEST(dbVgroup, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, getSetDbVgroupCase) {
TEST(multiThread, getSetRmSameDbVgroup) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroups();
initQueryModuleMsgHandle();
// sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1, thread2;
pthread_create(&(thread1), &thattr, ctgTestSetSameDbVgroupThread, pCtg);
sleep(1);
pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg);
while (true) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(ctgTestMTRunSec);
break;
}
}
ctgTestStop = true;
sleep(1);
catalogDestroy();
}
TEST(multiThread, getSetRmDiffDbVgroup) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
......@@ -1149,10 +1398,10 @@ TEST(multiThread, getSetDbVgroupCase) {
pthread_attr_init(&thattr);
pthread_t thread1, thread2;
pthread_create(&(thread1), &thattr, ctgTestSetDbVgroupThread, pCtg);
pthread_create(&(thread1), &thattr, ctgTestSetDiffDbVgroupThread, pCtg);
sleep(1);
pthread_create(&(thread1), &thattr, ctgTestGetDbVgroupThread, pCtg);
pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg);
while (true) {
if (ctgTestDeadLoop) {
......@@ -1169,6 +1418,8 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, ctableMeta) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1178,6 +1429,8 @@ TEST(multiThread, ctableMeta) {
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndChildMeta();
initQueryModuleMsgHandle();
......@@ -1212,11 +1465,13 @@ TEST(multiThread, ctableMeta) {
}
ctgTestStop = true;
sleep(1);
sleep(2);
catalogDestroy();
}
TEST(rentTest, allRent) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1229,6 +1484,8 @@ TEST(rentTest, allRent) {
SSTableMetaVersion *stable = NULL;
uint32_t num = 0;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
initQueryModuleMsgHandle();
......@@ -1273,7 +1530,7 @@ TEST(rentTest, allRent) {
printf("%d - expired stableNum:%d\n", i, num);
if (stable) {
for (int32_t n = 0; n < num; ++n) {
printf("suid:%" PRId64 ", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion);
printf("suid:%" PRId64 ", dbFName:%s, stbName:%s, sversion:%d, tversion:%d\n", stable[n].suid, stable[n].dbFName, stable[n].stbName, stable[n].sversion, stable[n].tversion);
}
free(stable);
stable = NULL;
......@@ -1291,4 +1548,4 @@ int main(int argc, char **argv) {
return RUN_ALL_TESTS();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
......@@ -78,3 +78,11 @@ FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProce
int32_t registerFunc(FuncDef func) {
}
int32_t fmGetFuncResultType(FuncMgtHandle handle, SFunctionNode* pFunc) {
return TSDB_CODE_SUCCESS;
}
bool fmIsAggFunc(int32_t funcId) {
return false;
}
......@@ -30,12 +30,9 @@ typedef struct SAstCreateContext {
SNode* pRootNode;
} SAstCreateContext;
int32_t createAstCreateContext(const SParseContext* pQueryCxt, SAstCreateContext* pCxt);
int32_t createAstCreateContext(SParseContext* pQueryCxt, SAstCreateContext* pCxt);
int32_t destroyAstCreateContext(SAstCreateContext* pCxt);
void* acquireRaii(SAstCreateContext* pCxt, void* p);
void* releaseRaii(SAstCreateContext* pCxt, void* p);
#ifdef __cplusplus
}
#endif
......
......@@ -13,11 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#include "nodesShowStmts.h"
#include "astCreateContext.h"
#include "ttoken.h"
#ifndef _TD_AST_CREATE_FUNCS_H_
#define _TD_AST_CREATE_FUNCS_H_
......@@ -25,20 +20,55 @@
extern "C" {
#endif
bool checkTableName(const SToken* pTableName);
#include "nodes.h"
#include "nodesShowStmts.h"
#include "astCreateContext.h"
#include "ttoken.h"
extern SToken nil_token;
SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* pNode);
SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const SToken* pEnd, SNode* pNode);
SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode);
SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode);
SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode);
SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode);
SNode* addOrderByList(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pOrderByList);
SNode* addSlimit(SAstCreateContext* pCxt, SNode* pStmt, SNode* pSlimit);
SNode* addLimit(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit);
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableName, const SToken* pColumnName);
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName);
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral);
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias);
SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2);
SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight);
SNode* createBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, SNode* pRight);
SNode* createNotBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, SNode* pRight);
SNode* createIsNullCondNode(SAstCreateContext* pCxt, SNode* pExpr, bool isNull);
SNode* createFunctionNode(SAstCreateContext* pCxt, const SToken* pFuncName, SNodeList* pParameterList);
SNode* createNodeListNode(SAstCreateContext* pCxt, SNodeList* pList);
SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const SToken* pTableName, const SToken* pTableAlias);
SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const SToken* pTableAlias);
SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft, SNode* pRight, SNode* pJoinCond);
SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset);
SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode);
SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder);
SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const SToken* pTableName);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, const SToken* pVal);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill);
SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues);
SNode* addWhereClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pWhere);
SNode* addPartitionByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pPartitionByList);
SNode* addWindowClauseClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pWindow);
SNode* addGroupByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pGroupByList);
SNode* addHavingClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pHaving);
SNode* addOrderByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pOrderByList);
SNode* addSlimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pSlimit);
SNode* addLimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit);
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable);
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight);
SNode* createShowStmt(SAstCreateContext* pCxt, EShowStmtType type);
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias);
#ifdef __cplusplus
}
......
......@@ -10,8 +10,8 @@ SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf);
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
......
此差异已折叠。
......@@ -28,6 +28,7 @@ typedef struct SQuery {
} SQuery;
int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery);
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery);
#ifdef __cplusplus
}
......
......@@ -813,14 +813,14 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
//////////////////////////////////ALTER TAGS statement/////////////////////////////////////
cmd ::= ALTER TABLE ids(X) cpxName(Y) ADD TAG columnlist(A). {
X.n += Y.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG_COLUMN, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
......@@ -829,7 +829,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
toTSDBType(Y.type);
SArray* A = tListItemAppendToken(NULL, &Y, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
......@@ -842,7 +842,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
toTSDBType(Z.type);
A = tListItemAppendToken(A, &Z, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
......@@ -859,7 +859,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
......@@ -882,14 +882,14 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, TSDB_SUPER_TABLE);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
//////////////////////////////////ALTER TAGS statement/////////////////////////////////////
cmd ::= ALTER STABLE ids(X) cpxName(Y) ADD TAG columnlist(A). {
X.n += Y.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG_COLUMN, TSDB_SUPER_TABLE);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER STABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
......@@ -898,7 +898,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
toTSDBType(Y.type);
SArray* A = tListItemAppendToken(NULL, &Y, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, TSDB_SUPER_TABLE);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
......@@ -911,7 +911,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
toTSDBType(Z.type);
A = tListItemAppendToken(A, &Z, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
......@@ -928,7 +928,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, TSDB_SUPER_TABLE);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
......
......@@ -16,25 +16,14 @@
#include "ttoken.h"
#include "astCreateContext.h"
void* acquireRaii(SAstCreateContext* pCxt, void* p) {
if (NULL == p) {
return NULL;
}
return p;
int32_t createAstCreateContext(SParseContext* pQueryCxt, SAstCreateContext* pCxt) {
pCxt->pQueryCxt = pQueryCxt;
pCxt->notSupport = false;
pCxt->valid = true;
pCxt->pRootNode = NULL;
return TSDB_CODE_SUCCESS;
}
void* releaseRaii(SAstCreateContext* pCxt, void* p) {
if (NULL == p) {
return NULL;
}
return p;
int32_t destroyAstCreateContext(SAstCreateContext* pCxt) {
return TSDB_CODE_SUCCESS;
}
int32_t createAstCreater(const SParseContext* pQueryCxt, SAstCreateContext* pCxt) {
}
int32_t destroyAstCreater(SAstCreateContext* pCxt) {
}
......@@ -610,7 +610,7 @@ SAlterTableInfo *tSetAlterTableInfo(SToken *pTableName, SArray *pCols, SArray *p
pAlterTable->type = type;
pAlterTable->tableType = tableType;
if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || type == TSDB_ALTER_TABLE_CHANGE_COLUMN || type == TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN) {
if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG || type == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES || type == TSDB_ALTER_TABLE_UPDATE_TAG_BYTES) {
pAlterTable->pAddColumns = pCols;
assert(pVals == NULL);
} else {
......
......@@ -249,112 +249,64 @@ SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx
return pCreateMsg;
}
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
SSchema* pSchema;
int32_t numOfTags = 0;
int32_t numOfCols = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pColumns);
if (pCreateTableSql->colInfo.pTagColumns != NULL) {
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
SMCreateStbReq createReq = {0};
createReq.igExists = pCreateTableSql->existCheck ? 1 : 0;
createReq.pColumns = pCreateTableSql->colInfo.pColumns;
createReq.pTags = pCreateTableSql->colInfo.pTagColumns;
createReq.numOfColumns = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pColumns);
createReq.numOfTags = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
SName n = {0};
if (createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf) != 0) {
return NULL;
}
SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema));
if (pCreateStbMsg == NULL) {
if (tNameExtractFullName(&n, createReq.name) != 0) {
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
return NULL;
}
char* pMsg = NULL;
#if 0
int32_t tableType = pCreateTableSql->type;
if (tableType != TSQL_CREATE_TABLE && tableType != TSQL_CREATE_STABLE) { // create by using super table, tags value
SArray* list = pInfo->pCreateTableInfo->childTableInfo;
int32_t numOfTables = (int32_t)taosArrayGetSize(list);
pCreateStbMsg->numOfTables = htonl(numOfTables);
pMsg = (char*)pCreateMsg;
for (int32_t i = 0; i < numOfTables; ++i) {
SCreateTableMsg* pCreate = (SCreateTableMsg*)pMsg;
pCreate->numOfColumns = htons(pCmd->numOfCols);
pCreate->numOfTags = htons(pCmd->count);
pMsg += sizeof(SCreateTableMsg);
SCreatedTableInfo* p = taosArrayGet(list, i);
strcpy(pCreate->tableName, p->fullname);
pCreate->igExists = (p->igExist) ? 1 : 0;
// use dbinfo from table id without modifying current db info
pMsg = serializeTagData(&p->tagdata, pMsg);
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
}
} else {
#endif
// create (super) table
SName n = {0};
int32_t code = createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf);
if (code != 0) {
return NULL;
}
code = tNameExtractFullName(&n, pCreateStbMsg->name);
if (code != 0) {
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
return NULL;
}
pCreateStbMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateStbMsg->numOfColumns = htonl(numOfCols);
pCreateStbMsg->numOfTags = htonl(numOfTags);
pSchema = (SSchema*)pCreateStbMsg->pSchema;
for (int i = 0; i < numOfCols; ++i) {
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
pSchema->type = pField->type;
pSchema->bytes = htonl(pField->bytes);
strcpy(pSchema->name, pField->name);
pSchema++;
}
for(int32_t i = 0; i < numOfTags; ++i) {
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pTagColumns, i);
pSchema->type = pField->type;
pSchema->bytes = htonl(pField->bytes);
strcpy(pSchema->name, pField->name);
pSchema++;
}
pMsg = (char*)pSchema;
int32_t msgLen = (int32_t)(pMsg - (char*)pCreateStbMsg);
*len = msgLen;
int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq);
void* req = malloc(tlen);
if (req == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
return pCreateStbMsg;
void* buf = req;
tSerializeSMCreateStbReq(&buf, &createReq);
*len = tlen;
return req;
}
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0);
SName name = {0};
SName name = {0};
int32_t code = createSName(&name, tableName, pParseCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
terrno = buildInvalidOperationMsg(pMsgBuf, "invalid table name");
return NULL;
}
SMDropStbReq *pDropTableMsg = (SMDropStbReq*) calloc(1, sizeof(SMDropStbReq));
SMDropStbReq dropReq = {0};
code = tNameExtractFullName(&name, dropReq.name);
code = tNameExtractFullName(&name, pDropTableMsg->name);
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
dropReq.igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
int32_t tlen = tSerializeSMDropStbReq(NULL, &dropReq);
void* req = malloc(tlen);
if (req == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
*len = sizeof(SMDropStbReq);
return pDropTableMsg;
void* buf = req;
tSerializeSMDropStbReq(&buf, &dropReq);
*len = tlen;
return req;
}
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// int32_t doTranslate() {
// }
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -132,7 +132,7 @@ typedef struct SSchJob {
SQueryProfileSummary summary;
} SSchJob;
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
......
aux_source_directory(src SYNC_SRC)
add_library(sync ${SYNC_SRC})
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册