未验证 提交 23f8739e 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1525 from taosdata/hotfix/headfile

Hotfix/headfile
......@@ -31,22 +31,7 @@
#include "dnodeWrite.h"
#include "vnode.h"
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
EVnodeStatus status; // status: master, slave, notready, deleting
int64_t version;
void * wworker;
void * rworker;
void * wal;
void * tsdb;
void * replica;
void * events;
void * cq; // continuous query
} SVnodeObj;
static int32_t dnodeOpenVnodes();
static void dnodeCleanupVnodes();
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
......@@ -56,7 +41,6 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void dnodeReadDnodeId();
void *tsDnodeVnodesHash = NULL;
static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL;
static uint32_t tsRebootTime;
......@@ -72,12 +56,6 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
dError("failed to init vnode list");
return -1;
}
tsRebootTime = taosGetTimestampSec();
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
......@@ -86,6 +64,10 @@ int32_t dnodeInitMgmt() {
return -1;
}
if ( vnodeInitModule() != TSDB_CODE_SUCCESS) {
return -1;
}
int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
return -1;
......@@ -106,11 +88,7 @@ void dnodeCleanupMgmt() {
tsDnodeTmr = NULL;
}
dnodeCleanupVnodes();
if (tsDnodeVnodesHash == NULL) {
taosCleanUpIntHash(tsDnodeVnodesHash);
tsDnodeVnodesHash = NULL;
}
vnodeCleanupModule();
}
void dnodeMgmt(SRpcMsg *pMsg) {
......@@ -129,14 +107,6 @@ void dnodeMgmt(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont);
}
void *dnodeGetVnodeWworker(void *pVnode) {
return ((SVnodeObj *)pVnode)->wworker;
}
void *dnodeGetVnodeRworker(void *pVnode) {
return ((SVnodeObj *)pVnode)->rworker;
}
static int32_t dnodeOpenVnodes() {
DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) {
......@@ -166,13 +136,6 @@ static int32_t dnodeOpenVnodes() {
return TSDB_CODE_SUCCESS;
}
typedef void (*CleanupFp)(char *);
static void dnodeCleanupVnodes() {
int32_t num = taosGetIntHashSize(tsDnodeVnodesHash);
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
dPrint("dnode mgmt is closed, vnodes:%d", num);
}
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
......@@ -219,19 +182,6 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
return tsCfgDynamicOptions(pCfg->config);
}
static void dnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeObj *pVnode = (SVnodeObj *) pNode;
if (pVnode->status == TSDB_VN_STATUS_DELETING) return;
SDMStatusMsg *pStatus = param;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->vnode = htonl(pVnode->vgId);
pLoad->status = pVnode->status;
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
......@@ -263,7 +213,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
taosVisitIntHashWithFp(tsDnodeVnodesHash, dnodeBuildVloadMsg, pStatus);
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
......
......@@ -127,7 +127,7 @@ void dnodeRead(SRpcMsg *pMsg) {
}
void *dnodeAllocateRqueue(void *pVnode) {
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
taos_queue queue = taosOpenQueue();
if (queue == NULL) return NULL;
taosAddIntoQset(readQset, queue, pVnode);
......@@ -144,6 +144,8 @@ void *dnodeAllocateRqueue(void *pVnode) {
}
}
dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
return queue;
}
......
......@@ -106,7 +106,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
void *dnodeAllocateWqueue(void *pVnode) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue();
void *queue = taosOpenQueue();
if (queue == NULL) return NULL;
if (pWorker->qset == NULL) {
......@@ -129,7 +129,7 @@ void *dnodeAllocateWqueue(void *pVnode) {
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
}
dTrace("queue:%p is allocated for pVnode:%p", queue, pVnode);
dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
return queue;
}
......
......@@ -44,7 +44,6 @@ void *dnodeAllocateRqueue(void *pVnode);
void dnodeFreeRqueue(void *rqueue);
void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus
}
#endif
......
......@@ -25,6 +25,9 @@ typedef struct {
void *rsp;
} SRspRet;
int32_t vnodeInitModule();
void vnodeCleanupModule();
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vnode, char *rootDir);
......@@ -39,6 +42,7 @@ void* vnodeGetWal(void *pVnode);
void* vnodeGetTsdb(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item);
void vnodeBuildStatusMsg(void * param);
#ifdef __cplusplus
}
......
......@@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
//pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems);
//pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex);
......@@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) {
*pitem = pNode->item;
*type = pNode->type;
num = 1;
//pTrace("item:%p is fetched", *pitem);
// pTrace("item:%p is fetched, type:%d", *pitem, *type);
}
return num;
......
......@@ -34,13 +34,13 @@ typedef struct {
EVnStatus status;
int role;
int64_t version;
void * wqueue;
void * rqueue;
void * wal;
void * tsdb;
void * sync;
void * events;
void * cq; // continuous query
void *wqueue;
void *rqueue;
void *wal;
void *tsdb;
void *sync;
void *events;
void *cq; // continuous query
} SVnodeObj;
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
......
......@@ -25,10 +25,32 @@
#include "ttime.h"
#include "ttimer.h"
#include "twal.h"
#include "dnode.h"
#include "vnode.h"
#include "vnodeInt.h"
extern void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param);
int32_t vnodeInitModule() {
vnodeInitWriteFp();
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
dError("failed to init vnode list");
return -1;
}
return 0;
}
typedef void (*CleanupFp)(char *);
void vnodeCleanupModule() {
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
taosCleanUpIntHash(tsDnodeVnodesHash);
}
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
......@@ -95,9 +117,6 @@ int32_t vnodeDrop(int32_t vgId) {
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
static pthread_once_t vnodeInitWrite = PTHREAD_ONCE_INIT;
pthread_once(&vnodeInitWrite, vnodeInitWriteFp);
SVnodeObj vnodeObj = {0};
vnodeObj.vgId = vnode;
vnodeObj.status = VN_STATUS_INIT;
......@@ -194,6 +213,24 @@ void *vnodeGetTsdb(void *pVnode) {
return ((SVnodeObj *)pVnode)->tsdb;
}
void vnodeBuildStatusMsg(void *param) {
SDMStatusMsg *pStatus = param;
taosVisitIntHashWithFp(tsDnodeVnodesHash, vnodeBuildVloadMsg, pStatus);
}
static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeObj *pVnode = (SVnodeObj *) pNode;
if (pVnode->status == VN_STATUS_DELETING) return;
SDMStatusMsg *pStatus = param;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->vnode = htonl(pVnode->vgId);
pLoad->status = pVnode->status;
}
static void vnodeCleanUp(SVnodeObj *pVnode) {
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
......
......@@ -26,7 +26,7 @@
#include "vnode.h"
#include "vnodeInt.h"
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*);
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
......
......@@ -284,7 +284,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
}
// write into queue
(*writeFp)(pVnode, buffer, TAOS_QTYPE_WAL);
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
}
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册