提交 d9762010 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

......@@ -101,4 +101,4 @@ TAGS
contrib/*
!contrib/CMakeLists.txt
!contrib/test
!contrib/test
\ No newline at end of file
......@@ -10,7 +10,7 @@ set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake")
set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib")
include(${CMAKE_SUPPORT_DIR}/cmake.options)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma -g3")
# contrib
add_subdirectory(contrib)
......@@ -20,9 +20,13 @@ add_library(api INTERFACE)
target_include_directories(api INTERFACE "include/client")
# src
if(${BUILD_TEST})
include(CTest)
enable_testing()
endif(${BUILD_TEST})
add_subdirectory(source)
# docs
add_subdirectory(docs)
# tests (TODO)
# tests (TODO)
\ No newline at end of file
......@@ -73,10 +73,12 @@ typedef struct taosField {
#define DLL_EXPORT
#endif
DLL_EXPORT int taos_init();
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
......@@ -154,14 +156,14 @@ DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res);
// TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild);
// TODO: the return value should be `const`
DLL_EXPORT char *taos_get_server_info(TAOS *taos);
DLL_EXPORT char *taos_get_client_info();
DLL_EXPORT char *taos_errstr(TAOS_RES *tres);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT const char *taos_errstr(TAOS_RES *tres);
DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);
......
......@@ -77,7 +77,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB, "create-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB, "alter-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB, "drop-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STB_VGROUP, "stb-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_VGROUP_LIST, "vgroup-list" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
......@@ -214,6 +214,11 @@ typedef enum _mgmt_table {
extern char *taosMsg[];
typedef struct SBuildTableMetaInput {
int32_t vgId;
char *tableFullName;
} SBuildTableMetaInput;
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
......@@ -358,6 +363,7 @@ typedef struct {
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
int64_t startTime;
} SConnectMsg;
typedef struct SEpSet {
......@@ -368,19 +374,17 @@ typedef struct SEpSet {
} SEpSet;
typedef struct {
int32_t acctId;
int32_t clusterId;
int32_t connId;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int8_t reserved[5];
SEpSet epSet;
int32_t acctId;
uint32_t clusterId;
int32_t connId;
int8_t superUser;
int8_t reserved[5];
SEpSet epSet;
} SConnectRsp;
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
int32_t maxUsers;
int32_t maxDbs;
int32_t maxTimeSeries;
......@@ -395,7 +399,7 @@ typedef struct {
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
} SCreateUserMsg, SAlterUserMsg;
typedef struct {
......@@ -774,9 +778,8 @@ typedef struct {
} SStbInfoMsg;
typedef struct {
SMsgHead msgHead;
char tableFname[TSDB_TABLE_FNAME_LEN];
int8_t createFlag;
char tags[];
} STableInfoMsg;
typedef struct {
......@@ -791,6 +794,20 @@ typedef struct SSTableVgroupMsg {
int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct SVgroupInfo {
int32_t vgId;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
typedef struct SVgroupListRspMsg {
int32_t vgroupNum;
int32_t vgroupVersion;
SVgroupInfo vgroupInfo[];
} SVgroupListRspMsg;
typedef SVgroupListRspMsg SVgroupListInfo;
typedef struct {
int32_t vgId;
int8_t numOfEps;
......@@ -961,8 +978,8 @@ typedef struct {
char user[TSDB_USER_LEN];
char spi;
char encrypt;
char secret[TSDB_KEY_LEN];
char ckey[TSDB_KEY_LEN];
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN];
} SAuthMsg, SAuthRsp;
typedef struct {
......
#ifndef TDENGINE_TEP_H
#define TDENGINE_TEP_H
#include "os.h"
#include "taosmsg.h"
typedef struct SCorEpSet {
int32_t version;
SEpSet epSet;
} SCorEpSet;
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
#endif // TDENGINE_TEP_H
......@@ -81,8 +81,6 @@ extern int64_t tsMaxRetentWindow;
// db parameters in client
extern int32_t tsCacheBlockSize;
extern int32_t tsBlocksPerVnode;
extern int32_t tsMinTablePerVnode;
extern int32_t tsMaxTablePerVnode;
extern int32_t tsTableIncStepPerVnode;
extern int32_t tsMaxVgroupsPerDb;
extern int16_t tsDaysPerFile;
......@@ -113,16 +111,8 @@ extern int8_t tsEnableSlaveQuery;
extern int8_t tsEnableAdjustMaster;
// restful
extern int8_t tsEnableHttpModule;
extern int32_t tsRestRowLimit;
extern uint16_t tsHttpPort;
extern int32_t tsHttpCacheSessions;
extern int32_t tsHttpSessionExpire;
extern int32_t tsHttpMaxThreads;
extern int8_t tsHttpEnableCompress;
extern int8_t tsHttpEnableRecordSql;
extern int8_t tsTelegrafUseFieldNum;
extern int8_t tsHttpDbNameMandatory;
// mqtt
extern int8_t tsEnableMqttModule;
......@@ -144,7 +134,6 @@ extern int8_t tsEnableStream;
// internal
extern int8_t tsPrintAuth;
extern int8_t tscEmbedded;
extern char tsVnodeDir[];
extern char tsMnodeDir[];
extern int64_t tsTickPerDay[3];
......@@ -193,7 +182,7 @@ extern SDiskCfg tsDiskCfg[];
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
void taosInitGlobalCfg();
int32_t taosCheckGlobalCfg();
int32_t taosCheckAndPrintCfg();
int32_t taosCfgDynamicOptions(char *msg);
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
void taosAddDataDir(int index, char *v1, int level, int primary);
......
......@@ -256,7 +256,7 @@ typedef struct STQ {
// the collection of group handle
// the handle of kvstore
char* path;
STqCfg* tqConfig;
STqCfg* tqConfig;
TqLogReader* tqLogReader;
TqMemRef tqMemRef;
TqMetaStore* tqMeta;
......
......@@ -36,6 +36,8 @@ typedef struct SVnodeCfg {
struct {
/** write buffer size */
uint64_t wsize;
uint64_t ssize;
uint64_t lsize;
/** use heap allocator or arena allocator */
bool isHeapAllocator;
};
......@@ -66,9 +68,11 @@ typedef struct SVnodeCfg {
/**
* @brief Initialize the vnode module
*
* @param nthreads number of commit threads. 0 for no threads and
* a schedule queue should be given (TODO)
* @return int 0 for success and -1 for failure
*/
int vnodeInit();
int vnodeInit(uint16_t nthreads);
/**
* @brief clear a vnode
......
......@@ -30,16 +30,23 @@ extern "C" {
struct SCatalog;
typedef struct SMetaReq {
char clusterId[TSDB_CLUSTER_ID_LEN];
typedef struct SDBVgroupInfo {
int32_t vgroupVersion;
SArray *vgId;
int32_t hashRange;
int32_t hashNum;
} SDBVgroupInfo;
typedef struct SCatalogReq {
char clusterId[TSDB_CLUSTER_ID_LEN]; //????
SArray *pTableName; // table full name
SArray *pUdf; // udf name
bool qNodeEpset; // valid qnode
} SMetaReq;
bool qNodeRequired; // valid qnode
} SCatalogReq;
typedef struct SMetaData {
SArray *pTableMeta; // tableMeta
SArray *pVgroupInfo; // vgroupInfo list
SArray *pTableMeta; // STableMeta array
SArray *pVgroupInfo; // SVgroupInfo list
SArray *pUdfList; // udf info list
SEpSet *pEpSet; // qnode epset list
} SMetaData;
......@@ -78,32 +85,71 @@ typedef struct STableMeta {
SSchema schema[];
} STableMeta;
typedef struct SCatalogCfg {
} SCatalogCfg;
int32_t catalogInit(SCatalogCfg *cfg);
/**
* Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side.
* There is ONLY one SCatalog object for one process space, and this function returns a singleton.
* @param pMgmtEps
* @param clusterId
* @return
*/
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version);
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList);
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta);
/**
* get table's vgroup list.
* @param clusterId
* @pVgroupList - array of SVgroupInfo
* @return
*/
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps);
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList);
/**
* Get the required meta data from mnode.
* Note that this is a synchronized API and is also thread-safety.
* @param pCatalog
* @param pMgmtEps
* @param pMetaReq
* @param pMetaData
* @return
*/
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData);
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
/**
* Destroy catalog service handle
* Destroy catalog and relase all resources
* @param pCatalog
*/
void destroyCatalog(struct SCatalog* pCatalog);
void catalogDestroy(void);
#ifdef __cplusplus
}
#endif
#endif /*_TD_CATALOG_H_*/
\ No newline at end of file
#endif /*_TD_CATALOG_H_*/
......@@ -162,7 +162,7 @@ typedef struct SVgDataBlocks {
int64_t vgId; // virtual group id
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData;
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
......
......@@ -13,11 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_
#ifndef _TD_QUERY_H_
#define _TD_QUERY_H_
typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
#ifdef __cplusplus
extern "C" {
#endif
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
#ifdef __cplusplus
}
#endif
#endif /*_TD_QUERY_H_*/
......@@ -29,11 +29,6 @@ extern "C" {
extern int tsRpcHeadSize;
typedef struct SRpcCorEpSet {
int32_t version;
SEpSet epSet;
} SRpcCorEpSet;
typedef struct SRpcConnInfo {
uint32_t clientIp;
uint16_t clientPort;
......
......@@ -32,6 +32,23 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
#pragma pack(push,1)
typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
......@@ -43,8 +60,9 @@ typedef struct SWalReadHead {
uint8_t msgType;
int8_t reserved[2];
int32_t len;
//int64_t ingestTs; //not implemented
int64_t version;
char cont[];
char body[];
} SWalReadHead;
typedef struct {
......@@ -52,8 +70,9 @@ typedef struct {
int32_t fsyncPeriod; // millisecond
int32_t retentionPeriod; // secs
int32_t rollPeriod; // secs
int64_t retentionSize;
int64_t segSize;
EWalType walLevel; // wal level
EWalType level; // wal level
} SWalCfg;
typedef struct {
......@@ -70,64 +89,48 @@ typedef struct {
SWalReadHead head;
} SWalHead;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
typedef struct SWalVer {
int64_t firstVer;
int64_t verInSnapshotting;
int64_t snapshotVer;
int64_t commitVer;
int64_t lastVer;
} SWalVer;
typedef struct SWal {
// cfg
int32_t vgId;
int32_t fsyncPeriod; // millisecond
int32_t rollPeriod; // second
int64_t segSize;
int64_t retentionSize;
int32_t retentionPeriod;
EWalType level;
//total size
int64_t totSize;
//fsync seq
int32_t fsyncSeq;
//reference
int64_t refId;
//write tfd
int64_t writeLogTfd;
int64_t writeIdxTfd;
//wal lifecycle
int64_t firstVersion;
int64_t snapshotVersion;
int64_t commitVersion;
int64_t lastVersion;
//snapshotting version
int64_t snapshottingVer;
//roll status
int64_t lastRollSeq;
SWalCfg cfg;
SWalVer vers;
//file set
int32_t writeCur;
int64_t writeLogTfd;
int64_t writeIdxTfd;
SArray* fileInfoSet;
//ctl
int32_t curStatus;
int32_t fsyncSeq;
int64_t totSize;
int64_t refId;
int64_t lastRollSeq;
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
//reusable write head
SWalHead head;
SWalHead writeHead;
} SWal; // WAL HANDLE
typedef struct SWalReadHandle {
SWal* pWal;
int64_t readLogTfd;
int64_t readIdxTfd;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int64_t status; //if cursor valid
SWalHead* pHead;
} SWalReadHandle;
#pragma pack(pop)
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization
......@@ -153,6 +156,10 @@ int32_t walEndTakeSnapshot(SWal *);
//int32_t walDataCorrupted(SWal*);
// read
SWalReadHandle* walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
......
......@@ -57,7 +57,7 @@ int64_t taosGetPthreadId(pthread_t thread);
void taosResetPthread(pthread_t* thread);
bool taosComparePthread(pthread_t first, pthread_t second);
int32_t taosGetPId();
int32_t taosGetCurrentAPPName(char* name, int32_t* len);
int32_t taosGetAppName(char* name, int32_t* len);
#ifdef __cplusplus
}
......
......@@ -117,6 +117,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x0221) //"Invalid JSON format")
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) //"Invalid JSON data type")
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range")
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300)
......@@ -501,6 +502,13 @@ int32_t* taosGetErrno();
// monitor
#define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection")
// catalog
#define TSDB_CODE_CTG_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error
#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters
#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error
#ifdef __cplusplus
}
#endif
......
......@@ -43,6 +43,13 @@ typedef struct SArray {
*/
void* taosArrayInit(size_t size, size_t elemSize);
/**
*
* @param tsize
* @return
*/
int32_t taosArrayEnsureCap(SArray* pArray, size_t tsize);
/**
*
* @param pArray
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 123
#define TSDB_CFG_MAX_NUM 115
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
......@@ -83,11 +83,11 @@ extern int32_t tsGlobalConfigNum;
extern char * tsCfgStatusStr[];
void taosReadGlobalLogCfg();
int32_t taosReadGlobalCfg();
void taosPrintGlobalCfg();
int32_t taosReadCfgFromFile();
void taosPrintCfg();
void taosDumpGlobalCfg();
void taosInitConfigOption(SGlobalCfg cfg);
void taosAddConfigOption(SGlobalCfg cfg);
SGlobalCfg *taosGetConfigOption(const char *option);
#ifdef __cplusplus
......
......@@ -161,7 +161,7 @@ do { \
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33
#define TSDB_DB_NAME_LEN 65
#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN)
#define TSDB_FUNC_NAME_LEN 65
......@@ -193,7 +193,7 @@ do { \
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_AUTH_LEN 16
#define TSDB_KEY_LEN 16
#define TSDB_PASSWORD_LEN 32
#define TSDB_VERSION_LEN 12
#define TSDB_LABEL_LEN 8
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_TDLIST_H_
#define _TD_UTIL_TDLIST_H_
#ifdef __cplusplus
extern "C" {
#endif
// Single linked list
#define TD_SLIST_NODE(TYPE) \
struct { \
struct TYPE *sl_next_; \
}
#define TD_SLIST(TYPE) \
struct { \
struct TYPE *sl_head_; \
int sl_neles_; \
}
#define TD_SLIST_HEAD(sl) ((sl)->sl_head_)
#define TD_SLIST_NELES(sl) ((sl)->sl_neles_)
#define TD_SLIST_NODE_NEXT(sln) ((sln)->sl_next_)
#define tSListInit(sl) \
do { \
(sl)->sl_head_ = NULL; \
(sl)->sl_neles_ = 0; \
} while (0)
#define tSListPush(sl, sln) \
do { \
TD_SLIST_NODE_NEXT(sln) = TD_SLIST_HEAD(sl); \
TD_SLIST_HEAD(sl) = (sln); \
TD_SLIST_NELES(sl) += 1; \
} while (0)
#define tSListPop(sl) \
do { \
TD_SLIST_HEAD(sl) = TD_SLIST_NODE_NEXT(TD_SLIST_HEAD(sl)); \
TD_SLIST_NELES(sl) -= 1; \
} while (0)
// Double linked list
#define TD_DLIST_NODE(TYPE) \
struct { \
struct TYPE *dl_prev_; \
struct TYPE *dl_next_; \
}
#define TD_DLIST(TYPE) \
struct { \
struct TYPE *dl_head_; \
struct TYPE *dl_tail_; \
int dl_neles_; \
}
#define TD_DLIST_NODE_PREV(dln) ((dln)->dl_prev_)
#define TD_DLIST_NODE_NEXT(dln) ((dln)->dl_next_)
#define TD_DLIST_HEAD(dl) ((dl)->dl_head_)
#define TD_DLIST_TAIL(dl) ((dl)->dl_tail_)
#define TD_DLIST_NELES(dl) ((dl)->dl_neles_)
#define tDListInit(dl) \
do { \
TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = NULL; \
TD_DLIST_NELES(dl) = 0; \
} while (0)
#define tDListAppend(dl, dln) \
do { \
if (TD_DLIST_HEAD(dl) == NULL) { \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \
TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = (dln); \
} else { \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_TAIL(dl); \
TD_DLIST_NODE_NEXT(dln) = NULL; \
TD_DLIST_NODE_NEXT(TD_DLIST_TAIL(dl)) = (dln); \
TD_DLIST_TAIL(dl) = (dln); \
} \
TD_DLIST_NELES(dl) += 1; \
} while (0)
#define tDListPrepend(dl, dln) \
do { \
if (TD_DLIST_HEAD(dl) == NULL) { \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \
TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = (dln); \
} else { \
TD_DLIST_NODE_PREV(dln) = NULL; \
TD_DLIST_NODE_NEXT(dln) = TD_DLIST_HEAD(dl); \
TD_DLIST_NODE_PREV(TD_DLIST_HEAD(dl)) = (dln); \
TD_DLIST_HEAD(dl) = (dln); \
} \
TD_DLIST_NELES(dl) += 1; \
} while (0)
#define tDListPop(dl, dln) \
do { \
if (TD_DLIST_HEAD(dl) == (dln)) { \
TD_DLIST_HEAD(dl) = TD_DLIST_NODE_NEXT(dln); \
} \
if (TD_DLIST_TAIL(dl) == (dln)) { \
TD_DLIST_TAIL(dl) = TD_DLIST_NODE_PREV(dln); \
} \
if (TD_DLIST_NODE_PREV(dln) != NULL) { \
TD_DLIST_NODE_NEXT(TD_DLIST_NODE_PREV(dln)) = TD_DLIST_NODE_NEXT(dln); \
} \
if (TD_DLIST_NODE_NEXT(dln) != NULL) { \
TD_DLIST_NODE_PREV(TD_DLIST_NODE_NEXT(dln)) = TD_DLIST_NODE_PREV(dln); \
} \
TD_DLIST_NELES(dl) -= 1; \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \
} while (0)
#if 0
// List iterator
#define TD_LIST_FITER 0
#define TD_LIST_BITER 1
#define TD_LIST_ITER(S) \
struct { \
int it_dir_; \
S * it_next_; \
S * it_ptr_; \
TD_DLIST(S) * it_list_; \
}
#define tlistIterInit(it, l, dir) \
(it)->it_dir_ = (dir); \
(it)->it_list_ = l; \
if ((dir) == TD_LIST_FITER) { \
(it)->it_next_ = (l)->dl_head_; \
} else { \
(it)->it_next_ = (l)->dl_tail_; \
}
#define tlistIterNext(it) \
({ \
(it)->it_ptr_ = (it)->it_next_; \
if ((it)->it_next_ != NULL) { \
if ((it)->it_dir_ == TD_LIST_FITER) { \
(it)->it_next_ = (it)->it_next_->next_; \
} else { \
(it)->it_next_ = (it)->it_next_->prev_; \
} \
} \
(it)->it_ptr_; \
})
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_TDLIST_H_*/
\ No newline at end of file
......@@ -45,6 +45,8 @@ extern int32_t sDebugFlag;
extern int32_t tsdbDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
extern int32_t ctgDebugFlag;
#define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MACRO_H_
#define _TD_UTIL_MACRO_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
// Module init/clear MACRO definitions
#define TD_MOD_UNINITIALIZED 0
#define TD_MOD_INITIALIZED 1
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
typedef int8_t td_mode_flag_t;
#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_MACRO_H_*/
\ No newline at end of file
......@@ -45,14 +45,25 @@ char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, inBuf, (unsigned int)inLen);
tMD5Final(&context);
memcpy(target, context.digest, TSDB_KEY_LEN);
memcpy(target, context.digest, tListLen(context.digest));
}
static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *target) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, inBuf, (unsigned int)len);
tMD5Final(&context);
sprintf(target, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], context.digest[2],
context.digest[3], context.digest[4], context.digest[5], context.digest[6], context.digest[7],
context.digest[8], context.digest[9], context.digest[10], context.digest[11], context.digest[12],
context.digest[13], context.digest[14], context.digest[15]);
}
#ifdef __cplusplus
......
......@@ -2,9 +2,14 @@ aux_source_directory(src CLIENT_SRC)
add_library(taos ${CLIENT_SRC})
target_include_directories(
taos
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PUBLIC "${CMAKE_SOURCE_DIR}/include/client"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
taos
PRIVATE common
INTERFACE api
PRIVATE os util common transport parser
)
ADD_SUBDIRECTORY(test)
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_CLIENTINT_H
#define TDENGINE_CLIENTINT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taos.h"
#include "taosmsg.h"
#include "thash.h"
#include "tlist.h"
#include "trpc.h"
#include "tdef.h"
#include "tmsgtype.h"
#include "tep.h"
typedef struct SQueryExecMetric {
int64_t start; // start timestamp
int64_t parsed; // start to parse
int64_t send; // start to send to server
int64_t rsp; // receive response from server
} SQueryExecMetric;
typedef struct SInstanceActivity {
uint64_t numOfInsertsReq;
uint64_t numOfInsertRows;
uint64_t insertElapsedTime;
uint64_t insertBytes; // submit to tsdb since launched.
uint64_t fetchBytes;
uint64_t queryElapsedTime;
uint64_t numOfSlowQueries;
uint64_t totalRequests;
uint64_t currentRequests; // the number of SRequestObj
} SInstanceActivity;
typedef struct SHeartBeatInfo {
void *pTimer; // timer, used to send request msg to mnode
} SHeartBeatInfo;
typedef struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
SInstanceActivity summary;
SList *pConnList; // STscObj linked list
uint32_t clusterId;
void *pTransporter;
} SAppInstInfo;
typedef struct SAppInfo {
int64_t startTime;
char appName[TSDB_APP_NAME_LEN];
char *ep;
int32_t pid;
int32_t numOfThreads;
SHeartBeatInfo hb;
SHashObj *pInstMap;
} SAppInfo;
typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char acctId[TSDB_ACCT_ID_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
uint32_t connId;
uint64_t id; // ref ID returned by taosAddRef
// struct SSqlObj *sqlList;
void *pTransporter;
pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj from this tscObj
SAppInstInfo *pAppInfo;
} STscObj;
typedef struct SReqBody {
tsem_t rspSem; // not used now
void* fp;
void* param;
} SRequestBody;
typedef struct SRequestObj {
uint64_t requestId;
int32_t type; // request type
STscObj *pTscObj;
SQueryExecMetric metric;
char *sqlstr; // sql string
SRequestBody body;
int64_t self;
char *msgBuf;
int32_t code;
void *pInfo; // sql parse info, generated by parser module
} SRequestObj;
typedef struct SRequestMsgBody {
int32_t msgType;
void *pData;
int32_t msgLen;
uint64_t requestId;
uint64_t requestObjRefId;
} SRequestMsgBody;
extern SAppInfo appInfo;
extern int32_t tscReqRef;
extern void *tscQhandle;
extern int32_t tscConnRef;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SRequestObj *pRequest, SRequestMsgBody *pMsg);
extern int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
int taos_init();
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char *str);
void* openTransporter(const char *user, const char *auth);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_CLIENTINT_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCLOG_H
#define TDENGINE_TSCLOG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
#endif
......@@ -13,11 +13,51 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#include "taos.h"
#include "os.h"
#include "tdef.h"
#include "tglobal.h"
#include "clientInt.h"
#include "tscLog.h"
//TAOS_RES *taos_query(TAOS *taos, const char *sql) {
//
//}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
int32_t p = (port != 0)? port:tsServerPort;
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, p, user, db);
if (user == NULL) {
user = TSDB_DEFAULT_USER;
}
if (pass == NULL) {
pass = TSDB_DEFAULT_PASS;
}
return taos_connect_internal(ip, user, pass, NULL, db, p);
}
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
if (user == NULL) {
user = TSDB_DEFAULT_USER;
}
if (auth == NULL) {
tscError("No auth info is given, failed to connect to server");
return NULL;
}
return taos_connect_internal(ip, user, NULL, auth, db, port);
}
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
char ipStr[TSDB_EP_LEN] = {0};
char dbStr[TSDB_DB_NAME_LEN] = {0};
char userStr[TSDB_USER_LEN] = {0};
char passStr[TSDB_PASSWORD_LEN] = {0};
strncpy(ipStr, ip, MIN(TSDB_EP_LEN - 1, ipLen));
strncpy(userStr, user, MIN(TSDB_USER_LEN - 1, userLen));
strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
strncpy(dbStr, db, MIN(TSDB_DB_NAME_LEN - 1, dbLen));
return taos_connect(ipStr, userStr, passStr, dbStr, port);
}
int taos_init() { return 0; }
void taos_cleanup(void) {}
#include <tpagedfile.h>
#include "clientInt.h"
#include "tdef.h"
#include "tep.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tref.h"
#include "tscLog.h"
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
static void destroyConnectMsg(SRequestMsgBody* pMsgBody);
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId);
static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) {
return false;
}
size_t len = strlen(str);
if (len <= 0 || len > maxsize) {
return false;
}
return true;
}
static bool validateUserName(const char* user) {
return stringLengthCheck(user, TSDB_USER_LEN - 1);
}
static bool validatePassword(const char* passwd) {
return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
}
static bool validateDbName(const char* db) {
return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
char key[512] = {0};
snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
return strdup(key);
}
static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL;
}
if (!validateUserName(user)) {
terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
return NULL;
}
char tmp[TSDB_DB_NAME_LEN] = {0};
if (db != NULL) {
if(!validateDbName(db)) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
return NULL;
}
tstrncpy(tmp, db, sizeof(tmp));
strdequote(tmp);
}
char secretEncrypt[32] = {0};
if (auth == NULL) {
if (!validatePassword(pass)) {
terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
return NULL;
}
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
} else {
tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
}
SCorEpSet epSet = {0};
if (ip) {
if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
return NULL;
}
if (port) {
epSet.epSet.port[0] = port;
}
} else {
if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
return NULL;
}
}
char* key = getClusterKey(user, secretEncrypt, ip, port);
SAppInstInfo* pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL) {
pInst = calloc(1, sizeof(struct SAppInstInfo));
pInst->mgmtEp = epSet;
pInst->pTransporter = openTransporter(user, secretEncrypt);
taosHashPut(appInfo.pInstMap, key, strlen(key), &pInst, POINTER_BYTES);
}
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, pInst);
}
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
pEpSet->version = 0;
// init mgmt ip set
SEpSet *mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
mgmtEpSet->numOfEps++;
}
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
mgmtEpSet->numOfEps++;
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
return 0;
}
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
STscObj *pTscObj = createTscObj(user, auth, ip, port, pAppInfo);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pTscObj;
}
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
if (pRequest == NULL) {
destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
SRequestMsgBody body = {0};
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
return pTscObj;
}
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
pMsgBody->msgLen = sizeof(SConnectMsg);
pMsgBody->requestObjRefId = pRequest->self;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
if (pConnect == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
// TODO refactor full_name
char *db; // ugly code to move the space
STscObj *pObj = pRequest->pTscObj;
pthread_mutex_lock(&pObj->mutex);
db = strstr(pObj->db, TS_PATH_DELIMITER);
db = (db == NULL) ? pObj->db : db + 1;
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
pthread_mutex_unlock(&pObj->mutex);
pConnect->pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
pMsgBody->pData = pConnect;
return 0;
}
static void destroyConnectMsg(SRequestMsgBody* pMsgBody) {
assert(pMsgBody != NULL);
tfree(pMsgBody->pData);
}
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) {
char *pMsg = rpcMallocCont(pBody->msgLen);
if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
memcpy(pMsg, pBody->pData, pBody->msgLen);
SRpcMsg rpcMsg = {
.msgType = pBody->msgType,
.pCont = pMsg,
.contLen = pBody->msgLen,
.ahandle = (void*) pBody->requestObjRefId,
.handle = NULL,
.code = 0
};
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
return TSDB_CODE_SUCCESS;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int64_t requestRefId = (int64_t)pMsg->ahandle;
SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId);
if (pRequest == NULL) {
rpcFreeCont(pMsg->pCont);
return;
}
assert(pRequest->self == requestRefId);
pRequest->metric.rsp = taosGetTimestampMs();
pRequest->code = pMsg->code;
STscObj *pTscObj = pRequest->pTscObj;
if (pEpSet) {
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
}
}
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType],
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
if (handleRequestRspFp[pRequest->type]) {
pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, pMsg->pCont, pMsg->contLen);
}
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pMsg->msgType],
tstrerror(pMsg->code), pMsg->contLen);
}
taosReleaseRef(tscReqRef, requestRefId);
rpcFreeCont(pMsg->pCont);
sem_post(&pRequest->body.rspSem);
}
此差异已折叠。
#include "clientInt.h"
#include "trpc.h"
#include "os.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "tscLog.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimezone.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
extern int32_t tscInitRes;
int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0;
for (int i = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++i) {
if (i % 1000 == 0) {
tscInfo("haven't acquire lock after spin %d times.", i);
sched_yield();
}
}
int ret = taos_options_imp(option, (const char*)arg);
atomic_store_32(&lock, 0);
return ret;
}
int taos_init() {
pthread_once(&tscinit, taos_init_imp);
return tscInitRes;
}
// this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) {
tscDebug("start to cleanup client environment");
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return;
}
int32_t id = tscReqRef;
tscReqRef = -1;
taosCloseRef(id);
void* p = tscQhandle;
tscQhandle = NULL;
taosCleanUpScheduler(p);
id = tscConnRef;
tscConnRef = -1;
taosCloseRef(id);
rpcCleanup();
taosCloseLog();
}
void taos_close(TAOS* taos) {
if (taos == NULL) {
return;
}
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(tscConnRef, pTscObj->id);
}
const char *taos_errstr(TAOS_RES *res) {
}
void taos_free_result(TAOS_RES *res) {
}
\ No newline at end of file
此差异已折叠。
MESSAGE(STATUS "build parser unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
clientTest
PUBLIC os util common transport gtest taos
)
TARGET_INCLUDE_DIRECTORIES(
clientTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#include "tglobal.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "taos.h"
namespace {
} // namespace
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
taos_close(pConn);
}
\ No newline at end of file
......@@ -20,8 +20,11 @@
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_INT_H_*/
\ No newline at end of file
#endif /*_TD_COMMON_INT_H_*/
......@@ -16,3 +16,5 @@
#define TAOS_MESSAGE_C
#include "taosmsg.h"
#include "tep.h"
#include "tglobal.h"
#include "tlockfree.h"
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
*port = 0;
strcpy(fqdn, ep);
char *temp = strchr(fqdn, ':');
if (temp) {
*temp = 0;
*port = atoi(temp+1);
}
if (*port == 0) {
*port = tsServerPort;
return -1;
}
return 0;
}
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false;
}
for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->port[i] != s2->port[i]
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
return false;
}
return true;
}
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet) {
taosCorBeginWrite(&pEpSet->version);
pEpSet->epSet = *pNewEpSet;
taosCorEndWrite(&pEpSet->version);
}
此差异已折叠。
......@@ -112,12 +112,12 @@ int dmnReadConfig(const char *path) {
return -1;
}
if (taosReadGlobalCfg() != 0) {
if (taosReadCfgFromFile() != 0) {
uError("failed to read global config");
return -1;
}
if (taosCheckGlobalCfg() != 0) {
if (taosCheckAndPrintCfg() != 0) {
uError("failed to check global config");
return -1;
}
......
......@@ -17,6 +17,7 @@
#include "dndDnode.h"
#include "dndTransport.h"
#include "dndVnodes.h"
#include "tep.h"
int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
......
......@@ -72,7 +72,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STB_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_VGROUP_LIST] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
......@@ -235,18 +235,18 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, INTERNAL_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
char pass[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
return 0;
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
char pass[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
......@@ -288,8 +288,8 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
memcpy(secret, pRsp->secret, TSDB_PASSWORD_LEN);
memcpy(ckey, pRsp->ckey, TSDB_PASSWORD_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
dDebug("user:%s, success to get user auth from other mnodes", user);
......@@ -368,4 +368,4 @@ void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dndSendMsgToDnode(pDnode, &epSet, pMsg);
}
\ No newline at end of file
}
......@@ -120,7 +120,7 @@ SClient* createClient(const char* user, const char* pass, const char* fqdn, uint
SClient* pClient = (SClient*)calloc(1, sizeof(SClient));
ASSERT(pClient);
char secretEncrypt[32] = {0};
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t*)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit;
......
......@@ -180,13 +180,11 @@ typedef struct SAcctObj {
typedef struct SUserObj {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int8_t superUser;
int32_t acctId;
SHashObj *prohibitDbHash;
} SUserObj;
......
......@@ -65,11 +65,9 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.readAuth = 1;
userObj.writeAuth = 1;
if (strcmp(user, TSDB_DEFAULT_USER) == 0) {
userObj.superAuth = 1;
userObj.superUser = 1;
}
SSdbRaw *pRaw = mndUserActionEncode(&userObj);
......@@ -89,7 +87,7 @@ static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
#endif
#endif
return 0;
}
......@@ -100,13 +98,11 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_KEY_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
SDB_SET_INT8(pRaw, dataPos, pUser->superAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->readAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->writeAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->superUser)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
......@@ -128,13 +124,11 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->user, TSDB_USER_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_KEY_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_PASSWORD_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->readAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->writeAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superUser)
return pRow;
}
......@@ -173,13 +167,11 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOldUser, SUserObj *pNewUser) {
mTrace("user:%s, perform update action", pOldUser->user);
memcpy(pOldUser->user, pNewUser->user, TSDB_USER_LEN);
memcpy(pOldUser->pass, pNewUser->pass, TSDB_KEY_LEN);
memcpy(pOldUser->pass, pNewUser->pass, TSDB_PASSWORD_LEN);
memcpy(pOldUser->acct, pNewUser->acct, TSDB_USER_LEN);
pOldUser->createdTime = pNewUser->createdTime;
pOldUser->updateTime = pNewUser->updateTime;
pOldUser->superAuth = pNewUser->superAuth;
pOldUser->readAuth = pNewUser->readAuth;
pOldUser->writeAuth = pNewUser->writeAuth;
pOldUser->superUser = pNewUser->superUser;
return 0;
}
......@@ -200,9 +192,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superAuth = 0;
userObj.readAuth = 1;
userObj.writeAuth = 1;
userObj.superUser = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
......@@ -475,7 +465,7 @@ static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superAuth) {
if (pUser->superUser) {
const char *src = "super";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} else {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册