diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index f32fdcbae73f613e6ccd8795046e96fc66c52060..dfd376f1e9f4f4abac5fcc755010b33707d0bf70 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
- taosHashCleanup(req->info);
- free(pReq);
+ if (req->info) taosHashCleanup(req->info);
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
-static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
+static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
- //taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
+ if (deep) {
+ taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
+ } else {
+ taosArrayDestroy(req->reqs);
+ }
free(pReq);
}
diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h
index 5cef3b22537b6f7c19435694a24622659f242fed..371cb124056b3d7b4d2175a785f9bd139e61531e 100644
--- a/include/libs/executor/dataSinkMgt.h
+++ b/include/libs/executor/dataSinkMgt.h
@@ -70,7 +70,7 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
*/
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
-void dsEndPut(DataSinkHandle handle, int64_t useconds);
+void dsEndPut(DataSinkHandle handle, uint64_t useconds);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index e4f8d291ebaef7afa11f3e9284a50dbe868e7b43..0fc7fd679e79725b71ad023175c36961c796a381 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -20,6 +20,8 @@
extern "C" {
#endif
+#include "common.h"
+
typedef void* qTaskInfo_t;
typedef void* DataSinkHandle;
struct SSubplan;
@@ -34,7 +36,7 @@ struct SSubplan;
* @param qId
* @return
*/
-int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo);
+int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
/**
* The main task execution function, including query on both table and multiple tables,
@@ -44,7 +46,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI
* @param handle
* @return
*/
-struct SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle);
+int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds);
/**
* Retrieve the produced results information, if current query is not paused or completed,
diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h
index 793c861363d64d32d112756d9a58d020fd607378..3828a931446da6f2692de4d0af23dea7302a902b 100644
--- a/include/libs/tfs/tfs.h
+++ b/include/libs/tfs/tfs.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef TD_TFS_H
-#define TD_TFS_H
+#ifndef _TD_TFS_H_
+#define _TD_TFS_H_
#include "tglobal.h"
@@ -23,8 +23,8 @@ extern "C" {
#endif
typedef struct {
- int level;
- int id;
+ int32_t level;
+ int32_t id;
} SDiskID;
#define TFS_UNDECIDED_LEVEL -1
@@ -36,33 +36,25 @@ typedef struct {
// FS APIs ====================================
typedef struct {
- int64_t tsize;
+ int64_t total;
int64_t used;
int64_t avail;
} SFSMeta;
-typedef struct {
- int64_t size;
- int64_t used;
- int64_t free;
- int16_t nAvailDisks; // # of Available disks
-} STierMeta;
-
-int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
-void tfsCleanup();
-void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
-void tfsGetMeta(SFSMeta *pMeta);
-void tfsAllocDisk(int expLevel, int *level, int *id);
+int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk);
+void tfsCleanup();
+void tfsUpdateSize(SFSMeta *pFSMeta);
+void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id);
const char *TFS_PRIMARY_PATH();
-const char *TFS_DISK_PATH(int level, int id);
+const char *TFS_DISK_PATH(int32_t level, int32_t id);
// TFILE APIs ====================================
typedef struct {
- int level;
- int id;
- char rname[TSDB_FILENAME_LEN]; // REL name
- char aname[TSDB_FILENAME_LEN]; // ABS name
+ int32_t level;
+ int32_t id;
+ char rname[TSDB_FILENAME_LEN]; // REL name
+ char aname[TSDB_FILENAME_LEN]; // ABS name
} TFILE;
#define TFILE_LEVEL(pf) ((pf)->level)
@@ -76,23 +68,23 @@ typedef struct {
#define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df))
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
-void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
-bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
-int tfsEncodeFile(void **buf, TFILE *pf);
-void *tfsDecodeFile(void *buf, TFILE *pf);
-void tfsbasename(const TFILE *pf, char *dest);
-void tfsdirname(const TFILE *pf, char *dest);
+void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname);
+bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
+int32_t tfsEncodeFile(void **buf, TFILE *pf);
+void *tfsDecodeFile(void *buf, TFILE *pf);
+void tfsbasename(const TFILE *pf, char *dest);
+void tfsdirname(const TFILE *pf, char *dest);
// DIR APIs ====================================
-int tfsMkdirAt(const char *rname, int level, int id);
-int tfsMkdirRecurAt(const char *rname, int level, int id);
-int tfsMkdir(const char *rname);
-int tfsRmdir(const char *rname);
-int tfsRename(char *orname, char *nrname);
+int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id);
+int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id);
+int32_t tfsMkdir(const char *rname);
+int32_t tfsRmdir(const char *rname);
+int32_t tfsRename(char *orname, char *nrname);
typedef struct TDIR TDIR;
-TDIR * tfsOpendir(const char *rname);
+TDIR *tfsOpendir(const char *rname);
const TFILE *tfsReaddir(TDIR *tdir);
void tfsClosedir(TDIR *tdir);
@@ -100,4 +92,4 @@ void tfsClosedir(TDIR *tdir);
}
#endif
-#endif
+#endif /*_TD_TFS_H_*/
diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h
index 36ff4194d8c2db46292ed64fc658c3ad33d409fd..9dcb075489ac3fed02c3bdb77610a420800c6859 100644
--- a/include/os/osSysinfo.h
+++ b/include/os/osSysinfo.h
@@ -35,12 +35,12 @@ extern char tsLocale[];
extern char tsCharset[]; // default encode string
typedef struct {
- int64_t tsize;
+ int64_t total;
int64_t used;
int64_t avail;
-} SysDiskSize;
+} SDiskSize;
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize);
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
int32_t taosGetCpuCores();
void taosGetSystemInfo();
bool taosReadProcIO(int64_t *rchars, int64_t *wchars);
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index a855e6d881961e77c6e1ca38ae63185f170f158e..93e38113ce7f74ec57456eb6d2b390679ab15a2c 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -411,7 +411,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory")
// tfs
-#define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")
+#define TSDB_CODE_FS_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) //"tfs invalid mount config")
#define TSDB_CODE_FS_TOO_MANY_MOUNT TAOS_DEF_ERROR_CODE(0, 0x2202) //"tfs too many mount")
#define TSDB_CODE_FS_DUP_PRIMARY TAOS_DEF_ERROR_CODE(0, 0x2203) //"tfs duplicate primary mount")
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 6d7fc9f81a1de3effef84893bc1469afdca40732..0f4ff6f72571a31eccaac783f6117baa7761f3bd 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
+#if 0
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
- getConnInfoFp(connKey, NULL);
+ SArray* pArray = getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
}
+#endif
return pBatchReq;
}
@@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) {
//TODO: error handling
break;
}
- void *bufCopy = buf;
- tSerializeSClientHbBatchReq(&bufCopy, pReq);
+ void *abuf = buf;
+ tSerializeSClientHbBatchReq(&abuf, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
- tFreeClientHbBatchReq(pReq);
+ tFreeClientHbBatchReq(pReq, false);
free(buf);
break;
}
@@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) {
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
- tFreeClientHbBatchReq(pReq);
+ tFreeClientHbBatchReq(pReq, false);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
@@ -155,6 +157,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
}
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
+ pAppHbMgr->connKeyCnt = 0;
+ pAppHbMgr->reportCnt = 0;
+ pAppHbMgr->reportBytes = 0;
// init app info
pAppHbMgr->pAppInstInfo = pAppInstInfo;
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index de101b0f069f93b9ed05b6b9c403b85b5a45ecd5..a2d6bbf4e63a4aa99a14e823fb3e980c767b67bd 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer {
} SMqTopicConsumer;
#endif
+typedef struct SMqConsumerEp {
+ int32_t vgId;
+ SEpSet epset;
+ int64_t consumerId;
+} SMqConsumerEp;
+
+typedef struct SMqCgroupTopicPair {
+ char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN];
+ SArray* assigned; // SArray
+ SArray* unassignedConsumer;
+ SArray* unassignedVg;
+} SMqCgroupTopicPair;
+
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
@@ -351,10 +364,11 @@ typedef struct SMqTopicObj {
// TODO: add cache and change name to id
typedef struct SMqConsumerTopic {
+ char name[TSDB_TOPIC_FNAME_LEN];
int32_t epoch;
- char name[TSDB_TOPIC_NAME_LEN];
//TODO: replace with something with ep
SList *vgroups; // SList
+ SArray *pVgInfo; // SArray
} SMqConsumerTopic;
typedef struct SMqConsumerObj {
@@ -362,7 +376,7 @@ typedef struct SMqConsumerObj {
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray
- SHashObj *topicHash;
+ SHashObj *topicHash; //SHashObj
} SMqConsumerObj;
typedef struct SMqSubConsumerObj {
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 54e640d8b791bea1a64c253049c4b17bc3d913f5..d27bf53a9068db6b8980d0b937c2c16ff3bb8a24 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
- SCMSubscribeReq *pSubscribe;
- tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
- int64_t consumerId = pSubscribe->consumerId;
- char *consumerGroup = pSubscribe->consumerGroup;
+ SCMSubscribeReq subscribe;
+ tDeserializeSCMSubscribeReq(msgStr, &subscribe);
+ int64_t consumerId = subscribe.consumerId;
+ char *consumerGroup = subscribe.consumerGroup;
int32_t cgroupLen = strlen(consumerGroup);
SArray *newSub = NULL;
- int newTopicNum = pSubscribe->topicNum;
+ int newTopicNum = subscribe.topicNum;
if (newTopicNum) {
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
}
+ SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic));
+ if (pConsumerTopics == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
for (int i = 0; i < newTopicNum; i++) {
char *newTopicName = taosArrayGetP(newSub, i);
- SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
- if (pConsumerTopic == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- // TODO: free
- return -1;
- }
+ SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i];
+
strcpy(pConsumerTopic->name, newTopicName);
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
- taosArrayPush(newSub, pConsumerTopic);
- free(pConsumerTopic);
}
+
+ taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum);
+ free(pConsumerTopics);
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int oldTopicNum = 0;
+ // create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
@@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
+ //TODO: free memory
return -1;
}
@@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
if (pOldTopic != NULL) {
+ //cancel subscribe of that old topic
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
@@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
+ // acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
- // TODO release
+ // TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
- // acquire and get epset
+ //build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
@@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return -1;
}
}
+ //delete data in mnode
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseTopic(pMnode, pTopic);
} else if (pNewTopic != NULL) {
+ // save subscribe info to mnode
ASSERT(pOldTopic == NULL);
char *newTopicName = pNewTopic->name;
@@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
+ /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c
index 902eaa5c1c213c43f87863310b2d11b8980d303c..3773750ed348e7e9c41f427efb684b54a4cdc3e0 100644
--- a/source/dnode/mnode/impl/src/mndProfile.c
+++ b/source/dnode/mnode/impl/src/mndProfile.c
@@ -357,10 +357,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
}
}
}
+ taosArrayDestroyEx(pArray, tFreeClientHbReq);
+
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
void* buf = rpcMallocCont(tlen);
void* abuf = buf;
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
+ taosArrayDestroy(batchRsp.rsps);
pReq->contLen = tlen;
pReq->pCont = buf;
return 0;
diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c
index d70c93e7585431924345dd29ac2aea59fd254bc2..cab30702ea38d858f318b64de2c1b60a246cf4a6 100644
--- a/source/dnode/mnode/impl/src/mnode.c
+++ b/source/dnode/mnode/impl/src/mnode.c
@@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) {
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
+static void mndCalMqRebalance(void* param, void* tmrId) {
+ SMnode* pMnode = param;
+ if (mndIsMaster(pMnode)) {
+ // iterate cgroup, cal rebalance
+ // sync with raft
+ // write sdb
+ }
+
+ taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
+}
+
static int32_t mndInitTimer(SMnode *pMnode) {
if (pMnode->timer == NULL) {
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h
index 69727626afce01a853c30946bb8f1b9f261c9105..700356436556bc7f1bb5e892f5487848b6a70371 100644
--- a/source/libs/executor/inc/dataSinkInt.h
+++ b/source/libs/executor/inc/dataSinkInt.h
@@ -32,7 +32,7 @@ typedef struct SDataSinkManager {
} SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
-typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds);
+typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c
index 8280f9d0aff1b00a45114ef3ccb1385ee9f2aae1..edba4fc97dbe95aaf48f097627b6b6f2cd9b15ea 100644
--- a/source/libs/executor/src/dataDispatcher.c
+++ b/source/libs/executor/src/dataDispatcher.c
@@ -44,7 +44,7 @@ typedef struct SDataDispatchHandle {
SDataDispatchBuf nextOutput;
int32_t status;
bool queryEnd;
- int64_t useconds;
+ uint64_t useconds;
pthread_mutex_t mutex;
} SDataDispatchHandle;
@@ -158,7 +158,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
return TSDB_CODE_SUCCESS;
}
-static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
+static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
pthread_mutex_lock(&pDispatcher->mutex);
pDispatcher->queryEnd = true;
diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c
index 80d99f96c68b1d399ebd24b41b505069f0b0947f..eb1f75f359f384e291de65b4a9fe52d547e6d220 100644
--- a/source/libs/executor/src/dataSinkMgt.c
+++ b/source/libs/executor/src/dataSinkMgt.c
@@ -37,7 +37,7 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
}
-void dsEndPut(DataSinkHandle handle, int64_t useconds) {
+void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fEndPut(pHandleImpl, useconds);
}
diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c
index e451f888bb5c9bf0a38acc1ec531bae2514e6558..1f5d0cd059ad60f81407905b2cd62a3915bbeb2e 100644
--- a/source/libs/executor/src/executorMain.c
+++ b/source/libs/executor/src/executorMain.c
@@ -68,7 +68,7 @@ void freeParam(STaskParam *param) {
tfree(param->prevResult);
}
-int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) {
+int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
assert(tsdb != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
@@ -85,6 +85,8 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
+ *handle = (*pTask)->dsHandle;
+
_error:
// if failed to add ref for all tables in this query, abort current query
return code;
@@ -135,16 +137,18 @@ int waitMoment(SQInfo* pQInfo){
}
#endif
-SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
+int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
+ *pRes = NULL;
+
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
(void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
- return NULL;
+ return pTaskInfo->code;
}
if (pTaskInfo->cost.start == 0) {
@@ -153,7 +157,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
- return NULL;
+ return TSDB_CODE_SUCCESS;
}
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
@@ -170,7 +174,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
pTaskInfo->code = ret;
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
tstrerror(pTaskInfo->code));
- return NULL;
+ return pTaskInfo->code;
}
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
@@ -179,21 +183,21 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
- if (handle) {
- *handle = pTaskInfo->dsHandle;
- }
-
st = taosGetTimestampUs();
- SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
+ *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
+ if (NULL == *pRes) {
+ *useconds = pTaskInfo->cost.elapsedTime;
+ }
+
qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
GET_TASKID(pTaskInfo), 0, 0L, 0);
atomic_store_64(&pTaskInfo->owner, 0);
- return pRes;
+ return pTaskInfo->code;
}
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp
index 5f5fddbe28ab3106eb1e7caedad4f3a88ca19bef..c528d879a30af3748ea01d561c865c9524b8369a 100644
--- a/source/libs/executor/test/executorTests.cpp
+++ b/source/libs/executor/test/executorTests.cpp
@@ -217,5 +217,6 @@ TEST(testCase, build_executor_tree_Test) {
"}";
SExecTaskInfo* pTaskInfo = nullptr;
- int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo);
+ DataSinkHandle sinkHandle = nullptr;
+ int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle);
}
\ No newline at end of file
diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c
index 61bd82a49a06b50ddda2d8699222a8de6c33475a..a0beaba61da85c7ed060d52e02e255e3c8baf66f 100644
--- a/source/libs/qworker/src/qworker.c
+++ b/source/libs/qworker/src/qworker.c
@@ -458,6 +458,37 @@ _return:
QW_RET(code);
}
+int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
+ int32_t code = 0;
+ bool qcontinue = true;
+ SSDataBlock* pRes = NULL;
+ uint64_t useconds = 0;
+
+ while (qcontinue) {
+ code = qExecTask(taskHandle, &pRes, &useconds);
+ if (code) {
+ QW_TASK_ELOG("qExecTask failed, code:%x", code);
+ QW_ERR_JRET(code);
+ }
+
+ if (NULL == pRes) {
+ QW_TASK_DLOG("query done, useconds:%"PRIu64, useconds);
+ dsEndPut(sinkHandle, useconds);
+ break;
+ }
+
+ SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
+ code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
+ if (code) {
+ QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code);
+ QW_ERR_JRET(code);
+ }
+ }
+
+_return:
+
+ QW_RET(code);
+}
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
@@ -733,7 +764,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
}
qTaskInfo_t pTaskInfo = NULL;
- code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo);
+ DataSinkHandle sinkHandle = NULL;
+
+ code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code);
@@ -743,12 +776,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
queryRsped = true;
- DataSinkHandle sinkHandle = NULL;
- SSDataBlock* pRes = qExecTask(pTaskInfo, &sinkHandle);
- if (code) {
- QW_TASK_ELOG("qExecTask failed, code:%x", code);
- QW_ERR_JRET(code);
- }
+ QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle));
_return:
@@ -840,11 +868,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
- code = qExecTask(taskHandle, &sinkHandle);
- if (code) {
- QW_TASK_ELOG("qExecTask failed, code:%x", code);
- QW_ERR_JRET(code);
- }
+ QW_ERR_JRET(qwExecTask(QW_FPARAMS(), taskHandle, sinkHandle));
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index 2d5322fc2c1acaf69bae35e67bc6d755c9690a68..c53926f8c10243f3cf41baec1bad394bf23994dd 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -412,6 +412,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
+
+ ++addNum;
}
}
@@ -792,6 +794,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
+
+ SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
+ if (rsp) {
+ pJob->resNumOfRows += rsp->affectedRows;
+ }
#endif
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
@@ -1355,9 +1362,9 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
SSchJob *job = NULL;
- SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true));
+ SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
- *pJob = job;
+ job = *pJob;
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp
index d72c4920d51ee94cf1a36f188532c517dfcac0cc..5332c6fcd10430b93184004972ffdf1d8139d7c4 100644
--- a/source/libs/scheduler/test/schedulerTests.cpp
+++ b/source/libs/scheduler/test/schedulerTests.cpp
@@ -34,10 +34,12 @@
#include "stub.h"
#include "addr_any.h"
+
namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
+
void schtInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
@@ -113,9 +115,9 @@ void schtBuildInsertDag(SQueryDag *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
- SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan));
+ SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
- SSubplan insertPlan[2] = {0};
+ SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan));
insertPlan[0].id.queryId = qId;
insertPlan[0].id.templateId = 0x0000000000000003;
@@ -131,6 +133,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
+ insertPlan[0].msgType = TDMT_VND_SUBMIT;
insertPlan[1].id.queryId = qId;
insertPlan[1].id.templateId = 0x0000000000000003;
@@ -146,10 +149,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
+ insertPlan[1].msgType = TDMT_VND_SUBMIT;
-
- taosArrayPush(inserta, &insertPlan[0]);
- taosArrayPush(inserta, &insertPlan[1]);
+ taosArrayPush(inserta, &insertPlan);
+ insertPlan += 1;
+ taosArrayPush(inserta, &insertPlan);
taosArrayPush(dag->pSubplans, &inserta);
}
@@ -210,6 +214,24 @@ void schtSetRpcSendRequest() {
}
}
+int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
+ return 0;
+}
+
+
+void schtSetAsyncSendMsgToServer() {
+ static Stub stub;
+ stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
+ {
+ AddrAny any("libtransport.so");
+ std::map result;
+ any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
+ for (const auto& f : result) {
+ stub.set(f.second, schtAsyncSendMsgToServer);
+ }
+ }
+}
+
void *schtSendRsp(void *param) {
SSchJob *job = NULL;
@@ -230,7 +252,7 @@ void *schtSendRsp(void *param) {
SShellSubmitRsp rsp = {0};
rsp.affectedRows = 10;
- schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
+ schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter);
}
@@ -238,6 +260,23 @@ void *schtSendRsp(void *param) {
return NULL;
}
+void *schtCreateFetchRspThread(void *param) {
+ struct SSchJob* job = (struct SSchJob*)param;
+
+ sleep(1);
+
+ int32_t code = 0;
+ SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
+ rsp->completed = 1;
+ rsp->numOfRows = 10;
+ code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0);
+
+ assert(code == 0);
+}
+
+
+
+
struct SSchJob *pInsertJob = NULL;
}
@@ -266,6 +305,7 @@ TEST(queryTest, normalCase) {
schtSetPlanToString();
schtSetExecNode();
+ schtSetAsyncSendMsgToServer();
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
ASSERT_EQ(code, 0);
@@ -276,7 +316,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
- code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
+ code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
@@ -287,8 +327,8 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
- code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
-
+ code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
+ printf("code:%d", code);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
}
@@ -298,7 +338,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
- code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
+ code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
@@ -309,22 +349,19 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
- code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
+ code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
}
- SRetrieveTableRsp rsp = {0};
- rsp.completed = 1;
- rsp.numOfRows = 10;
- code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
-
- ASSERT_EQ(code, 0);
+ pthread_attr_t thattr;
+ pthread_attr_init(&thattr);
+ pthread_t thread1;
+ pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
- void *data = NULL;
-
+ void *data = NULL;
code = scheduleFetchRows(job, &data);
ASSERT_EQ(code, 0);
@@ -340,6 +377,8 @@ TEST(queryTest, normalCase) {
scheduleFreeJob(pJob);
schtFreeQueryDag(&dag);
+
+ schedulerDestroy();
}
@@ -369,6 +408,7 @@ TEST(insertTest, normalCase) {
schtBuildInsertDag(&dag);
schtSetPlanToString();
+ schtSetAsyncSendMsgToServer();
pthread_attr_t thattr;
pthread_attr_init(&thattr);
@@ -382,6 +422,8 @@ TEST(insertTest, normalCase) {
ASSERT_EQ(res.numOfRows, 20);
scheduleFreeJob(pInsertJob);
+
+ schedulerDestroy();
}
TEST(multiThread, forceFree) {
diff --git a/source/libs/tfs/inc/tfsint.h b/source/libs/tfs/inc/tfsInt.h
similarity index 55%
rename from source/libs/tfs/inc/tfsint.h
rename to source/libs/tfs/inc/tfsInt.h
index 3c5dccc63b483a627ce527129f4dbed334c662be..ce1436eb298f35f5ac7f0f70c677d29fae38d08f 100644
--- a/source/libs/tfs/inc/tfsint.h
+++ b/source/libs/tfs/inc/tfsInt.h
@@ -13,19 +13,20 @@
* along with this program. If not, see .
*/
-#ifndef TD_TFSINT_H
-#define TD_TFSINT_H
+#ifndef _TD_TFS_INT_H_
+#define _TD_TFS_INT_H_
-#include "tlog.h"
-#include "tglobal.h"
-#include "tfs.h"
-#include "tcoding.h"
+#include "os.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include "taosdef.h"
+#include "taoserror.h"
+#include "tcoding.h"
+#include "tfs.h"
+#include "tglobal.h"
+#include "thash.h"
+#include "tlog.h"
-extern int fsDebugFlag;
+extern int32_t fsDebugFlag;
// For debug purpose
#define fFatal(...) { if (fsDebugFlag & DEBUG_FATAL) { taosPrintLog("TFS FATAL ", 255, __VA_ARGS__); }}
@@ -38,60 +39,44 @@ extern int fsDebugFlag;
// Global Definitions
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
-// tdisk.c ======================================================
-typedef struct {
- int64_t size;
- int64_t used;
- int64_t free;
-} SDiskMeta;
-
typedef struct SDisk {
- int level;
- int id;
- char dir[TSDB_FILENAME_LEN];
- SDiskMeta dmeta;
+ int32_t level;
+ int32_t id;
+ char *path;
+ SDiskSize size;
} SDisk;
-#define DISK_LEVEL(pd) ((pd)->level)
-#define DISK_ID(pd) ((pd)->id)
-#define DISK_DIR(pd) ((pd)->dir)
-#define DISK_META(pd) ((pd)->dmeta)
-#define DISK_SIZE(pd) ((pd)->dmeta.size)
-#define DISK_USED_SIZE(pd) ((pd)->dmeta.used)
-#define DISK_FREE_SIZE(pd) ((pd)->dmeta.free)
-
-SDisk *tfsNewDisk(int level, int id, const char *dir);
-SDisk *tfsFreeDisk(SDisk *pDisk);
-int tfsUpdateDiskInfo(SDisk *pDisk);
-
-// ttier.c ======================================================
-
typedef struct STier {
pthread_spinlock_t lock;
- int level;
- int16_t ndisk; // # of disks mounted to this tier
- int16_t nextid; // next disk id to allocate
- STierMeta tmeta;
- SDisk * disks[TSDB_MAX_DISKS_PER_TIER];
+ int32_t level;
+ int16_t nextid; // next disk id to allocate
+ int16_t ndisk; // # of disks mounted to this tier
+ int16_t nAvailDisks; // # of Available disks
+ SDisk *disks[TSDB_MAX_DISKS_PER_TIER];
+ SDiskSize size;
} STier;
#define TIER_LEVEL(pt) ((pt)->level)
#define TIER_NDISKS(pt) ((pt)->ndisk)
#define TIER_SIZE(pt) ((pt)->tmeta.size)
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
-#define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks)
+
#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
+#define DISK_DIR(pd) ((pd)->path)
-int tfsInitTier(STier *pTier, int level);
-void tfsDestroyTier(STier *pTier);
-SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
-void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta);
-int tfsAllocDiskOnTier(STier *pTier);
-void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta);
-void tfsPosNextId(STier *pTier);
+SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
+SDisk *tfsFreeDisk(SDisk *pDisk);
+int32_t tfsUpdateDiskSize(SDisk *pDisk);
+
+int32_t tfsInitTier(STier *pTier, int32_t level);
+void tfsDestroyTier(STier *pTier);
+SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
+void tfsUpdateTierSize(STier *pTier);
+int32_t tfsAllocDiskOnTier(STier *pTier);
+void tfsPosNextId(STier *pTier);
#ifdef __cplusplus
}
#endif
-#endif
+#endif /*_TD_TFS_INT_H_*/
diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c
index 88d6d587a7bf4aad447de76f0ee92d9cf4e1035b..1d11cd6df2541b812b68081649ad293c567845cf 100644
--- a/source/libs/tfs/src/tfs.c
+++ b/source/libs/tfs/src/tfs.c
@@ -13,22 +13,17 @@
* along with this program. If not, see .
*/
-#include "os.h"
-
-#include "taosdef.h"
-#include "taoserror.h"
-#include "tfs.h"
-#include "tfsint.h"
-#include "thash.h"
+#define _DEFAULT_SOURCE
+#include "tfsInt.h"
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
typedef struct {
pthread_spinlock_t lock;
SFSMeta meta;
- int nlevel;
+ int32_t nlevel;
STier tiers[TSDB_MAX_TIERS];
- SHashObj * map; // name to did map
+ SHashObj *map; // name to did map
} SFS;
typedef struct {
@@ -52,21 +47,24 @@ static SFS tfs = {0};
static SFS *pfs = &tfs;
// STATIC DECLARATION
-static int tfsMount(SDiskCfg *pCfg);
-static int tfsCheck();
-static int tfsCheckAndFormatCfg(SDiskCfg *pCfg);
-static int tfsFormatDir(char *idir, char *odir);
-static SDisk *tfsGetDiskByID(SDiskID did);
-static SDisk *tfsGetDiskByName(const char *dir);
-static int tfsOpendirImpl(TDIR *tdir);
-static void tfsInitDiskIter(SDiskIter *pIter);
-static SDisk *tfsNextDisk(SDiskIter *pIter);
+static int32_t tfsMount(SDiskCfg *pCfg);
+static int32_t tfsCheck();
+static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg);
+static int32_t tfsFormatDir(char *idir, char *odir);
+static SDisk *tfsGetDiskByID(SDiskID did);
+static SDisk *tfsGetDiskByName(const char *dir);
+static int32_t tfsOpendirImpl(TDIR *tdir);
+static void tfsInitDiskIter(SDiskIter *pIter);
+static SDisk *tfsNextDisk(SDiskIter *pIter);
// FS APIs ====================================
-int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
- ASSERT(ndisk > 0);
+int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) {
+ if (ndisk < 0) {
+ terrno = TSDB_CODE_INVALID_PARA;
+ return -1;
+ }
- for (int level = 0; level < TSDB_MAX_TIERS; level++) {
+ for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) {
if (tfsInitTier(TFS_TIER_AT(level), level) < 0) {
while (true) {
level--;
@@ -84,12 +82,12 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pfs->map == NULL) {
- terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
tfsCleanup();
return -1;
}
- for (int idisk = 0; idisk < ndisk; idisk++) {
+ for (int32_t idisk = 0; idisk < ndisk; idisk++) {
if (tfsMount(pDiskCfg + idisk) < 0) {
tfsCleanup();
return -1;
@@ -101,8 +99,8 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
return -1;
}
- tfsUpdateInfo(NULL, NULL, 0);
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ tfsUpdateSize(NULL);
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
tfsPosNextId(TFS_TIER_AT(level));
}
@@ -114,32 +112,27 @@ void tfsCleanup() {
pfs->map = NULL;
pthread_spin_destroy(&(pfs->lock));
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
tfsDestroyTier(TFS_TIER_AT(level));
}
}
-void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) {
- SFSMeta fsMeta;
- STierMeta tierMeta;
+void tfsUpdateSize(SFSMeta *pFSMeta) {
+ SFSMeta fsMeta = {0};
+ SDiskSize size = {0};
if (pFSMeta == NULL) {
pFSMeta = &fsMeta;
}
- memset(pFSMeta, 0, sizeof(*pFSMeta));
-
- for (int level = 0; level < TFS_NLEVEL(); level++) {
- STierMeta *pTierMeta = &tierMeta;
- if (tierMetas && level < numTiers) {
- pTierMeta = tierMetas + level;
- }
+ memset(pFSMeta, 0, sizeof(SFSMeta));
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
- tfsUpdateTierInfo(pTier, pTierMeta);
- pFSMeta->tsize += pTierMeta->size;
- pFSMeta->avail += pTierMeta->free;
- pFSMeta->used += pTierMeta->used;
+ tfsUpdateTierSize(pTier);
+ pFSMeta->total += pTier->size.total;
+ pFSMeta->avail += pTier->size.avail;
+ pFSMeta->used += pTier->size.used;
}
tfsLock();
@@ -147,17 +140,9 @@ void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) {
tfsUnLock();
}
-void tfsGetMeta(SFSMeta *pMeta) {
- ASSERT(pMeta);
-
- tfsLock();
- *pMeta = pfs->meta;
- tfsUnLock();
-}
-
/* Allocate an existing available tier level
*/
-void tfsAllocDisk(int expLevel, int *level, int *id) {
+void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) {
ASSERT(expLevel >= 0);
*level = expLevel;
@@ -182,10 +167,10 @@ void tfsAllocDisk(int expLevel, int *level, int *id) {
}
const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
-const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
+const char *TFS_DISK_PATH(int32_t level, int32_t id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
// TFILE APIs ====================================
-void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
+void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname) {
ASSERT(TFS_IS_VALID_DISK(level, id));
SDisk *pDisk = TFS_DISK_AT(level, id);
@@ -208,8 +193,8 @@ bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) {
return true;
}
-int tfsEncodeFile(void **buf, TFILE *pf) {
- int tlen = 0;
+int32_t tfsEncodeFile(void **buf, TFILE *pf) {
+ int32_t tlen = 0;
tlen += taosEncodeVariantI32(buf, pf->level);
tlen += taosEncodeVariantI32(buf, pf->id);
@@ -220,7 +205,7 @@ int tfsEncodeFile(void **buf, TFILE *pf) {
void *tfsDecodeFile(void *buf, TFILE *pf) {
int32_t level, id;
- char * rname;
+ char *rname;
buf = taosDecodeVariantI32(buf, &(level));
buf = taosDecodeVariantI32(buf, &(id));
@@ -247,7 +232,7 @@ void tfsdirname(const TFILE *pf, char *dest) {
}
// DIR APIs ====================================
-int tfsMkdirAt(const char *rname, int level, int id) {
+int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) {
SDisk *pDisk = TFS_DISK_AT(level, id);
char aname[TMPNAME_LEN];
@@ -260,7 +245,7 @@ int tfsMkdirAt(const char *rname, int level, int id) {
return 0;
}
-int tfsMkdirRecurAt(const char *rname, int level, int id) {
+int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) {
if (tfsMkdirAt(rname, level, id) < 0) {
if (errno == ENOENT) {
// Try to create upper
@@ -293,10 +278,10 @@ int tfsMkdirRecurAt(const char *rname, int level, int id) {
return 0;
}
-int tfsMkdir(const char *rname) {
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+int32_t tfsMkdir(const char *rname) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
- for (int id = 0; id < TIER_NDISKS(pTier); id++) {
+ for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
if (tfsMkdirAt(rname, level, id) < 0) {
return -1;
}
@@ -306,15 +291,15 @@ int tfsMkdir(const char *rname) {
return 0;
}
-int tfsRmdir(const char *rname) {
+int32_t tfsRmdir(const char *rname) {
char aname[TMPNAME_LEN] = "\0";
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
- for (int id = 0; id < TIER_NDISKS(pTier); id++) {
- SDisk *pDisk = DISK_AT_TIER(pTier, id);
+ for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
+ SDisk *pDisk = pTier->disks[id];
- snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname);
+ snprintf(aname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TS_PATH_DELIMITER, rname);
taosRemoveDir(aname);
}
@@ -323,13 +308,14 @@ int tfsRmdir(const char *rname) {
return 0;
}
-int tfsRename(char *orname, char *nrname) {
+#if 0
+int32_t tfsRename(char *orname, char *nrname) {
char oaname[TMPNAME_LEN] = "\0";
char naname[TMPNAME_LEN] = "\0";
- for (int level = 0; level < pfs->nlevel; level++) {
+ for (int32_t level = 0; level < pfs->nlevel; level++) {
STier *pTier = TFS_TIER_AT(level);
- for (int id = 0; id < TIER_NDISKS(pTier); id++) {
+ for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
SDisk *pDisk = DISK_AT_TIER(pTier, id);
snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname);
@@ -341,20 +327,21 @@ int tfsRename(char *orname, char *nrname) {
return 0;
}
+#endif
struct TDIR {
SDiskIter iter;
- int level;
- int id;
+ int32_t level;
+ int32_t id;
char dirname[TSDB_FILENAME_LEN];
TFILE tfile;
- DIR * dir;
+ DIR *dir;
};
TDIR *tfsOpendir(const char *rname) {
TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir));
if (tdir == NULL) {
- terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@@ -407,9 +394,9 @@ void tfsClosedir(TDIR *tdir) {
}
// private
-static int tfsMount(SDiskCfg *pCfg) {
+static int32_t tfsMount(SDiskCfg *pCfg) {
SDiskID did;
- SDisk * pDisk = NULL;
+ SDisk *pDisk = NULL;
if (tfsCheckAndFormatCfg(pCfg) < 0) return -1;
@@ -419,7 +406,7 @@ static int tfsMount(SDiskCfg *pCfg) {
fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno));
return -1;
}
- did.id = DISK_ID(pDisk);
+ did.id = pDisk->id;
taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did));
if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1;
@@ -427,7 +414,7 @@ static int tfsMount(SDiskCfg *pCfg) {
return 0;
}
-static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
+static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
char dirName[TSDB_FILENAME_LEN] = "\0";
struct stat pstat;
@@ -486,10 +473,10 @@ static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
return 0;
}
-static int tfsFormatDir(char *idir, char *odir) {
+static int32_t tfsFormatDir(char *idir, char *odir) {
wordexp_t wep = {0};
- int code = wordexp(idir, &wep, 0);
+ int32_t code = wordexp(idir, &wep, 0);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
@@ -507,14 +494,14 @@ static int tfsFormatDir(char *idir, char *odir) {
return 0;
}
-static int tfsCheck() {
+static int32_t tfsCheck() {
if (TFS_PRIMARY_DISK() == NULL) {
fError("no primary disk is set");
terrno = TSDB_CODE_FS_NO_PRIMARY_DISK;
return -1;
}
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) {
fError("no disk at level %d", level);
terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER;
@@ -528,8 +515,8 @@ static int tfsCheck() {
static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); }
static SDisk *tfsGetDiskByName(const char *dir) {
SDiskID did;
- SDisk * pDisk = NULL;
- void * pr = NULL;
+ SDisk *pDisk = NULL;
+ void *pr = NULL;
pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN));
if (pr == NULL) return NULL;
@@ -541,7 +528,7 @@ static SDisk *tfsGetDiskByName(const char *dir) {
return pDisk;
}
-static int tfsOpendirImpl(TDIR *tdir) {
+static int32_t tfsOpendirImpl(TDIR *tdir) {
SDisk *pDisk = NULL;
char adir[TMPNAME_LEN * 2] = "\0";
@@ -554,10 +541,10 @@ static int tfsOpendirImpl(TDIR *tdir) {
pDisk = tfsNextDisk(&(tdir->iter));
if (pDisk == NULL) return 0;
- tdir->level = DISK_LEVEL(pDisk);
- tdir->id = DISK_ID(pDisk);
+ tdir->level = pDisk->level;
+ tdir->id = pDisk->id;
- snprintf(adir, TMPNAME_LEN * 2, "%s/%s", DISK_DIR(pDisk), tdir->dirname);
+ snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TS_PATH_DELIMITER,tdir->dirname);
tdir->dir = opendir(adir);
if (tdir->dir != NULL) break;
}
@@ -572,8 +559,8 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
if (pDisk == NULL) return NULL;
- int level = DISK_LEVEL(pDisk);
- int id = DISK_ID(pDisk);
+ int32_t level = pDisk->level;
+ int32_t id = pDisk->id;
id++;
if (id < TIER_NDISKS(TFS_TIER_AT(level))) {
@@ -596,21 +583,21 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
const double unit = 1024 * 1024 * 1024;
- SysDiskSize diskSize;
+ SDiskSize diskSize;
SFSMeta fsMeta;
- tfsUpdateInfo(&fsMeta, NULL, 0);
- tsTotalDataDirGB = (float)(fsMeta.tsize / unit);
+ tfsUpdateSize(&fsMeta);
+ tsTotalDataDirGB = (float)(fsMeta.total / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
- tsTotalLogDirGB = (float)(diskSize.tsize / unit);
+ tsTotalLogDirGB = (float)(diskSize.total / unit);
tsAvailLogDirGB = (float)(diskSize.avail / unit);
}
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
- tsTotalTmpDirGB = (float)(diskSize.tsize / unit);
+ tsTotalTmpDirGB = (float)(diskSize.total / unit);
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
}
}
diff --git a/source/libs/tfs/src/tdisk.c b/source/libs/tfs/src/tfsDisk.c
similarity index 52%
rename from source/libs/tfs/src/tdisk.c
rename to source/libs/tfs/src/tfsDisk.c
index 22601e48c3607fca53c91ea00b17551c791302bd..a5ef1121ffd35ec87c3b58d167c13620296ca840 100644
--- a/source/libs/tfs/src/tdisk.c
+++ b/source/libs/tfs/src/tfsDisk.c
@@ -12,48 +12,45 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#include "os.h"
-#include "taoserror.h"
-#include "tfsint.h"
+#define _DEFAULT_SOURCE
+#include "tfsInt.h"
-// PROTECTED ====================================
-SDisk *tfsNewDisk(int level, int id, const char *dir) {
- SDisk *pDisk = (SDisk *)calloc(1, sizeof(*pDisk));
+SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
+ SDisk *pDisk = calloc(1, sizeof(SDisk));
if (pDisk == NULL) {
- terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
+
+ pDisk->path = strdup(path);
+ if (pDisk->path == NULL) {
+ free(pDisk);
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDisk->level = level;
pDisk->id = id;
- tstrncpy(pDisk->dir, dir, TSDB_FILENAME_LEN);
-
+ taosGetDiskSize(pDisk->path, &pDisk->size);
return pDisk;
}
SDisk *tfsFreeDisk(SDisk *pDisk) {
- if (pDisk) {
+ if (pDisk != NULL) {
+ free(pDisk->path);
free(pDisk);
}
+
return NULL;
}
-int tfsUpdateDiskInfo(SDisk *pDisk) {
- ASSERT(pDisk != NULL);
-
- SysDiskSize diskSize = {0};
-
- int code = taosGetDiskSize(pDisk->dir, &diskSize);
- if (code != 0) {
- fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir,
- strerror(errno));
+int32_t tfsUpdateDiskSize(SDisk *pDisk) {
+ if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
+ fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr());
+ return -1;
}
- pDisk->dmeta.size = diskSize.tsize;
- pDisk->dmeta.used = diskSize.used;
- pDisk->dmeta.free = diskSize.avail;
-
- return code;
+ return 0;
}
diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c
new file mode 100644
index 0000000000000000000000000000000000000000..2a26c23ead3394c1ba552f0c8e4d4fbf10f52af1
--- /dev/null
+++ b/source/libs/tfs/src/tfsTier.c
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "tfsInt.h"
+
+#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
+#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
+
+int32_t tfsInitTier(STier *pTier, int32_t level) {
+ memset(pTier, 0, sizeof(STier));
+
+ if (pthread_spin_init(&pTier->lock, 0) != 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ pTier->level = level;
+ return 0;
+}
+
+void tfsDestroyTier(STier *pTier) {
+ for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
+ pTier->disks[id] = tfsFreeDisk(pTier->disks[id]);
+ }
+
+ pTier->ndisk = 0;
+ pthread_spin_destroy(&(pTier->lock));
+}
+
+SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
+ if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) {
+ terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
+ return NULL;
+ }
+
+ int32_t id = 0;
+ if (pTier->level == 0) {
+ if (pTier->disks[0] != NULL) {
+ id = pTier->ndisk;
+ } else {
+ if (pCfg->primary) {
+ id = 0;
+ } else {
+ id = pTier->ndisk + 1;
+ }
+ }
+ } else {
+ id = pTier->ndisk;
+ }
+
+ if (id >= TSDB_MAX_DISKS_PER_TIER) {
+ terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
+ return NULL;
+ }
+
+ SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
+ if (pDisk == NULL) return NULL;
+
+ pTier->disks[id] = pDisk;
+ pTier->ndisk++;
+
+ fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
+ return pTier->disks[id];
+}
+
+void tfsUpdateTierSize(STier *pTier) {
+ SDiskSize size = {0};
+ int16_t nAvailDisks = 0;
+
+ tfsLockTier(pTier);
+
+ for (int32_t id = 0; id < pTier->ndisk; id++) {
+ SDisk *pDisk = pTier->disks[id];
+ if (pDisk == NULL) continue;
+
+ size.total += pDisk->size.total;
+ size.used += pDisk->size.used;
+ size.avail += pDisk->size.avail;
+ nAvailDisks++;
+ }
+
+ pTier->size = size;
+ pTier->nAvailDisks = nAvailDisks;
+
+ tfsUnLockTier(pTier);
+}
+
+// Round-Robin to allocate disk on a tier
+int32_t tfsAllocDiskOnTier(STier *pTier) {
+ terrno = TSDB_CODE_FS_NO_VALID_DISK;
+
+ tfsLockTier(pTier);
+
+ if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) {
+ tfsUnLockTier(pTier);
+ return -1;
+ }
+
+ int32_t retId = -1;
+ for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) {
+ int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
+ SDisk *pDisk = pTier->disks[diskId];
+
+ if (pDisk == NULL) continue;
+
+ if (pDisk->size.avail < TFS_MIN_DISK_FREE_SIZE) continue;
+
+ retId = diskId;
+ terrno = 0;
+ pTier->nextid = (diskId + 1) % pTier->ndisk;
+ break;
+ }
+
+ tfsUnLockTier(pTier);
+ return retId;
+}
+
+void tfsPosNextId(STier *pTier) {
+ int32_t nextid = 0;
+
+ for (int32_t id = 1; id < pTier->ndisk; id++) {
+ SDisk *pLDisk = pTier->disks[nextid];
+ SDisk *pDisk = pTier->disks[id];
+ if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) {
+ nextid = id;
+ }
+ }
+
+ pTier->nextid = nextid;
+}
diff --git a/source/libs/tfs/src/ttier.c b/source/libs/tfs/src/ttier.c
deleted file mode 100644
index 3b19797acf67f36a31c30dccefc0c985512f510b..0000000000000000000000000000000000000000
--- a/source/libs/tfs/src/ttier.c
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-#include "os.h"
-
-#include "taosdef.h"
-#include "taoserror.h"
-#include "tfsint.h"
-
-#define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock))
-#define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock))
-
-// PROTECTED ==========================================
-int tfsInitTier(STier *pTier, int level) {
- memset((void *)pTier, 0, sizeof(*pTier));
-
- int code = pthread_spin_init(&(pTier->lock), 0);
- if (code) {
- terrno = TAOS_SYSTEM_ERROR(code);
- return -1;
- }
-
- pTier->level = level;
-
- return 0;
-}
-
-void tfsDestroyTier(STier *pTier) {
- for (int id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
- DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id));
- }
-
- pTier->ndisk = 0;
-
- pthread_spin_destroy(&(pTier->lock));
-}
-
-SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
- ASSERT(pTier->level == pCfg->level);
-
- int id = 0;
- SDisk *pDisk;
-
- if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) {
- terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
- return NULL;
- }
-
- if (pTier->level == 0) {
- if (DISK_AT_TIER(pTier, 0) != NULL) {
- id = pTier->ndisk;
- } else {
- if (pCfg->primary) {
- id = 0;
- } else {
- id = pTier->ndisk + 1;
- }
- if (id >= TSDB_MAX_DISKS_PER_TIER) {
- terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
- return NULL;
- }
- }
- } else {
- id = pTier->ndisk;
- }
-
- pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
- if (pDisk == NULL) return NULL;
- DISK_AT_TIER(pTier, id) = pDisk;
- pTier->ndisk++;
-
- fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
-
- return DISK_AT_TIER(pTier, id);
-}
-
-void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) {
- STierMeta tmeta;
-
- if (pTierMeta == NULL) {
- pTierMeta = &tmeta;
- }
- memset(pTierMeta, 0, sizeof(*pTierMeta));
-
- tfsLockTier(pTier);
-
- for (int id = 0; id < pTier->ndisk; id++) {
- if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) < 0) {
- continue;
- }
- pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id));
- pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id));
- pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id));
- pTierMeta->nAvailDisks++;
- }
-
- pTier->tmeta = *pTierMeta;
-
- tfsUnLockTier(pTier);
-}
-
-// Round-Robin to allocate disk on a tier
-int tfsAllocDiskOnTier(STier *pTier) {
- ASSERT(pTier->ndisk > 0);
- int id = TFS_UNDECIDED_ID;
- SDisk *pDisk;
-
- tfsLockTier(pTier);
-
- if (TIER_AVAIL_DISKS(pTier) <= 0) {
- tfsUnLockTier(pTier);
- return id;
- }
-
- id = pTier->nextid;
- while (true) {
- pDisk = DISK_AT_TIER(pTier, id);
- ASSERT(pDisk != NULL);
-
- if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) {
- id = (id + 1) % pTier->ndisk;
- if (id == pTier->nextid) {
- tfsUnLockTier(pTier);
- return TFS_UNDECIDED_ID;
- } else {
- continue;
- }
- } else {
- pTier->nextid = (id + 1) % pTier->ndisk;
- break;
- }
- }
-
- tfsUnLockTier(pTier);
- return id;
-}
-
-void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) {
- ASSERT(pTierMeta != NULL);
-
- tfsLockTier(pTier);
- *pTierMeta = pTier->tmeta;
- tfsUnLockTier(pTier);
-}
-
-void tfsPosNextId(STier *pTier) {
- ASSERT(pTier->ndisk > 0);
- int nextid = 0;
-
- for (int id = 1; id < pTier->ndisk; id++) {
- SDisk *pLDisk = DISK_AT_TIER(pTier, nextid);
- SDisk *pDisk = DISK_AT_TIER(pTier, id);
- if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) {
- nextid = id;
- }
- }
-
- pTier->nextid = nextid;
-}
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index d63008008697b8d10c3ae0e4f0cb0e587bbb4e6d..a3894ceedd4d5863965d7aae04076c4a6bdf5525 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -149,6 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
}
+ closedir(dir);
regfree(&logRegPattern);
regfree(&idxRegPattern);
diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c
index 0be17ca2b9299a4477d58f0527e0c8658e07c987..cae1b18b3c3dc876d9a7b2b54b1e4735ec417e4c 100644
--- a/source/os/src/osSysinfo.c
+++ b/source/os/src/osSysinfo.c
@@ -121,7 +121,7 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
return true;
}
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
unsigned _int64 i64FreeBytesToCaller;
unsigned _int64 i64TotalBytes;
unsigned _int64 i64FreeBytes;
@@ -438,7 +438,7 @@ int taosSystem(const char *cmd) {
void taosSetCoreDump() {}
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
struct statvfs info;
if (statvfs(dataDir, &info)) {
//printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
@@ -771,13 +771,12 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
return true;
}
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
struct statvfs info;
if (statvfs(dataDir, &info)) {
- //printf("failed to get disk size, dataDir:%s errno:%s", dataDir, strerror(errno));
return -1;
} else {
- diskSize->tsize = info.f_blocks * info.f_frsize;
+ diskSize->total = info.f_blocks * info.f_frsize;
diskSize->avail = info.f_bavail * info.f_frsize;
diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize;
return 0;
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index 144de08cd0a83ad90ba9f9368b726276cd0a91d9..98a6cde37a6880e796b4f12ba001f4bedcae5223 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -396,7 +396,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
// tfs
-TAOS_DEFINE_ERROR(TSDB_CODE_FS_OUT_OF_MEMORY, "tfs out of memory")
+TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount")
diff --git a/src/inc/tfs.h b/src/inc/tfs.h
deleted file mode 100644
index 9ad7a8f66e76bd88612e35e8e4d54c66f862b97f..0000000000000000000000000000000000000000
--- a/src/inc/tfs.h
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef TD_TFS_H
-#define TD_TFS_H
-
-#include "tglobal.h"
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct {
- int level;
- int id;
-} SDiskID;
-
-#define TFS_UNDECIDED_LEVEL -1
-#define TFS_UNDECIDED_ID -1
-#define TFS_PRIMARY_LEVEL 0
-#define TFS_PRIMARY_ID 0
-#define TFS_MIN_LEVEL 0
-#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
-
-// FS APIs ====================================
-typedef struct {
- int64_t tsize;
- int64_t used;
- int64_t avail;
-} SFSMeta;
-
-typedef struct {
- int64_t size;
- int64_t used;
- int64_t free;
- int16_t nAvailDisks; // # of Available disks
-} STierMeta;
-
-int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
-void tfsCleanup();
-void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
-void tfsGetMeta(SFSMeta *pMeta);
-void tfsAllocDisk(int expLevel, int *level, int *id);
-
-const char *TFS_PRIMARY_PATH();
-const char *TFS_DISK_PATH(int level, int id);
-
-// TFILE APIs ====================================
-typedef struct {
- int level;
- int id;
- char rname[TSDB_FILENAME_LEN]; // REL name
- char aname[TSDB_FILENAME_LEN]; // ABS name
-} TFILE;
-
-#define TFILE_LEVEL(pf) ((pf)->level)
-#define TFILE_ID(pf) ((pf)->id)
-#define TFILE_NAME(pf) ((pf)->aname)
-#define TFILE_REL_NAME(pf) ((pf)->rname)
-
-#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags)
-#define tfsclose(fd) close(fd)
-#define tfsremove(pf) remove(TFILE_NAME(pf))
-#define tfscopy(sf, df) taosCopy(TFILE_NAME(sf), TFILE_NAME(df))
-#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
-
-void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
-bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
-int tfsEncodeFile(void **buf, TFILE *pf);
-void *tfsDecodeFile(void *buf, TFILE *pf);
-void tfsbasename(const TFILE *pf, char *dest);
-void tfsdirname(const TFILE *pf, char *dest);
-
-// DIR APIs ====================================
-int tfsMkdirAt(const char *rname, int level, int id);
-int tfsMkdirRecurAt(const char *rname, int level, int id);
-int tfsMkdir(const char *rname);
-int tfsRmdir(const char *rname);
-int tfsRename(char *orname, char *nrname);
-
-typedef struct TDIR TDIR;
-
-TDIR * tfsOpendir(const char *rname);
-const TFILE *tfsReaddir(TDIR *tdir);
-void tfsClosedir(TDIR *tdir);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif
diff --git a/tests/script/sh/massiveTable/cleanCluster.sh b/tests/script/sh/massiveTable/cleanCluster.sh
new file mode 100755
index 0000000000000000000000000000000000000000..af278933b27bff5d65e13da50b8e8e16de44202a
--- /dev/null
+++ b/tests/script/sh/massiveTable/cleanCluster.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+#
+# clean test environment
+
+set -e
+#set -x
+
+# cleanCluster.sh
+# -r [ dnode root dir]
+
+
+dataRootDir="/data"
+
+
+while getopts "hr:" arg
+do
+ case $arg in
+ r)
+ dataRootDir=$(echo $OPTARG)
+ ;;
+ h)
+ echo "Usage: `basename $0` -r [ dnode root dir] "
+ exit 0
+ ;;
+ ?) #unknow option
+ echo "unkonw argument"
+ exit 1
+ ;;
+ esac
+done
+
+
+rmDnodesDataDir() {
+ if [ -d ${dataRootDir} ]; then
+ rm -rf ${dataRootDir}/dnode*
+ else
+ echo "${dataRootDir} not exist"
+ exit 1
+ fi
+}
+
+function kill_process() {
+ pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}')
+ if [ -n "$pid" ]; then
+ kill -9 $pid || :
+ fi
+}
+
+########################################################################################
+############################### main process ##########################################
+
+## kill all taosd process
+kill_process taosd
+
+rmDnodesDataDir
+
+
diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh
new file mode 100755
index 0000000000000000000000000000000000000000..1976e8f14ac612297e19d38628544b152010c43d
--- /dev/null
+++ b/tests/script/sh/massiveTable/compileVersion.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+#
+# compile test version
+
+set -e
+#set -x
+
+# compileVersion.sh
+# -r [ TDengine project dir]
+# -v [ TDengine branch version ]
+
+
+projectDir=/root/TDengine
+TDengineBrVer="3.0"
+
+while getopts "hr:v:" arg
+do
+ case $arg in
+ r)
+ projectDir=$(echo $OPTARG)
+ ;;
+ v)
+ TDengineBrVer=$(echo $OPTARG)
+ ;;
+ h)
+ echo "Usage: `basename $0` -r [ TDengine project dir] "
+ echo " -v [ TDengine branch version] "
+ exit 0
+ ;;
+ ?) #unknow option
+ echo "unkonw argument"
+ exit 1
+ ;;
+ esac
+done
+
+echo "projectDir=${projectDir} TDengineBrVer=${TDengineBrVer}"
+
+function gitPullBranchInfo () {
+ branch_name=$1
+
+ git checkout $branch_name
+ echo "==== git pull $branch_name start ===="
+## git submodule update --init --recursive
+ git pull origin $branch_name ||:
+ echo "==== git pull $branch_name end ===="
+}
+
+function compileTDengineVersion() {
+ debugDir=debug
+ if [ -d ${debugDir} ]; then
+ rm -rf ${debugDir}/* ||:
+ else
+ mkdir -p ${debugDir}
+ fi
+
+ cd ${debugDir}
+ cmake ..
+ make -j24
+}
+########################################################################################
+############################### main process ##########################################
+
+## checkout all branchs and git pull
+cd ${projectDir}
+gitPullBranchInfo $TDengineBrVer
+compileTDengineVersion
+
+taos_dir=${projectDir}/debug/tools/shell
+taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon
+create_table_dir=${projectDir}/debug/tests/test/c
+
+rm -f /usr/bin/taos
+rm -f /usr/bin/taosd
+rm -f /usr/bin/create_table
+
+ln -s $taos_dir/taos /usr/bin/taos
+ln -s $taosd_dir/taosd /usr/bin/taosd
+ln -s $create_table_dir/create_table /usr/bin/create_table
+
+
diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh
new file mode 100755
index 0000000000000000000000000000000000000000..47802ea6a7063d13180fa8a2517ea37c8065df58
--- /dev/null
+++ b/tests/script/sh/massiveTable/deployCluster.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+#
+# deploy test cluster
+
+set -e
+#set -x
+
+# deployCluster.sh
+
+curr_dir=$(pwd)
+
+source ./cleanCluster.sh -r /data
+source ./cleanCluster.sh -r /data2
+
+source ./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0"
+
+source ./setupDnodes.sh -r /data -n 1 -f trd02:7000 -p 7000
+source ./setupDnodes.sh -r /data2 -n 1 -f trd02:7000 -p 8000
+
+#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000
+#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000
+
+
+
+
diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh
new file mode 100755
index 0000000000000000000000000000000000000000..034c15c4ebe95014562e460c15cd2e439eabf0ef
--- /dev/null
+++ b/tests/script/sh/massiveTable/setupDnodes.sh
@@ -0,0 +1,136 @@
+#!/bin/bash
+#
+# setup test environment
+
+set -e
+#set -x
+
+# setupDnodes.sh
+# -e [ new | old]
+# -n [ dnode number]
+# -f [ first ep]
+# -p [ start port]
+# -r [ dnode root dir]
+
+# set parameters by default value
+enviMode=new
+dataRootDir="/data"
+firstEp="localhost:7000"
+startPort=7000
+dnodeNumber=1
+
+
+while getopts "he:f:n:r:p:" arg
+do
+ case $arg in
+ e)
+ enviMode=$( echo $OPTARG )
+ ;;
+ n)
+ dnodeNumber=$(echo $OPTARG)
+ ;;
+ f)
+ firstEp=$(echo $OPTARG)
+ ;;
+ p)
+ startPort=$(echo $OPTARG)
+ ;;
+ r)
+ dataRootDir=$(echo $OPTARG)
+ ;;
+ h)
+ echo "Usage: `basename $0` -e [new | old] "
+ echo " -n [ dnode number] "
+ echo " -f [ first ep] "
+ echo " -p [ start port] "
+ echo " -r [ dnode root dir] "
+ exit 0
+ ;;
+ ?) #unknow option
+ echo "unkonw argument"
+ exit 1
+ ;;
+ esac
+done
+
+echo "enviMode=${enviMode} dnodeNumber=${dnodeNumber} dataRootDir=${dataRootDir} firstEp=${firstEp} startPort=${startPort}"
+
+#curr_dir=$(pwd)
+
+
+createNewCfgFile() {
+ cfgFile=$1/taos.cfg
+ dataDir=$2
+ logDir=$3
+ firstEp=$4
+ serverPort=$5
+
+ echo "debugFlag 131" > ${cfgFile}
+ echo "firstEp ${firstEp}" >> ${cfgFile}
+ echo "dataDir ${dataDir}" >> ${cfgFile}
+ echo "logDir ${logDir}" >> ${cfgFile}
+ echo "serverPort ${serverPort}" >> ${cfgFile}
+
+ echo "supportVnodes 1024" >> ${cfgFile}
+ #echo "asyncLog 0" >> ${cfgFile}
+ echo "telemetryReporting 0" >> ${cfgFile}
+}
+
+createNewDnodesDataDir() {
+ if [ -d ${dataRootDir} ]; then
+ rm -rf ${dataRootDir}/dnode*
+ else
+ echo "${dataRootDir} not exist"
+ exit 1
+ fi
+
+ dnodeNumber=$1
+ firstEp=$2
+
+ serverPort=${startPort}
+ for ((i=0; i<${dnodeNumber}; i++)); do
+ mkdir -p ${dataRootDir}/dnode_${i}/cfg
+ mkdir -p ${dataRootDir}/dnode_${i}/log
+ mkdir -p ${dataRootDir}/dnode_${i}/data
+
+ createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort}
+ echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}"
+ serverPort=$((10#${serverPort}+100))
+ done
+}
+
+function kill_process() {
+ pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}')
+ if [ -n "$pid" ]; then
+ kill -9 $pid || :
+ fi
+}
+
+startDnodes() {
+ dnodeNumber=$1
+
+ for ((i=0; i<${dnodeNumber}; i++)); do
+ if [ -d ${dataRootDir}/dnode_${i} ]; then
+ nohup taosd -c ${dataRootDir}/dnode_${i}/cfg >/dev/null 2>&1 &
+ echo "start taosd ${dataRootDir}/dnode_${i}"
+ fi
+ done
+}
+
+########################################################################################
+############################### main process ##########################################
+
+## kill all taosd process
+kill_process taosd
+
+## create director for all dnode
+if [[ "$enviMode" == "new" ]]; then
+ createNewDnodesDataDir ${dnodeNumber} ${firstEp}
+fi
+
+## start all dnode by nohup
+startDnodes ${dnodeNumber}
+
+echo " run setupDnodes.sh end !!!"
+
+