提交 652c05c6 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/tq

...@@ -221,8 +221,7 @@ typedef struct SBuildTableMetaInput { ...@@ -221,8 +221,7 @@ typedef struct SBuildTableMetaInput {
typedef struct SBuildUseDBInput { typedef struct SBuildUseDBInput {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
int32_t vgroupVersion; int32_t vgVersion;
int32_t dbGroupVersion;
} SBuildUseDBInput; } SBuildUseDBInput;
...@@ -627,8 +626,7 @@ typedef struct { ...@@ -627,8 +626,7 @@ typedef struct {
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists; int8_t ignoreNotExists;
int32_t vgroupVersion; int32_t vgVersion;
int32_t dbGroupVersion;
int32_t reserve[8]; int32_t reserve[8];
} SUseDbMsg; } SUseDbMsg;
...@@ -808,6 +806,9 @@ typedef struct SSTableVgroupMsg { ...@@ -808,6 +806,9 @@ typedef struct SSTableVgroupMsg {
typedef struct SVgroupInfo { typedef struct SVgroupInfo {
int32_t vgId; int32_t vgId;
int32_t hashBegin;
int32_t hashEnd;
int8_t inUse;
int8_t numOfEps; int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo; } SVgroupInfo;
...@@ -841,7 +842,7 @@ typedef struct { ...@@ -841,7 +842,7 @@ typedef struct {
int32_t tversion; int32_t tversion;
uint64_t tuid; uint64_t tuid;
uint64_t suid; uint64_t suid;
SVgroupMsg vgroup; int32_t vgId;
SSchema pSchema[]; SSchema pSchema[];
} STableMetaMsg; } STableMetaMsg;
...@@ -863,15 +864,12 @@ typedef struct { ...@@ -863,15 +864,12 @@ typedef struct {
} STagData; } STagData;
typedef struct { typedef struct {
int32_t vgroupNum; char db[TSDB_FULL_DB_NAME_LEN];
int32_t vgroupVersion; int32_t vgVersion;
char db[TSDB_TABLE_FNAME_LEN]; int32_t vgNum;
int32_t dbVgroupVersion; int8_t hashMethod;
int32_t dbVgroupNum;
int32_t dbHashRange;
SVgroupInfo vgroupInfo[]; SVgroupInfo vgroupInfo[];
//int32_t vgIdList[]; } SUseDbRsp;
} SUseDbRspMsg;
......
...@@ -29,12 +29,7 @@ extern char tsLocalFqdn[]; ...@@ -29,12 +29,7 @@ extern char tsLocalFqdn[];
extern char tsLocalEp[]; extern char tsLocalEp[];
extern uint16_t tsServerPort; extern uint16_t tsServerPort;
extern int32_t tsStatusInterval; extern int32_t tsStatusInterval;
extern int32_t tsNumOfMnodes;
extern int8_t tsEnableVnodeBak;
extern int8_t tsEnableTelemetryReporting; extern int8_t tsEnableTelemetryReporting;
extern char tsArbitrator[];
extern int8_t tsArbOnline;
extern int64_t tsArbOnlineTimestamp;
// common // common
extern int tsRpcTimer; extern int tsRpcTimer;
...@@ -60,7 +55,6 @@ extern int tsCompatibleModel; // 2.0 compatible model ...@@ -60,7 +55,6 @@ extern int tsCompatibleModel; // 2.0 compatible model
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node during query processing extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node during query processing
extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked
extern int8_t tsKeepOriginalColumnName; extern int8_t tsKeepOriginalColumnName;
// client // client
...@@ -78,66 +72,15 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the ...@@ -78,66 +72,15 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the
extern int32_t tsProjectExecInterval; extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow; extern int64_t tsMaxRetentWindow;
// db parameters in client
extern int32_t tsCacheBlockSize;
extern int32_t tsBlocksPerVnode;
extern int32_t tsTableIncStepPerVnode;
extern int32_t tsMaxVgroupsPerDb;
extern int16_t tsDaysPerFile;
extern int32_t tsDaysToKeep;
extern int32_t tsMinRowsInFileBlock;
extern int32_t tsMaxRowsInFileBlock;
extern int16_t tsCommitTime; // seconds
extern int32_t tsTimePrecision;
extern int8_t tsCompression;
extern int8_t tsWAL;
extern int32_t tsFsyncPeriod;
extern int32_t tsReplications;
extern int16_t tsPartitons;
extern int32_t tsQuorum;
extern int8_t tsUpdate;
extern int8_t tsCacheLastRow;
//tsdb
extern bool tsdbForceKeepFile;
// balance // balance
extern int8_t tsEnableBalance;
extern int8_t tsAlternativeRole;
extern int32_t tsBalanceInterval;
extern int32_t tsOfflineThreshold;
extern int8_t tsEnableFlowCtrl;
extern int8_t tsEnableSlaveQuery; extern int8_t tsEnableSlaveQuery;
extern int8_t tsEnableAdjustMaster;
// restful // interna
extern int32_t tsRestRowLimit;
extern int8_t tsTelegrafUseFieldNum;
// mqtt
extern int8_t tsEnableMqttModule;
extern char tsMqttHostName[];
extern char tsMqttPort[];
extern char tsMqttUser[];
extern char tsMqttPass[];
extern char tsMqttClientId[];
extern char tsMqttTopic[];
// monitor
extern int8_t tsEnableMonitorModule;
extern char tsMonitorDbName[];
extern char tsInternalPass[];
extern int32_t tsMonitorInterval;
// stream
extern int8_t tsEnableStream;
// internal
extern int8_t tsPrintAuth; extern int8_t tsPrintAuth;
extern char tsVnodeDir[]; extern char tsVnodeDir[];
extern char tsMnodeDir[]; extern char tsMnodeDir[];
extern int64_t tsTickPerDay[3]; extern int64_t tsTickPerDay[3];
extern int32_t tsTopicBianryLen;
// system info // system info
extern float tsTotalLogDirGB; extern float tsTotalLogDirGB;
......
...@@ -32,7 +32,7 @@ extern "C" { ...@@ -32,7 +32,7 @@ extern "C" {
struct SCatalog; struct SCatalog;
typedef struct SCatalogReq { typedef struct SCatalogReq {
char clusterId[TSDB_CLUSTER_ID_LEN]; //???? char dbName[TSDB_DB_NAME_LEN];
SArray *pTableName; // table full name SArray *pTableName; // table full name
SArray *pUdf; // udf name SArray *pUdf; // udf name
bool qNodeRequired; // valid qnode bool qNodeRequired; // valid qnode
...@@ -45,42 +45,10 @@ typedef struct SMetaData { ...@@ -45,42 +45,10 @@ typedef struct SMetaData {
SEpSet *pEpSet; // qnode epset list SEpSet *pEpSet; // qnode epset list
} SMetaData; } SMetaData;
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
int16_t numOfColumns; // the number of columns
int32_t rowSize; // row size of the schema
} STableComInfo;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
typedef struct SCTableMeta {
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
} SCTableMeta;
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta.
*/
typedef struct STableMeta {
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
SSchema schema[];
} STableMeta;
typedef struct SCatalogCfg { typedef struct SCatalogCfg {
bool enableVgroupCache;
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
} SCatalogCfg; } SCatalogCfg;
int32_t catalogInit(SCatalogCfg *cfg); int32_t catalogInit(SCatalogCfg *cfg);
...@@ -93,22 +61,14 @@ int32_t catalogInit(SCatalogCfg *cfg); ...@@ -93,22 +61,14 @@ int32_t catalogInit(SCatalogCfg *cfg);
*/ */
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version);
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList);
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo); int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta); int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
/** /**
...@@ -117,7 +77,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const ...@@ -117,7 +77,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
* @pVgroupList - array of SVgroupInfo * @pVgroupList - array of SVgroupInfo
* @return * @return
*/ */
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList); int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
/** /**
...@@ -129,7 +89,7 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe ...@@ -129,7 +89,7 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
* @param pMetaData * @param pMetaData
* @return * @return
*/ */
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
......
...@@ -132,13 +132,15 @@ struct SInsertStmtInfo; ...@@ -132,13 +132,15 @@ struct SInsertStmtInfo;
bool qIsInsertSql(const char* pStr, size_t length); bool qIsInsertSql(const char* pStr, size_t length);
typedef struct SParseContext { typedef struct SParseContext {
const char* pSql; // sql string const char* pAcctId;
size_t sqlLen; // length of the sql string
int64_t id; // operator id, generated by uuid generator
const char* pDbname; const char* pDbname;
void *pRpc;
const char* pClusterId;
const SEpSet* pEpSet; const SEpSet* pEpSet;
int64_t id; // query id, generated by uuid generator
int8_t schemaAttached; // denote if submit block is built with table schema or not int8_t schemaAttached; // denote if submit block is built with table schema or not
const char* pSql; // sql string
size_t sqlLen; // length of the sql string
char* pMsg; // extended error message if exists to help avoid the problem in sql statement. char* pMsg; // extended error message if exists to help avoid the problem in sql statement.
int32_t msgLen; // max length of the msg int32_t msgLen; // max length of the msg
} SParseContext; } SParseContext;
......
...@@ -20,52 +20,92 @@ ...@@ -20,52 +20,92 @@
extern "C" { extern "C" {
#endif #endif
#include "taosmsg.h"
#define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2 #define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3 #define QUERY_TYPE_SCAN 3
enum OPERATOR_TYPE_E { enum OPERATOR_TYPE_E {
OP_TableScan = 1, OP_Unknown,
OP_DataBlocksOptScan = 2, #define INCLUDE_AS_ENUM
OP_TableSeqScan = 3, #include "plannerOp.h"
OP_TagScan = 4, #undef INCLUDE_AS_ENUM
OP_TableBlockInfoScan= 5, OP_TotalNum
OP_Aggregate = 6,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_StateWindow = 22,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
// OP_DummyInput = 16, //TODO remove it after fully refactor.
// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
// OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
OP_Join = 21,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
OP_Exchange = 26,
}; };
struct SEpSet; struct SEpSet;
struct SQueryPlanNode;
struct SPhyNode;
struct SQueryStmtInfo; struct SQueryStmtInfo;
typedef SSchema SSlotSchema;
typedef struct SDataBlockSchema {
SSlotSchema *pSchema;
int32_t numOfCols; // number of columns
} SDataBlockSchema;
typedef struct SQueryNodeBasicInfo {
int32_t type; // operator type
const char *name; // operator name
} SQueryNodeBasicInfo;
typedef struct SPhyNode {
SQueryNodeBasicInfo info;
SArray *pTargets; // target list to be computed or scanned at this node
SArray *pConditions; // implicitly-ANDed qual conditions
SDataBlockSchema targetSchema;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray *pChildren;
struct SPhyNode *pParent;
} SPhyNode;
typedef struct SScanPhyNode {
SPhyNode node;
uint64_t uid; // unique id of the table
int8_t tableType;
} SScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode;
typedef SScanPhyNode STagScanPhyNode;
typedef struct STableScanPhyNode {
SScanPhyNode scan;
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow window;
SArray *pTagsConditions; // implicitly-ANDed tag qual conditions
} STableScanPhyNode;
typedef STableScanPhyNode STableSeqScanPhyNode;
typedef struct SProjectPhyNode {
SPhyNode node;
} SProjectPhyNode;
typedef struct SExchangePhyNode {
SPhyNode node;
uint64_t templateId;
SArray *pSourceEpSet; // SEpSet
} SExchangePhyNode;
typedef struct SSubplanId {
uint64_t queryId;
uint64_t templateId;
uint64_t subplanId;
} SSubplanId;
typedef struct SSubplan { typedef struct SSubplan {
SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
SArray *pDatasource; // the datasource subplan,from which to fetch the result int32_t level; // the execution level of current subplan, starting from 0.
struct SPhyNode *pNode; // physical plan of current subplan SEpSet execEpSet; // for the scan sub plan, the optional execution node
SArray *pChildern; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan
} SSubplan; } SSubplan;
typedef struct SQueryDag { typedef struct SQueryDag {
SArray **pSubplans; SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryDag; } SQueryDag;
/** /**
...@@ -75,6 +115,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* ...@@ -75,6 +115,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
/** /**
* Convert to subplan to string for the scheduler to send to the executor * Convert to subplan to string for the scheduler to send to the executor
*/ */
......
...@@ -13,23 +13,36 @@ ...@@ -13,23 +13,36 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_HTTP_H #if defined(INCLUDE_AS_ENUM) // enum define mode
#define TDENGINE_HTTP_H #undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) OP_##op,
#ifdef __cplusplus #elif defined(INCLUDE_AS_NAME) // comment define mode
extern "C" { #undef OP_ENUM_MACRO
#endif #define OP_ENUM_MACRO(op) #op,
#else
#include <stdint.h> #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
int32_t httpGetReqCount();
int32_t httpInitSystem();
int32_t httpStartSystem();
void httpStopSystem();
void httpCleanUpSystem();
#ifdef __cplusplus
}
#endif #endif
#endif OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(TableBlockInfoScan)
OP_ENUM_MACRO(Aggregate)
OP_ENUM_MACRO(Project)
OP_ENUM_MACRO(Groupby)
OP_ENUM_MACRO(Limit)
OP_ENUM_MACRO(SLimit)
OP_ENUM_MACRO(TimeWindow)
OP_ENUM_MACRO(SessionWindow)
OP_ENUM_MACRO(StateWindow)
OP_ENUM_MACRO(Fill)
OP_ENUM_MACRO(MultiTableAggregate)
OP_ENUM_MACRO(MultiTableTimeInterval)
OP_ENUM_MACRO(Filter)
OP_ENUM_MACRO(Distinct)
OP_ENUM_MACRO(Join)
OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO(AllMultiTableTimeInterval)
OP_ENUM_MACRO(Order)
OP_ENUM_MACRO(Exchange)
...@@ -21,21 +21,66 @@ extern "C" { ...@@ -21,21 +21,66 @@ extern "C" {
#endif #endif
#include "tarray.h" #include "tarray.h"
#include "thash.h"
typedef SVgroupListRspMsg SVgroupListInfo; typedef SVgroupListRspMsg SVgroupListInfo;
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
int16_t numOfColumns; // the number of columns
int32_t rowSize; // row size of the schema
} STableComInfo;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
typedef struct SCTableMeta {
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
} SCTableMeta;
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta.
*/
typedef struct STableMeta {
//BEGIN: KEEP THIS PART SAME WITH SCTableMeta
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
//END: KEEP THIS PART SAME WITH SCTableMeta
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
SSchema schema[];
} STableMeta;
typedef struct SDBVgroupInfo { typedef struct SDBVgroupInfo {
int32_t vgroupVersion; int32_t vgVersion;
SArray *vgId; int8_t hashMethod;
int32_t hashRange; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
} SDBVgroupInfo; } SDBVgroupInfo;
typedef struct SUseDbOutput { typedef struct SUseDbOutput {
SVgroupListInfo *vgroupList; char db[TSDB_FULL_DB_NAME_LEN];
char db[TSDB_TABLE_FNAME_LEN]; SDBVgroupInfo dbVgroup;
SDBVgroupInfo *dbVgroup;
} SUseDbOutput; } SUseDbOutput;
typedef struct STableMetaOutput {
int32_t metaNum;
char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN];
SCTableMeta ctbMeta;
STableMeta *tbMeta;
} STableMetaOutput;
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
......
...@@ -41,7 +41,7 @@ typedef struct SArray { ...@@ -41,7 +41,7 @@ typedef struct SArray {
* @param elemSize * @param elemSize
* @return * @return
*/ */
void* taosArrayInit(size_t size, size_t elemSize); SArray* taosArrayInit(size_t size, size_t elemSize);
/** /**
* *
......
...@@ -32,7 +32,6 @@ extern int32_t mDebugFlag; ...@@ -32,7 +32,6 @@ extern int32_t mDebugFlag;
extern int32_t cDebugFlag; extern int32_t cDebugFlag;
extern int32_t jniDebugFlag; extern int32_t jniDebugFlag;
extern int32_t tmrDebugFlag; extern int32_t tmrDebugFlag;
extern int32_t sdbDebugFlag;
extern int32_t httpDebugFlag; extern int32_t httpDebugFlag;
extern int32_t mqttDebugFlag; extern int32_t mqttDebugFlag;
extern int32_t monDebugFlag; extern int32_t monDebugFlag;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
extern bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
......
...@@ -22,10 +22,6 @@ void initLog(const char* path) { ...@@ -22,10 +22,6 @@ void initLog(const char* path) {
cDebugFlag = 0; cDebugFlag = 0;
jniDebugFlag = 0; jniDebugFlag = 0;
tmrDebugFlag = 0; tmrDebugFlag = 0;
sdbDebugFlag = 0;
httpDebugFlag = 0;
mqttDebugFlag = 0;
monDebugFlag = 0;
uDebugFlag = 143; uDebugFlag = 143;
rpcDebugFlag = 0; rpcDebugFlag = 0;
odbcDebugFlag = 0; odbcDebugFlag = 0;
......
...@@ -214,6 +214,7 @@ typedef struct { ...@@ -214,6 +214,7 @@ typedef struct {
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; int64_t uid;
int32_t version;
SDbCfg cfg; SDbCfg cfg;
} SDbObj; } SDbObj;
......
...@@ -74,6 +74,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { ...@@ -74,6 +74,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT64(pRaw, dataPos, pDb->createdTime) SDB_SET_INT64(pRaw, dataPos, pDb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pDb->updateTime) SDB_SET_INT64(pRaw, dataPos, pDb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pDb->uid) SDB_SET_INT64(pRaw, dataPos, pDb->uid)
SDB_SET_INT32(pRaw, dataPos, pDb->version)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile)
...@@ -117,6 +118,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { ...@@ -117,6 +118,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile)
...@@ -571,6 +573,7 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) { ...@@ -571,6 +573,7 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) {
return code; return code;
} }
dbObj.version++;
code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj); code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
......
...@@ -69,7 +69,7 @@ int32_t mndInitStb(SMnode *pMnode) { ...@@ -69,7 +69,7 @@ int32_t mndInitStb(SMnode *pMnode) {
void mndCleanupStb(SMnode *pMnode) {} void mndCleanupStb(SMnode *pMnode) {}
static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema); int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUM, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUM, size);
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
......
...@@ -71,7 +71,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -71,7 +71,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
void mndCleanupVgroup(SMnode *pMnode) {} void mndCleanupVgroup(SMnode *pMnode) {}
static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_VGROUP_VER_NUM, sizeof(SDbObj)); SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_VGROUP_VER_NUM, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
...@@ -141,11 +141,8 @@ static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } ...@@ -141,11 +141,8 @@ static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) { static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
mTrace("vgId:%d, perform update action", pOldVgroup->vgId); mTrace("vgId:%d, perform update action", pOldVgroup->vgId);
pOldVgroup->vgId = pNewVgroup->vgId;
pOldVgroup->createdTime = pNewVgroup->createdTime;
pOldVgroup->updateTime = pNewVgroup->updateTime; pOldVgroup->updateTime = pNewVgroup->updateTime;
pOldVgroup->version = pNewVgroup->version; pOldVgroup->version = pNewVgroup->version;
memcpy(pOldVgroup->dbName, pNewVgroup->dbName, TSDB_FULL_DB_NAME_LEN);
pOldVgroup->replica = pNewVgroup->replica; pOldVgroup->replica = pNewVgroup->replica;
memcpy(pOldVgroup->vnodeGid, pNewVgroup->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid)); memcpy(pOldVgroup->vnodeGid, pNewVgroup->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
return 0; return 0;
......
...@@ -19,5 +19,5 @@ target_link_libraries( ...@@ -19,5 +19,5 @@ target_link_libraries(
# test # test
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) #add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
\ No newline at end of file
...@@ -10,3 +10,5 @@ target_link_libraries( ...@@ -10,3 +10,5 @@ target_link_libraries(
catalog catalog
PRIVATE os util common transport query PRIVATE os util common transport query
) )
ADD_SUBDIRECTORY(test)
\ No newline at end of file
...@@ -24,16 +24,16 @@ extern "C" { ...@@ -24,16 +24,16 @@ extern "C" {
#include "common.h" #include "common.h"
#include "tlog.h" #include "tlog.h"
#define CTG_DEFAULT_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_DB_NUMBER 20 #define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
#define CTG_DEFAULT_INVALID_VERSION (-1) #define CTG_DEFAULT_INVALID_VERSION (-1)
typedef struct SVgroupListCache { typedef struct SVgroupListCache {
int32_t vgroupVersion; int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo* SHashObj *cache; // key:vgId, value:SVgroupInfo
SArray *arrayCache; // SVgroupInfo
} SVgroupListCache; } SVgroupListCache;
typedef struct SDBVgroupCache { typedef struct SDBVgroupCache {
...@@ -42,6 +42,7 @@ typedef struct SDBVgroupCache { ...@@ -42,6 +42,7 @@ typedef struct SDBVgroupCache {
typedef struct STableMetaCache { typedef struct STableMetaCache {
SHashObj *cache; //key:fulltablename, value:STableMeta SHashObj *cache; //key:fulltablename, value:STableMeta
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache; } STableMetaCache;
typedef struct SCatalog { typedef struct SCatalog {
...@@ -53,8 +54,10 @@ typedef struct SCatalog { ...@@ -53,8 +54,10 @@ typedef struct SCatalog {
typedef struct SCatalogMgmt { typedef struct SCatalogMgmt {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node
SCatalogCfg cfg;
} SCatalogMgmt; } SCatalogMgmt;
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
extern int32_t ctgDebugFlag; extern int32_t ctgDebugFlag;
......
此差异已折叠。
MESSAGE(STATUS "build catalog unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(catalogTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
catalogTest
PUBLIC os util common catalog transport gtest query
)
TARGET_INCLUDE_DIRECTORIES(
catalogTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc"
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h"
namespace {
}
TEST(testCase, normalCase) {
char *clusterId = "cluster1";
struct SCatalog* pCtg = NULL;
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(clusterId, &pCtg);
ASSERT_EQ(code, 0);
}
/*
TEST(testCase, normalCase) {
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf buf;
buf.len = 128;
buf.buf = msg;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
SQueryStmtInfo* pQueryInfo = createQueryInfo();
setTableMetaInfo(pQueryInfo, &req);
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList[0];
int32_t num = tsCompatibleModel? 2:1;
ASSERT_EQ(taosArrayGetSize(pExprList), num);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
ASSERT_EQ(p1->base.pColumns->uid, 110);
ASSERT_EQ(p1->base.numOfParams, 1);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.interBytes, 16);
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
tExprNode* pParam = p1->pExpr->_function.pChild[0];
ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr;
code = createQueryPlan(pQueryInfo, &n);
char* str = NULL;
queryPlanToString(n, &str);
printf("%s\n", str);
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);
destroySqlInfo(&info1);
}
TEST(testCase, displayPlan) {
generateLogicplan("select count(*) from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
generateLogicplan("select count(*) from `t.1abc` group by a");
generateLogicplan("select count(A+B) from `t.1abc` group by a");
generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
// order by + group by column + limit offset
generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
// fill
generateLogicplan("select min(a) from `t.1abc` where ts>now and ts<now+2h interval(1s) fill(linear)");
// union + union all
// join
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
// Projection(cols: [a+b #5059]) filters:(nil)
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)
// TableScan(t.1abc #110) time_range: -9223372036854775808 - 9223372036854775807
}
*/
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
...@@ -4831,8 +4831,8 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { ...@@ -4831,8 +4831,8 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
STableBlockDist tableBlockDist = {0}; STableBlockDist tableBlockDist = {0};
tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables; tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
int32_t numRowSteps = tsMaxRowsInFileBlock / TSDB_BLOCK_DIST_STEP_ROWS; int32_t numRowSteps = TSDB_DEFAULT_MAX_ROW_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
if (tsMaxRowsInFileBlock % TSDB_BLOCK_DIST_STEP_ROWS != 0) { if (TSDB_DEFAULT_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
++numRowSteps; ++numRowSteps;
} }
tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo)); tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
......
...@@ -30,6 +30,7 @@ extern "C" { ...@@ -30,6 +30,7 @@ extern "C" {
typedef struct Fst Fst; typedef struct Fst Fst;
typedef struct FstNode FstNode; typedef struct FstNode FstNode;
typedef struct StreamWithState StreamWithState;
typedef enum { Included, Excluded, Unbounded} FstBound; typedef enum { Included, Excluded, Unbounded} FstBound;
...@@ -283,6 +284,9 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null); ...@@ -283,6 +284,9 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null);
FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx); FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx);
FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx); FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx);
// into stream to expand later
StreamWithState* streamBuilderIntoStream(FstStreamBuilder *sb);
bool fstVerify(Fst *fst); bool fstVerify(Fst *fst);
......
...@@ -1094,6 +1094,10 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { ...@@ -1094,6 +1094,10 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx) { FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx) {
return fstStreamBuilderCreate(fst, ctx); return fstStreamBuilderCreate(fst, ctx);
} }
StreamWithState* streamBuilderIntoStream(FstStreamBuilder *sb) {
if (sb == NULL) { return NULL; }
return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max);
}
FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx) { FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx) {
return fstStreamBuilderCreate(fst, ctx); return fstStreamBuilderCreate(fst, ctx);
} }
...@@ -1119,7 +1123,7 @@ CompiledAddr fstGetRootAddr(Fst *fst) { ...@@ -1119,7 +1123,7 @@ CompiledAddr fstGetRootAddr(Fst *fst) {
Output fstEmptyFinalOutput(Fst *fst, bool *null) { Output fstEmptyFinalOutput(Fst *fst, bool *null) {
Output res = 0; Output res = 0;
FstNode *node = fst->root; FstNode *node = fstGetRoot(fst);
if (FST_NODE_IS_FINAL(node)) { if (FST_NODE_IS_FINAL(node)) {
*null = false; *null = false;
res = FST_NODE_FINAL_OUTPUT(node); res = FST_NODE_FINAL_OUTPUT(node);
...@@ -1176,7 +1180,7 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) { ...@@ -1176,7 +1180,7 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) {
bool fstBoundWithDataIsIncluded(FstBoundWithData *bound) { bool fstBoundWithDataIsIncluded(FstBoundWithData *bound) {
return bound->type == Included ? true : false; return bound->type == Excluded? false : true;
} }
void fstBoundDestroy(FstBoundWithData *bound) { void fstBoundDestroy(FstBoundWithData *bound) {
......
...@@ -68,7 +68,7 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) { ...@@ -68,7 +68,7 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
// prefix query, impl later // prefix query, impl later
static void* prefixStart(AutomationCtx *ctx) { static void* prefixStart(AutomationCtx *ctx) {
StartWithStateValue *data = (StartWithStateValue *)(ctx->data); StartWithStateValue *data = (StartWithStateValue *)(ctx->stdata);
return startWithStateValueDump(data); return startWithStateValueDump(data);
}; };
static bool prefixIsMatch(AutomationCtx *ctx, void *sv) { static bool prefixIsMatch(AutomationCtx *ctx, void *sv) {
...@@ -145,7 +145,8 @@ AutomationCtx* automCtxCreate(void *data,AutomationType atype) { ...@@ -145,7 +145,8 @@ AutomationCtx* automCtxCreate(void *data,AutomationType atype) {
StartWithStateValue *sv = NULL; StartWithStateValue *sv = NULL;
if (atype == AUTOMATION_PREFIX) { if (atype == AUTOMATION_PREFIX) {
sv = startWithStateValueCreate(Running, FST_INT, 0); int val = 0;
sv = startWithStateValueCreate(Running, FST_INT, &val);
ctx->stdata = (void *)sv; ctx->stdata = (void *)sv;
} else if (atype == AUTMMATION_MATCH) { } else if (atype == AUTMMATION_MATCH) {
......
...@@ -59,9 +59,22 @@ class FstReadMemory { ...@@ -59,9 +59,22 @@ class FstReadMemory {
return ok; return ok;
} }
// add later // add later
bool Search(const std::string &key, std::vector<uint64_t> &result) { bool Search(AutomationCtx *ctx, std::vector<uint64_t> &result) {
FstStreamBuilder *sb = fstSearch(_fst, ctx);
StreamWithState *st = streamBuilderIntoStream(sb);
StreamWithStateResult *rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
result.push_back((uint64_t)(rt->out.out));
}
return true; return true;
} }
bool SearchWithTimeCostUs(AutomationCtx *ctx, std::vector<uint64_t> &result) {
int64_t s = taosGetTimestampUs();
bool ok = this->Search(ctx, result);
int64_t e = taosGetTimestampUs();
return ok;
}
~FstReadMemory() { ~FstReadMemory() {
fstCountingWriterDestroy(_w); fstCountingWriterDestroy(_w);
...@@ -186,11 +199,43 @@ void checkFstPerf() { ...@@ -186,11 +199,43 @@ void checkFstPerf() {
printf("success to init fst read"); printf("success to init fst read");
} }
Performance_fstReadRecords(m); Performance_fstReadRecords(m);
delete m; delete m;
} }
void checkFstPrefixSearch() {
FstWriter *fw = new FstWriter;
int64_t s = taosGetTimestampUs();
int count = 2;
std::string key("ab");
for (int i = 0; i < count; i++) {
key[1] = key[1] + i;
fw->Put(key, i);
}
int64_t e = taosGetTimestampUs();
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
delete fw;
FstReadMemory *m = new FstReadMemory(1024 * 64);
if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl;
delete m;
return;
}
// prefix search
std::vector<uint64_t> result;
AutomationCtx *ctx = automCtxCreate((void *)"ab", AUTOMATION_PREFIX);
m->Search(ctx, result);
assert(result.size() == count);
for (int i = 0; i < result.size(); i++) {
assert(result[i] == i); // check result
}
free(ctx);
delete m;
}
void validateFst() { void validateFst() {
int val = 100; int val = 100;
int count = 100; int count = 100;
...@@ -209,6 +254,8 @@ void validateFst() { ...@@ -209,6 +254,8 @@ void validateFst() {
FstReadMemory *m = new FstReadMemory(1024 * 64); FstReadMemory *m = new FstReadMemory(1024 * 64);
if (m->init() == false) { if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl; std::cout << "init readMemory failed" << std::endl;
delete m;
return;
} }
{ {
...@@ -230,10 +277,12 @@ void validateFst() { ...@@ -230,10 +277,12 @@ void validateFst() {
} }
} }
delete m; delete m;
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {
checkFstPerf(); checkFstPerf();
//checkFstPrefixSearch();
return 1; return 1;
} }
......
...@@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer ...@@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
} }
// load the meta data from catalog // load the meta data from catalog
code = catalogGetAllMeta(pCatalog, NULL, &req, &data); code = catalogGetAllMeta(pCatalog, NULL, NULL, &req, &data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -71,8 +71,7 @@ typedef struct SInsertParseContext { ...@@ -71,8 +71,7 @@ typedef struct SInsertParseContext {
const char* pSql; const char* pSql;
SMsgBuf msg; SMsgBuf msg;
struct SCatalog* pCatalog; struct SCatalog* pCatalog;
SMetaData meta; // need release STableMeta* pTableMeta;
const STableMeta* pTableMeta;
SHashObj* pTableBlockHashObj; // data block for each table. need release SHashObj* pTableBlockHashObj; // data block for each table. need release
int32_t totalNum; int32_t totalNum;
SInsertStmtInfo* pOutput; SInsertStmtInfo* pOutput;
...@@ -165,29 +164,29 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) { ...@@ -165,29 +164,29 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) { static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) { if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z); return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
} }
SName name = {0}; char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
strcpy(name.dbname, pCxt->pComCxt->pDbname); if (NULL != p) { // db.table
strncpy(name.tname, pStname->z, pStname->n); strcpy(fullDbName, pCxt->pComCxt->pAcctId);
taosArrayPush(tableNameList, &name); fullDbName[strlen(pCxt->pComCxt->pAcctId)] = TS_PATH_DELIMITER[0];
strncpy(fullDbName, pStname->z, p - pStname->z);
strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
} else {
snprintf(fullDbName, TSDB_FULL_DB_NAME_LEN, "%s.%s", pCxt->pComCxt->pAcctId, pCxt->pComCxt->pDbname);
strncpy(tableName, pStname->z, pStname->n);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SCatalogReq* pMetaReq) {
pMetaReq->pTableName = taosArrayInit(4, sizeof(SName));
return buildTableName(pCxt, pStname, pMetaReq->pTableName);
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
SCatalogReq req; char fullDbName[TSDB_FULL_DB_NAME_LEN] = {0};
CHECK_CODE(buildMetaReq(pCxt, pTname, &req)); char tableName[TSDB_TABLE_NAME_LEN] = {0};
CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, NULL, NULL, NULL, &pCxt->meta)); //TODO CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0); CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -868,12 +867,11 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { ...@@ -868,12 +867,11 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
.pOutput = *pInfo .pOutput = *pInfo
}; };
CHECK_CODE(catalogGetHandle(NULL, &context.pCatalog)); //TODO
if (NULL == context.pTableBlockHashObj) { if (NULL == context.pTableBlockHashObj) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
CHECK_CODE(catalogGetHandle(pContext->pClusterId, &context.pCatalog));
CHECK_CODE(skipInsertInto(&context)); CHECK_CODE(skipInsertInto(&context));
CHECK_CODE(parseInsertBody(&context)); CHECK_CODE(parseInsertBody(&context));
......
...@@ -1464,29 +1464,6 @@ int32_t copyTagData(STagData* dst, const STagData* src) { ...@@ -1464,29 +1464,6 @@ int32_t copyTagData(STagData* dst, const STagData* src) {
return 0; return 0;
} }
STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
assert(pChild != NULL);
int32_t total = pChild->numOfColumns + pChild->numOfTags;
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total);
pTableMeta->tableType = TSDB_SUPER_TABLE;
pTableMeta->tableInfo.numOfTags = pChild->numOfTags;
pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns;
pTableMeta->tableInfo.precision = pChild->precision;
pTableMeta->uid = pChild->suid;
pTableMeta->tversion = pChild->tversion;
pTableMeta->sversion = pChild->sversion;
memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total);
int32_t num = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < num; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
uint32_t getTableMetaSize(const STableMeta* pTableMeta) { uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
......
...@@ -6,13 +6,16 @@ SET(CMAKE_CXX_STANDARD 11) ...@@ -6,13 +6,16 @@ SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(parserTest ${SOURCE_LIST}) ADD_EXECUTABLE(parserTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
parserTest
PUBLIC os util common parser catalog transport gtest function planner query
)
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
parserTest parserTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser/" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/parser/inc" PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/parser/inc"
) )
TARGET_LINK_LIBRARIES(
parserTest
PUBLIC os util common parser catalog transport gtest function planner query
)
TARGET_LINK_OPTIONS(parserTest PRIVATE -Wl,-wrap,malloc)
...@@ -27,6 +27,27 @@ namespace { ...@@ -27,6 +27,27 @@ namespace {
} }
} }
extern "C" {
#include <execinfo.h>
void *__real_malloc(size_t);
void *__wrap_malloc(size_t c) {
// printf("My MALLOC called: %d\n", c);
// void *array[32];
// int size = backtrace(array, 32);
// char **symbols = backtrace_symbols(array, size);
// for (int i = 0; i < size; ++i) {
// cout << symbols[i] << endl;
// }
// free(symbols);
return __real_malloc(c);
}
}
// syntax: // syntax:
// INSERT INTO // INSERT INTO
// tb_name // tb_name
......
...@@ -23,25 +23,23 @@ extern "C" { ...@@ -23,25 +23,23 @@ extern "C" {
#include "common.h" #include "common.h"
#include "tarray.h" #include "tarray.h"
#include "planner.h" #include "planner.h"
#include "parser.h"
#include "taosmsg.h" #include "taosmsg.h"
enum LOGIC_PLAN_E { #define QNODE_TAGSCAN 1
LP_SCAN = 1, #define QNODE_TABLESCAN 2
LP_SESSION = 2, #define QNODE_PROJECT 3
LP_STATE = 3, #define QNODE_AGGREGATE 4
LP_INTERVAL = 4, #define QNODE_GROUPBY 5
LP_FILL = 5, #define QNODE_LIMIT 6
LP_AGG = 6, #define QNODE_JOIN 7
LP_JOIN = 7, #define QNODE_DISTINCT 8
LP_PROJECT = 8, #define QNODE_SORT 9
LP_DISTINCT = 9, #define QNODE_UNION 10
LP_ORDER = 10 #define QNODE_TIMEWINDOW 11
}; #define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
typedef struct SQueryNodeBasicInfo { #define QNODE_FILL 14
int32_t type; // operator type
char *name; // operator name
} SQueryNodeBasicInfo;
typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
...@@ -52,8 +50,9 @@ typedef struct SQueryDistPlanNodeInfo { ...@@ -52,8 +50,9 @@ typedef struct SQueryDistPlanNodeInfo {
} SQueryDistPlanNodeInfo; } SQueryDistPlanNodeInfo;
typedef struct SQueryTableInfo { typedef struct SQueryTableInfo {
char *tableName; char *tableName; // to be deleted
uint64_t uid; uint64_t uid; // to be deleted
STableMetaInfo* pMeta;
STimeWindow window; STimeWindow window;
} SQueryTableInfo; } SQueryTableInfo;
...@@ -64,50 +63,12 @@ typedef struct SQueryPlanNode { ...@@ -64,50 +63,12 @@ typedef struct SQueryPlanNode {
SArray *pExpr; // the query functions or sql aggregations SArray *pExpr; // the query functions or sql aggregations
int32_t numOfExpr; // number of result columns, which is also the number of pExprs int32_t numOfExpr; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information void *pExtInfo; // additional information
// previous operator to generated result for current node to process // children operator to generated result for current node to process
// in case of join, multiple prev nodes exist. // in case of join, multiple prev nodes exist.
SArray *pPrevNodes; // upstream nodes SArray *pChildren; // upstream nodes
struct SQueryPlanNode *nextNode; struct SQueryPlanNode *pParent;
} SQueryPlanNode; } SQueryPlanNode;
typedef SSchema SSlotSchema;
typedef struct SDataBlockSchema {
int32_t index;
SSlotSchema *pSchema;
int32_t numOfCols; // number of columns
} SDataBlockSchema;
typedef struct SPhyNode {
SQueryNodeBasicInfo info;
SArray *pTargets; // target list to be computed or scanned at this node
SArray *pConditions; // implicitly-ANDed qual conditions
SDataBlockSchema targetSchema;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray *pChildren;
} SPhyNode;
typedef struct SScanPhyNode {
SPhyNode node;
uint64_t uid; // unique id of the table
} SScanPhyNode;
typedef SScanPhyNode STagScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode;
typedef struct SMultiTableScanPhyNode {
SScanPhyNode scan;
SArray *pTagsConditions; // implicitly-ANDed tag qual conditions
} SMultiTableScanPhyNode;
typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode;
typedef struct SProjectPhyNode {
SPhyNode node;
} SProjectPhyNode;
/** /**
* Optimize the query execution plan, currently not implement yet. * Optimize the query execution plan, currently not implement yet.
* @param pQueryNode * @param pQueryNode
......
...@@ -14,23 +14,147 @@ ...@@ -14,23 +14,147 @@
*/ */
#include "plannerInt.h" #include "plannerInt.h"
#include "parser.h"
SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { static const char* gOpName[] = {
return NULL; "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
typedef struct SPlanContext {
struct SCatalog* pCatalog;
struct SQueryDag* pDag;
SSubplan* pCurrentSubplan;
SSubplanId nextId;
} SPlanContext;
static void toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
SWAP(dataBlockSchema->pSchema, pPlanNode->pSchema, SSchema*);
dataBlockSchema->numOfCols = pPlanNode->numOfCols;
}
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)calloc(1, size);
node->info.type = type;
node->info.name = gOpName[type];
SWAP(node->pTargets, pPlanNode->pExpr, SArray*);
toDataBlockSchema(pPlanNode, &(node->targetSchema));
}
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
return initPhyNode(pPlanNode, OP_TagScan, sizeof(STagScanPhyNode));
}
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = calloc(1, sizeof(SSubplan));
subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId);
subplan->type = type;
subplan->level = 0;
if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) {
pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
}
taosArrayPush(pCxt->pCurrentSubplan->pChildern, subplan);
subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
taosArrayPush(subplan->pParents, pCxt->pCurrentSubplan);
}
pCxt->pCurrentSubplan = subplan;
return subplan;
}
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode) {
// todo
return MASTER_SCAN;
}
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
STableScanPhyNode* node = (STableScanPhyNode*)initPhyNode(pPlanNode, OP_TableScan, sizeof(STableScanPhyNode));
node->scan.uid = pTable->pMeta->pTableMeta->uid;
node->scan.tableType = pTable->pMeta->pTableMeta->tableType;
node->scanFlag = getScanFlag(pPlanNode);
node->window = pTable->window;
// todo tag cond
}
static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
// todo
}
static void splitSubplanBySTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
subplan->pNode = createTableScanNode(pCxt, pPlanNode, pTable);
// todo reset pCxt->pCurrentSubplan
}
} }
SPhyNode* createPhyNode(SQueryPlanNode* node) { static SPhyNode* createExchangeNode() {
switch (node->info.type) {
case LP_SCAN: }
return createScanNode(node);
static SPhyNode* createScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) {
splitSubplanBySTable(pCxt, pPlanNode, pTable);
return createExchangeNode(pCxt, pTable);
} }
return NULL; return createTableScanNode(pCxt, pPlanNode, pTable);
} }
SPhyNode* createSubplan(SQueryPlanNode* pSubquery) { static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
return NULL; SPhyNode* node = NULL;
switch (pPlanNode->info.type) {
case QNODE_TAGSCAN:
node = createTagScanNode(pPlanNode);
break;
case QNODE_TABLESCAN:
node = createScanNode(pCxt, pPlanNode);
break;
default:
assert(false);
}
if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
node->pChildren = taosArrayInit(4, POINTER_BYTES);
size_t size = taosArrayGetSize(pPlanNode->pChildren);
for(int32_t i = 0; i < size; ++i) {
SPhyNode* child = createPhyNode(pCxt, taosArrayGet(pPlanNode->pChildren, i));
child->pParent = node;
taosArrayPush(node->pChildren, &child);
}
}
return node;
}
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
subplan->pNode = createPhyNode(pCxt, pRoot);
SArray* l0 = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
taosArrayPush(l0, &subplan);
taosArrayPush(pCxt->pDag->pSubplans, &l0);
// todo deal subquery
}
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) {
SPlanContext context = {
.pCatalog = pCatalog,
.pDag = calloc(1, sizeof(SQueryDag)),
.pCurrentSubplan = NULL
};
if (NULL == context.pDag) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
context.pDag->pSubplans = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
createSubplanByLevel(&context, pQueryNode);
*pDag = context.pDag;
return TSDB_CODE_SUCCESS;
} }
int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { int32_t subPlanToString(struct SSubplan *pPhyNode, char** str) {
return 0; return TSDB_CODE_SUCCESS;
} }
...@@ -18,21 +18,6 @@ ...@@ -18,21 +18,6 @@
#include "parser.h" #include "parser.h"
#include "plannerInt.h" #include "plannerInt.h"
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
typedef struct SFillEssInfo { typedef struct SFillEssInfo {
int32_t fillType; // fill type int32_t fillType; // fill type
int64_t *val; // fill value int64_t *val; // fill value
...@@ -104,12 +89,13 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla ...@@ -104,12 +89,13 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
taosArrayPush(pNode->pExpr, &pExpr[i]); taosArrayPush(pNode->pExpr, &pExpr[i]);
} }
pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES); pNode->pChildren = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < numOfPrev; ++i) { for(int32_t i = 0; i < numOfPrev; ++i) {
taosArrayPush(pNode->pPrevNodes, &prev[i]); taosArrayPush(pNode->pChildren, &prev[i]);
} }
switch(type) { switch(type) {
case QNODE_TAGSCAN:
case QNODE_TABLESCAN: { case QNODE_TABLESCAN: {
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
...@@ -177,7 +163,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe ...@@ -177,7 +163,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
SArray* pExprs, SArray* tableCols) { SArray* pExprs, SArray* tableCols) {
if (pQueryInfo->info.onlyTagQuery) { if (pQueryInfo->info.onlyTagQuery) {
int32_t num = (int32_t) taosArrayGetSize(pExprs); int32_t num = (int32_t) taosArrayGetSize(pExprs);
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, NULL); SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info);
if (pQueryInfo->info.distinct) { if (pQueryInfo->info.distinct) {
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
...@@ -386,14 +372,14 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { ...@@ -386,14 +372,14 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree(pQueryNode->info.name); tfree(pQueryNode->info.name);
// dropAllExprInfo(pQueryNode->pExpr); // dropAllExprInfo(pQueryNode->pExpr);
if (pQueryNode->pPrevNodes != NULL) { if (pQueryNode->pChildren != NULL) {
int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pPrevNodes); int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
SQueryPlanNode* p = taosArrayGetP(pQueryNode->pPrevNodes, i); SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i);
doDestroyQueryNode(p); doDestroyQueryNode(p);
} }
taosArrayDestroy(pQueryNode->pPrevNodes); taosArrayDestroy(pQueryNode->pChildren);
} }
tfree(pQueryNode); tfree(pQueryNode);
...@@ -607,8 +593,8 @@ int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t ...@@ -607,8 +593,8 @@ int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t
int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) {
int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen); int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen);
for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pPrevNodes); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) {
SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pPrevNodes, i); SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i);
int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len); int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len);
len = len1; len = len1;
} }
......
...@@ -21,17 +21,6 @@ int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msg ...@@ -21,17 +21,6 @@ int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msg
int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0};
int32_t queryBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
*msgLen = 0;
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == input || NULL == msg || NULL == msgLen) { if (NULL == input || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
...@@ -81,8 +70,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms ...@@ -81,8 +70,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
strncpy(bMsg->db, bInput->db, sizeof(bMsg->db)); strncpy(bMsg->db, bInput->db, sizeof(bMsg->db));
bMsg->db[sizeof(bMsg->db) - 1] = 0; bMsg->db[sizeof(bMsg->db) - 1] = 0;
bMsg->vgroupVersion = bInput->vgroupVersion; bMsg->vgVersion = bInput->vgVersion;
bMsg->dbGroupVersion = bInput->dbGroupVersion;
*msgLen = (int32_t)sizeof(*bMsg); *msgLen = (int32_t)sizeof(*bMsg);
...@@ -90,156 +78,186 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms ...@@ -90,156 +78,186 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
} }
int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) { if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
SVgroupListRspMsg *pRsp = (SVgroupListRspMsg *)msg; SUseDbRsp *pRsp = (SUseDbRsp *)msg;
SUseDbOutput *pOut = (SUseDbOutput *)output;
pRsp->vgroupNum = htonl(pRsp->vgroupNum); int32_t code = 0;
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion);
if (pRsp->vgroupNum < 0) { if (msgSize <= sizeof(*pRsp)) {
qError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum); qError("invalid use db rsp msg size, msgSize:%d", msgSize);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} }
if (pRsp->vgroupVersion < 0) { pRsp->vgVersion = htonl(pRsp->vgVersion);
qError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion); pRsp->vgNum = htonl(pRsp->vgNum);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
if (pRsp->vgNum < 0) {
qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum);
return TSDB_CODE_TSC_INVALID_VALUE;
} }
if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) { int32_t expectSize = pRsp->vgNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp);
qError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum); if (msgSize != expectSize) {
qError("use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d", msgSize, expectSize, pRsp->vgNum);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} }
// keep SVgroupListInfo/SVgroupListRspMsg the same pOut->dbVgroup.vgVersion = pRsp->vgVersion;
*(SVgroupListInfo **)output = (SVgroupListInfo *)msg; pOut->dbVgroup.hashMethod = pRsp->hashMethod;
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pRsp->vgroupNum == 0) { if (NULL == pOut->dbVgroup.vgInfo) {
return TSDB_CODE_SUCCESS; qError("hash init[%d] failed", pRsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { for (int32_t i = 0; i < pRsp->vgNum; ++i) {
pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId);
pRsp->vgroupInfo[i].hashBegin = htonl(pRsp->vgroupInfo[i].hashBegin);
pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
} }
}
return TSDB_CODE_SUCCESS;
}
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
qError("hash push failed");
goto _return;
}
}
memcpy(pOut->db, pRsp->db, sizeof(pOut->db));
return code;
int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { _return:
if (NULL == output || NULL == msg || msgSize <= 0) { if (pOut) {
return TSDB_CODE_TSC_INVALID_INPUT; tfree(pOut->dbVgroup.vgInfo);
} }
SUseDbRspMsg *pRsp = (SUseDbRspMsg *)msg; return code;
SUseDbOutput *pOut = (SUseDbOutput *)output; }
int32_t code = 0;
if (msgSize <= sizeof(*pRsp)) { static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
qError("invalid use db rsp msg size, msgSize:%d", msgSize); pMetaMsg->numOfTags = htonl(pMetaMsg->numOfTags);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = htonl(pMetaMsg->sversion);
pMetaMsg->tversion = htonl(pMetaMsg->tversion);
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
pMetaMsg->suid = htobe64(pMetaMsg->suid);
pMetaMsg->vgId = htonl(pMetaMsg->vgId);
if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags);
return TSDB_CODE_TSC_INVALID_VALUE;
} }
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
pRsp->dbVgroupVersion = htonl(pRsp->dbVgroupVersion); qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns);
return TSDB_CODE_TSC_INVALID_VALUE;
}
pRsp->vgroupNum = htonl(pRsp->vgroupNum); if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && pMetaMsg->tableType != TSDB_NORMAL_TABLE) {
pRsp->dbVgroupNum = htonl(pRsp->dbVgroupNum); qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pRsp->vgroupNum < 0) { if (pMetaMsg->sversion < 0) {
qError("invalid vgroup number[%d]", pRsp->vgroupNum); qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
if (pRsp->dbVgroupNum < 0) { if (pMetaMsg->tversion < 0) {
qError("invalid db vgroup number[%d]", pRsp->dbVgroupNum); qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
int32_t expectSize = pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + pRsp->dbVgroupNum * sizeof(int32_t) + sizeof(*pRsp); SSchema* pSchema = pMetaMsg->pSchema;
if (msgSize != expectSize) {
qError("vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d", msgSize, expectSize, pRsp->vgroupNum, pRsp->dbVgroupNum); int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; for (int i = 0; i < numOfTotalCols; ++i) {
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
pSchema++;
} }
if (pRsp->vgroupVersion < 0) { if (pMetaMsg->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
qInfo("no new vgroup list info"); qError("invalid colId[%d] for the first column in table meta rsp msg", pMetaMsg->pSchema[0].colId);
if (pRsp->vgroupNum != 0) {
qError("invalid vgroup number[%d] for no new vgroup list case", pRsp->vgroupNum);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
} else {
int32_t s = sizeof(*pOut->vgroupList) + sizeof(pOut->vgroupList->vgroupInfo[0]) * pRsp->vgroupNum; return TSDB_CODE_SUCCESS;
pOut->vgroupList = calloc(1, s); }
if (NULL == pOut->vgroupList) {
qError("calloc size[%d] failed", s); int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STableMeta **pMeta) {
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
STableMeta* pTableMeta = calloc(1, metaSize);
if (NULL == pTableMeta) {
qError("calloc size[%d] failed", metaSize);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pOut->vgroupList->vgroupNum = pRsp->vgroupNum; pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
pOut->vgroupList->vgroupVersion = pRsp->vgroupVersion; pTableMeta->uid = msg->suid;
pTableMeta->suid = msg->suid;
pTableMeta->sversion = msg->sversion;
pTableMeta->tversion = msg->tversion;
for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { pTableMeta->tableInfo.numOfTags = msg->numOfTags;
pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); pTableMeta->tableInfo.precision = msg->precision;
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
}
memcpy(&pOut->vgroupList->vgroupInfo[i], &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i])); for(int32_t i = 0; i < msg->numOfColumns; ++i) {
} pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
} }
int32_t *vgIdList = (int32_t *)((char *)pRsp->vgroupInfo + sizeof(pRsp->vgroupInfo[0]) * pRsp->vgroupNum); memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total);
memcpy(pOut->db, pRsp->db, sizeof(pOut->db)); *pMeta = pTableMeta;
if (pRsp->dbVgroupVersion < 0) { return TSDB_CODE_SUCCESS;
qInfo("no new vgroup info for db[%s]", pRsp->db); }
} else {
pOut->dbVgroup = calloc(1, sizeof(*pOut->dbVgroup));
if (NULL == pOut->dbVgroup) { int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
qError("calloc size[%d] failed", (int32_t)sizeof(*pOut->dbVgroup)); STableMetaMsg *pMetaMsg = (STableMetaMsg *)msg;
code = TSDB_CODE_TSC_OUT_OF_MEMORY; int32_t code = queryConvertTableMetaMsg(pMetaMsg);
goto _exit; if (code != TSDB_CODE_SUCCESS) {
return code;
} }
pOut->dbVgroup->vgId = taosArrayInit(pRsp->dbVgroupNum, sizeof(int32_t)); STableMetaOutput *pOut = (STableMetaOutput *)output;
if (NULL == pOut->dbVgroup->vgId) {
qError("taosArrayInit size[%d] failed", pRsp->dbVgroupNum); if (!tIsValidSchema(pMetaMsg->pSchema, pMetaMsg->numOfColumns, pMetaMsg->numOfTags)) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; qError("validate table meta schema in rsp msg failed");
goto _exit; return TSDB_CODE_TSC_INVALID_VALUE;
} }
pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion; if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange); pOut->metaNum = 2;
for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) { memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
*(vgIdList + i) = htonl(*(vgIdList + i)); memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
taosArrayPush(pOut->dbVgroup->vgId, vgIdList + i) ; pOut->ctbMeta.vgId = pMetaMsg->vgId;
} pOut->ctbMeta.tableType = pMetaMsg->tableType;
} pOut->ctbMeta.uid = pMetaMsg->tuid;
pOut->ctbMeta.suid = pMetaMsg->suid;
return code; code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta);
} else {
pOut->metaNum = 1;
_exit: memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
if (pOut->dbVgroup && pOut->dbVgroup->vgId) {
taosArrayDestroy(pOut->dbVgroup->vgId);
pOut->dbVgroup->vgId = NULL;
}
tfree(pOut->dbVgroup); code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta);
tfree(pOut->vgroupList); }
return code; return code;
} }
...@@ -247,11 +265,9 @@ _exit: ...@@ -247,11 +265,9 @@ _exit:
void msgInit() { void msgInit() {
queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg;
queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg;
queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg; queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp;
queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp;
/* /*
......
...@@ -9,5 +9,5 @@ target_include_directories( ...@@ -9,5 +9,5 @@ target_include_directories(
target_link_libraries( target_link_libraries(
scheduler scheduler
PRIVATE os util planner PRIVATE os util planner common
) )
\ No newline at end of file
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "tarray.h" #include "tarray.h"
#include "talgo.h" #include "talgo.h"
void* taosArrayInit(size_t size, size_t elemSize) { SArray* taosArrayInit(size_t size, size_t elemSize) {
assert(elemSize > 0); assert(elemSize > 0);
if (size < TARRAY_MIN_SIZE) { if (size < TARRAY_MIN_SIZE) {
......
...@@ -81,15 +81,11 @@ int32_t writeInterval = DEFAULT_LOG_INTERVAL; ...@@ -81,15 +81,11 @@ int32_t writeInterval = DEFAULT_LOG_INTERVAL;
// log // log
int32_t tsNumOfLogLines = 10000000; int32_t tsNumOfLogLines = 10000000;
int32_t mDebugFlag = 131; int32_t mDebugFlag = 131;
int32_t sdbDebugFlag = 131;
int32_t dDebugFlag = 135; int32_t dDebugFlag = 135;
int32_t vDebugFlag = 135; int32_t vDebugFlag = 135;
int32_t cDebugFlag = 131; int32_t cDebugFlag = 131;
int32_t jniDebugFlag = 131; int32_t jniDebugFlag = 131;
int32_t odbcDebugFlag = 131; int32_t odbcDebugFlag = 131;
int32_t httpDebugFlag = 131;
int32_t mqttDebugFlag = 131;
int32_t monDebugFlag = 131;
int32_t qDebugFlag = 131; int32_t qDebugFlag = 131;
int32_t rpcDebugFlag = 131; int32_t rpcDebugFlag = 131;
int32_t uDebugFlag = 131; int32_t uDebugFlag = 131;
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(balance ${SRC})
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_BALANCE_INT_H
#define TDENGINE_BALANCE_INT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "mnodeInt.h"
#include "mnodeDef.h"
#include "mnodeDnode.h"
typedef struct {
int32_t size;
int32_t maxSize;
SDnodeObj **list;
} SBnDnodes;
typedef struct {
void * timer;
bool stop;
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t thread;
} SBnThread;
typedef struct {
pthread_mutex_t mutex;
} SBnMgmt;
int32_t bnInit();
void bnCleanUp();
bool bnStart();
void bnCheckStatus();
void bnCheckModules();
extern SBnDnodes tsBnDnodes;
extern void *tsMnodeTmr;
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_BALANCE_SCORE_H
#define TDENGINE_BALANCE_SCORE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "bnInt.h"
void bnInitDnodes();
void bnCleanupDnodes();
void bnAccquireDnodes();
void bnReleaseDnodes();
float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_BALANCE_THREAD_H
#define TDENGINE_BALANCE_THREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "bnInt.h"
int32_t bnInitThread();
void bnCleanupThread();
void bnNotify();
void bnStartTimer(int32_t mseconds);
#ifdef __cplusplus
}
#endif
#endif
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "mnodeShow.h"
#include "mnodeUser.h"
#include "bnScore.h"
SBnDnodes tsBnDnodes;
static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t bnCalcCpuScore(SDnodeObj *pDnode) {
if (pDnode->cpuAvgUsage < 80)
return 0;
else if (pDnode->cpuAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t bnCalcMemoryScore(SDnodeObj *pDnode) {
if (pDnode->memoryAvgUsage < 80)
return 0;
else if (pDnode->memoryAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t bnCalcDiskScore(SDnodeObj *pDnode) {
if (pDnode->diskAvgUsage < 80)
return 0;
else if (pDnode->diskAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t bnCalcBandScore(SDnodeObj *pDnode) {
if (pDnode->bandwidthUsage < 30)
return 0;
else if (pDnode->bandwidthUsage < 80)
return 10;
else
return 50;
}
static float bnCalcModuleScore(SDnodeObj *pDnode) {
if (pDnode->numOfCores <= 0) return 0;
if (pDnode->isMgmt) {
return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores;
}
return 0;
}
static float bnCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) {
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000;
if (pDnode->numOfCores <= 0) return 0;
return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores;
}
/**
* calc singe score, such as cpu/memory/disk/bandwitdh/vnode
* 1. get the score config
* 2. if the value is out of range, use border data
* 3. otherwise use interpolation method
**/
static void bnCalcDnodeScore(SDnodeObj *pDnode) {
pDnode->score = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) +
bnCalcBandScore(pDnode) + bnCalcModuleScore(pDnode) + bnCalcVnodeScore(pDnode, 0) +
pDnode->customScore;
}
float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) {
int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) +
bnCalcBandScore(pDnode);
float moduleScore = bnCalcModuleScore(pDnode);
float vnodeScore = bnCalcVnodeScore(pDnode, extra);
float score = systemScore + moduleScore + vnodeScore + pDnode->customScore;
return score;
}
void bnInitDnodes() {
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, bnGetScoresMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, bnRetrieveScores);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode);
memset(&tsBnDnodes, 0, sizeof(SBnDnodes));
tsBnDnodes.maxSize = 16;
tsBnDnodes.list = calloc(tsBnDnodes.maxSize, sizeof(SDnodeObj *));
}
void bnCleanupDnodes() {
if (tsBnDnodes.list != NULL) {
free(tsBnDnodes.list);
tsBnDnodes.list = NULL;
}
}
static void bnCheckDnodesSize(int32_t dnodesNum) {
if (tsBnDnodes.maxSize <= dnodesNum) {
int32_t maxSize = dnodesNum * 2;
SDnodeObj** list1 = NULL;
int32_t retry = 0;
while(list1 == NULL && retry++ < 3) {
list1 = realloc(tsBnDnodes.list, maxSize * sizeof(SDnodeObj *));
}
if(list1) {
tsBnDnodes.list = list1;
tsBnDnodes.maxSize = maxSize;
}
}
}
void bnAccquireDnodes() {
int32_t dnodesNum = mnodeGetDnodesNum();
bnCheckDnodesSize(dnodesNum);
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
int32_t dnodeIndex = 0;
while (1) {
if (dnodeIndex >= dnodesNum) {
mnodeCancelGetNextDnode(pIter);
break;
}
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mnodeDecDnodeRef(pDnode);
continue;
}
bnCalcDnodeScore(pDnode);
int32_t orderIndex = dnodeIndex;
for (; orderIndex > 0; --orderIndex) {
if (pDnode->score > tsBnDnodes.list[orderIndex - 1]->score) {
break;
}
tsBnDnodes.list[orderIndex] = tsBnDnodes.list[orderIndex - 1];
}
tsBnDnodes.list[orderIndex] = pDnode;
dnodeIndex++;
}
tsBnDnodes.size = dnodeIndex;
}
void bnReleaseDnodes() {
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
SDnodeObj *pDnode = tsBnDnodes.list[i];
if (pDnode != NULL) {
mnodeDecDnodeRef(pDnode);
}
}
}
static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mnodeGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->pAcct->user, "root") != 0) {
mnodeDecUserRef(pUser);
return TSDB_CODE_MND_NO_RIGHTS;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "system scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "custom scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "module scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "vnode scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "total scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "open vnodes");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cpu cores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "balance state");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mnodeGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pIter = NULL;
mnodeDecUserRef(pUser);
return 0;
}
static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int32_t cols = 0;
while (numOfRows < rows) {
pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode);
if (pDnode == NULL) break;
int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + bnCalcBandScore(pDnode);
float moduleScore = bnCalcModuleScore(pDnode);
float vnodeScore = bnCalcVnodeScore(pDnode, 0);
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->dnodeId;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (float)systemScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (float)pDnode->customScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (float)moduleScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (float)vnodeScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (float)(vnodeScore + moduleScore + pDnode->customScore + systemScore);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDnode->openVnodes;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDnode->numOfCores;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, dnodeStatus[pDnode->status]);
cols++;
numOfRows++;
mnodeDecDnodeRef(pDnode);
}
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows;
return numOfRows;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "ttimer.h"
#include "tglobal.h"
#include "mnodeSdb.h"
#include "bnThread.h"
static SBnThread tsBnThread;
static void *bnThreadFunc(void *arg) {
setThreadName("balance");
while (1) {
pthread_mutex_lock(&tsBnThread.mutex);
if (tsBnThread.stop) {
pthread_mutex_unlock(&tsBnThread.mutex);
break;
}
pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex);
mDebug("balance thread wakes up to work");
bool updateSoon = bnStart();
mDebug("balance thread finished this poll, updateSoon:%d", updateSoon);
bnStartTimer(updateSoon ? 1000 : -1);
pthread_mutex_unlock(&(tsBnThread.mutex));
}
mDebug("balance thread is stopped");
return NULL;
}
int32_t bnInitThread() {
memset(&tsBnThread, 0, sizeof(SBnThread));
tsBnThread.stop = false;
pthread_mutex_init(&tsBnThread.mutex, NULL);
pthread_cond_init(&tsBnThread.cond, NULL);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(&tsBnThread.thread, &thattr, bnThreadFunc, NULL);
pthread_attr_destroy(&thattr);
if (ret != 0) {
mError("failed to create balance thread since %s", strerror(ret));
return -1;
}
bnStartTimer(2000);
mDebug("balance thread is created");
return 0;
}
void bnCleanupThread() {
mDebug("balance thread will be cleanup");
if (tsBnThread.timer != NULL) {
taosTmrStopA(&tsBnThread.timer);
tsBnThread.timer = NULL;
mDebug("stop balance timer");
}
pthread_mutex_lock(&tsBnThread.mutex);
tsBnThread.stop = true;
pthread_cond_signal(&tsBnThread.cond);
pthread_mutex_unlock(&(tsBnThread.mutex));
pthread_join(tsBnThread.thread, NULL);
pthread_cond_destroy(&tsBnThread.cond);
pthread_mutex_destroy(&tsBnThread.mutex);
}
static void bnPostSignal() {
if (tsBnThread.stop) return;
pthread_mutex_lock(&tsBnThread.mutex);
pthread_cond_signal(&tsBnThread.cond);
pthread_mutex_unlock(&(tsBnThread.mutex));
}
/*
* once sdb work as mater, then tsAccessSquence reset to zero
* increase tsAccessSquence every balance interval
*/
static void bnProcessTimer(void *handle, void *tmrId) {
if (!sdbIsMaster()) return;
if (tsBnThread.stop) return;
tsBnThread.timer = NULL;
bnStartTimer(-1);
bnCheckStatus();
if (handle == NULL) {
++tsAccessSquence;
if (tsAccessSquence % tsBalanceInterval == 0) {
mDebug("balance function is scheduled by timer");
bnPostSignal();
}
} else {
int64_t mseconds = (int64_t)handle;
mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds);
bnPostSignal();
}
}
void bnStartTimer(int32_t mseconds) {
if (tsBnThread.stop) return;
if (mseconds != -1) {
mTrace("balance function will be called after %d ms", mseconds);
taosTmrReset(bnProcessTimer, mseconds, (void *)(int64_t)mseconds, tsMnodeTmr, &tsBnThread.timer);
} else {
taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer);
}
}
void bnNotify() {
bnStartTimer(500);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MONITOR_H
#define TDENGINE_MONITOR_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
typedef struct {
char * acctId;
int64_t currentPointsPerSecond;
int64_t maxPointsPerSecond;
int64_t totalTimeSeries;
int64_t maxTimeSeries;
int64_t totalStorage;
int64_t maxStorage;
int64_t totalQueryTime;
int64_t maxQueryTime;
int64_t totalInbound;
int64_t maxInbound;
int64_t totalOutbound;
int64_t maxOutbound;
int64_t totalDbs;
int64_t maxDbs;
int64_t totalUsers;
int64_t maxUsers;
int64_t totalStreams;
int64_t maxStreams;
int64_t totalConns;
int64_t maxConns;
int8_t accessState;
} SAcctMonitorObj;
int32_t monInitSystem();
int32_t monStartSystem();
void monStopSystem();
void monCleanupSystem();
void monSaveAcctLog(SAcctMonitorObj *pMonObj);
void monSaveLog(int32_t level, const char *const format, ...);
void monExecuteSQL(char *sql);
typedef void (*MonExecuteSQLCbFP)(void *param, TAOS_RES *, int code);
void monExecuteSQLWithResultCallback(char *sql, MonExecuteSQLCbFP callback, void* param);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_ACCT_H
#define TDENGINE_ACCT_H
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
ACCT_GRANT_USER,
ACCT_GRANT_DB,
ACCT_GRANT_TABLE
} EAcctGrantType;
int32_t acctInit();
void acctCleanUp();
int32_t acctCheck(void *pAcct, EAcctGrantType type);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_ADMIN_H
#define TDENGINE_ADMIN_H
#ifdef __cplusplus
extern "C" {
#endif
struct HttpServer;
void adminInitHandle(struct HttpServer* pServer);
void opInitHandle(struct HttpServer* pServer);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_BALANCE_H
#define TDENGINE_BALANCE_H
#ifdef __cplusplus
extern "C" {
#endif
struct SVgObj;
struct SDnodeObj;
int32_t bnInit();
void bnCleanUp();
void bnNotify();
void bnCheckModules();
void bnReset();
int32_t bnAllocVnodes(struct SVgObj *pVgroup);
int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId);
int32_t bnDropDnode(struct SDnodeObj *pDnode);
int32_t bnDnodeCanCreateMnode(struct SDnodeObj *pDnode);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_GTANT_H
#define TDENGINE_GTANT_H
#ifdef __cplusplus
"C" {
#endif
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t grantInit();
void grantCleanUp();
void grantParseParameter();
int32_t grantCheck(EGrantType grant);
void grantReset(EGrantType grant, uint64_t value);
void grantAdd(EGrantType grant, uint64_t value);
void grantRestore(EGrantType grant, uint64_t value);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_H
#define TDENGINE_MQTT_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TP
#define TDENGINE_TP
#ifdef __cplusplus
extern "C" {
#endif
int32_t tpInit();
void tpCleanUp();
void tpUpdateTs(int32_t vgId, int64_t *seq, void *pMsg);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNC_H
#define TDENGINE_SYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#define TAOS_SYNC_MAX_REPLICA 5
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef enum {
TAOS_SYNC_ROLE_OFFLINE = 0,
TAOS_SYNC_ROLE_UNSYNCED = 1,
TAOS_SYNC_ROLE_SYNCING = 2,
TAOS_SYNC_ROLE_SLAVE = 3,
TAOS_SYNC_ROLE_MASTER = 4
} ESyncRole;
typedef enum {
TAOS_SYNC_STATUS_INIT = 0,
TAOS_SYNC_STATUS_START = 1,
TAOS_SYNC_STATUS_FILE = 2,
TAOS_SYNC_STATUS_CACHE = 3
} ESyncStatus;
typedef struct {
uint32_t nodeId; // node ID assigned by TDengine
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo;
typedef struct {
int8_t quorum; // number of confirms required, >=1
int8_t replica; // number of replications, >=1
SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA];
} SSyncCfg;
typedef struct {
int32_t selfIndex;
uint32_t nodeId[TAOS_SYNC_MAX_REPLICA];
int32_t role[TAOS_SYNC_MAX_REPLICA];
} SNodesRole;
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int32_t (*FWriteToCache)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
// when role is changed, call this to notify app
typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
// if a number of retrieving data failed, call this to start flow control
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
// when data file is synced successfully, notity app
typedef void (*FStartSyncFile)(int32_t vgId);
typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
// get file version
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd);
typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd);
typedef struct {
int32_t vgId; // vgroup ID
uint64_t version; // initial version
SSyncCfg syncCfg; // configuration from mgmt
char path[TSDB_FILENAME_LEN]; // path to the file
void * pTsdb;
FGetWalInfo getWalInfoFp;
FWriteToCache writeToCacheFp;
FConfirmForward confirmForward;
FNotifyRole notifyRoleFp;
FNotifyFlowCtrl notifyFlowCtrlFp;
FStartSyncFile startSyncFileFp;
FStopSyncFile stopSyncFileFp;
FGetVersion getVersionFp;
FSendFile sendFileFp;
FRecvFile recvFileFp;
} SSyncInfo;
typedef void *tsync_h;
int32_t syncInit();
void syncCleanUp();
int64_t syncStart(const SSyncInfo *);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force);
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force);
void syncRecover(int64_t rid); // recover from other nodes:
int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[];
//global configurable parameters
extern int32_t sDebugFlag;
extern char tsArbitrator[];
extern uint16_t tsSyncPort;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SYNC_H
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(./src SRC)
ADD_LIBRARY(monitor ${SRC})
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(monitor taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(monitor taos)
ENDIF ()
此差异已折叠。
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
ADD_DEFINITIONS(-DWAL_CHECKSUM_WHOLE)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
ADD_LIBRARY(twal ${SRC})
TARGET_LINK_LIBRARIES(twal tutil common)
ADD_SUBDIRECTORY(test)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(../inc)
LIST(APPEND WALTEST_SRC ./waltest.c)
ADD_EXECUTABLE(waltest ${WALTEST_SRC})
TARGET_LINK_LIBRARIES(waltest twal os tutil)
ENDIF ()
IF (TD_DARWIN)
INCLUDE_DIRECTORIES(../inc)
LIST(APPEND WALTEST_SRC ./waltest.c)
ADD_EXECUTABLE(waltest ${WALTEST_SRC})
TARGET_LINK_LIBRARIES(waltest twal os tutil)
ENDIF ()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册