提交 88b413cc 编写于 作者: S slguan

fix compile error in dnode module

上级 bfbd2707
...@@ -10,7 +10,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -10,7 +10,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(taosd ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http tsdb) TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http)
#IF (TD_CLUSTER) #IF (TD_CLUSTER)
# TARGET_LINK_LIBRARIES(taosd dcluster) # TARGET_LINK_LIBRARIES(taosd dcluster)
...@@ -23,7 +23,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -23,7 +23,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
COMMAND echo "make test directory" COMMAND echo "make test directory"
DEPENDS taosd DEPENDS taosd
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/ COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/ COMMAND ${CMAKE_COMMAND} -E make_directoryF ${TD_TESTS_OUTPUT_DIR}/log/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/ COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
......
...@@ -13,62 +13,15 @@ ...@@ -13,62 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_DNODE_VNODE_MGMT_H #ifndef TDENGINE_DNODE_MCLIENT_H
#define TDENGINE_DNODE_VNODE_MGMT_H #define TDENGINE_DNODE_MCLIENT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include <stdint.h> int32_t dnodeInitMClient();
#include <stdbool.h> void dnodeCleanupMClient();
#include "taosdef.h"
#include "taosmsg.h"
#include "tstatus.h"
/*
* Open all Vnodes in the local data directory
*/
int32_t dnodeOpenVnodes();
/*
* Close all Vnodes that have been created and opened
*/
int32_t dnodeCleanupVnodes();
/*
* Check if vnode already exists
*/
bool dnodeCheckVnodeExist(int32_t vid);
/*
* Create vnode with specified configuration and open it
* if exist, config it
*/
int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode);
/*
* Remove vnode from local repository
*/
int32_t dnodeDropVnode(int32_t vnode);
/*
* Get the vnode object that has been opened
*/
//tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int32_t vnode);
int32_t dnodeGetVnodesNum();
/*
* get the status of vnode
*/
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode);
/*
* Check if vnode already exists, and table exist in this vnode
*/
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,18 +20,17 @@ ...@@ -20,18 +20,17 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h> int dnodeInitMgmt();
#include <stdbool.h> void dnodeCleanupMgmt();
void dnodeMgmt(SRpcMsg *);
int32_t dnodeInitMgmt();
void dnodeInitMgmtIp(); void* dnodeGetVnode(int vgId);
int dnodeGetVnodeStatus(void *);
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void* dnodeGetVnodeRworker(void *);
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void* dnodeGetVnodeWworker(void *);
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); void* dnodeGetVnodeWal(void *);
void* dnodeGetVnodeTsdb(void *);
void dnodeSendVnodeCfgMsg(int32_t vnode); void dnodeReleaseVnode(void *);
void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,52 +13,18 @@ ...@@ -13,52 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE #ifndef TDENGINE_DNODE_MNODE_H
#include "os.h" #define TDENGINE_DNODE_MNODE_H
#include "tlog.h"
#include "taoserror.h"
#include "dnodeVnodeMgmt.h"
int32_t dnodeOpenVnodes() { #ifdef __cplusplus
dPrint("open all vnodes"); extern "C" {
return TSDB_CODE_SUCCESS; #endif
}
int32_t dnodeCleanupVnodes() {
dPrint("clean all vnodes");
return TSDB_CODE_SUCCESS;
}
bool dnodeCheckVnodeExist(int32_t vnode) { int32_t dnodeInitMnode();
dPrint("vnode:%d, check vnode exist", vnode); void dnodeCleanupMnode();
return true;
}
int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) { #ifdef __cplusplus
dPrint("vnode:%d, is created", htonl(pVnode->vnode));
return TSDB_CODE_SUCCESS;
} }
#endif
int32_t dnodeDropVnode(int32_t vnode) { #endif
dPrint("vnode:%d, is dropped", vnode);
return TSDB_CODE_SUCCESS;
}
void* dnodeGetVnode(int32_t vnode) {
dPrint("vnode:%d, get vnode");
return NULL;
}
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
dPrint("vnode:%d, get vnode status");
return TSDB_VN_STATUS_MASTER;
}
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
dPrint("vnode:%d, sid:%d, check table exist");
return true;
}
int32_t dnodeGetVnodesNum() {
return 1;
}
...@@ -20,16 +20,10 @@ ...@@ -20,16 +20,10 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
void dnodeAllocModules(); void dnodeAllocModules();
int32_t dnodeInitModules(); int32_t dnodeInitModules();
void dnodeCleanUpModules(); void dnodeCleanUpModules();
extern void (*dnodeStartModules)();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,31 +20,12 @@ ...@@ -20,31 +20,12 @@
extern "C" { extern "C" {
#endif #endif
#include <stdbool.h> int dnodeInitRead();
#include <stdint.h> void dnodeCleanupRead();
#include "taosdef.h" void dnodeRead(SRpcMsg *);
#include "taosmsg.h" void *dnodeAllocateReadWorker();
void dnodeFreeReadWorker(void *rqueue);
/*
* handle query message, and the result is returned by callback function
*/
void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn));
/*
* Dispose retrieve msg, and the result will passed through callback function
*/
typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn);
void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp);
/*
* Fill retrieve result according to query info
*/
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve);
/*
* Get the size of retrieve result according to query info
*/
int32_t dnodeGetRetrieveDataSize(void *pQInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,27 +20,9 @@ ...@@ -20,27 +20,9 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdint.h>
#include "dnode.h"
typedef struct {
int sid;
uint32_t ip;
uint16_t port;
int32_t count; // track the number of imports
int32_t code; // track the code of imports
int32_t numOfTotalPoints; // track the total number of points imported
void *thandle; // handle from TAOS layer
void *qhandle;
} SShellObj;
int32_t dnodeInitShell(); int32_t dnodeInitShell();
void dnodeCleanupShell(); void dnodeCleanupShell();
//SDnodeStatisInfo dnodeGetStatisInfo()
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,34 +20,15 @@ ...@@ -20,34 +20,15 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdbool.h>
typedef enum { typedef enum {
TSDB_DNODE_RUN_STATUS_INITIALIZE, TSDB_DNODE_RUN_STATUS_INITIALIZE,
TSDB_DNODE_RUN_STATUS_RUNING, TSDB_DNODE_RUN_STATUS_RUNING,
TSDB_DNODE_RUN_STATUS_STOPPED TSDB_DNODE_RUN_STATUS_STOPPED
} SDnodeRunStatus; } SDnodeRunStatus;
extern int32_t (*dnodeInitPeers)(int32_t numOfThreads);
extern int32_t (*dnodeCheckSystem)();
extern int32_t (*dnodeInitStorage)();
extern void (*dnodeCleanupStorage)();
extern int32_t tsMaxQueues;
extern void ** tsRpcQhandle;
extern void *tsQueryQhandle;
extern void *tsDnodeMgmtQhandle;
extern void *tsDnodeTmr;
int32_t dnodeInitSystem(); int32_t dnodeInitSystem();
void dnodeCleanUpSystem(); void dnodeCleanUpSystem();
void dnodeInitPlugins();
SDnodeRunStatus dnodeGetRunStatus(); SDnodeRunStatus dnodeGetRunStatus();
void dnodeSetRunStatus(SDnodeRunStatus status);
void dnodeCheckDataDirOpenned(const char *dir);
void dnodeLockVnodes();
void dnodeUnLockVnodes();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,41 +20,12 @@ ...@@ -20,41 +20,12 @@
extern "C" { extern "C" {
#endif #endif
#include <stdbool.h> int dnodeInitWrite();
#include <stdint.h> void dnodeCleanupWrite();
#include "taosdef.h" void dnodeWrite(SRpcMsg *pMsg);
#include "taosmsg.h" void *dnodeAllocateWriteWorker();
void dnodeFreeWriteWorker(void *worker);
/*
* Write data based on dnode, the detail result can be fetched from rsponse
* pSubmit: Data to be written
* pConn: Communication handle
* callback: Pass the write result through a callback function, possibly in a different thread space
* rsp: will not be freed by callback function
*/
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn));
/*
* Create table with specified configuration and open it
* if table already exist, update its schema and tag
*/
int32_t dnodeCreateTable(SDCreateTableMsg *pTable);
/*
* Remove table from local repository
*/
int32_t dnodeDropTable(SDRemoveTableMsg *pTable);
/*
* Create stream
* if stream already exist, update it
*/
int32_t dnodeCreateStream(SDAlterStreamMsg *pStream);
/*
* Remove all child tables of supertable from local repository
*/
int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "dnodeSystem.h"
static void (*dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void *tsDnodeMClientRpc;
int32_t dnodeInitMClient() {
dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessStatusRsp;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = 0;
rpcInit.label = "DND-MC";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromMnode;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
tsDnodeMClientRpc = rpcOpen(&rpcInit);
if (tsDnodeMClientRpc == NULL) {
dError("failed to init connection from mgmt");
return -1;
}
dPrint("client connection to mgmt is opened");
return 0;
}
void dnodeCleanupMClient() {
if (tsDnodeMClientRpc) {
rpcClose(tsDnodeMClientRpc);
}
}
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
if (dnodeProcessMgmtRspFp[pMsg->msgType]) {
(*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg);
} else {
dError("%s is not processed", taosMsg[pMsg->msgType]);
}
rpcFreeCont(pMsg->pCont);
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
}
...@@ -15,327 +15,219 @@ ...@@ -15,327 +15,219 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h"
#include "tsystem.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeSystem.h"
#include "dnodeMgmt.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h" #include "dnodeRead.h"
#include "dnodeMgmt.h"
void (*dnodeInitMgmtIpFp)() = NULL;
int32_t (*dnodeInitMgmtFp)() = NULL;
void (*dnodeCleanUpMgmtFp)() = NULL;
void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
static void *tsStatusTimer = NULL;
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void dnodeInitProcessShellMsg();
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
int32_t contLen = *(int32_t *) (sched->msg - 4);
int32_t code = *(int32_t *) (sched->msg - 8);
int8_t msgType = *(int8_t *) (sched->msg - 9);
void *handle = sched->ahandle;
int8_t *pCont = sched->msg;
mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code);
}
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { typedef struct {
dTrace("msg:%d:%s is sent to mnode", msgType, taosMsg[msgType]); int32_t vgId; // global vnode group ID
if (dnodeSendMsgToMnodeFp) { int status; // status: master, slave, notready, deleting
dnodeSendMsgToMnodeFp(msgType, pCont, contLen); int refCount; // reference count
} else { int64_t version;
if (pCont == NULL) { void *wworker;
pCont = rpcMallocCont(1); void *rworker;
contLen = 0; void *wal;
} void *tsdb;
SSchedMsg schedMsg = {0}; void *replica;
schedMsg.fp = dnodeSendMsgToMnodeQueueFp; void *events;
schedMsg.msg = pCont; void *cq; // continuous query
*(int32_t *) (pCont - 4) = contLen; } SVnodeObj;
*(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS;
*(int8_t *) (pCont - 9) = msgType; static int dnodeOpenVnodes();
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); static void dnodeCleanupVnodes();
} static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg);
static int dnodeDropVnode(SVnodeObj *pVnode);
static void dnodeRemoveVnode(SVnodeObj *pVnode);
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessDropVnodeMsg;
} }
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) { void dnodeCleanupMgmt() {
dTrace("rsp:%d:%s is sent to mnode, pConn:%p", msgType, taosMsg[msgType], pConn);
if (dnodeSendRspToMnodeFp) {
dnodeSendRspToMnodeFp(pConn, code, pCont, contLen);
} else {
//hack way
if (pCont == NULL) {
pCont = rpcMallocCont(1);
contLen = 0;
}
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
schedMsg.ahandle = pConn;
*(int32_t *) (pCont - 4) = contLen;
*(int32_t *) (pCont - 8) = code;
*(int8_t *) (pCont - 9) = msgType;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
}
} }
void dnodeSendStatusMsgToMgmt(void *handle, void *tmrId) { void dnodeMgmt(SRpcMsg *pMsg) {
taosTmrReset(dnodeSendStatusMsgToMgmt, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
if (tsStatusTimer == NULL) { terrno = 0;
dError("Failed to start status timer");
return;
}
int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
SStatusMsg *pStatus = rpcMallocCont(contLen); (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
if (pStatus == NULL) { } else {
dError("Failed to malloc status message"); terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return;
} }
int32_t totalVnodes = dnodeGetVnodesNum(); SRpcMsg rsp;
rsp.handle = pMsg->handle;
pStatus->version = htonl(tsVersion); rsp.code = terrno;
pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); rsp.pCont = NULL;
pStatus->publicIp = htonl(inet_addr(tsPublicIp)); rpcSendResponse(&rsp);
pStatus->lastReboot = htonl(tsRebootTime); rpcFreeCont(pMsg->pCont); // free the received message
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
pStatus->openVnodes = htons((uint16_t) totalVnodes);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load;
//TODO loop all vnodes
// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) {
// if (vnodeList[vnode].cfg.maxSessions <= 0) continue;
//
// SVnodeObj *pVnode = vnodeList + vnode;
// pLoad->vnode = htonl(vnode);
// pLoad->vgId = htonl(pVnode->cfg.vgId);
// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus;
// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus;
// pLoad->accessState = (uint8_t)(pVnode->accessState);
// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage);
// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage);
// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) {
// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten);
// } else {
// pLoad->pointsWritten = htobe64(0);
// }
// pLoad++;
//
// if (++count >= tsOpenVnodes) {
// break;
// }
// }
dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen);
} }
void *dnodeGetVnode(int vgId) {
SVnodeObj *pVnode;
int32_t dnodeInitMgmt() { // retrieve the pVnode from vgId
if (dnodeInitMgmtFp) {
dnodeInitMgmtFp();
}
dnodeInitProcessShellMsg();
taosTmrReset(dnodeSendStatusMsgToMgmt, 500, NULL, tsDnodeTmr, &tsStatusTimer); // if (pVnode->status == ....) {
return 0; // terrno = ;
// return NULL;
// }
atomic_add_fetch_32(&pVnode->refCount, 1);
return pVnode;
} }
void dnodeInitMgmtIp() { int dnodeGetVnodeStatus(void *pVnode) {
if (dnodeInitMgmtIpFp) { return ((SVnodeObj *)pVnode)->status;
dnodeInitMgmtIpFp();
}
} }
void dnodeCleanUpMgmt() { void *dnodeGetVnodeWworker(void *pVnode) {
if (tsStatusTimer != NULL) { return ((SVnodeObj *)pVnode)->wworker;
taosTmrStopA(&tsStatusTimer); }
tsStatusTimer = NULL;
}
if (dnodeCleanUpMgmtFp) { void *dnodeGetVnodeRworker(void *pVnode) {
dnodeCleanUpMgmtFp(); return ((SVnodeObj *)pVnode)->rworker;
}
} }
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { void *dnodeGetVnodeWal(void *pVnode) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { return ((SVnodeObj *)pVnode)->wal;
dError("invalid msg type:%d", msgType); }
return;
}
dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn); void *dnodeGetVnodeTsdb(void *pVnode) {
return ((SVnodeObj *)pVnode)->tsdb;
}
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { void dnodeReleaseVnode(void *param) {
dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); SVnodeObj *pVnode = (SVnodeObj *)param;
}
if (dnodeProcessMgmtMsgFp[msgType]) {
(*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn);
} else {
dError("%s is not processed", taosMsg[msgType]);
}
//rpcFreeCont(pCont); int refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (refCount == 0) dnodeRemoveVnode(pVnode);
} }
static void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { static int dnodeOpenVnode() {
SDCreateTableMsg *pTable = pCont; SVnodeObj *pVnode;
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->contLen = htonl(pTable->contLen);
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
for (int i = 0; i < pTable->numOfVPeers; ++i) {
pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
}
int32_t totalCols = pTable->numOfColumns + pTable->numOfTags; // create tsdb
SSchema *pSchema = (SSchema *) pTable->data;
for (int32_t col = 0; col < totalCols; ++col) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId);
pSchema++;
}
int32_t code = dnodeCreateTable(pTable); // create wal
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
static void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { // allocate write worker
SDAlterStreamMsg *pStream = pCont; pVnode->wworker = dnodeAllocateWriteWorker();
pStream->uid = htobe64(pStream->uid);
pStream->stime = htobe64(pStream->stime);
pStream->vnode = htonl(pStream->vnode);
pStream->sid = htonl(pStream->sid);
pStream->status = htonl(pStream->status);
int32_t code = dnodeCreateStream(pStream); // create read queue
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); pVnode->rworker = dnodeAllocateReadWorker();
}
static void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { // create the replica
SDRemoveTableMsg *pTable = pCont;
pTable->sid = htonl(pTable->sid);
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
pTable->uid = htobe64(pTable->uid);
for (int i = 0; i < pTable->numOfVPeers; ++i) { // set the status
pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); pVnode->refCount = 1;
}
int32_t code = dnodeDropTable(pTable); return 0;
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
static void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { static int dnodeOpenVnodes() {
int32_t code = htonl(*((int32_t *) pCont)); return 0;
if (code == TSDB_CODE_SUCCESS) {
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) (pCont + sizeof(int32_t));
dnodeCreateVnode(pVnode);
} else if (code == TSDB_CODE_INVALID_VNODE_ID) {
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t));
int32_t vnode = htonl(vpeer->vnode);
dError("vnode:%d, not exist, remove it", vnode);
dnodeDropVnode(vnode);
} else {
dError("code:%d invalid message", code);
}
} }
static void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { static void dnodeCleanupVnode() {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
dnodeCreateTable(table);
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
pTable->sid = htonl(pTable->sid);
pTable->uid = htobe64(pTable->uid);
dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid);
dnodeDropTable(pTable);
} else {
dError("code:%d invalid message", code);
}
} }
static void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { static void dnodeCleanupVnodes() {
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont;
int32_t code = dnodeCreateVnode(pVnode);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
static void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg) {
SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(pVnode->vnode); SVnodeObj *pVnode = malloc(sizeof(SVnodeObj));
// save the vnode info in non-volatile storage
// add into hash, so it can be retrieved
dnodeOpenVnode(pVnode);
int32_t code = dnodeDropVnode(vnode); return 0;
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
static void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { static void dnodeRemoveVnode(SVnodeObj *pVnode) {
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
// remove replica
// remove read queue
dnodeFreeReadWorker(pVnode->rworker);
// remove write queue
dnodeFreeWriteWorker(pVnode->wworker);
// remove wal
// remove tsdb
int32_t code = tsCfgDynamicOptions(pCfg->config);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
static void dnodeProcessDropStableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { static int dnodeDropVnode(SVnodeObj *pVnode) {
dnodeSendRspToMnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
int count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count<=0) dnodeRemoveVnode(pVnode);
return 0;
} }
void dnodeSendVnodeCfgMsg(int32_t vnode) { static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) {
SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg));
if (cfg == NULL) { // SVnodeObj *pVnode;
return; // int vgId;
} // SVPeersMsg *pCfg;
// check everything, if not ok, set terrno;
cfg->vnode = htonl(vnode); // everything is ok
dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg));
// dnodeCreateVnode(vgId, pCfg);
//if (pVnode == NULL) terrno = TSDB_CODE
} }
void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) { static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) {
STableCfgMsg *cfg = (STableCfgMsg *) rpcMallocCont(sizeof(STableCfgMsg));
if (cfg == NULL) { SVnodeObj *pVnode;
return; int vgId;
}
cfg->vnode = htonl(vnode); // check everything, if not ok, set terrno;
dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg));
// everything is ok
dnodeDropVnode(pVnode);
//if (pVnode == NULL) terrno = TSDB_CODE
} }
static void dnodeInitProcessShellMsg() { static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; SVnodeObj *pVnode;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest; int vgId;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; // check everything, if not ok, set terrno;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessDropStableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; // everything is ok
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; // dnodeAlterVnode(pVnode);
//if (pVnode == NULL) terrno = TSDB_CODE
} }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "dnodeSystem.h"
#include "dnodeMgmt.h"
#include "dnodeWrite.h"
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg);
static void *tsDnodeMnodeRpc = NULL;
int32_t dnodeInitMnode() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeMgmt;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
// note: a new port shall be assigned
// rpcInit.localPort = tsDnodeMnodePort;
rpcInit.label = "DND-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessMsgFromMnode;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1500;
tsDnodeMnodeRpc = rpcOpen(&rpcInit);
if (tsDnodeMnodeRpc == NULL) {
dError("failed to init connection from mgmt");
return -1;
}
dPrint("connection to mgmt is opened");
return 0;
}
void dnodeCleanupMnode() {
if (tsDnodeMnodeRpc) {
rpcClose(tsDnodeMnodeRpc);
}
}
static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
SRpcMsg rspMsg;
rspMsg.handle = pMsg->handle;
rspMsg.pCont = NULL;
rspMsg.contLen = 0;
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rspMsg.code = TSDB_CODE_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
dTrace("conn:%p, query msg is ignored since dnode not running", pMsg->handle);
return;
}
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
dError("%s is not processed", taosMsg[pMsg->msgType]);
rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
}
...@@ -17,56 +17,196 @@ ...@@ -17,56 +17,196 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tsched.h" #include "trpc.h"
#include "dnode.h" #include "taosmsg.h"
#include "tqueue.h"
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeSystem.h" #include "dnodeMgmt.h"
void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) { typedef struct {
dTrace("conn:%p, query msg is disposed", pConn); int32_t code;
void *pQInfo = 100; int32_t count;
callback(TSDB_CODE_SUCCESS, pQInfo, pConn); int32_t numOfVnodes;
} SRpcContext;
typedef struct {
void *pCont;
int contLen;
SRpcMsg rpcMsg;
void *pVnode;
SRpcContext *pRpcContext; // RPC message context
} SReadMsg;
static void *dnodeProcessReadQueue(void *param);
static void dnodeProcessReadResult(SReadMsg *pRead);
static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg);
static void (*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode);
// module global variable
static taos_qset readQset;
static int threads; // number of query threads
static int maxThreads;
static int minThreads;
int dnodeInitRead() {
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg;
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg;
readQset = taosOpenQset();
minThreads = 3;
maxThreads = tsNumOfCores*tsNumOfThreadsPerCore;
if (maxThreads <= minThreads*2) maxThreads = 2*minThreads;
return 0;
}
void dnodeCleanupRead() {
taosCloseQset(readQset);
} }
static void dnodeExecuteRetrieveData(SSchedMsg *pSched) { void dnodeRead(SRpcMsg *pMsg) {
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; int leftLen = pMsg->contLen;
SRetrieveTableMsg *pRetrieve = pSched->msg; char *pCont = (char *)pMsg->pCont;
void *pConn = pSched->ahandle; int contLen = 0;
int numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL;
dTrace("conn:%p, retrieve msg is disposed, qhandle:%" PRId64, pConn, pRetrieve->qhandle); // parse head, get number of vnodes;
if ( numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = 1;
}
//examples while (leftLen > 0) {
int32_t code = TSDB_CODE_SUCCESS; // todo: parse head, get vgId, contLen
void *pQInfo = (void*)pRetrieve->qhandle;
(*callback)(code, pQInfo, pConn); // get pVnode from vgId
void *pVnode = dnodeGetVnode(vgId);
if (pVnode == NULL) {
free(pSched->msg); continue;
}
// put message into queue
SReadMsg readMsg;
readMsg.rpcMsg = *pMsg;
readMsg.pCont = pCont;
readMsg.contLen = contLen;
readMsg.pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, &readMsg);
// next vnode
leftLen -= contLen;
pCont -= contLen;
}
} }
void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) { void *dnodeAllocateReadWorker() {
dTrace("conn:%p, retrieve msg is received", pConn);
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
if ( queue == NULL ) return NULL;
taosAddIntoQset(readQset, queue);
void *msg = malloc(sizeof(SRetrieveTableMsg)); // spawn a thread to process queue
memcpy(msg, pRetrieve, sizeof(SRetrieveTableMsg)); if (threads < maxThreads) {
pthread_t thread;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
SSchedMsg schedMsg; if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) {
schedMsg.msg = msg; dError("failed to create thread to process read queue, reason:%s", strerror(errno));
schedMsg.ahandle = pConn; }
schedMsg.thandle = callbackFp; }
schedMsg.fp = dnodeExecuteRetrieveData;
taosScheduleTask(tsQueryQhandle, &schedMsg); return queue;
} }
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve) { void dnodeFreeReadWorker(void *rqueue) {
dTrace("qInfo:%p, data is retrieved");
pRetrieve->numOfRows = 0; taosCloseQueue(rqueue);
return 0;
// dynamically adjust the number of threads
} }
int32_t dnodeGetRetrieveDataSize(void *pQInfo) { static void *dnodeProcessReadQueue(void *param) {
dTrace("qInfo:%p, contLen is 100"); taos_qset qset = (taos_qset)param;
return 100; SReadMsg readMsg;
while (1) {
if (taosReadQitemFromQset(qset, &readMsg) <= 0) {
dnodeHandleIdleReadWorker();
continue;
}
terrno = 0;
if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
dnodeProcessReadResult(&readMsg);
}
return NULL;
}
static void dnodeHandleIdleReadWorker() {
int num = taosGetQueueNumber(readQset);
if (num == 0 || (num <= minThreads && threads > minThreads)) {
threads--;
pthread_exit(NULL);
} else {
usleep(100);
sched_yield();
}
}
static void dnodeProcessReadResult(SReadMsg *pRead) {
SRpcContext *pRpcContext = pRead->pRpcContext;
int32_t code = 0;
dnodeReleaseVnode(pRead->pVnode);
if (pRpcContext) {
if (terrno) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
}
// over, result can be merged now
code = pRpcContext->code;
} else {
code = terrno;
}
SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
} }
static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
}
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
}
...@@ -19,31 +19,22 @@ ...@@ -19,31 +19,22 @@
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "tsocket.h"
#include "tschemautil.h"
#include "textbuffer.h"
#include "trpc.h" #include "trpc.h"
#include "http.h"
#include "dnode.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
#include "dnodeShell.h" #include "dnodeRead.h"
#include "dnodeVnodeMgmt.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "dnodeShell.h"
static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn); static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg);
static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn); static void *tsDnodeShellRpc = NULL;
static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void *tsDnodeShellServer = NULL;
static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead;
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
...@@ -58,164 +49,47 @@ int32_t dnodeInitShell() { ...@@ -58,164 +49,47 @@ int32_t dnodeInitShell() {
rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.cfp = dnodeProcessMsgFromShell;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 2000; rpcInit.idleTime = tsShellActivityTimer * 1500;
rpcInit.afp = dnodeRetrieveUserAuthInfo;
tsDnodeShellServer = rpcOpen(&rpcInit); tsDnodeShellRpc = rpcOpen(&rpcInit);
if (tsDnodeShellServer == NULL) { if (tsDnodeShellRpc == NULL) {
dError("failed to init connection from shell"); dError("failed to init connection from shell");
return -1; return -1;
} }
dPrint("shell is opened"); dPrint("connection to shell is opened");
return TSDB_CODE_SUCCESS; return 0;
} }
void dnodeCleanupShell() { void dnodeCleanupShell() {
if (tsDnodeShellServer) { if (tsDnodeShellRpc) {
rpcClose(tsDnodeShellServer); rpcClose(tsDnodeShellRpc);
} }
} }
SDnodeStatisInfo dnodeGetStatisInfo() { void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
SDnodeStatisInfo info = {0}; SRpcMsg rpcMsg;
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
info.httpReqNum = httpGetReqCount();
info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
}
return info; rpcMsg.handle = pMsg->handle;
} rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0); dError("RPC %p, shell msg is ignored since dnode not running", pMsg->handle);
dTrace("query msg is ignored since dnode not running"); rpcMsg.code = TSDB_CODE_NOT_READY;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return; return;
} }
dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]); if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
if (msgType == TSDB_MSG_TYPE_QUERY) {
dnodeProcessQueryMsg(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_RETRIEVE) {
dnodeProcessRetrieveMsg(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
dnodeProcessSubmitMsg(pCont, contLen, handle);
} else { } else {
dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]); dError("RPC %p, msg:%s from shell is not handled", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
} }
//TODO free may be cause segmentfault
// rpcFreeCont(pCont);
} }
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return TSDB_CODE_SUCCESS;
}
static void dnodeProcessQueryMsgCb(int32_t code, void *pQInfo, void *pConn) {
dTrace("conn:%p, query is returned, code:%d", pConn, code);
int32_t contLen = sizeof(SQueryTableRsp);
SQueryTableRsp *queryRsp = (SQueryTableRsp *) rpcMallocCont(contLen);
if (queryRsp == NULL) {
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
queryRsp->code = htonl(code);
queryRsp->qhandle = htobe64((uint64_t) (pQInfo));
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, queryRsp, contLen);
}
static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn) {
atomic_fetch_add_32(&tsDnodeQueryReqNum, 1);
SQueryTableMsg *pQuery = (SQueryTableMsg *) pCont;
dnodeQueryData(pQuery, pConn, dnodeProcessQueryMsgCb);
}
void dnodeProcessRetrieveMsgCb(int32_t code, void *pQInfo, void *pConn) {
dTrace("conn:%p, retrieve is returned, code:%d", pConn, code);
assert(pConn != NULL);
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(pConn, code, 0, 0);
return;
}
assert(pQInfo != NULL);
int32_t contLen = dnodeGetRetrieveDataSize(pQInfo);
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) rpcMallocCont(contLen);
if (pRetrieve == NULL) {
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0);
return;
}
code = dnodeGetRetrieveData(pQInfo, pRetrieve);
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(pConn, TSDB_CODE_INVALID_QHANDLE, 0, 0);
}
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
pRetrieve->precision = htons(pRetrieve->precision);
pRetrieve->offset = htobe64(pRetrieve->offset);
pRetrieve->useconds = htobe64(pRetrieve->useconds);
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, pRetrieve, contLen);
}
static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn) {
SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free);
dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveMsgCb);
}
void dnodeProcessSubmitMsgCb(SShellSubmitRspMsg *result, void *pConn) {
assert(result != NULL);
dTrace("conn:%p, submit is returned, code:%d", pConn, result->code);
if (result->code != 0) {
rpcSendResponse(pConn, result->code, NULL, 0);
return;
}
int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen);
if (submitRsp == NULL) {
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
memcpy(submitRsp, result, contLen);
for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) {
SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i];
if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) {
dnodeSendVnodeCfgMsg(block->vnode);
} else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) {
dnodeSendTableCfgMsg(block->vnode, block->sid);
}
block->index = htonl(block->index);
block->vnode = htonl(block->vnode);
block->sid = htonl(block->sid);
block->code = htonl(block->code);
}
submitRsp->code = htonl(submitRsp->code);
submitRsp->numOfRows = htonl(submitRsp->numOfRows);
submitRsp->affectedRows = htonl(submitRsp->affectedRows);
submitRsp->failedRows = htonl(submitRsp->failedRows);
submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks);
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, submitRsp, contLen);
}
static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn) {
atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1);
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont;
dnodeWriteData(pSubmit, pConn, dnodeProcessSubmitMsgCb);
}
...@@ -25,12 +25,12 @@ ...@@ -25,12 +25,12 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "http.h" #include "http.h"
#include "trpc.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
#include "dnodeVnodeMgmt.h"
#ifdef CLUSTER #ifdef CLUSTER
#include "account.h" #include "account.h"
...@@ -50,6 +50,8 @@ static int32_t dnodeInitRpcQHandle(); ...@@ -50,6 +50,8 @@ static int32_t dnodeInitRpcQHandle();
static int32_t dnodeInitQueryQHandle(); static int32_t dnodeInitQueryQHandle();
static int32_t dnodeInitTmrCtl(); static int32_t dnodeInitTmrCtl();
int32_t (*dnodeInitPeers)(int32_t numOfThreads) = NULL;
void *tsDnodeTmr; void *tsDnodeTmr;
void **tsRpcQhandle; void **tsRpcQhandle;
void *tsDnodeMgmtQhandle; void *tsDnodeMgmtQhandle;
...@@ -93,7 +95,7 @@ void dnodeCleanUpSystem() { ...@@ -93,7 +95,7 @@ void dnodeCleanUpSystem() {
dnodeCleanupShell(); dnodeCleanupShell();
dnodeCleanUpModules(); dnodeCleanUpModules();
dnodeCleanupVnodes(); dnodeCleanupMgmt();
taosCloseLogger(); taosCloseLogger();
dnodeCleanupStorage(); dnodeCleanupStorage();
dnodeCleanVnodesLock(); dnodeCleanVnodesLock();
...@@ -154,7 +156,7 @@ int32_t dnodeInitSystem() { ...@@ -154,7 +156,7 @@ int32_t dnodeInitSystem() {
return -1; return -1;
} }
dnodeInitMgmtIp(); // dnodeInitMgmtIp();
tsPrintGlobalConfig(); tsPrintGlobalConfig();
...@@ -193,7 +195,7 @@ int32_t dnodeInitSystem() { ...@@ -193,7 +195,7 @@ int32_t dnodeInitSystem() {
return -1; return -1;
} }
if (dnodeOpenVnodes() < 0) { if (dnodeInitMgmt() < 0) {
dError("failed to init vnode storage"); dError("failed to init vnode storage");
return -1; return -1;
} }
...@@ -303,9 +305,3 @@ int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp; ...@@ -303,9 +305,3 @@ int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp;
int32_t dnodeInitPeersImp(int32_t numOfThreads) { int32_t dnodeInitPeersImp(int32_t numOfThreads) {
return 0; return 0;
} }
int32_t (*dnodeInitPeers)(int32_t numOfThreads) = dnodeInitPeersImp;
...@@ -17,82 +17,246 @@ ...@@ -17,82 +17,246 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "trpc.h"
#include "tqueue.h"
#include "taosmsg.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h" #include "dnodeMgmt.h"
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { typedef struct {
dTrace("submit msg is disposed, affectrows:1"); int32_t code;
int32_t count; // number of vnodes returned result
int32_t numOfVnodes; // number of vnodes involved
} SRpcContext;
SShellSubmitRspMsg result = {0}; typedef struct _write {
void *pCont;
int contLen;
SRpcMsg rpcMsg;
void *pVnode; // pointer to vnode
SRpcContext *pRpcContext; // RPC message context
} SWriteMsg;
int32_t numOfSid = htonl(pSubmit->numOfSid); typedef struct {
if (numOfSid <= 0) { taos_qset qset; // queue set
dError("invalid num of tables:%d", numOfSid); pthread_t thread; // thread
result.code = TSDB_CODE_INVALID_QUERY_MSG; int workerId; // worker ID
callback(&result, pConn); } SWriteWorker;
typedef struct _thread_obj {
int max; // max number of workers
int nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker;
} SWriteWorkerPool;
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *);
static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
static void dnodeProcessWriteResult(SWriteMsg *pWrite);
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg);
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg);
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg);
SWriteWorkerPool wWorkerPool;
int dnodeInitWrite() {
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessDropTableMsg;
wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1;
for (int i=0; i<wWorkerPool.max; ++i) {
wWorkerPool.writeWorker[i].workerId = i;
}
return 0;
}
void dnodeCleanupWrite() {
free(wWorkerPool.writeWorker);
}
void dnodeWrite(SRpcMsg *pMsg) {
int leftLen = pMsg->contLen;
char *pCont = (char *)pMsg->pCont;
int contLen = 0;
int numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL;
// parse head, get number of vnodes;
if ( numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = numOfVnodes;
}
while (leftLen > 0) {
// todo: parse head, get vgId, contLen
// get pVnode from vgId
void *pVnode = dnodeGetVnode(vgId);
if (pVnode == NULL) {
continue;
}
// put message into queue
SWriteMsg writeMsg;
writeMsg.rpcMsg = *pMsg;
writeMsg.pCont = pCont;
writeMsg.contLen = contLen;
writeMsg.pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg);
// next vnode
leftLen -= contLen;
pCont -= contLen;
}
}
void *dnodeAllocateWriteWorker() {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
taosCloseQset(pWorker->qset);
} }
}
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
if (queue) {
taosAddIntoQset(pWorker->qset, queue);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
}
return queue;
}
void dnodeFreeWriteWorker(void *wqueue) {
result.code = 0; taosCloseQueue(wqueue);
result.numOfRows = 1;
result.affectedRows = 1; // dynamically adjust the number of threads
result.numOfFailedBlocks = 0;
callback(&result, pConn);
} }
int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { static void *dnodeProcessWriteQueue(void *param) {
if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { SWriteWorker *pWorker = (SWriteWorker *)param;
dTrace("table:%s, start to create child table, stable:%s", pTable->tableId, pTable->superTableId); taos_qall qall;
} else if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE){ SWriteMsg writeMsg;
dTrace("table:%s, start to create normal table", pTable->tableId); int numOfMsgs;
} else if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE){
dTrace("table:%s, start to create stream table", pTable->tableId); while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall);
if (numOfMsgs <=0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
continue;
}
for (int i=0; i<numOfMsgs; ++i) {
// retrieve all items, and write them into WAL
taosGetQitem(qall, &writeMsg);
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
}
// flush WAL file
// walFsync(pVnode->whandle);
// browse all items, and process them one by one
taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &writeMsg);
terrno = 0;
if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) {
(*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg);
} else { } else {
dError("table:%s, invalid table type:%d", pTable->tableType); terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
for (int i = 0; i < pTable->numOfVPeers; ++i) { dnodeProcessWriteResult(&writeMsg);
dTrace("table:%s ip:%s vnode:%d sid:%d", pTable->tableId, taosIpStr(pTable->vpeerDesc[i].ip),
pTable->vpeerDesc[i].vnode, pTable->sid);
} }
SSchema *pSchema = (SSchema *) pTable->data; // free the Qitems;
for (int32_t col = 0; col < pTable->numOfColumns; ++col) { taosFreeQitems(qall);
dTrace("table:%s col index:%d colId:%d bytes:%d type:%d name:%s",
pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name);
pSchema++;
} }
for (int32_t col = 0; col < pTable->numOfTags; ++col) {
dTrace("table:%s tag index:%d colId:%d bytes:%d type:%d name:%s", return NULL;
pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name); }
pSchema++;
static void dnodeProcessWriteResult(SWriteMsg *pWrite) {
SRpcContext *pRpcContext = pWrite->pRpcContext;
int32_t code = 0;
dnodeReleaseVnode(pWrite->pVnode);
if (pRpcContext) {
if (terrno) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
}
// over, result can be merged now
code = pRpcContext->code;
} else {
code = terrno;
} }
return TSDB_CODE_SUCCESS; SRpcMsg rsp;
rsp.handle = pWrite->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message
} }
/* static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
* Remove table from local repository
*/ int num = taosGetQueueNumber(pWorker->qset);
int32_t dnodeDropTable(SDRemoveTableMsg *pTable) {
dPrint("table:%s, sid:%d is removed", pTable->tableId, pTable->sid); if (num > 0) {
return TSDB_CODE_SUCCESS; usleep(100);
sched_yield();
} else {
taosCloseQset(pWorker->qset);
pWorker->qset = NULL;
pthread_exit(NULL);
}
} }
/* static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
* Create stream
* if stream already exist, update it
*/
int32_t dnodeCreateStream(SDAlterStreamMsg *pStream) {
dPrint("stream:%s, is created, ", pStream->tableId);
return TSDB_CODE_SUCCESS;
} }
/* static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
* Remove all child tables of supertable from local repository
*/
int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable) {
dPrint("stable:%s, is removed", pStable->tableId);
return TSDB_CODE_SUCCESS;
} }
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
}
...@@ -48,7 +48,7 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { ...@@ -48,7 +48,7 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) {
void *ahandle = sched->ahandle; void *ahandle = sched->ahandle;
int8_t *pCont = sched->msg; int8_t *pCont = sched->msg;
dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code); // dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code);
} }
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册