diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f2e2e55e9075af02cbf34063e24c923238377772..8039fbe405feada1552993cc2c1d78364ec2b2ab 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1071,7 +1071,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { } // todo add to return - tscError("async insert parse error, code:%d, %s", code, tsError[code]); + tscError("async insert parse error, code:%d, %s", code, tstrerror(code)); pSql->asyncTblPos = NULL; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 2c0d0f898a39b0cab5d62bd00a51d0c0461ecd4b..e76cc0446c8b7c665e39a96af5e902b88abd1f21 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -849,7 +849,7 @@ char *taos_errstr(TAOS *taos) { STscObj *pObj = (STscObj *)taos; uint8_t code; - if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode]; + if (pObj == NULL || pObj->signature != pObj) return tstrerror(globalCode); SSqlObj* pSql = pObj->pSql; @@ -862,7 +862,7 @@ char *taos_errstr(TAOS *taos) { if (hasAdditionalErrorInfo(code, &pSql->cmd)) { return pSql->cmd.payload; } else { - return tsError[code]; + return tstrerror(code); } } diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 143c5397e8301da5bf411258ed721f8e63f84098..c1e572cdccefa35c11346f6ae6e728126900bac1 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -4,25 +4,18 @@ PROJECT(TDengine) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) - - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/monitor/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/inc) - INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/util/cluster/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) - ADD_EXECUTABLE(taosd ${SRC}) - TARGET_LINK_LIBRARIES(taosd mnode vnode taos_static monitor http) + ADD_LIBRARY(dnode ${SRC}) + #ADD_EXECUTABLE(taosd ${SRC}) + #TARGET_LINK_LIBRARIES(taosd mnode sdb vnode taos_static monitor http) - IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(taosd dcluster) - ENDIF () + #IF (TD_CLUSTER) + # TARGET_LINK_LIBRARIES(taosd dcluster) + #ENDIF () SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index 1cd2e44342a3c67331195ebf345d77d2abf7aaf7..b002d0eefb3d44b3bd5406eb984881384606cdd4 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -23,7 +23,7 @@ extern "C" { /* * Dnode handle read messages * The processing result is returned by callback function with pShellObj parameter -*/ + */ int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj)); #ifdef __cplusplus diff --git a/src/dnode/inc/dnodeShell.h b/src/dnode/inc/dnodeShell.h index e4e26512b14742ca6c0b827739c8c741622b041b..bb04767eb8ea6f59aeebed74f5a67df61f717439 100644 --- a/src/dnode/inc/dnodeShell.h +++ b/src/dnode/inc/dnodeShell.h @@ -20,6 +20,26 @@ extern "C" { #endif +#include +#include +#include "dnode.h" + +typedef struct { + int sid; + int vnode; + uint32_t ip; + uint16_t port; + int32_t count; // track the number of imports + int32_t code; // track the code of imports + int32_t numOfTotalPoints; // track the total number of points imported + void *thandle; // handle from TAOS layer + void *qhandle; +} SShellObj; + +int32_t dnodeInitShell(); + +//SDnodeStatisInfo dnodeGetStatisInfo() + #ifdef __cplusplus } #endif diff --git a/src/dnode/inc/dnodeSystem.h b/src/dnode/inc/dnodeSystem.h index eb25357c59438a0cd3e3dd9d2bab7524ba769ee3..98a9f582f73b143379e4f693f7c6264e98476fad 100644 --- a/src/dnode/inc/dnodeSystem.h +++ b/src/dnode/inc/dnodeSystem.h @@ -23,16 +23,31 @@ extern "C" { #include #include #include +#include "dnode.h" -extern pthread_mutex_t dmutex; -extern bool tsDnodeStopping; +typedef enum { + TSDB_DNODE_RUN_STATUS_INITIALIZE, + TSDB_DNODE_RUN_STATUS_RUNING, + TSDB_DNODE_RUN_STATUS_STOPPED +} SDnodeRunStatus; -int dnodeInitSystem(); +extern int32_t (*dnodeInitPeers)(int32_t numOfThreads); +extern int32_t (*dnodeCheckSystem)(); +extern int32_t (*dnodeInitStorage)(); +extern void (*dnodeCleanupStorage)(); +extern void (*dnodeParseParameterK)(); +extern int32_t tsMaxQueues; + + +int32_t dnodeInitSystem(); void dnodeCleanUpSystem(); -void dnodeCheckDbRunning(const char* dir); +void dnodeInitPlugins(); -int vnodeInitStore(); -int vnodeInitPeer(int numOfThreads); +SDnodeRunStatus dnodeGetRunStatus(); +void dnodeSetRunStatus(SDnodeRunStatus status); +void dnodeCheckDataDirOpenned(const char *dir); +void dnodeLockVnodes(); +void dnodeUnLockVnodes(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h index 137461607e72390793677d92e50a11d35f9abea4..9fee09166bdfd97f24fee07e7255c5b84dc26e25 100644 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ b/src/dnode/inc/dnodeVnodeMgmt.h @@ -33,7 +33,7 @@ int32_t dnodeOpenVnodes(); /* * Close all Vnodes that have been created and opened */ -int32_t dnodeCloseVnodes(); +int32_t dnodeCleanupVnodes(); /* * Check if vnode already exists @@ -43,7 +43,8 @@ int32_t dnodeCheckVnodeExist(int vid); /* * Create vnode with specified configuration and open it */ -tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg); +//tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg); +void* dnodeCreateVnode(int vid, SVnodeCfg *cfg); /* * Modify vnode configuration information @@ -53,7 +54,7 @@ int32_t dnodeConfigVnode(int vid, SVnodeCfg *cfg); /* * Modify vnode replication information */ -int32_t dnodeConfigVnodePeers(int vid, /*SVpeerCfgMsg *cfg*/); +int32_t dnodeConfigVnodePeers(int vid/*, SVpeerCfgMsg *cfg*/); /* * Remove vnode from local repository @@ -63,8 +64,8 @@ int32_t dnodeDropVnode(int vid); /* * Get the vnode object that has been opened */ -tsdb_repo_t* dnodeGetVnode(int vid); - +//tsdb_repo_t* dnodeGetVnode(int vid); +void* dnodeGetVnode(int vid); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 77ac78ea0e6e286bd690373c2e124323b9c4a8d6..e323f48020cf3938914846db754166cc88ecc422 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -25,31 +25,58 @@ extern "C" { #include "taosdef.h" #include "taosmsg.h" +/* + * Write data based on dnode + * If >= 0, it is affect rows + * If < 0, get error code from terrno + */ +int32_t dnodeWriteData(SShellSubmitMsg *msg); + /* * Check if table already exists */ -int32_t dnodeCheckTableExist(int vid, int sid, int64_t uid); +int32_t dnodeCheckTableExist(char *tableId); /* - * Create table with specified configuration and open it + * Create noraml table with specified configuration and open it */ -int32_t dnodeCreateTable(int vid, int sid, SCreateMsg *table); +int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table); /* - * Modify table configuration information + * Create stream table with specified configuration and open it */ -int32_t dnodeAlterTable(int vid, SMeterObj *table); +int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table); /* - * Remove table from local repository + * Create child table with specified configuration and open it */ -int32_t dnodeDropTable(int vid, int sid, int64_t uid); +int32_t dnodeCreateChildTable(SCreateChildTableMsg *table); /* - * Write data based on dnode + * Modify normal table configuration information + * */ -int32_t dnodeWriteData(SShellSubmitMsg *msg); +int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table); +/* + * Modify stream table configuration information + */ +int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table); + +/* + * Modify child table configuration information + */ +int32_t dnodeAlterChildTable(SCreateChildTableMsg *table); + +/* + * Remove all child tables of supertable from local repository + */ +int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid); + +/* + * Remove table from local repository + */ +int32_t dnodeDropTable(int vid, int sid, int64_t uid); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 5999ff3239461901e37619f05e0a9c5fa4219134..370a96ac61b7f3abf5bee08891898f8534b62590 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -71,7 +71,7 @@ void dnodeCleanUpModules() { } void dnodeProcessModuleStatus(uint32_t status) { - if (tsDnodeStopping) { + if (tsDnodeRunStatus) { return; } diff --git a/src/dnode/src/dnodeService.c b/src/dnode/src/dnodeService.c index 763422b1dd2a9a7c41eb692245a4339c62dc65d9..d7f9721fda9091efeb06d72f9fad2f76b4f65ebc 100644 --- a/src/dnode/src/dnodeService.c +++ b/src/dnode/src/dnodeService.c @@ -15,39 +15,13 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "tlog.h" #include "tglobalcfg.h" -#include "vnode.h" #include "dnodeSystem.h" -#ifdef CLUSTER - #include "dnodeCluster.h" - #include "httpAdmin.h" - #include "mnodeAccount.h" - #include "mnodeBalance.h" - #include "mnodeCluster.h" - #include "sdbReplica.h" - #include "multilevelStorage.h" - #include "vnodeCluster.h" - #include "vnodeReplica.h" - #include "dnodeGrant.h" - void init() { - dnodeClusterInit(); - httpAdminInit(); - mnodeAccountInit(); - mnodeBalanceInit(); - mnodeClusterInit(); - sdbReplicaInit(); - multilevelStorageInit(); - vnodeClusterInit(); - vnodeReplicaInit(); - dnodeGrantInit(); - } -#endif - -void dnodeParseParameterKImp() {} -void (*dnodeParseParameterK)() = dnodeParseParameterKImp; - -/* Termination handler */ +/* + * Termination handler + */ void signal_handler(int signum, siginfo_t *sigInfo, void *context) { if (signum == SIGUSR1) { tsCfgDynamicOptions("debugFlag 135"); @@ -70,6 +44,8 @@ void signal_handler(int signum, siginfo_t *sigInfo, void *context) { } int main(int argc, char *argv[]) { + dnodeInitPlugins(); + // Set global configuration file for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-c") == 0) { diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 6e43e7074243ba8480c4781139c14c50e311eeb4..f02c7849e7a2a8407b74936864ca7e805fd16548 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -15,33 +15,29 @@ #define _DEFAULT_SOURCE #include "os.h" - +#include "taoserror.h" +#include "taosdef.h" #include "taosmsg.h" -#include "vnode.h" -#include "vnodeShell.h" +#include "tlog.h" #include "tschemautil.h" - #include "textbuffer.h" #include "trpc.h" -#include "tscJoinProcess.h" -#include "vnode.h" -#include "vnodeRead.h" -#include "vnodeUtil.h" -#include "vnodeStore.h" -#include "vnodeStatus.h" +#include "dnode.h" +#include "dnodeSystem.h" +#include "dnodeShell.h" -extern int tsMaxQueues; -void * pShellServer = NULL; -SShellObj **shellList = NULL; -int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj); -int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj); -int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj); +int32_t vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj); +int32_t vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj); +int32_t vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj); + static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId); -int vnodeSelectReqNum = 0; -int vnodeInsertReqNum = 0; +static void *pShellServer = NULL; +static SShellObj **shellList = NULL; +static int32_t dnodeSelectReqNum = 0; +static int32_t dnodeInsertReqNum = 0; typedef struct { int32_t import; @@ -131,7 +127,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { return pObj; } -int vnodeInitShell() { +int32_t dnodeInitShell() { int size; SRpcInit rpcInit; @@ -399,7 +395,7 @@ _query_over: vnodeFreeColumnInfo(&pQueryMsg->colList[i]); } - atomic_fetch_add_32(&vnodeSelectReqNum, 1); + atomic_fetch_add_32(&dnodeSelectReqNum, 1); return ret; } @@ -700,7 +696,7 @@ _submit_over: ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); } - atomic_fetch_add_32(&vnodeInsertReqNum, 1); + atomic_fetch_add_32(&dnodeInsertReqNum, 1); return ret; } @@ -729,3 +725,15 @@ static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) { vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints); } } + + +SDnodeStatisInfo dnodeGetStatisInfo() { + SDnodeStatisInfo info = {0}; + if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { + info.httpReqNum = httpGetReqCount(); + info.selectReqNum = atomic_exchange_32(&dnodeSelectReqNum, 0); + info.insertReqNum = atomic_exchange_32(&dnodeInsertReqNum, 0); + } + + return info; +} \ No newline at end of file diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 1203e15ed4b0a882c1ae426f3a5ecf97ca88b71f..600a0c38d7f9867a7e762f2ad9225adf2abc38f7 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -16,43 +16,78 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" +#include "taoserror.h" +#include "tcrc32c.h" #include "tlog.h" +#include "ttime.h" #include "ttimer.h" +#include "tutil.h" +#include "http.h" #include "dnode.h" #include "dnodeMgmt.h" #include "dnodeModule.h" +#include "dnodeShell.h" #include "dnodeSystem.h" -#include "monitorSystem.h" -#include "httpSystem.h" -#include "mgmtSystem.h" - -#include "vnode.h" - -pthread_mutex_t dmutex; -extern int vnodeSelectReqNum; -extern int vnodeInsertReqNum; -void * tsStatusTimer = NULL; -bool tsDnodeStopping = false; - -// internal global, not configurable -void * vnodeTmrCtrl; -void ** rpcQhandle; -void * dmQhandle; -void * queryQhandle; -int tsVnodePeers = TSDB_VNODES_SUPPORT - 1; -int tsMaxQueues; +#include "dnodeVnodeMgmt.h" + +#ifdef CLUSTER +#include "dnodeCluster.h" +#include "httpAdmin.h" +#include "mnodeAccount.h" +#include "mnodeBalance.h" +#include "mnodeCluster.h" +#include "sdbReplica.h" +#include "multilevelStorage.h" +#include "vnodeCluster.h" +#include "vnodeReplica.h" +#include "dnodeGrant.h" +#endif + +static pthread_mutex_t tsDnodeMutex; +static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; + +static int32_t dnodeInitRpcQHandle(); +static int32_t dnodeInitQueryQHandle(); +static int32_t dnodeInitTmrCtl(); + +void *tsStatusTimer = NULL; +void *vnodeTmrCtrl; +void **rpcQhandle; +void *dmQhandle; +void *queryQhandle; +int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1; +int32_t tsMaxQueues; uint32_t tsRebootTime; -int32_t dnodeInitRpcQHandle(); -int32_t dnodeInitQueryQHandle(); -int32_t dnodeInitTmrCtl(); -void dnodeCountRequestImp(SCountInfo *info); +static void dnodeInitVnodesLock() { + pthread_mutex_init(&tsDnodeMutex, NULL); +} + +void dnodeLockVnodes() { + pthread_mutex_lock(&tsDnodeMutex); +} + +void dnodeUnLockVnodes() { + pthread_mutex_unlock(&tsDnodeMutex); +} + +static void dnodeCleanVnodesLock() { + pthread_mutex_destroy(&tsDnodeMutex); +} + +SDnodeRunStatus dnodeGetRunStatus() { + return tsDnodeRunStatus; +} + +void dnodeSetRunStatus(SDnodeRunStatus status) { + tsDnodeRunStatus = status; +} void dnodeCleanUpSystem() { - if (tsDnodeStopping) { + if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_STOPPED) { return; } else { - tsDnodeStopping = true; + dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); } if (tsStatusTimer != NULL) { @@ -61,33 +96,48 @@ void dnodeCleanUpSystem() { } dnodeCleanUpModules(); - - vnodeCleanUpVnodes(); - + dnodeCleanupVnodes(); taosCloseLogger(); - dnodeCleanupStorage(); + dnodeCleanVnodesLock(); } -void dnodeCheckDbRunning(const char* dir) { +void dnodeCheckDataDirOpenned(const char *dir) { char filepath[256] = {0}; sprintf(filepath, "%s/.running", dir); - int fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); - int ret = flock(fd, LOCK_EX | LOCK_NB); + int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + int32_t ret = flock(fd, LOCK_EX | LOCK_NB); if (ret != 0) { dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret); exit(0); } } -int dnodeInitSystem() { +void dnodeInitPlugins() { +#ifdef CLUSTER + dnodeClusterInit(); + httpAdminInit(); + mnodeAccountInit(); + mnodeBalanceInit(); + mnodeClusterInit(); + sdbReplicaInit(); + multilevelStorageInit(); + vnodeClusterInit(); + vnodeReplicaInit(); + dnodeGrantInit(); +#endif +} + +int32_t dnodeInitSystem() { char temp[128]; struct stat dirstat; + dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); + taosResolveCRC(); tsRebootTime = taosGetTimestampSec(); - tscEmbedded = 1; + tscEmbedded = 1; // Read global configuration. tsReadGlobalLogConfig(); @@ -124,7 +174,7 @@ int dnodeInitSystem() { dnodeAllocModules(); - pthread_mutex_init(&dmutex, NULL); + dnodeInitVnodesLock(); dPrint("starting to initialize TDengine ..."); @@ -136,7 +186,7 @@ int dnodeInitSystem() { if (dnodeCheckSystem() < 0) { return -1; } - + if (dnodeInitModules() < 0) { return -1; } @@ -151,14 +201,14 @@ int dnodeInitSystem() { return -1; } - if (vnodeInitStore() < 0) { + if (dnodeOpenVnodes() < 0) { dError("failed to init vnode storage"); return -1; } - int numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; + int32_t numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; if (numOfThreads < 1) numOfThreads = 1; - if (vnodeInitPeer(numOfThreads) < 0) { + if (dnodeInitPeers(numOfThreads) < 0) { dError("failed to init vnode peer communication"); return -1; } @@ -168,40 +218,21 @@ int dnodeInitSystem() { return -1; } - if (vnodeInitShell() < 0) { + if (dnodeInitShell() < 0) { dError("failed to init communication to shell"); return -1; } - if (vnodeInitVnodes() < 0) { - dError("failed to init store"); - return -1; - } - - mnodeCountRequestFp = dnodeCountRequestImp; - dnodeStartModules(); + dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); + dPrint("TDengine is initialized successfully"); return 0; } -void dnodeResetSystem() { - dPrint("reset the system ..."); - for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) { - vnodeRemoveVnode(vnode); - } - mgmtStopSystem(); -} - -void dnodeCountRequestImp(SCountInfo *info) { - httpGetReqCount(&info->httpReqNum); - info->selectReqNum = atomic_exchange_32(&vnodeSelectReqNum, 0); - info->insertReqNum = atomic_exchange_32(&vnodeInsertReqNum, 0); -} - -int dnodeInitStorageImp() { +int32_t dnodeInitStorageImp() { struct stat dirstat; strcpy(tsDirectory, dataDir); if (stat(dataDir, &dirstat) < 0) { @@ -218,31 +249,34 @@ int dnodeInitStorageImp() { sprintf(mgmtDirectory, "%s/mgmt", tsDirectory); sprintf(tsDirectory, "%s/tsdb", dataDir); - dnodeCheckDbRunning(dataDir); + dnodeCheckDataDirOpenned(dataDir); return 0; } + int32_t (*dnodeInitStorage)() = dnodeInitStorageImp; void dnodeCleanupStorageImp() {} + void (*dnodeCleanupStorage)() = dnodeCleanupStorageImp; -int32_t dnodeInitQueryQHandle() { - int numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore; +static int32_t dnodeInitQueryQHandle() { + int32_t numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore; if (numOfThreads < 1) { numOfThreads = 1; } int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode; - dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize,numOfThreads); + dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads); queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); return 0; } -int32_t dnodeInitTmrCtl() { - vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode"); +static int32_t dnodeInitTmrCtl() { + vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, + "DND-vnode"); if (vnodeTmrCtrl == NULL) { dError("failed to init timer, exit"); return -1; @@ -251,22 +285,39 @@ int32_t dnodeInitTmrCtl() { return 0; } -int32_t dnodeInitRpcQHandle() { - tsMaxQueues = (1.0 - tsRatioOfQueryThreads)*tsNumOfCores*tsNumOfThreadsPerCore / 2.0; - if (tsMaxQueues < 1) tsMaxQueues = 1; +static int32_t dnodeInitRpcQHandle() { + tsMaxQueues = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; + if (tsMaxQueues < 1) { + tsMaxQueues = 1; + } - rpcQhandle = malloc(tsMaxQueues*sizeof(void *)); + rpcQhandle = malloc(tsMaxQueues * sizeof(void *)); - for (int i=0; i< tsMaxQueues; ++i ) + for (int32_t i = 0; i < tsMaxQueues; ++i) { rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); + } dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); return 0; } +int32_t dnodeCheckSystemImp() { + return 0; +} + +int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp; + +void dnodeParseParameterKImp() {} + +void (*dnodeParseParameterK)() = dnodeParseParameterKImp; + +int32_t dnodeInitPeersImp(int32_t numOfThreads) { + return 0; +} + +int32_t (*dnodeInitPeers)(int32_t numOfThreads) = dnodeInitPeersImp; + -int dnodeCheckSystemImp() { return 0; } -int (*dnodeCheckSystem)() = dnodeCheckSystemImp; diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index d3c4e1a4dece8a6e954edaa62db6d129e306001c..268c843d0d0df53ee558f0565abb4aa05bcdb0c3 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -15,7 +15,45 @@ #define _DEFAULT_SOURCE #include "dnodeWrite.h" +#include "taoserror.h" -int32_t dnodeCreateTable(int vid, int sid, SCreateTableMsg *table) { +int32_t dnodeCheckTableExist(char *tableId) { + return 0; +} + +int32_t dnodeWriteData(SShellSubmitMsg *msg) { + return 0; +} + +int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) { + return 0; +} + +int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table) { + return 0; +} + +int32_t dnodeCreateChildTable(SCreateChildTableMsg *table) { + return 0; +} +int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table) { + return 0; } + +int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table) { + return 0; +} + +int32_t dnodeAlterChildTable(SCreateChildTableMsg *table) { + return 0; +} + +int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid) { + return 0; +} + +int32_t dnodeDropTable(int vid, int sid, int64_t uid) { + return 0; +} + diff --git a/src/inc/dnode.h b/src/inc/dnode.h index d77d583dcc1d55dcb6b44a084fc76cf39151e2e3..326cddcba9f5ba030ce0ee281fece25f9c526971 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -22,6 +22,13 @@ extern "C" { #include #include +#include "tsched.h" + +typedef struct { + int32_t selectReqNum; + int32_t insertReqNum; + int32_t httpReqNum; +} SDnodeStatisInfo; typedef struct { char id[20]; @@ -32,7 +39,7 @@ typedef struct { } SMgmtObj; // global variables -extern pthread_mutex_t dmutex; +extern uint32_t tsRebootTime; // dnodeCluster extern void (*dnodeStartModules)(); @@ -52,11 +59,15 @@ extern int (*dnodeInitMgmt)(); extern int32_t (*dnodeInitStorage)(); extern void (*dnodeCleanupStorage)(); -void dnodeCheckDbRunning(const char* dir); - +void dnodeCheckDataDirOpenned(const char* dir); void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched); + +void dnodeLockVnodes(); +void dnodeUnLockVnodes(); +SDnodeStatisInfo dnodeGetStatisInfo(); + #ifdef __cplusplus } #endif diff --git a/src/modules/http/inc/http.h b/src/inc/http.h similarity index 98% rename from src/modules/http/inc/http.h rename to src/inc/http.h index ea209f082c82c271b059e4352a5689371d1cd43f..d77f27d3eab662ed98a4d70e6ed091d6809b05d2 100644 --- a/src/modules/http/inc/http.h +++ b/src/inc/http.h @@ -42,4 +42,6 @@ #define httpLWarn(...) taosLogWarn(__VA_ARGS__) httpWarn(__VA_ARGS__) #define httpLPrint(...) taosLogPrint(__VA_ARGS__) httpPrint(__VA_ARGS__) +int32_t httpGetReqCount(); + #endif diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 0aea5a127c3531595f54e46980ecc89989b5899b..dcc224ef05483e92c514e34ed8cb0122f0e902e3 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -20,6 +20,9 @@ extern "C" { #endif +#include +#include + #ifdef TAOS_ERROR_C #define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)}, #else diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 98f82b2e722f6122b277ffcc99329e8c8168cb15..a180a963d8e4497ed903c9d545166f4a47b817c9 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -319,9 +319,6 @@ typedef struct { char reserved[16]; int32_t sversion; SMColumn schema[]; - - SVariableMsg tags; - } SCreateMsg; typedef struct { diff --git a/src/modules/http/inc/httpSystem.h b/src/modules/http/inc/httpSystem.h index 0a09c3f7629ee2876117b232c041d9bd015aa2a4..ccf0f3582dd2b1d3a48ce9cc93d505e61b53c879 100644 --- a/src/modules/http/inc/httpSystem.h +++ b/src/modules/http/inc/httpSystem.h @@ -20,6 +20,5 @@ int httpInitSystem(); int httpStartSystem(); void httpStopSystem(); void httpCleanUpSystem(); -void httpGetReqCount(int32_t *httpConns); #endif diff --git a/src/modules/http/src/httpJson.c b/src/modules/http/src/httpJson.c index 5d5d29f4e0fae91c8e30bfb3aaa78fb440b8a188..c75d84d731e48410ea124739988cebcfb2d0d57a 100644 --- a/src/modules/http/src/httpJson.c +++ b/src/modules/http/src/httpJson.c @@ -451,7 +451,7 @@ void httpJsonPairStatus(JsonBuf* buf, int code) { } else if (code == TSDB_CODE_INVALID_TABLE) { httpJsonPair(buf, "desc", 4, "failed to create table", 22); } else - httpJsonPair(buf, "desc", 4, tsError[code], (int)strlen(tsError[code])); + httpJsonPair(buf, "desc", 4, tstrerror(code), (int)strlen(tstrerror(code))); } } } \ No newline at end of file diff --git a/src/modules/http/src/httpSystem.c b/src/modules/http/src/httpSystem.c index efc24c15dd68fbd6105f896bf07ea48ed1db95c4..3171aee9ec4774811e0bc5ec94f0f9fe31ad19e3 100644 --- a/src/modules/http/src/httpSystem.c +++ b/src/modules/http/src/httpSystem.c @@ -146,10 +146,9 @@ void httpCleanUpSystem() { #endif } -void httpGetReqCount(int32_t *httpReqestNum) { +int32_t httpGetReqCount() { if (httpServer != NULL) { - *httpReqestNum = atomic_exchange_32(&httpServer->requestNum, 0); - } else { - *httpReqestNum = 0; + return atomic_exchange_32(&httpServer->requestNum, 0); } + return 0; } diff --git a/src/modules/monitor/inc/monitorSystem.h b/src/modules/monitor/inc/monitorSystem.h index 8093379dc0c055a246cfd5e84dbafe534dddd6d2..e64b7fad6e7e38a6ad67f439fdceebc9e579e52e 100644 --- a/src/modules/monitor/inc/monitorSystem.h +++ b/src/modules/monitor/inc/monitorSystem.h @@ -23,12 +23,6 @@ int monitorStartSystem(); void monitorStopSystem(); void monitorCleanUpSystem(); -typedef struct { - int selectReqNum; - int insertReqNum; - int httpReqNum; -} SCountInfo; - -extern void (*mnodeCountRequestFp)(SCountInfo *info); +extern void (*mnodeCountRequestFp)(SDnodeStatisInfo *info); #endif \ No newline at end of file diff --git a/src/modules/monitor/src/monitorSystem.c b/src/modules/monitor/src/monitorSystem.c index d3bff0e281d675fa230e0a49d9064ef4df442b44..a5985b4393101993051f811169e79d72722ef9fe 100644 --- a/src/modules/monitor/src/monitorSystem.c +++ b/src/modules/monitor/src/monitorSystem.c @@ -77,7 +77,7 @@ void monitorSaveAcctLog(char *acctId, int64_t currentPointsPerSecond, int64_t ma int64_t totalOutbound, int64_t maxOutbound, int64_t totalDbs, int64_t maxDbs, int64_t totalUsers, int64_t maxUsers, int64_t totalStreams, int64_t maxStreams, int64_t totalConns, int64_t maxConns, int8_t accessState); -void (*mnodeCountRequestFp)(SCountInfo *info) = NULL; +void (*mnodeCountRequestFp)(SDnodeStatisInfo *info) = NULL; void monitorExecuteSQL(char *sql); void monitorCheckDiskUsage(void *para, void *unused) { @@ -333,7 +333,7 @@ int monitorBuildBandSql(char *sql) { } int monitorBuildReqSql(char *sql) { - SCountInfo info; + SDnodeStatisInfo info; info.httpReqNum = info.insertReqNum = info.selectReqNum = 0; (*mnodeCountRequestFp)(&info); diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 8d54d5fda591acd790c4bfab751cec079762e561..70511db53369fc0e69ba870bd8c3c151e580cbe7 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -1,8 +1,9 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) diff --git a/src/rpc/src/tudp.c b/src/rpc/src/tudp.c index db3e5e81c43754abe46cd907463737ceeb2b1116..a605abea5f6bdbb4470b80892910d1e49d6ee951 100644 --- a/src/rpc/src/tudp.c +++ b/src/rpc/src/tudp.c @@ -699,10 +699,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo pHead = (STaosHeader *)buffer; memcpy(pHead, data, sizeof(STaosHeader)); -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wbitfield-constant-conversion" pHead->tcp = 2; -#pragma GCC diagnostic pop msgLen = sizeof(STaosHeader); pHead->msgLen = (int32_t)htonl(msgLen); diff --git a/src/sdb/CMakeLists.txt b/src/sdb/CMakeLists.txt index 62239116bd6d1e3c7900e23e6632825248f7f51b..b0617353d9e54c40257eb29b68eb0c1ac16a32d3 100644 --- a/src/sdb/CMakeLists.txt +++ b/src/sdb/CMakeLists.txt @@ -1,8 +1,9 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) diff --git a/src/system/CMakeLists.txt b/src/system/CMakeLists.txt deleted file mode 100644 index 516b9e85e23a56bdb6c191b16c0077afe63f4863..0000000000000000000000000000000000000000 --- a/src/system/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8) -PROJECT(TDengine) - -ADD_SUBDIRECTORY(detail) - -IF (TD_EDGE) - ADD_SUBDIRECTORY(lite) -ENDIF () \ No newline at end of file diff --git a/src/system/detail/CMakeLists.txt b/src/system/detail/CMakeLists.txt deleted file mode 100644 index 6268b97f91a359ed924a25cb8c97c5afd495d558..0000000000000000000000000000000000000000 --- a/src/system/detail/CMakeLists.txt +++ /dev/null @@ -1,46 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8) -PROJECT(TDengine) - -IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/monitor/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/inc) - INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/util/cluster/inc) - INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) - INCLUDE_DIRECTORIES(inc) - AUX_SOURCE_DIRECTORY(./src SRC) - LIST(REMOVE_ITEM SRC ./src/vnodeFileUtil.c) - LIST(REMOVE_ITEM SRC ./src/taosGrant.c) - - ADD_EXECUTABLE(taosd ${SRC}) - - IF (TD_PAGMODE_LITE) - TARGET_LINK_LIBRARIES(taosd taos trpc tutil sdb monitor pthread http) - ELSE () - TARGET_LINK_LIBRARIES(taosd taos_static trpc tutil sdb monitor pthread http) - ENDIF () - - IF (TD_EDGE) - TARGET_LINK_LIBRARIES(taosd taosd_edge) - ELSE () - TARGET_LINK_LIBRARIES(taosd taosd_cluster) - ENDIF () - - SET(PREPARE_ENV_CMD "prepare_env_cmd") - SET(PREPARE_ENV_TARGET "prepare_env_target") - ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} - POST_BUILD - COMMAND echo "make test directory" - DEPENDS taosd - COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/ - COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/ - COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/ - COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg - COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg - COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg - COMMENT "prepare taosd environment") - ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD}) - -ENDIF () diff --git a/src/system/lite/CMakeLists.txt b/src/system/lite/CMakeLists.txt deleted file mode 100644 index 8c648747e7bb243e06b8c9167b7a85e549d2ad9c..0000000000000000000000000000000000000000 --- a/src/system/lite/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8) -PROJECT(TDengine) - -IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/monitor/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/system/detail/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/inc) - INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/util/cluster/inc) - INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) - INCLUDE_DIRECTORIES(inc) - - AUX_SOURCE_DIRECTORY(./src SRC) - ADD_LIBRARY(taosd_edge ${SRC}) -ENDIF () diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h new file mode 100644 index 0000000000000000000000000000000000000000..c3bb336e817eb4eade3c97c0f5ea9e17553e88c5 --- /dev/null +++ b/src/util/inc/tbuffer.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +#ifndef TDENGINE_TBUFFER_H +#define TDENGINE_TBUFFER_H + + +/* +SBuffer can be used to read or write a buffer, but cannot be used for both +read & write at a same time. +Read example: + SBuffer rbuf; + if (tbufBeginOperation(&rbuf) != 0) { + // handling errors + } + tbufInitRead(&rbuf, data, 1024); + int32_t a = tbufReadInt32(&rbuf); + // other read functions + +Write example: + SBuffer wbuf; + if (tbufBeginOperation(&wbuf) != 0) { + // handling errors + } + tbufInitWrite(&wbuf, 1024); + tbufWriteInt32(&wbuf, 10); + // other write functions + size_t size = tbufGetSize(&wbuf); + char* data = tbufGetBuffer(&wbuf, true); + tbufUninitWrite(&wbuf); +*/ +typedef struct { + jmp_buf jb; + char* buf; + size_t pos; + size_t size; +} SBuffer; + + +// common functions can be used in both read & write +#define tbufBeginOperation(buf) setjmp((buf)->jb) +size_t tbufTell(SBuffer* buf); +size_t tbufSeekTo(SBuffer* buf, size_t pos); +size_t tbufSkip(SBuffer* buf, size_t size); + + +// basic read functions +void tbufInitRead(SBuffer* buf, void* data, size_t size); +char* tbufRead(SBuffer* buf, size_t size); +void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); +const char* tbufReadString(SBuffer* buf, size_t* len); +size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); + + +// basic write functions +void tbufInitWrite(SBuffer* buf, size_t size); +void tbufEnsureCapacity(SBuffer* buf, size_t size); +char* tbufGetResult(SBuffer* buf, bool takeOver); +void tbufUninitWrite(SBuffer* buf); +void tbufWrite(SBuffer* buf, const void* data, size_t size); +void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); +void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); +void tbufWriteString(SBuffer* buf, const char* str); + + +// read & write function for primitive types +#ifndef TBUFFER_DEFINE_OPERATION +#define TBUFFER_DEFINE_OPERATION(type, name) \ + type tbufRead##name(SBuffer* buf); \ + void tbufWrite##name(SBuffer* buf, type data); \ + void tbufWrite##name##At(SBuffer* buf, size_t pos, type data); +#endif + +TBUFFER_DEFINE_OPERATION( bool, Bool ) +TBUFFER_DEFINE_OPERATION( char, Char ) +TBUFFER_DEFINE_OPERATION( int8_t, Int8 ) +TBUFFER_DEFINE_OPERATION( uint8_t, Unt8 ) +TBUFFER_DEFINE_OPERATION( int16_t, Int16 ) +TBUFFER_DEFINE_OPERATION( uint16_t, Uint16 ) +TBUFFER_DEFINE_OPERATION( int32_t, Int32 ) +TBUFFER_DEFINE_OPERATION( uint32_t, Uint32 ) +TBUFFER_DEFINE_OPERATION( int64_t, Int64 ) +TBUFFER_DEFINE_OPERATION( uint64_t, Uint64 ) +TBUFFER_DEFINE_OPERATION( float, Float ) +TBUFFER_DEFINE_OPERATION( double, Double ) + +#endif \ No newline at end of file diff --git a/src/util/inc/tstatus.h b/src/util/inc/tstatus.h index 2e848ebfcdc56e8fff4102900ae7dd8c67fb4e08..8e2f82e024a0560cea5570b9ebd11292b875ecd0 100644 --- a/src/util/inc/tstatus.h +++ b/src/util/inc/tstatus.h @@ -25,13 +25,13 @@ extern "C" { #include "taoserror.h" enum _TSDB_VG_STATUS { - TSDB_VG_STATUS_READY = TSDB_CODE_SUCCESS, - TSDB_VG_STATUS_IN_PROGRESS = TSDB_CODE_ACTION_IN_PROGRESS, - TSDB_VG_STATUS_NO_DISK_PERMISSIONS = TSDB_CODE_NO_DISK_PERMISSIONS, - TSDB_VG_STATUS_SERVER_NO_PACE = TSDB_CODE_SERV_NO_DISKSPACE, - TSDB_VG_STATUS_SERV_OUT_OF_MEMORY = TSDB_CODE_SERV_OUT_OF_MEMORY, - TSDB_VG_STATUS_INIT_FAILED = TSDB_CODE_VG_INIT_FAILED, - TSDB_VG_STATUS_FULL = TSDB_CODE_NO_ENOUGH_DNODES, + TSDB_VG_STATUS_READY = TSDB_CODE_SUCCESS, + TSDB_VG_STATUS_IN_PROGRESS = 1, //TSDB_CODE_ACTION_IN_PROGRESS, + TSDB_VG_STATUS_NO_DISK_PERMISSIONS = 73,//TSDB_CODE_NO_DISK_PERMISSIONS, + TSDB_VG_STATUS_SERVER_NO_PACE = 110, //TSDB_CODE_SERV_NO_DISKSPACE, + TSDB_VG_STATUS_SERV_OUT_OF_MEMORY = 69, //TSDB_CODE_SERV_OUT_OF_MEMORY, + TSDB_VG_STATUS_INIT_FAILED = 74, //TSDB_CODE_VG_INIT_FAILED, + TSDB_VG_STATUS_FULL = 48, //TSDB_CODE_NO_ENOUGH_DNODES, }; enum _TSDB_DB_STATUS { diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c new file mode 100644 index 0000000000000000000000000000000000000000..0b8cfbbae9bf5c9e62d895dfe6005f6a79516537 --- /dev/null +++ b/src/util/src/tbuffer.c @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define TBUFFER_DEFINE_OPERATION(type, name) \ + type tbufRead##name(SBuffer* buf) { \ + type ret; \ + tbufReadToBuffer(buf, &ret, sizeof(type)); \ + return ret; \ + }\ + void tbufWrite##name(SBuffer* buf, type data) {\ + tbufWrite(buf, &data, sizeof(data));\ + }\ + void tbufWrite##name##At(SBuffer* buf, size_t pos, type data) {\ + tbufWriteAt(buf, pos, &data, sizeof(data));\ + } + +#include "../inc/tbuffer.h" + + +//////////////////////////////////////////////////////////////////////////////// +// common functions + +size_t tbufTell(SBuffer* buf) { + return buf->pos; +} + +size_t tbufSeekTo(SBuffer* buf, size_t pos) { + if (pos > buf->size) { + longjmp(buf->jb, 1); + } + size_t old = buf->pos; + buf->pos = pos; + return old; +} + +size_t tbufSkip(SBuffer* buf, size_t size) { + return tbufSeekTo(buf, buf->pos + size); +} + +//////////////////////////////////////////////////////////////////////////////// +// read functions + +void tbufInitRead(SBuffer* buf, void* data, size_t size) { + buf->buf = (char*)data; + buf->pos = 0; + // empty buffer is not an error, but read an empty buffer is + buf->size = (data == NULL) ? 0 : size; +} + +char* tbufRead(SBuffer* buf, size_t size) { + char* ret = buf->buf + buf->pos; + tbufSkip(buf, size); + return ret; +} + +void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) { + assert(dst != NULL); + // always using memcpy, leave optimization to compiler + memcpy(dst, tbufRead(buf, size), size); +} + +const char* tbufReadString(SBuffer* buf, size_t* len) { + uint16_t l = tbufReadUint16(); + char* ret = buf->buf + buf->pos; + tbufSkip(buf, l + 1); + ret[l] = 0; // ensure the string end with '\0' + if (len != NULL) { + *len = l; + } + return ret; +} + +size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) { + size_t len; + const char* str = tbufReadString(buf, &len); + if (len >= size) len = size - 1; + memcpy(dst, str, len); + dst[len] = 0; + return len; +} + + +//////////////////////////////////////////////////////////////////////////////// +// write functions + +void tbufEnsureCapacity(SBuffer* buf, size_t size) { + size += buf->pos; + if (size > buf->size) { + char* nbuf = NULL; + size_t nsize = size + buf->size; + nbuf = realloc(buf->buf, nsize); + if (nbuf == NULL) { + longjmp(buf->jb, 2); + } + buf->buf = nbuf; + buf->size = nsize; + } +} + +void tbufInitWrite(SBuffer* buf, size_t size) { + buf->buf = NULL; + buf->pos = 0; + buf->size = 0; + tbufEnsureCapacity(buf, size); +} + +char* tbufGetResult(SBuffer* buf, bool takeOver) { + char* ret = buf->buf; + if (takeOver) { + buf->pos = 0; + buf->size = 0; + buf->buf = NULL; + } + return ret; +} + +void tbufUninitWrite(SBuffer* buf) { + free(buf->buf); +} + +void tbufWrite(SBuffer* buf, const void* data, size_t size) { + tbufEnsureCapacity(size); + memcpy(buf->buf + buf->pos, data, size); + buf->pos += size; +} + +void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) { + // this function can only be called to fill the gap on previous writes, + // so 'pos + size <= buf->pos' must be true + if (pos + size > buf->pos) { + longjmp(buf->jb, 3); + } + memcpy(buf->buf + pos, data, size); +} + +void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) { + if (len > 0xffff) { + longjmp(buf->jb , 4); + } + tbufWriteUint16(buf, (uint16_t)len); + tbufWrite(buf, str, len + 1); +} + +void tbufWriteString(SBuffer* buf, const char* str) { + tbufWriteStringLen(buf, str, strlen(str)); +} diff --git a/src/util/src/tstatus.c b/src/util/src/tstatus.c index 4c8d7868fdebdcdcbee016cf605881b8070a277d..ea8fb630c8b97efe43592beed267d8991db3103e 100644 --- a/src/util/src/tstatus.c +++ b/src/util/src/tstatus.c @@ -18,13 +18,13 @@ const char* taosGetVgroupStatusStr(int32_t vgroupStatus) { switch (vgroupStatus) { - case TSDB_VG_STATUS_READY: return tsError[vgroupStatus]; - case TSDB_VG_STATUS_IN_PROGRESS: return tsError[vgroupStatus]; - case TSDB_VG_STATUS_NO_DISK_PERMISSIONS: return tsError[vgroupStatus]; - case TSDB_VG_STATUS_SERVER_NO_PACE: return tsError[vgroupStatus]; - case TSDB_VG_STATUS_SERV_OUT_OF_MEMORY: return tsError[vgroupStatus]; - case TSDB_VG_STATUS_INIT_FAILED: return tsError[vgroupStatus]; - case TSDB_VG_STATUS_FULL: return tsError[vgroupStatus]; + case TSDB_VG_STATUS_READY: return tstrerror(vgroupStatus); + case TSDB_VG_STATUS_IN_PROGRESS: return tstrerror(vgroupStatus); + case TSDB_VG_STATUS_NO_DISK_PERMISSIONS: return tstrerror(vgroupStatus); + case TSDB_VG_STATUS_SERVER_NO_PACE: return tstrerror(vgroupStatus); + case TSDB_VG_STATUS_SERV_OUT_OF_MEMORY: return tstrerror(vgroupStatus); + case TSDB_VG_STATUS_INIT_FAILED: return tstrerror(vgroupStatus); + case TSDB_VG_STATUS_FULL: return tstrerror(vgroupStatus); default: return "undefined"; } } diff --git a/src/vnode/common/inc/vnodePeer.h b/src/vnode/common/inc/vnodePeer.h index 6c69326fbcc08f6120c347f6c395117f38600653..47aaa4037772c1d14dee4d22b82066a2136eb934 100644 --- a/src/vnode/common/inc/vnodePeer.h +++ b/src/vnode/common/inc/vnodePeer.h @@ -27,7 +27,7 @@ extern "C" { /* * Initialize the resources */ -int32_t vnodeInitPeer(int numOfThreads); +int32_t vnodeInitPeers(int numOfThreads); /* * Free the resources diff --git a/src/vnode/detail/inc/vnode.h b/src/vnode/detail/inc/vnode.h index 158f4b2479f7be81a4de933cc6ea98b568bcc84d..3b502a9f7995e1c329c36fa11827e4ce2268efd1 100644 --- a/src/vnode/detail/inc/vnode.h +++ b/src/vnode/detail/inc/vnode.h @@ -302,7 +302,6 @@ typedef struct { // internal globals extern int tsMeterSizeOnFile; -extern uint32_t tsRebootTime; extern void ** rpcQhandle; extern void * queryQhandle; diff --git a/src/vnode/detail/inc/vnodeShell.h b/src/vnode/detail/inc/vnodeShell.h index 25646fbb1a88e2bd971d90b35fa8ecaf4cc42374..b03634ca8ff9d8697d82479e922a83382e29f221 100644 --- a/src/vnode/detail/inc/vnodeShell.h +++ b/src/vnode/detail/inc/vnodeShell.h @@ -22,18 +22,6 @@ extern "C" { #include "os.h" -typedef struct { - int sid; - int vnode; - uint32_t ip; - uint16_t port; - int count; // track the number of imports - int code; // track the code of imports - int numOfTotalPoints; // track the total number of points imported - void * thandle; // handle from TAOS layer - void * qhandle; -} SShellObj; - #ifdef __cplusplus } #endif