提交 f1e7dade 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj

此差异已折叠。
......@@ -113,8 +113,8 @@ typedef enum {
SDB_USER = 7,
SDB_AUTH = 8,
SDB_ACCT = 9,
SDB_CONSUMER = 10,
SDB_CGROUP = 11,
SDB_SUBSCRIBE = 10,
SDB_CONSUMER = 11,
SDB_TOPIC = 12,
SDB_VGROUP = 13,
SDB_STB = 14,
......
......@@ -16,77 +16,226 @@
#ifndef _TD_TFS_H_
#define _TD_TFS_H_
#include "tglobal.h"
#include "tcfg.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct STfs STfs;
typedef struct STfsDir STfsDir;
typedef struct {
int32_t level;
int32_t id;
} SDiskID;
#define TFS_UNDECIDED_LEVEL -1
#define TFS_UNDECIDED_ID -1
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
// FS APIs ====================================
typedef struct {
int64_t total;
int64_t used;
int64_t avail;
} SFSMeta;
SDiskID did;
char aname[TSDB_FILENAME_LEN]; // ABS name
char rname[TSDB_FILENAME_LEN]; // REL name
STfs *pTfs;
} STfsFile;
int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk);
void tfsCleanup();
void tfsUpdateSize(SFSMeta *pFSMeta);
void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id);
/**
* @brief Open a fs.
*
* @param pCfg Config of the fs.
* @param ndisk Length of the config.
* @return STfs* The fs object.
*/
STfs *tfsOpen(SDiskCfg *pCfg, int32_t ndisk);
const char *TFS_PRIMARY_PATH();
const char *TFS_DISK_PATH(int32_t level, int32_t id);
/**
* @brief Close a fs.
*
* @param pTfs The fs object to close.
*/
void tfsClose(STfs *pTfs);
// TFILE APIs ====================================
typedef struct {
int32_t level;
int32_t id;
char rname[TSDB_FILENAME_LEN]; // REL name
char aname[TSDB_FILENAME_LEN]; // ABS name
} TFILE;
#define TFILE_LEVEL(pf) ((pf)->level)
#define TFILE_ID(pf) ((pf)->id)
#define TFILE_NAME(pf) ((pf)->aname)
#define TFILE_REL_NAME(pf) ((pf)->rname)
#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags)
#define tfsclose(fd) close(fd)
#define tfsremove(pf) remove(TFILE_NAME(pf))
#define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df))
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname);
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
int32_t tfsEncodeFile(void **buf, TFILE *pf);
void *tfsDecodeFile(void *buf, TFILE *pf);
void tfsbasename(const TFILE *pf, char *dest);
void tfsdirname(const TFILE *pf, char *dest);
// DIR APIs ====================================
int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id);
int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id);
int32_t tfsMkdir(const char *rname);
int32_t tfsRmdir(const char *rname);
int32_t tfsRename(char *orname, char *nrname);
typedef struct TDIR TDIR;
TDIR *tfsOpendir(const char *rname);
const TFILE *tfsReaddir(TDIR *tdir);
void tfsClosedir(TDIR *tdir);
/**
* @brief Update the disk size.
*
* @param pTfs The fs object.
*/
void tfsUpdateSize(STfs *pTfs);
/**
* @brief Get the disk size.
*
* @param pTfs The fs object.
*/
SDiskSize tfsGetSize(STfs *pTfs);
/**
* @brief Allocate an existing available tier level from fs.
*
* @param pTfs The fs object.
* @param expLevel Disk level want to allocate.
* @param pDiskId The disk ID after allocation.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId);
/**
* @brief Get the primary path.
*
* @param pTfs The fs object.
* @return const char * The primary path.
*/
const char *tfsGetPrimaryPath(STfs *pTfs);
/**
* @brief Get the disk path.
*
* @param pTfs The fs object.
* @param diskId The diskId.
* @return const char * The primary path.
*/
const char *tfsGetDiskPath(STfs *pTfs, SDiskID diskId);
/**
* @brief Make directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdir(STfs *pTfs, const char *rname);
/**
* @brief Create directories in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @param diskId The disk ID.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Recursive create directories in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @param diskId The disk ID.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Remove directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRmdir(STfs *pTfs, const char *rname);
/**
* @brief Rename file/directory in tfs.
*
* @param pTfs The fs object.
* @param orname The rel name of old file.
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname);
/**
* @brief Init file object in tfs.
*
* @param pTfs The fs object.
* @param pFile The file object.
* @param diskId The disk ID.
* @param rname The rel name of file.
*/
void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname);
/**
* @brief Determine whether they are the same file.
*
* @param pFile1 The file object.
* @param pFile2 The file object.
* @param bool The compare result.
*/
bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2);
/**
* @brief Encode file name to a buffer.
*
* @param buf The buffer where file name are saved.
* @param pFile The file object.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsEncodeFile(void **buf, STfsFile *pFile);
/**
* @brief Decode file name from a buffer.
*
* @param pTfs The fs object.
* @param buf The buffer where file name are saved.
* @param pFile The file object.
* @return void * Buffer address after decode.
*/
void *tfsDecodeFile(STfs *pTfs, void *buf, STfsFile *pFile);
/**
* @brief Get the basename of the file.
*
* @param pFile The file object.
* @param dest The buffer where basename will be saved.
*/
void tfsBasename(const STfsFile *pFile, char *dest);
/**
* @brief Get the dirname of the file.
*
* @param pFile The file object.
* @param dest The buffer where dirname will be saved.
*/
void tfsDirname(const STfsFile *pFile, char *dest);
/**
* @brief Remove file in tfs.
*
* @param pFile The file to be removed.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRemoveFile(const STfsFile *pFile);
/**
* @brief Copy file in tfs.
*
* @param pFile1 The src file.
* @param pFile2 The dest file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2);
/**
* @brief Open a directory for traversal.
*
* @param rname The rel name of file.
* @return STfsDir* The dir object.
*/
STfsDir *tfsOpendir(STfs *pTfs, const char *rname);
/**
* @brief Get a file from dir and move to next pos.
*
* @param pDir The dir object.
* @return STfsFile* The file in dir.
*/
const STfsFile *tfsReaddir(STfsDir *pDir);
/**
* @brief Close a directory.
*
* @param pDir The dir object.
*/
void tfsClosedir(STfsDir *pDir);
#ifdef __cplusplus
}
......
......@@ -58,8 +58,8 @@ int64_t taosWriteFile(FileFd fd, const void *buf, int64_t count);
void taosCloseFile(FileFd fd);
int32_t taosRenameFile(char *oldName, char *newName);
int64_t taosCopyFile(char *from, char *to);
int32_t taosRenameFile(const char *oldName, const char *newName);
int64_t taosCopyFile(const char *from, const char *to);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
......
......@@ -178,6 +178,7 @@ do { \
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
......@@ -372,10 +373,14 @@ do { \
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
#define TSDB_MAX_TIERS 3
#define TSDB_MAX_DISKS_PER_TIER 16
#define TSDB_MAX_DISKS (TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER)
#define TFS_MAX_TIERS 3
#define TFS_MAX_DISKS_PER_TIER 16
#define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER)
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1)
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK };
......
......@@ -137,7 +137,7 @@ int32_t tsDiskCfgNum = 0;
#ifndef _STORAGE
SDiskCfg tsDiskCfg[1];
#else
SDiskCfg tsDiskCfg[TSDB_MAX_DISKS];
SDiskCfg tsDiskCfg[TFS_MAX_DISKS];
#endif
/*
......
......@@ -134,6 +134,7 @@ typedef struct SDnode {
SBnodeMgmt bmgmt;
SVnodesMgmt vmgmt;
STransMgmt tmgmt;
STfs *pTfs;
SStartupReq startup;
} SDnode;
......
......@@ -43,6 +43,7 @@ extern "C" {
#include "qnode.h"
#include "snode.h"
#include "vnode.h"
#include "tfs.h"
extern int32_t dDebugFlag;
......
......@@ -173,11 +173,12 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg) {
return NULL;
}
SDiskCfg dCfg;
strcpy(dCfg.dir, pDnode->cfg.dataDir);
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
if (tfsInit(&dCfg, 1) != 0) {
pDnode->pTfs = tfsOpen(&dCfg, 1);
if (pDnode->pTfs == NULL) {
dError("failed to init tfs since %s", terrstr());
dndClose(pDnode);
return NULL;
......@@ -251,7 +252,7 @@ void dndClose(SDnode *pDnode) {
dndCleanupQnode(pDnode);
dndCleanupVnodes(pDnode);
dndCleanupMgmt(pDnode);
tfsCleanup();
tfsClose(pDnode->pTfs);
dndCloseImp(pDnode);
free(pDnode);
......@@ -313,4 +314,28 @@ void dndCleanup() {
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
#if 0
const double unit = 1024 * 1024 * 1024;
SDiskSize diskSize = tfsGetSize(pTfs);
tfsUpdateSize(&fsMeta);
tsTotalDataDirGB = (float)(fsMeta.total / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
tsTotalLogDirGB = (float)(diskSize.total / unit);
tsAvailLogDirGB = (float)(diskSize.avail / unit);
}
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
tsTotalTmpDirGB = (float)(diskSize.total / unit);
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
}
#endif
}
\ No newline at end of file
......@@ -381,7 +381,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId};
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
......@@ -587,6 +587,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
vnodeCfg.pDnode = pDnode;
vnodeCfg.pTfs = pDnode->pTfs;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
......@@ -841,6 +842,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rsp);
......
......@@ -7,6 +7,7 @@ target_include_directories(
)
target_link_libraries(
mnode
PRIVATE scheduler
PRIVATE sdb
PRIVATE wal
PRIVATE transport
......@@ -16,4 +17,4 @@ target_link_libraries(
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
endif(${BUILD_TEST})
......@@ -28,6 +28,9 @@ void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus
}
#endif
......
......@@ -26,6 +26,7 @@
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
#include "scheduler.h"
#include "mnode.h"
......@@ -326,17 +327,156 @@ typedef struct SMqTopicConsumer {
#endif
typedef struct SMqConsumerEp {
int32_t vgId;
int32_t vgId; // -1 for unassigned
SEpSet epset;
int64_t consumerId;
int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs;
int64_t lastVgHbTs;
} SMqConsumerEp;
typedef struct SMqCgroupTopicPair {
char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN];
SArray* assigned; // SArray<SMqConsumerEp>
SArray* unassignedConsumer;
SArray* unassignedVg;
} SMqCgroupTopicPair;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
return buf;
}
//unit for rebalance
typedef struct SMqSubscribeObj {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t epoch;
//TODO: replace with priority queue
SArray* availConsumer; // SArray<int64_t> (consumerId)
SArray* assigned; // SArray<SMqConsumerEp>
SArray* unassignedConsumer; // SArray<SMqConsumerEp>
SArray* unassignedVg; // SArray<SMqConsumerEp>
} SMqSubscribeObj;
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj));
pSub->key[0] = 0;
pSub->epoch = 0;
if (pSub == NULL) {
return NULL;
}
pSub->availConsumer = taosArrayInit(0, sizeof(int64_t));
if (pSub->availConsumer == NULL) {
free(pSub);
return NULL;
}
pSub->assigned = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer);
free(pSub);
return NULL;
}
pSub->unassignedConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->unassignedConsumer);
free(pSub);
return NULL;
}
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->unassignedConsumer);
taosArrayDestroy(pSub->unassignedVg);
free(pSub);
return NULL;
}
return NULL;
}
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pSub->key);
tlen += taosEncodeFixedI32(buf, pSub->epoch);
int32_t sz;
sz = taosArrayGetSize(pSub->availConsumer);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
int64_t* pConsumerId = taosArrayGet(pSub->availConsumer, i);
tlen += taosEncodeFixedI64(buf, *pConsumerId);
}
sz = taosArrayGetSize(pSub->assigned);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
sz = taosArrayGetSize(pSub->unassignedConsumer);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedConsumer, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
sz = taosArrayGetSize(pSub->unassignedVg);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedVg, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) {
buf = taosDecodeStringTo(buf, pSub->key);
buf = taosDecodeFixedI32(buf, &pSub->epoch);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pSub->assigned = taosArrayInit(sz, sizeof(int64_t));
if (pSub->assigned == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
int64_t consumerId;
buf = taosDecodeFixedI64(buf, &consumerId);
taosArrayPush(pSub->assigned, &consumerId);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedConsumer == NULL) {
taosArrayDestroy(pSub->assigned);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedConsumer, &cEp);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->unassignedConsumer);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedVg, &cEp);
}
return buf;
}
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
......@@ -358,8 +498,8 @@ typedef struct SMqTopicObj {
char *sql;
char *logicalPlan;
char *physicalPlan;
SHashObj *cgroups; // SHashObj<SMqCGroup>
SHashObj *consumers; // SHashObj<SMqConsumerObj>
//SHashObj *cgroups; // SHashObj<SMqCGroup>
//SHashObj *consumers; // SHashObj<SMqConsumerObj>
} SMqTopicObj;
// TODO: add cache and change name to id
......@@ -367,18 +507,93 @@ typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_FNAME_LEN];
int32_t epoch;
//TODO: replace with something with ep
SList *vgroups; // SList<int32_t>
//SList *vgroups; // SList<int32_t>
//vg assigned to the consumer on the topic
SArray *pVgInfo; // SArray<int32_t>
} SMqConsumerTopic;
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
if (pCTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
strcpy(pCTopic->name, pTopic->name);
pCTopic->epoch = 0;
pCTopic->pVgInfo = taosArrayInit(0, sizeof(int32_t));
int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg);
if (unassignedVgSz > 0) {
SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg);
pCEp->consumerId = consumerId;
taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId);
taosArrayPush(pSub->assigned, pCEp);
}
return pCTopic;
}
static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic* pConsumerTopic) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pConsumerTopic->name);
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i);
tlen += taosEncodeFixedI32(buf, *pVgInfo);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* pConsumerTopic) {
buf = taosDecodeStringTo(buf, pConsumerTopic->name);
buf = taosDecodeFixedI32(buf, &pConsumerTopic->epoch);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pConsumerTopic->pVgInfo = taosArrayInit(sz, sizeof(SMqConsumerTopic));
for (int32_t i = 0; i < sz; i++) {
int32_t vgInfo;
buf = taosDecodeFixedI32(buf, &vgInfo);
taosArrayPush(pConsumerTopic->pVgInfo, &vgInfo);
}
return buf;
}
typedef struct SMqConsumerObj {
int64_t consumerId;
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray<SMqConsumerTopic>
SHashObj *topicHash; //SHashObj<SMqConsumerTopic>
//SHashObj *topicHash; //SHashObj<SMqTopicObj>
} SMqConsumerObj;
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeString(buf, pConsumer->cgroup);
int32_t sz = taosArrayGetSize(pConsumer->topics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerTopic* pConsumerTopic = taosArrayGet(pConsumer->topics, i);
tlen += tEncodeSMqConsumerTopic(buf, pConsumerTopic);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
for (int32_t i = 0; i < sz; i++) {
SMqConsumerTopic cTopic;
buf = tDecodeSMqConsumerTopic(buf, &cTopic);
taosArrayPush(pConsumer->topics, &cTopic);
}
return buf;
}
typedef struct SMqSubConsumerObj {
int64_t consumerUid; // if -1, unassigned
SList *vgId; // SList<int32_t>
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_SUBSCRIBE_H_
#define _TD_MND_SUBSCRIBE_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitSubscribe(SMnode *pMnode);
void mndCleanupSubscribe(SMnode *pMnode);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
SSdbRaw *mndSubscribeActionEncode(SMqSubscribeObj *pSub);
SSdbRow *mndSubscribeActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_SUBSCRIBE_H_*/
......@@ -16,9 +16,18 @@
#define _DEFAULT_SOURCE
#include "mndAuth.h"
int32_t mndInitAuth(SMnode *pMnode) { return 0; }
void mndCleanupAuth(SMnode *pMnode) {}
static int32_t mndProcessAuthReq(SMnodeMsg *pReq);
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t mndInitAuth(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_AUTH, mndProcessAuthReq);
return 0;
}
void mndCleanupAuth(SMnode *pMnode) {}
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; }
static int32_t mndProcessAuthReq(SMnodeMsg *pReq) {
mDebug("user:%s, auth req is processed", pReq->user);
return 0;
}
\ No newline at end of file
......@@ -30,24 +30,14 @@
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64
static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
int32_t mndInitConsumer(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CONSUMER,
.keyType = SDB_KEY_BINARY,
......@@ -57,26 +47,29 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupConsumer(SMnode *pMnode) {}
static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
return 0;
}
static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
int32_t size = sizeof(SMqConsumerObj) + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen);
if (pRaw == NULL) goto CM_ENCODE_OVER;
void* buf = malloc(tlen);
if (buf == NULL) goto CM_ENCODE_OVER;
void* abuf = buf;
tEncodeSMqConsumerObj(&abuf, pConsumer);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
#if 0
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
int32_t len = strlen(pConsumer->cgroup);
......@@ -101,10 +94,13 @@ static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
#endif
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
terrno = TSDB_CODE_SUCCESS;
CM_ENCODE_OVER:
if (terrno != 0) {
mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
......@@ -116,7 +112,7 @@ CM_ENCODE_OVER:
return pRaw;
}
static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
......@@ -127,18 +123,27 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
goto CONSUME_DECODE_OVER;
}
int32_t size = sizeof(SMqConsumerObj);
SSdbRow *pRow = sdbAllocRow(size);
SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj));
if (pRow == NULL) goto CONSUME_DECODE_OVER;
SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) goto CONSUME_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT64(pRaw, dataPos, &pConsumer->consumerId, CONSUME_DECODE_OVER);
int32_t len, topicNum;
int32_t len;
SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CONSUME_DECODE_OVER);
void* buf = malloc(len);
if (buf == NULL) goto CONSUME_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER);
tDecodeSMqConsumerObj(buf, pConsumer);
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER);
terrno = TSDB_CODE_SUCCESS;
#if 0
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t topicLen;
......@@ -154,6 +159,7 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
int32_t vgSize;
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
}
#endif
CONSUME_DECODE_OVER:
if (terrno != 0) {
......@@ -162,8 +168,6 @@ CONSUME_DECODE_OVER:
return NULL;
}
/*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/
return pRow;
}
......@@ -201,214 +205,13 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
sdbRelease(pSdb, pConsumer);
}
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq subscribe;
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *consumerGroup = subscribe.consumerGroup;
int32_t cgroupLen = strlen(consumerGroup);
SArray *newSub = NULL;
int newTopicNum = subscribe.topicNum;
if (newTopicNum) {
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
}
SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic));
if (pConsumerTopics == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int i = 0; i < newTopicNum; i++) {
char *newTopicName = taosArrayGetP(newSub, i);
SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i];
strcpy(pConsumerTopic->name, newTopicName);
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
}
taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum);
free(pConsumerTopics);
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int oldTopicNum = 0;
// create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
pConsumer = malloc(sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pConsumer->consumerId = consumerId;
strcpy(pConsumer->cgroup, consumerGroup);
} else {
oldSub = pConsumer->topics;
oldTopicNum = taosArrayGetSize(oldSub);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
//TODO: free memory
return -1;
}
int i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
SMqConsumerTopic *pOldTopic = NULL;
SMqConsumerTopic *pNewTopic = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
pOldTopic = taosArrayGet(oldSub, j);
j++;
} else if (j >= oldTopicNum) {
pNewTopic = taosArrayGet(newSub, i);
i++;
} else {
pNewTopic = taosArrayGet(newSub, i);
pOldTopic = taosArrayGet(oldSub, j);
char *newName = pNewTopic->name;
char *oldName = pOldTopic->name;
int comp = compareLenPrefixedStr(newName, oldName);
if (comp == 0) {
// do nothing
pOldTopic = pNewTopic = NULL;
i++;
j++;
continue;
} else if (comp < 0) {
pOldTopic = NULL;
i++;
} else {
pNewTopic = NULL;
j++;
}
}
if (pOldTopic != NULL) {
//cancel subscribe of that old topic
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
SListIter iter;
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
SListNode *pn;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
ASSERT(pTopic != NULL);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
// acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
//build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = pMqVgSetReq;
action.contLen = 0; // TODO
action.msgType = TDMT_VND_MQ_SET_CONN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMqVgSetReq);
mndTransDrop(pTrans);
// TODO free
return -1;
}
}
//delete data in mnode
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseTopic(pMnode, pTopic);
} else if (pNewTopic != NULL) {
// save subscribe info to mnode
ASSERT(pOldTopic == NULL);
char *newTopicName = pNewTopic->name;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
ASSERT(pTopic != NULL);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
// add new group
pGroup = malloc(sizeof(SMqCGroup));
if (pGroup == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->consumerIds = tdListNew(sizeof(int64_t));
if (pGroup->consumerIds == NULL) {
free(pGroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->status = 0;
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend(pGroup->consumerIds, &consumerId);
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pTopicRaw);
mndReleaseTopic(pMnode, pTopic);
} else {
ASSERT(0);
}
}
// destroy old sub
taosArrayDestroy(oldSub);
// put new sub into consumerobj
pConsumer->topics = newSub;
// persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
// TODO: free memory
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return 0;
}
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
#if 0
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
......@@ -463,7 +266,6 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
#endif
return 0;
}
......@@ -546,3 +348,4 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
#endif
......@@ -273,6 +273,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
}
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
#if 0
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -332,6 +333,8 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
pRsp->body = buf;
pRsp->bodyLen = tlen;
return pRsp;
#endif
return NULL;
}
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndSubActionEncode,
.decodeFp = (SdbDecodeFp)mndSubActionDecode,
.insertFp = (SdbInsertFp)mndSubActionInsert,
.updateFp = (SdbUpdateFp)mndSubActionUpdate,
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
return sdbSetTable(pMnode->pSdb, table);
}
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
int32_t sz;
SVgObj *pVgroup = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
while (pIter != NULL) {
if (pVgroup->dbUid == pTopic->dbUid) {
CEp.epset = mndGetVgroupEpset(pMnode, pVgroup);
CEp.vgId = pVgroup->vgId;
taosArrayPush(unassignedVg, &CEp);
}
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
}
return 0;
}
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic) {
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = {
.vgId = vgId,
.consumerId = pConsumer->consumerId,
};
strcpy(req.cGroup, pConsumer->cgroup);
strcpy(req.topicName, pConsumerTopic->name);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen);
if (reqStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *abuf = reqStr;
tEncodeSMqSetCVgReq(abuf, &req);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = reqStr;
action.contLen = tlen;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(reqStr);
return -1;
}
}
return 0;
}
void mndCleanupSubscribe(SMnode *pMnode) {}
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
if (pRaw == NULL) goto SUB_ENCODE_OVER;
void *buf = malloc(tlen);
if (buf == NULL) {
goto SUB_ENCODE_OVER;
}
void *abuf = buf;
tEncodeSubscribeObj(&buf, pSub);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
SUB_ENCODE_OVER:
if (terrno != 0) {
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
return pRaw;
}
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
if (sver != MND_SUBSCRIBE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto SUB_DECODE_OVER;
}
int32_t size = sizeof(SMqSubscribeObj);
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto SUB_DECODE_OVER;
SMqSubscribeObj *pSub = sdbGetRowObj(pRow);
if (pSub == NULL) goto SUB_DECODE_OVER;
int32_t dataPos = 0;
int32_t tlen;
void *buf = malloc(tlen + 1);
if (buf == NULL) goto SUB_DECODE_OVER;
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
if (tDecodeSubscribeObj(buf, pSub) == NULL) {
goto SUB_DECODE_OVER;
}
SUB_DECODE_OVER:
if (terrno != 0) {
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
// TODO free subscribeobj
tfree(pRow);
return NULL;
}
return pRow;
}
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
mTrace("subscribe:%s, perform insert action", pSub->key);
return 0;
}
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
mTrace("subscribe:%s, perform delete action", pSub->key);
return 0;
}
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
mTrace("subscribe:%s, perform update action", pOldSub->key);
return 0;
}
static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
return 0;
}
static char *mndMakeSubscribeKey(char *cgroup, char *topicName) {
char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
if (key == NULL) {
return NULL;
}
int tlen = strlen(cgroup);
memcpy(key, cgroup, tlen);
key[tlen] = ':';
strcpy(key + tlen + 1, topicName);
return key;
}
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) {
SSdb *pSdb = pMnode->pSdb;
char *key = mndMakeSubscribeKey(cgroup, topicName);
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
free(key);
if (pSub == NULL) {
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
}
return pSub;
}
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pSub);
}
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq subscribe;
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *consumerGroup = subscribe.consumerGroup;
int32_t cgroupLen = strlen(consumerGroup);
SArray *newSub = subscribe.topicNames;
int newTopicNum = subscribe.topicNum;
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int oldTopicNum = 0;
// create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
pConsumer = malloc(sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pConsumer->consumerId = consumerId;
strcpy(pConsumer->cgroup, consumerGroup);
taosInitRWLatch(&pConsumer->lock);
} else {
oldSub = pConsumer->topics;
}
pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
if (oldSub != NULL) {
oldTopicNum = taosArrayGetSize(oldSub);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
// TODO: free memory
return -1;
}
int i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
char* newTopicName = NULL;
char* oldTopicName = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name;
j++;
} else if (j >= oldTopicNum) {
newTopicName = taosArrayGet(newSub, i);
i++;
} else {
newTopicName = taosArrayGet(newSub, i);
oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name;
int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) {
// do nothing
oldTopicName = newTopicName = NULL;
i++;
j++;
continue;
} else if (comp < 0) {
oldTopicName = NULL;
i++;
} else {
newTopicName = NULL;
j++;
}
}
if (oldTopicName != NULL) {
#if 0
// cancel subscribe of that old topic
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
SListIter iter;
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
SListNode *pn;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
ASSERT(pTopic != NULL);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
// acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
// build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = pMqVgSetReq;
action.contLen = 0; // TODO
action.msgType = TDMT_VND_MQ_SET_CONN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMqVgSetReq);
mndTransDrop(pTrans);
// TODO free
return -1;
}
}
// delete data in mnode
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseSubscribe(pMnode, pSub);
mndReleaseTopic(pMnode, pTopic);
#endif
} else if (newTopicName != NULL) {
// save subscribe info to mnode
ASSERT(oldTopicName == NULL);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
if (pTopic == NULL) {
/*terrno = */
continue;
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
if (pSub == NULL) {
pSub = tNewSubscribeObj();
if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
}
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
taosArrayPush(pConsumer->topics, pConsumerTopic);
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
// send setmsg to vnode
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic) < 0) {
// TODO
return -1;
}
}
taosArrayDestroy(pConsumerTopic->pVgInfo);
free(pConsumerTopic);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
// add new group
pGroup = malloc(sizeof(SMqCGroup));
if (pGroup == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->consumerIds = tdListNew(sizeof(int64_t));
if (pGroup->consumerIds == NULL) {
free(pGroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->status = 0;
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend(pGroup->consumerIds, &consumerId);
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pTopicRaw);
#endif
mndReleaseTopic(pMnode, pTopic);
mndReleaseSubscribe(pMnode, pSub);
}
}
// part3. persist consumerObj
// destroy old sub
if (oldSub) taosArrayDestroy(oldSub);
// put new sub into consumerobj
// persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
// TODO: free memory
if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return 0;
}
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
if (pConsumer == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pConsumer->numOfTags);
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pConsumer->version);
pMeta->tuid = htonl(pConsumer->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pConsumer->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
#endif
return 0;
}
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
int32_t numOfConsumers = 0;
void *pIter = NULL;
while (1) {
SMqConsumerObj *pConsumer = NULL;
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
numOfConsumers++;
sdbRelease(pSdb, pConsumer);
}
*pNumOfConsumers = numOfConsumers;
return 0;
}
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htonl(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
......@@ -79,8 +79,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char));
if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER;
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
......@@ -92,12 +90,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
TOPIC_ENCODE_OVER:
if (terrno != TSDB_CODE_SUCCESS) {
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
/*if (pTopic->logicalPlan) {*/
/*free(pTopic->logicalPlan);*/
/*}*/
/*if (pTopic->physicalPlan) {*/
/*free(pTopic->physicalPlan);*/
/*}*/
sdbFreeRaw(pRaw);
return NULL;
}
......@@ -138,7 +130,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->logicalPlan = calloc(len+1, sizeof(char));
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
if (pTopic->logicalPlan == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
......@@ -146,7 +138,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
pTopic->physicalPlan = calloc(len + 1, sizeof(char));
if (pTopic->physicalPlan == NULL) {
free(pTopic->logicalPlan);
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -154,7 +146,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
terrno = TSDB_CODE_SUCCESS;
......
......@@ -38,10 +38,10 @@ const char *sdbTableName(ESdbType type) {
return "auth";
case SDB_ACCT:
return "acct";
case SDB_SUBSCRIBE:
return "subscribe";
case SDB_CONSUMER:
return "consumer";
case SDB_CGROUP:
return "cgroup";
case SDB_TOPIC:
return "topic";
case SDB_VGROUP:
......
......@@ -19,6 +19,7 @@
#include "mallocator.h"
#include "meta.h"
#include "common.h"
#include "tfs.h"
#ifdef __cplusplus
extern "C" {
......@@ -80,7 +81,7 @@ typedef struct {
} STableKeyInfo;
// STsdb
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta);
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp);
......
......@@ -21,6 +21,7 @@
#include "meta.h"
#include "tarray.h"
#include "tfs.h"
#include "tq.h"
#include "tsdb.h"
#include "wal.h"
......@@ -36,7 +37,8 @@ typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct SVnodeCfg {
int32_t vgId;
SDnode * pDnode;
SDnode *pDnode;
STfs *pTfs;
uint64_t wsize;
uint64_t ssize;
uint64_t lsize;
......@@ -52,9 +54,9 @@ typedef struct SVnodeCfg {
typedef struct {
int32_t sver;
char * timezone;
char * locale;
char * charset;
const char *timezone;
const char *locale;
const char *charset;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutReqToVQueryQFp putReqToVQueryQFp;
} SVnodeOpt;
......
......@@ -50,6 +50,7 @@ struct STsdb {
SMemAllocatorFactory *pmaf;
STsdbFS * fs;
SMeta * pMeta;
STfs * pTfs;
};
#define REPO_ID(r) ((r)->vgId)
......
......@@ -29,12 +29,15 @@
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f))
#define TSDB_FILE_FD(tf) ((tf)->fd)
#define TSDB_FILE_FULL_NAME(tf) TFILE_NAME(TSDB_FILE_F(tf))
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0)
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level)
#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id)
#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did)
#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname)
#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname)
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf))
#define TSDB_FILE_STATE(tf) ((tf)->state)
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
......@@ -54,10 +57,10 @@ typedef struct {
} SMFInfo;
typedef struct {
SMFInfo info;
TFILE f;
int fd;
uint8_t state;
SMFInfo info;
STfsFile f;
int fd;
uint8_t state;
} SMFile;
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
......@@ -175,17 +178,17 @@ typedef struct {
} SDFInfo;
typedef struct {
SDFInfo info;
TFILE f;
int fd;
uint8_t state;
SDFInfo info;
STfsFile f;
int fd;
uint8_t state;
} SDFile;
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
......@@ -263,7 +266,7 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte
return (int)nbyte;
}
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_FILE_F(pDFile)); }
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
......@@ -278,7 +281,7 @@ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nby
}
static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
if (tfscopy(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -311,14 +314,14 @@ typedef struct {
} \
} while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver);
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet* pSet, SDiskID did, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet);
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);
......
......@@ -78,6 +78,7 @@ struct SVnode {
tsem_t canCommit;
SQHandle* pQuery;
SDnode* pDnode;
STfs* pTfs;
};
int vnodeScheduleTask(SVnodeTask* task);
......
......@@ -97,15 +97,14 @@ int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
level = tsdbGetFidLevel(pSet->fid, pRtn);
tfsAllocDisk(level, &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
if (tfsAllocDisk(pRepo->pTfs, level, &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
......@@ -456,8 +455,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
......@@ -484,9 +482,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// Set and open commit FSET
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(pWSet, true) < 0) {
if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) {
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
if (pCommith->isRFileSet) {
......@@ -509,8 +507,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pWHeadf, true) < 0) {
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
......@@ -556,10 +554,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
} else {
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pWLastf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
......
......@@ -186,8 +186,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
} else {
// Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);
......
......@@ -23,14 +23,14 @@ static const char *tsdbTxnFname[] = {"current.t", "current"};
static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid);
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdb *pRepo);
static int tsdbScanAndTryFixFS(STsdb *pRepo);
static int tsdbScanRootDir(STsdb *pRepo);
static int tsdbScanDataDir(STsdb *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf);
static int tsdbRestoreCurrent(STsdb *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
......@@ -97,7 +97,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
return tlen;
}
static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
static void *tsdbDecodeDFileSetArray(STsdb*pRepo, void *buf, SArray *pArray) {
uint64_t nset;
SDFileSet dset;
......@@ -105,7 +105,7 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
buf = taosDecodeFixedU64(buf, &nset);
for (size_t i = 0; i < nset; i++) {
buf = tsdbDecodeDFileSet(buf, &dset);
buf = tsdbDecodeDFileSet(pRepo, buf, &dset);
taosArrayPush(pArray, (void *)(&dset));
}
return buf;
......@@ -122,13 +122,13 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return tlen;
}
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
static void *tsdbDecodeFSStatus(STsdb*pRepo, void *buf, SFSStatus *pStatus) {
tsdbResetFSStatus(pStatus);
// pStatus->pmf = &(pStatus->mf);
// buf = tsdbDecodeSMFile(buf, pStatus->pmf);
buf = tsdbDecodeDFileSetArray(buf, pStatus->df);
buf = tsdbDecodeDFileSetArray(pRepo, buf, pStatus->df);
return buf;
}
......@@ -311,7 +311,7 @@ int tsdbOpenFS(STsdb *pRepo) {
ASSERT(pfs != NULL);
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
tsdbGetRtnSnap(pRepo, &pRepo->rtn);
if (access(current, F_OK) == 0) {
......@@ -375,7 +375,7 @@ int tsdbEndFSTxn(STsdb *pRepo) {
SFSStatus *pStatus;
// Write current file system snapshot
if (tsdbSaveFSStatus(pfs->nstatus, REPO_ID(pRepo)) < 0) {
if (tsdbSaveFSStatus(pRepo, pfs->nstatus) < 0) {
tsdbEndFSTxnWithError(pfs);
return -1;
}
......@@ -405,7 +405,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) {
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
SFSHeader fsheader;
void * pBuf = NULL;
void * ptr;
......@@ -413,8 +413,8 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0";
tsdbGetTxnFname(vid, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(vid, TSDB_TXN_CURR_FILE, cfname);
tsdbGetTxnFname(pRepo, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, cfname);
int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (fd < 0) {
......@@ -645,8 +645,9 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
}
}
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]);
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(pRepo->pTfs), pRepo->vgId,
tsdbTxnFname[ftype]);
}
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
......@@ -657,7 +658,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
char current[TSDB_FILENAME_LEN] = "\0";
void * ptr;
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
// current file exists, try to recover
fd = open(current, O_RDONLY | O_BINARY);
......@@ -725,7 +726,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
}
ptr = buffer;
ptr = tsdbDecodeFSStatus(ptr, pStatus);
ptr = tsdbDecodeFSStatus(pRepo, ptr, pStatus);
} else {
tsdbResetFSStatus(pStatus);
}
......@@ -910,17 +911,17 @@ static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
const TFILE *pf;
const STfsFile *pf;
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
TDIR *tdir = tfsOpendir(rootDir);
STfsDir *tdir = tfsOpendir(pRepo->pTfs, rootDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
return -1;
}
while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname);
tfsBasename(pf, bname);
if (strcmp(bname, tsdbTxnFname[TSDB_TXN_CURR_FILE]) == 0 || strcmp(bname, "data") == 0) {
// Skip current file and data directory
......@@ -931,8 +932,8 @@ static int tsdbScanRootDir(STsdb *pRepo) {
// continue;
// }
(void)tfsremove(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
(void)tfsRemoveFile(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
}
tfsClosedir(tdir);
......@@ -944,21 +945,21 @@ static int tsdbScanDataDir(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
const TFILE *pf;
const STfsFile *pf;
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
TDIR *tdir = tfsOpendir(dataDir);
STfsDir *tdir = tfsOpendir(pRepo->pTfs, dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
return -1;
}
while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname);
tfsBasename(pf, bname);
if (!tsdbIsTFileInFS(pfs, pf)) {
(void)tfsremove(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
(void)tfsRemoveFile(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
}
}
......@@ -967,7 +968,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
return 0;
}
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
SFSIter fsiter;
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
SDFileSet *pSet;
......@@ -987,8 +988,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// static int tsdbRestoreMeta(STsdb *pRepo) {
// char rootDir[TSDB_FILENAME_LEN];
// char bname[TSDB_FILENAME_LEN];
// TDIR * tdir = NULL;
// const TFILE *pf = NULL;
// STfsDir * tdir = NULL;
// const STfsFile *pf = NULL;
// const char * pattern = "^meta(-ver[0-9]+)?$";
// regex_t regex;
// STsdbFS * pfs = REPO_FS(pRepo);
......@@ -1007,7 +1008,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// }
// while ((pf = tfsReaddir(tdir))) {
// tfsbasename(pf, bname);
// tfsBasename(pf, bname);
// if (strcmp(bname, "data") == 0) {
// // Skip the data/ directory
......@@ -1016,7 +1017,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) {
// // Skip current.t file
// tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
// tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), pf->aname);
// (void)tfsremove(pf);
// continue;
// }
......@@ -1026,7 +1027,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// // Match
// if (pfs->cstatus->pmf != NULL) {
// tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo),
// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf));
// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), pf->aname);
// terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
// tfsClosedir(tdir);
// regfree(&regex);
......@@ -1081,7 +1082,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// }
// } else if (code == REG_NOMATCH) {
// // Not match
// tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
// tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
// tfsremove(pf);
// continue;
// } else {
......@@ -1108,8 +1109,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
static int tsdbRestoreDFileSet(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL;
const TFILE *pf = NULL;
STfsDir * tdir = NULL;
const STfsFile *pf = NULL;
const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last)(-ver[0-9]+)?$";
SArray * fArray = NULL;
regex_t regex;
......@@ -1120,7 +1121,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
// Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED);
fArray = taosArrayInit(1024, sizeof(TFILE));
fArray = taosArrayInit(1024, sizeof(STfsFile));
if (fArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
......@@ -1129,7 +1130,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
return -1;
}
tdir = tfsOpendir(dataDir);
tdir = tfsOpendir(pRepo->pTfs, dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
......@@ -1139,7 +1140,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
}
while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname);
tfsBasename(pf, bname);
int code = regexec(&regex, bname, 0, NULL, 0);
if (code == 0) {
......@@ -1152,8 +1153,8 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
}
} else if (code == REG_NOMATCH) {
// Not match
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
(void)tfsremove(pf);
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
(void)tfsRemoveFile(pf);
continue;
} else {
// Has other error
......@@ -1200,7 +1201,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
uint32_t tversion;
char _bname[TSDB_FILENAME_LEN];
tfsbasename(pf, _bname);
tfsBasename(pf, _bname);
tsdbParseDFilename(_bname, &tvid, &tfid, &ttype, &tversion);
ASSERT(tvid == REPO_ID(pRepo));
......@@ -1287,7 +1288,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
return -1;
}
if (tsdbSaveFSStatus(pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) {
if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
......@@ -1296,8 +1297,8 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
}
static int tsdbComparTFILE(const void *arg1, const void *arg2) {
TFILE *pf1 = (TFILE *)arg1;
TFILE *pf2 = (TFILE *)arg2;
STfsFile *pf1 = (STfsFile *)arg1;
STfsFile *pf2 = (STfsFile *)arg2;
int vid1, fid1, vid2, fid2;
TSDB_FILE_T ftype1, ftype2;
......@@ -1305,8 +1306,8 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
char bname1[TSDB_FILENAME_LEN];
char bname2[TSDB_FILENAME_LEN];
tfsbasename(pf1, bname1);
tfsbasename(pf2, bname2);
tfsBasename(pf1, bname1);
tfsBasename(pf2, bname2);
tsdbParseDFilename(bname1, &vid1, &fid1, &ftype1, &version1);
tsdbParseDFilename(bname2, &vid2, &fid2, &ftype2, &version2);
......
......@@ -295,7 +295,7 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
#endif
// ============== Operations on SDFile
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype) {
char fname[TSDB_FILENAME_LEN];
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK);
......@@ -305,8 +305,8 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
tsdbGetFilename(vid, fid, ver, ftype, fname);
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname);
tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname);
}
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) {
......@@ -323,9 +323,9 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
return tlen;
}
void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile) {
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
buf = tfsDecodeFile(buf, &(pDFile->f));
buf = tfsDecodeFile(pRepo->pTfs, buf, &(pDFile->f));
TSDB_FILE_SET_CLOSED(pDFile);
return buf;
......@@ -352,15 +352,15 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return buf;
}
int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (pDFile->fd < 0) {
if (errno == ENOENT) {
// Try to create directory recursively
char *s = strdup(TFILE_REL_NAME(&(pDFile->f)));
if (tfsMkdirRecurAt(dirname(s), TSDB_FILE_LEVEL(pDFile), TSDB_FILE_ID(pDFile)) < 0) {
char *s = strdup(TSDB_FILE_REL_NAME(pDFile));
if (tfsMkdirRecurAt(pRepo->pTfs, dirname(s), TSDB_FILE_DID(pDFile)) < 0) {
tfree(s);
return -1;
}
......@@ -559,13 +559,13 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
}
// ============== Operations on SDFileSet
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) {
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) {
pSet->fid = fid;
pSet->state = 0;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
tsdbInitDFile(pDFile, did, vid, fid, ver, ftype);
tsdbInitDFile(pRepo, pDFile, did, fid, ver, ftype);
}
}
......@@ -587,14 +587,14 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
return tlen;
}
void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(fid));
pSet->state = 0;
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
return buf;
}
......@@ -633,9 +633,9 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
return 0;
}
int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet);
return -1;
......
......@@ -16,12 +16,13 @@
#include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta);
SMeta *pMeta, STfs *pTfs);
static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta) {
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs) {
STsdb *pTsdb = NULL;
// Set default TSDB Options
......@@ -36,7 +37,7 @@ STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAl
}
// Create the handle
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta);
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta, pTfs);
if (pTsdb == NULL) {
// TODO: handle error
return NULL;
......@@ -64,7 +65,7 @@ void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta) {
SMeta *pMeta, STfs *pTfs) {
STsdb *pTsdb = NULL;
pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
......@@ -78,6 +79,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
pTsdb->pmaf = pMAF;
pTsdb->pMeta = pMeta;
pTsdb->pTfs = pTfs;
pTsdb->fs = tsdbNewFS(pTsdbCfg);
......@@ -494,7 +496,7 @@ uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t
}
} else { // get the named file at the specified index. If not there, return 0
fname = malloc(256);
sprintf(fname, "%s/vnode/vnode%d/%s", TFS_PRIMARY_PATH(), REPO_ID(pRepo), name);
sprintf(fname, "%s/vnode/vnode%d/%s", tfsGetPrimaryPath(pRepo->pTfs), REPO_ID(pRepo), name);
if (access(fname, F_OK) != 0) {
tfree(fname);
return 0;
......
......@@ -28,6 +28,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
if (pVnodeCfg != NULL) {
cfg.vgId = pVnodeCfg->vgId;
cfg.pDnode = pVnodeCfg->pDnode;
cfg.pTfs = pVnodeCfg->pTfs;
}
// Validate options
......@@ -75,6 +76,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
pVnode->vgId = pVnodeCfg->vgId;
pVnode->pDnode = pVnodeCfg->pDnode;
pVnode->pTfs = pVnodeCfg->pTfs;
pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
......@@ -109,7 +111,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta);
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
if (pVnode->pTsdb == NULL) {
// TODO: handle error
return -1;
......
......@@ -9,9 +9,9 @@ target_include_directories(
target_link_libraries(
scheduler
PRIVATE os util planner qcom common catalog transport
PUBLIC os util planner qcom common catalog transport
)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
endif(${BUILD_TEST})
......@@ -33,47 +33,69 @@ extern int32_t fsDebugFlag;
#define fError(...) { if (fsDebugFlag & DEBUG_ERROR) { taosPrintLog("TFS ERROR ", 255, __VA_ARGS__); }}
#define fWarn(...) { if (fsDebugFlag & DEBUG_WARN) { taosPrintLog("TFS WARN ", 255, __VA_ARGS__); }}
#define fInfo(...) { if (fsDebugFlag & DEBUG_INFO) { taosPrintLog("TFS ", 255, __VA_ARGS__); }}
#define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }}
#define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }}
#define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", fsDebugFlag, __VA_ARGS__); }}
#define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", fsDebugFlag, __VA_ARGS__); }}
// Global Definitions
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
typedef struct SDisk {
typedef struct {
int32_t level;
int32_t id;
char *path;
SDiskSize size;
} SDisk;
} STfsDisk;
typedef struct STier {
typedef struct {
pthread_spinlock_t lock;
int32_t level;
int16_t nextid; // next disk id to allocate
int16_t ndisk; // # of disks mounted to this tier
int16_t nAvailDisks; // # of Available disks
SDisk *disks[TSDB_MAX_DISKS_PER_TIER];
int32_t nextid; // next disk id to allocate
int32_t ndisk; // # of disks mounted to this tier
int32_t nAvailDisks; // # of Available disks
STfsDisk *disks[TFS_MAX_DISKS_PER_TIER];
SDiskSize size;
} STfsTier;
typedef struct {
STfsDisk *pDisk;
} SDiskIter;
typedef struct STfsDir {
SDiskIter iter;
SDiskID did;
char dirname[TSDB_FILENAME_LEN];
STfsFile tfile;
DIR *dir;
STfs *pTfs;
} STfsDir;
typedef struct STfs {
pthread_spinlock_t lock;
SDiskSize size;
} STier;
#define TIER_LEVEL(pt) ((pt)->level)
#define TIER_NDISKS(pt) ((pt)->ndisk)
#define TIER_SIZE(pt) ((pt)->tmeta.size)
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
#define DISK_DIR(pd) ((pd)->path)
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
SDisk *tfsFreeDisk(SDisk *pDisk);
int32_t tfsUpdateDiskSize(SDisk *pDisk);
int32_t tfsInitTier(STier *pTier, int32_t level);
void tfsDestroyTier(STier *pTier);
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
void tfsUpdateTierSize(STier *pTier);
int32_t tfsAllocDiskOnTier(STier *pTier);
void tfsPosNextId(STier *pTier);
int32_t nlevel;
STfsTier tiers[TFS_MAX_TIERS];
SHashObj *hash; // name to did map
} STfs;
STfsDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
STfsDisk *tfsFreeDisk(STfsDisk *pDisk);
int32_t tfsUpdateDiskSize(STfsDisk *pDisk);
int32_t tfsInitTier(STfsTier *pTier, int32_t level);
void tfsDestroyTier(STfsTier *pTier);
STfsDisk *tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg);
void tfsUpdateTierSize(STfsTier *pTier);
int32_t tfsAllocDiskOnTier(STfsTier *pTier);
void tfsPosNextId(STfsTier *pTier);
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
#define tfsLock(pTfs) pthread_spin_lock(&(pTfs)->lock)
#define tfsUnLock(pTfs) pthread_spin_unlock(&(pTfs)->lock)
#define TFS_TIER_AT(pTfs, level) (&(pTfs)->tiers[level])
#define TFS_DISK_AT(pTfs, did) ((pTfs)->tiers[(did).level].disks[(did).id])
#define TFS_PRIMARY_DISK(pTfs) ((pTfs)->tiers[0].disks[0])
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "tfsInt.h"
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
SDisk *pDisk = calloc(1, sizeof(SDisk));
STfsDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
STfsDisk *pDisk = calloc(1, sizeof(STfsDisk));
if (pDisk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -36,7 +36,7 @@ SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
return pDisk;
}
SDisk *tfsFreeDisk(SDisk *pDisk) {
STfsDisk *tfsFreeDisk(STfsDisk *pDisk) {
if (pDisk != NULL) {
free(pDisk->path);
free(pDisk);
......@@ -45,8 +45,8 @@ SDisk *tfsFreeDisk(SDisk *pDisk) {
return NULL;
}
int32_t tfsUpdateDiskSize(SDisk *pDisk) {
if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) {
int32_t tfsUpdateDiskSize(STfsDisk *pDisk) {
if (taosGetDiskSize(pDisk->path, &pDisk->size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr());
return -1;
......
......@@ -16,11 +16,8 @@
#define _DEFAULT_SOURCE
#include "tfsInt.h"
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
int32_t tfsInitTier(STier *pTier, int32_t level) {
memset(pTier, 0, sizeof(STier));
int32_t tfsInitTier(STfsTier *pTier, int32_t level) {
memset(pTier, 0, sizeof(STfsTier));
if (pthread_spin_init(&pTier->lock, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -31,17 +28,17 @@ int32_t tfsInitTier(STier *pTier, int32_t level) {
return 0;
}
void tfsDestroyTier(STier *pTier) {
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
void tfsDestroyTier(STfsTier *pTier) {
for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; id++) {
pTier->disks[id] = tfsFreeDisk(pTier->disks[id]);
}
pTier->ndisk = 0;
pthread_spin_destroy(&(pTier->lock));
pthread_spin_destroy(&pTier->lock);
}
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) {
STfsDisk *tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg) {
if (pTier->ndisk >= TFS_MAX_DISKS_PER_TIER) {
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
return NULL;
}
......@@ -61,12 +58,12 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
id = pTier->ndisk;
}
if (id >= TSDB_MAX_DISKS_PER_TIER) {
if (id >= TFS_MAX_DISKS_PER_TIER) {
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
return NULL;
}
SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
STfsDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
if (pDisk == NULL) return NULL;
pTier->disks[id] = pDisk;
......@@ -76,15 +73,16 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
return pTier->disks[id];
}
void tfsUpdateTierSize(STier *pTier) {
void tfsUpdateTierSize(STfsTier *pTier) {
SDiskSize size = {0};
int16_t nAvailDisks = 0;
int32_t nAvailDisks = 0;
tfsLockTier(pTier);
for (int32_t id = 0; id < pTier->ndisk; id++) {
SDisk *pDisk = pTier->disks[id];
STfsDisk *pDisk = pTier->disks[id];
if (pDisk == NULL) continue;
if (tfsUpdateDiskSize(pDisk) < 0) continue;
size.total += pDisk->size.total;
size.used += pDisk->size.used;
......@@ -99,7 +97,7 @@ void tfsUpdateTierSize(STier *pTier) {
}
// Round-Robin to allocate disk on a tier
int32_t tfsAllocDiskOnTier(STier *pTier) {
int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
terrno = TSDB_CODE_FS_NO_VALID_DISK;
tfsLockTier(pTier);
......@@ -110,9 +108,9 @@ int32_t tfsAllocDiskOnTier(STier *pTier) {
}
int32_t retId = -1;
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) {
int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
SDisk *pDisk = pTier->disks[diskId];
for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) {
int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
STfsDisk *pDisk = pTier->disks[diskId];
if (pDisk == NULL) continue;
......@@ -128,12 +126,12 @@ int32_t tfsAllocDiskOnTier(STier *pTier) {
return retId;
}
void tfsPosNextId(STier *pTier) {
void tfsPosNextId(STfsTier *pTier) {
int32_t nextid = 0;
for (int32_t id = 1; id < pTier->ndisk; id++) {
SDisk *pLDisk = pTier->disks[nextid];
SDisk *pDisk = pTier->disks[id];
STfsDisk *pLDisk = pTier->disks[nextid];
STfsDisk *pDisk = pTier->disks[id];
if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) {
nextid = id;
}
......
......@@ -142,7 +142,7 @@ int64_t taosWriteFile(FileFd fd, const void *buf, int64_t n) {
int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence) { return (int64_t)lseek(fd, (long)offset, whence); }
int64_t taosCopyFile(char *from, char *to) {
int64_t taosCopyFile(const char *from, const char *to) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
......@@ -400,7 +400,7 @@ int32_t taosFsyncFile(FileFd fd) {
#endif
}
int32_t taosRenameFile(char *oldName, char *newName) {
int32_t taosRenameFile(const char *oldName, const char *newName) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED);
if (code < 0) {
......
......@@ -5,6 +5,49 @@
set -e
#set -x
masterDnode=slave
dataRootDir="/data"
firstEp="trd02:7000"
startPort=7000
dnodeNumber=1
updateSrc=no
while getopts "hm:f:n:r:p:u:" arg
do
case $arg in
m)
masterDnode=$( echo $OPTARG )
;;
n)
dnodeNumber=$(echo $OPTARG)
;;
u)
updateSrc=$(echo $OPTARG)
;;
f)
firstEp=$(echo $OPTARG)
;;
p)
startPort=$(echo $OPTARG)
;;
r)
dataRootDir=$(echo $OPTARG)
;;
h)
echo "Usage: `basename $0` -m [if master dnode] "
echo " -n [ dnode number] "
echo " -f [ first ep] "
echo " -p [ start port] "
echo " -r [ dnode root dir] "
exit 0
;;
?) #unknow option
echo "unkonw argument"
exit 1
;;
esac
done
# deployCluster.sh
curr_dir=$(readlink -f "$(dirname "$0")")
echo $curr_dir
......@@ -12,13 +55,21 @@ echo $curr_dir
${curr_dir}/cleanCluster.sh -r "/data"
${curr_dir}/cleanCluster.sh -r "/data2"
${curr_dir}/compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0"
if [[ "${updateSrc}" == "yes" ]]; then
${curr_dir}/compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0"
fi
${curr_dir}/setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000
${curr_dir}/setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000
${curr_dir}/setupDnodes.sh -r "/data" -n ${dnodeNumber} -f ${firstEp} -p 7000
${curr_dir}/setupDnodes.sh -r "/data2" -n ${dnodeNumber} -f ${firstEp} -p 8000
#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000
#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000
if [[ "${masterDnode}" == "master" ]]; then
# create all dnode into cluster
taos -s "create dnode trd02 port 8000;"
taos -s "create dnode trd03 port 7000;"
taos -s "create dnode trd03 port 8000;"
taos -s "create dnode trd04 port 7000;"
taos -s "create dnode trd04 port 8000;"
fi
......
......@@ -67,7 +67,31 @@ sql insert into c1 values(now+1s, 1)
sql insert into c1 values(now+2s, 2)
sql insert into c1 values(now+3s, 3)
return
print =============== query data
sql select * from c1
if $rows != 3 then
return -1
endi
print $data00 $data01
print $data10 $data11
print $data20 $data11
if $data01 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data21 != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== query data
sql select * from c1
if $rows != 3 then
......
############## config parameter #####################
$node1 = 192.168.0.201
$node2 = 192.168.0.202
$node3 = 192.168.0.203
$node4 = 192.168.0.204
$self = $node1
$num = 25
#deploy = 0, start = 1, stop = 2
$option = 0
print =============== option:$option
############### stop dnodes #####################
if $option == 0 then
system sh/stop_dnodes.sh
endi
############### process firstEp #####################
$firstEp = $node1 . :7100
$firstPort = 7100
if $self == $node1 then
if $option == 1 then
system sh/exec.sh -n dnode1 -s start
endi
if $option == 2 then
system sh/exec.sh -n dnode1 -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp
system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp
system sh/cfg.sh -n dnode1 -c fqdn -v $node1
system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
sql connect
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node1 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node2 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node3 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node4 port $port
endw
endi
endi
############### process nodes #####################
$i = 0
while $i < $num
$index = $i + 80
$port = $i * 100
$port = $port + 8100
$dnodename = dnode . $index
$i = $i + 1
if $option == 1 then
system sh/exec.sh -n $dnodename -s start
endi
if $option == 2 then
system sh/exec.sh -n $dnodename -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n $dnodename -i 1
system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp
system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp
system sh/cfg.sh -n $dnodename -c fqdn -v $self
system sh/cfg.sh -n $dnodename -c serverPort -v $port
system sh/exec.sh -n $dnodename -s start
endi
endw
......@@ -46,9 +46,9 @@ typedef struct {
pthread_t thread;
} SThreadInfo;
//void parseArgument(int32_t argc, char *argv[]);
//void *threadFunc(void *param);
//void createDbAndStb();
// void parseArgument(int32_t argc, char *argv[]);
// void *threadFunc(void *param);
// void createDbAndStb();
void createDbAndStb() {
pPrint("start to create db and stable");
......@@ -64,7 +64,8 @@ void createDbAndStb() {
TAOS_RES *pRes = taos_query(con, qstr);
int32_t code = taos_errno(pRes);
if (code != 0) {
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes), taos_errstr(pRes));
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes),
taos_errstr(pRes));
exit(0);
}
taos_free_result(pRes);
......@@ -129,10 +130,9 @@ static int64_t getResult(TAOS_RES *tres) {
return numOfRows;
}
void showTables() {
void showTables() {
pPrint("start to show tables");
char qstr[32];
char qstr[128];
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) {
......@@ -140,9 +140,9 @@ void showTables() {
exit(1);
}
sprintf(qstr, "use %s", dbName);
snprintf(qstr, 128, "use %s", dbName);
TAOS_RES *pRes = taos_query(con, qstr);
int code = taos_errno(pRes);
int code = taos_errno(pRes);
if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes));
exit(1);
......@@ -160,12 +160,11 @@ void showTables() {
int64_t totalTableNum = getResult(pRes);
taos_free_result(pRes);
pPrint("%s database: %s, total %" PRId64 " tables %s", GREEN, dbName, totalTableNum, NC);
pPrint("%s database: %s, total %" PRId64 " tables %s", GREEN, dbName, totalTableNum, NC);
taos_close(con);
}
void *threadFunc(void *param) {
SThreadInfo *pInfo = (SThreadInfo *)param;
char *qstr = malloc(2000 * 1000);
......@@ -177,48 +176,48 @@ void *threadFunc(void *param) {
exit(1);
}
//printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, pInfo->tableEndIndex);
// printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex,
// pInfo->tableEndIndex);
sprintf(qstr, "use %s", pInfo->dbName);
TAOS_RES *pRes = taos_query(con, qstr);
taos_free_result(pRes);
if (createTable) {
int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs();
pInfo->startMs = beginMs;
int64_t t = pInfo->tableBeginIndex;
int64_t t = pInfo->tableBeginIndex;
for (; t <= pInfo->tableEndIndex;) {
//int64_t batch = (pInfo->tableEndIndex - t);
//batch = MIN(batch, batchNum);
// int64_t batch = (pInfo->tableEndIndex - t);
// batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batchNum;) {
len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t);
t++;
i++;
t++;
i++;
if (t > pInfo->tableEndIndex) {
break;
}
break;
}
}
int64_t startTs = taosGetTimestampUs();
int64_t startTs = taosGetTimestampUs();
TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pRes);
if (code != 0) {
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
pError("failed to create table t%" PRId64 ", code: %d, reason:%s", t, code, tstrerror(code));
}
taos_free_result(pRes);
int64_t endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
//printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if (delay > pInfo->maxDelay) pInfo->maxDelay = delay;
int64_t endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
// printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if (delay > pInfo->maxDelay) pInfo->maxDelay = delay;
if (delay < pInfo->minDelay) pInfo->minDelay = delay;
curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) {
beginMs = curMs;
//printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) {
beginMs = curMs;
// printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
printCreateProgress(pInfo, t);
}
}
......@@ -227,7 +226,7 @@ void *threadFunc(void *param) {
if (insertData) {
int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs();;
int64_t beginMs = taosGetTimestampMs();
pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
......@@ -247,7 +246,7 @@ void *threadFunc(void *param) {
taos_free_result(pRes);
curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) {
if (curMs - beginMs > 10000) {
printInsertProgress(pInfo, t);
}
t += (batch - 1);
......@@ -335,33 +334,32 @@ int32_t main(int32_t argc, char *argv[]) {
parseArgument(argc, argv);
if (showTablesFlag) {
showTables();
return 0;
showTables();
return 0;
}
createDbAndStb();
pPrint("%d threads are spawned to create %d tables", numOfThreads, numOfThreads);
pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo));
//int64_t numOfTablesPerThread = numOfTables / numOfThreads;
//numOfTables = numOfTablesPerThread * numOfThreads;
// int64_t numOfTablesPerThread = numOfTables / numOfThreads;
// numOfTables = numOfTablesPerThread * numOfThreads;
if (numOfThreads < 1) {
numOfThreads = 1;
}
int64_t a = numOfTables / numOfThreads;
if (a < 1) {
numOfThreads = numOfTables;
a = 1;
numOfThreads = numOfTables;
a = 1;
}
int64_t b = 0;
b = numOfTables % numOfThreads;
......@@ -371,7 +369,7 @@ int32_t main(int32_t argc, char *argv[]) {
pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pInfo[i].tableEndIndex + 1;
pInfo[i].threadIndex = i;
pInfo[i].minDelay = INT64_MAX;
pInfo[i].minDelay = INT64_MAX;
strcpy(pInfo[i].dbName, dbName);
strcpy(pInfo[i].stbName, stbName);
pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i));
......@@ -390,7 +388,7 @@ int32_t main(int32_t argc, char *argv[]) {
createTableSpeed += pInfo[i].createTableSpeed;
if (pInfo[i].maxDelay > maxDelay) maxDelay = pInfo[i].maxDelay;
if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay;
if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay;
}
float insertDataSpeed = 0;
......@@ -398,21 +396,15 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed += pInfo[i].insertDataSpeed;
}
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s",
GREEN,
numOfTables,
createTableSpeed,
numOfThreads,
maxDelay,
minDelay,
NC);
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64
"us %s",
GREEN, numOfTables, createTableSpeed, numOfThreads, maxDelay, minDelay, NC);
if (insertData) {
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,
numOfThreads, NC);
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,
numOfThreads, NC);
}
pthread_attr_destroy(&thattr);
free(pInfo);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册