From 6af40311edebeec85752f585580919801a55b7cf Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 4 Apr 2020 19:04:11 +0800 Subject: [PATCH] integrate WAL module move some functions in dnode into vnodeMain.c and vnodeWrite.c --- src/dnode/CMakeLists.txt | 2 +- src/dnode/inc/dnodeRead.h | 4 +- src/dnode/inc/dnodeWrite.h | 5 +- src/dnode/inc/vnode.h | 43 +++++ src/dnode/inc/vnodeInt.h | 52 +++++ src/dnode/src/dnodeMgmt.c | 286 +++------------------------- src/dnode/src/dnodeRead.c | 21 ++- src/dnode/src/dnodeWrite.c | 336 +++++++-------------------------- src/dnode/src/vnodeMain.c | 207 ++++++++++++++++++++ src/dnode/src/vnodeWrite.c | 254 +++++++++++++++++++++++++ src/{vnode/wal => }/inc/twal.h | 2 +- src/vnode/tsdb/inc/tsdb.h | 4 +- src/vnode/tsdb/src/tsdbMain.c | 69 +++---- src/vnode/wal/src/walMain.c | 33 ++-- src/vnode/wal/test/waltest.c | 6 +- 15 files changed, 710 insertions(+), 614 deletions(-) create mode 100644 src/dnode/inc/vnode.h create mode 100644 src/dnode/inc/vnodeInt.h create mode 100644 src/dnode/src/vnodeMain.c create mode 100644 src/dnode/src/vnodeWrite.c rename src/{vnode/wal => }/inc/twal.h (97%) diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 42bcea0383..472c270e62 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -13,7 +13,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_EXECUTABLE(taosd ${SRC}) - TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb) + TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal) IF (TD_ACCOUNT) TARGET_LINK_LIBRARIES(taosd account) diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index fba3245a07..c18647e575 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -23,8 +23,8 @@ extern "C" { int32_t dnodeInitRead(); void dnodeCleanupRead(); void dnodeRead(SRpcMsg *pMsg); -void * dnodeAllocateReadWorker(); -void dnodeFreeReadWorker(void *rqueue); +void * dnodeAllocateRqueue(); +void dnodeFreeRqueue(void *rqueue); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 3e37141f94..054d9d796d 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -23,8 +23,9 @@ extern "C" { int32_t dnodeInitWrite(); void dnodeCleanupWrite(); void dnodeWrite(SRpcMsg *pMsg); -void * dnodeAllocateWriteWorker(); -void dnodeFreeWriteWorker(void *worker); +void * dnodeAllocateWqueue(); +void dnodeFreeWqueue(void *worker); +void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code); #ifdef __cplusplus } diff --git a/src/dnode/inc/vnode.h b/src/dnode/inc/vnode.h new file mode 100644 index 0000000000..1cfecca456 --- /dev/null +++ b/src/dnode/inc/vnode.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_H +#define TDENGINE_VNODE_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t vnodeInitWrite(); +int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); +int32_t vnodeDrop(int32_t vgId); +int32_t vnodeOpen(int32_t vnode, char *rootDir); +int32_t vnodeClose(void *pVnode); + +void vnodeRelease(void *pVnode); + +void* vnodeGetVnode(int32_t vgId); +void* vnodeGetRqueue(void *); +void* vnodeGetWqueue(int32_t vgId); +void* vnodeGetWal(void *pVnode); +void* vnodeGetTsdb(void *pVnode); + +int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/vnodeInt.h b/src/dnode/inc/vnodeInt.h new file mode 100644 index 0000000000..99dda7d389 --- /dev/null +++ b/src/dnode/inc/vnodeInt.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_INT_H +#define TDENGINE_VNODE_INT_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum _VN_STATUS { + VN_STATUS_INIT, + VN_STATUS_CREATING, + VN_STATUS_READY, + VN_STATUS_CLOSING, + VN_STATUS_DELETING, +} EVnStatus; + +typedef struct { + int32_t vgId; // global vnode group ID + int32_t refCount; // reference count + EVnStatus status; + int role; + int64_t version; + void * wqueue; + void * rqueue; + void * wal; + void * tsdb; + void * sync; + void * events; + void * cq; // continuous query +} SVnodeObj; + +int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 70fde55f89..d3fb6c9f8d 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -24,12 +24,12 @@ #include "tsdb.h" #include "ttime.h" #include "ttimer.h" +#include "twal.h" #include "dnodeMClient.h" #include "dnodeMgmt.h" #include "dnodeRead.h" #include "dnodeWrite.h" - -typedef enum { CLOSE_TSDB, DROP_TSDB } ECloseTsdbFlag; +#include "vnode.h" typedef struct { int32_t vgId; // global vnode group ID @@ -47,21 +47,16 @@ typedef struct { static int32_t dnodeOpenVnodes(); static void dnodeCleanupVnodes(); -static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir); -static void dnodeCleanupVnode(SVnodeObj *pVnode); -static void dnodeDoCleanupVnode(SVnodeObj *pVnode, ECloseTsdbFlag dropFlag); -static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); -static void dnodeDropVnode(SVnodeObj *pVnode); -static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); -static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); -static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); -static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); -static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); -static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); +static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); +static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static void dnodeSendStatusMsg(void *handle, void *tmrId); static void dnodeReadDnodeId(); -static void *tsDnodeVnodesHash = NULL; +void *tsDnodeVnodesHash = NULL; static void *tsDnodeTmr = NULL; static void *tsStatusTimer = NULL; static uint32_t tsRebootTime; @@ -119,41 +114,19 @@ void dnodeCleanupMgmt() { } void dnodeMgmt(SRpcMsg *pMsg) { - terrno = 0; + SRpcMsg rsp; if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { - (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); + rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { - SRpcMsg rsp; - rsp.handle = pMsg->handle; - rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; - rsp.pCont = NULL; - rpcSendResponse(&rsp); + rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; } - rpcFreeCont(pMsg->pCont); -} - -void *dnodeGetVnode(int32_t vgId) { - SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); - if (pVnode == NULL) { - terrno = TSDB_CODE_INVALID_VGROUP_ID; - return NULL; - } + rsp.handle = pMsg->handle; + rsp.pCont = NULL; + rpcSendResponse(&rsp); - if (pVnode->status != TSDB_VN_STATUS_MASTER && pVnode->status == TSDB_VN_STATUS_SLAVE) { - terrno = TSDB_CODE_INVALID_VNODE_STATUS; - return NULL; - } - - atomic_add_fetch_32(&pVnode->refCount, 1); - dTrace("pVnode:%p, vgroup:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount); - - return pVnode; -} - -int32_t dnodeGetVnodeStatus(void *pVnode) { - return ((SVnodeObj *)pVnode)->status; + rpcFreeCont(pMsg->pCont); } void *dnodeGetVnodeWworker(void *pVnode) { @@ -164,37 +137,6 @@ void *dnodeGetVnodeRworker(void *pVnode) { return ((SVnodeObj *)pVnode)->rworker; } -void *dnodeGetVnodeWal(void *pVnode) { - return ((SVnodeObj *)pVnode)->wal; -} - -void *dnodeGetVnodeTsdb(void *pVnode) { - return ((SVnodeObj *)pVnode)->tsdb; -} - -void dnodeReleaseVnode(void *pVnodeRaw) { - SVnodeObj *pVnode = pVnodeRaw; - - int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - if (pVnode->status == TSDB_VN_STATUS_DELETING) { - if (refCount <= 0) { - dPrint("pVnode:%p, vgroup:%d, drop vnode, refCount:%d", pVnode, pVnode->vgId, refCount); - dnodeDoCleanupVnode(pVnode, DROP_TSDB); - } else { - dTrace("pVnode:%p, vgroup:%d, vnode will be dropped until refCount:%d is 0", pVnode, pVnode->vgId, refCount); - } - } else if (pVnode->status == TSDB_VN_STATUS_CLOSING) { - if (refCount <= 0) { - dPrint("pVnode:%p, vgroup:%d, cleanup vnode, refCount:%d", pVnode, pVnode->vgId, refCount); - dnodeDoCleanupVnode(pVnode, CLOSE_TSDB); - } else { - dTrace("pVnode:%p, vgroup:%d, vnode will cleanup until refCount:%d is 0", pVnode, pVnode->vgId, refCount); - } - } else { - dTrace("pVnode:%p, vgroup:%d, release vnode, refCount:%d", pVnode, pVnode->vgId, refCount); - } -} - static int32_t dnodeOpenVnodes() { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { @@ -212,7 +154,7 @@ static int32_t dnodeOpenVnodes() { char vnodeDir[TSDB_FILENAME_LEN * 3]; snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); - int32_t code = dnodeOpenVnode(vnode, vnodeDir); + int32_t code = vnodeOpen(vnode, vnodeDir); if (code == 0) { numOfVnodes++; } @@ -227,208 +169,39 @@ static int32_t dnodeOpenVnodes() { typedef void (*CleanupFp)(char *); static void dnodeCleanupVnodes() { int32_t num = taosGetIntHashSize(tsDnodeVnodesHash); - taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)dnodeCleanupVnode); + taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); dPrint("dnode mgmt is closed, vnodes:%d", num); } -static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { - SVnodeObj vnodeObj = {0}; - vnodeObj.vgId = vnode; - vnodeObj.status = TSDB_VN_STATUS_NOT_READY; - vnodeObj.refCount = 1; - vnodeObj.version = 0; - SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); - - char tsdbDir[TSDB_FILENAME_LEN]; - sprintf(tsdbDir, "%s/tsdb", rootDir); - void *pTsdb = tsdbOpenRepo(tsdbDir); - if (pTsdb == NULL) { - dError("pVnode:%p, vgroup:%d, failed to open tsdb in %s, reason:%s", pVnode, pVnode->vgId, tsdbDir, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; - } - - pVnode->wal = NULL; - pVnode->tsdb = pTsdb; - pVnode->replica = NULL; - pVnode->events = NULL; - pVnode->cq = NULL; - pVnode->wworker = dnodeAllocateWriteWorker(pVnode); - pVnode->rworker = dnodeAllocateReadWorker(pVnode); - - //TODO: jude status while replca is not null - if (pVnode->replica == NULL) { - pVnode->status = TSDB_VN_STATUS_MASTER; - } - - dTrace("pVnode:%p, vgroup:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); - return TSDB_CODE_SUCCESS; -} - -static void dnodeDoCleanupVnode(SVnodeObj *pVnode, ECloseTsdbFlag closeFlag) { - dTrace("pVnode:%p, vgroup:%d, cleanup vnode", pVnode, pVnode->vgId); - - // remove replica - - // remove read queue - dnodeFreeReadWorker(pVnode->rworker); - pVnode->rworker = NULL; - - // remove write queue - dnodeFreeWriteWorker(pVnode->wworker); - pVnode->wworker = NULL; - - // remove wal - - // remove tsdb - if (pVnode->tsdb) { - if (closeFlag == DROP_TSDB) { - tsdbDropRepo(pVnode->tsdb); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - } else if (closeFlag == CLOSE_TSDB) { - tsdbCloseRepo(pVnode->tsdb); - } - pVnode->tsdb = NULL; - } -} - -static void dnodeCleanupVnode(SVnodeObj *pVnode) { - pVnode->status = TSDB_VN_STATUS_CLOSING; - dnodeReleaseVnode(pVnode); -} - -static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { - STsdbCfg tsdbCfg = {0}; - tsdbCfg.precision = pVnodeCfg->cfg.precision; - tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; - tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; - tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; - tsdbCfg.minRowsPerFileBlock = -1; - tsdbCfg.maxRowsPerFileBlock = -1; - tsdbCfg.keep = -1; - tsdbCfg.maxCacheSize = -1; - - char rootDir[TSDB_FILENAME_LEN] = {0}; - sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); - if (mkdir(rootDir, 0755) != 0) { - if (errno == EACCES) { - return TSDB_CODE_NO_DISK_PERMISSIONS; - } else if (errno == ENOSPC) { - return TSDB_CODE_SERV_NO_DISKSPACE; - } else if (errno == EEXIST) { - } else { - return TSDB_CODE_VG_INIT_FAILED; - } - } - - sprintf(rootDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); - if (mkdir(rootDir, 0755) != 0) { - if (errno == EACCES) { - return TSDB_CODE_NO_DISK_PERMISSIONS; - } else if (errno == ENOSPC) { - return TSDB_CODE_SERV_NO_DISKSPACE; - } else if (errno == EEXIST) { - } else { - return TSDB_CODE_VG_INIT_FAILED; - } - } - - void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); - if (pTsdb == NULL) { - dError("vgroup:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); - return terrno; - } - - SVnodeObj vnodeObj = {0}; - vnodeObj.vgId = pVnodeCfg->cfg.vgId; - vnodeObj.status = TSDB_VN_STATUS_CREATING; - vnodeObj.refCount = 1; - vnodeObj.version = 0; - vnodeObj.wal = NULL; - vnodeObj.tsdb = pTsdb; - vnodeObj.replica = NULL; - vnodeObj.events = NULL; - vnodeObj.cq = NULL; - - SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); - pVnode->wworker = dnodeAllocateWriteWorker(pVnode); - pVnode->rworker = dnodeAllocateReadWorker(pVnode); - if (pVnode->replica == NULL) { - pVnode->status = TSDB_VN_STATUS_MASTER; - } - - dPrint("pVnode:%p, vgroup:%d, vnode:%d is created", pVnode, pVnode->vgId, pVnode->vgId); - return TSDB_CODE_SUCCESS; -} - -static void dnodeDropVnode(SVnodeObj *pVnode) { - pVnode->status = TSDB_VN_STATUS_DELETING; - dnodeReleaseVnode(pVnode); -} - -static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - dTrace("vgroup:%d, start to create vnode in dnode", pCreate->cfg.vgId); - - SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); - if (pVnodeObj != NULL) { - rpcRsp.code = TSDB_CODE_SUCCESS; - dPrint("pVnode:%p, vgroup:%d, vnode is already exist", pVnodeObj, pCreate->cfg.vgId); - } else { - rpcRsp.code = dnodeCreateVnode(pCreate); - } - - rpcSendResponse(&rpcRsp); + return vnodeCreate(pCreate); } -static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { SMDDropVnodeMsg *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); - SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId); - if (pVnodeObj != NULL) { - dPrint("pVnode:%p, vgroup:%d, start to drop vnode in dnode", pVnodeObj, pDrop->vgId); - dnodeDropVnode(pVnodeObj); - rpcRsp.code = TSDB_CODE_SUCCESS; - } else { - dTrace("vgroup:%d, failed drop vnode in dnode, vgroup not exist", pDrop->vgId); - rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID; - } - - rpcSendResponse(&rpcRsp); + return vnodeDrop(pDrop->vgId); } -static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - dTrace("vgroup:%d, start to alter vnode in dnode", pCreate->cfg.vgId); - - SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); - if (pVnodeObj != NULL) { - dPrint("pVnode:%p, vgroup:%d, start to alter vnode in dnode", pVnodeObj, pCreate->cfg.vgId); - rpcRsp.code = TSDB_CODE_SUCCESS; - } else { - dTrace("vgroup:%d, alter vnode msg received, start to create vnode", pCreate->cfg.vgId); - rpcRsp.code = dnodeCreateVnode(pCreate);; - } - - rpcSendResponse(&rpcRsp); + return 0; } -static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { +static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { // SMDAlterStreamMsg *pStream = pCont; // pStream->uid = htobe64(pStream->uid); // pStream->stime = htobe64(pStream->stime); @@ -437,14 +210,13 @@ static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { // pStream->status = htonl(pStream->status); // // int32_t code = dnodeCreateStream(pStream); + + return 0; } -static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { +static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; - int32_t code = tsCfgDynamicOptions(pCfg->config); - - SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code, .msgType = 0}; - rpcSendResponse(&rpcRsp); + return tsCfgDynamicOptions(pCfg->config); } static void dnodeBuildVloadMsg(char *pNode, void * param) { diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 1c3e3a8638..827c2b004f 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -22,9 +22,11 @@ #include "tqueue.h" #include "trpc.h" +#include "twal.h" #include "dnodeMgmt.h" #include "dnodeRead.h" #include "queryExecutor.h" +#include "vnode.h" typedef struct { int32_t code; @@ -76,7 +78,8 @@ void dnodeRead(SRpcMsg *pMsg) { int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; SRpcContext *pRpcContext = NULL; - + void *pVnode; + dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { @@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - void *pVnode = dnodeGetVnode(pHead->vgId); + pVnode = vnodeGetVnode(pHead->vgId); if (pVnode == NULL) { leftLen -= pHead->contLen; pCont -= pHead->contLen; @@ -96,13 +99,13 @@ void dnodeRead(SRpcMsg *pMsg) { } // put message into queue + taos_queue queue = vnodeGetRqueue(pVnode); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = *pMsg; pRead->pCont = pCont; pRead->contLen = pHead->contLen; pRead->pRpcContext = pRpcContext; - taos_queue queue = dnodeGetVnodeRworker(pVnode); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); // next vnode @@ -123,7 +126,7 @@ void dnodeRead(SRpcMsg *pMsg) { } } -void *dnodeAllocateReadWorker(void *pVnode) { +void *dnodeAllocateRqueue(void *pVnode) { taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); if (queue == NULL) return NULL; @@ -144,7 +147,7 @@ void *dnodeAllocateReadWorker(void *pVnode) { return queue; } -void dnodeFreeReadWorker(void *rqueue) { +void dnodeFreeRqueue(void *rqueue) { taosCloseQueue(rqueue); // dynamically adjust the number of threads @@ -229,7 +232,7 @@ static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMs pRead->pRpcContext = pMsg->pRpcContext; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - taos_queue queue = dnodeGetVnodeRworker(pVnode); + taos_queue queue = vnodeGetRqueue(pVnode); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); } @@ -238,7 +241,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { SQInfo* pQInfo = NULL; if (pMsg->contLen != 0) { - void* tsdb = dnodeGetVnodeTsdb(pVnode); + void* tsdb = vnodeGetTsdb(pVnode); int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); @@ -255,7 +258,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { rpcSendResponse(&rpcRsp); dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle); - dnodeReleaseVnode(pVnode); + vnodeRelease(pVnode); } else { pQInfo = pMsg->pCont; } @@ -299,5 +302,5 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { rpcSendResponse(&rpcRsp); dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle); - dnodeReleaseVnode(pVnode); + vnodeRelease(pVnode); } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 75169a3fa2..66651fdf5f 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -21,22 +21,11 @@ #include "tqueue.h" #include "trpc.h" #include "tsdb.h" +#include "twal.h" #include "dataformat.h" #include "dnodeWrite.h" #include "dnodeMgmt.h" - -typedef struct { - int32_t code; - int32_t count; // number of vnodes returned result - int32_t numOfVnodes; // number of vnodes involved -} SRpcContext; - -typedef struct _write { - void *pCont; - int32_t contLen; - SRpcMsg rpcMsg; - SRpcContext *pRpcContext; // RPC message context -} SWriteMsg; +#include "vnode.h" typedef struct { taos_qset qset; // queue set @@ -44,30 +33,26 @@ typedef struct { int32_t workerId; // worker ID } SWriteWorker; +typedef struct { + void *pCont; + int32_t contLen; + SRpcMsg rpcMsg; +} SWriteMsg; + typedef struct _thread_obj { int32_t max; // max number of workers int32_t nextId; // from 0 to max-1, cyclic SWriteWorker *writeWorker; } SWriteWorkerPool; -static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *); static void *dnodeProcessWriteQueue(void *param); static void dnodeHandleIdleWorker(SWriteWorker *pWorker); -static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite); -static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg); -static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg); -static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg); -static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg); -static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg); SWriteWorkerPool wWorkerPool; int32_t dnodeInitWrite() { - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg; - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg; - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeProcessAlterTableMsg; - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeProcessDropStableMsg; + + vnodeInitWrite(); wWorkerPool.max = tsNumOfCores; wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); @@ -87,50 +72,28 @@ void dnodeCleanupWrite() { } void dnodeWrite(SRpcMsg *pMsg) { - int32_t queuedMsgNum = 0; - int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - SRpcContext *pRpcContext = NULL; if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { SMsgDesc *pDesc = (SMsgDesc *)pCont; pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); pCont += sizeof(SMsgDesc); - if (pDesc->numOfVnodes > 1) { - pRpcContext = calloc(sizeof(SRpcContext), 1); - pRpcContext->numOfVnodes = pDesc->numOfVnodes; - } } - while (leftLen > 0) { - SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = htonl(pHead->vgId); - pHead->contLen = htonl(pHead->contLen); + SMsgHead *pHead = (SMsgHead *) pCont; + pHead->vgId = htonl(pHead->vgId); + pHead->contLen = htonl(pHead->contLen); - void *pVnode = dnodeGetVnode(pHead->vgId); - if (pVnode == NULL) { - leftLen -= pHead->contLen; - pCont -= pHead->contLen; - continue; - } - + taos_queue queue = vnodeGetWqueue(pHead->vgId); + if (queue) { // put message into queue SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); pWrite->rpcMsg = *pMsg; pWrite->pCont = pCont; pWrite->contLen = pHead->contLen; - pWrite->pRpcContext = pRpcContext; - - taos_queue queue = dnodeGetVnodeWworker(pVnode); - taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); - // next vnode - leftLen -= pHead->contLen; - pCont -= pHead->contLen; - queuedMsgNum++; - } - - if (queuedMsgNum == 0) { + taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); + } else { SRpcMsg rpcRsp = { .handle = pMsg->handle, .pCont = NULL, @@ -142,7 +105,7 @@ void dnodeWrite(SRpcMsg *pMsg) { } } -void *dnodeAllocateWriteWorker(void *pVnode) { +void *dnodeAllocateWqueue(void *pVnode) { SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; taos_queue *queue = taosOpenQueue(); if (queue == NULL) return NULL; @@ -167,22 +130,44 @@ void *dnodeAllocateWriteWorker(void *pVnode) { wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } + dTrace("queue:%p is allocated for pVnode:%p", queue, pVnode); + return queue; } -void dnodeFreeWriteWorker(void *wqueue) { +void dnodeFreeWqueue(void *wqueue) { taosCloseQueue(wqueue); // dynamically adjust the number of threads } +void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) { + SWriteMsg *pWrite = (SWriteMsg *)param; + + if (code > 0) return; + + SRpcMsg rpcRsp = { + .handle = pWrite->rpcMsg.handle, + .pCont = NULL, + .contLen = 0, + .code = code, + }; + + rpcSendResponse(&rpcRsp); + rpcFreeCont(pWrite->rpcMsg.pCont); + taosFreeQitem(pWrite); + + vnodeRelease(pVnode); +} + static void *dnodeProcessWriteQueue(void *param) { SWriteWorker *pWorker = (SWriteWorker *)param; taos_qall qall; - SWriteMsg *pWriteMsg; + SWriteMsg *pWrite; + SWalHead *pHead; int32_t numOfMsgs; int type; - void *pVnode; + void *pVnode, *item; qall = taosAllocateQall(); @@ -194,29 +179,35 @@ static void *dnodeProcessWriteQueue(void *param) { } for (int32_t i = 0; i < numOfMsgs; ++i) { - // retrieve all items, and write them into WAL - taosGetQitem(qall, &type, (void **)&pWriteMsg); + pWrite = NULL; + taosGetQitem(qall, &type, &item); + if (type == TAOS_QTYPE_RPC) { + pWrite = (SWriteMsg *)item; + pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); + pHead->msgType = pWrite->rpcMsg.msgType; + pHead->version = 0; + pHead->len = pWrite->contLen; + } else { + pHead = (SWalHead *)item; + } - // walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen); + int32_t code = vnodeProcessWrite(pVnode, type, pHead, item); + if (pWrite) pWrite->rpcMsg.code = code; } - // flush WAL file - // walFsync(pVnode->whandle); + walFsync(vnodeGetWal(pVnode)); // browse all items, and process them one by one taosResetQitems(qall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, &type, (void **)&pWriteMsg); - - terrno = 0; - if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) { - (*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg); + taosGetQitem(qall, &type, &item); + if (type == TAOS_QTYPE_RPC) { + pWrite = (SWriteMsg *)item; + dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code); } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + taosFreeQitem(item); + vnodeRelease(pVnode); } - - dnodeProcessWriteResult(pVnode, pWriteMsg); - taosFreeQitem(pWriteMsg); } } @@ -225,36 +216,6 @@ static void *dnodeProcessWriteQueue(void *param) { return NULL; } -static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) { - SRpcContext *pRpcContext = pWrite->pRpcContext; - int32_t code = 0; - - dnodeReleaseVnode(pVnode); - - if (pRpcContext) { - if (terrno) { - if (pRpcContext->code == 0) pRpcContext->code = terrno; - } - - int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1); - if (count < pRpcContext->numOfVnodes) { - // not over yet, multiple vnodes - return; - } - - // over, result can be merged now - code = pRpcContext->code; - } else { - code = terrno; - } - - SRpcMsg rsp; - rsp.handle = pWrite->rpcMsg.handle; - rsp.code = code; - rsp.pCont = NULL; - rpcSendResponse(&rsp); - rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message -} static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { int32_t num = taosGetQueueNumber(pWorker->qset); @@ -269,174 +230,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { } } -static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) { - dTrace("pVnode:%p, submit msg is disposed", pVnode); - - SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg)); - pRsp->code = 0; - pRsp->numOfRows = htonl(1); - pRsp->affectedRows = htonl(1); - pRsp->numOfFailedBlocks = 0; - - void* tsdb = dnodeGetVnodeTsdb(pVnode); - assert(tsdb != NULL); - - tsdbInsertData(tsdb, pMsg->pCont); - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = sizeof(SShellSubmitRspMsg), - .code = 0, - .msgType = 0 - }; - - rpcSendResponse(&rpcRsp); -} - -static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) { - SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - dTrace("pVnode:%p, table:%s, start to create in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId); - pTable->numOfColumns = htons(pTable->numOfColumns); - pTable->numOfTags = htons(pTable->numOfTags); - pTable->sid = htonl(pTable->sid); - pTable->sversion = htonl(pTable->sversion); - pTable->tagDataLen = htonl(pTable->tagDataLen); - pTable->sqlDataLen = htonl(pTable->sqlDataLen); - pTable->uid = htobe64(pTable->uid); - pTable->superTableUid = htobe64(pTable->superTableUid); - pTable->createdTime = htobe64(pTable->createdTime); - SSchema *pSchema = (SSchema *) pTable->data; - - int totalCols = pTable->numOfColumns + pTable->numOfTags; - for (int i = 0; i < totalCols; i++) { - pSchema[i].colId = htons(pSchema[i].colId); - pSchema[i].bytes = htons(pSchema[i].bytes); - } - - STableCfg tCfg; - tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); - - STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); - for (int i = 0; i < pTable->numOfColumns; i++) { - tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); - } - tsdbTableSetSchema(&tCfg, pDestSchema, false); - - if (pTable->numOfTags != 0) { - STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); - for (int i = pTable->numOfColumns; i < totalCols; i++) { - tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); - } - tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); - - char *pTagData = pTable->data + totalCols * sizeof(SSchema); - int accumBytes = 0; - SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); - - for (int i = 0; i < pTable->numOfTags; i++) { - tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); - accumBytes += pSchema[i + pTable->numOfColumns].bytes; - } - tsdbTableSetTagValue(&tCfg, dataRow, false); - } - - void *pTsdb = dnodeGetVnodeTsdb(pVnode); - rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg); - - dTrace("pVnode:%p, table:%s, create table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); -} - -static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) { - SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont; - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - dTrace("pVnode:%p, table:%s, start to drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId); - STableId tableId = { - .uid = htobe64(pTable->uid), - .tid = htonl(pTable->sid) - }; - - void *pTsdb = dnodeGetVnodeTsdb(pVnode); - rpcRsp.code = tsdbDropTable(pTsdb, tableId); - - dTrace("pVnode:%p, table:%s, drop table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); -} - -static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) { - SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - dTrace("pVnode:%p, table:%s, start to alter in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId); - pTable->numOfColumns = htons(pTable->numOfColumns); - pTable->numOfTags = htons(pTable->numOfTags); - pTable->sid = htonl(pTable->sid); - pTable->sversion = htonl(pTable->sversion); - pTable->tagDataLen = htonl(pTable->tagDataLen); - pTable->sqlDataLen = htonl(pTable->sqlDataLen); - pTable->uid = htobe64(pTable->uid); - pTable->superTableUid = htobe64(pTable->superTableUid); - pTable->createdTime = htobe64(pTable->createdTime); - SSchema *pSchema = (SSchema *) pTable->data; - - int totalCols = pTable->numOfColumns + pTable->numOfTags; - for (int i = 0; i < totalCols; i++) { - pSchema[i].colId = htons(pSchema[i].colId); - pSchema[i].bytes = htons(pSchema[i].bytes); - } - - STableCfg tCfg; - tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); - - STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); - for (int i = 0; i < pTable->numOfColumns; i++) { - tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); - } - tsdbTableSetSchema(&tCfg, pDestSchema, false); - - if (pTable->numOfTags != 0) { - STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); - for (int i = pTable->numOfColumns; i < totalCols; i++) { - tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); - } - tsdbTableSetSchema(&tCfg, pDestTagSchema, false); - - char *pTagData = pTable->data + totalCols * sizeof(SSchema); - int accumBytes = 0; - SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); - - for (int i = 0; i < pTable->numOfTags; i++) { - tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); - accumBytes += pSchema[i + pTable->numOfColumns].bytes; - } - tsdbTableSetTagValue(&tCfg, dataRow, false); - } - - void *pTsdb = dnodeGetVnodeTsdb(pVnode); - rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg); - - dTrace("pVnode:%p, table:%s, alter table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); -} - -static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) { - SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - dTrace("pVnode:%p, stable:%s, start to it drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId); - pTable->uid = htobe64(pTable->uid); - - // TODO: drop stable in vvnode - //void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); - //rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); - - rpcRsp.code = TSDB_CODE_SUCCESS; - - dTrace("pVnode:%p, stable:%s, drop stable result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); -} - diff --git a/src/dnode/src/vnodeMain.c b/src/dnode/src/vnodeMain.c new file mode 100644 index 0000000000..b4d1349e29 --- /dev/null +++ b/src/dnode/src/vnodeMain.c @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "ihash.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tlog.h" +#include "trpc.h" +#include "tstatus.h" +#include "tsdb.h" +#include "ttime.h" +#include "ttimer.h" +#include "twal.h" +#include "dnodeMClient.h" +#include "dnodeMgmt.h" +#include "dnodeRead.h" +#include "dnodeWrite.h" +#include "vnode.h" +#include "vnodeInt.h" + +extern void *tsDnodeVnodesHash; +static void vnodeCleanUp(SVnodeObj *pVnode); + +int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { + int32_t code; + + SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); + + if (pTemp != NULL) { + dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp); + return TSDB_CODE_SUCCESS; + } + + STsdbCfg tsdbCfg = {0}; + tsdbCfg.precision = pVnodeCfg->cfg.precision; + tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; + tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; + tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; + tsdbCfg.minRowsPerFileBlock = -1; + tsdbCfg.maxRowsPerFileBlock = -1; + tsdbCfg.keep = -1; + tsdbCfg.maxCacheSize = -1; + + char rootDir[TSDB_FILENAME_LEN] = {0}; + sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); + if (mkdir(rootDir, 0755) != 0) { + if (errno == EACCES) { + return TSDB_CODE_NO_DISK_PERMISSIONS; + } else if (errno == ENOSPC) { + return TSDB_CODE_SERV_NO_DISKSPACE; + } else if (errno == EEXIST) { + } else { + return TSDB_CODE_VG_INIT_FAILED; + } + } + + char tsdbDir[TSDB_FILENAME_LEN] = {0}; + sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); + code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); + if (code <0) { + dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); + return code; + } + + dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId); + code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); + + return code; +} + +int32_t vnodeDrop(int32_t vgId) { + + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) { + dTrace("vgId:%d, failed to drop, vgId not exist", vgId); + return TSDB_CODE_INVALID_VGROUP_ID; + } + + dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId); + pVnode->status = VN_STATUS_DELETING; + vnodeCleanUp(pVnode); + + return TSDB_CODE_SUCCESS; +} + +int32_t vnodeOpen(int32_t vnode, char *rootDir) { + char temp[TSDB_FILENAME_LEN]; + + SVnodeObj vnodeObj = {0}; + vnodeObj.vgId = vnode; + vnodeObj.status = VN_STATUS_INIT; + vnodeObj.refCount = 1; + vnodeObj.version = 0; + SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); + + sprintf(temp, "%s/tsdb", rootDir); + void *pTsdb = tsdbOpenRepo(temp); + if (pTsdb == NULL) { + dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return terrno; + } + + pVnode->wqueue = dnodeAllocateWqueue(pVnode); + pVnode->rqueue = dnodeAllocateRqueue(pVnode); + + sprintf(temp, "%s/wal", rootDir); + pVnode->wal = walOpen(temp, 3, tsCommitLog); + pVnode->tsdb = pTsdb; + pVnode->sync = NULL; + pVnode->events = NULL; + pVnode->cq = NULL; + + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); + + pVnode->status = VN_STATUS_READY; + dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); + + return TSDB_CODE_SUCCESS; +} + +int32_t vnodeClose(void *param) { + SVnodeObj *pVnode = (SVnodeObj *)param; + + dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); + pVnode->status = VN_STATUS_CLOSING; + vnodeCleanUp(pVnode); + + return 0; +} + +void vnodeRelease(void *pVnodeRaw) { + SVnodeObj *pVnode = pVnodeRaw; + + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + + if (refCount > 0) return; + + // remove read queue + dnodeFreeRqueue(pVnode->rqueue); + pVnode->rqueue = NULL; + + // remove write queue + dnodeFreeWqueue(pVnode->wqueue); + pVnode->wqueue = NULL; + + if (pVnode->status == VN_STATUS_DELETING) { + // remove the whole directory + } + + dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); +} + +void *vnodeGetVnode(int32_t vgId) { + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) { + terrno = TSDB_CODE_INVALID_VGROUP_ID; + return NULL; + } + + atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount); + + return pVnode; +} + +void *vnodeGetRqueue(void *pVnode) { + return ((SVnodeObj *)pVnode)->rqueue; +} + +void *vnodeGetWqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeGetVnode(vgId); + if (pVnode == NULL) return NULL; + return pVnode->wqueue; +} + +void *vnodeGetWal(void *pVnode) { + return ((SVnodeObj *)pVnode)->wal; +} + +void *vnodeGetTsdb(void *pVnode) { + return ((SVnodeObj *)pVnode)->tsdb; +} + +static void vnodeCleanUp(SVnodeObj *pVnode) { + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + + //syncStop(pVnode->sync); + tsdbCloseRepo(pVnode->tsdb); + walClose(pVnode->wal); + + vnodeRelease(pVnode); +} diff --git a/src/dnode/src/vnodeWrite.c b/src/dnode/src/vnodeWrite.c new file mode 100644 index 0000000000..4133b666d6 --- /dev/null +++ b/src/dnode/src/vnodeWrite.c @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "taoserror.h" +#include "tlog.h" +#include "tqueue.h" +#include "trpc.h" +#include "tsdb.h" +#include "twal.h" +#include "dataformat.h" +#include "dnodeWrite.h" +#include "dnodeMgmt.h" +#include "vnode.h" +#include "vnodeInt.h" + +static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*); +static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, void *); +static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, void *); +static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, void *); +static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, void *); +static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, void *); + +int32_t vnodeInitWrite() { + vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; + vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg; + vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg; + vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg; + vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg; + + return 0; +} + +int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { + int32_t code = 0; + SVnodeObj *pVnode = (SVnodeObj *)param; + + if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING) + return TSDB_CODE_NOT_ACTIVE_VNODE; + + if (pHead->version == 0) { // from client + if (pVnode->status != VN_STATUS_READY) + return TSDB_CODE_NOT_ACTIVE_VNODE; + + // if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) + + // assign version + pVnode->version++; + pHead->version = pVnode->version; + } else { + // for data from WAL or forward, version may be smaller + if (pHead->version <= pVnode->version) return 0; + } + + // more status and role checking here + + pVnode->version = pHead->version; + + // write into WAL + code = walWrite(pVnode->wal, pHead); + if ( code < 0) return code; + + code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); + if (code < 0) return code; + +/* forward + if (pVnode->replica > 1 && pVnode->role == TAOS_SYNC_ROLE_MASTER) { + code = syncForwardToPeer(pVnode->sync, pHead, item); + } +*/ + + return code; +} + +static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, void *item) { + int32_t code = 0; + + // save insert result into item + + dTrace("pVnode:%p vgId:%d, submit msg is processed", pVnode, pVnode->vgId); + code = tsdbInsertData(pVnode->tsdb, pCont); + + return code; +} + +static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { + SMDCreateTableMsg *pTable = pCont; + int32_t code = 0; + + dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId); + pTable->numOfColumns = htons(pTable->numOfColumns); + pTable->numOfTags = htons(pTable->numOfTags); + pTable->sid = htonl(pTable->sid); + pTable->sversion = htonl(pTable->sversion); + pTable->tagDataLen = htonl(pTable->tagDataLen); + pTable->sqlDataLen = htonl(pTable->sqlDataLen); + pTable->uid = htobe64(pTable->uid); + pTable->superTableUid = htobe64(pTable->superTableUid); + pTable->createdTime = htobe64(pTable->createdTime); + SSchema *pSchema = (SSchema *) pTable->data; + + int totalCols = pTable->numOfColumns + pTable->numOfTags; + for (int i = 0; i < totalCols; i++) { + pSchema[i].colId = htons(pSchema[i].colId); + pSchema[i].bytes = htons(pSchema[i].bytes); + } + + STableCfg tCfg; + tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); + + STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); + for (int i = 0; i < pTable->numOfColumns; i++) { + tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestSchema, false); + + if (pTable->numOfTags != 0) { + STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); + for (int i = pTable->numOfColumns; i < totalCols; i++) { + tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); + + char *pTagData = pTable->data + totalCols * sizeof(SSchema); + int accumBytes = 0; + SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); + + for (int i = 0; i < pTable->numOfTags; i++) { + tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); + accumBytes += pSchema[i + pTable->numOfColumns].bytes; + } + tsdbTableSetTagValue(&tCfg, dataRow, false); + } + + void *pTsdb = vnodeGetTsdb(pVnode); + code = tsdbCreateTable(pTsdb, &tCfg); + + dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); + return code; +} + +static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { + SMDDropTableMsg *pTable = pCont; + int32_t code = 0; + + dTrace("pVnode:%p vgId:%d, table:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId); + STableId tableId = { + .uid = htobe64(pTable->uid), + .tid = htonl(pTable->sid) + }; + + code = tsdbDropTable(pVnode->tsdb, tableId); + + return code; +} + +static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { + SMDCreateTableMsg *pTable = pCont; + int32_t code = 0; + + dTrace("pVnode:%p vgId:%d, table:%s, start to alter", pVnode, pVnode->vgId, pTable->tableId); + pTable->numOfColumns = htons(pTable->numOfColumns); + pTable->numOfTags = htons(pTable->numOfTags); + pTable->sid = htonl(pTable->sid); + pTable->sversion = htonl(pTable->sversion); + pTable->tagDataLen = htonl(pTable->tagDataLen); + pTable->sqlDataLen = htonl(pTable->sqlDataLen); + pTable->uid = htobe64(pTable->uid); + pTable->superTableUid = htobe64(pTable->superTableUid); + pTable->createdTime = htobe64(pTable->createdTime); + SSchema *pSchema = (SSchema *) pTable->data; + + int totalCols = pTable->numOfColumns + pTable->numOfTags; + for (int i = 0; i < totalCols; i++) { + pSchema[i].colId = htons(pSchema[i].colId); + pSchema[i].bytes = htons(pSchema[i].bytes); + } + + STableCfg tCfg; + tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); + + STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); + for (int i = 0; i < pTable->numOfColumns; i++) { + tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestSchema, false); + + if (pTable->numOfTags != 0) { + STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); + for (int i = pTable->numOfColumns; i < totalCols; i++) { + tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + + char *pTagData = pTable->data + totalCols * sizeof(SSchema); + int accumBytes = 0; + SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); + + for (int i = 0; i < pTable->numOfTags; i++) { + tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); + accumBytes += pSchema[i + pTable->numOfColumns].bytes; + } + tsdbTableSetTagValue(&tCfg, dataRow, false); + } + + code = tsdbAlterTable(pVnode->tsdb, &tCfg); + dTrace("pVnode:%p vgId:%d, table:%s, alter table result:%d", pVnode, pVnode->vgId, pTable->tableId, code); + + return code; +} + +static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, void *item) { + SMDDropSTableMsg *pTable = pCont; + int32_t code = 0; + + dTrace("pVnode:%p vgId:%d, stable:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId); + pTable->uid = htobe64(pTable->uid); + + // TODO: drop stable in vvnode + //void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + //rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); + + code = TSDB_CODE_SUCCESS; + dTrace("pVnode:%p vgId:%d, stable:%s, drop stable result:%x", pVnode, pTable->tableId, code); + + return code; +} + +int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) { + SVnodeObj *pVnode = param; + + int size = sizeof(SWalHead) + pHead->len; + SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); + memcpy(pWal, pHead, size); + + taosWriteQitem(pVnode->wqueue, type, pHead); + + return 0; +} + diff --git a/src/vnode/wal/inc/twal.h b/src/inc/twal.h similarity index 97% rename from src/vnode/wal/inc/twal.h rename to src/inc/twal.h index 49fcde9e28..bac5f87215 100644 --- a/src/vnode/wal/inc/twal.h +++ b/src/inc/twal.h @@ -40,7 +40,7 @@ void walClose(twal_h); int walRenew(twal_h); int walWrite(twal_h, SWalHead *); void walFsync(twal_h); -int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); +int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, SWalHead *pHead, int type)); int walGetWalFile(twal_h, char *name, uint32_t *index); extern int wDebugFlag; diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 5c458989bc..85575e1a8b 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -53,7 +53,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg); // --------- TSDB REPOSITORY DEFINITION typedef void tsdb_repo_t; // use void to hide implementation details from outside -tsdb_repo_t * tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); +int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int32_t tsdbDropRepo(tsdb_repo_t *repo); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); int32_t tsdbCloseRepo(tsdb_repo_t *repo); @@ -332,4 +332,4 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo } #endif -#endif // _TD_TSDB_H_ \ No newline at end of file +#endif // _TD_TSDB_H_ diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 8ab7648510..5be2f11b15 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -108,67 +108,40 @@ void tsdbFreeCfg(STsdbCfg *pCfg) { * * @return a TSDB repository handle on success, NULL for failure */ -tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) { - if (rootDir == NULL) return NULL; +int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) { + + if (mkdir(rootDir, 0755) != 0) { + if (errno == EACCES) { + return TSDB_CODE_NO_DISK_PERMISSIONS; + } else if (errno == ENOSPC) { + return TSDB_CODE_SERV_NO_DISKSPACE; + } else if (errno == EEXIST) { + } else { + return TSDB_CODE_VG_INIT_FAILED; + } + } - if (access(rootDir, F_OK | R_OK | W_OK) == -1) return NULL; + if (access(rootDir, F_OK | R_OK | W_OK) == -1) return -1; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) { - return NULL; + return -1; } STsdbRepo *pRepo = (STsdbRepo *)malloc(sizeof(STsdbRepo)); if (pRepo == NULL) { - return NULL; + return -1; } pRepo->rootDir = strdup(rootDir); pRepo->config = *pCfg; pRepo->limiter = limiter; - pthread_mutex_init(&pRepo->mutex, NULL); // Create the environment files and directories - if (tsdbSetRepoEnv(pRepo) < 0) { - free(pRepo->rootDir); - free(pRepo); - return NULL; - } - - // Initialize meta - STsdbMeta *pMeta = tsdbInitMeta(rootDir, pCfg->maxTables); - if (pMeta == NULL) { - free(pRepo->rootDir); - free(pRepo); - return NULL; - } - pRepo->tsdbMeta = pMeta; - - // Initialize cache - STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo); - if (pCache == NULL) { - free(pRepo->rootDir); - tsdbFreeMeta(pRepo->tsdbMeta); - free(pRepo); - return NULL; - } - pRepo->tsdbCache = pCache; - - // Initialize file handle - char dataDir[128] = "\0"; - tsdbGetDataDirName(pRepo, dataDir); - pRepo->tsdbFileH = - tsdbInitFileH(dataDir, pCfg->maxTables); - if (pRepo->tsdbFileH == NULL) { - free(pRepo->rootDir); - tsdbFreeCache(pRepo->tsdbCache); - tsdbFreeMeta(pRepo->tsdbMeta); - free(pRepo); - return NULL; - } - - pRepo->state = TSDB_REPO_STATE_ACTIVE; - - return (tsdb_repo_t *)pRepo; + int32_t code = tsdbSetRepoEnv(pRepo); + + free(pRepo->rootDir); + free(pRepo); + return code; } /** @@ -1236,4 +1209,4 @@ int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, S } return numOfPointsToWrite; -} \ No newline at end of file +} diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index f327c28ce3..96819b47ff 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -26,6 +26,7 @@ #include "tchecksum.h" #include "tutil.h" #include "twal.h" +#include "tqueue.h" #define walPrefix "wal" #define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);} @@ -48,7 +49,7 @@ int wDebugFlag = 135; static uint32_t walSignature = 0xFAFBFDFE; static int walHandleExistingFiles(char *path); -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)); +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)); static int walRemoveWalFiles(char *path); void *walOpen(char *path, int max, int level) { @@ -168,7 +169,7 @@ void walFsync(void *handle) { fsync(pWal->fd); } -int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { +int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { SWal *pWal = (SWal *)handle; int code = 0; struct dirent *ent; @@ -245,43 +246,45 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { - SWalHead walHead; - int code = 0; +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { + int code = 0; + + char *buffer = malloc(1024000); // size for one record + if (buffer == NULL) return -1; + + SWalHead *pHead = (SWalHead *)buffer; int fd = open(name, O_RDONLY); if (fd < 0) { wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); + free(buffer); return -1; } wTrace("wal:%s, start to restore", name); while (1) { - int ret = read(fd, &walHead, sizeof(walHead)); + int ret = read(fd, pHead, sizeof(SWalHead)); if ( ret == 0) { code = 0; break;} - if (ret != sizeof(walHead)) { + if (ret != sizeof(SWalHead)) { wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno)); break; } - if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) { + if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { wWarn("wal:%s, cksum is messed up, skip the rest of file", name); break; } - char *buffer = malloc(sizeof(SWalHead) + walHead.len); - memcpy(buffer, &walHead, sizeof(walHead)); - - ret = read(fd, buffer+sizeof(walHead), walHead.len); - if ( ret != walHead.len) { - wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret); + ret = read(fd, pHead->cont, pHead->len); + if ( ret != pHead->len) { + wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret); break; } // write into queue - (*writeFp)(pVnode, buffer); + (*writeFp)(pVnode, buffer, TAOS_QTYPE_WAL); } return code; diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c index e90b54d1f3..37d1d8e84c 100644 --- a/src/vnode/wal/test/waltest.c +++ b/src/vnode/wal/test/waltest.c @@ -21,16 +21,14 @@ int64_t ver = 0; void *pWal = NULL; -int writeToQueue(void *pVnode, void *data) { - SWalHead *pHead = (SWalHead *)data; - +int writeToQueue(void *pVnode, SWalHead *pHead, int type) { // do nothing if (pHead->version > ver) ver = pHead->version; walWrite(pWal, pHead); - free(data); + free(pHead); return 0; } -- GitLab