提交 b868557d 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/3.0' into 3.0

......@@ -706,41 +706,30 @@ typedef struct {
} SStatusRsp;
typedef struct {
uint32_t vgId;
int32_t dbCfgVersion;
int32_t maxTables;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t commitTime;
int32_t fsyncPeriod;
int8_t precision;
int8_t compression;
int8_t walLevel;
int8_t vgReplica;
int8_t wals;
int8_t quorum;
int8_t update;
int8_t cacheLastRow;
int32_t vgCfgVersion;
int8_t dbReplica;
int8_t dbType;
int8_t reserved[8];
} SVnodeCfg;
typedef struct {
int32_t nodeId;
char nodeEp[TSDB_EP_LEN];
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
} SVnodeDesc;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
SVnodeCfg cfg;
uint32_t vgId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int8_t precision;
int8_t compression;
int8_t cacheLastRow;
int8_t update;
int8_t walLevel;
int8_t replica;
int8_t quorum;
int8_t reserved[9];
int32_t fsyncPeriod;
SVnodeDesc nodes[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg;
......
......@@ -22,7 +22,6 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
#include "wal.h"
typedef int64_t SyncNodeId;
typedef int32_t SyncGroupId;
......@@ -41,6 +40,7 @@ typedef struct {
} SSyncBuffer;
typedef struct {
SyncNodeId nodeId;
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo;
......@@ -83,11 +83,38 @@ typedef struct SSyncFSM {
} SSyncFSM;
typedef struct SSyncLogStore {
void* pData;
// write log with given index
int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf);
// mark log with given index has been commtted
int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index);
// prune log before given index
int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index);
// rollback log after given index
int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index);
} SSyncLogStore;
typedef struct SSyncServerState {
SNodeInfo voteFor;
SyncNodeId voteFor;
SSyncTerm term;
} SSyncServerState;
typedef struct SSyncClusterConfig {
// Log index number of current cluster config.
SyncIndex index;
// Log index number of previous cluster config.
SyncIndex prevIndex;
// current cluster
const SSyncCluster* cluster;
} SSyncClusterConfig;
typedef struct SStateManager {
void* pData;
......@@ -95,35 +122,38 @@ typedef struct SStateManager {
const SSyncServerState* (*readServerState)(struct SStateManager* stateMng);
void (*saveCluster)(struct SStateManager* stateMng, const SSyncCluster* cluster);
void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
const SSyncCluster* (*readCluster)(struct SStateManager* stateMng);
const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
} SStateManager;
typedef struct {
SyncGroupId vgId;
twalh walHandle;
SyncIndex snapshotIndex;
SSyncCluster syncCfg;
SSyncFSM fsm;
SSyncLogStore logStore;
SStateManager stateManager;
} SSyncInfo;
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
int32_t syncInit();
void syncCleanUp();
SyncNodeId syncStart(const SSyncInfo*);
SSyncNode syncStart(const SSyncInfo*);
void syncStop(SyncNodeId);
int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncPropose(SSyncNode syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncAddNode(SyncNodeId nodeId, const SNodeInfo *pNode);
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
int32_t syncRemoveNode(SyncNodeId nodeId, const SNodeInfo *pNode);
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
extern int32_t syncDebugFlag;
......
......@@ -67,6 +67,11 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
*/
void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
/**
* Report the startup progress.
*/
void dnodeReportStartup(char *name, char *desc);
#ifdef __cplusplus
}
#endif
......
......@@ -23,64 +23,79 @@ extern "C" {
#endif
typedef struct tmqMsgHead {
int32_t headLen;
int32_t protoVer;
int32_t msgType;
int64_t cgId;
int64_t topicId;
int64_t clientId;
int32_t checksum;
int32_t msgType;
} tmqMsgHead;
typedef struct tmqOneAck {
int64_t topicId;
int64_t consumeOffset;
} tmqOneAck;
typedef struct tmqAcks {
int32_t ackNum;
//should be sorted
tmqOneAck acks[];
} tmqAcks;
//TODO: put msgs into common
typedef struct tmqConnectReq {
tmqMsgHead head;
tmqAcks acks;
} tmqConnectReq;
typedef struct tmqConnectResp {
typedef struct tmqConnectRsp {
tmqMsgHead head;
int8_t status;
} tmqConnectResp;
} tmqConnectRsp;
typedef struct tmqDisconnectReq {
tmqMsgHead head;
} tmqDisconnectReq;
typedef struct tmqDisconnectResp {
typedef struct tmqDisconnectRsp {
tmqMsgHead head;
int8_t status;
} tmqDiconnectResp;
} tmqDiconnectRsp;
typedef struct tmqConsumeReq {
tmqMsgHead head;
int64_t commitOffset;
tmqAcks acks;
} tmqConsumeReq;
typedef struct tmqConsumeResp {
tmqMsgHead head;
char content[];
} tmqConsumeResp;
typedef struct tmqMsgContent {
int64_t topicId;
int64_t msgLen;
char msg[];
} tmqMsgContent;
typedef struct tmqConsumeRsp {
tmqMsgHead head;
int64_t bodySize;
tmqMsgContent msgs[];
} tmqConsumeRsp;
//
typedef struct tmqMnodeSubscribeReq {
tmqMsgHead head;
int64_t topicLen;
char topic[];
} tmqSubscribeReq;
typedef struct tmqMnodeSubscribeResp {
typedef struct tmqMnodeSubscribeRsp {
tmqMsgHead head;
int64_t vgId;
char ep[]; //TSDB_EP_LEN
} tmqSubscribeResp;
} tmqSubscribeRsp;
typedef struct tmqHeartbeatReq {
} tmqHeartbeatReq;
typedef struct tmqHeartbeatResp {
typedef struct tmqHeartbeatRsp {
} tmqHeartbeatResp;
} tmqHeartbeatRsp;
typedef struct tqTopicVhandle {
//name
......@@ -92,33 +107,57 @@ typedef struct tqTopicVhandle {
} tqTopicVhandle;
typedef struct STQ {
//the set for topics
//key=topicName: str
//value=tqTopicVhandle
//the collection of group handle
//a map
//key=<topic: str, cgId: int64_t>
//value=consumeOffset: int64_t
} STQ;
#define TQ_BUFFER_SIZE 8
//TODO: define a serializer and deserializer
typedef struct tqBufferItem {
int64_t offset;
//executors are identical but not concurrent
//so it must be a copy in each item
void* executor;
int64_t size;
void* content;
} tqBufferItem;
typedef struct tqGroupHandle {
char* topic; //c style, end with '\0'
int64_t cgId;
void* ahandle;
int64_t consumeOffset;
typedef struct tqBufferHandle {
//char* topic; //c style, end with '\0'
//int64_t cgId;
//void* ahandle;
int64_t nextConsumeOffset;
int64_t topicId;
int32_t head;
int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE];
} tqBufferHandle;
typedef struct tqListHandle {
tqBufferHandle* bufHandle;
struct tqListHandle* next;
} tqListHandle;
typedef struct tqGroupHandle {
int64_t cId;
int64_t cgId;
void* ahandle;
int32_t topicNum;
tqListHandle *head;
} tqGroupHandle;
typedef struct tqQueryExec {
void* src;
tqBufferItem* dest;
void* executor;
} tqQueryExec;
typedef struct tqQueryMsg {
tqQueryExec *exec;
struct tqQueryMsg *next;
} tqQueryMsg;
//init in each vnode
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
void tqCleanUp(STQ*);
......@@ -127,20 +166,33 @@ void tqCleanUp(STQ*);
int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*);
//void* will be replace by a msg type
int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg);
int tqConsume(STQ*, tmqConsumeReq*);
tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId);
tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(tqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqFetchMsg(tqGroupHandle*, void*);
int tqRegisterContext(tqGroupHandle*, void*);
int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
int tqLaunchQuery(tqGroupHandle*);
int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset);
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset);
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset);
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset);
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle);
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle);
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle);
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem);
int tqGetGHandleSSize(const tqGroupHandle *gHandle);
int tqListHandleSSize(const tqListHandle *listHandle);
int tqBufHandleSSize(const tqBufferHandle *bufHandle);
int tqBufItemSSize(const tqBufferItem *bufItem);
#ifdef __cplusplus
}
#endif
......
......@@ -46,6 +46,11 @@ typedef struct {
*/
void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
/**
* Report the startup progress.
*/
void (*ReportStartup)(char *name, char *desc);
} SVnodeFp;
typedef struct {
......
......@@ -233,11 +233,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) //"Missing data file")
#define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) //"Out of memory")
#define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) //"Unexpected generic error in vnode")
#define TSDB_CODE_VND_INVALID_VRESION_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid version file")
#define TSDB_CODE_VND_IS_FULL TAOS_DEF_ERROR_CODE(0, 0x050B) //"Database memory is full for commit failed")
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full for waiting commit")
#define TSDB_CODE_VND_INVALID_CFG_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid config file)
#define TSDB_CODE_VND_INVALID_TERM_FILE TAOS_DEF_ERROR_CODE(0, 0x050B) //"Invalid term file")
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full")
#define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) //"Database is dropping")
#define TSDB_CODE_VND_IS_BALANCING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is balancing")
#define TSDB_CODE_VND_IS_UPDATING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is updating")
#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) //"Database is closing")
#define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended")
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
......
......@@ -382,6 +382,9 @@ do { \
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK };
#ifdef __cplusplus
}
#endif
......
......@@ -15,5 +15,10 @@
#include "wal.h"
int32_t walInit() {return 0;}
void walCleanUp() {}
\ No newline at end of file
int32_t walInit() { return 0; }
void walCleanUp() {}
twalh walOpen(char *path, SWalCfg *pCfg) { return NULL; }
int32_t walAlter(twalh pWal, SWalCfg *pCfg) { return 0; }
\ No newline at end of file
......@@ -37,7 +37,7 @@ EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; }
static void dnodeReportStartup(char *name, char *desc) {
void dnodeReportStartup(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
......@@ -58,6 +58,7 @@ static int32_t dnodeInitVnode() {
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.ReportStartup = dnodeReportStartup;
return vnodeInit(para);
}
......
......@@ -21,8 +21,10 @@ extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeReadCfg(SVnode *pVnode);
int32_t vnodeWriteCfg(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState);
int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState);
#ifdef __cplusplus
}
......
......@@ -16,11 +16,12 @@
#ifndef _TD_VNODE_INT_H_
#define _TD_VNODE_INT_H_
#include "os.h"
#include "amalloc.h"
#include "meta.h"
#include "os.h"
#include "sync.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tlog.h"
#include "tq.h"
#include "tqueue.h"
......@@ -43,57 +44,75 @@ extern int32_t vDebugFlag;
#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct STsdbCfg {
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
uint8_t precision; // time resolution
int8_t compression;
int8_t cacheLastRow;
int8_t update;
} STsdbCfg;
typedef struct SMetaCfg {
} SMetaCfg;
typedef struct SSyncCluster {
int8_t replica;
int8_t quorum;
SNodeInfo nodes[TSDB_MAX_REPLICA];
} SSyncCfg;
typedef struct SVnodeCfg {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int8_t dropped;
SWalCfg wal;
STsdbCfg tsdb;
SMetaCfg meta;
SSyncCfg sync;
} SVnodeCfg;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
SMemAllocator *allocator;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
twalh pWal;
SyncNodeId syncNode;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
SWalCfg walCfg;
SSyncCluster syncCfg;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t status;
int8_t role;
int8_t accessState;
int8_t dropped;
pthread_mutex_t statusMutex;
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
SMemAllocator *allocator;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
twalh pWal;
void *pQuery;
SyncNodeId syncNode;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
SVnodeCfg cfg;
SSyncServerState term;
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t role;
int8_t accessState;
int8_t dropped;
int8_t status;
pthread_mutex_t statusMutex;
} SVnode;
typedef struct {
int32_t len;
void * rsp;
void * qhandle; // used by query and retrieve msg
void *rsp;
void *qhandle; // used by query and retrieve msg
} SVnRsp;
void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
void vnodeCleanUp(SVnode *pVnode);
void vnodeDestroy(SVnode *pVnode);
int32_t vnodeCompact(int32_t vgId);
void vnodeBackup(int32_t vgId);
void vnodeGetStatus(struct SStatusMsg *status);
SVnode *vnodeAcquire(int32_t vgId);
SVnode *vnodeAcquireNotClose(int32_t vgId);
void vnodeRelease(SVnode *pVnode);
void vnodeReportStartup(char *name, char *desc);
#ifdef __cplusplus
}
......
......@@ -13,23 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_MGMT_MSG_H_
#define _TD_VNODE_MGMT_MSG_H_
#ifndef _TD_VNODE_MAIN_H_
#define _TD_VNODE_MAIN_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg);
int32_t vnodeInitMain();
void vnodeCleanupMain();
SVnode *vnodeAcquireInAllState(int32_t vgId);
SVnode *vnodeAcquire(int32_t vgId);
void vnodeRelease(SVnode *pVnode);
int32_t vnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeAlterVnode(SVnode *pVnode, SVnodeCfg *pCfg);
int32_t vnodeDropVnode(SVnode *pVnode);
int32_t vnodeSyncVnode(SVnode *pVnode);
int32_t vnodeCompactVnode(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_MGMT_H_*/
#endif /*_TD_VNODE_MAIN_H_*/
......@@ -21,6 +21,14 @@ extern "C" {
#endif
#include "vnodeInt.h"
typedef struct {
SVnode *pVnode;
SRpcMsg rpcMsg;
char pCont[];
} SVnMgmtMsg;
int32_t vnodeInitMgmt();
void vnodeCleanupMgmt();
void vnodeProcessMgmtMsg(SRpcMsg *pMsg);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_STATUS_H_
#define _TD_VNODE_STATUS_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3
} EVnodeStatus;
// vnodeStatus
extern char* vnodeStatus[];
bool vnodeSetInitStatus(SVnode* pVnode);
bool vnodeSetReadyStatus(SVnode* pVnode);
bool vnodeSetClosingStatus(SVnode* pVnode);
bool vnodeSetUpdatingStatus(SVnode* pVnode);
bool vnodeInInitStatus(SVnode* pVnode);
bool vnodeInReadyStatus(SVnode* pVnode);
bool vnodeInClosingStatus(SVnode* pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_STATUS_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_VERSION_H_
#define _TD_VNODE_VERSION_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeReadVersion(SVnode *pVnode);
int32_t vnodeSaveVersion(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_VERSION_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_WORKER_H_
#define _TD_VNODE_WORKER_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitWorker();
void vnodeCleanupWorker();
void vnodeProcessCleanupTask(SVnode *pVnode);
void vnodeProcessDestroyTask(SVnode *pVnode);
void vnodeProcessBackupTask(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_WORKER_H_*/
\ No newline at end of file
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "thash.h"
#include "tthread.h"
#include "vnodeFile.h"
#include "vnodeMain.h"
#include "vnodeMgmt.h"
#include "vnodeRead.h"
#include "vnodeWrite.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3
} EVnodeStatus;
char *vnodeStatus[] = {"init", "ready", "closing", "updating"};
typedef struct {
pthread_t *threadId;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
int32_t *vnodeList;
} SOpenVnodeThread;
static struct {
SHashObj *hash;
int32_t openVnodes;
int32_t totalVnodes;
} tsVnode;
static bool vnodeSetInitStatus(SVnode *pVnode) {
pthread_mutex_lock(&pVnode->statusMutex);
pVnode->status = TAOS_VN_STATUS_INIT;
pthread_mutex_unlock(&pVnode->statusMutex);
return true;
}
static bool vnodeSetReadyStatus(SVnode *pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT || pVnode->status == TAOS_VN_STATUS_UPDATING) {
pVnode->status = TAOS_VN_STATUS_READY;
set = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
static bool vnodeSetUpdatingStatus(SVnode *pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
pVnode->status = TAOS_VN_STATUS_UPDATING;
set = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
static bool vnodeSetClosingStatus(SVnode *pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT || pVnode->status == TAOS_VN_STATUS_READY) {
pVnode->status = TAOS_VN_STATUS_CLOSING;
set = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
static bool vnodeInStatus(SVnode *pVnode, EVnodeStatus status) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == status) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
static void vnodeDestroyVnode(SVnode *pVnode) {
int32_t code = 0;
int32_t vgId = pVnode->vgId;
if (pVnode->pQuery) {
// todo
}
if (pVnode->pMeta) {
// todo
}
if (pVnode->pTsdb) {
// todo
}
if (pVnode->pTQ) {
// todo
}
if (pVnode->pWal) {
// todo
}
if (pVnode->allocator) {
// todo
}
if (pVnode->pWriteQ) {
vnodeFreeWriteQueue(pVnode->pWriteQ);
pVnode->pWriteQ = NULL;
}
if (pVnode->pQueryQ) {
vnodeFreeQueryQueue(pVnode->pQueryQ);
pVnode->pQueryQ = NULL;
}
if (pVnode->pFetchQ) {
vnodeFreeFetchQueue(pVnode->pFetchQ);
pVnode->pFetchQ = NULL;
}
if (pVnode->dropped) {
// todo
}
pthread_mutex_destroy(&pVnode->statusMutex);
free(pVnode);
}
static void vnodeCleanupVnode(SVnode *pVnode) {
vnodeSetClosingStatus(pVnode);
taosHashRemove(tsVnode.hash, &pVnode->vgId, sizeof(int32_t));
vnodeRelease(pVnode);
}
static int32_t vnodeOpenVnode(int32_t vgId) {
int32_t code = 0;
SVnode *pVnode = calloc(sizeof(SVnode), 1);
if (pVnode == NULL) {
vError("vgId:%d, failed to open vnode since no enough memory", vgId);
return TAOS_SYSTEM_ERROR(errno);
}
pVnode->vgId = vgId;
pVnode->accessState = TAOS_VN_STATUS_INIT;
pVnode->status = TSDB_VN_ALL_ACCCESS;
pVnode->refCount = 1;
pVnode->role = TAOS_SYNC_ROLE_CANDIDATE;
pthread_mutex_init(&pVnode->statusMutex, NULL);
code = vnodeReadCfg(vgId, &pVnode->cfg);
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId);
pVnode->cfg.dropped = 1;
vnodeCleanupVnode(pVnode);
return 0;
}
code = vnodeReadTerm(vgId, &pVnode->term);
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code));
pVnode->cfg.dropped = 1;
vnodeCleanupVnode(pVnode);
return code;
}
pVnode->pWriteQ = vnodeAllocWriteQueue(pVnode);
pVnode->pQueryQ = vnodeAllocQueryQueue(pVnode);
pVnode->pFetchQ = vnodeAllocFetchQueue(pVnode);
if (pVnode->pWriteQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) {
vnodeCleanupVnode(pVnode);
return terrno;
}
char path[PATH_MAX + 20];
snprintf(path, sizeof(path), "%s/vnode%d/wal", tsVnodeDir, vgId);
pVnode->pWal = walOpen(path, &pVnode->cfg.wal);
if (pVnode->pWal == NULL) {
vnodeCleanupVnode(pVnode);
return terrno;
}
vDebug("vgId:%d, vnode is opened", pVnode->vgId);
taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *));
vnodeSetReadyStatus(pVnode);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) {
int32_t code = 0;
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s/vnode%d", tsVnodeDir, vgId);
if (taosMkDir(path) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
vError("vgId:%d, failed to create since %s", vgId, tstrerror(code));
return code;
}
snprintf(path, sizeof(path), "%s/vnode%d/cfg", tsVnodeDir, vgId);
if (taosMkDir(path) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
vError("vgId:%d, failed to create since %s", vgId, tstrerror(code));
return code;
}
snprintf(path, sizeof(path), "%s/vnode%d/wal", tsVnodeDir, vgId);
if (taosMkDir(path) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
vError("vgId:%d, failed to create since %s", vgId, tstrerror(code));
return code;
}
snprintf(path, sizeof(path), "%s/vnode%d/tq", tsVnodeDir, vgId);
if (taosMkDir(path) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
vError("vgId:%d, failed to create since %s", vgId, tstrerror(code));
return code;
}
snprintf(path, sizeof(path), "%s/vnode%d/tsdb", tsVnodeDir, vgId);
if (taosMkDir(path) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
vError("vgId:%d, failed to create since %s", vgId, tstrerror(code));
return code;
}
snprintf(path, sizeof(path), "%s/vnode%d/meta", tsVnodeDir, vgId);
if (taosMkDir(path) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
vError("vgId:%d, failed to create since %s", vgId, tstrerror(code));
return code;
}
code = vnodeWriteCfg(vgId, pCfg);
if (code != 0) {
vError("vgId:%d, failed to save vnode cfg since %s", vgId, tstrerror(code));
return code;
}
return vnodeOpenVnode(vgId);
}
int32_t vnodeAlterVnode(SVnode * pVnode, SVnodeCfg *pCfg) {
int32_t code = 0;
int32_t vgId = pVnode->vgId;
bool walChanged = (memcmp(&pCfg->wal, &pVnode->cfg.wal, sizeof(SWalCfg)) != 0);
bool tsdbChanged = (memcmp(&pCfg->tsdb, &pVnode->cfg.tsdb, sizeof(STsdbCfg)) != 0);
bool metaChanged = (memcmp(&pCfg->meta, &pVnode->cfg.meta, sizeof(SMetaCfg)) != 0);
bool syncChanged = (memcmp(&pCfg->sync, &pVnode->cfg.sync, sizeof(SSyncCluster)) != 0);
if (!walChanged && !tsdbChanged && !metaChanged && !syncChanged) {
vDebug("vgId:%d, nothing changed", vgId);
vnodeRelease(pVnode);
return code;
}
code = vnodeWriteCfg(pVnode->vgId, pCfg);
if (code != 0) {
vError("vgId:%d, failed to write alter msg to file since %s", vgId, tstrerror(code));
vnodeRelease(pVnode);
return code;
}
pVnode->cfg = *pCfg;
if (walChanged) {
code = walAlter(pVnode->pWal, &pVnode->cfg.wal);
if (code != 0) {
vDebug("vgId:%d, failed to alter wal since %s", vgId, tstrerror(code));
vnodeRelease(pVnode);
return code;
}
}
if (tsdbChanged) {
// todo
}
if (metaChanged) {
// todo
}
if (syncChanged) {
// todo
}
vnodeRelease(pVnode);
return code;
}
int32_t vnodeDropVnode(SVnode *pVnode) {
if (pVnode->cfg.dropped) {
vInfo("vgId:%d, already set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
}
pVnode->cfg.dropped = 1;
int32_t code = vnodeWriteCfg(pVnode->vgId, &pVnode->cfg);
if (code == 0) {
vInfo("vgId:%d, set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
vnodeCleanupVnode(pVnode);
} else {
vError("vgId:%d, failed to set drop flag since %s", pVnode->vgId, tstrerror(code));
pVnode->cfg.dropped = 0;
}
vnodeRelease(pVnode);
return code;
}
int32_t vnodeSyncVnode(SVnode *pVnode) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeCompactVnode(SVnode *pVnode) {
return TSDB_CODE_SUCCESS;
}
static void *vnodeOpenVnodeFunc(void *param) {
SOpenVnodeThread *pThread = param;
vDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("vnodeOpenVnode");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId,
tsVnode.openVnodes, tsVnode.totalVnodes);
// (*vnodeInst()->fp.ReportStartup)("open-vnodes", stepDesc);
if (vnodeOpenVnode(vgId) < 0) {
vError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
vDebug("vgId:%d, is opened by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
atomic_add_fetch_32(&tsVnode.openVnodes, 1);
}
vDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t vnodeGetVnodeListFromDisk(int32_t vnodeList[], int32_t *numOfVnodes) {
#if 0
DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) return TSDB_CODE_DND_NO_WRITE_ACCESS;
*numOfVnodes = 0;
struct dirent *de = NULL;
while ((de = readdir(dir)) != NULL) {
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
if (de->d_type & DT_DIR) {
if (strncmp("vnode", de->d_name, 5) != 0) continue;
int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue;
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
vError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
closedir(dir);
return TSDB_CODE_DND_TOO_MANY_VNODES;
} else {
vnodeList[*numOfVnodes - 1] = vnode;
}
}
}
closedir(dir);
#endif
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeOpenVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = vnodeGetVnodeListFromDisk(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
vInfo("failed to get vnode list from disk since code:%d", status);
return status;
}
tsVnode.totalVnodes = numOfVnodes;
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SOpenVnodeThread *pThread = &threads[t];
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
vInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pThread->threadId = taosCreateThread(vnodeOpenVnodeFunc, pThread);
if (pThread->threadId == NULL) {
vError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
}
int32_t openVnodes = 0;
int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
taosDestoryThread(pThread->threadId);
pThread->threadId = NULL;
openVnodes += pThread->opened;
failedVnodes += pThread->failed;
free(pThread->vnodeList);
}
free(threads);
vInfo("there are total vnodes:%d, opened:%d", numOfVnodes, openVnodes);
if (failedVnodes != 0) {
vError("there are total vnodes:%d, failed:%d", numOfVnodes, failedVnodes);
return -1;
}
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeGetVnodeList(SVnode *vnodeList[], int32_t *numOfVnodes) {
void *pIter = taosHashIterate(tsVnode.hash, NULL);
while (pIter) {
SVnode **pVnode = pIter;
if (*pVnode) {
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = (*pVnode);
}
}
pIter = taosHashIterate(tsVnode.hash, pIter);
}
return TSDB_CODE_SUCCESS;
}
static void vnodeCleanupVnodes() {
SVnode* vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t code = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (code != TSDB_CODE_SUCCESS) {
vInfo("failed to get dnode list since code %d", code);
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeCleanupVnode(vnodeList[i]);
}
vInfo("total vnodes:%d are all closed", numOfVnodes);
}
static void vnodeIncRef(void *ptNode) {
assert(ptNode != NULL);
SVnode **ppVnode = (SVnode **)ptNode;
assert(ppVnode);
assert(*ppVnode);
SVnode *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
}
SVnode *vnodeAcquireInAllState(int32_t vgId) {
SVnode *pVnode = NULL;
// taosHashGetClone(tsVnode.hash, &vgId, sizeof(int32_t), vnodeIncRef, (void*)&pVnode);
if (pVnode == NULL) {
vDebug("vgId:%d, can't accquire since not exist", vgId);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
return pVnode;
}
SVnode *vnodeAcquire(int32_t vgId) {
SVnode *pVnode = vnodeAcquireInAllState(vgId);
if (pVnode == NULL) return NULL;
if (vnodeInStatus(pVnode, TAOS_VN_STATUS_READY)) {
return pVnode;
} else {
vDebug("vgId:%d, can't accquire since not in ready status", vgId);
vnodeRelease(pVnode);
terrno = TSDB_CODE_VND_INVALID_TSDB_STATE;
return NULL;
}
}
void vnodeRelease(SVnode *pVnode) {
if (pVnode == NULL) return;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
int32_t vgId = pVnode->vgId;
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode);
assert(refCount >= 0);
if (refCount <= 0) {
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode);
vnodeDestroyVnode(pVnode);
int32_t count = taosHashGetSize(tsVnode.hash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
}
}
int32_t vnodeInitMain() {
tsVnode.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnode.hash == NULL) {
vError("failed to init vnode mgmt");
return -1;
}
vInfo("vnode main is initialized");
return vnodeOpenVnodes();
}
void vnodeCleanupMain() {
vnodeCleanupVnodes();
taosHashCleanup(tsVnode.hash);
tsVnode.hash = NULL;
}
static void vnodeBuildVloadMsg(SVnode *pVnode, SStatusMsg *pStatus) {
int64_t totalStorage = 0;
int64_t compStorage = 0;
int64_t pointsWritten = 0;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
// if (pVnode->tsdb) {
// tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
// }
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->totalStorage = htobe64(totalStorage);
pLoad->compStorage = htobe64(compStorage);
pLoad->pointsWritten = htobe64(pointsWritten);
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
}
void vnodeGetStatus(SStatusMsg *pStatus) {
void *pIter = taosHashIterate(tsVnode.hash, NULL);
while (pIter) {
SVnode **pVnode = pIter;
if (*pVnode) {
vnodeBuildVloadMsg(*pVnode, pStatus);
}
pIter = taosHashIterate(tsVnode.hash, pIter);
}
}
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnode *pVnode = vnodeAcquire(pAccess[i].vgId);
if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
}
vnodeRelease(pVnode);
}
}
}
......@@ -15,21 +15,184 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "vnodeMain.h"
#include "vnodeMgmt.h"
#include "vnodeMgmtMsg.h"
typedef struct {
SRpcMsg rpcMsg;
char pCont[];
} SVnMgmtMsg;
static struct {
SWorkerPool pool;
taos_queue pQueue;
SWorkerPool createPool;
taos_queue createQueue;
SWorkerPool workerPool;
taos_queue workerQueue;
int32_t (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
} tsVmgmt = {0};
static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
*vgId = htonl(pCreate->vgId);
pCfg->dropped = 0;
tstrncpy(pCfg->db, pCreate->db, sizeof(pCfg->db));
pCfg->tsdb.cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCfg->tsdb.totalBlocks = htonl(pCreate->totalBlocks);
pCfg->tsdb.daysPerFile = htonl(pCreate->daysPerFile);
pCfg->tsdb.daysToKeep1 = htonl(pCreate->daysToKeep1);
pCfg->tsdb.daysToKeep2 = htonl(pCreate->daysToKeep2);
pCfg->tsdb.daysToKeep0 = htonl(pCreate->daysToKeep0);
pCfg->tsdb.minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCfg->tsdb.maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
pCfg->tsdb.precision = pCreate->precision;
pCfg->tsdb.compression = pCreate->compression;
pCfg->tsdb.cacheLastRow = pCreate->cacheLastRow;
pCfg->tsdb.update = pCreate->update;
pCfg->wal.fsyncPeriod = htonl(pCreate->fsyncPeriod);
pCfg->wal.walLevel = pCreate->walLevel;
pCfg->sync.replica = pCreate->replica;
pCfg->sync.quorum = pCreate->quorum;
for (int32_t j = 0; j < pCreate->replica; ++j) {
pCfg->sync.nodes[j].nodePort = htons(pCreate->nodes[j].port);
tstrncpy(pCfg->sync.nodes[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN);
}
return 0;
}
static int32_t vnodeProcessCreateVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
int32_t code = vnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
if (code != 0) {
vError("failed to parse create vnode msg since %s", tstrerror(code));
}
vDebug("vgId:%d, create vnode req is received", vgId);
SVnode *pVnode = vnodeAcquireInAllState(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, already exist, return success", vgId);
vnodeRelease(pVnode);
return code;
}
code = vnodeCreateVnode(vgId, &vnodeCfg);
if (code != 0) {
vError("vgId:%d, failed to create vnode since %s", vgId, tstrerror(code));
}
return code;
}
static int32_t vnodeProcessAlterVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
int32_t code = vnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
if (code != 0) {
vError("failed to parse create vnode msg since %s", tstrerror(code));
}
vDebug("vgId:%d, alter vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
return code;
}
code = vnodeAlterVnode(pVnode, &vnodeCfg);
if (code != 0) {
vError("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return pDrop;
}
static int32_t vnodeProcessSyncVnodeReq(SRpcMsg *rpcMsg) {
SSyncVnodeMsg *pSync = (SSyncVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pSync->vgId;
vDebug("vgId:%d, sync vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to sync since %s", vgId, tstrerror(code));
return code;
}
code = vnodeSyncVnode(pVnode);
if (code != 0) {
vError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pCompact->vgId;
vDebug("vgId:%d, compact vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to compact since %s", vgId, tstrerror(code));
return code;
}
code = vnodeCompactVnode(pVnode);
if (code != 0) {
vError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pDrop->vgId;
vDebug("vgId:%d, drop vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to drop since %s", vgId, tstrerror(code));
return code;
}
code = vnodeDropVnode(pVnode);
if (code != 0) {
vError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg) {
vError("alter stream msg not processed");
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}
static int32_t vnodeProcessMgmtStart(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype) {
SRpcMsg *pMsg = &pMgmt->rpcMsg;
int32_t msgType = pMsg->msgType;
......@@ -43,27 +206,21 @@ static int32_t vnodeProcessMgmtStart(void *unused, SVnMgmtMsg *pMgmt, int32_t qt
}
}
static void vnodeSendMgmtEnd(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype, int32_t code) {
static void vnodeProcessMgmtEnd(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype, int32_t code) {
SRpcMsg *pMsg = &pMgmt->rpcMsg;
SRpcMsg rsp = {0};
vTrace("msg:%p, is processed, result:%s", pMgmt, tstrerror(code));
rsp.code = code;
vTrace("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
if (rsp.code != TSDB_CODE_DND_ACTION_IN_PROGRESS) {
rsp.handle = pMsg->handle;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
}
taosFreeQitem(pMsg);
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
taosFreeQitem(pMgmt);
}
static void vnodeInitMgmtReqFp() {
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessCreateVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessAlterVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessSyncVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE]= vnodeProcessCompactVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessDropVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessCreateVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessAlterVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessSyncVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessCompactVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessDropVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessAlterStreamReq;
}
......@@ -75,14 +232,18 @@ static int32_t vnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsVmgmt.pQueue, TAOS_QTYPE_RPC, pMgmt);
return TSDB_CODE_SUCCESS;
if (pMsg->msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE) {
return taosWriteQitem(tsVmgmt.createQueue, TAOS_QTYPE_RPC, pMgmt);
} else {
return taosWriteQitem(tsVmgmt.workerQueue, TAOS_QTYPE_RPC, pMgmt);
}
}
void vnodeProcessMgmtMsg(SRpcMsg *pMsg) {
int32_t code = vnodeWriteToMgmtQueue(pMsg);
if (code != TSDB_CODE_SUCCESS) {
vError("msg, ahandle:%p type:%s not processed since %s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code));
SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rsp);
}
......@@ -93,25 +254,41 @@ void vnodeProcessMgmtMsg(SRpcMsg *pMsg) {
int32_t vnodeInitMgmt() {
vnodeInitMgmtReqFp();
SWorkerPool *pPool = &tsVmgmt.pool;
pPool->name = "vmgmt";
SWorkerPool *pPool = &tsVmgmt.createPool;
pPool->name = "vnode-mgmt-create";
pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart;
pPool->endFp = (ProcessEndFp)vnodeSendMgmtEnd;
pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd;
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
tsVmgmt.pQueue = tWorkerAllocQueue(pPool, NULL);
tsVmgmt.createQueue = tWorkerAllocQueue(pPool, NULL);
vInfo("vmgmt is initialized, max worker %d", pPool->max);
pPool = &tsVmgmt.workerPool;
pPool->name = "vnode-mgmt-worker";
pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart;
pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd;
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
tsVmgmt.workerQueue = tWorkerAllocQueue(pPool, NULL);
vInfo("vmgmt is initialized");
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupMgmt() {
tWorkerFreeQueue(&tsVmgmt.pool, tsVmgmt.pQueue);
tWorkerCleanup(&tsVmgmt.pool);
tsVmgmt.pQueue = NULL;
tWorkerFreeQueue(&tsVmgmt.createPool, tsVmgmt.createQueue);
tWorkerCleanup(&tsVmgmt.createPool);
tsVmgmt.createQueue = NULL;
tWorkerFreeQueue(&tsVmgmt.workerPool, tsVmgmt.workerQueue);
tWorkerCleanup(&tsVmgmt.workerPool);
tsVmgmt.createQueue = NULL;
vInfo("vmgmt is closed");
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "vnodeMgmtMsg.h"
static SCreateVnodeMsg* vnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.dbCfgVersion = htonl(pCreate->cfg.dbCfgVersion);
pCreate->cfg.vgCfgVersion = htonl(pCreate->cfg.vgCfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
for (int32_t j = 0; j < pCreate->cfg.vgReplica; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
}
return pCreate;
}
int32_t vnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = vnodeParseVnodeMsg(rpcMsg);
SVnode *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
} else {
vDebug("vgId:%d, create vnode msg is received", pCreate->cfg.vgId);
return vnodeCreate(pCreate);
}
}
int32_t vnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = vnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquireNotClose(pAlter->cfg.vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pAlter);
vnodeRelease(pVnode);
return code;
} else {
vInfo("vgId:%d, vnode not exist, can't alter it", pAlter->cfg.vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
}
int32_t vnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
SSyncVnodeMsg *pSyncVnode = rpcMsg->pCont;
pSyncVnode->vgId = htonl(pSyncVnode->vgId);
return vnodeSync(pSyncVnode->vgId);
}
int32_t vnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompactVnode = rpcMsg->pCont;
pCompactVnode->vgId = htonl(pCompactVnode->vgId);
return vnodeCompact(pCompactVnode->vgId);
}
int32_t vnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return vnodeDrop(pDrop->vgId);
}
int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg) { return 0; }
......@@ -14,14 +14,9 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
// #include "query.h"
#include "vnodeMain.h"
#include "vnodeRead.h"
#include "vnodeReadMsg.h"
#include "vnodeStatus.h"
static struct {
SWorkerPool query;
......@@ -50,11 +45,6 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen,
}
#endif
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, failed to write into vread queue, vnode status is %s", pVnode->vgId, vnodeStatus[pVnode->status]);
return TSDB_CODE_APP_NOT_READY;
}
int32_t size = sizeof(SReadMsg) + contLen;
SReadMsg *pRead = taosAllocateQitem(size);
if (pRead == NULL) {
......@@ -119,7 +109,7 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) {
pHead->contLen = htonl(pHead->contLen);
assert(pHead->contLen > 0);
SVnode *pVnode = vnodeAcquireNotClose(pHead->vgId);
SVnode *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) {
code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
......
......@@ -14,11 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
// #include "query.h"
#include "vnodeStatus.h"
#include "vnodeMain.h"
#include "vnodeRead.h"
#include "vnodeReadMsg.h"
......@@ -225,16 +221,16 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
tmqMsgHead msgHead = pConsumeMsg->head;
//extract head
STQ *pTq = pVnode->pTQ;
tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId);
/*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/
//return msg if offset not moved
if(pConsumeMsg->commitOffset == pHandle->consumeOffset) {
/*if(pConsumeMsg->commitOffset == pHandle->consumeOffset) {*/
//return msg
return 0;
}
/*return 0;*/
/*}*/
//or move offset
tqMoveOffsetToNext(pHandle);
/*tqMoveOffsetToNext(pHandle);*/
//fetch or register context
tqFetchMsg(pHandle, pRead);
/*tqFetchMsg(pHandle, pRead);*/
//judge mode, tail read or catch up read
/*int64_t lastVer = walLastVer(pVnode->wal);*/
//launch new query
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
// #include "query.h"
#include "vnodeRead.h"
#include "vnodeStatus.h"
#include "vnodeWrite.h"
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
bool vnodeSetInitStatus(SVnode* pVnode) {
pthread_mutex_lock(&pVnode->statusMutex);
pVnode->status = TAOS_VN_STATUS_INIT;
pthread_mutex_unlock(&pVnode->statusMutex);
return true;
}
bool vnodeSetReadyStatus(SVnode* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT || pVnode->status == TAOS_VN_STATUS_READY ||
pVnode->status == TAOS_VN_STATUS_UPDATING) {
pVnode->status = TAOS_VN_STATUS_READY;
set = true;
}
#if 0
qQueryMgmtReOpen(pVnode->qMgmt);
#endif
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
static bool vnodeSetClosingStatusImp(SVnode* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY || pVnode->status == TAOS_VN_STATUS_INIT) {
pVnode->status = TAOS_VN_STATUS_CLOSING;
set = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeSetClosingStatus(SVnode* pVnode) {
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return true;
while (!vnodeSetClosingStatusImp(pVnode)) {
taosMsleep(1);
}
#if 0
// release local resources only after cutting off outside connections
qQueryMgmtNotifyClosed(pVnode->qMgmt);
#endif
vnodeWaitReadCompleted(pVnode);
vnodeWaitWriteCompleted(pVnode);
return true;
}
bool vnodeSetUpdatingStatus(SVnode* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
pVnode->status = TAOS_VN_STATUS_UPDATING;
set = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeInInitStatus(SVnode* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInReadyStatus(SVnode* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInClosingStatus(SVnode* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "vnodeVersion.h"
int32_t vnodeReadVersion(SVnode *pVnode) {
int32_t len = 0;
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
terrno = TSDB_CODE_VND_INVALID_VRESION_FILE;
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
fp = fopen(file, "r");
if (!fp) {
if (errno != ENOENT) {
vError("vgId:%d, failed to read %s, error:%s", pVnode->vgId, file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_SUCCESS;
}
goto PARSE_VER_ERROR;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file);
goto PARSE_VER_ERROR;
}
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file);
goto PARSE_VER_ERROR;
}
cJSON *ver = cJSON_GetObjectItem(root, "version");
if (!ver || ver->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file);
goto PARSE_VER_ERROR;
}
#if 0
pVnode->version = (uint64_t)ver->valueint;
terrno = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version);
#endif
PARSE_VER_ERROR:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
return terrno;
}
int32_t vnodeSaveVersion(SVnode *pVnode) {
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(file, "w");
if (!fp) {
vError("vgId:%d, failed to write %s, reason:%s", pVnode->vgId, file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
#if 0
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion);
len += snprintf(content + len, maxLen - len, "}\n");
#endif
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
// vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "vnodeWorker.h"
enum { CLEANUP_TASK = 0, DESTROY_TASK = 1, BACKUP_TASK = 2 };
typedef struct {
int32_t vgId;
int32_t code;
int32_t type;
void * rpcHandle;
SVnode *pVnode;
} SVnTask;
static struct {
SWorkerPool pool;
taos_queue pQueue;
} tsVworker = {0};
static void vnodeProcessTaskStart(void *unused, SVnTask *pTask, int32_t qtype) {
pTask->code = 0;
switch (pTask->type) {
case CLEANUP_TASK:
vnodeCleanUp(pTask->pVnode);
break;
case DESTROY_TASK:
vnodeDestroy(pTask->pVnode);
break;
case BACKUP_TASK:
vnodeBackup(pTask->vgId);
break;
default:
break;
}
}
static void vnodeProcessTaskEnd(void *unused, SVnTask *pTask, int32_t qtype, int32_t code) {
if (pTask->rpcHandle != NULL) {
SRpcMsg rpcRsp = {.handle = pTask->rpcHandle, .code = pTask->code};
rpcSendResponse(&rpcRsp);
}
taosFreeQitem(pTask);
}
static int32_t vnodeWriteIntoTaskQueue(SVnode *pVnode, int32_t type, void *rpcHandle) {
SVnTask *pTask = taosAllocateQitem(sizeof(SVnTask));
if (pTask == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;
pTask->vgId = pVnode->vgId;
pTask->pVnode = pVnode;
pTask->rpcHandle = rpcHandle;
pTask->type = type;
return taosWriteQitem(tsVworker.pQueue, TAOS_QTYPE_RPC, pTask);
}
void vnodeProcessCleanupTask(SVnode *pVnode) {
vnodeWriteIntoTaskQueue(pVnode, CLEANUP_TASK, NULL);
}
void vnodeProcessDestroyTask(SVnode *pVnode) {
vnodeWriteIntoTaskQueue(pVnode, DESTROY_TASK, NULL);
}
void vnodeProcessBackupTask(SVnode *pVnode) {
vnodeWriteIntoTaskQueue(pVnode, BACKUP_TASK, NULL);
}
int32_t vnodeInitWorker() {
SWorkerPool *pPool = &tsVworker.pool;
pPool->name = "vworker";
pPool->startFp = (ProcessStartFp)vnodeProcessTaskStart;
pPool->endFp = (ProcessEndFp)vnodeProcessTaskEnd;
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
tsVworker.pQueue = tWorkerAllocQueue(pPool, NULL);
vInfo("vworker is initialized, max worker %d", pPool->max);
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupWorker() {
tWorkerFreeQueue(&tsVworker.pool, tsVworker.pQueue);
tWorkerCleanup(&tsVworker.pool);
tsVworker.pQueue = NULL;
vInfo("vworker is closed");
}
......@@ -15,12 +15,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "tqueue.h"
#include "tworker.h"
#include "taosmsg.h"
#include "vnodeStatus.h"
#include "vnodeMain.h"
#include "vnodeWrite.h"
#include "vnodeWriteMsg.h"
......@@ -68,11 +63,6 @@ static int32_t vnodeWriteToWQueue(SVnode *pVnode, SWalHead *pHead, int32_t qtype
return TSDB_CODE_WAL_SIZE_LIMIT;
}
if (!vnodeInReadyStatus(pVnode)) {
vError("vgId:%d, failed to write into vwqueue, vstatus is %s", pVnode->vgId, vnodeStatus[pVnode->status]);
return TSDB_CODE_APP_NOT_READY;
}
if (tsVwrite.queuedBytes > tsMaxVnodeQueuedBytes) {
vDebug("vgId:%d, too many bytes:%" PRId64 " in vwqueue, flow control", pVnode->vgId, tsVwrite.queuedBytes);
return TSDB_CODE_VND_IS_FLOWCTRL;
......@@ -122,7 +112,7 @@ void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg) {
pMsg->vgId = htonl(pMsg->vgId);
pMsg->contLen = htonl(pMsg->contLen);
SVnode *pVnode = vnodeAcquireNotClose(pMsg->vgId);
SVnode *pVnode = vnodeAcquire(pMsg->vgId);
if (pVnode == NULL) {
code = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
......
......@@ -26,13 +26,13 @@ extern "C" {
//create persistent storage for meta info such as consuming offset
//return value > 0: cgId
//return value <= 0: error code
int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle);
//int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle);
//create ring buffer in memory and load consuming offset
//int tqOpenTCGroup(STQ*, const char* topic, int cgId);
//destroy ring buffer and persist consuming offset
//int tqCloseTCGroup(STQ*, const char* topic, int cgId);
//delete persistent storage for meta info
int tqDropTCGroup(STQ*, const char* topic, int cgId);
//int tqDropTCGroup(STQ*, const char* topic, int cgId);
#ifdef __cplusplus
}
......
......@@ -21,65 +21,150 @@
//send to fetch queue
//
//handle management message
//
static int tqProtoCheck(tmqMsgHead *pMsg) {
return pMsg->protoVer == 0;
}
tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
//look in memory
//
//not found, try to restore from disk
//
//still not found
return NULL;
static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) {
//clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset;
int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor);
tfree(bhandle->buffer[idx].content);
if( 1 /* TODO: need to launch new query */) {
tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg));
if(pNewQuery == NULL) {
//TODO: memory insufficient
return -1;
}
//TODO: lock executor
pNewQuery->exec->executor = bhandle->buffer[idx].executor;
//TODO: read from wal and assign to src
pNewQuery->exec->src = 0;
pNewQuery->exec->dest = &bhandle->buffer[idx];
pNewQuery->next = *ppQuery;
*ppQuery = pNewQuery;
}
return 0;
}
static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) {
int32_t ackNum = pAcks->ackNum;
tmqOneAck *acks = pAcks->acks;
//double ptr for acks and list
int i = 0;
tqListHandle* node = ghandle->head;
int ackCnt = 0;
tqQueryMsg *pQuery = NULL;
while(i < ackNum && node->next) {
if(acks[i].topicId == node->next->bufHandle->topicId) {
ackCnt++;
tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery);
} else if(acks[i].topicId < node->next->bufHandle->topicId) {
i++;
} else {
node = node->next;
}
}
if(pQuery) {
//post message
}
return ackCnt;
}
static int tqCommitTCGroup(tqGroupHandle* handle) {
//persist into disk
//persist modification into disk
return 0;
}
int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) {
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) {
//create in disk
return 0;
}
int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) {
int code;
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
if(handle == NULL) {
code = tqCreateTCGroup(pTq, topic, cgId, &handle);
if(code != 0) {
return code;
}
}
ASSERT(handle != NULL);
//put into STQ
int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
//look up in disk
//create
//open
return 0;
}
/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/
/*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/
/*return tqCommitTCGroup(handle);*/
/*}*/
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
return 0;
}
int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
//delete from disk
return 0;
}
static int tqFetch(tqGroupHandle* ghandle, void** msg) {
tqListHandle* head = ghandle->head;
tqListHandle* node = head;
int totSize = 0;
//TODO: make it a macro
int sizeLimit = 4 * 1024;
tmqMsgContent* buffer = malloc(sizeLimit);
if(buffer == NULL) {
//TODO:memory insufficient
return -1;
}
//iterate the list to get msgs of all topics
//until all topic iterated or msgs over sizeLimit
while(node->next) {
node = node->next;
tqBufferHandle* bufHandle = node->bufHandle;
int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
if(bufHandle->buffer[idx].content != NULL &&
bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
) {
totSize += bufHandle->buffer[idx].size;
if(totSize > sizeLimit) {
void *ptr = realloc(buffer, totSize);
if(ptr == NULL) {
totSize -= bufHandle->buffer[idx].size;
//TODO:memory insufficient
//return msgs already copied
break;
}
}
*((int64_t*)buffer) = bufHandle->topicId;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
*((int64_t*)buffer) = bufHandle->buffer[idx].size;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size);
buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size);
if(totSize > sizeLimit) {
break;
}
}
}
if(totSize == 0) {
//no msg
return -1;
}
int tqFetchMsg(tqGroupHandle* handle, void* msg) {
return 0;
return totSize;
}
int tqMoveOffsetToNext(tqGroupHandle* handle) {
return 0;
tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
return NULL;
}
int tqLaunchQuery(tqGroupHandle* ghandle) {
return 0;
}
tqGroupHandle* tqFindGHandleBycId(STQ* pTq, int64_t cId) {
return NULL;
int tqSendLaunchQuery(STQ* pTq, int64_t topicId, int64_t cgId, void* query) {
return 0;
}
/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/
/*return 0;*/
/*}*/
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
//add reference
//judge and launch new query
......@@ -91,10 +176,121 @@ int tqCommit(STQ* pTq) {
return 0;
}
int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) {
//parse msg and extract topic and cgId
//lookup handle
//confirm message and send to consumer
int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
if(!tqProtoCheck((tmqMsgHead *)pMsg)) {
//proto version invalid
return -1;
}
int64_t clientId = pMsg->head.clientId;
tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId);
if(ghandle == NULL) {
//client not connect
return -1;
}
if(pMsg->acks.ackNum != 0) {
if(tqAck(ghandle, &pMsg->acks) != 0) {
//ack not success
return -1;
}
}
tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg;
if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) {
//fetch error
return -1;
}
//judge and launch new query
if(tqLaunchQuery(ghandle)) {
//launch query error
return -1;
}
return 0;
}
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) {
//calculate size
int sz = tqGetGHandleSSize(gHandle);
if(sz <= 0) {
//TODO: err
return -1;
}
void* ptr = realloc(*ppBytes, sz);
if(ptr == NULL) {
free(ppBytes);
//TODO: memory err
return -1;
}
*ppBytes = ptr;
//do serialize
*(int64_t*)ptr = gHandle->cId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = gHandle->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = gHandle->topicNum;
if(gHandle->topicNum > 0) {
tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes);
}
return 0;
}
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) {
void* ptr = POINTER_SHIFT(*ppBytes, offset);
tqListHandle *node = listHandle;
while(node->next) {
node = node->next;
offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset);
}
return offset;
}
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) {
void *ptr = POINTER_SHIFT(*ppBytes, offset);
*(int64_t*)ptr = bufHandle->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = bufHandle->head;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
*(int32_t*)ptr = bufHandle->tail;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes);
ptr = POINTER_SHIFT(ptr, sz);
}
return ptr - *ppBytes;
}
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) {
void *ptr = POINTER_SHIFT(*ppBytes, offset);
//TODO: do we need serialize this?
return 0;
}
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) {
return 0;
}
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) {
return 0;
}
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) {
return 0;
}
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) {
return 0;
}
int tqGetGHandleSSize(const tqGroupHandle *gHandle) {
return 0;
}
int tqListHandleSSize(const tqListHandle *listHandle) {
return 0;
}
int tqBufHandleSSize(const tqBufferHandle *bufHandle) {
return 0;
}
int tqBufItemSSize(const tqBufferItem *bufItem) {
return 0;
}
......@@ -245,11 +245,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, "No write permission f
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, "Missing data file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, "Database memory is full for commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full for waiting commit")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_CFG_FILE, "Invalid config file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TERM_FILE, "Invalid term file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, "Database is dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_BALANCING, "Database is balancing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_CLOSING, "Database is closing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册