From 88b413ccc7ef321541ca560cf9ec583899aa101d Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 8 Mar 2020 21:52:53 +0800 Subject: [PATCH] fix compile error in dnode module --- src/dnode/CMakeLists.txt | 4 +- src/dnode/inc/dnodeMClient.h | 30 +++ src/dnode/inc/dnodeMgmt.h | 23 +- src/dnode/inc/dnodeMnode.h | 30 +++ src/dnode/inc/dnodeModule.h | 10 +- src/dnode/inc/dnodeRead.h | 31 +-- src/dnode/inc/dnodeShell.h | 20 +- src/dnode/inc/dnodeSystem.h | 21 +- src/dnode/inc/dnodeVnodeMgmt.h | 77 ------ src/dnode/inc/dnodeWrite.h | 39 +-- src/dnode/src/dnodeMClient.c | 72 ++++++ src/dnode/src/dnodeMgmt.c | 426 ++++++++++++--------------------- src/dnode/src/dnodeMnode.c | 87 +++++++ src/dnode/src/dnodeRead.c | 208 +++++++++++++--- src/dnode/src/dnodeShell.c | 192 +++------------ src/dnode/src/dnodeSystem.c | 16 +- src/dnode/src/dnodeVnodeMgmt.c | 64 ----- src/dnode/src/dnodeWrite.c | 276 ++++++++++++++++----- src/mnode/src/mgmtDnodeInt.c | 2 +- 19 files changed, 840 insertions(+), 788 deletions(-) create mode 100644 src/dnode/inc/dnodeMClient.h create mode 100644 src/dnode/inc/dnodeMnode.h delete mode 100644 src/dnode/inc/dnodeVnodeMgmt.h create mode 100644 src/dnode/src/dnodeMClient.c create mode 100644 src/dnode/src/dnodeMnode.c delete mode 100644 src/dnode/src/dnodeVnodeMgmt.c diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index af84071849..bcde03664d 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -10,7 +10,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_EXECUTABLE(taosd ${SRC}) - TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http tsdb) + TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http) #IF (TD_CLUSTER) # TARGET_LINK_LIBRARIES(taosd dcluster) @@ -23,7 +23,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) COMMAND echo "make test directory" DEPENDS taosd COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/ - COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/ + COMMAND ${CMAKE_COMMAND} -E make_directoryF ${TD_TESTS_OUTPUT_DIR}/log/ COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/ COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeMClient.h new file mode 100644 index 0000000000..391e8da2c1 --- /dev/null +++ b/src/dnode/inc/dnodeMClient.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MCLIENT_H +#define TDENGINE_DNODE_MCLIENT_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitMClient(); +void dnodeCleanupMClient(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 7a67d7dbf2..e4f0a00664 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -20,18 +20,17 @@ extern "C" { #endif -#include -#include - -int32_t dnodeInitMgmt(); -void dnodeInitMgmtIp(); - -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); -void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); -void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); - -void dnodeSendVnodeCfgMsg(int32_t vnode); -void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid); +int dnodeInitMgmt(); +void dnodeCleanupMgmt(); +void dnodeMgmt(SRpcMsg *); + +void* dnodeGetVnode(int vgId); +int dnodeGetVnodeStatus(void *); +void* dnodeGetVnodeRworker(void *); +void* dnodeGetVnodeWworker(void *); +void* dnodeGetVnodeWal(void *); +void* dnodeGetVnodeTsdb(void *); +void dnodeReleaseVnode(void *); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeMnode.h b/src/dnode/inc/dnodeMnode.h new file mode 100644 index 0000000000..76a65a06c9 --- /dev/null +++ b/src/dnode/inc/dnodeMnode.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MNODE_H +#define TDENGINE_DNODE_MNODE_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitMnode(); +void dnodeCleanupMnode(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index 8f1a3cce78..1ad97034aa 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -20,15 +20,9 @@ extern "C" { #endif -#include -#include -#include - -void dnodeAllocModules(); +void dnodeAllocModules(); int32_t dnodeInitModules(); -void dnodeCleanUpModules(); - -extern void (*dnodeStartModules)(); +void dnodeCleanUpModules(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index ce73ecac85..a5c7be74b5 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -20,31 +20,12 @@ extern "C" { #endif -#include -#include -#include "taosdef.h" -#include "taosmsg.h" - -/* - * handle query message, and the result is returned by callback function - */ -void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)); - -/* - * Dispose retrieve msg, and the result will passed through callback function - */ -typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn); -void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp); - -/* - * Fill retrieve result according to query info - */ -int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve); - -/* - * Get the size of retrieve result according to query info - */ -int32_t dnodeGetRetrieveDataSize(void *pQInfo); +int dnodeInitRead(); +void dnodeCleanupRead(); +void dnodeRead(SRpcMsg *); +void *dnodeAllocateReadWorker(); +void dnodeFreeReadWorker(void *rqueue); + #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeShell.h b/src/dnode/inc/dnodeShell.h index a1fa8875ab..300c86c599 100644 --- a/src/dnode/inc/dnodeShell.h +++ b/src/dnode/inc/dnodeShell.h @@ -20,26 +20,8 @@ extern "C" { #endif -#include -#include -#include "dnode.h" - -typedef struct { - int sid; - uint32_t ip; - uint16_t port; - int32_t count; // track the number of imports - int32_t code; // track the code of imports - int32_t numOfTotalPoints; // track the total number of points imported - void *thandle; // handle from TAOS layer - void *qhandle; -} SShellObj; - int32_t dnodeInitShell(); - -void dnodeCleanupShell(); - -//SDnodeStatisInfo dnodeGetStatisInfo() +void dnodeCleanupShell(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeSystem.h b/src/dnode/inc/dnodeSystem.h index 7aeb26b54f..9c56b8db5d 100644 --- a/src/dnode/inc/dnodeSystem.h +++ b/src/dnode/inc/dnodeSystem.h @@ -20,34 +20,15 @@ extern "C" { #endif -#include -#include - typedef enum { TSDB_DNODE_RUN_STATUS_INITIALIZE, TSDB_DNODE_RUN_STATUS_RUNING, TSDB_DNODE_RUN_STATUS_STOPPED } SDnodeRunStatus; -extern int32_t (*dnodeInitPeers)(int32_t numOfThreads); -extern int32_t (*dnodeCheckSystem)(); -extern int32_t (*dnodeInitStorage)(); -extern void (*dnodeCleanupStorage)(); -extern int32_t tsMaxQueues; -extern void ** tsRpcQhandle; -extern void *tsQueryQhandle; -extern void *tsDnodeMgmtQhandle; -extern void *tsDnodeTmr; - int32_t dnodeInitSystem(); -void dnodeCleanUpSystem(); -void dnodeInitPlugins(); - +void dnodeCleanUpSystem(); SDnodeRunStatus dnodeGetRunStatus(); -void dnodeSetRunStatus(SDnodeRunStatus status); -void dnodeCheckDataDirOpenned(const char *dir); -void dnodeLockVnodes(); -void dnodeUnLockVnodes(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h deleted file mode 100644 index a60d74425b..0000000000 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_DNODE_VNODE_MGMT_H -#define TDENGINE_DNODE_VNODE_MGMT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include "taosdef.h" -#include "taosmsg.h" -#include "tstatus.h" - -/* - * Open all Vnodes in the local data directory - */ -int32_t dnodeOpenVnodes(); - -/* - * Close all Vnodes that have been created and opened - */ -int32_t dnodeCleanupVnodes(); - -/* - * Check if vnode already exists - */ -bool dnodeCheckVnodeExist(int32_t vid); - -/* - * Create vnode with specified configuration and open it - * if exist, config it - */ -int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode); - -/* - * Remove vnode from local repository - */ -int32_t dnodeDropVnode(int32_t vnode); - -/* - * Get the vnode object that has been opened - */ -//tsdb_repo_t* dnodeGetVnode(int vid); -void* dnodeGetVnode(int32_t vnode); - -int32_t dnodeGetVnodesNum(); - -/* - * get the status of vnode - */ -EVnodeStatus dnodeGetVnodeStatus(int32_t vnode); - -/* - * Check if vnode already exists, and table exist in this vnode - */ -bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 42c94a440c..51340fe1c2 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -20,41 +20,12 @@ extern "C" { #endif -#include -#include -#include "taosdef.h" -#include "taosmsg.h" +int dnodeInitWrite(); +void dnodeCleanupWrite(); +void dnodeWrite(SRpcMsg *pMsg); +void *dnodeAllocateWriteWorker(); +void dnodeFreeWriteWorker(void *worker); -/* - * Write data based on dnode, the detail result can be fetched from rsponse - * pSubmit: Data to be written - * pConn: Communication handle - * callback: Pass the write result through a callback function, possibly in a different thread space - * rsp: will not be freed by callback function - */ -void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)); - -/* - * Create table with specified configuration and open it - * if table already exist, update its schema and tag - */ -int32_t dnodeCreateTable(SDCreateTableMsg *pTable); - -/* - * Remove table from local repository - */ -int32_t dnodeDropTable(SDRemoveTableMsg *pTable); - -/* - * Create stream - * if stream already exist, update it - */ -int32_t dnodeCreateStream(SDAlterStreamMsg *pStream); - -/* - * Remove all child tables of supertable from local repository - */ -int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c new file mode 100644 index 0000000000..99aaa0bd37 --- /dev/null +++ b/src/dnode/src/dnodeMClient.c @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "tlog.h" +#include "trpc.h" +#include "dnodeSystem.h" + +static void (*dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); +static void dnodeProcessStatusRsp(SRpcMsg *pMsg); +static void *tsDnodeMClientRpc; + +int32_t dnodeInitMClient() { + dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessStatusRsp; + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; + rpcInit.localPort = 0; + rpcInit.label = "DND-MC"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = dnodeProcessRspFromMnode; + rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.idleTime = tsShellActivityTimer * 1000; + + tsDnodeMClientRpc = rpcOpen(&rpcInit); + if (tsDnodeMClientRpc == NULL) { + dError("failed to init connection from mgmt"); + return -1; + } + + dPrint("client connection to mgmt is opened"); + return 0; +} + +void dnodeCleanupMClient() { + if (tsDnodeMClientRpc) { + rpcClose(tsDnodeMClientRpc); + } +} + +static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { + + if (dnodeProcessMgmtRspFp[pMsg->msgType]) { + (*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); + } else { + dError("%s is not processed", taosMsg[pMsg->msgType]); + } + + rpcFreeCont(pMsg->pCont); +} + +static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { + + +} diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 1e7af8d094..8712b3a692 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -15,327 +15,219 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosmsg.h" #include "tlog.h" +#include "taoserror.h" +#include "taosmsg.h" #include "trpc.h" -#include "tsched.h" -#include "tsystem.h" -#include "mnode.h" -#include "dnode.h" -#include "dnodeSystem.h" -#include "dnodeMgmt.h" #include "dnodeWrite.h" -#include "dnodeVnodeMgmt.h" - -void (*dnodeInitMgmtIpFp)() = NULL; -int32_t (*dnodeInitMgmtFp)() = NULL; -void (*dnodeCleanUpMgmtFp)() = NULL; -void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; -void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; -void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; - -static void *tsStatusTimer = NULL; -static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); -static void dnodeInitProcessShellMsg(); - -static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { - int32_t contLen = *(int32_t *) (sched->msg - 4); - int32_t code = *(int32_t *) (sched->msg - 8); - int8_t msgType = *(int8_t *) (sched->msg - 9); - void *handle = sched->ahandle; - int8_t *pCont = sched->msg; - - mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code); +#include "dnodeRead.h" +#include "dnodeMgmt.h" + +typedef struct { + int32_t vgId; // global vnode group ID + int status; // status: master, slave, notready, deleting + int refCount; // reference count + int64_t version; + void *wworker; + void *rworker; + void *wal; + void *tsdb; + void *replica; + void *events; + void *cq; // continuous query +} SVnodeObj; + +static int dnodeOpenVnodes(); +static void dnodeCleanupVnodes(); +static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg); +static int dnodeDropVnode(SVnodeObj *pVnode); +static void dnodeRemoveVnode(SVnodeObj *pVnode); + +static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); +static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); +static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); +static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); + +int dnodeInitMgmt() { + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessDropVnodeMsg; } -void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { - dTrace("msg:%d:%s is sent to mnode", msgType, taosMsg[msgType]); - if (dnodeSendMsgToMnodeFp) { - dnodeSendMsgToMnodeFp(msgType, pCont, contLen); - } else { - if (pCont == NULL) { - pCont = rpcMallocCont(1); - contLen = 0; - } - SSchedMsg schedMsg = {0}; - schedMsg.fp = dnodeSendMsgToMnodeQueueFp; - schedMsg.msg = pCont; - *(int32_t *) (pCont - 4) = contLen; - *(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS; - *(int8_t *) (pCont - 9) = msgType; - taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); - } +void dnodeCleanupMgmt() { + } -void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) { - dTrace("rsp:%d:%s is sent to mnode, pConn:%p", msgType, taosMsg[msgType], pConn); - if (dnodeSendRspToMnodeFp) { - dnodeSendRspToMnodeFp(pConn, code, pCont, contLen); +void dnodeMgmt(SRpcMsg *pMsg) { + + terrno = 0; + + if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { + (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { - //hack way - if (pCont == NULL) { - pCont = rpcMallocCont(1); - contLen = 0; - } - SSchedMsg schedMsg = {0}; - schedMsg.fp = dnodeSendMsgToMnodeQueueFp; - schedMsg.msg = pCont; - schedMsg.ahandle = pConn; - *(int32_t *) (pCont - 4) = contLen; - *(int32_t *) (pCont - 8) = code; - *(int8_t *) (pCont - 9) = msgType; - taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; } + + SRpcMsg rsp; + rsp.handle = pMsg->handle; + rsp.code = terrno; + rsp.pCont = NULL; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); // free the received message } + +void *dnodeGetVnode(int vgId) { + SVnodeObj *pVnode; -void dnodeSendStatusMsgToMgmt(void *handle, void *tmrId) { - taosTmrReset(dnodeSendStatusMsgToMgmt, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - if (tsStatusTimer == NULL) { - dError("Failed to start status timer"); - return; - } + // retrieve the pVnode from vgId - int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); - SStatusMsg *pStatus = rpcMallocCont(contLen); - if (pStatus == NULL) { - dError("Failed to malloc status message"); - return; - } + + // if (pVnode->status == ....) { + // terrno = ; + // return NULL; + // } - int32_t totalVnodes = dnodeGetVnodesNum(); - - pStatus->version = htonl(tsVersion); - pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); - pStatus->publicIp = htonl(inet_addr(tsPublicIp)); - pStatus->lastReboot = htonl(tsRebootTime); - pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); - pStatus->openVnodes = htons((uint16_t) totalVnodes); - pStatus->numOfCores = htons((uint16_t) tsNumOfCores); - pStatus->diskAvailable = tsAvailDataDirGB; - pStatus->alternativeRole = (uint8_t) tsAlternativeRole; - - SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load; - - //TODO loop all vnodes -// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) { -// if (vnodeList[vnode].cfg.maxSessions <= 0) continue; -// -// SVnodeObj *pVnode = vnodeList + vnode; -// pLoad->vnode = htonl(vnode); -// pLoad->vgId = htonl(pVnode->cfg.vgId); -// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus; -// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus; -// pLoad->accessState = (uint8_t)(pVnode->accessState); -// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage); -// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage); -// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) { -// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten); -// } else { -// pLoad->pointsWritten = htobe64(0); -// } -// pLoad++; -// -// if (++count >= tsOpenVnodes) { -// break; -// } -// } - - dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen); + atomic_add_fetch_32(&pVnode->refCount, 1); + return pVnode; } +int dnodeGetVnodeStatus(void *pVnode) { + return ((SVnodeObj *)pVnode)->status; +} -int32_t dnodeInitMgmt() { - if (dnodeInitMgmtFp) { - dnodeInitMgmtFp(); - } - - dnodeInitProcessShellMsg(); - taosTmrReset(dnodeSendStatusMsgToMgmt, 500, NULL, tsDnodeTmr, &tsStatusTimer); - return 0; +void *dnodeGetVnodeWworker(void *pVnode) { + return ((SVnodeObj *)pVnode)->wworker; +} + +void *dnodeGetVnodeRworker(void *pVnode) { + return ((SVnodeObj *)pVnode)->rworker; +} + +void *dnodeGetVnodeWal(void *pVnode) { + return ((SVnodeObj *)pVnode)->wal; } -void dnodeInitMgmtIp() { - if (dnodeInitMgmtIpFp) { - dnodeInitMgmtIpFp(); - } +void *dnodeGetVnodeTsdb(void *pVnode) { + return ((SVnodeObj *)pVnode)->tsdb; } -void dnodeCleanUpMgmt() { - if (tsStatusTimer != NULL) { - taosTmrStopA(&tsStatusTimer); - tsStatusTimer = NULL; - } +void dnodeReleaseVnode(void *param) { + SVnodeObj *pVnode = (SVnodeObj *)param; - if (dnodeCleanUpMgmtFp) { - dnodeCleanUpMgmtFp(); - } + int refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + if (refCount == 0) dnodeRemoveVnode(pVnode); } -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { - if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { - dError("invalid msg type:%d", msgType); - return; - } +static int dnodeOpenVnode() { + SVnodeObj *pVnode; - dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn); + // create tsdb - if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { - dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); - } - if (dnodeProcessMgmtMsgFp[msgType]) { - (*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn); - } else { - dError("%s is not processed", taosMsg[msgType]); - } + // create wal - //rpcFreeCont(pCont); -} + // allocate write worker + pVnode->wworker = dnodeAllocateWriteWorker(); -static void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SDCreateTableMsg *pTable = pCont; - pTable->numOfColumns = htons(pTable->numOfColumns); - pTable->numOfTags = htons(pTable->numOfTags); - pTable->sid = htonl(pTable->sid); - pTable->sversion = htonl(pTable->sversion); - pTable->tagDataLen = htonl(pTable->tagDataLen); - pTable->sqlDataLen = htonl(pTable->sqlDataLen); - pTable->contLen = htonl(pTable->contLen); - pTable->numOfVPeers = htonl(pTable->numOfVPeers); - pTable->uid = htobe64(pTable->uid); - pTable->superTableUid = htobe64(pTable->superTableUid); - pTable->createdTime = htobe64(pTable->createdTime); - - for (int i = 0; i < pTable->numOfVPeers; ++i) { - pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); - pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); - } + // create read queue + pVnode->rworker = dnodeAllocateReadWorker(); - int32_t totalCols = pTable->numOfColumns + pTable->numOfTags; - SSchema *pSchema = (SSchema *) pTable->data; - for (int32_t col = 0; col < totalCols; ++col) { - pSchema->bytes = htons(pSchema->bytes); - pSchema->colId = htons(pSchema->colId); - pSchema++; - } + // create the replica - int32_t code = dnodeCreateTable(pTable); - dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); -} + // set the status -static void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SDAlterStreamMsg *pStream = pCont; - pStream->uid = htobe64(pStream->uid); - pStream->stime = htobe64(pStream->stime); - pStream->vnode = htonl(pStream->vnode); - pStream->sid = htonl(pStream->sid); - pStream->status = htonl(pStream->status); + pVnode->refCount = 1; - int32_t code = dnodeCreateStream(pStream); - dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); + return 0; } -static void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SDRemoveTableMsg *pTable = pCont; - pTable->sid = htonl(pTable->sid); - pTable->numOfVPeers = htonl(pTable->numOfVPeers); - pTable->uid = htobe64(pTable->uid); +static int dnodeOpenVnodes() { + return 0; +} - for (int i = 0; i < pTable->numOfVPeers; ++i) { - pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); - pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); - } +static void dnodeCleanupVnode() { - int32_t code = dnodeDropTable(pTable); - dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -static void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - int32_t code = htonl(*((int32_t *) pCont)); - - if (code == TSDB_CODE_SUCCESS) { - SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) (pCont + sizeof(int32_t)); - dnodeCreateVnode(pVnode); - } else if (code == TSDB_CODE_INVALID_VNODE_ID) { - SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t)); - int32_t vnode = htonl(vpeer->vnode); - dError("vnode:%d, not exist, remove it", vnode); - dnodeDropVnode(vnode); - } else { - dError("code:%d invalid message", code); - } -} +static void dnodeCleanupVnodes() { -static void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - int32_t code = htonl(*((int32_t *) pCont)); - - if (code == TSDB_CODE_SUCCESS) { - SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t)); - dnodeCreateTable(table); - } else if (code == TSDB_CODE_INVALID_TABLE_ID) { - SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t)); - pTable->sid = htonl(pTable->sid); - pTable->uid = htobe64(pTable->uid); - dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid); - dnodeDropTable(pTable); - } else { - dError("code:%d invalid message", code); - } } -static void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont; +static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg) { - int32_t code = dnodeCreateVnode(pVnode); - dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); + SVnodeObj *pVnode = malloc(sizeof(SVnodeObj)); + + // save the vnode info in non-volatile storage + + // add into hash, so it can be retrieved + dnodeOpenVnode(pVnode); + + return 0; } -static void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont; - int32_t vnode = htonl(pVnode->vnode); +static void dnodeRemoveVnode(SVnodeObj *pVnode) { + + // remove replica + + // remove read queue + dnodeFreeReadWorker(pVnode->rworker); + + // remove write queue + dnodeFreeWriteWorker(pVnode->wworker); + + // remove wal + + // remove tsdb - int32_t code = dnodeDropVnode(vnode); - dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -static void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; +static int dnodeDropVnode(SVnodeObj *pVnode) { + + int count = atomic_sub_fetch_32(&pVnode->refCount, 1); - int32_t code = tsCfgDynamicOptions(pCfg->config); - dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); + if (count<=0) dnodeRemoveVnode(pVnode); + + return 0; } -static void dnodeProcessDropStableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - dnodeSendRspToMnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); +static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) { + +// SVnodeObj *pVnode; +// int vgId; +// SVPeersMsg *pCfg; + + // check everything, if not ok, set terrno; + + + // everything is ok + +// dnodeCreateVnode(vgId, pCfg); + + //if (pVnode == NULL) terrno = TSDB_CODE } -void dnodeSendVnodeCfgMsg(int32_t vnode) { - SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg)); - if (cfg == NULL) { - return; - } +static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) { - cfg->vnode = htonl(vnode); - dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg)); + SVnodeObj *pVnode; + int vgId; + + // check everything, if not ok, set terrno; + + + // everything is ok + dnodeDropVnode(pVnode); + + //if (pVnode == NULL) terrno = TSDB_CODE } -void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) { - STableCfgMsg *cfg = (STableCfgMsg *) rpcMallocCont(sizeof(STableCfgMsg)); - if (cfg == NULL) { - return; - } +static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) { + + SVnodeObj *pVnode; + int vgId; + + // check everything, if not ok, set terrno; + + + // everything is ok +// dnodeAlterVnode(pVnode); - cfg->vnode = htonl(vnode); - dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg)); + //if (pVnode == NULL) terrno = TSDB_CODE } -static void dnodeInitProcessShellMsg() { - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessDropStableRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; -} \ No newline at end of file diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeMnode.c new file mode 100644 index 0000000000..1b51c70cfc --- /dev/null +++ b/src/dnode/src/dnodeMnode.c @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "taosmsg.h" +#include "tlog.h" +#include "trpc.h" +#include "dnodeSystem.h" +#include "dnodeMgmt.h" +#include "dnodeWrite.h" + +static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg); +static void *tsDnodeMnodeRpc = NULL; + +int32_t dnodeInitMnode() { + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeWrite; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeWrite; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeMgmt; + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; + + // note: a new port shall be assigned + // rpcInit.localPort = tsDnodeMnodePort; + rpcInit.label = "DND-mgmt"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = dnodeProcessMsgFromMnode; + rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; + rpcInit.connType = TAOS_CONN_SERVER; + rpcInit.idleTime = tsShellActivityTimer * 1500; + + tsDnodeMnodeRpc = rpcOpen(&rpcInit); + if (tsDnodeMnodeRpc == NULL) { + dError("failed to init connection from mgmt"); + return -1; + } + + dPrint("connection to mgmt is opened"); + return 0; +} + +void dnodeCleanupMnode() { + if (tsDnodeMnodeRpc) { + rpcClose(tsDnodeMnodeRpc); + } +} + +static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { + SRpcMsg rspMsg; + + rspMsg.handle = pMsg->handle; + rspMsg.pCont = NULL; + rspMsg.contLen = 0; + + if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { + rspMsg.code = TSDB_CODE_NOT_READY; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + dTrace("conn:%p, query msg is ignored since dnode not running", pMsg->handle); + return; + } + + if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { + (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); + } else { + dError("%s is not processed", taosMsg[pMsg->msgType]); + rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + } +} + + diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 827e599806..8175bea691 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -17,56 +17,196 @@ #include "os.h" #include "taoserror.h" #include "tlog.h" -#include "tsched.h" -#include "dnode.h" +#include "trpc.h" +#include "taosmsg.h" +#include "tqueue.h" #include "dnodeRead.h" -#include "dnodeSystem.h" +#include "dnodeMgmt.h" -void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) { - dTrace("conn:%p, query msg is disposed", pConn); - void *pQInfo = 100; - callback(TSDB_CODE_SUCCESS, pQInfo, pConn); +typedef struct { + int32_t code; + int32_t count; + int32_t numOfVnodes; +} SRpcContext; + +typedef struct { + void *pCont; + int contLen; + SRpcMsg rpcMsg; + void *pVnode; + SRpcContext *pRpcContext; // RPC message context +} SReadMsg; + +static void *dnodeProcessReadQueue(void *param); +static void dnodeProcessReadResult(SReadMsg *pRead); +static void dnodeHandleIdleReadWorker(); +static void dnodeProcessQueryMsg(SReadMsg *pMsg); +static void dnodeProcessRetrieveMsg(SReadMsg *pMsg); +static void (*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode); + +// module global variable +static taos_qset readQset; +static int threads; // number of query threads +static int maxThreads; +static int minThreads; + +int dnodeInitRead() { + dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg; + dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg; + + readQset = taosOpenQset(); + + minThreads = 3; + maxThreads = tsNumOfCores*tsNumOfThreadsPerCore; + if (maxThreads <= minThreads*2) maxThreads = 2*minThreads; + + return 0; } -static void dnodeExecuteRetrieveData(SSchedMsg *pSched) { - SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; - SRetrieveTableMsg *pRetrieve = pSched->msg; - void *pConn = pSched->ahandle; +void dnodeCleanupRead() { + taosCloseQset(readQset); +} - dTrace("conn:%p, retrieve msg is disposed, qhandle:%" PRId64, pConn, pRetrieve->qhandle); +void dnodeRead(SRpcMsg *pMsg) { + int leftLen = pMsg->contLen; + char *pCont = (char *)pMsg->pCont; + int contLen = 0; + int numOfVnodes = 0; + int32_t vgId = 0; + SRpcContext *pRpcContext = NULL; - //examples - int32_t code = TSDB_CODE_SUCCESS; - void *pQInfo = (void*)pRetrieve->qhandle; + // parse head, get number of vnodes; + if ( numOfVnodes > 1) { + pRpcContext = calloc(sizeof(SRpcContext), 1); + pRpcContext->numOfVnodes = 1; + } - (*callback)(code, pQInfo, pConn); + while (leftLen > 0) { + // todo: parse head, get vgId, contLen - free(pSched->msg); + // get pVnode from vgId + void *pVnode = dnodeGetVnode(vgId); + if (pVnode == NULL) { + + continue; + } + + // put message into queue + SReadMsg readMsg; + readMsg.rpcMsg = *pMsg; + readMsg.pCont = pCont; + readMsg.contLen = contLen; + readMsg.pRpcContext = pRpcContext; + readMsg.pVnode = pVnode; + + taos_queue queue = dnodeGetVnodeRworker(pVnode); + taosWriteQitem(queue, &readMsg); + + // next vnode + leftLen -= contLen; + pCont -= contLen; + } } -void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) { - dTrace("conn:%p, retrieve msg is received", pConn); +void *dnodeAllocateReadWorker() { + + taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); + if ( queue == NULL ) return NULL; + + taosAddIntoQset(readQset, queue); - void *msg = malloc(sizeof(SRetrieveTableMsg)); - memcpy(msg, pRetrieve, sizeof(SRetrieveTableMsg)); + // spawn a thread to process queue + if (threads < maxThreads) { + pthread_t thread; + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - SSchedMsg schedMsg; - schedMsg.msg = msg; - schedMsg.ahandle = pConn; - schedMsg.thandle = callbackFp; - schedMsg.fp = dnodeExecuteRetrieveData; - taosScheduleTask(tsQueryQhandle, &schedMsg); + if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) { + dError("failed to create thread to process read queue, reason:%s", strerror(errno)); + } + } + + return queue; } -int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve) { - dTrace("qInfo:%p, data is retrieved"); - pRetrieve->numOfRows = 0; - return 0; +void dnodeFreeReadWorker(void *rqueue) { + + taosCloseQueue(rqueue); + + // dynamically adjust the number of threads } -int32_t dnodeGetRetrieveDataSize(void *pQInfo) { - dTrace("qInfo:%p, contLen is 100"); - return 100; +static void *dnodeProcessReadQueue(void *param) { + taos_qset qset = (taos_qset)param; + SReadMsg readMsg; + + while (1) { + if (taosReadQitemFromQset(qset, &readMsg) <= 0) { + dnodeHandleIdleReadWorker(); + continue; + } + + terrno = 0; + if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) { + (*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg); + } else { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + } + + dnodeProcessReadResult(&readMsg); + } + + return NULL; } +static void dnodeHandleIdleReadWorker() { + int num = taosGetQueueNumber(readQset); + + if (num == 0 || (num <= minThreads && threads > minThreads)) { + threads--; + pthread_exit(NULL); + } else { + usleep(100); + sched_yield(); + } +} + +static void dnodeProcessReadResult(SReadMsg *pRead) { + SRpcContext *pRpcContext = pRead->pRpcContext; + int32_t code = 0; + + dnodeReleaseVnode(pRead->pVnode); + + if (pRpcContext) { + if (terrno) { + if (pRpcContext->code == 0) pRpcContext->code = terrno; + } + + int count = atomic_add_fetch_32(&pRpcContext->count, 1); + if (count < pRpcContext->numOfVnodes) { + // not over yet, multiple vnodes + return; + } + // over, result can be merged now + code = pRpcContext->code; + } else { + code = terrno; + } + + SRpcMsg rsp; + rsp.handle = pRead->rpcMsg.handle; + rsp.code = code; + rsp.pCont = NULL; + rpcSendResponse(&rsp); + rpcFreeCont(pRead->rpcMsg.pCont); // free the received message +} + +static void dnodeProcessQueryMsg(SReadMsg *pMsg) { + +} + +static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { + +} diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index d1a58c65e9..6cf3cf4df9 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -19,31 +19,22 @@ #include "taosdef.h" #include "taosmsg.h" #include "tlog.h" -#include "tsocket.h" -#include "tschemautil.h" -#include "textbuffer.h" #include "trpc.h" -#include "http.h" -#include "dnode.h" -#include "dnodeMgmt.h" -#include "dnodeRead.h" #include "dnodeSystem.h" -#include "dnodeShell.h" -#include "dnodeVnodeMgmt.h" +#include "dnodeRead.h" #include "dnodeWrite.h" +#include "dnodeShell.h" -static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn); -static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn); -static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn); -static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code); -static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); - -static void *tsDnodeShellServer = NULL; -static int32_t tsDnodeQueryReqNum = 0; -static int32_t tsDnodeSubmitReqNum = 0; +static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static void dnodeProcessMsgFromShell(SRpcMsg *pMsg); +static void *tsDnodeShellRpc = NULL; int32_t dnodeInitShell() { - int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; + + int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); if (numOfThreads < 1) { numOfThreads = 1; @@ -55,167 +46,50 @@ int32_t dnodeInitShell() { rpcInit.localPort = tsVnodeShellPort; rpcInit.label = "DND-shell"; rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = dnodeProcessMsgFromShell; + rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 2000; - rpcInit.afp = dnodeRetrieveUserAuthInfo; + rpcInit.idleTime = tsShellActivityTimer * 1500; - tsDnodeShellServer = rpcOpen(&rpcInit); - if (tsDnodeShellServer == NULL) { + tsDnodeShellRpc = rpcOpen(&rpcInit); + if (tsDnodeShellRpc == NULL) { dError("failed to init connection from shell"); return -1; } - dPrint("shell is opened"); - return TSDB_CODE_SUCCESS; + dPrint("connection to shell is opened"); + return 0; } void dnodeCleanupShell() { - if (tsDnodeShellServer) { - rpcClose(tsDnodeShellServer); + if (tsDnodeShellRpc) { + rpcClose(tsDnodeShellRpc); } } -SDnodeStatisInfo dnodeGetStatisInfo() { - SDnodeStatisInfo info = {0}; - if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { - info.httpReqNum = httpGetReqCount(); - info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0); - info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0); - } +void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { + SRpcMsg rpcMsg; - return info; -} + rpcMsg.handle = pMsg->handle; + rpcMsg.pCont = NULL; + rpcMsg.contLen = 0; -static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0); - dTrace("query msg is ignored since dnode not running"); + dError("RPC %p, shell msg is ignored since dnode not running", pMsg->handle); + rpcMsg.code = TSDB_CODE_NOT_READY; + rpcSendResponse(&rpcMsg); + rpcFreeCont(pMsg->pCont); return; } - dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]); - - if (msgType == TSDB_MSG_TYPE_QUERY) { - dnodeProcessQueryMsg(pCont, contLen, handle); - } else if (msgType == TSDB_MSG_TYPE_RETRIEVE) { - dnodeProcessRetrieveMsg(pCont, contLen, handle); - } else if (msgType == TSDB_MSG_TYPE_SUBMIT) { - dnodeProcessSubmitMsg(pCont, contLen, handle); + if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { + (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); } else { - dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]); - } - - //TODO free may be cause segmentfault - // rpcFreeCont(pCont); -} - -static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - return TSDB_CODE_SUCCESS; -} - -static void dnodeProcessQueryMsgCb(int32_t code, void *pQInfo, void *pConn) { - dTrace("conn:%p, query is returned, code:%d", pConn, code); - - int32_t contLen = sizeof(SQueryTableRsp); - SQueryTableRsp *queryRsp = (SQueryTableRsp *) rpcMallocCont(contLen); - if (queryRsp == NULL) { - rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - return; - } - - queryRsp->code = htonl(code); - queryRsp->qhandle = htobe64((uint64_t) (pQInfo)); - rpcSendResponse(pConn, TSDB_CODE_SUCCESS, queryRsp, contLen); -} - -static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn) { - atomic_fetch_add_32(&tsDnodeQueryReqNum, 1); - SQueryTableMsg *pQuery = (SQueryTableMsg *) pCont; - dnodeQueryData(pQuery, pConn, dnodeProcessQueryMsgCb); -} - -void dnodeProcessRetrieveMsgCb(int32_t code, void *pQInfo, void *pConn) { - dTrace("conn:%p, retrieve is returned, code:%d", pConn, code); - - assert(pConn != NULL); - if (code != TSDB_CODE_SUCCESS) { - rpcSendResponse(pConn, code, 0, 0); - return; - } - - assert(pQInfo != NULL); - int32_t contLen = dnodeGetRetrieveDataSize(pQInfo); - SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) rpcMallocCont(contLen); - if (pRetrieve == NULL) { - rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0); - return; - } - - code = dnodeGetRetrieveData(pQInfo, pRetrieve); - if (code != TSDB_CODE_SUCCESS) { - rpcSendResponse(pConn, TSDB_CODE_INVALID_QHANDLE, 0, 0); - } - - pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); - pRetrieve->precision = htons(pRetrieve->precision); - pRetrieve->offset = htobe64(pRetrieve->offset); - pRetrieve->useconds = htobe64(pRetrieve->useconds); - - rpcSendResponse(pConn, TSDB_CODE_SUCCESS, pRetrieve, contLen); -} - -static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn) { - SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) pCont; - pRetrieve->qhandle = htobe64(pRetrieve->qhandle); - pRetrieve->free = htons(pRetrieve->free); - - dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveMsgCb); -} - -void dnodeProcessSubmitMsgCb(SShellSubmitRspMsg *result, void *pConn) { - assert(result != NULL); - dTrace("conn:%p, submit is returned, code:%d", pConn, result->code); - - if (result->code != 0) { - rpcSendResponse(pConn, result->code, NULL, 0); - return; - } - - int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); - SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen); - if (submitRsp == NULL) { - rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - return; - } - - memcpy(submitRsp, result, contLen); - - for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) { - SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i]; - if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) { - dnodeSendVnodeCfgMsg(block->vnode); - } else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) { - dnodeSendTableCfgMsg(block->vnode, block->sid); - } - block->index = htonl(block->index); - block->vnode = htonl(block->vnode); - block->sid = htonl(block->sid); - block->code = htonl(block->code); + dError("RPC %p, msg:%s from shell is not handled", pMsg->handle, taosMsg[pMsg->msgType]); + rpcMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; + rpcSendResponse(&rpcMsg); + rpcFreeCont(pMsg->pCont); } - submitRsp->code = htonl(submitRsp->code); - submitRsp->numOfRows = htonl(submitRsp->numOfRows); - submitRsp->affectedRows = htonl(submitRsp->affectedRows); - submitRsp->failedRows = htonl(submitRsp->failedRows); - submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks); - - rpcSendResponse(pConn, TSDB_CODE_SUCCESS, submitRsp, contLen); } -static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn) { - atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1); - SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont; - dnodeWriteData(pSubmit, pConn, dnodeProcessSubmitMsgCb); -} diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index d6127574c6..65e5c35da1 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -25,12 +25,12 @@ #include "ttimer.h" #include "tutil.h" #include "http.h" +#include "trpc.h" #include "dnode.h" #include "dnodeMgmt.h" #include "dnodeModule.h" #include "dnodeShell.h" #include "dnodeSystem.h" -#include "dnodeVnodeMgmt.h" #ifdef CLUSTER #include "account.h" @@ -50,6 +50,8 @@ static int32_t dnodeInitRpcQHandle(); static int32_t dnodeInitQueryQHandle(); static int32_t dnodeInitTmrCtl(); +int32_t (*dnodeInitPeers)(int32_t numOfThreads) = NULL; + void *tsDnodeTmr; void **tsRpcQhandle; void *tsDnodeMgmtQhandle; @@ -93,7 +95,7 @@ void dnodeCleanUpSystem() { dnodeCleanupShell(); dnodeCleanUpModules(); - dnodeCleanupVnodes(); + dnodeCleanupMgmt(); taosCloseLogger(); dnodeCleanupStorage(); dnodeCleanVnodesLock(); @@ -154,7 +156,7 @@ int32_t dnodeInitSystem() { return -1; } - dnodeInitMgmtIp(); +// dnodeInitMgmtIp(); tsPrintGlobalConfig(); @@ -193,7 +195,7 @@ int32_t dnodeInitSystem() { return -1; } - if (dnodeOpenVnodes() < 0) { + if (dnodeInitMgmt() < 0) { dError("failed to init vnode storage"); return -1; } @@ -303,9 +305,3 @@ int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp; int32_t dnodeInitPeersImp(int32_t numOfThreads) { return 0; } - -int32_t (*dnodeInitPeers)(int32_t numOfThreads) = dnodeInitPeersImp; - - - - diff --git a/src/dnode/src/dnodeVnodeMgmt.c b/src/dnode/src/dnodeVnodeMgmt.c deleted file mode 100644 index fd1a0b6f28..0000000000 --- a/src/dnode/src/dnodeVnodeMgmt.c +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "tlog.h" -#include "taoserror.h" -#include "dnodeVnodeMgmt.h" - -int32_t dnodeOpenVnodes() { - dPrint("open all vnodes"); - return TSDB_CODE_SUCCESS; -} - -int32_t dnodeCleanupVnodes() { - dPrint("clean all vnodes"); - return TSDB_CODE_SUCCESS; -} - -bool dnodeCheckVnodeExist(int32_t vnode) { - dPrint("vnode:%d, check vnode exist", vnode); - return true; -} - -int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) { - dPrint("vnode:%d, is created", htonl(pVnode->vnode)); - return TSDB_CODE_SUCCESS; -} - -int32_t dnodeDropVnode(int32_t vnode) { - dPrint("vnode:%d, is dropped", vnode); - return TSDB_CODE_SUCCESS; -} - -void* dnodeGetVnode(int32_t vnode) { - dPrint("vnode:%d, get vnode"); - return NULL; -} - -EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { - dPrint("vnode:%d, get vnode status"); - return TSDB_VN_STATUS_MASTER; -} - -bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { - dPrint("vnode:%d, sid:%d, check table exist"); - return true; -} - -int32_t dnodeGetVnodesNum() { - return 1; -} diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 238fb58e75..b453fdff17 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -17,82 +17,246 @@ #include "os.h" #include "taoserror.h" #include "tlog.h" -#include "tutil.h" +#include "trpc.h" +#include "tqueue.h" +#include "taosmsg.h" #include "dnodeWrite.h" -#include "dnodeVnodeMgmt.h" +#include "dnodeMgmt.h" -void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { - dTrace("submit msg is disposed, affectrows:1"); +typedef struct { + int32_t code; + int32_t count; // number of vnodes returned result + int32_t numOfVnodes; // number of vnodes involved +} SRpcContext; - SShellSubmitRspMsg result = {0}; +typedef struct _write { + void *pCont; + int contLen; + SRpcMsg rpcMsg; + void *pVnode; // pointer to vnode + SRpcContext *pRpcContext; // RPC message context +} SWriteMsg; - int32_t numOfSid = htonl(pSubmit->numOfSid); - if (numOfSid <= 0) { - dError("invalid num of tables:%d", numOfSid); - result.code = TSDB_CODE_INVALID_QUERY_MSG; - callback(&result, pConn); +typedef struct { + taos_qset qset; // queue set + pthread_t thread; // thread + int workerId; // worker ID +} SWriteWorker; + +typedef struct _thread_obj { + int max; // max number of workers + int nextId; // from 0 to max-1, cyclic + SWriteWorker *writeWorker; +} SWriteWorkerPool; + +static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *); +static void *dnodeProcessWriteQueue(void *param); +static void dnodeHandleIdleWorker(SWriteWorker *pWorker); +static void dnodeProcessWriteResult(SWriteMsg *pWrite); +static void dnodeProcessSubmitMsg(SWriteMsg *pMsg); +static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg); +static void dnodeProcessDropTableMsg(SWriteMsg *pMsg); + +SWriteWorkerPool wWorkerPool; + +int dnodeInitWrite() { + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessDropTableMsg; + + wWorkerPool.max = tsNumOfCores; + wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); + if (wWorkerPool.writeWorker == NULL) return -1; + + for (int i=0; itableType == TSDB_TABLE_TYPE_CHILD_TABLE) { - dTrace("table:%s, start to create child table, stable:%s", pTable->tableId, pTable->superTableId); - } else if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE){ - dTrace("table:%s, start to create normal table", pTable->tableId); - } else if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE){ - dTrace("table:%s, start to create stream table", pTable->tableId); - } else { - dError("table:%s, invalid table type:%d", pTable->tableType); +void dnodeCleanupWrite() { + + + free(wWorkerPool.writeWorker); +} + +void dnodeWrite(SRpcMsg *pMsg) { + int leftLen = pMsg->contLen; + char *pCont = (char *)pMsg->pCont; + int contLen = 0; + int numOfVnodes = 0; + int32_t vgId = 0; + SRpcContext *pRpcContext = NULL; + + // parse head, get number of vnodes; + + if ( numOfVnodes > 1) { + pRpcContext = calloc(sizeof(SRpcContext), 1); + pRpcContext->numOfVnodes = numOfVnodes; } - for (int i = 0; i < pTable->numOfVPeers; ++i) { - dTrace("table:%s ip:%s vnode:%d sid:%d", pTable->tableId, taosIpStr(pTable->vpeerDesc[i].ip), - pTable->vpeerDesc[i].vnode, pTable->sid); + while (leftLen > 0) { + // todo: parse head, get vgId, contLen + + // get pVnode from vgId + void *pVnode = dnodeGetVnode(vgId); + if (pVnode == NULL) { + + continue; + } + + // put message into queue + SWriteMsg writeMsg; + writeMsg.rpcMsg = *pMsg; + writeMsg.pCont = pCont; + writeMsg.contLen = contLen; + writeMsg.pRpcContext = pRpcContext; + writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later + + taos_queue queue = dnodeGetVnodeWworker(pVnode); + taosWriteQitem(queue, &writeMsg); + + // next vnode + leftLen -= contLen; + pCont -= contLen; } +} - SSchema *pSchema = (SSchema *) pTable->data; - for (int32_t col = 0; col < pTable->numOfColumns; ++col) { - dTrace("table:%s col index:%d colId:%d bytes:%d type:%d name:%s", - pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name); - pSchema++; +void *dnodeAllocateWriteWorker() { + SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; + + if (pWorker->qset == NULL) { + pWorker->qset = taosOpenQset(); + if (pWorker->qset == NULL) return NULL; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { + dError("failed to create thread to process read queue, reason:%s", strerror(errno)); + taosCloseQset(pWorker->qset); + } } - for (int32_t col = 0; col < pTable->numOfTags; ++col) { - dTrace("table:%s tag index:%d colId:%d bytes:%d type:%d name:%s", - pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name); - pSchema++; + + taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); + if (queue) { + taosAddIntoQset(pWorker->qset, queue); + wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } + + return queue; +} + +void dnodeFreeWriteWorker(void *wqueue) { + + taosCloseQueue(wqueue); - return TSDB_CODE_SUCCESS; + // dynamically adjust the number of threads } -/* - * Remove table from local repository - */ -int32_t dnodeDropTable(SDRemoveTableMsg *pTable) { - dPrint("table:%s, sid:%d is removed", pTable->tableId, pTable->sid); - return TSDB_CODE_SUCCESS; +static void *dnodeProcessWriteQueue(void *param) { + SWriteWorker *pWorker = (SWriteWorker *)param; + taos_qall qall; + SWriteMsg writeMsg; + int numOfMsgs; + + while (1) { + numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall); + if (numOfMsgs <=0) { + dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore + continue; + } + + for (int i=0; iwhandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen); + } + + // flush WAL file + // walFsync(pVnode->whandle); + + // browse all items, and process them one by one + taosResetQitems(qall); + for (int i=0; itableId); - return TSDB_CODE_SUCCESS; +static void dnodeProcessWriteResult(SWriteMsg *pWrite) { + SRpcContext *pRpcContext = pWrite->pRpcContext; + int32_t code = 0; + + dnodeReleaseVnode(pWrite->pVnode); + + if (pRpcContext) { + if (terrno) { + if (pRpcContext->code == 0) pRpcContext->code = terrno; + } + + int count = atomic_add_fetch_32(&pRpcContext->count, 1); + if (count < pRpcContext->numOfVnodes) { + // not over yet, multiple vnodes + return; + } + + // over, result can be merged now + code = pRpcContext->code; + } else { + code = terrno; + } + + SRpcMsg rsp; + rsp.handle = pWrite->rpcMsg.handle; + rsp.code = code; + rsp.pCont = NULL; + rpcSendResponse(&rsp); + rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message } -/* - * Remove all child tables of supertable from local repository - */ -int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable) { - dPrint("stable:%s, is removed", pStable->tableId); - return TSDB_CODE_SUCCESS; +static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { + + int num = taosGetQueueNumber(pWorker->qset); + + if (num > 0) { + usleep(100); + sched_yield(); + } else { + taosCloseQset(pWorker->qset); + pWorker->qset = NULL; + pthread_exit(NULL); + } +} + +static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { + + } +static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { + + +} + +static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { + + +} diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 4979f4bb37..a7555c8b6e 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -48,7 +48,7 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { void *ahandle = sched->ahandle; int8_t *pCont = sched->msg; - dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code); +// dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code); } void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { -- GitLab