提交 0d02f7fc 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

......@@ -28,10 +28,6 @@ extern char tsSecond[];
extern char tsLocalFqdn[];
extern char tsLocalEp[];
extern uint16_t tsServerPort;
extern uint16_t tsDnodeShellPort;
extern uint16_t tsDnodeDnodePort;
extern uint16_t tsSyncPort;
extern uint16_t tsArbitratorPort;
extern int32_t tsStatusInterval;
extern int32_t tsNumOfMnodes;
extern int8_t tsEnableVnodeBak;
......
......@@ -78,7 +78,7 @@ typedef struct {
* @brief data file's directory.
*
*/
char dataDir[PATH_MAX];
char dataDir[TSDB_FILENAME_LEN];
/**
* @brief local endpoint.
......@@ -121,10 +121,10 @@ typedef struct {
/**
* @brief Initialize and start the dnode.
*
* @param pOptions Options of the dnode.
* @param pOption Option of the dnode.
* @return SDnode* The dnode object.
*/
SDnode *dndInit(SDnodeOpt *pOptions);
SDnode *dndInit(SDnodeOpt *pOption);
/**
* @brief Stop and cleanup the dnode.
......
......@@ -24,10 +24,10 @@ extern "C" {
typedef struct SDnode SDnode;
typedef struct SMnode SMnode;
typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg);
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg);
typedef struct SMnodeLoad {
int64_t numOfDnode;
......@@ -53,17 +53,17 @@ typedef struct {
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp;
} SMnodeOptions;
} SMnodeOpt;
/* ------------------------ SMnode ------------------------ */
/**
* @brief Open a mnode.
*
* @param path Path of the mnode
* @param pOptions Options of the mnode
* @param pOption Option of the mnode
* @return SMnode* The mnode object
*/
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions);
SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption);
/**
* @brief Close a mnode
......@@ -76,10 +76,10 @@ void mnodeClose(SMnode *pMnode);
* @brief Close a mnode
*
* @param pMnode The mnode object to close
* @param pOptions Options of the mnode
* @param pOption Options of the mnode
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions);
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption);
/**
* @brief Drop a mnode.
......
......@@ -194,27 +194,9 @@ typedef struct {
SRpcMsg rpcMsg[];
} SVnodeMsg;
typedef struct SDnode SDnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg);
typedef struct {
PutMsgToVnodeQFp putMsgToApplyQueueFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
} SVnodePara;
int32_t vnodeInit(SVnodePara);
void vnodeCleanup();
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg);
void vnodeDrop(SVnode *pVnode);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
......
......@@ -53,45 +53,63 @@ typedef struct {
EWalType walLevel; // wal level
} SWalCfg;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_FILE_NUM 1 // 3
#define WAL_PREFIX "wal"
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_CUR_POS_READ_ONLY 1
#define WAL_CUR_FILE_READ_ONLY 2
typedef struct SWal {
int64_t version;
int64_t fileId;
int64_t rId;
int64_t tfd;
int32_t vgId;
int32_t keep;
int32_t level;
int32_t fsyncPeriod;
// cfg
int32_t vgId;
int32_t fsyncPeriod; // millisecond
EWalType level;
//reference
int64_t refId;
//current tfd
int64_t curLogTfd;
int64_t curIdxTfd;
//current version
int64_t curVersion;
int64_t curOffset;
//current file version
int64_t curFileFirstVersion;
int64_t curFileLastVersion;
//wal fileset version
int64_t firstVersion;
int64_t snapshotVersion;
int64_t lastVersion;
//fsync status
int32_t fsyncSeq;
int8_t stop;
int8_t reseved[3];
char path[WAL_PATH_LEN];
char name[WAL_FILE_LEN];
//ctl
int32_t curStatus;
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
} SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, void *pMsg);
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization
int32_t walInit();
void walCleanUp();
// handle open and ctl
SWal *walOpen(char *path, SWalCfg *pCfg);
SWal *walOpen(const char *path, SWalCfg *pCfg);
void walStop(SWal *pWal);
int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
// write
// int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen);
int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
......@@ -101,7 +119,8 @@ int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous log can be pruned safely
int32_t walPrune(SWal *, int64_t ver);
int32_t walTakeSnapshot(SWal *, int64_t ver);
//int32_t walDataCorrupted(SWal*);
// read
int32_t walRead(SWal *, SWalHead **, int64_t ver);
......@@ -111,7 +130,6 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
// int32_t walDataCorrupted(SWal*);
//internal
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
......
......@@ -358,12 +358,6 @@ do { \
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_PORT_DNODESHELL 0
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
#define TSDB_PORT_HTTP 11
#define TSDB_PORT_ARBITRATOR 12
#define TSDB_MAX_WAL_SIZE (1024*1024*3)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
......
......@@ -22,10 +22,13 @@
extern "C" {
#endif
typedef struct SWorkerPool SWorkerPool;
typedef struct SMWorkerPool SMWorkerPool;
typedef struct SWorker {
int32_t id; // worker ID
pthread_t thread; // thread
struct SWorkerPool *pool;
int32_t id; // worker ID
pthread_t thread; // thread
SWorkerPool *pool;
} SWorker;
typedef struct SWorkerPool {
......@@ -39,11 +42,11 @@ typedef struct SWorkerPool {
} SWorkerPool;
typedef struct SMWorker {
int32_t id; // worker id
pthread_t thread; // thread
taos_qall qall;
taos_qset qset; // queue set
struct SMWorkerPool *pool;
int32_t id; // worker id
pthread_t thread; // thread
taos_qall qall;
taos_qset qset; // queue set
SMWorkerPool *pool;
} SMWorker;
typedef struct SMWorkerPool {
......
......@@ -33,10 +33,6 @@ char tsArbitrator[TSDB_EP_LEN] = {0};
char tsLocalFqdn[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030;
uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035]
uint16_t tsDnodeDnodePort = 6035; // udp/tcp
uint16_t tsSyncPort = 6040;
uint16_t tsArbitratorPort = 6042;
int32_t tsStatusInterval = 1; // second
int32_t tsNumOfMnodes = 1;
int8_t tsEnableVnodeBak = 1;
......@@ -1726,11 +1722,6 @@ int32_t taosCheckGlobalCfg() {
}
}
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
tsHttpPort = tsServerPort + TSDB_PORT_HTTP;
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
}
......
......@@ -30,7 +30,10 @@ static struct {
char configDir[PATH_MAX];
} global = {0};
void dmnSigintHandle(int signum, void *info, void *ctx) { global.stop = true; }
void dmnSigintHandle(int signum, void *info, void *ctx) {
uError("singal:%d is received", signum);
global.stop = true;
}
void dmnSetSignalHandle() {
taosSetSignal(SIGTERM, dmnSigintHandle);
......@@ -141,13 +144,13 @@ void dmnInitOption(SDnodeOpt *pOption) {
pOption->shellActivityTimer = tsShellActivityTimer;
pOption->statusInterval = tsStatusInterval;
pOption->serverPort = tsServerPort;
tstrncpy(pOption->dataDir, tsDataDir, TSDB_EP_LEN);
tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN);
tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(pOption->localFqdn, tsLocalEp, TSDB_FQDN_LEN);
tstrncpy(pOption->firstEp, tsFirst, TSDB_FQDN_LEN);
tstrncpy(pOption->timezone, tsLocalEp, TSDB_TIMEZONE_LEN);
tstrncpy(pOption->locale, tsLocalEp, TSDB_LOCALE_LEN);
tstrncpy(pOption->charset, tsLocalEp, TSDB_LOCALE_LEN);
tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN);
tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN);
tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN);
}
int dmnRunDnode() {
......
......@@ -21,16 +21,16 @@ extern "C" {
#endif
#include "dndInt.h"
int32_t dndInitDnode(SDnode *pDnd);
void dndCleanupDnode(SDnode *pDnd);
void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndInitDnode(SDnode *pDnode);
void dndCleanupDnode(SDnode *pDnode);
void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnd);
int64_t dndGetClusterId(SDnode *pDnd);
void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell);
int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -31,18 +31,19 @@ extern "C" {
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "dnode.h"
#include "mnode.h"
#include "vnode.h"
#include "dnode.h"
extern int32_t dDebugFlag;
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("SRV FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("SRV ERROR ", 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("SRV WARN ", 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("SRV ", 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
......@@ -54,48 +55,50 @@ typedef struct {
} SDnodeDir;
typedef struct {
int32_t dnodeId;
uint32_t rebootTime;
int32_t dropped;
int64_t clusterId;
SEpSet shellEpSet;
SEpSet peerEpSet;
char *file;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
pthread_mutex_t mutex;
int32_t dnodeId;
int32_t dropped;
uint32_t rebootTime;
int64_t clusterId;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
SRWLatch latch;
} SDnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SWorkerPool mgmtPool;
SWorkerPool readPool;
SWorkerPool writePool;
SWorkerPool syncPool;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
char *file;
SMnode *pMnode;
SRWLatch latch;
taos_queue pReadQ;
taos_queue pWriteQ;
taos_queue pApplyQ;
taos_queue pSyncQ;
taos_queue pMgmtQ;
char *file;
SMnode *pMnode;
SRWLatch latch;
SWorkerPool mgmtPool;
SWorkerPool readPool;
SWorkerPool writePool;
SWorkerPool syncPool;
} SMnodeMgmt;
typedef struct {
SHashObj *hash;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
taos_queue pMgmtQ;
SWorkerPool mgmtPool;
SWorkerPool queryPool;
SWorkerPool fetchPool;
SMWorkerPool syncPool;
SMWorkerPool writePool;
taos_queue pMgmtQ;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
} SVnodesMgmt;
typedef struct {
......@@ -108,10 +111,10 @@ typedef struct SDnode {
EStat stat;
SDnodeOpt opt;
SDnodeDir dir;
SDnodeMgmt d;
SMnodeMgmt m;
SDnodeMgmt dmgmt;
SMnodeMgmt mmgmt;
SVnodesMgmt vmgmt;
STransMgmt t;
STransMgmt tmgmt;
SStartupMsg startup;
} SDnode;
......@@ -119,7 +122,7 @@ EStat dndGetStat(SDnode *pDnode);
void dndSetStat(SDnode *pDnode, EStat stat);
char *dndStatStr(EStat stat);
void dndReportStartup(SDnode *pDnode, char *name, char *desc);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
#ifdef __cplusplus
......
......@@ -25,6 +25,10 @@
#include "dndMnode.h"
#include "dndVnodes.h"
#define INTERNAL_USER "_internal"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_secret"
static void dndInitMsgFp(STransMgmt *pMgmt) {
// msg from client to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_SUBMIT] = dndProcessVnodeWriteMsg;
......@@ -121,7 +125,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
int32_t msgType = pMsg->msgType;
......@@ -143,19 +147,20 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static int32_t dndInitClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.sessions = 8;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.user = "-internal";
rpcInit.ckey = "-key";
rpcInit.secret = "-secret";
rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY;
rpcInit.secret = INTERNAL_SECRET;
rpcInit.parent = pDnode;
pMgmt->clientRpc = rpcOpen(&rpcInit);
if (pMgmt->clientRpc == NULL) {
......@@ -163,21 +168,22 @@ static int32_t dndInitClient(SDnode *pDnode) {
return -1;
}
dDebug("dnode rpc client is initialized");
return 0;
}
static void dndCleanupClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->clientRpc) {
rpcClose(pMgmt->clientRpc);
pMgmt->clientRpc = NULL;
dInfo("dnode peer rpc client is closed");
dDebug("dnode rpc client is closed");
}
}
static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->t;
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
......@@ -218,24 +224,56 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, INTERNAL_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
taosEncryptPass((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
return 0;
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
taosEncryptPass((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
return 0;
} else {
return -1;
}
}
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent;
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) != 0) {
if (terrno != TSDB_CODE_APP_NOT_READY) {
dTrace("failed to get user auth from mnode since %s", terrstr());
return -1;
}
if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) {
dTrace("get internal auth success");
return 0;
}
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) {
dTrace("get auth from internal mnode");
return 0;
}
if (terrno != TSDB_CODE_APP_NOT_READY) {
dTrace("failed to get user auth from internal mnode since %s", terrstr());
return -1;
}
dDebug("user:%s, send auth msg to mnodes", user);
dDebug("user:%s, send auth msg to other mnodes", user);
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, TSDB_USER_LEN);
......@@ -246,14 +284,14 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
dError("user:%s, failed to get user auth from mnodes since %s", user, terrstr());
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
dDebug("user:%s, success to get user auth from mnodes", user);
dDebug("user:%s, success to get user auth from other mnodes", user);
}
rpcFreeCont(rpcRsp.pCont);
......@@ -261,7 +299,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
}
static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
dndInitMsgFp(pMgmt);
int32_t numOfThreads = (int32_t)((pDnode->opt.numOfCores * pDnode->opt.numOfThreadsPerCore) / 2.0);
......@@ -279,6 +317,7 @@ static int32_t dndInitServer(SDnode *pDnode) {
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.afp = dndRetrieveUserAuthInfo;
rpcInit.parent = pDnode;
pMgmt->serverRpc = rpcOpen(&rpcInit);
if (pMgmt->serverRpc == NULL) {
......@@ -286,14 +325,16 @@ static int32_t dndInitServer(SDnode *pDnode) {
return -1;
}
dDebug("dnode rpc server is initialized");
return 0;
}
static void dndCleanupServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->serverRpc) {
rpcClose(pMgmt->serverRpc);
pMgmt->serverRpc = NULL;
dDebug("dnode rpc server is closed");
}
}
......@@ -311,13 +352,14 @@ int32_t dndInitTrans(SDnode *pDnode) {
}
void dndCleanupTrans(SDnode *pDnode) {
dInfo("dnode-transport start to clean up");
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
dInfo("dnode-transport is cleaned up");
}
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
}
......
......@@ -22,7 +22,8 @@ typedef struct {
int32_t refCount;
int8_t dropped;
int8_t accessState;
SVnode * pImpl;
char *path;
SVnode *pImpl;
taos_queue pWriteQ;
taos_queue pSyncQ;
taos_queue pApplyQ;
......@@ -74,7 +75,7 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnode
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl);
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode);
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes);
......@@ -125,7 +126,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
}
}
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl) {
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
......@@ -139,6 +140,12 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl;
pVnode->path = tstrdup(path);
if (pVnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) {
return -1;
}
......@@ -232,7 +239,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_
snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);
fp = fopen(file, "r");
if (!fp) {
if (fp == NULL) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_VNODE_OVER;
......@@ -354,22 +361,25 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) {
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId);
SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
// SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
SVnode *pImpl = vnodeOpen(path, NULL);
if (pImpl == NULL) {
return -1;
}
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, pImpl);
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl);
if (code != 0) {
vnodeDrop(pImpl);
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
vnodeDrop(pImpl);
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
......@@ -385,7 +395,8 @@ static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
}
dndDropVnodeWrapper(pDnode, pVnode);
vnodeDrop(pVnode->pImpl);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0;
}
......@@ -413,7 +424,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dndCreateVnodeWrapper(pDnode, pVnode->vgId, pImpl);
dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->opened++;
}
......@@ -433,7 +444,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->hash == NULL) {
dError("failed to init vnode hash");
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -876,13 +887,13 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) {
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -920,6 +931,7 @@ static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
......@@ -940,7 +952,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
pPool->min = (int32_t)threadsForQuery;
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pPool = &pMgmt->fetchPool;
......@@ -948,7 +961,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores);
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
......@@ -1000,7 +1014,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
pPool->name = "vnode-write";
pPool->max = tsNumOfCores;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -1038,7 +1052,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......
......@@ -20,8 +20,8 @@
#include "dndVnodes.h"
#include "sync.h"
#include "tcache.h"
#include "wal.h"
#include "tcrc32c.h"
#include "wal.h"
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
......@@ -43,10 +43,10 @@ char *dndStatStr(EStat stat) {
}
}
void dndReportStartup(SDnode *pDnode, char *name, char *desc) {
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) {
SStartupMsg *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
......@@ -61,7 +61,7 @@ static int32_t dndCheckRunning(char *dataDir) {
FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
if (fd < 0) {
dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -77,20 +77,20 @@ static int32_t dndCheckRunning(char *dataDir) {
return 0;
}
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
if (dndCheckRunning(pOptions->dataDir) != 0) {
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
if (dndCheckRunning(pOption->dataDir) != 0) {
return -1;
}
char path[PATH_MAX + 100];
snprintf(path, sizeof(path), "%s%smnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.mnode = strdup(path);
snprintf(path, sizeof(path), "%s%smnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.mnode = tstrdup(path);
snprintf(path, sizeof(path), "%s%svnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = strdup(path);
snprintf(path, sizeof(path), "%s%svnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = tstrdup(path);
snprintf(path, sizeof(path), "%s%sdnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.dnode = strdup(path);
snprintf(path, sizeof(path), "%s%sdnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.dnode = tstrdup(path);
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) {
dError("failed to malloc dir object");
......@@ -116,6 +116,7 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
return -1;
}
memcpy(&pDnode->opt, pOption, sizeof(SDnodeOpt));
return 0;
}
......@@ -135,12 +136,12 @@ static void dndCleanupEnv(SDnode *pDnode) {
taosStopCacheRefreshWorker();
}
SDnode *dndInit(SDnodeOpt *pOptions) {
SDnode *dndInit(SDnodeOpt *pOption) {
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
SDnode *pDnode = calloc(1, sizeof(pDnode));
SDnode *pDnode = calloc(1, sizeof(SDnode));
if (pDnode == NULL) {
dError("failed to create dnode object");
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -150,7 +151,7 @@ SDnode *dndInit(SDnodeOpt *pOptions) {
dInfo("start to initialize TDengine");
dndSetStat(pDnode, DND_STAT_INIT);
if (dndInitEnv(pDnode, pOptions) != 0) {
if (dndInitEnv(pDnode, pOption) != 0) {
dError("failed to init env");
dndCleanup(pDnode);
return NULL;
......@@ -196,7 +197,7 @@ SDnode *dndInit(SDnodeOpt *pOptions) {
dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully");
return 0;
return pDnode;
}
void dndCleanup(SDnode *pDnode) {
......
......@@ -131,7 +131,7 @@ typedef struct SMnodeObj {
int64_t roleTime;
int64_t createdTime;
int64_t updateTime;
SDnodeObj *pDnd;
SDnodeObj *pDnode;
} SMnodeObj;
typedef struct {
......@@ -215,7 +215,7 @@ typedef struct SDbObj {
typedef struct {
int32_t dnodeId;
int8_t role;
SDnodeObj *pDnd;
SDnodeObj *pDnode;
} SVnodeGid;
typedef struct SVgObj {
......
......@@ -32,7 +32,7 @@ typedef struct SMnodeBak {
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
SMnodeOptions para;
SMnodeOpt para;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
} SMnodeBak;
......
......@@ -31,9 +31,9 @@
#include "mnodeStable.h"
#include "mnodeSync.h"
#include "mnodeTelem.h"
#include "mnodeTrans.h"
#include "mnodeUser.h"
#include "mnodeVgroup.h"
#include "mnodeTrans.h"
SMnodeBak tsMint = {0};
......@@ -53,7 +53,7 @@ void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) {
void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) {
assert(pMnode);
(*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg, forShell);
(*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg);
}
static int32_t mnodeInitTimer() {
......@@ -77,17 +77,17 @@ static void mnodeCleanupTimer() {
tmr_h mnodeGetTimer() { return tsMint.timer; }
static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) {
pMnode->dnodeId = pOptions->dnodeId;
pMnode->clusterId = pOptions->clusterId;
pMnode->replica = pOptions->replica;
pMnode->selfIndex = pOptions->selfIndex;
memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pServer = pOptions->pDnode;
pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOptions->sendRedirectMsgFp;
static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->dnodeId = pOption->dnodeId;
pMnode->clusterId = pOption->clusterId;
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pServer = pOption->pDnode;
pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
......@@ -136,10 +136,10 @@ static int32_t mnodeAllocStartSteps() {
return 0;
}
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) {
SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption) {
SMnode *pMnode = calloc(1, sizeof(SMnode));
if (mnodeSetOptions(pMnode, pOptions) != 0) {
if (mnodeSetOptions(pMnode, pOption) != 0) {
free(pMnode);
mError("failed to init mnode options since %s", terrstr());
return NULL;
......@@ -173,7 +173,7 @@ SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) {
void mnodeClose(SMnode *pMnode) { free(pMnode); }
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions) { return 0; }
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; }
void mnodeDestroy(const char *path) { sdbUnDeploy(); }
......
......@@ -21,16 +21,5 @@ int32_t mnodeInitAuth() { return 0; }
void mnodeCleanupAuth() {}
int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, TSDB_NETTEST_USER) == 0) {
char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass);
*spi = 0;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pass, TSDB_KEY_LEN);
mDebug("nettest user is authorized");
return 0;
}
return 0;
}
\ No newline at end of file
......@@ -17,9 +17,6 @@
#include "vnodeInt.h"
#include "tqueue.h"
int32_t vnodeInit(SVnodePara para) { return 0; }
void vnodeCleanup() {}
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
void vnodeDrop(SVnode *pVnode) {}
......@@ -31,7 +28,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} else {
pMsg->allocNum = msgNum;
......@@ -41,7 +38,7 @@ SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
if (pMsg->curNum >= pMsg->allocNum) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;
......
......@@ -80,6 +80,9 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in);
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup);
void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate);
CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn);
void* fstBuilerIntoInner(FstBuilder *b);
void fstBuilderFinish(FstBuilder *b);
......@@ -216,6 +219,15 @@ bool fstNodeFindInput(FstNode *node, uint8_t b, uint64_t *res);
bool fstNodeCompile(FstNode *node, void *w, CompiledAddr lastAddr, CompiledAddr addr, FstBuilderNode *builderNode);
FstSlice fstNodeAsSlice(FstNode *node);
// ops
typedef struct FstIndexedValue {
uint64_t index;
uint64_t value;
} FstIndexedValue;
FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out);
void fstLastTransitionDestroy(FstLastTransition *trn);
typedef struct FstMeta {
......@@ -227,20 +239,20 @@ typedef struct FstMeta {
} FstMeta;
typedef struct Fst {
FstMeta meta;
void *data; //
FstMeta *meta;
FstSlice *data; //
FstNode *root; //
} Fst;
// ops
typedef struct FstIndexedValue {
uint64_t index;
uint64_t value;
} FstIndexedValue;
FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out);
void fstLastTransitionDestroy(FstLastTransition *trn);
// refactor simple function
Fst* fstCreate(FstSlice *data);
void fstDestroy(Fst *fst);
bool fstGet(Fst *fst, FstSlice *b, Output *out);
FstNode* fstGetNode(Fst *fst, CompiledAddr);
FstType fstGetType(Fst *fst);
CompiledAddr fstGetRootAddr(Fst *fst);
Output fstEmptyFinalOutput(Fst *fst, bool *null);
bool fstVerify(Fst *fst);
#endif
......@@ -27,9 +27,11 @@ typedef struct FstCountingWriter {
uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen);
int FstCountingWriterFlush(FstCountingWriter *write);
int fstCountingWriterFlush(FstCountingWriter *write);
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write);
FstCountingWriter *fstCountingWriterCreate(void *wtr);
void fstCountingWriterDestroy(FstCountingWriter *w);
......
......@@ -32,9 +32,9 @@ extern const CompiledAddr EMPTY_ADDRESS;
extern const CompiledAddr NONE_ADDRESS;
// This version number is written to every finite state transducer created by
// this crate. When a finite state transducer is read, its version number is
// this version When a finite state transducer is read, its version number is
// checked against this value.
extern const uint64_t version;
extern const uint64_t VERSION;
// The threshold (in number of transitions) at which an index is created for
// a node's transitions. This speeds up lookup time at the expense of FST size
......
......@@ -14,7 +14,8 @@
*/
#include "index_fst.h"
#include "tcoding.h"
#include "tchecksum.h"
static void fstPackDeltaIn(FstCountingWriter *wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
......@@ -98,7 +99,7 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output
FstBuilderNodeUnfinished *un = taosArrayGet(nodes->stack, sz);
assert(un->last == NULL);
//FstLastTransition *trn = malloc(sizeof(FstLastTransition));
//trn->inp = s->data[s->start];
......@@ -146,24 +147,27 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
size_t lsz = (size_t)(s->end - s->start + 1); // data len
size_t ssz = taosArrayGetSize(node->stack); // stack size
uint64_t res = 0;
for (size_t i = 0; i < lsz && i < ssz; i++) {
uint64_t i = 0;
for (i = 0; i < lsz && i < ssz; i++) {
FstBuilderNodeUnfinished *un = taosArrayGet(node->stack, i);
FstLastTransition *last = un->last;
if (last->inp == s->data[s->start + i]) {
uint64_t commPrefix = last->out;
uint64_t addPrefix = last->out - commPrefix;
out = out - commPrefix;
last->out = commPrefix;
if (addPrefix != 0) {
fstBuilderNodeUnfinishedAddOutputPrefix(un, addPrefix);
}
FstLastTransition *t = un->last;
uint64_t addPrefix = 0;
if (t && t->inp == s->data[s->start + i]) {
uint64_t commPrefix = MIN(t->out, *out);
uint64_t tAddPrefix = t->out - commPrefix;
(*out) = (*out) - commPrefix;
t->out = commPrefix;
addPrefix = tAddPrefix;
} else {
break;
break;
}
if (addPrefix != 0) {
fstBuilderNodeUnfinishedAddOutputPrefix(un, addPrefix);
}
}
return res;
return i;
}
......@@ -771,16 +775,16 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) {
return;
}
Output out;
uint64_t prefixLen;
if (in != 0) { //if let Some(in) = in
prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
} else {
prefixLen = fstUnFinishedNodesFindCommPrefix(b->unfinished, bs);
out = 0;
}
//if (in != 0) { //if let Some(in) = in
// prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
//} else {
// prefixLen = fstUnFinishedNodesFindCommPrefix(b->unfinished, bs);
// out = 0;
//}
uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
if (prefixLen == FST_SLICE_LEN(s)) {
assert(out != 0);
assert(out == 0);
return;
}
......@@ -849,6 +853,31 @@ CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn) {
return b->lastAddr;
}
void* fstBuilderInsertInner(FstBuilder *b) {
fstBuilderCompileFrom(b, 0);
FstBuilderNode *rootNode = fstUnFinishedNodesPopRoot(b->unfinished);
CompiledAddr rootAddr = fstBuilderCompile(b, rootNode);
uint8_t buf64[8] = {0};
taosEncodeFixedU64((void **)&buf64, b->len);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
taosEncodeFixedU64((void **)&buf64, rootAddr);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
uint8_t buf32[4] = {0};
uint32_t sum = fstCountingWriterMaskedCheckSum(b->wrt);
taosEncodeFixedU32((void **)&buf32, sum);
fstCountingWriterWrite(b->wrt, buf32, sizeof(buf32));
fstCountingWriterFlush(b->wrt);
return b->wrt;
}
void fstBuilderFinish(FstBuilder *b) {
fstBuilderInsertInner(b);
}
......@@ -894,4 +923,108 @@ void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished *unNode, O
return;
}
Fst* fstCreate(FstSlice *slice) {
char *buf = slice->data;
uint64_t skip = 0;
uint64_t len = slice->dLen;
if (len < 36) {
return NULL;
}
uint64_t version;
taosDecodeFixedU64(buf, &version);
skip += sizeof(version);
if (version == 0 || version > VERSION) {
return NULL;
}
uint64_t type;
taosDecodeFixedU64(buf + skip, &type);
skip += sizeof(type);
uint32_t checkSum = 0;
len -= sizeof(checkSum);
taosDecodeFixedU32(buf + len, &checkSum);
CompiledAddr rootAddr;
len -= sizeof(rootAddr);
taosDecodeFixedU64(buf + len, &rootAddr);
uint64_t fstLen;
len -= sizeof(fstLen);
taosDecodeFixedU64(buf + len, &fstLen);
//TODO(validat root addr)
//
Fst *fst= (Fst *)calloc(1, sizeof(Fst));
if (fst == NULL) { return NULL; }
fst->meta = (FstMeta *)malloc(sizeof(FstMeta));
if (NULL == fst->meta) {
goto FST_CREAT_FAILED;
}
fst->meta->version = version;
fst->meta->rootAddr = rootAddr;
fst->meta->ty = type;
fst->meta->len = fstLen;
fst->meta->checkSum = checkSum;
fst->data = slice;
return fst;
FST_CREAT_FAILED:
free(fst->meta);
free(fst);
}
void fstDestroy(Fst *fst) {
if (fst) {
free(fst->meta);
fstNodeDestroy(fst->root);
}
free(fst);
}
bool fstGet(Fst *fst, FstSlice *b, Output *out) {
return false;
}
FstNode* fstGetNode(Fst *fst, CompiledAddr addr) {
if (fst->root != NULL) {
return fst->root;
}
fst->root = fstNodeCreate(fst->meta->version, addr, fst->data);
return fst->root;
}
FstType fstGetType(Fst *fst) {
return fst->meta->ty;
}
CompiledAddr fstGetRootAddr(Fst *fst) {
return fst->meta->rootAddr;
}
Output fstEmptyFinalOutput(Fst *fst, bool *null) {
Output res = 0;
FstNode *node = fst->root;
if (FST_NODE_IS_FINAL(node)) {
*null = false;
res = FST_NODE_FINAL_OUTPUT(node);
} else {
*null = true;
}
return res;
}
bool fstVerify(Fst *fst) {
uint32_t checkSum = fst->meta->checkSum;
FstSlice *data = fst->data;
TSCKSUM initSum = 0;
if (taosCheckChecksumWhole(data->data, data->dLen)) {
return false;
}
}
......@@ -37,6 +37,9 @@ uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t
return bufLen;
}
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) {
return 0;
}
int fstCountingWriterFlush(FstCountingWriter *write) {
//write->wtr->flush
return 1;
......
......@@ -25,7 +25,7 @@ const CompiledAddr NONE_ADDRESS = 1;
// This version number is written to every finite state transducer created by
// this crate. When a finite state transducer is read, its version number is
// checked against this value.
const uint64_t version = 3;
const uint64_t VERSION = 3;
// The threshold (in number of transitions) at which an index is created for
// a node's transitions. This speeds up lookup time at the expense of FST size
......
......@@ -22,6 +22,9 @@
extern "C" {
#endif
int walRotate(SWal* pWal);
int walGetFile(SWal* pWal, int32_t version);
#ifdef __cplusplus
}
#endif
......
......@@ -23,11 +23,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walPrune(SWal *pWal, int64_t ver) {
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
return 0;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "walInt.h"
int walSetCurVerImpl(SWal *pWal, int64_t ver) {
//close old file
//iterate all files
//open right file
//set cur version, cur file version and cur status
return 0;
}
int walSetCurVer(SWal *pWal, int64_t ver) {
if(ver > pWal->lastVersion + 1) {
//TODO: some records are skipped
return -1;
}
if(ver < pWal->firstVersion) {
//TODO: try to seek pruned log
return -1;
}
if(ver < pWal->snapshotVersion) {
//TODO: seek snapshotted log
}
if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
//back up to avoid inconsistency
int64_t curVersion = pWal->curVersion;
int64_t curOffset = pWal->curOffset;
int64_t curFileFirstVersion = pWal->curFileFirstVersion;
int64_t curFileLastVersion = pWal->curFileLastVersion;
if(walSetCurVerImpl(pWal, ver) < 0) {
//TODO: errno
pWal->curVersion = curVersion;
pWal->curOffset = curOffset;
pWal->curFileFirstVersion = curFileFirstVersion;
pWal->curFileLastVersion = curFileLastVersion;
return -1;
}
}
return 0;
}
int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int code = 0;
//get index file
if(!tfValid(pWal->curIdxTfd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
}
if(pWal->curVersion != ver) {
if(walSetCurVer(pWal, ver) != 0) {
//TODO: some records are skipped
return -1;
}
}
//check file checksum
//append index
return 0;
}
int walRotateIndex(SWal *pWal) {
//check file checksum
//create new file
//switch file
return 0;
}
......@@ -21,7 +21,7 @@
#include "walInt.h"
typedef struct {
int32_t refId;
int32_t refSetId;
int32_t seq;
int8_t stop;
pthread_t thread;
......@@ -36,7 +36,7 @@ static void walFreeObj(void *pWal);
int32_t walInit() {
int32_t code = 0;
tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
code = pthread_mutex_init(&tsWal.mutex, NULL);
if (code) {
......@@ -45,23 +45,23 @@ int32_t walInit() {
}
code = walCreateThread();
if (code != TSDB_CODE_SUCCESS) {
if (code != 0) {
wError("failed to init wal module since %s", tstrerror(code));
return code;
}
wInfo("wal module is initialized, rsetId:%d", tsWal.refId);
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
return code;
}
void walCleanUp() {
walStopThread();
taosCloseRef(tsWal.refId);
taosCloseRef(tsWal.refSetId);
pthread_mutex_destroy(&tsWal.mutex);
wInfo("wal module is cleaned up");
}
SWal *walOpen(char *path, SWalCfg *pCfg) {
SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = malloc(sizeof(SWal));
if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -69,10 +69,9 @@ SWal *walOpen(char *path, SWalCfg *pCfg) {
}
pWal->vgId = pCfg->vgId;
pWal->tfd = -1;
pWal->fileId = -1;
pWal->curLogTfd = -1;
/*pWal->curFileId = -1;*/
pWal->level = pCfg->walLevel;
/*pWal->keep = pCfg->keep;*/
pWal->fsyncPeriod = pCfg->fsyncPeriod;
tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL);
......@@ -80,13 +79,13 @@ SWal *walOpen(char *path, SWalCfg *pCfg) {
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
if (walInitObj(pWal) != TSDB_CODE_SUCCESS) {
if (walInitObj(pWal) != 0) {
walFreeObj(pWal);
return NULL;
}
pWal->rId = taosAddRef(tsWal.refId, pWal);
if (pWal->rId < 0) {
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
if (pWal->refId < 0) {
walFreeObj(pWal);
return NULL;
}
......@@ -102,7 +101,7 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level,
pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);
return TSDB_CODE_SUCCESS;
return 0;
}
wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level,
......@@ -113,26 +112,16 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
return TSDB_CODE_SUCCESS;
}
void walStop(void *handle) {
if (handle == NULL) return;
SWal *pWal = handle;
pthread_mutex_lock(&pWal->mutex);
pWal->stop = 1;
pthread_mutex_unlock(&pWal->mutex);
wDebug("vgId:%d, stop write wal", pWal->vgId);
return 0;
}
void walClose(SWal *pWal) {
if (pWal == NULL) return;
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->tfd);
tfClose(pWal->curLogTfd);
pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal->rId);
taosRemoveRef(tsWal.refSetId, pWal->refId);
}
static int32_t walInitObj(SWal *pWal) {
......@@ -142,14 +131,14 @@ static int32_t walInitObj(SWal *pWal) {
}
wDebug("vgId:%d, object is initialized", pWal->vgId);
return TSDB_CODE_SUCCESS;
return 0;
}
static void walFreeObj(void *wal) {
SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
tfClose(pWal->tfd);
tfClose(pWal->curLogTfd);
pthread_mutex_destroy(&pWal->mutex);
tfree(pWal);
}
......@@ -174,16 +163,16 @@ static void walUpdateSeq() {
}
static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refId, 0);
SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
while (pWal) {
if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
int32_t code = tfFsync(pWal->tfd);
int32_t code = tfFsync(pWal->curLogTfd);
if (code != 0) {
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code));
}
}
pWal = taosIterateRef(tsWal.refId, pWal->rId);
pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
}
}
......@@ -216,7 +205,7 @@ static int32_t walCreateThread() {
pthread_attr_destroy(&thAttr);
wDebug("wal thread is launched, thread:0x%08" PRIx64, taosGetPthreadId(tsWal.thread));
return TSDB_CODE_SUCCESS;
return 0;
}
static void walStopThread() {
......
......@@ -21,6 +21,7 @@
#include "tfile.h"
#include "walInt.h"
#if 0
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
int32_t walRenew(void *handle) {
......@@ -29,16 +30,16 @@ int32_t walRenew(void *handle) {
SWal * pWal = handle;
int32_t code = 0;
if (pWal->stop) {
wDebug("vgId:%d, do not create a new wal file", pWal->vgId);
return 0;
}
/*if (pWal->stop) {*/
/*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/
/*return 0;*/
/*}*/
pthread_mutex_lock(&pWal->mutex);
if (tfValid(pWal->tfd)) {
tfClose(pWal->tfd);
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->name);
if (tfValid(pWal->logTfd)) {
tfClose(pWal->logTfd);
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName);
}
/*if (pWal->keep == TAOS_WAL_KEEP) {*/
......@@ -48,14 +49,14 @@ int32_t walRenew(void *handle) {
/*pWal->fileId++;*/
/*}*/
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->tfd = tfOpenCreateWrite(pWal->name);
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
pWal->logTfd = tfOpenCreateWrite(pWal->logName);
if (!tfValid(pWal->tfd)) {
if (!tfValid(pWal->logTfd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
} else {
wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->name);
wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName);
}
pthread_mutex_unlock(&pWal->mutex);
......@@ -67,13 +68,13 @@ void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
/*if (pWal->keep == TAOS_WAL_KEEP) return;*/
if (!tfValid(pWal->tfd)) return;
if (!tfValid(pWal->logTfd)) return;
pthread_mutex_lock(&pWal->mutex);
// remove the oldest wal file
int64_t oldFileId = -1;
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) {
char walName[WAL_FILE_LEN] = {0};
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
......@@ -95,26 +96,24 @@ void walRemoveAllOldFiles(void *handle) {
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->tfd);
wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->name);
tfClose(pWal->logTfd);
wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName);
while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->name, strerror(errno));
if (remove(pWal->logName) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno));
} else {
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName);
}
}
pthread_mutex_unlock(&pWal->mutex);
}
#if defined(WAL_CHECKSUM_WHOLE)
#endif
static void walUpdateChecksum(SWalHead *pHead) {
pHead->sver = 2;
pHead->cksum = 0;
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len);
}
......@@ -130,8 +129,6 @@ static int walValidateChecksum(SWalHead *pHead) {
return 0;
}
#endif
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
if (pWal == NULL) return -1;
......@@ -143,32 +140,27 @@ int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
int32_t code = 0;
// no wal
if (!tfValid(pWal->tfd)) return 0;
if (!tfValid(pWal->curLogTfd)) return 0;
if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0;
if (pHead->version <= pWal->curVersion) return 0;
pHead->signature = WAL_SIGNATURE;
pHead->len = bodyLen;
memcpy(pHead->cont, body, bodyLen);
#if defined(WAL_CHECKSUM_WHOLE)
walUpdateChecksum(pHead);
#else
pHead->sver = 0;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
#endif
int32_t contLen = pHead->len + sizeof(SWalHead);
pthread_mutex_lock(&pWal->mutex);
if (tfWrite(pWal->tfd, pHead, contLen) != contLen) {
if (tfWrite(pWal->curLogTfd, pHead, contLen) != contLen) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
} else {
wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,
pWal->fileId, pWal->tfd, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version;
/*wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,*/
/*pWal->curFileId, pWal->logTfd, pHead->version, pWal->curVersion, pHead->len);*/
pWal->curVersion = pHead->version;
}
pthread_mutex_unlock(&pWal->mutex);
......@@ -179,16 +171,17 @@ int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
}
void walFsync(SWal *pWal, bool forceFsync) {
if (pWal == NULL || !tfValid(pWal->tfd)) return;
if (pWal == NULL || !tfValid(pWal->curLogTfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId);
if (tfFsync(pWal->tfd) < 0) {
wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno));
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, pWal->curFileFirstVersion);
if (tfFsync(pWal->curLogTfd) < 0) {
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
}
}
}
#if 0
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
if (handle == NULL) return -1;
......@@ -198,10 +191,10 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
int64_t fileId = -1;
while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
if (fileId == pWal->fileId) continue;
/*if (fileId == pWal->curFileId) continue;*/
char walName[WAL_FILE_LEN];
snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
snprintf(walName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
......@@ -210,7 +203,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
continue;
}
wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->version);
wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion);
count++;
}
......@@ -222,14 +215,14 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
return walRenew(pWal);
} else {
// open the existing WAL file in append mode
pWal->fileId = 0;
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->tfd = tfOpenCreateWriteAppend(pWal->name);
if (!tfValid(pWal->tfd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
/*pWal->curFileId = 0;*/
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName);
if (!tfValid(pWal->logTfd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->name);
wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName);
}
return TSDB_CODE_SUCCESS;
......@@ -246,14 +239,15 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
int32_t code = walGetNextFile(pWal, fileId);
if (code >= 0) {
sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
code = (*fileId == pWal->fileId) ? 0 : 1;
/*code = (*fileId == pWal->curFileId) ? 0 : 1;*/
}
wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->fileId, *fileId);
wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId);
pthread_mutex_unlock(&(pWal->mutex));
return code;
}
#endif
static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) {
tfFtruncate(tfd, offset);
......@@ -279,13 +273,6 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
continue;
}
#if defined(WAL_CHECKSUM_WHOLE)
if (pHead->sver == 0 && walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
if (pHead->sver >= 1) {
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
......@@ -298,15 +285,6 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
return TSDB_CODE_SUCCESS;
}
}
#else
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
#endif
}
return TSDB_CODE_WAL_FILE_CORRUPTED;
......@@ -349,7 +327,6 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
break;
}
#if defined(WAL_CHECKSUM_WHOLE)
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
......@@ -393,50 +370,15 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
}
}
#else
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead);
continue;
}
#endif
offset = offset + sizeof(SWalHead) + pHead->len;
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset);
pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset);
pWal->version = pHead->version;
pWal->curVersion = pHead->version;
// wInfo("writeFp: %ld", offset);
(*writeFp)(pVnode, pHead, NULL);
(*writeFp)(pVnode, pHead);
}
tfClose(tfd);
......@@ -449,7 +391,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
uint64_t walGetVersion(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->version;
return pWal->curVersion;
}
// Wal version in slave (dnode1) must be reset.
......@@ -459,7 +401,7 @@ uint64_t walGetVersion(SWal *pWal) {
void walResetVersion(SWal *pWal, uint64_t newVer) {
if (pWal == NULL) return;
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer);
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->curVersion, newVer);
pWal->version = newVer;
pWal->curVersion = newVer;
}
......@@ -48,14 +48,14 @@ void taosRemoveDir(const char *dirname) {
taosRemoveDir(filename);
} else {
(void)remove(filename);
printf("file:%s is removed\n", filename);
//printf("file:%s is removed\n", filename);
}
}
closedir(dir);
rmdir(dirname);
printf("dir:%s is removed\n", dirname);
//printf("dir:%s is removed\n", dirname);
}
int32_t taosDirExist(char *dirname) { return access(dirname, F_OK); }
......@@ -101,9 +101,9 @@ void taosRemoveOldFiles(char *dirname, int32_t keepDays) {
int32_t days = (int32_t)(ABS(sec - fileSec) / 86400 + 1);
if (days > keepDays) {
(void)remove(filename);
printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays);
//printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays);
} else {
printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays);
//printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays);
}
}
}
......@@ -115,7 +115,7 @@ void taosRemoveOldFiles(char *dirname, int32_t keepDays) {
int32_t taosExpandDir(char *dirname, char *outname, int32_t maxlen) {
wordexp_t full_path;
if (0 != wordexp(dirname, &full_path, 0)) {
printf("failed to expand path:%s since %s", dirname, strerror(errno));
//printf("failed to expand path:%s since %s", dirname, strerror(errno));
wordfree(&full_path);
return -1;
}
......
......@@ -404,14 +404,14 @@ int32_t taosRenameFile(char *oldName, char *newName) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED);
if (code < 0) {
printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
//printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
}
return code;
#else
int32_t code = rename(oldName, newName);
if (code < 0) {
printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
//printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
}
return code;
......
......@@ -95,7 +95,7 @@ void taosShutDownSocketWR(SOCKET fd) {
int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
int32_t flags = 0;
if ((flags = fcntl(sock, F_GETFL, 0)) < 0) {
printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
//printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
}
......@@ -105,7 +105,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
flags &= ~O_NONBLOCK;
if ((flags = fcntl(sock, F_SETFL, flags)) < 0) {
printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno));
//printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
}
......@@ -120,7 +120,7 @@ void taosBlockSIGPIPE() {
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
printf("failed to block SIGPIPE");
//printf("failed to block SIGPIPE");
}
}
......@@ -130,7 +130,7 @@ void taosSetMaskSIGPIPE() {
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
if (rc != 0) {
printf("failed to setmask SIGPIPE");
//printf("failed to setmask SIGPIPE");
}
}
......@@ -277,7 +277,7 @@ int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
printf("failed to get hostname, reason:%s", strerror(errno));
//printf("failed to get hostname, reason:%s", strerror(errno));
return -1;
}
......@@ -294,7 +294,7 @@ int32_t taosGetFqdn(char *fqdn) {
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
//printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}
......@@ -326,12 +326,12 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
} else {
#ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) {
printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
} else {
printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
}
#else
printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif
return 0xFFFFFFFF;
}
......@@ -437,13 +437,13 @@ int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
printf("fd %d timeout, no enough space to write", fd);
//printf("fd %d timeout, no enough space to write", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
printf("select error, %d (%s)", errno, strerror(errno));
//printf("select error, %d (%s)", errno, strerror(errno));
return -1;
}
......@@ -451,7 +451,7 @@ int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
if (nwritten <= 0) {
if (errno == EAGAIN || errno == EINTR) continue;
printf("write error, %d (%s)", errno, strerror(errno));
//printf("write error, %d (%s)", errno, strerror(errno));
return -1;
}
......@@ -477,21 +477,21 @@ int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
printf("fd %d timeout\n", fd);
//printf("fd %d timeout\n", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
printf("select error, %d (%s)", errno, strerror(errno));
//printf("select error, %d (%s)", errno, strerror(errno));
return -1;
}
if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
if (errno == EINTR) continue;
printf("read error, %d (%s)", errno, strerror(errno));
//printf("read error, %d (%s)", errno, strerror(errno));
return -1;
} else if (nread == 0) {
printf("fd %d EOF", fd);
//printf("fd %d EOF", fd);
break; // EOF
}
......@@ -507,7 +507,7 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
SOCKET sockFd;
int32_t bufSize = 1024000;
printf("open udp socket:0x%x:%hu", ip, port);
//printf("open udp socket:0x%x:%hu", ip, port);
memset((char *)&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
......@@ -515,26 +515,26 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
//printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the send buffer size for UDP socket\n");
//printf("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the receive buffer size for UDP socket\n");
//printf("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
}
/* bind socket to local address */
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
//printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
taosCloseSocket(sockFd);
return -1;
}
......@@ -551,7 +551,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd <= 2) {
printf("failed to open the socket: %d (%s)", errno, strerror(errno));
//printf("failed to open the socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
......@@ -559,19 +559,19 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
/* set REUSEADDR option, so the portnumber can be re-used */
int32_t reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
//printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the send buffer size for TCP socket\n");
//printf("failed to set the send buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the receive buffer size for TCP socket\n");
//printf("failed to set the receive buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
}
......@@ -584,8 +584,8 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
/* bind socket to client address */
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
strerror(errno));
//printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
// strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
......@@ -601,7 +601,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret == -1) {
if (errno == EHOSTUNREACH) {
printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
//printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
return -1;
} else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
......@@ -612,19 +612,19 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
if (res == -1 || res == 0) {
printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
//printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
int optVal = -1, optLen = sizeof(int);
if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
//printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
ret = 0;
} else { // Other error
printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
//printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
......@@ -636,7 +636,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
#endif
if (ret != 0) {
printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
//printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
sockFd = -1;
} else {
......@@ -649,7 +649,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
int32_t taosKeepTcpAlive(SOCKET sockFd) {
int32_t alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
//printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
......@@ -658,21 +658,21 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
// all fails on macosx
int32_t probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
//printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
int32_t alivetime = 10;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
//printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
int32_t interval = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
//printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
......@@ -680,7 +680,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
int32_t nodelay = 1;
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
//printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
......@@ -689,7 +689,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
linger.l_onoff = 1;
linger.l_linger = 3;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
//printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
......@@ -702,7 +702,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
SOCKET sockFd;
int32_t reuse;
printf("open tcp server socket:0x%x:%hu", ip, port);
//printf("open tcp server socket:0x%x:%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET;
......@@ -710,7 +710,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
//printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
......@@ -718,26 +718,26 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
/* set REUSEADDR option, so the portnumber can be re-used */
reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
//printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
/* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
//printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (taosKeepTcpAlive(sockFd) < 0) {
printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
//printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (listen(sockFd, 1024) < 0) {
printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
//printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
......@@ -767,16 +767,16 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen);
if (readLen != retLen) {
printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
readLen, retLen, len, leftLen, strerror(errno));
//printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
// readLen, retLen, len, leftLen, strerror(errno));
return -1;
}
writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen);
if (readLen != writeLen) {
printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
readLen, writeLen, len, leftLen, strerror(errno));
//printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
// readLen, writeLen, len, leftLen, strerror(errno));
return -1;
}
......
......@@ -277,7 +277,7 @@ char *strsep(char **stringp, const char *delim) {
char *getpass(const char *prefix) {
static char passwd[TSDB_KEY_LEN] = {0};
memset(passwd, 0, TSDB_KEY_LEN);
printf("%s", prefix);
//printf("%s", prefix);
int32_t index = 0;
char ch;
......
......@@ -134,7 +134,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
diskSize->used = (int64_t)(i64TotalBytes - i64FreeBytes);
return 0;
} else {
printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
//printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -205,12 +205,12 @@ void taosGetSystemInfo() {
}
void taosKillSystem() {
printf("function taosKillSystem, exit!");
//printf("function taosKillSystem, exit!");
exit(0);
}
int taosSystem(const char *cmd) {
printf("taosSystem not support");
//printf("taosSystem not support");
return -1;
}
......@@ -280,7 +280,7 @@ static void taosGetSystemTimezone() {
{
int n = readlink("/etc/localtime", buf, sizeof(buf));
if (n < 0) {
printf("read /etc/localtime error, reason:%s", strerror(errno));
//printf("read /etc/localtime error, reason:%s", strerror(errno));
return;
}
buf[n] = '\0';
......@@ -294,7 +294,7 @@ static void taosGetSystemTimezone() {
}
}
if (!tz || 0 == strchr(tz, '/')) {
printf("parsing /etc/localtime failed");
//printf("parsing /etc/localtime failed");
return;
}
......@@ -321,7 +321,7 @@ static void taosGetSystemTimezone() {
-timezone / 3600);
// cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT;
printf("timezone not configured, set to system default:%s", tsTimezone);
//printf("timezone not configured, set to system default:%s", tsTimezone);
}
/*
......@@ -348,11 +348,11 @@ static void taosGetSystemLocale() { // get and set default locale
locale = setlocale(LC_CTYPE, "");
if (locale == NULL) {
printf("can't get locale from system, set it to en_US.UTF-8 since error:%d:%s", errno, strerror(errno));
//printf("can't get locale from system, set it to en_US.UTF-8 since error:%d:%s", errno, strerror(errno));
strcpy(tsLocale, "en_US.UTF-8");
} else {
tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN);
printf("locale not configured, set to system default:%s", tsLocale);
//printf("locale not configured, set to system default:%s", tsLocale);
}
/* if user does not specify the charset, extract it from locale */
......@@ -364,15 +364,15 @@ static void taosGetSystemLocale() { // get and set default locale
tstrncpy(tsCharset, revisedCharset, TSDB_LOCALE_LEN);
free(revisedCharset);
printf("charset not configured, set to system default:%s", tsCharset);
//printf("charset not configured, set to system default:%s", tsCharset);
} else {
strcpy(tsCharset, "UTF-8");
printf("can't get locale and charset from system, set it to UTF-8");
//printf("can't get locale and charset from system, set it to UTF-8");
}
}
void taosKillSystem() {
printf("function taosKillSystem, exit!");
//printf("function taosKillSystem, exit!");
exit(0);
}
......@@ -432,7 +432,7 @@ bool taosGetSysMemory(float *memoryUsedMB) {
}
int taosSystem(const char *cmd) {
printf("un support funtion");
//printf("un support funtion");
return -1;
}
......@@ -441,7 +441,7 @@ void taosSetCoreDump() {}
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
struct statvfs info;
if (statvfs(dataDir, &info)) {
printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
//printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} else {
......@@ -535,7 +535,7 @@ bool taosGetSysMemory(float *memoryUsedMB) {
bool taosGetProcMemory(float *memoryUsedMB) {
FILE *fp = fopen(tsProcMemFile, "r");
if (fp == NULL) {
printf("open file:%s failed", tsProcMemFile);
//printf("open file:%s failed", tsProcMemFile);
return false;
}
......@@ -555,7 +555,7 @@ bool taosGetProcMemory(float *memoryUsedMB) {
}
if (line == NULL) {
printf("read file:%s failed", tsProcMemFile);
//printf("read file:%s failed", tsProcMemFile);
fclose(fp);
return false;
}
......@@ -573,7 +573,7 @@ bool taosGetProcMemory(float *memoryUsedMB) {
static bool taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
FILE *fp = fopen(tsSysCpuFile, "r");
if (fp == NULL) {
printf("open file:%s failed", tsSysCpuFile);
//printf("open file:%s failed", tsSysCpuFile);
return false;
}
......@@ -581,7 +581,7 @@ static bool taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
char * line = NULL;
ssize_t _bytes = getline(&line, &len, fp);
if ((_bytes < 0) || (line == NULL)) {
printf("read file:%s failed", tsSysCpuFile);
//printf("read file:%s failed", tsSysCpuFile);
fclose(fp);
return false;
}
......@@ -598,7 +598,7 @@ static bool taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
static bool taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
FILE *fp = fopen(tsProcCpuFile, "r");
if (fp == NULL) {
printf("open file:%s failed", tsProcCpuFile);
//printf("open file:%s failed", tsProcCpuFile);
return false;
}
......@@ -606,7 +606,7 @@ static bool taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
char * line = NULL;
ssize_t _bytes = getline(&line, &len, fp);
if ((_bytes < 0) || (line == NULL)) {
printf("read file:%s failed", tsProcCpuFile);
//printf("read file:%s failed", tsProcCpuFile);
fclose(fp);
return false;
}
......@@ -642,7 +642,7 @@ static void taosGetSystemTimezone() {
int len = fread(buf, 64, 1, f);
if (len < 64 && ferror(f)) {
fclose(f);
printf("read /etc/timezone error, reason:%s", strerror(errno));
//printf("read /etc/timezone error, reason:%s", strerror(errno));
return;
}
......@@ -681,7 +681,7 @@ static void taosGetSystemTimezone() {
snprintf(tsTimezone, TSDB_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
// cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT;
printf("timezone not configured, set to system default:%s", tsTimezone);
//printf("timezone not configured, set to system default:%s", tsTimezone);
}
/*
......@@ -707,11 +707,11 @@ static void taosGetSystemLocale() { // get and set default locale
locale = setlocale(LC_CTYPE, "");
if (locale == NULL) {
printf("can't get locale from system, set it to en_US.UTF-8 since error:%d:%s", errno, strerror(errno));
//printf("can't get locale from system, set it to en_US.UTF-8 since error:%d:%s", errno, strerror(errno));
strcpy(tsLocale, "en_US.UTF-8");
} else {
tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN);
printf("locale not configured, set to system default:%s", tsLocale);
//printf("locale not configured, set to system default:%s", tsLocale);
}
/* if user does not specify the charset, extract it from locale */
......@@ -723,10 +723,10 @@ static void taosGetSystemLocale() { // get and set default locale
tstrncpy(tsCharset, revisedCharset, TSDB_LOCALE_LEN);
free(revisedCharset);
printf("charset not configured, set to system default:%s", tsCharset);
//printf("charset not configured, set to system default:%s", tsCharset);
} else {
strcpy(tsCharset, "UTF-8");
printf("can't get locale and charset from system, set it to UTF-8");
//printf("can't get locale and charset from system, set it to UTF-8");
}
}
......@@ -774,7 +774,7 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
struct statvfs info;
if (statvfs(dataDir, &info)) {
printf("failed to get disk size, dataDir:%s errno:%s", dataDir, strerror(errno));
//printf("failed to get disk size, dataDir:%s errno:%s", dataDir, strerror(errno));
return -1;
} else {
diskSize->tsize = info.f_blocks * info.f_frsize;
......@@ -788,7 +788,7 @@ bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
*bytes = 0;
FILE *fp = fopen(tsSysNetFile, "r");
if (fp == NULL) {
printf("open file:%s failed", tsSysNetFile);
//printf("open file:%s failed", tsSysNetFile);
return false;
}
......@@ -864,7 +864,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
double totalBytes = (double)(curBytes - lastBytes) / 1024 * 8; // Kb
*bandSpeedKb = (float)(totalBytes / (double)(curTime - lastTime));
// printf("bandwidth lastBytes:%ld, lastTime:%ld, curBytes:%ld, curTime:%ld,
// //printf("bandwidth lastBytes:%ld, lastTime:%ld, curBytes:%ld, curTime:%ld,
// speed:%f", lastBytes, lastTime, curBytes, curTime, *bandSpeed);
lastTime = curTime;
......@@ -876,7 +876,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
FILE *fp = fopen(tsProcIOFile, "r");
if (fp == NULL) {
printf("open file:%s failed", tsProcIOFile);
//printf("open file:%s failed", tsProcIOFile);
return false;
}
......@@ -909,7 +909,7 @@ bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
fclose(fp);
if (readIndex < 2) {
printf("read file:%s failed", tsProcIOFile);
//printf("read file:%s failed", tsProcIOFile);
return false;
}
......@@ -964,7 +964,7 @@ void taosGetSystemInfo() {
void taosKillSystem() {
// SIGINT
printf("taosd will shut down soon");
//printf("taosd will shut down soon");
kill(tsProcId, 2);
}
......@@ -973,22 +973,22 @@ int taosSystem(const char *cmd) {
int res;
char buf[1024];
if (cmd == NULL) {
printf("taosSystem cmd is NULL!");
//printf("taosSystem cmd is NULL!");
return -1;
}
if ((fp = popen(cmd, "r")) == NULL) {
printf("popen cmd:%s error: %s", cmd, strerror(errno));
//printf("popen cmd:%s error: %s", cmd, strerror(errno));
return -1;
} else {
while (fgets(buf, sizeof(buf), fp)) {
printf("popen result:%s", buf);
//printf("popen result:%s", buf);
}
if ((res = pclose(fp)) == -1) {
printf("close popen file pointer fp error!");
//printf("close popen file pointer fp error!");
} else {
printf("popen res is :%d", res);
//printf("popen res is :%d", res);
}
return res;
......@@ -1003,14 +1003,14 @@ void taosSetCoreDump(bool enable) {
struct rlimit rlim_new;
if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
#ifndef _ALPINE
printf("the old unlimited para: rlim_cur=%" PRIu64 ", rlim_max=%" PRIu64, rlim.rlim_cur, rlim.rlim_max);
//printf("the old unlimited para: rlim_cur=%" PRIu64 ", rlim_max=%" PRIu64, rlim.rlim_cur, rlim.rlim_max);
#else
printf("the old unlimited para: rlim_cur=%llu, rlim_max=%llu", rlim.rlim_cur, rlim.rlim_max);
//printf("the old unlimited para: rlim_cur=%llu, rlim_max=%llu", rlim.rlim_cur, rlim.rlim_max);
#endif
rlim_new.rlim_cur = RLIM_INFINITY;
rlim_new.rlim_max = RLIM_INFINITY;
if (setrlimit(RLIMIT_CORE, &rlim_new) != 0) {
printf("set unlimited fail, error: %s", strerror(errno));
//printf("set unlimited fail, error: %s", strerror(errno));
rlim_new.rlim_cur = rlim.rlim_max;
rlim_new.rlim_max = rlim.rlim_max;
(void)setrlimit(RLIMIT_CORE, &rlim_new);
......@@ -1019,9 +1019,9 @@ void taosSetCoreDump(bool enable) {
if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
#ifndef _ALPINE
printf("the new unlimited para: rlim_cur=%" PRIu64 ", rlim_max=%" PRIu64, rlim.rlim_cur, rlim.rlim_max);
//printf("the new unlimited para: rlim_cur=%" PRIu64 ", rlim_max=%" PRIu64, rlim.rlim_cur, rlim.rlim_max);
#else
printf("the new unlimited para: rlim_cur=%llu, rlim_max=%llu", rlim.rlim_cur, rlim.rlim_max);
//printf("the new unlimited para: rlim_cur=%llu, rlim_max=%llu", rlim.rlim_cur, rlim.rlim_max);
#endif
}
......@@ -1047,10 +1047,10 @@ void taosSetCoreDump(bool enable) {
old_len = sizeof(old_usespid);
if (syscall(SYS__sysctl, &args) == -1) {
printf("_sysctl(kern_core_uses_pid) set fail: %s", strerror(errno));
//printf("_sysctl(kern_core_uses_pid) set fail: %s", strerror(errno));
}
printf("The old core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
//printf("The old core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
old_usespid = 0;
old_len = 0;
......@@ -1063,10 +1063,10 @@ void taosSetCoreDump(bool enable) {
old_len = sizeof(old_usespid);
if (syscall(SYS__sysctl, &args) == -1) {
printf("_sysctl(kern_core_uses_pid) get fail: %s", strerror(errno));
//printf("_sysctl(kern_core_uses_pid) get fail: %s", strerror(errno));
}
printf("The new core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
//printf("The new core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
#endif
}
......
......@@ -87,11 +87,11 @@ int taosSetConsoleEcho(bool on) {
void* taosLoadDll(const char* filename) {
void* handle = dlopen(filename, RTLD_LAZY);
if (!handle) {
printf("load dll:%s failed, error:%s", filename, dlerror());
//printf("load dll:%s failed, error:%s", filename, dlerror());
return NULL;
}
printf("dll %s loaded", filename);
//printf("dll %s loaded", filename);
return handle;
}
......@@ -101,11 +101,11 @@ void* taosLoadSym(void* handle, char* name) {
char* error = NULL;
if ((error = dlerror()) != NULL) {
printf("load sym:%s failed, error:%s", name, dlerror());
//printf("load sym:%s failed, error:%s", name, dlerror());
return NULL;
}
printf("sym %s loaded", name);
//printf("sym %s loaded", name);
return sym;
}
......@@ -133,7 +133,7 @@ int taosSetConsoleEcho(bool on) {
err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term);
if (err == -1 || err == EINTR) {
printf("Cannot set the attribution of the terminal");
//printf("Cannot set the attribution of the terminal");
return -1;
}
......
......@@ -170,7 +170,7 @@ static void *taosProcessAlarmSignal(void *tharg) {
sevent.sigev_signo = SIGALRM;
if (timer_create(CLOCK_REALTIME, &sevent, &timerId) == -1) {
printf("Failed to create timer");
//printf("Failed to create timer");
}
pthread_cleanup_push(taosDeleteTimer, &timerId);
......@@ -182,17 +182,17 @@ static void *taosProcessAlarmSignal(void *tharg) {
ts.it_interval.tv_nsec = 1000000 * MSECONDS_PER_TICK;
if (timer_settime(timerId, 0, &ts, NULL)) {
printf("Failed to init timer");
//printf("Failed to init timer");
return NULL;
}
int signo;
while (!stopTimer) {
if (sigwait(&sigset, &signo)) {
printf("Failed to wait signal: number %d", signo);
//printf("Failed to wait signal: number %d", signo);
continue;
}
/* printf("Signal handling: number %d ......\n", signo); */
/* //printf("Signal handling: number %d ......\n", signo); */
callback(0);
}
......@@ -208,10 +208,10 @@ int taosInitTimer(void (*callback)(int), int ms) {
int code = pthread_create(&timerThread, &tattr, taosProcessAlarmSignal, callback);
pthread_attr_destroy(&tattr);
if (code != 0) {
printf("failed to create timer thread");
//printf("failed to create timer thread");
return -1;
} else {
printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread));
//printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread));
}
return 0;
......@@ -220,7 +220,7 @@ int taosInitTimer(void (*callback)(int), int ms) {
void taosUninitTimer() {
stopTimer = true;
printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread));
//printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread));
pthread_join(timerThread, NULL);
}
......
......@@ -107,7 +107,7 @@ bool taosQueueEmpty(taos_queue param) {
if (queue->head == NULL && queue->tail == NULL) {
empty = true;
}
pthread_mutex_destroy(&queue->mutex);
pthread_mutex_unlock(&queue->mutex);
return empty;
}
......
......@@ -50,7 +50,7 @@ void tWorkerCleanup(SWorkerPool *pool) {
}
}
free(pool->workers);
tfree(pool->workers);
taosCloseQset(pool->qset);
pthread_mutex_destroy(&pool->mutex);
......@@ -159,7 +159,7 @@ void tMWorkerCleanup(SMWorkerPool *pool) {
}
}
free(pool->workers);
tfree(pool->workers);
pthread_mutex_destroy(&pool->mutex);
uInfo("worker:%s is closed", pool->name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册