提交 770ca34e 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0

...@@ -1071,7 +1071,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1071,7 +1071,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
} }
// todo add to return // todo add to return
tscError("async insert parse error, code:%d, %s", code, tsError[code]); tscError("async insert parse error, code:%d, %s", code, tstrerror(code));
pSql->asyncTblPos = NULL; pSql->asyncTblPos = NULL;
} }
......
...@@ -849,7 +849,7 @@ char *taos_errstr(TAOS *taos) { ...@@ -849,7 +849,7 @@ char *taos_errstr(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
uint8_t code; uint8_t code;
if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode]; if (pObj == NULL || pObj->signature != pObj) return tstrerror(globalCode);
SSqlObj* pSql = pObj->pSql; SSqlObj* pSql = pObj->pSql;
...@@ -862,7 +862,7 @@ char *taos_errstr(TAOS *taos) { ...@@ -862,7 +862,7 @@ char *taos_errstr(TAOS *taos) {
if (hasAdditionalErrorInfo(code, &pSql->cmd)) { if (hasAdditionalErrorInfo(code, &pSql->cmd)) {
return pSql->cmd.payload; return pSql->cmd.payload;
} else { } else {
return tsError[code]; return tstrerror(code);
} }
} }
......
...@@ -4,25 +4,18 @@ PROJECT(TDengine) ...@@ -4,25 +4,18 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/monitor/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/util/cluster/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(taosd ${SRC}) ADD_LIBRARY(dnode ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode vnode taos_static monitor http) #ADD_EXECUTABLE(taosd ${SRC})
#TARGET_LINK_LIBRARIES(taosd mnode sdb vnode taos_static monitor http)
IF (TD_CLUSTER) #IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(taosd dcluster) # TARGET_LINK_LIBRARIES(taosd dcluster)
ENDIF () #ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target") SET(PREPARE_ENV_TARGET "prepare_env_target")
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
/* /*
* Dnode handle read messages * Dnode handle read messages
* The processing result is returned by callback function with pShellObj parameter * The processing result is returned by callback function with pShellObj parameter
*/ */
int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj)); int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj));
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -20,6 +20,26 @@ ...@@ -20,6 +20,26 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdint.h>
#include "dnode.h"
typedef struct {
int sid;
int vnode;
uint32_t ip;
uint16_t port;
int32_t count; // track the number of imports
int32_t code; // track the code of imports
int32_t numOfTotalPoints; // track the total number of points imported
void *thandle; // handle from TAOS layer
void *qhandle;
} SShellObj;
int32_t dnodeInitShell();
//SDnodeStatisInfo dnodeGetStatisInfo()
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,16 +23,31 @@ extern "C" { ...@@ -23,16 +23,31 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
#include <pthread.h> #include <pthread.h>
#include "dnode.h"
extern pthread_mutex_t dmutex; typedef enum {
extern bool tsDnodeStopping; TSDB_DNODE_RUN_STATUS_INITIALIZE,
TSDB_DNODE_RUN_STATUS_RUNING,
TSDB_DNODE_RUN_STATUS_STOPPED
} SDnodeRunStatus;
int dnodeInitSystem(); extern int32_t (*dnodeInitPeers)(int32_t numOfThreads);
extern int32_t (*dnodeCheckSystem)();
extern int32_t (*dnodeInitStorage)();
extern void (*dnodeCleanupStorage)();
extern void (*dnodeParseParameterK)();
extern int32_t tsMaxQueues;
int32_t dnodeInitSystem();
void dnodeCleanUpSystem(); void dnodeCleanUpSystem();
void dnodeCheckDbRunning(const char* dir); void dnodeInitPlugins();
int vnodeInitStore(); SDnodeRunStatus dnodeGetRunStatus();
int vnodeInitPeer(int numOfThreads); void dnodeSetRunStatus(SDnodeRunStatus status);
void dnodeCheckDataDirOpenned(const char *dir);
void dnodeLockVnodes();
void dnodeUnLockVnodes();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,7 +33,7 @@ int32_t dnodeOpenVnodes(); ...@@ -33,7 +33,7 @@ int32_t dnodeOpenVnodes();
/* /*
* Close all Vnodes that have been created and opened * Close all Vnodes that have been created and opened
*/ */
int32_t dnodeCloseVnodes(); int32_t dnodeCleanupVnodes();
/* /*
* Check if vnode already exists * Check if vnode already exists
...@@ -43,7 +43,8 @@ int32_t dnodeCheckVnodeExist(int vid); ...@@ -43,7 +43,8 @@ int32_t dnodeCheckVnodeExist(int vid);
/* /*
* Create vnode with specified configuration and open it * Create vnode with specified configuration and open it
*/ */
tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg); //tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg);
void* dnodeCreateVnode(int vid, SVnodeCfg *cfg);
/* /*
* Modify vnode configuration information * Modify vnode configuration information
...@@ -53,7 +54,7 @@ int32_t dnodeConfigVnode(int vid, SVnodeCfg *cfg); ...@@ -53,7 +54,7 @@ int32_t dnodeConfigVnode(int vid, SVnodeCfg *cfg);
/* /*
* Modify vnode replication information * Modify vnode replication information
*/ */
int32_t dnodeConfigVnodePeers(int vid, /*SVpeerCfgMsg *cfg*/); int32_t dnodeConfigVnodePeers(int vid/*, SVpeerCfgMsg *cfg*/);
/* /*
* Remove vnode from local repository * Remove vnode from local repository
...@@ -63,8 +64,8 @@ int32_t dnodeDropVnode(int vid); ...@@ -63,8 +64,8 @@ int32_t dnodeDropVnode(int vid);
/* /*
* Get the vnode object that has been opened * Get the vnode object that has been opened
*/ */
tsdb_repo_t* dnodeGetVnode(int vid); //tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int vid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,31 +25,58 @@ extern "C" { ...@@ -25,31 +25,58 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
/*
* Write data based on dnode
* If >= 0, it is affect rows
* If < 0, get error code from terrno
*/
int32_t dnodeWriteData(SShellSubmitMsg *msg);
/* /*
* Check if table already exists * Check if table already exists
*/ */
int32_t dnodeCheckTableExist(int vid, int sid, int64_t uid); int32_t dnodeCheckTableExist(char *tableId);
/* /*
* Create table with specified configuration and open it * Create noraml table with specified configuration and open it
*/ */
int32_t dnodeCreateTable(int vid, int sid, SCreateMsg *table); int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table);
/* /*
* Modify table configuration information * Create stream table with specified configuration and open it
*/ */
int32_t dnodeAlterTable(int vid, SMeterObj *table); int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table);
/* /*
* Remove table from local repository * Create child table with specified configuration and open it
*/ */
int32_t dnodeDropTable(int vid, int sid, int64_t uid); int32_t dnodeCreateChildTable(SCreateChildTableMsg *table);
/* /*
* Write data based on dnode * Modify normal table configuration information
*
*/ */
int32_t dnodeWriteData(SShellSubmitMsg *msg); int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table);
/*
* Modify stream table configuration information
*/
int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table);
/*
* Modify child table configuration information
*/
int32_t dnodeAlterChildTable(SCreateChildTableMsg *table);
/*
* Remove all child tables of supertable from local repository
*/
int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid);
/*
* Remove table from local repository
*/
int32_t dnodeDropTable(int vid, int sid, int64_t uid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -71,7 +71,7 @@ void dnodeCleanUpModules() { ...@@ -71,7 +71,7 @@ void dnodeCleanUpModules() {
} }
void dnodeProcessModuleStatus(uint32_t status) { void dnodeProcessModuleStatus(uint32_t status) {
if (tsDnodeStopping) { if (tsDnodeRunStatus) {
return; return;
} }
......
...@@ -15,39 +15,13 @@ ...@@ -15,39 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tlog.h"
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "vnode.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
#ifdef CLUSTER /*
#include "dnodeCluster.h" * Termination handler
#include "httpAdmin.h" */
#include "mnodeAccount.h"
#include "mnodeBalance.h"
#include "mnodeCluster.h"
#include "sdbReplica.h"
#include "multilevelStorage.h"
#include "vnodeCluster.h"
#include "vnodeReplica.h"
#include "dnodeGrant.h"
void init() {
dnodeClusterInit();
httpAdminInit();
mnodeAccountInit();
mnodeBalanceInit();
mnodeClusterInit();
sdbReplicaInit();
multilevelStorageInit();
vnodeClusterInit();
vnodeReplicaInit();
dnodeGrantInit();
}
#endif
void dnodeParseParameterKImp() {}
void (*dnodeParseParameterK)() = dnodeParseParameterKImp;
/* Termination handler */
void signal_handler(int signum, siginfo_t *sigInfo, void *context) { void signal_handler(int signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) { if (signum == SIGUSR1) {
tsCfgDynamicOptions("debugFlag 135"); tsCfgDynamicOptions("debugFlag 135");
...@@ -70,6 +44,8 @@ void signal_handler(int signum, siginfo_t *sigInfo, void *context) { ...@@ -70,6 +44,8 @@ void signal_handler(int signum, siginfo_t *sigInfo, void *context) {
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
dnodeInitPlugins();
// Set global configuration file // Set global configuration file
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
......
...@@ -15,33 +15,29 @@ ...@@ -15,33 +15,29 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "vnode.h" #include "tlog.h"
#include "vnodeShell.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "textbuffer.h" #include "textbuffer.h"
#include "trpc.h" #include "trpc.h"
#include "tscJoinProcess.h" #include "dnode.h"
#include "vnode.h" #include "dnodeSystem.h"
#include "vnodeRead.h" #include "dnodeShell.h"
#include "vnodeUtil.h"
#include "vnodeStore.h"
#include "vnodeStatus.h"
extern int tsMaxQueues;
void * pShellServer = NULL;
SShellObj **shellList = NULL;
int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj); int32_t vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj); int32_t vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj); int32_t vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj);
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId); static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId);
int vnodeSelectReqNum = 0; static void *pShellServer = NULL;
int vnodeInsertReqNum = 0; static SShellObj **shellList = NULL;
static int32_t dnodeSelectReqNum = 0;
static int32_t dnodeInsertReqNum = 0;
typedef struct { typedef struct {
int32_t import; int32_t import;
...@@ -131,7 +127,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -131,7 +127,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
return pObj; return pObj;
} }
int vnodeInitShell() { int32_t dnodeInitShell() {
int size; int size;
SRpcInit rpcInit; SRpcInit rpcInit;
...@@ -399,7 +395,7 @@ _query_over: ...@@ -399,7 +395,7 @@ _query_over:
vnodeFreeColumnInfo(&pQueryMsg->colList[i]); vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
} }
atomic_fetch_add_32(&vnodeSelectReqNum, 1); atomic_fetch_add_32(&dnodeSelectReqNum, 1);
return ret; return ret;
} }
...@@ -700,7 +696,7 @@ _submit_over: ...@@ -700,7 +696,7 @@ _submit_over:
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
} }
atomic_fetch_add_32(&vnodeInsertReqNum, 1); atomic_fetch_add_32(&dnodeInsertReqNum, 1);
return ret; return ret;
} }
...@@ -729,3 +725,15 @@ static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) { ...@@ -729,3 +725,15 @@ static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) {
vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints); vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints);
} }
} }
SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
info.httpReqNum = httpGetReqCount();
info.selectReqNum = atomic_exchange_32(&dnodeSelectReqNum, 0);
info.insertReqNum = atomic_exchange_32(&dnodeInsertReqNum, 0);
}
return info;
}
\ No newline at end of file
...@@ -16,43 +16,78 @@ ...@@ -16,43 +16,78 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h"
#include "tcrc32c.h"
#include "tlog.h" #include "tlog.h"
#include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h"
#include "http.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeShell.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
#include "monitorSystem.h" #include "dnodeVnodeMgmt.h"
#include "httpSystem.h"
#include "mgmtSystem.h" #ifdef CLUSTER
#include "dnodeCluster.h"
#include "vnode.h" #include "httpAdmin.h"
#include "mnodeAccount.h"
pthread_mutex_t dmutex; #include "mnodeBalance.h"
extern int vnodeSelectReqNum; #include "mnodeCluster.h"
extern int vnodeInsertReqNum; #include "sdbReplica.h"
void * tsStatusTimer = NULL; #include "multilevelStorage.h"
bool tsDnodeStopping = false; #include "vnodeCluster.h"
#include "vnodeReplica.h"
// internal global, not configurable #include "dnodeGrant.h"
void * vnodeTmrCtrl; #endif
void ** rpcQhandle;
void * dmQhandle; static pthread_mutex_t tsDnodeMutex;
void * queryQhandle; static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
int tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
int tsMaxQueues; static int32_t dnodeInitRpcQHandle();
static int32_t dnodeInitQueryQHandle();
static int32_t dnodeInitTmrCtl();
void *tsStatusTimer = NULL;
void *vnodeTmrCtrl;
void **rpcQhandle;
void *dmQhandle;
void *queryQhandle;
int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
int32_t tsMaxQueues;
uint32_t tsRebootTime; uint32_t tsRebootTime;
int32_t dnodeInitRpcQHandle(); static void dnodeInitVnodesLock() {
int32_t dnodeInitQueryQHandle(); pthread_mutex_init(&tsDnodeMutex, NULL);
int32_t dnodeInitTmrCtl(); }
void dnodeCountRequestImp(SCountInfo *info);
void dnodeLockVnodes() {
pthread_mutex_lock(&tsDnodeMutex);
}
void dnodeUnLockVnodes() {
pthread_mutex_unlock(&tsDnodeMutex);
}
static void dnodeCleanVnodesLock() {
pthread_mutex_destroy(&tsDnodeMutex);
}
SDnodeRunStatus dnodeGetRunStatus() {
return tsDnodeRunStatus;
}
void dnodeSetRunStatus(SDnodeRunStatus status) {
tsDnodeRunStatus = status;
}
void dnodeCleanUpSystem() { void dnodeCleanUpSystem() {
if (tsDnodeStopping) { if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_STOPPED) {
return; return;
} else { } else {
tsDnodeStopping = true; dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
} }
if (tsStatusTimer != NULL) { if (tsStatusTimer != NULL) {
...@@ -61,33 +96,48 @@ void dnodeCleanUpSystem() { ...@@ -61,33 +96,48 @@ void dnodeCleanUpSystem() {
} }
dnodeCleanUpModules(); dnodeCleanUpModules();
dnodeCleanupVnodes();
vnodeCleanUpVnodes();
taosCloseLogger(); taosCloseLogger();
dnodeCleanupStorage(); dnodeCleanupStorage();
dnodeCleanVnodesLock();
} }
void dnodeCheckDbRunning(const char* dir) { void dnodeCheckDataDirOpenned(const char *dir) {
char filepath[256] = {0}; char filepath[256] = {0};
sprintf(filepath, "%s/.running", dir); sprintf(filepath, "%s/.running", dir);
int fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
int ret = flock(fd, LOCK_EX | LOCK_NB); int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
if (ret != 0) { if (ret != 0) {
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret); dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
exit(0); exit(0);
} }
} }
int dnodeInitSystem() { void dnodeInitPlugins() {
#ifdef CLUSTER
dnodeClusterInit();
httpAdminInit();
mnodeAccountInit();
mnodeBalanceInit();
mnodeClusterInit();
sdbReplicaInit();
multilevelStorageInit();
vnodeClusterInit();
vnodeReplicaInit();
dnodeGrantInit();
#endif
}
int32_t dnodeInitSystem() {
char temp[128]; char temp[128];
struct stat dirstat; struct stat dirstat;
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
taosResolveCRC(); taosResolveCRC();
tsRebootTime = taosGetTimestampSec(); tsRebootTime = taosGetTimestampSec();
tscEmbedded = 1; tscEmbedded = 1;
// Read global configuration. // Read global configuration.
tsReadGlobalLogConfig(); tsReadGlobalLogConfig();
...@@ -124,7 +174,7 @@ int dnodeInitSystem() { ...@@ -124,7 +174,7 @@ int dnodeInitSystem() {
dnodeAllocModules(); dnodeAllocModules();
pthread_mutex_init(&dmutex, NULL); dnodeInitVnodesLock();
dPrint("starting to initialize TDengine ..."); dPrint("starting to initialize TDengine ...");
...@@ -136,7 +186,7 @@ int dnodeInitSystem() { ...@@ -136,7 +186,7 @@ int dnodeInitSystem() {
if (dnodeCheckSystem() < 0) { if (dnodeCheckSystem() < 0) {
return -1; return -1;
} }
if (dnodeInitModules() < 0) { if (dnodeInitModules() < 0) {
return -1; return -1;
} }
...@@ -151,14 +201,14 @@ int dnodeInitSystem() { ...@@ -151,14 +201,14 @@ int dnodeInitSystem() {
return -1; return -1;
} }
if (vnodeInitStore() < 0) { if (dnodeOpenVnodes() < 0) {
dError("failed to init vnode storage"); dError("failed to init vnode storage");
return -1; return -1;
} }
int numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; int32_t numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (numOfThreads < 1) numOfThreads = 1; if (numOfThreads < 1) numOfThreads = 1;
if (vnodeInitPeer(numOfThreads) < 0) { if (dnodeInitPeers(numOfThreads) < 0) {
dError("failed to init vnode peer communication"); dError("failed to init vnode peer communication");
return -1; return -1;
} }
...@@ -168,40 +218,21 @@ int dnodeInitSystem() { ...@@ -168,40 +218,21 @@ int dnodeInitSystem() {
return -1; return -1;
} }
if (vnodeInitShell() < 0) { if (dnodeInitShell() < 0) {
dError("failed to init communication to shell"); dError("failed to init communication to shell");
return -1; return -1;
} }
if (vnodeInitVnodes() < 0) {
dError("failed to init store");
return -1;
}
mnodeCountRequestFp = dnodeCountRequestImp;
dnodeStartModules(); dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
dPrint("TDengine is initialized successfully"); dPrint("TDengine is initialized successfully");
return 0; return 0;
} }
void dnodeResetSystem() { int32_t dnodeInitStorageImp() {
dPrint("reset the system ...");
for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) {
vnodeRemoveVnode(vnode);
}
mgmtStopSystem();
}
void dnodeCountRequestImp(SCountInfo *info) {
httpGetReqCount(&info->httpReqNum);
info->selectReqNum = atomic_exchange_32(&vnodeSelectReqNum, 0);
info->insertReqNum = atomic_exchange_32(&vnodeInsertReqNum, 0);
}
int dnodeInitStorageImp() {
struct stat dirstat; struct stat dirstat;
strcpy(tsDirectory, dataDir); strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) { if (stat(dataDir, &dirstat) < 0) {
...@@ -218,31 +249,34 @@ int dnodeInitStorageImp() { ...@@ -218,31 +249,34 @@ int dnodeInitStorageImp() {
sprintf(mgmtDirectory, "%s/mgmt", tsDirectory); sprintf(mgmtDirectory, "%s/mgmt", tsDirectory);
sprintf(tsDirectory, "%s/tsdb", dataDir); sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDbRunning(dataDir); dnodeCheckDataDirOpenned(dataDir);
return 0; return 0;
} }
int32_t (*dnodeInitStorage)() = dnodeInitStorageImp; int32_t (*dnodeInitStorage)() = dnodeInitStorageImp;
void dnodeCleanupStorageImp() {} void dnodeCleanupStorageImp() {}
void (*dnodeCleanupStorage)() = dnodeCleanupStorageImp; void (*dnodeCleanupStorage)() = dnodeCleanupStorageImp;
int32_t dnodeInitQueryQHandle() { static int32_t dnodeInitQueryQHandle() {
int numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore; int32_t numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore;
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
} }
int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode; int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode;
dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize,numOfThreads); dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads);
queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl);
return 0; return 0;
} }
int32_t dnodeInitTmrCtl() { static int32_t dnodeInitTmrCtl() {
vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode"); vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000,
"DND-vnode");
if (vnodeTmrCtrl == NULL) { if (vnodeTmrCtrl == NULL) {
dError("failed to init timer, exit"); dError("failed to init timer, exit");
return -1; return -1;
...@@ -251,22 +285,39 @@ int32_t dnodeInitTmrCtl() { ...@@ -251,22 +285,39 @@ int32_t dnodeInitTmrCtl() {
return 0; return 0;
} }
int32_t dnodeInitRpcQHandle() { static int32_t dnodeInitRpcQHandle() {
tsMaxQueues = (1.0 - tsRatioOfQueryThreads)*tsNumOfCores*tsNumOfThreadsPerCore / 2.0; tsMaxQueues = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (tsMaxQueues < 1) tsMaxQueues = 1; if (tsMaxQueues < 1) {
tsMaxQueues = 1;
}
rpcQhandle = malloc(tsMaxQueues*sizeof(void *)); rpcQhandle = malloc(tsMaxQueues * sizeof(void *));
for (int i=0; i< tsMaxQueues; ++i ) for (int32_t i = 0; i < tsMaxQueues; ++i) {
rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
}
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
return 0; return 0;
} }
int32_t dnodeCheckSystemImp() {
return 0;
}
int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp;
void dnodeParseParameterKImp() {}
void (*dnodeParseParameterK)() = dnodeParseParameterKImp;
int32_t dnodeInitPeersImp(int32_t numOfThreads) {
return 0;
}
int32_t (*dnodeInitPeers)(int32_t numOfThreads) = dnodeInitPeersImp;
int dnodeCheckSystemImp() { return 0; }
int (*dnodeCheckSystem)() = dnodeCheckSystemImp;
...@@ -15,7 +15,45 @@ ...@@ -15,7 +15,45 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "taoserror.h"
int32_t dnodeCreateTable(int vid, int sid, SCreateTableMsg *table) { int32_t dnodeCheckTableExist(char *tableId) {
return 0;
}
int32_t dnodeWriteData(SShellSubmitMsg *msg) {
return 0;
}
int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) {
return 0;
}
int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table) {
return 0;
}
int32_t dnodeCreateChildTable(SCreateChildTableMsg *table) {
return 0;
}
int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table) {
return 0;
} }
int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table) {
return 0;
}
int32_t dnodeAlterChildTable(SCreateChildTableMsg *table) {
return 0;
}
int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid) {
return 0;
}
int32_t dnodeDropTable(int vid, int sid, int64_t uid) {
return 0;
}
...@@ -22,6 +22,13 @@ extern "C" { ...@@ -22,6 +22,13 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <pthread.h> #include <pthread.h>
#include "tsched.h"
typedef struct {
int32_t selectReqNum;
int32_t insertReqNum;
int32_t httpReqNum;
} SDnodeStatisInfo;
typedef struct { typedef struct {
char id[20]; char id[20];
...@@ -32,7 +39,7 @@ typedef struct { ...@@ -32,7 +39,7 @@ typedef struct {
} SMgmtObj; } SMgmtObj;
// global variables // global variables
extern pthread_mutex_t dmutex; extern uint32_t tsRebootTime;
// dnodeCluster // dnodeCluster
extern void (*dnodeStartModules)(); extern void (*dnodeStartModules)();
...@@ -52,11 +59,15 @@ extern int (*dnodeInitMgmt)(); ...@@ -52,11 +59,15 @@ extern int (*dnodeInitMgmt)();
extern int32_t (*dnodeInitStorage)(); extern int32_t (*dnodeInitStorage)();
extern void (*dnodeCleanupStorage)(); extern void (*dnodeCleanupStorage)();
void dnodeCheckDbRunning(const char* dir); void dnodeCheckDataDirOpenned(const char* dir);
void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched); void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched);
void dnodeLockVnodes();
void dnodeUnLockVnodes();
SDnodeStatisInfo dnodeGetStatisInfo();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -42,4 +42,6 @@ ...@@ -42,4 +42,6 @@
#define httpLWarn(...) taosLogWarn(__VA_ARGS__) httpWarn(__VA_ARGS__) #define httpLWarn(...) taosLogWarn(__VA_ARGS__) httpWarn(__VA_ARGS__)
#define httpLPrint(...) taosLogPrint(__VA_ARGS__) httpPrint(__VA_ARGS__) #define httpLPrint(...) taosLogPrint(__VA_ARGS__) httpPrint(__VA_ARGS__)
int32_t httpGetReqCount();
#endif #endif
...@@ -20,6 +20,9 @@ ...@@ -20,6 +20,9 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdbool.h>
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
#define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)}, #define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)},
#else #else
......
...@@ -319,9 +319,6 @@ typedef struct { ...@@ -319,9 +319,6 @@ typedef struct {
char reserved[16]; char reserved[16];
int32_t sversion; int32_t sversion;
SMColumn schema[]; SMColumn schema[];
SVariableMsg tags;
} SCreateMsg; } SCreateMsg;
typedef struct { typedef struct {
......
...@@ -20,6 +20,5 @@ int httpInitSystem(); ...@@ -20,6 +20,5 @@ int httpInitSystem();
int httpStartSystem(); int httpStartSystem();
void httpStopSystem(); void httpStopSystem();
void httpCleanUpSystem(); void httpCleanUpSystem();
void httpGetReqCount(int32_t *httpConns);
#endif #endif
...@@ -451,7 +451,7 @@ void httpJsonPairStatus(JsonBuf* buf, int code) { ...@@ -451,7 +451,7 @@ void httpJsonPairStatus(JsonBuf* buf, int code) {
} else if (code == TSDB_CODE_INVALID_TABLE) { } else if (code == TSDB_CODE_INVALID_TABLE) {
httpJsonPair(buf, "desc", 4, "failed to create table", 22); httpJsonPair(buf, "desc", 4, "failed to create table", 22);
} else } else
httpJsonPair(buf, "desc", 4, tsError[code], (int)strlen(tsError[code])); httpJsonPair(buf, "desc", 4, tstrerror(code), (int)strlen(tstrerror(code)));
} }
} }
} }
\ No newline at end of file
...@@ -146,10 +146,9 @@ void httpCleanUpSystem() { ...@@ -146,10 +146,9 @@ void httpCleanUpSystem() {
#endif #endif
} }
void httpGetReqCount(int32_t *httpReqestNum) { int32_t httpGetReqCount() {
if (httpServer != NULL) { if (httpServer != NULL) {
*httpReqestNum = atomic_exchange_32(&httpServer->requestNum, 0); return atomic_exchange_32(&httpServer->requestNum, 0);
} else {
*httpReqestNum = 0;
} }
return 0;
} }
...@@ -23,12 +23,6 @@ int monitorStartSystem(); ...@@ -23,12 +23,6 @@ int monitorStartSystem();
void monitorStopSystem(); void monitorStopSystem();
void monitorCleanUpSystem(); void monitorCleanUpSystem();
typedef struct { extern void (*mnodeCountRequestFp)(SDnodeStatisInfo *info);
int selectReqNum;
int insertReqNum;
int httpReqNum;
} SCountInfo;
extern void (*mnodeCountRequestFp)(SCountInfo *info);
#endif #endif
\ No newline at end of file
...@@ -77,7 +77,7 @@ void monitorSaveAcctLog(char *acctId, int64_t currentPointsPerSecond, int64_t ma ...@@ -77,7 +77,7 @@ void monitorSaveAcctLog(char *acctId, int64_t currentPointsPerSecond, int64_t ma
int64_t totalOutbound, int64_t maxOutbound, int64_t totalDbs, int64_t maxDbs, int64_t totalOutbound, int64_t maxOutbound, int64_t totalDbs, int64_t maxDbs,
int64_t totalUsers, int64_t maxUsers, int64_t totalStreams, int64_t maxStreams, int64_t totalUsers, int64_t maxUsers, int64_t totalStreams, int64_t maxStreams,
int64_t totalConns, int64_t maxConns, int8_t accessState); int64_t totalConns, int64_t maxConns, int8_t accessState);
void (*mnodeCountRequestFp)(SCountInfo *info) = NULL; void (*mnodeCountRequestFp)(SDnodeStatisInfo *info) = NULL;
void monitorExecuteSQL(char *sql); void monitorExecuteSQL(char *sql);
void monitorCheckDiskUsage(void *para, void *unused) { void monitorCheckDiskUsage(void *para, void *unused) {
...@@ -333,7 +333,7 @@ int monitorBuildBandSql(char *sql) { ...@@ -333,7 +333,7 @@ int monitorBuildBandSql(char *sql) {
} }
int monitorBuildReqSql(char *sql) { int monitorBuildReqSql(char *sql) {
SCountInfo info; SDnodeStatisInfo info;
info.httpReqNum = info.insertReqNum = info.selectReqNum = 0; info.httpReqNum = info.insertReqNum = info.selectReqNum = 0;
(*mnodeCountRequestFp)(&info); (*mnodeCountRequestFp)(&info);
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8) CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
......
...@@ -699,10 +699,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo ...@@ -699,10 +699,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo
pHead = (STaosHeader *)buffer; pHead = (STaosHeader *)buffer;
memcpy(pHead, data, sizeof(STaosHeader)); memcpy(pHead, data, sizeof(STaosHeader));
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wbitfield-constant-conversion"
pHead->tcp = 2; pHead->tcp = 2;
#pragma GCC diagnostic pop
msgLen = sizeof(STaosHeader); msgLen = sizeof(STaosHeader);
pHead->msgLen = (int32_t)htonl(msgLen); pHead->msgLen = (int32_t)htonl(msgLen);
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8) CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
ADD_SUBDIRECTORY(detail)
IF (TD_EDGE)
ADD_SUBDIRECTORY(lite)
ENDIF ()
\ No newline at end of file
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/monitor/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/util/cluster/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(./src SRC)
LIST(REMOVE_ITEM SRC ./src/vnodeFileUtil.c)
LIST(REMOVE_ITEM SRC ./src/taosGrant.c)
ADD_EXECUTABLE(taosd ${SRC})
IF (TD_PAGMODE_LITE)
TARGET_LINK_LIBRARIES(taosd taos trpc tutil sdb monitor pthread http)
ELSE ()
TARGET_LINK_LIBRARIES(taosd taos_static trpc tutil sdb monitor pthread http)
ENDIF ()
IF (TD_EDGE)
TARGET_LINK_LIBRARIES(taosd taosd_edge)
ELSE ()
TARGET_LINK_LIBRARIES(taosd taosd_cluster)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target")
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
POST_BUILD
COMMAND echo "make test directory"
DEPENDS taosd
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMENT "prepare taosd environment")
ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD})
ENDIF ()
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/monitor/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/system/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/util/cluster/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(./src SRC)
ADD_LIBRARY(taosd_edge ${SRC})
ENDIF ()
/*
* Copyright (c) 2020 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <setjmp.h>
#ifndef TDENGINE_TBUFFER_H
#define TDENGINE_TBUFFER_H
/*
SBuffer can be used to read or write a buffer, but cannot be used for both
read & write at a same time.
Read example:
SBuffer rbuf;
if (tbufBeginOperation(&rbuf) != 0) {
// handling errors
}
tbufInitRead(&rbuf, data, 1024);
int32_t a = tbufReadInt32(&rbuf);
// other read functions
Write example:
SBuffer wbuf;
if (tbufBeginOperation(&wbuf) != 0) {
// handling errors
}
tbufInitWrite(&wbuf, 1024);
tbufWriteInt32(&wbuf, 10);
// other write functions
size_t size = tbufGetSize(&wbuf);
char* data = tbufGetBuffer(&wbuf, true);
tbufUninitWrite(&wbuf);
*/
typedef struct {
jmp_buf jb;
char* buf;
size_t pos;
size_t size;
} SBuffer;
// common functions can be used in both read & write
#define tbufBeginOperation(buf) setjmp((buf)->jb)
size_t tbufTell(SBuffer* buf);
size_t tbufSeekTo(SBuffer* buf, size_t pos);
size_t tbufSkip(SBuffer* buf, size_t size);
// basic read functions
void tbufInitRead(SBuffer* buf, void* data, size_t size);
char* tbufRead(SBuffer* buf, size_t size);
void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size);
const char* tbufReadString(SBuffer* buf, size_t* len);
size_t tbufReadToString(SBuffer* buf, char* dst, size_t size);
// basic write functions
void tbufInitWrite(SBuffer* buf, size_t size);
void tbufEnsureCapacity(SBuffer* buf, size_t size);
char* tbufGetResult(SBuffer* buf, bool takeOver);
void tbufUninitWrite(SBuffer* buf);
void tbufWrite(SBuffer* buf, const void* data, size_t size);
void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size);
void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len);
void tbufWriteString(SBuffer* buf, const char* str);
// read & write function for primitive types
#ifndef TBUFFER_DEFINE_OPERATION
#define TBUFFER_DEFINE_OPERATION(type, name) \
type tbufRead##name(SBuffer* buf); \
void tbufWrite##name(SBuffer* buf, type data); \
void tbufWrite##name##At(SBuffer* buf, size_t pos, type data);
#endif
TBUFFER_DEFINE_OPERATION( bool, Bool )
TBUFFER_DEFINE_OPERATION( char, Char )
TBUFFER_DEFINE_OPERATION( int8_t, Int8 )
TBUFFER_DEFINE_OPERATION( uint8_t, Unt8 )
TBUFFER_DEFINE_OPERATION( int16_t, Int16 )
TBUFFER_DEFINE_OPERATION( uint16_t, Uint16 )
TBUFFER_DEFINE_OPERATION( int32_t, Int32 )
TBUFFER_DEFINE_OPERATION( uint32_t, Uint32 )
TBUFFER_DEFINE_OPERATION( int64_t, Int64 )
TBUFFER_DEFINE_OPERATION( uint64_t, Uint64 )
TBUFFER_DEFINE_OPERATION( float, Float )
TBUFFER_DEFINE_OPERATION( double, Double )
#endif
\ No newline at end of file
...@@ -25,13 +25,13 @@ extern "C" { ...@@ -25,13 +25,13 @@ extern "C" {
#include "taoserror.h" #include "taoserror.h"
enum _TSDB_VG_STATUS { enum _TSDB_VG_STATUS {
TSDB_VG_STATUS_READY = TSDB_CODE_SUCCESS, TSDB_VG_STATUS_READY = TSDB_CODE_SUCCESS,
TSDB_VG_STATUS_IN_PROGRESS = TSDB_CODE_ACTION_IN_PROGRESS, TSDB_VG_STATUS_IN_PROGRESS = 1, //TSDB_CODE_ACTION_IN_PROGRESS,
TSDB_VG_STATUS_NO_DISK_PERMISSIONS = TSDB_CODE_NO_DISK_PERMISSIONS, TSDB_VG_STATUS_NO_DISK_PERMISSIONS = 73,//TSDB_CODE_NO_DISK_PERMISSIONS,
TSDB_VG_STATUS_SERVER_NO_PACE = TSDB_CODE_SERV_NO_DISKSPACE, TSDB_VG_STATUS_SERVER_NO_PACE = 110, //TSDB_CODE_SERV_NO_DISKSPACE,
TSDB_VG_STATUS_SERV_OUT_OF_MEMORY = TSDB_CODE_SERV_OUT_OF_MEMORY, TSDB_VG_STATUS_SERV_OUT_OF_MEMORY = 69, //TSDB_CODE_SERV_OUT_OF_MEMORY,
TSDB_VG_STATUS_INIT_FAILED = TSDB_CODE_VG_INIT_FAILED, TSDB_VG_STATUS_INIT_FAILED = 74, //TSDB_CODE_VG_INIT_FAILED,
TSDB_VG_STATUS_FULL = TSDB_CODE_NO_ENOUGH_DNODES, TSDB_VG_STATUS_FULL = 48, //TSDB_CODE_NO_ENOUGH_DNODES,
}; };
enum _TSDB_DB_STATUS { enum _TSDB_DB_STATUS {
......
/*
* Copyright (c) 2020 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define TBUFFER_DEFINE_OPERATION(type, name) \
type tbufRead##name(SBuffer* buf) { \
type ret; \
tbufReadToBuffer(buf, &ret, sizeof(type)); \
return ret; \
}\
void tbufWrite##name(SBuffer* buf, type data) {\
tbufWrite(buf, &data, sizeof(data));\
}\
void tbufWrite##name##At(SBuffer* buf, size_t pos, type data) {\
tbufWriteAt(buf, pos, &data, sizeof(data));\
}
#include "../inc/tbuffer.h"
////////////////////////////////////////////////////////////////////////////////
// common functions
size_t tbufTell(SBuffer* buf) {
return buf->pos;
}
size_t tbufSeekTo(SBuffer* buf, size_t pos) {
if (pos > buf->size) {
longjmp(buf->jb, 1);
}
size_t old = buf->pos;
buf->pos = pos;
return old;
}
size_t tbufSkip(SBuffer* buf, size_t size) {
return tbufSeekTo(buf, buf->pos + size);
}
////////////////////////////////////////////////////////////////////////////////
// read functions
void tbufInitRead(SBuffer* buf, void* data, size_t size) {
buf->buf = (char*)data;
buf->pos = 0;
// empty buffer is not an error, but read an empty buffer is
buf->size = (data == NULL) ? 0 : size;
}
char* tbufRead(SBuffer* buf, size_t size) {
char* ret = buf->buf + buf->pos;
tbufSkip(buf, size);
return ret;
}
void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) {
assert(dst != NULL);
// always using memcpy, leave optimization to compiler
memcpy(dst, tbufRead(buf, size), size);
}
const char* tbufReadString(SBuffer* buf, size_t* len) {
uint16_t l = tbufReadUint16();
char* ret = buf->buf + buf->pos;
tbufSkip(buf, l + 1);
ret[l] = 0; // ensure the string end with '\0'
if (len != NULL) {
*len = l;
}
return ret;
}
size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) {
size_t len;
const char* str = tbufReadString(buf, &len);
if (len >= size) len = size - 1;
memcpy(dst, str, len);
dst[len] = 0;
return len;
}
////////////////////////////////////////////////////////////////////////////////
// write functions
void tbufEnsureCapacity(SBuffer* buf, size_t size) {
size += buf->pos;
if (size > buf->size) {
char* nbuf = NULL;
size_t nsize = size + buf->size;
nbuf = realloc(buf->buf, nsize);
if (nbuf == NULL) {
longjmp(buf->jb, 2);
}
buf->buf = nbuf;
buf->size = nsize;
}
}
void tbufInitWrite(SBuffer* buf, size_t size) {
buf->buf = NULL;
buf->pos = 0;
buf->size = 0;
tbufEnsureCapacity(buf, size);
}
char* tbufGetResult(SBuffer* buf, bool takeOver) {
char* ret = buf->buf;
if (takeOver) {
buf->pos = 0;
buf->size = 0;
buf->buf = NULL;
}
return ret;
}
void tbufUninitWrite(SBuffer* buf) {
free(buf->buf);
}
void tbufWrite(SBuffer* buf, const void* data, size_t size) {
tbufEnsureCapacity(size);
memcpy(buf->buf + buf->pos, data, size);
buf->pos += size;
}
void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) {
// this function can only be called to fill the gap on previous writes,
// so 'pos + size <= buf->pos' must be true
if (pos + size > buf->pos) {
longjmp(buf->jb, 3);
}
memcpy(buf->buf + pos, data, size);
}
void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) {
if (len > 0xffff) {
longjmp(buf->jb , 4);
}
tbufWriteUint16(buf, (uint16_t)len);
tbufWrite(buf, str, len + 1);
}
void tbufWriteString(SBuffer* buf, const char* str) {
tbufWriteStringLen(buf, str, strlen(str));
}
...@@ -18,13 +18,13 @@ ...@@ -18,13 +18,13 @@
const char* taosGetVgroupStatusStr(int32_t vgroupStatus) { const char* taosGetVgroupStatusStr(int32_t vgroupStatus) {
switch (vgroupStatus) { switch (vgroupStatus) {
case TSDB_VG_STATUS_READY: return tsError[vgroupStatus]; case TSDB_VG_STATUS_READY: return tstrerror(vgroupStatus);
case TSDB_VG_STATUS_IN_PROGRESS: return tsError[vgroupStatus]; case TSDB_VG_STATUS_IN_PROGRESS: return tstrerror(vgroupStatus);
case TSDB_VG_STATUS_NO_DISK_PERMISSIONS: return tsError[vgroupStatus]; case TSDB_VG_STATUS_NO_DISK_PERMISSIONS: return tstrerror(vgroupStatus);
case TSDB_VG_STATUS_SERVER_NO_PACE: return tsError[vgroupStatus]; case TSDB_VG_STATUS_SERVER_NO_PACE: return tstrerror(vgroupStatus);
case TSDB_VG_STATUS_SERV_OUT_OF_MEMORY: return tsError[vgroupStatus]; case TSDB_VG_STATUS_SERV_OUT_OF_MEMORY: return tstrerror(vgroupStatus);
case TSDB_VG_STATUS_INIT_FAILED: return tsError[vgroupStatus]; case TSDB_VG_STATUS_INIT_FAILED: return tstrerror(vgroupStatus);
case TSDB_VG_STATUS_FULL: return tsError[vgroupStatus]; case TSDB_VG_STATUS_FULL: return tstrerror(vgroupStatus);
default: return "undefined"; default: return "undefined";
} }
} }
......
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
/* /*
* Initialize the resources * Initialize the resources
*/ */
int32_t vnodeInitPeer(int numOfThreads); int32_t vnodeInitPeers(int numOfThreads);
/* /*
* Free the resources * Free the resources
......
...@@ -302,7 +302,6 @@ typedef struct { ...@@ -302,7 +302,6 @@ typedef struct {
// internal globals // internal globals
extern int tsMeterSizeOnFile; extern int tsMeterSizeOnFile;
extern uint32_t tsRebootTime;
extern void ** rpcQhandle; extern void ** rpcQhandle;
extern void * queryQhandle; extern void * queryQhandle;
......
...@@ -22,18 +22,6 @@ extern "C" { ...@@ -22,18 +22,6 @@ extern "C" {
#include "os.h" #include "os.h"
typedef struct {
int sid;
int vnode;
uint32_t ip;
uint16_t port;
int count; // track the number of imports
int code; // track the code of imports
int numOfTotalPoints; // track the total number of points imported
void * thandle; // handle from TAOS layer
void * qhandle;
} SShellObj;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册