提交 6af40311 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

integrate WAL module

move some functions in dnode into vnodeMain.c and vnodeWrite.c
上级 18da95ad
...@@ -13,7 +13,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -13,7 +13,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(taosd ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb) TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal)
IF (TD_ACCOUNT) IF (TD_ACCOUNT)
TARGET_LINK_LIBRARIES(taosd account) TARGET_LINK_LIBRARIES(taosd account)
......
...@@ -23,8 +23,8 @@ extern "C" { ...@@ -23,8 +23,8 @@ extern "C" {
int32_t dnodeInitRead(); int32_t dnodeInitRead();
void dnodeCleanupRead(); void dnodeCleanupRead();
void dnodeRead(SRpcMsg *pMsg); void dnodeRead(SRpcMsg *pMsg);
void * dnodeAllocateReadWorker(); void * dnodeAllocateRqueue();
void dnodeFreeReadWorker(void *rqueue); void dnodeFreeRqueue(void *rqueue);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,8 +23,9 @@ extern "C" { ...@@ -23,8 +23,9 @@ extern "C" {
int32_t dnodeInitWrite(); int32_t dnodeInitWrite();
void dnodeCleanupWrite(); void dnodeCleanupWrite();
void dnodeWrite(SRpcMsg *pMsg); void dnodeWrite(SRpcMsg *pMsg);
void * dnodeAllocateWriteWorker(); void * dnodeAllocateWqueue();
void dnodeFreeWriteWorker(void *worker); void dnodeFreeWqueue(void *worker);
void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_H
#define TDENGINE_VNODE_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t vnodeInitWrite();
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vnode, char *rootDir);
int32_t vnodeClose(void *pVnode);
void vnodeRelease(void *pVnode);
void* vnodeGetVnode(int32_t vgId);
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode);
void* vnodeGetTsdb(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_INT_H
#define TDENGINE_VNODE_INT_H
#ifdef __cplusplus
extern "C" {
#endif
typedef enum _VN_STATUS {
VN_STATUS_INIT,
VN_STATUS_CREATING,
VN_STATUS_READY,
VN_STATUS_CLOSING,
VN_STATUS_DELETING,
} EVnStatus;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
EVnStatus status;
int role;
int64_t version;
void * wqueue;
void * rqueue;
void * wal;
void * tsdb;
void * sync;
void * events;
void * cq; // continuous query
} SVnodeObj;
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
#ifdef __cplusplus
}
#endif
#endif
...@@ -24,12 +24,12 @@ ...@@ -24,12 +24,12 @@
#include "tsdb.h" #include "tsdb.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "twal.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "vnode.h"
typedef enum { CLOSE_TSDB, DROP_TSDB } ECloseTsdbFlag;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
...@@ -47,21 +47,16 @@ typedef struct { ...@@ -47,21 +47,16 @@ typedef struct {
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCleanupVnodes(); static void dnodeCleanupVnodes();
static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static void dnodeCleanupVnode(SVnodeObj *pVnode); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static void dnodeDoCleanupVnode(SVnodeObj *pVnode, ECloseTsdbFlag dropFlag); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static void dnodeDropVnode(SVnodeObj *pVnode); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId); static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void dnodeReadDnodeId(); static void dnodeReadDnodeId();
static void *tsDnodeVnodesHash = NULL; void *tsDnodeVnodesHash = NULL;
static void *tsDnodeTmr = NULL; static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL; static void *tsStatusTimer = NULL;
static uint32_t tsRebootTime; static uint32_t tsRebootTime;
...@@ -119,41 +114,19 @@ void dnodeCleanupMgmt() { ...@@ -119,41 +114,19 @@ void dnodeCleanupMgmt() {
} }
void dnodeMgmt(SRpcMsg *pMsg) { void dnodeMgmt(SRpcMsg *pMsg) {
terrno = 0; SRpcMsg rsp;
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else { } else {
SRpcMsg rsp; rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
rsp.handle = pMsg->handle;
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont); rsp.handle = pMsg->handle;
} rsp.pCont = NULL;
rpcSendResponse(&rsp);
void *dnodeGetVnode(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) {
terrno = TSDB_CODE_INVALID_VGROUP_ID;
return NULL;
}
if (pVnode->status != TSDB_VN_STATUS_MASTER && pVnode->status == TSDB_VN_STATUS_SLAVE) { rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_INVALID_VNODE_STATUS;
return NULL;
}
atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("pVnode:%p, vgroup:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount);
return pVnode;
}
int32_t dnodeGetVnodeStatus(void *pVnode) {
return ((SVnodeObj *)pVnode)->status;
} }
void *dnodeGetVnodeWworker(void *pVnode) { void *dnodeGetVnodeWworker(void *pVnode) {
...@@ -164,37 +137,6 @@ void *dnodeGetVnodeRworker(void *pVnode) { ...@@ -164,37 +137,6 @@ void *dnodeGetVnodeRworker(void *pVnode) {
return ((SVnodeObj *)pVnode)->rworker; return ((SVnodeObj *)pVnode)->rworker;
} }
void *dnodeGetVnodeWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
}
void *dnodeGetVnodeTsdb(void *pVnode) {
return ((SVnodeObj *)pVnode)->tsdb;
}
void dnodeReleaseVnode(void *pVnodeRaw) {
SVnodeObj *pVnode = pVnodeRaw;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (pVnode->status == TSDB_VN_STATUS_DELETING) {
if (refCount <= 0) {
dPrint("pVnode:%p, vgroup:%d, drop vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
dnodeDoCleanupVnode(pVnode, DROP_TSDB);
} else {
dTrace("pVnode:%p, vgroup:%d, vnode will be dropped until refCount:%d is 0", pVnode, pVnode->vgId, refCount);
}
} else if (pVnode->status == TSDB_VN_STATUS_CLOSING) {
if (refCount <= 0) {
dPrint("pVnode:%p, vgroup:%d, cleanup vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
dnodeDoCleanupVnode(pVnode, CLOSE_TSDB);
} else {
dTrace("pVnode:%p, vgroup:%d, vnode will cleanup until refCount:%d is 0", pVnode, pVnode->vgId, refCount);
}
} else {
dTrace("pVnode:%p, vgroup:%d, release vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
}
}
static int32_t dnodeOpenVnodes() { static int32_t dnodeOpenVnodes() {
DIR *dir = opendir(tsVnodeDir); DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) { if (dir == NULL) {
...@@ -212,7 +154,7 @@ static int32_t dnodeOpenVnodes() { ...@@ -212,7 +154,7 @@ static int32_t dnodeOpenVnodes() {
char vnodeDir[TSDB_FILENAME_LEN * 3]; char vnodeDir[TSDB_FILENAME_LEN * 3];
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name);
int32_t code = dnodeOpenVnode(vnode, vnodeDir); int32_t code = vnodeOpen(vnode, vnodeDir);
if (code == 0) { if (code == 0) {
numOfVnodes++; numOfVnodes++;
} }
...@@ -227,208 +169,39 @@ static int32_t dnodeOpenVnodes() { ...@@ -227,208 +169,39 @@ static int32_t dnodeOpenVnodes() {
typedef void (*CleanupFp)(char *); typedef void (*CleanupFp)(char *);
static void dnodeCleanupVnodes() { static void dnodeCleanupVnodes() {
int32_t num = taosGetIntHashSize(tsDnodeVnodesHash); int32_t num = taosGetIntHashSize(tsDnodeVnodesHash);
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)dnodeCleanupVnode); taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
dPrint("dnode mgmt is closed, vnodes:%d", num); dPrint("dnode mgmt is closed, vnodes:%d", num);
} }
static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SVnodeObj vnodeObj = {0};
vnodeObj.vgId = vnode;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
char tsdbDir[TSDB_FILENAME_LEN];
sprintf(tsdbDir, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(tsdbDir);
if (pTsdb == NULL) {
dError("pVnode:%p, vgroup:%d, failed to open tsdb in %s, reason:%s", pVnode, pVnode->vgId, tsdbDir, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno;
}
pVnode->wal = NULL;
pVnode->tsdb = pTsdb;
pVnode->replica = NULL;
pVnode->events = NULL;
pVnode->cq = NULL;
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
//TODO: jude status while replca is not null
if (pVnode->replica == NULL) {
pVnode->status = TSDB_VN_STATUS_MASTER;
}
dTrace("pVnode:%p, vgroup:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
return TSDB_CODE_SUCCESS;
}
static void dnodeDoCleanupVnode(SVnodeObj *pVnode, ECloseTsdbFlag closeFlag) {
dTrace("pVnode:%p, vgroup:%d, cleanup vnode", pVnode, pVnode->vgId);
// remove replica
// remove read queue
dnodeFreeReadWorker(pVnode->rworker);
pVnode->rworker = NULL;
// remove write queue
dnodeFreeWriteWorker(pVnode->wworker);
pVnode->wworker = NULL;
// remove wal
// remove tsdb
if (pVnode->tsdb) {
if (closeFlag == DROP_TSDB) {
tsdbDropRepo(pVnode->tsdb);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
} else if (closeFlag == CLOSE_TSDB) {
tsdbCloseRepo(pVnode->tsdb);
}
pVnode->tsdb = NULL;
}
}
static void dnodeCleanupVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_CLOSING;
dnodeReleaseVnode(pVnode);
}
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
if (mkdir(rootDir, 0755) != 0) {
if (errno == EACCES) {
return TSDB_CODE_NO_DISK_PERMISSIONS;
} else if (errno == ENOSPC) {
return TSDB_CODE_SERV_NO_DISKSPACE;
} else if (errno == EEXIST) {
} else {
return TSDB_CODE_VG_INIT_FAILED;
}
}
sprintf(rootDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
if (mkdir(rootDir, 0755) != 0) {
if (errno == EACCES) {
return TSDB_CODE_NO_DISK_PERMISSIONS;
} else if (errno == ENOSPC) {
return TSDB_CODE_SERV_NO_DISKSPACE;
} else if (errno == EEXIST) {
} else {
return TSDB_CODE_VG_INIT_FAILED;
}
}
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
if (pTsdb == NULL) {
dError("vgroup:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
return terrno;
}
SVnodeObj vnodeObj = {0};
vnodeObj.vgId = pVnodeCfg->cfg.vgId;
vnodeObj.status = TSDB_VN_STATUS_CREATING;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
if (pVnode->replica == NULL) {
pVnode->status = TSDB_VN_STATUS_MASTER;
}
dPrint("pVnode:%p, vgroup:%d, vnode:%d is created", pVnode, pVnode->vgId, pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
static void dnodeDropVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_DELETING;
dnodeReleaseVnode(pVnode);
}
static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
dTrace("vgroup:%d, start to create vnode in dnode", pCreate->cfg.vgId); return vnodeCreate(pCreate);
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) {
rpcRsp.code = TSDB_CODE_SUCCESS;
dPrint("pVnode:%p, vgroup:%d, vnode is already exist", pVnodeObj, pCreate->cfg.vgId);
} else {
rpcRsp.code = dnodeCreateVnode(pCreate);
}
rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SMDDropVnodeMsg *pDrop = rpcMsg->pCont; SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId); pDrop->vgId = htonl(pDrop->vgId);
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId); return vnodeDrop(pDrop->vgId);
if (pVnodeObj != NULL) {
dPrint("pVnode:%p, vgroup:%d, start to drop vnode in dnode", pVnodeObj, pDrop->vgId);
dnodeDropVnode(pVnodeObj);
rpcRsp.code = TSDB_CODE_SUCCESS;
} else {
dTrace("vgroup:%d, failed drop vnode in dnode, vgroup not exist", pDrop->vgId);
rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID;
}
rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
dTrace("vgroup:%d, start to alter vnode in dnode", pCreate->cfg.vgId); return 0;
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) {
dPrint("pVnode:%p, vgroup:%d, start to alter vnode in dnode", pVnodeObj, pCreate->cfg.vgId);
rpcRsp.code = TSDB_CODE_SUCCESS;
} else {
dTrace("vgroup:%d, alter vnode msg received, start to create vnode", pCreate->cfg.vgId);
rpcRsp.code = dnodeCreateVnode(pCreate);;
}
rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// SMDAlterStreamMsg *pStream = pCont; // SMDAlterStreamMsg *pStream = pCont;
// pStream->uid = htobe64(pStream->uid); // pStream->uid = htobe64(pStream->uid);
// pStream->stime = htobe64(pStream->stime); // pStream->stime = htobe64(pStream->stime);
...@@ -437,14 +210,13 @@ static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { ...@@ -437,14 +210,13 @@ static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// pStream->status = htonl(pStream->status); // pStream->status = htonl(pStream->status);
// //
// int32_t code = dnodeCreateStream(pStream); // int32_t code = dnodeCreateStream(pStream);
return 0;
} }
static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
int32_t code = tsCfgDynamicOptions(pCfg->config); return tsCfgDynamicOptions(pCfg->config);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code, .msgType = 0};
rpcSendResponse(&rpcRsp);
} }
static void dnodeBuildVloadMsg(char *pNode, void * param) { static void dnodeBuildVloadMsg(char *pNode, void * param) {
......
...@@ -22,9 +22,11 @@ ...@@ -22,9 +22,11 @@
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "twal.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeRead.h" #include "dnodeRead.h"
#include "queryExecutor.h" #include "queryExecutor.h"
#include "vnode.h"
typedef struct { typedef struct {
int32_t code; int32_t code;
...@@ -76,7 +78,8 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -76,7 +78,8 @@ void dnodeRead(SRpcMsg *pMsg) {
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL; SRpcContext *pRpcContext = NULL;
void *pVnode;
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
...@@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId); pVnode = vnodeGetVnode(pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
pCont -= pHead->contLen; pCont -= pHead->contLen;
...@@ -96,13 +99,13 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -96,13 +99,13 @@ void dnodeRead(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
taos_queue queue = vnodeGetRqueue(pVnode);
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
pRead->pCont = pCont; pRead->pCont = pCont;
pRead->contLen = pHead->contLen; pRead->contLen = pHead->contLen;
pRead->pRpcContext = pRpcContext; pRead->pRpcContext = pRpcContext;
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// next vnode // next vnode
...@@ -123,7 +126,7 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -123,7 +126,7 @@ void dnodeRead(SRpcMsg *pMsg) {
} }
} }
void *dnodeAllocateReadWorker(void *pVnode) { void *dnodeAllocateRqueue(void *pVnode) {
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
if (queue == NULL) return NULL; if (queue == NULL) return NULL;
...@@ -144,7 +147,7 @@ void *dnodeAllocateReadWorker(void *pVnode) { ...@@ -144,7 +147,7 @@ void *dnodeAllocateReadWorker(void *pVnode) {
return queue; return queue;
} }
void dnodeFreeReadWorker(void *rqueue) { void dnodeFreeRqueue(void *rqueue) {
taosCloseQueue(rqueue); taosCloseQueue(rqueue);
// dynamically adjust the number of threads // dynamically adjust the number of threads
...@@ -229,7 +232,7 @@ static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMs ...@@ -229,7 +232,7 @@ static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMs
pRead->pRpcContext = pMsg->pRpcContext; pRead->pRpcContext = pMsg->pRpcContext;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = dnodeGetVnodeRworker(pVnode); taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
} }
...@@ -238,7 +241,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { ...@@ -238,7 +241,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQInfo* pQInfo = NULL; SQInfo* pQInfo = NULL;
if (pMsg->contLen != 0) { if (pMsg->contLen != 0) {
void* tsdb = dnodeGetVnodeTsdb(pVnode); void* tsdb = vnodeGetTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
...@@ -255,7 +258,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { ...@@ -255,7 +258,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle); dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle);
dnodeReleaseVnode(pVnode); vnodeRelease(pVnode);
} else { } else {
pQInfo = pMsg->pCont; pQInfo = pMsg->pCont;
} }
...@@ -299,5 +302,5 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { ...@@ -299,5 +302,5 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle); dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
dnodeReleaseVnode(pVnode); vnodeRelease(pVnode);
} }
...@@ -21,22 +21,11 @@ ...@@ -21,22 +21,11 @@
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
#include "twal.h"
#include "dataformat.h" #include "dataformat.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "vnode.h"
typedef struct {
int32_t code;
int32_t count; // number of vnodes returned result
int32_t numOfVnodes; // number of vnodes involved
} SRpcContext;
typedef struct _write {
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
SRpcContext *pRpcContext; // RPC message context
} SWriteMsg;
typedef struct { typedef struct {
taos_qset qset; // queue set taos_qset qset; // queue set
...@@ -44,30 +33,26 @@ typedef struct { ...@@ -44,30 +33,26 @@ typedef struct {
int32_t workerId; // worker ID int32_t workerId; // worker ID
} SWriteWorker; } SWriteWorker;
typedef struct {
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SWriteMsg;
typedef struct _thread_obj { typedef struct _thread_obj {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker; SWriteWorker *writeWorker;
} SWriteWorkerPool; } SWriteWorkerPool;
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *);
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker); static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite);
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg);
SWriteWorkerPool wWorkerPool; SWriteWorkerPool wWorkerPool;
int32_t dnodeInitWrite() { int32_t dnodeInitWrite() {
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg; vnodeInitWrite();
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeProcessAlterTableMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeProcessDropStableMsg;
wWorkerPool.max = tsNumOfCores; wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
...@@ -87,50 +72,28 @@ void dnodeCleanupWrite() { ...@@ -87,50 +72,28 @@ void dnodeCleanupWrite() {
} }
void dnodeWrite(SRpcMsg *pMsg) { void dnodeWrite(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
SMsgDesc *pDesc = (SMsgDesc *)pCont; SMsgDesc *pDesc = (SMsgDesc *)pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc); pCont += sizeof(SMsgDesc);
if (pDesc->numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = pDesc->numOfVnodes;
}
} }
while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont;
SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId);
pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen);
pHead->contLen = htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId); taos_queue queue = vnodeGetWqueue(pHead->vgId);
if (pVnode == NULL) { if (queue) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
continue;
}
// put message into queue // put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg; pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont; pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen; pWrite->contLen = pHead->contLen;
pWrite->pRpcContext = pRpcContext;
taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
// next vnode taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
leftLen -= pHead->contLen; } else {
pCont -= pHead->contLen;
queuedMsgNum++;
}
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->handle, .handle = pMsg->handle,
.pCont = NULL, .pCont = NULL,
...@@ -142,7 +105,7 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -142,7 +105,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
} }
} }
void *dnodeAllocateWriteWorker(void *pVnode) { void *dnodeAllocateWqueue(void *pVnode) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue(); taos_queue *queue = taosOpenQueue();
if (queue == NULL) return NULL; if (queue == NULL) return NULL;
...@@ -167,22 +130,44 @@ void *dnodeAllocateWriteWorker(void *pVnode) { ...@@ -167,22 +130,44 @@ void *dnodeAllocateWriteWorker(void *pVnode) {
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
} }
dTrace("queue:%p is allocated for pVnode:%p", queue, pVnode);
return queue; return queue;
} }
void dnodeFreeWriteWorker(void *wqueue) { void dnodeFreeWqueue(void *wqueue) {
taosCloseQueue(wqueue); taosCloseQueue(wqueue);
// dynamically adjust the number of threads // dynamically adjust the number of threads
} }
void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
SWriteMsg *pWrite = (SWriteMsg *)param;
if (code > 0) return;
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = NULL,
.contLen = 0,
.code = code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pWrite->rpcMsg.pCont);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
}
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param; SWriteWorker *pWorker = (SWriteWorker *)param;
taos_qall qall; taos_qall qall;
SWriteMsg *pWriteMsg; SWriteMsg *pWrite;
SWalHead *pHead;
int32_t numOfMsgs; int32_t numOfMsgs;
int type; int type;
void *pVnode; void *pVnode, *item;
qall = taosAllocateQall(); qall = taosAllocateQall();
...@@ -194,29 +179,35 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -194,29 +179,35 @@ static void *dnodeProcessWriteQueue(void *param) {
} }
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
// retrieve all items, and write them into WAL pWrite = NULL;
taosGetQitem(qall, &type, (void **)&pWriteMsg); taosGetQitem(qall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item;
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
} else {
pHead = (SWalHead *)item;
}
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen); int32_t code = vnodeProcessWrite(pVnode, type, pHead, item);
if (pWrite) pWrite->rpcMsg.code = code;
} }
// flush WAL file walFsync(vnodeGetWal(pVnode));
// walFsync(pVnode->whandle);
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(qall); taosResetQitems(qall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, &type, (void **)&pWriteMsg); taosGetQitem(qall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
terrno = 0; pWrite = (SWriteMsg *)item;
if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) { dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code);
(*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; taosFreeQitem(item);
vnodeRelease(pVnode);
} }
dnodeProcessWriteResult(pVnode, pWriteMsg);
taosFreeQitem(pWriteMsg);
} }
} }
...@@ -225,36 +216,6 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -225,36 +216,6 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL; return NULL;
} }
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) {
SRpcContext *pRpcContext = pWrite->pRpcContext;
int32_t code = 0;
dnodeReleaseVnode(pVnode);
if (pRpcContext) {
if (terrno) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
}
// over, result can be merged now
code = pRpcContext->code;
} else {
code = terrno;
}
SRpcMsg rsp;
rsp.handle = pWrite->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message
}
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset); int32_t num = taosGetQueueNumber(pWorker->qset);
...@@ -269,174 +230,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { ...@@ -269,174 +230,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
} }
} }
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) {
dTrace("pVnode:%p, submit msg is disposed", pVnode);
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
pRsp->code = 0;
pRsp->numOfRows = htonl(1);
pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0;
void* tsdb = dnodeGetVnodeTsdb(pVnode);
assert(tsdb != NULL);
tsdbInsertData(tsdb, pMsg->pCont);
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SShellSubmitRspMsg),
.code = 0,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("pVnode:%p, table:%s, start to create in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pTable->data;
int totalCols = pTable->numOfColumns + pTable->numOfTags;
for (int i = 0; i < totalCols; i++) {
pSchema[i].colId = htons(pSchema[i].colId);
pSchema[i].bytes = htons(pSchema[i].bytes);
}
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < pTable->numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
dTrace("pVnode:%p, table:%s, create table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("pVnode:%p, table:%s, start to drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
STableId tableId = {
.uid = htobe64(pTable->uid),
.tid = htonl(pTable->sid)
};
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
dTrace("pVnode:%p, table:%s, drop table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("pVnode:%p, table:%s, start to alter in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pTable->data;
int totalCols = pTable->numOfColumns + pTable->numOfTags;
for (int i = 0; i < totalCols; i++) {
pSchema[i].colId = htons(pSchema[i].colId);
pSchema[i].bytes = htons(pSchema[i].bytes);
}
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < pTable->numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
dTrace("pVnode:%p, table:%s, alter table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("pVnode:%p, stable:%s, start to it drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
pTable->uid = htobe64(pTable->uid);
// TODO: drop stable in vvnode
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
rpcRsp.code = TSDB_CODE_SUCCESS;
dTrace("pVnode:%p, stable:%s, drop stable result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "ihash.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
#include "twal.h"
#include "dnodeMClient.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
#include "vnode.h"
#include "vnodeInt.h"
extern void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
if (pTemp != NULL) {
dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
return TSDB_CODE_SUCCESS;
}
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
if (mkdir(rootDir, 0755) != 0) {
if (errno == EACCES) {
return TSDB_CODE_NO_DISK_PERMISSIONS;
} else if (errno == ENOSPC) {
return TSDB_CODE_SERV_NO_DISKSPACE;
} else if (errno == EEXIST) {
} else {
return TSDB_CODE_VG_INIT_FAILED;
}
}
char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
if (code <0) {
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
return code;
}
dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
return code;
}
int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) {
dTrace("vgId:%d, failed to drop, vgId not exist", vgId);
return TSDB_CODE_INVALID_VGROUP_ID;
}
dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId);
pVnode->status = VN_STATUS_DELETING;
vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
SVnodeObj vnodeObj = {0};
vnodeObj.vgId = vnode;
vnodeObj.status = VN_STATUS_INIT;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
sprintf(temp, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(temp);
if (pTsdb == NULL) {
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno;
}
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, 3, tsCommitLog);
pVnode->tsdb = pTsdb;
pVnode->sync = NULL;
pVnode->events = NULL;
pVnode->cq = NULL;
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
pVnode->status = VN_STATUS_READY;
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeClose(void *param) {
SVnodeObj *pVnode = (SVnodeObj *)param;
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
pVnode->status = VN_STATUS_CLOSING;
vnodeCleanUp(pVnode);
return 0;
}
void vnodeRelease(void *pVnodeRaw) {
SVnodeObj *pVnode = pVnodeRaw;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (refCount > 0) return;
// remove read queue
dnodeFreeRqueue(pVnode->rqueue);
pVnode->rqueue = NULL;
// remove write queue
dnodeFreeWqueue(pVnode->wqueue);
pVnode->wqueue = NULL;
if (pVnode->status == VN_STATUS_DELETING) {
// remove the whole directory
}
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
}
void *vnodeGetVnode(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) {
terrno = TSDB_CODE_INVALID_VGROUP_ID;
return NULL;
}
atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount);
return pVnode;
}
void *vnodeGetRqueue(void *pVnode) {
return ((SVnodeObj *)pVnode)->rqueue;
}
void *vnodeGetWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return NULL;
return pVnode->wqueue;
}
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
}
void *vnodeGetTsdb(void *pVnode) {
return ((SVnodeObj *)pVnode)->tsdb;
}
static void vnodeCleanUp(SVnodeObj *pVnode) {
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
//syncStop(pVnode->sync);
tsdbCloseRepo(pVnode->tsdb);
walClose(pVnode->wal);
vnodeRelease(pVnode);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "dataformat.h"
#include "dnodeWrite.h"
#include "dnodeMgmt.h"
#include "vnode.h"
#include "vnodeInt.h"
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, void *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, void *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, void *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, void *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, void *);
int32_t vnodeInitWrite() {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg;
return 0;
}
int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) {
int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param;
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING)
return TSDB_CODE_NOT_ACTIVE_VNODE;
if (pHead->version == 0) { // from client
if (pVnode->status != VN_STATUS_READY)
return TSDB_CODE_NOT_ACTIVE_VNODE;
// if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
// assign version
pVnode->version++;
pHead->version = pVnode->version;
} else {
// for data from WAL or forward, version may be smaller
if (pHead->version <= pVnode->version) return 0;
}
// more status and role checking here
pVnode->version = pHead->version;
// write into WAL
code = walWrite(pVnode->wal, pHead);
if ( code < 0) return code;
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
if (code < 0) return code;
/* forward
if (pVnode->replica > 1 && pVnode->role == TAOS_SYNC_ROLE_MASTER) {
code = syncForwardToPeer(pVnode->sync, pHead, item);
}
*/
return code;
}
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, void *item) {
int32_t code = 0;
// save insert result into item
dTrace("pVnode:%p vgId:%d, submit msg is processed", pVnode, pVnode->vgId);
code = tsdbInsertData(pVnode->tsdb, pCont);
return code;
}
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, void *item) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pTable->data;
int totalCols = pTable->numOfColumns + pTable->numOfTags;
for (int i = 0; i < totalCols; i++) {
pSchema[i].colId = htons(pSchema[i].colId);
pSchema[i].bytes = htons(pSchema[i].bytes);
}
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < pTable->numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
void *pTsdb = vnodeGetTsdb(pVnode);
code = tsdbCreateTable(pTsdb, &tCfg);
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
return code;
}
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, void *item) {
SMDDropTableMsg *pTable = pCont;
int32_t code = 0;
dTrace("pVnode:%p vgId:%d, table:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId);
STableId tableId = {
.uid = htobe64(pTable->uid),
.tid = htonl(pTable->sid)
};
code = tsdbDropTable(pVnode->tsdb, tableId);
return code;
}
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, void *item) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
dTrace("pVnode:%p vgId:%d, table:%s, start to alter", pVnode, pVnode->vgId, pTable->tableId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pTable->data;
int totalCols = pTable->numOfColumns + pTable->numOfTags;
for (int i = 0; i < totalCols; i++) {
pSchema[i].colId = htons(pSchema[i].colId);
pSchema[i].bytes = htons(pSchema[i].bytes);
}
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < pTable->numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
code = tsdbAlterTable(pVnode->tsdb, &tCfg);
dTrace("pVnode:%p vgId:%d, table:%s, alter table result:%d", pVnode, pVnode->vgId, pTable->tableId, code);
return code;
}
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, void *item) {
SMDDropSTableMsg *pTable = pCont;
int32_t code = 0;
dTrace("pVnode:%p vgId:%d, stable:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId);
pTable->uid = htobe64(pTable->uid);
// TODO: drop stable in vvnode
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
code = TSDB_CODE_SUCCESS;
dTrace("pVnode:%p vgId:%d, stable:%s, drop stable result:%x", pVnode, pTable->tableId, code);
return code;
}
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) {
SVnodeObj *pVnode = param;
int size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size);
taosWriteQitem(pVnode->wqueue, type, pHead);
return 0;
}
...@@ -40,7 +40,7 @@ void walClose(twal_h); ...@@ -40,7 +40,7 @@ void walClose(twal_h);
int walRenew(twal_h); int walRenew(twal_h);
int walWrite(twal_h, SWalHead *); int walWrite(twal_h, SWalHead *);
void walFsync(twal_h); void walFsync(twal_h);
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, SWalHead *pHead, int type));
int walGetWalFile(twal_h, char *name, uint32_t *index); int walGetWalFile(twal_h, char *name, uint32_t *index);
extern int wDebugFlag; extern int wDebugFlag;
......
...@@ -53,7 +53,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg); ...@@ -53,7 +53,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg);
// --------- TSDB REPOSITORY DEFINITION // --------- TSDB REPOSITORY DEFINITION
typedef void tsdb_repo_t; // use void to hide implementation details from outside typedef void tsdb_repo_t; // use void to hide implementation details from outside
tsdb_repo_t * tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(tsdb_repo_t *repo); int32_t tsdbDropRepo(tsdb_repo_t *repo);
tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir);
int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbCloseRepo(tsdb_repo_t *repo);
...@@ -332,4 +332,4 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo ...@@ -332,4 +332,4 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo
} }
#endif #endif
#endif // _TD_TSDB_H_ #endif // _TD_TSDB_H_
\ No newline at end of file
...@@ -108,67 +108,40 @@ void tsdbFreeCfg(STsdbCfg *pCfg) { ...@@ -108,67 +108,40 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
* *
* @return a TSDB repository handle on success, NULL for failure * @return a TSDB repository handle on success, NULL for failure
*/ */
tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) { int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) {
if (rootDir == NULL) return NULL;
if (mkdir(rootDir, 0755) != 0) {
if (errno == EACCES) {
return TSDB_CODE_NO_DISK_PERMISSIONS;
} else if (errno == ENOSPC) {
return TSDB_CODE_SERV_NO_DISKSPACE;
} else if (errno == EEXIST) {
} else {
return TSDB_CODE_VG_INIT_FAILED;
}
}
if (access(rootDir, F_OK | R_OK | W_OK) == -1) return NULL; if (access(rootDir, F_OK | R_OK | W_OK) == -1) return -1;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) { if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) {
return NULL; return -1;
} }
STsdbRepo *pRepo = (STsdbRepo *)malloc(sizeof(STsdbRepo)); STsdbRepo *pRepo = (STsdbRepo *)malloc(sizeof(STsdbRepo));
if (pRepo == NULL) { if (pRepo == NULL) {
return NULL; return -1;
} }
pRepo->rootDir = strdup(rootDir); pRepo->rootDir = strdup(rootDir);
pRepo->config = *pCfg; pRepo->config = *pCfg;
pRepo->limiter = limiter; pRepo->limiter = limiter;
pthread_mutex_init(&pRepo->mutex, NULL);
// Create the environment files and directories // Create the environment files and directories
if (tsdbSetRepoEnv(pRepo) < 0) { int32_t code = tsdbSetRepoEnv(pRepo);
free(pRepo->rootDir);
free(pRepo); free(pRepo->rootDir);
return NULL; free(pRepo);
} return code;
// Initialize meta
STsdbMeta *pMeta = tsdbInitMeta(rootDir, pCfg->maxTables);
if (pMeta == NULL) {
free(pRepo->rootDir);
free(pRepo);
return NULL;
}
pRepo->tsdbMeta = pMeta;
// Initialize cache
STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo);
if (pCache == NULL) {
free(pRepo->rootDir);
tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo);
return NULL;
}
pRepo->tsdbCache = pCache;
// Initialize file handle
char dataDir[128] = "\0";
tsdbGetDataDirName(pRepo, dataDir);
pRepo->tsdbFileH =
tsdbInitFileH(dataDir, pCfg->maxTables);
if (pRepo->tsdbFileH == NULL) {
free(pRepo->rootDir);
tsdbFreeCache(pRepo->tsdbCache);
tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo);
return NULL;
}
pRepo->state = TSDB_REPO_STATE_ACTIVE;
return (tsdb_repo_t *)pRepo;
} }
/** /**
...@@ -1236,4 +1209,4 @@ int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, S ...@@ -1236,4 +1209,4 @@ int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, S
} }
return numOfPointsToWrite; return numOfPointsToWrite;
} }
\ No newline at end of file
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "tchecksum.h" #include "tchecksum.h"
#include "tutil.h" #include "tutil.h"
#include "twal.h" #include "twal.h"
#include "tqueue.h"
#define walPrefix "wal" #define walPrefix "wal"
#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);} #define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);}
...@@ -48,7 +49,7 @@ int wDebugFlag = 135; ...@@ -48,7 +49,7 @@ int wDebugFlag = 135;
static uint32_t walSignature = 0xFAFBFDFE; static uint32_t walSignature = 0xFAFBFDFE;
static int walHandleExistingFiles(char *path); static int walHandleExistingFiles(char *path);
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)); static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int));
static int walRemoveWalFiles(char *path); static int walRemoveWalFiles(char *path);
void *walOpen(char *path, int max, int level) { void *walOpen(char *path, int max, int level) {
...@@ -168,7 +169,7 @@ void walFsync(void *handle) { ...@@ -168,7 +169,7 @@ void walFsync(void *handle) {
fsync(pWal->fd); fsync(pWal->fd);
} }
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) {
SWal *pWal = (SWal *)handle; SWal *pWal = (SWal *)handle;
int code = 0; int code = 0;
struct dirent *ent; struct dirent *ent;
...@@ -245,43 +246,45 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { ...@@ -245,43 +246,45 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
return code; return code;
} }
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) {
SWalHead walHead; int code = 0;
int code = 0;
char *buffer = malloc(1024000); // size for one record
if (buffer == NULL) return -1;
SWalHead *pHead = (SWalHead *)buffer;
int fd = open(name, O_RDONLY); int fd = open(name, O_RDONLY);
if (fd < 0) { if (fd < 0) {
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
free(buffer);
return -1; return -1;
} }
wTrace("wal:%s, start to restore", name); wTrace("wal:%s, start to restore", name);
while (1) { while (1) {
int ret = read(fd, &walHead, sizeof(walHead)); int ret = read(fd, pHead, sizeof(SWalHead));
if ( ret == 0) { code = 0; break;} if ( ret == 0) { code = 0; break;}
if (ret != sizeof(walHead)) { if (ret != sizeof(SWalHead)) {
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno)); wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
break; break;
} }
if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) { if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name); wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
break; break;
} }
char *buffer = malloc(sizeof(SWalHead) + walHead.len); ret = read(fd, pHead->cont, pHead->len);
memcpy(buffer, &walHead, sizeof(walHead)); if ( ret != pHead->len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
ret = read(fd, buffer+sizeof(walHead), walHead.len);
if ( ret != walHead.len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret);
break; break;
} }
// write into queue // write into queue
(*writeFp)(pVnode, buffer); (*writeFp)(pVnode, buffer, TAOS_QTYPE_WAL);
} }
return code; return code;
......
...@@ -21,16 +21,14 @@ ...@@ -21,16 +21,14 @@
int64_t ver = 0; int64_t ver = 0;
void *pWal = NULL; void *pWal = NULL;
int writeToQueue(void *pVnode, void *data) { int writeToQueue(void *pVnode, SWalHead *pHead, int type) {
SWalHead *pHead = (SWalHead *)data;
// do nothing // do nothing
if (pHead->version > ver) if (pHead->version > ver)
ver = pHead->version; ver = pHead->version;
walWrite(pWal, pHead); walWrite(pWal, pHead);
free(data); free(pHead);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册