diff --git a/include/client/taos.h b/include/client/taos.h index 0f7edc9fedba746c1f5510063b0acc4bd8dea95b..bfb557a23969cbb735b69ff25d00af09e2486daf 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -73,10 +73,13 @@ typedef struct taosField { #define DLL_EXPORT #endif +typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); + DLL_EXPORT int taos_init(); DLL_EXPORT void taos_cleanup(void); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); +DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); DLL_EXPORT void taos_close(TAOS *taos); @@ -154,14 +157,14 @@ DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res); // TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild); // TODO: the return value should be `const` -DLL_EXPORT char *taos_get_server_info(TAOS *taos); -DLL_EXPORT char *taos_get_client_info(); -DLL_EXPORT char *taos_errstr(TAOS_RES *tres); +DLL_EXPORT const char *taos_get_server_info(TAOS *taos); +DLL_EXPORT const char *taos_get_client_info(); +DLL_EXPORT const char *taos_errstr(TAOS_RES *tres); DLL_EXPORT int taos_errno(TAOS_RES *tres); -DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param); -DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); +DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); +DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); diff --git a/include/common/tep.h b/include/common/tep.h new file mode 100644 index 0000000000000000000000000000000000000000..bdc25f6b82abb69600d371d76521417ed4588c3a --- /dev/null +++ b/include/common/tep.h @@ -0,0 +1,6 @@ +#ifndef TDENGINE_TEP_H +#define TDENGINE_TEP_H + +int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); + +#endif // TDENGINE_TEP_H diff --git a/include/common/tglobal.h b/include/common/tglobal.h index d0f95b786a1b88bb8bba580fbe9073bac12252fa..df5f6b7c22100d2d54df78753ef66266056ac677 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -114,16 +114,8 @@ extern int8_t tsEnableSlaveQuery; extern int8_t tsEnableAdjustMaster; // restful -extern int8_t tsEnableHttpModule; extern int32_t tsRestRowLimit; -extern uint16_t tsHttpPort; -extern int32_t tsHttpCacheSessions; -extern int32_t tsHttpSessionExpire; -extern int32_t tsHttpMaxThreads; -extern int8_t tsHttpEnableCompress; -extern int8_t tsHttpEnableRecordSql; extern int8_t tsTelegrafUseFieldNum; -extern int8_t tsHttpDbNameMandatory; // mqtt extern int8_t tsEnableMqttModule; @@ -145,7 +137,6 @@ extern int8_t tsEnableStream; // internal extern int8_t tsPrintAuth; -extern int8_t tscEmbedded; extern char tsVnodeDir[]; extern char tsMnodeDir[]; extern int64_t tsTickPerDay[3]; @@ -196,7 +187,6 @@ extern SDiskCfg tsDiskCfg[]; void taosInitGlobalCfg(); int32_t taosCheckGlobalCfg(); int32_t taosCfgDynamicOptions(char *msg); -int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId); void taosAddDataDir(int index, char *v1, int level, int primary); void taosReadDataDirCfg(char *v1, char *v2, char *v3); diff --git a/include/util/tdef.h b/include/util/tdef.h index 897f51f5c119cdecacb9b335d812b9032445cd64..d227888582a5f7e55856ebf8be0d4d4b037d229a 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -161,7 +161,7 @@ do { \ #define TSDB_NODE_NAME_LEN 64 #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string -#define TSDB_DB_NAME_LEN 33 +#define TSDB_DB_NAME_LEN 65 #define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN) #define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_CODE_LEN (65535 - 512) @@ -188,7 +188,7 @@ do { \ #define TSDB_MAX_TAG_CONDITIONS 1024 #define TSDB_AUTH_LEN 16 -#define TSDB_KEY_LEN 16 +#define TSDB_KEY_LEN 64 #define TSDB_VERSION_LEN 12 #define TSDB_LABEL_LEN 8 diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt index bc0d439407f9e77f9e54de716e32663e24e5b767..5a47602dcd34c532565e9a7365810b81d661225c 100644 --- a/source/client/CMakeLists.txt +++ b/source/client/CMakeLists.txt @@ -2,9 +2,11 @@ aux_source_directory(src CLIENT_SRC) add_library(taos ${CLIENT_SRC}) target_include_directories( taos - PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PUBLIC "${CMAKE_SOURCE_DIR}/include/client" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( taos INTERFACE api + PRIVATE os util common transport parser ) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h new file mode 100644 index 0000000000000000000000000000000000000000..27a1c92d923c2a9496b1a725cb033a3fc15212ac --- /dev/null +++ b/source/client/inc/clientInt.h @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_CLIENTINT_H +#define TDENGINE_CLIENTINT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "taos.h" +#include "taosmsg.h" +#include "thash.h" +#include "tlist.h" +#include "trpc.h" +#include "tdef.h" + +typedef struct SQueryExecMetric { + int64_t start; // start timestamp + int64_t parsed; // start to parse + int64_t send; // start to send to server + int64_t rsp; // receive response from server +} SQueryExecMetric; + +typedef struct SInstanceActivity { + uint64_t numOfInsertsReq; + uint64_t numOfInsertRows; + uint64_t insertElapsedTime; + uint64_t insertBytes; // submit to tsdb since launched. + + uint64_t fetchBytes; + uint64_t queryElapsedTime; + uint64_t numOfSlowQueries; + uint64_t totalRequests; + uint64_t currentRequests; // the number of SRequestObj +} SInstanceActivity; + +typedef struct SHeartBeatInfo { + void *pTimer; // timer, used to send request msg to mnode +} SHeartBeatInfo; + +typedef struct SAppInstInfo { + int64_t numOfConns; + SRpcCorEpSet mgmtEp; + SInstanceActivity summary; + SList *pConnList; // STscObj linked list + char clusterId[TSDB_CLUSTER_ID_LEN]; +} SAppInstInfo; + +typedef struct SAppInfo { + int64_t startTime; + char appName[TSDB_APPNAME_LEN]; + char *ep; + int32_t pid; + int32_t numOfThreads; + SHeartBeatInfo hb; + SHashObj *pInstMap; +} SAppInfo; + +typedef struct STscObj { + char user[TSDB_USER_LEN]; + char pass[TSDB_KEY_LEN]; + char acctId[TSDB_ACCT_ID_LEN]; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + uint32_t connId; + uint64_t id; // ref ID returned by taosAddRef +// struct SSqlObj *sqlList; +// SRpcObj *pRpcObj; + pthread_mutex_t mutex; // used to protect the operation on db + int32_t numOfReqs; // number of sqlObj from this tscObj + SAppInstInfo *pAppInfo; +} STscObj; + +typedef struct SReqBody { + tsem_t rspSem; // not used now + void* fp; + void* param; +} SRequestBody; + +typedef struct SRequestObj { + uint64_t requestId; + int32_t type; // request type + STscObj *pTscObj; + SQueryExecMetric metric; + char *sqlstr; // sql string + SRequestBody body; + int64_t self; + char *msgBuf; + int32_t code; + void *pInfo; // sql parse info, generated by parser module +} SRequestObj; + +void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port); +void destroyTscObj(void* pTscObj); + +TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_CLIENTINT_H diff --git a/source/client/inc/tscLog.h b/source/client/inc/tscLog.h new file mode 100644 index 0000000000000000000000000000000000000000..f205a50227308e71e27e9715e9f819078be5a20a --- /dev/null +++ b/source/client/inc/tscLog.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSCLOG_H +#define TDENGINE_TSCLOG_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tlog.h" + +#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) +#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c new file mode 100644 index 0000000000000000000000000000000000000000..e3e6e8d3aefb2ed6095a89d9aaeffc3e5f338092 --- /dev/null +++ b/source/client/src/clientImpl.c @@ -0,0 +1,190 @@ +#include "tglobal.h" +#include "clientInt.h" +#include "tdef.h" +#include "tep.h" +#include "tmsgtype.h" +#include "tref.h" +#include "tscLog.h" + +static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SRpcCorEpSet *pEpSet); + +static bool stringLengthCheck(const char* str, size_t maxsize) { + if (str == NULL) { + return false; + } + + size_t len = strlen(str); + if (len <= 0 || len > maxsize) { + return false; + } + + return true; +} + +static bool validateUserName(const char* user) { + return stringLengthCheck(user, TSDB_USER_LEN - 1); +} + +static bool validatePassword(const char* passwd) { + return stringLengthCheck(passwd, TSDB_KEY_LEN - 1); +} + +static bool validateDbName(const char* db) { + return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); +} + +static SRequestObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param); + +TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { + STscObj *pObj = NULL; + + if (!validateUserName(user)) { + terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH; + return NULL; + } + + char tmp[TSDB_DB_NAME_LEN] = {0}; + if (db != NULL) { + if(!validateDbName(db)) { + terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; + return NULL; + } + + tstrncpy(tmp, db, sizeof(tmp)); + strdequote(tmp); + } + + char secretEncrypt[32] = {0}; + if (auth == NULL) { + if (!validatePassword(pass)) { + terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH; + return NULL; + } + + taosEncryptPass((uint8_t *)pass, strlen(pass), secretEncrypt); + } else { + tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt)); + } + + SRpcCorEpSet epSet; + if (ip) { + if (initEpSetFromCfg(ip, NULL, &epSet) < 0) { + return NULL; + } + + if (port) { + epSet.epSet.port[0] = port; + } + } else { + if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) { + return NULL; + } + } + + SRequestObj *pRequest = taosConnectImpl(ip, user, auth, db, port, NULL, NULL); + if (pRequest != NULL) { + pObj = pRequest->pTscObj; + + pRequest->body.fp = NULL; + pRequest->body.param = pRequest; + +// tscBuildAndSendRequest(pRequest, NULL); + tsem_wait(&pRequest->body.rspSem); + + if (pRequest->code != TSDB_CODE_SUCCESS) { + if (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) { + printf("taos connect failed, reason: %s\n\n", taos_errstr(pRequest)); + } else { + printf("taos connect failed, reason: %s.\n\n", tstrerror(terrno)); + } + + taos_free_result(pRequest); + taos_close(pObj); + return NULL; + } + +// tscDebug("%p DB connection is opening, rpcObj: %p, dnodeConn:%p", pObj, pObj->pRpcObj, pObj->pRpcObj->pDnodeConn); + taos_free_result(pRequest); + return pObj; + } + + return NULL; +} + +int initEpSetFromCfg(const char *firstEp, const char *secondEp, SRpcCorEpSet *pEpSet) { + pEpSet->version = 0; + + // init mgmt ip set + SEpSet *mgmtEpSet = &(pEpSet->epSet); + mgmtEpSet->numOfEps = 0; + mgmtEpSet->inUse = 0; + + if (firstEp && firstEp[0] != 0) { + if (strlen(firstEp) >= TSDB_EP_LEN) { + terrno = TSDB_CODE_TSC_INVALID_FQDN; + return -1; + } + + taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0])); + mgmtEpSet->numOfEps++; + } + + if (secondEp && secondEp[0] != 0) { + if (strlen(secondEp) >= TSDB_EP_LEN) { + terrno = TSDB_CODE_TSC_INVALID_FQDN; + return -1; + } + + taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps])); + mgmtEpSet->numOfEps++; + } + + if (mgmtEpSet->numOfEps == 0) { + terrno = TSDB_CODE_TSC_INVALID_FQDN; + return -1; + } + + return 0; +} + +SRequestObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param) { + if (taos_init() != TSDB_CODE_SUCCESS) { + return NULL; + } + + STscObj *pObj = createTscObj(user, auth, ip, port); + if (NULL == pObj) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + SRequestObj *pRequest = (SRequestObj *)calloc(1, sizeof(SRequestObj)); + if (NULL == pRequest) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + free(pObj); + return NULL; + } + + void *pRpcObj = NULL; + + char rpcKey[512] = {0}; + snprintf(rpcKey, sizeof(rpcKey), "%s:%s:%s:%d", user, auth, ip, port); + if (tscAcquireRpc(rpcKey, user, auth, &pRpcObj) != 0) { + terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; + return NULL; + } + + pObj->pRpcObj = (SRpcObj *)pRpcObj; + + pRequest->pTscObj = pObj; + pRequest->body.fp = fp; + pRequest->body.param = param; + pRequest->type = TSDB_SQL_CONNECT; + + tsem_init(&pRequest->body.rspSem, 0, 0); + + pObj->id = taosAddRef(tscConn, pObj); + registerSqlObj(pRequest); + + return pRequest; +} \ No newline at end of file diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c new file mode 100644 index 0000000000000000000000000000000000000000..d77d8b1bece4f6a11c3d8d7085ce66d2628b98d1 --- /dev/null +++ b/source/client/src/tscEnv.c @@ -0,0 +1,487 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "clientInt.h" +#include "trpc.h" +#include "os.h" +#include "taosmsg.h" +#include "tcache.h" +#include "tconfig.h" +#include "tglobal.h" +#include "tnote.h" +#include "tref.h" +#include "tscLog.h" +#include "tsched.h" +#include "ttime.h" +#include "ttimezone.h" + +#define TSC_VAR_NOT_RELEASE 1 +#define TSC_VAR_RELEASED 0 + +SAppInfo appInfo; +int32_t sentinel = TSC_VAR_NOT_RELEASE; + +int32_t tscReqRef = -1; +void *tscQhandle; +int32_t tscConnRef = -1; +void *tscRpcCache; // TODO removed from here. + +static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently +static pthread_once_t tscinit = PTHREAD_ONCE_INIT; +static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER; + +// pthread_once can not return result code, so result code is set to a global variable. +static volatile int tscInitRes = 0; + +void tscFreeRpcObj(void *param) { +#if 0 + assert(param); + SRpcObj *pRpcObj = (SRpcObj *)(param); + tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn); + rpcClose(pRpcObj->pDnodeConn); +#endif +} + +void tscReleaseRpc(void *param) { + if (param == NULL) { + return; + } + + taosCacheRelease(tscRpcCache, (void *)¶m, false); +} + +void* tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt) { +#if 0 + SRpcObj *pRpcObj = (SRpcObj *)taosCacheAcquireByKey(tscRpcCache, key, strlen(key)); + pthread_mutex_lock(&rpcObjMutex); + if (pRpcObj != NULL) { + pthread_mutex_unlock(&rpcObjMutex); + return pRpcObj; + } + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "TSC"; + rpcInit.numOfThreads = tscNumOfThreads; + rpcInit.cfp = tscProcessMsgFromServer; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = (char *)user; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.secret = (char *)secretEncrypt; + + SRpcObj rpcObj = {0}; + strncpy(rpcObj.key, key, strlen(key)); + rpcObj.pDnodeConn = rpcOpen(&rpcInit); + if (rpcObj.pDnodeConn == NULL) { + pthread_mutex_unlock(&rpcObjMutex); + tscError("failed to init connection to server"); + return NULL; + } + + pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*5); + if (pRpcObj == NULL) { + rpcClose(rpcObj.pDnodeConn); + pthread_mutex_unlock(&rpcObjMutex); + return NULL; + } + + pthread_mutex_unlock(&rpcObjMutex); + return pRpcObj; +#endif + +} + +void destroyTscObj(void *pTscObj) { + STscObj *pObj = pTscObj; +// tfree(pObj->tscCorMgmtEpSet); +// tscReleaseRpc(pObj->pRpcObj); + pthread_mutex_destroy(&pObj->mutex); + tfree(pObj); +} + +void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port) { + STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj)); + if (NULL == pObj) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + +// char rpcKey[512] = {0}; +// snprintf(rpcKey, sizeof(rpcKey), "%s:%s:%s:%d", user, auth, ip, port); + +// pObj->tscCorMgmtEpSet = malloc(sizeof(SRpcCorEpSet)); +// if (pObj->tscCorMgmtEpSet == NULL) { +// terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; +// free(pObj); +// return NULL; +// } +// +// memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(corMgmtEpSet)); + + tstrncpy(pObj->user, user, sizeof(pObj->user)); + int32_t len = MIN(strlen(auth) + 1, sizeof(pObj->pass)); + tstrncpy(pObj->pass, auth, len); + + pthread_mutex_init(&pObj->mutex, NULL); +} + +static void tscInitLogFile() { + taosReadGlobalLogCfg(); + if (mkdir(tsLogDir, 0755) != 0 && errno != EEXIST) { + printf("failed to create log dir:%s\n", tsLogDir); + } + + const char *defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + char temp[128] = {0}; + sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix); + if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} + +void taos_init_imp(void) { + // In the APIs of other program language, taos_cleanup is not available yet. + // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. + atexit(taos_cleanup); + + errno = TSDB_CODE_SUCCESS; + srand(taosGetTimestampSec()); + + deltaToUtcInitOnce(); + taosInitGlobalCfg(); + taosReadGlobalCfg(); + + tscInitLogFile(); + + if (taosCheckGlobalCfg()) { + tscInitRes = -1; + return; + } + + taosInitNotes(); + rpcInit(); + + tscDebug("starting to initialize TAOS client ..."); + tscDebug("Local End Point is:%s", tsLocalEp); + + taosSetCoreDump(true); + + double factor = 4.0; + int32_t numOfThreads = MAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2); + + int32_t queueSize = tsMaxConnections * 2; + tscQhandle = taosInitScheduler(queueSize, numOfThreads, "tsc"); + if (NULL == tscQhandle) { + tscError("failed to init task queue"); + tscInitRes = -1; + return; + } + + tscDebug("client task queue is initialized, numOfWorkers: %d", numOfThreads); + + int refreshTime = 5; + tscRpcCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, true, tscFreeRpcObj, "rpcObj"); + pthread_mutex_init(&rpcObjMutex, NULL); + + tscConnRef = taosOpenRef(200, destroyTscObj); + tscReqRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); + + taosGetCurrentAPPName(appInfo.appName, NULL); + appInfo.pid = taosGetPId(); + appInfo.startTime = taosGetTimestampMs(); + + tscDebug("client is initialized successfully"); +} + +int taos_init() { + pthread_once(&tscinit, taos_init_imp); + return tscInitRes; +} + +// this function may be called by user or system, or by both simultaneously. +void taos_cleanup(void) { + tscDebug("start to cleanup client environment"); + + if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { + return; + } + + int32_t id = tscReqRef; + tscReqRef = -1; + taosCloseRef(id); + + void* p = tscQhandle; + tscQhandle = NULL; + taosCleanUpScheduler(p); + + id = tscConnRef; + tscConnRef = -1; + taosCloseRef(id); + + p = tscRpcCache; + tscRpcCache = NULL; + + if (p != NULL) { + taosCacheCleanup(p); + pthread_mutex_destroy(&rpcObjMutex); + } + + pthread_mutex_destroy(&setConfMutex); + + rpcCleanup(); + taosCloseLog(); +} + +static int taos_options_imp(TSDB_OPTION option, const char *pStr) { + SGlobalCfg *cfg = NULL; + + switch (option) { + case TSDB_OPTION_CONFIGDIR: + cfg = taosGetConfigOption("configDir"); + assert(cfg != NULL); + + if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) { + tstrncpy(configDir, pStr, TSDB_FILENAME_LEN); + cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; + tscInfo("set config file directory:%s", pStr); + } else { + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + } + break; + + case TSDB_OPTION_SHELL_ACTIVITY_TIMER: + cfg = taosGetConfigOption("shellActivityTimer"); + assert(cfg != NULL); + + if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) { + tsShellActivityTimer = atoi(pStr); + if (tsShellActivityTimer < 1) tsShellActivityTimer = 1; + if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600; + cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; + tscInfo("set shellActivityTimer:%d", tsShellActivityTimer); + } else { + tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr, tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr); + } + break; + + case TSDB_OPTION_LOCALE: { // set locale + cfg = taosGetConfigOption("locale"); + assert(cfg != NULL); + + size_t len = strlen(pStr); + if (len == 0 || len > TSDB_LOCALE_LEN) { + tscInfo("Invalid locale:%s, use default", pStr); + return -1; + } + + if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) { + char sep = '.'; + + if (strlen(tsLocale) == 0) { // locale does not set yet + char* defaultLocale = setlocale(LC_CTYPE, ""); + + // The locale of the current OS does not be set correctly, so the default locale cannot be acquired. + // The launch of current system will abort soon. + if (defaultLocale == NULL) { + tscError("failed to get default locale, please set the correct locale in current OS"); + return -1; + } + + tstrncpy(tsLocale, defaultLocale, TSDB_LOCALE_LEN); + } + + // set the user specified locale + char *locale = setlocale(LC_CTYPE, pStr); + + if (locale != NULL) { // failed to set the user specified locale + tscInfo("locale set, prev locale:%s, new locale:%s", tsLocale, locale); + cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; + } else { // set the user specified locale failed, use default LC_CTYPE as current locale + locale = setlocale(LC_CTYPE, tsLocale); + tscInfo("failed to set locale:%s, current locale:%s", pStr, tsLocale); + } + + tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN); + + char *charset = strrchr(tsLocale, sep); + if (charset != NULL) { + charset += 1; + + charset = taosCharsetReplace(charset); + + if (taosValidateEncodec(charset)) { + if (strlen(tsCharset) == 0) { + tscInfo("charset set:%s", charset); + } else { + tscInfo("charset changed from %s to %s", tsCharset, charset); + } + + tstrncpy(tsCharset, charset, TSDB_LOCALE_LEN); + cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; + + } else { + tscInfo("charset:%s is not valid in locale, charset remains:%s", charset, tsCharset); + } + + free(charset); + } else { // it may be windows system + tscInfo("charset remains:%s", tsCharset); + } + } else { + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + } + break; + } + + case TSDB_OPTION_CHARSET: { + /* set charset will override the value of charset, assigned during system locale changed */ + cfg = taosGetConfigOption("charset"); + assert(cfg != NULL); + + size_t len = strlen(pStr); + if (len == 0 || len > TSDB_LOCALE_LEN) { + tscInfo("failed to set charset:%s", pStr); + return -1; + } + + if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) { + if (taosValidateEncodec(pStr)) { + if (strlen(tsCharset) == 0) { + tscInfo("charset is set:%s", pStr); + } else { + tscInfo("charset changed from %s to %s", tsCharset, pStr); + } + + tstrncpy(tsCharset, pStr, TSDB_LOCALE_LEN); + cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; + } else { + tscInfo("charset:%s not valid", pStr); + } + } else { + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + } + + break; + } + + case TSDB_OPTION_TIMEZONE: + cfg = taosGetConfigOption("timezone"); + assert(cfg != NULL); + + if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) { + tstrncpy(tsTimezone, pStr, TSDB_TIMEZONE_LEN); + tsSetTimeZone(); + cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; + tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, pStr); + } else { + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + } + break; + + default: + // TODO return the correct error code to client in the format for taos_errstr() + tscError("Invalid option %d", option); + return -1; + } + + return 0; +} + +int taos_options(TSDB_OPTION option, const void *arg, ...) { + static int32_t lock = 0; + + for (int i = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++i) { + if (i % 1000 == 0) { + tscInfo("haven't acquire lock after spin %d times.", i); + sched_yield(); + } + } + + int ret = taos_options_imp(option, (const char*)arg); + + atomic_store_32(&lock, 0); + return ret; +} + +#if 0 +#include "cJSON.h" +static setConfRet taos_set_config_imp(const char *config){ + setConfRet ret = {SET_CONF_RET_SUCC, {0}}; + static bool setConfFlag = false; + if (setConfFlag) { + ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE; + strcpy(ret.retMsg, "configuration can only set once"); + return ret; + } + taosInitGlobalCfg(); + cJSON *root = cJSON_Parse(config); + if (root == NULL){ + ret.retCode = SET_CONF_RET_ERR_JSON_PARSE; + strcpy(ret.retMsg, "parse json error"); + return ret; + } + + int size = cJSON_GetArraySize(root); + if(!cJSON_IsObject(root) || size == 0) { + ret.retCode = SET_CONF_RET_ERR_JSON_INVALID; + strcpy(ret.retMsg, "json content is invalid, must be not empty object"); + return ret; + } + + if(size >= 1000) { + ret.retCode = SET_CONF_RET_ERR_TOO_LONG; + strcpy(ret.retMsg, "json object size is too long"); + return ret; + } + + for(int i = 0; i < size; i++){ + cJSON *item = cJSON_GetArrayItem(root, i); + if(!item) { + ret.retCode = SET_CONF_RET_ERR_INNER; + strcpy(ret.retMsg, "inner error"); + return ret; + } + if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){ + ret.retCode = SET_CONF_RET_ERR_PART; + if (strlen(ret.retMsg) == 0){ + snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string); + }else{ + int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg); + size_t leftSize = tmp >= 0 ? tmp : 0; + strncat(ret.retMsg, "|", leftSize); + tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg); + leftSize = tmp >= 0 ? tmp : 0; + strncat(ret.retMsg, item->string, leftSize); + } + } + } + cJSON_Delete(root); + setConfFlag = true; + return ret; +} + +setConfRet taos_set_config(const char *config){ + pthread_mutex_lock(&setConfMutex); + setConfRet ret = taos_set_config_imp(config); + pthread_mutex_unlock(&setConfMutex); + return ret; +} +#endif \ No newline at end of file diff --git a/source/common/src/tep.c b/source/common/src/tep.c new file mode 100644 index 0000000000000000000000000000000000000000..7dda5f5f6f5f3d66706ed70fa354ecab64362190 --- /dev/null +++ b/source/common/src/tep.c @@ -0,0 +1,20 @@ + + +int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) { + *port = 0; + strcpy(fqdn, ep); + + char *temp = strchr(fqdn, ':'); + if (temp) { + *temp = 0; + *port = atoi(temp+1); + } + + if (*port == 0) { + *port = tsServerPort; + return -1; + } + + return 0; +} + diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1737bd9def7ef64ff8c52d787ab332870bd804e9..a2cbdbff4a2d12ef488dc02e5df42596fb35c3e9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -166,16 +166,8 @@ int8_t tsEnableSlaveQuery = 1; int8_t tsEnableAdjustMaster = 1; // restful -int8_t tsEnableHttpModule = 1; int32_t tsRestRowLimit = 10240; -uint16_t tsHttpPort = 6041; // only tcp, range tcp[6041] -int32_t tsHttpCacheSessions = 1000; -int32_t tsHttpSessionExpire = 36000; -int32_t tsHttpMaxThreads = 2; -int8_t tsHttpEnableCompress = 1; -int8_t tsHttpEnableRecordSql = 0; int8_t tsTelegrafUseFieldNum = 0; -int8_t tsHttpDbNameMandatory = 0; // mqtt int8_t tsEnableMqttModule = 0; // not finished yet, not started it by default @@ -198,7 +190,6 @@ int8_t tsEnableStream = 1; // internal int8_t tsCompactMnodeWal = 0; int8_t tsPrintAuth = 0; -int8_t tscEmbedded = 0; char tsVnodeDir[PATH_MAX] = {0}; char tsDnodeDir[PATH_MAX] = {0}; char tsMnodeDir[PATH_MAX] = {0}; @@ -261,7 +252,6 @@ void taosSetAllDebugFlag() { sdbDebugFlag = debugFlag; dDebugFlag = debugFlag; vDebugFlag = debugFlag; - cDebugFlag = debugFlag; jniDebugFlag = debugFlag; odbcDebugFlag = debugFlag; httpDebugFlag = debugFlag; @@ -1161,16 +1151,6 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "http"; - cfg.ptr = &tsEnableHttpModule; - cfg.valType = TAOS_CFG_VTYPE_INT8; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; - cfg.minValue = 0; - cfg.maxValue = 1; - cfg.ptrLength = 1; - cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosInitConfigOption(cfg); - cfg.option = "mqtt"; cfg.ptr = &tsEnableMqttModule; cfg.valType = TAOS_CFG_VTYPE_INT8; @@ -1211,16 +1191,6 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "httpEnableRecordSql"; - cfg.ptr = &tsHttpEnableRecordSql; - cfg.valType = TAOS_CFG_VTYPE_INT8; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; - cfg.minValue = 0; - cfg.maxValue = 1; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosInitConfigOption(cfg); - cfg.option = "telegrafUseFieldNum"; cfg.ptr = &tsTelegrafUseFieldNum; cfg.valType = TAOS_CFG_VTYPE_INT8; @@ -1231,16 +1201,6 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "httpMaxThreads"; - cfg.ptr = &tsHttpMaxThreads; - cfg.valType = TAOS_CFG_VTYPE_INT32; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; - cfg.minValue = 2; - cfg.maxValue = 1000000; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosInitConfigOption(cfg); - cfg.option = "restfulRowLimit"; cfg.ptr = &tsRestRowLimit; cfg.valType = TAOS_CFG_VTYPE_INT32; @@ -1251,16 +1211,6 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "httpDbNameMandatory"; - cfg.ptr = &tsHttpDbNameMandatory; - cfg.valType = TAOS_CFG_VTYPE_INT8; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; - cfg.minValue = 0; - cfg.maxValue = 1; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosInitConfigOption(cfg); - // debug flag cfg.option = "numOfLogLines"; cfg.ptr = &tsNumOfLogLines; @@ -1697,13 +1647,6 @@ int32_t taosCheckGlobalCfg() { tsNumOfCores = 1; } - if (tsHttpMaxThreads == 2) { - int32_t halfNumOfCores = tsNumOfCores >> 1; - if (halfNumOfCores > 2) { - tsHttpMaxThreads = halfNumOfCores; - } - } - if (tsMaxTablePerVnode < tsMinTablePerVnode) { uError("maxTablesPerVnode(%d) < minTablesPerVnode(%d), reset to minTablesPerVnode(%d)", tsMaxTablePerVnode, tsMinTablePerVnode, tsMinTablePerVnode); @@ -1738,24 +1681,6 @@ int32_t taosCheckGlobalCfg() { return 0; } -int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) { - *port = 0; - strcpy(fqdn, ep); - - char *temp = strchr(fqdn, ':'); - if (temp) { - *temp = 0; - *port = atoi(temp+1); - } - - if (*port == 0) { - *port = tsServerPort; - return -1; - } - - return 0; -} - /* * alter dnode 1 balance "vnode:1-dnode:2" */ diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index ad1667eac566ddf389fa4df0c6b656041f3439b2..c484ea122d16cfef9d5e7ef6303b8f4ca5911ba0 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -19,7 +19,6 @@ void initLog(const char* path) { dDebugFlag = 0; vDebugFlag = 0; mDebugFlag = 207; - cDebugFlag = 0; jniDebugFlag = 0; tmrDebugFlag = 0; sdbDebugFlag = 0; diff --git a/source/libs/transport/inc/rpcLog.h b/source/libs/transport/inc/rpcLog.h index 6c4a281d2c0f0bf85975200c833acbac856dc40c..904680bbe66be06b30b49be727ca522b9b5ed47e 100644 --- a/source/libs/transport/inc/rpcLog.h +++ b/source/libs/transport/inc/rpcLog.h @@ -23,11 +23,10 @@ extern "C" { #include "tlog.h" extern int32_t rpcDebugFlag; -extern int8_t tscEmbedded; -#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} -#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} -#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} +#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", rpcDebugFlag, __VA_ARGS__); }} +#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", rpcDebugFlag, __VA_ARGS__); }} +#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", rpcDebugFlag, __VA_ARGS__); }} #define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} #define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }} #define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }} diff --git a/source/util/src/tnote.c b/source/util/src/tnote.c index 5606ab248d71b62f9b6041b76069a5292cd33223..d9356b7e4049cfdf9bc77ff1606e24993758bd6c 100644 --- a/source/util/src/tnote.c +++ b/source/util/src/tnote.c @@ -49,15 +49,6 @@ int32_t taosInitNotes() { taosInitNote(tsNumOfLogLines, 1, &tsTscNote, name); } - if (tsHttpEnableRecordSql) { - snprintf(name, TSDB_FILENAME_LEN * 2, "%s/httpsql", tsLogDir); - taosInitNote(tsNumOfLogLines, 1, &tsHttpNote, name); - } - - if (tscEmbedded == 1) { - snprintf(name, TSDB_FILENAME_LEN * 2, "%s/taosinfo", tsLogDir); - taosInitNote(tsNumOfLogLines, 1, &tsInfoNote, name); - } #endif return 0; }