未验证 提交 09e20e3d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8944 from taosdata/feature/dnode3

Feature/dnode3
...@@ -383,14 +383,9 @@ typedef struct { ...@@ -383,14 +383,9 @@ typedef struct {
int32_t maxUsers; int32_t maxUsers;
int32_t maxDbs; int32_t maxDbs;
int32_t maxTimeSeries; int32_t maxTimeSeries;
int32_t maxConnections;
int32_t maxStreams; int32_t maxStreams;
int32_t maxPointsPerSecond;
int64_t maxStorage; // In unit of GB int64_t maxStorage; // In unit of GB
int64_t maxQueryTime; // In unit of hour int32_t accessState; // Configured only by command
int64_t maxInbound;
int64_t maxOutbound;
int8_t accessState; // Configured only by command
} SCreateAcctMsg, SAlterAcctMsg; } SCreateAcctMsg, SAlterAcctMsg;
typedef struct { typedef struct {
...@@ -420,7 +415,7 @@ typedef struct { ...@@ -420,7 +415,7 @@ typedef struct {
typedef struct SColIndex { typedef struct SColIndex {
int16_t colId; // column id int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column int16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1]; char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1];
} SColIndex; } SColIndex;
...@@ -515,8 +510,8 @@ typedef struct { ...@@ -515,8 +510,8 @@ typedef struct {
int16_t numOfCols; // the number of columns will be load from vnode int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval; SInterval interval;
// SSessionWindow sw; // session window // SSessionWindow sw; // session window
uint16_t tagCondLen; // tag length in current query int16_t tagCondLen; // tag length in current query
uint16_t colCondLen; // column length in current query int16_t colCondLen; // column length in current query
int16_t numOfGroupCols; // num of group by columns int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx; int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
...@@ -524,10 +519,10 @@ typedef struct { ...@@ -524,10 +519,10 @@ typedef struct {
int16_t prjOrder; // global order in super table projection query. int16_t prjOrder; // global order in super table projection query.
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
uint32_t queryType; // denote another query process int32_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers int16_t numOfOutput; // final output columns numbers
int16_t fillType; // interpolate type int16_t fillType; // interpolate type
uint64_t fillVal; // default value array list int64_t fillVal; // default value array list
int32_t secondStageOutput; int32_t secondStageOutput;
STsBufInfo tsBuf; // tsBuf info STsBufInfo tsBuf; // tsBuf info
int32_t numOfTags; // number of tags columns involved int32_t numOfTags; // number of tags columns involved
...@@ -543,30 +538,38 @@ typedef struct { ...@@ -543,30 +538,38 @@ typedef struct {
typedef struct { typedef struct {
int32_t code; int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle union {
uint64_t qhandle;
uint64_t qId;
}; // query handle
} SQueryTableRsp; } SQueryTableRsp;
// todo: the show handle should be replaced with id // todo: the show handle should be replaced with id
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
union{uint64_t qhandle; uint64_t qId;}; // query handle union {
uint16_t free; int32_t showId;
int64_t qhandle;
int64_t qId;
}; // query handle
int8_t free;
} SRetrieveTableMsg; } SRetrieveTableMsg;
typedef struct SRetrieveTableRsp { typedef struct SRetrieveTableRsp {
int32_t numOfRows; int32_t numOfRows;
int8_t completed; // all results are returned to client
int16_t precision;
int64_t offset; // updated offset value for multi-vnode projection query int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds; int64_t useconds;
int8_t completed; // all results are returned to client
int8_t precision;
int8_t compressed; int8_t compressed;
int8_t reserved;
int32_t compLen; int32_t compLen;
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;
typedef struct { typedef struct {
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t cacheBlockSize; //MB int32_t cacheBlockSize; // MB
int32_t totalBlocks; int32_t totalBlocks;
int32_t maxTables; int32_t maxTables;
int32_t daysPerFile; int32_t daysPerFile;
...@@ -577,7 +580,7 @@ typedef struct { ...@@ -577,7 +580,7 @@ typedef struct {
int32_t maxRowsPerFileBlock; int32_t maxRowsPerFileBlock;
int32_t commitTime; int32_t commitTime;
int32_t fsyncPeriod; int32_t fsyncPeriod;
uint8_t precision; // time resolution int8_t precision; // time resolution
int8_t compression; int8_t compression;
int8_t walLevel; int8_t walLevel;
int8_t replications; int8_t replications;
...@@ -594,7 +597,7 @@ typedef struct { ...@@ -594,7 +597,7 @@ typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
char path[PATH_MAX]; char path[PATH_MAX];
int32_t funcType; int32_t funcType;
uint8_t outputType; int8_t outputType;
int16_t outputLen; int16_t outputLen;
int32_t bufSize; int32_t bufSize;
int32_t codeLen; int32_t codeLen;
...@@ -627,7 +630,7 @@ typedef struct { ...@@ -627,7 +630,7 @@ typedef struct {
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists; int8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg, SSyncDbMsg; } SDropDbMsg, SUseDbMsg, SSyncDbMsg;
typedef struct { typedef struct {
...@@ -701,7 +704,7 @@ typedef struct { ...@@ -701,7 +704,7 @@ typedef struct {
typedef struct { typedef struct {
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
uint32_t vgId; int32_t vgId;
int32_t cacheBlockSize; int32_t cacheBlockSize;
int32_t totalBlocks; int32_t totalBlocks;
int32_t daysPerFile; int32_t daysPerFile;
...@@ -764,18 +767,17 @@ typedef struct { ...@@ -764,18 +767,17 @@ typedef struct {
typedef struct STableMetaMsg { typedef struct STableMetaMsg {
int32_t contLen; int32_t contLen;
char tableFname[TSDB_TABLE_FNAME_LEN]; // table id char tableFname[TSDB_TABLE_FNAME_LEN]; // table id
uint8_t numOfTags; int8_t numOfTags;
uint8_t precision; int8_t precision;
uint8_t tableType; int8_t tableType;
int16_t numOfColumns; int16_t numOfColumns;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
int32_t tid; int32_t tid;
uint64_t uid; int64_t uid;
SVgroupMsg vgroup; SVgroupMsg vgroup;
char sTableName[TSDB_TABLE_FNAME_LEN]; char sTableName[TSDB_TABLE_FNAME_LEN];
uint64_t suid; int64_t suid;
SSchema schema[]; SSchema schema[];
} STableMetaMsg; } STableMetaMsg;
...@@ -784,8 +786,8 @@ typedef struct SMultiTableMeta { ...@@ -784,8 +786,8 @@ typedef struct SMultiTableMeta {
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t numOfUdf; int32_t numOfUdf;
int32_t contLen; int32_t contLen;
uint8_t compressed; // denote if compressed or not int8_t compressed; // denote if compressed or not
uint32_t rawLen; // size before compress int32_t rawLen; // size before compress
uint8_t metaClone; // make meta clone after retrieve meta from mnode uint8_t metaClone; // make meta clone after retrieve meta from mnode
char meta[]; char meta[];
} SMultiTableMeta; } SMultiTableMeta;
...@@ -804,7 +806,7 @@ typedef struct { ...@@ -804,7 +806,7 @@ typedef struct {
typedef struct { typedef struct {
int8_t type; int8_t type;
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
uint16_t payloadLen; int16_t payloadLen;
char payload[]; char payload[];
} SShowMsg; } SShowMsg;
...@@ -815,7 +817,7 @@ typedef struct { ...@@ -815,7 +817,7 @@ typedef struct {
} SCompactMsg; } SCompactMsg;
typedef struct SShowRsp { typedef struct SShowRsp {
uint64_t qhandle; int32_t showId;
STableMetaMsg tableMeta; STableMetaMsg tableMeta;
} SShowRsp; } SShowRsp;
...@@ -837,7 +839,7 @@ typedef struct { ...@@ -837,7 +839,7 @@ typedef struct {
} SConfigTableMsg; } SConfigTableMsg;
typedef struct { typedef struct {
uint32_t dnodeId; int32_t dnodeId;
int32_t vgId; int32_t vgId;
} SConfigVnodeMsg; } SConfigVnodeMsg;
...@@ -848,22 +850,22 @@ typedef struct { ...@@ -848,22 +850,22 @@ typedef struct {
typedef struct { typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
uint32_t queryId; int32_t queryId;
int64_t useconds; int64_t useconds;
int64_t stime; int64_t stime;
uint64_t qId; int64_t qId;
uint64_t sqlObjId; int64_t sqlObjId;
int32_t pid; int32_t pid;
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
uint8_t stableQuery; int8_t stableQuery;
int32_t numOfSub; int32_t numOfSub;
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
} SQueryDesc; } SQueryDesc;
typedef struct { typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
char dstTable[TSDB_TABLE_NAME_LEN]; char dstTable[TSDB_TABLE_NAME_LEN];
uint32_t streamId; int32_t streamId;
int64_t num; // number of computing/cycles int64_t num; // number of computing/cycles
int64_t useconds; int64_t useconds;
int64_t ctime; int64_t ctime;
...@@ -893,8 +895,18 @@ typedef struct { ...@@ -893,8 +895,18 @@ typedef struct {
} SHeartBeatRsp; } SHeartBeatRsp;
typedef struct { typedef struct {
char queryId[TSDB_KILL_MSG_LEN + 1]; int32_t connId;
} SKillQueryMsg, SKillConnMsg; int32_t streamId;
} SKillStreamMsg;
typedef struct {
int32_t connId;
int32_t queryId;
} SKillQueryMsg;
typedef struct {
int32_t connId;
} SKillConnMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
......
...@@ -31,6 +31,7 @@ typedef struct { ...@@ -31,6 +31,7 @@ typedef struct {
int16_t numOfSupportMnodes; int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes; int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes; int16_t numOfSupportQnodes;
int8_t enableTelem;
int32_t statusInterval; int32_t statusInterval;
int32_t mnodeEqualVnodeNum; int32_t mnodeEqualVnodeNum;
float numOfThreadsPerCore; float numOfThreadsPerCore;
...@@ -45,6 +46,8 @@ typedef struct { ...@@ -45,6 +46,8 @@ typedef struct {
char timezone[TSDB_TIMEZONE_LEN]; char timezone[TSDB_TIMEZONE_LEN];
char locale[TSDB_LOCALE_LEN]; char locale[TSDB_LOCALE_LEN];
char charset[TSDB_LOCALE_LEN]; char charset[TSDB_LOCALE_LEN];
char buildinfo[64];
char gitinfo[48];
} SDnodeOpt; } SDnodeOpt;
/* ------------------------ SDnode ------------------------ */ /* ------------------------ SDnode ------------------------ */
......
...@@ -43,24 +43,31 @@ typedef struct SMnodeLoad { ...@@ -43,24 +43,31 @@ typedef struct SMnodeLoad {
int64_t compStorage; int64_t compStorage;
} SMnodeLoad; } SMnodeLoad;
typedef struct SMnodeCfg {
int32_t sver;
int8_t enableTelem;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone;
char *locale;
char *charset;
char *buildinfo;
char *gitinfo;
} SMnodeCfg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SMnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
int32_t sver;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone;
char *locale;
char *charset;
} SMnodeOpt; } SMnodeOpt;
/* ------------------------ SMnode ------------------------ */ /* ------------------------ SMnode ------------------------ */
......
...@@ -76,6 +76,10 @@ int32_t* taosGetErrno(); ...@@ -76,6 +76,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0114) #define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0115) #define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0115)
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x0120)
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121)
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation") #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle") #define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle")
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_VERSION_H
#define _TD_UTIL_VERSION_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t taosVersionStrToInt(const char *vstr, int32_t *vint);
int32_t taosVersionIntToStr(int32_t vint, char *vstr, int32_t len);
int32_t taosCheckVersionCompatible(int32_t clientVer, int32_t serverVer, int32_t comparedSegments);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_VERSION_H*/
#include "os.h"
#include "tdef.h"
#include "ulog.h"
#include "taoserror.h"
bool taosGetVersionNumber(char *versionStr, int *versionNubmer) {
if (versionStr == NULL || versionNubmer == NULL) {
return false;
}
int versionNumberPos[5] = {0};
int len = (int)strlen(versionStr);
int dot = 0;
for (int pos = 0; pos < len && dot < 4; ++pos) {
if (versionStr[pos] == '.') {
versionStr[pos] = 0;
versionNumberPos[++dot] = pos + 1;
}
}
if (dot != 3) {
return false;
}
for (int pos = 0; pos < 4; ++pos) {
versionNubmer[pos] = atoi(versionStr + versionNumberPos[pos]);
}
versionStr[versionNumberPos[1] - 1] = '.';
versionStr[versionNumberPos[2] - 1] = '.';
versionStr[versionNumberPos[3] - 1] = '.';
return true;
}
int taosCheckVersion(char *input_client_version, char *input_server_version, int comparedSegments) {
char client_version[TSDB_VERSION_LEN] = {0};
char server_version[TSDB_VERSION_LEN] = {0};
int clientVersionNumber[4] = {0};
int serverVersionNumber[4] = {0};
tstrncpy(client_version, input_client_version, sizeof(client_version));
tstrncpy(server_version, input_server_version, sizeof(server_version));
if (!taosGetVersionNumber(client_version, clientVersionNumber)) {
uError("invalid client version:%s", client_version);
return TSDB_CODE_TSC_INVALID_VERSION;
}
if (!taosGetVersionNumber(server_version, serverVersionNumber)) {
uError("invalid server version:%s", server_version);
return TSDB_CODE_TSC_INVALID_VERSION;
}
for(int32_t i = 0; i < comparedSegments; ++i) {
if (clientVersionNumber[i] != serverVersionNumber[i]) {
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version,
client_version);
return TSDB_CODE_TSC_INVALID_VERSION;
}
}
return 0;
}
...@@ -136,7 +136,7 @@ void dmnWaitSignal() { ...@@ -136,7 +136,7 @@ void dmnWaitSignal() {
} }
void dmnInitOption(SDnodeOpt *pOption) { void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = tsVersion; pOption->sver = 30000000; //3.0.0.0
pOption->numOfCores = tsNumOfCores; pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportMnodes = 1; pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1; pOption->numOfSupportVnodes = 1;
...@@ -155,6 +155,8 @@ void dmnInitOption(SDnodeOpt *pOption) { ...@@ -155,6 +155,8 @@ void dmnInitOption(SDnodeOpt *pOption) {
tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN); tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN);
tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN); tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN); tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN);
tstrncpy(pOption->buildinfo, buildinfo, 64);
tstrncpy(pOption->gitinfo, gitinfo, 48);
} }
int dmnRunDnode() { int dmnRunDnode() {
......
...@@ -331,13 +331,16 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -331,13 +331,16 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->opt.sver;
pOption->statusInterval = pDnode->opt.statusInterval; pOption->cfg.enableTelem = pDnode->opt.enableTelem;
pOption->mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum; pOption->cfg.statusInterval = pDnode->opt.statusInterval;
pOption->shellActivityTimer = pDnode->opt.shellActivityTimer; pOption->cfg.mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum;
pOption->timezone = pDnode->opt.timezone; pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;
pOption->charset = pDnode->opt.charset; pOption->cfg.timezone = pDnode->opt.timezone;
pOption->locale = pDnode->opt.locale; pOption->cfg.charset = pDnode->opt.charset;
pOption->cfg.locale = pDnode->opt.locale;
pOption->cfg.gitinfo = pDnode->opt.gitinfo;
pOption->cfg.buildinfo = pDnode->opt.buildinfo;
} }
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
......
add_subdirectory(test01) add_subdirectory(acct)
\ No newline at end of file add_subdirectory(cluster)
add_subdirectory(profile)
add_subdirectory(show)
add_executable(dndTest01 "") add_executable(dndTestAcct "")
target_sources(dndTest01 target_sources(dndTestAcct
PRIVATE PRIVATE
"test01.cpp" "acct.cpp"
"../sut/deploy.cpp" "../sut/deploy.cpp"
) )
target_link_libraries( target_link_libraries(
dndTest01 dndTestAcct
PUBLIC dnode PUBLIC dnode
PUBLIC util PUBLIC util
PUBLIC os PUBLIC os
PUBLIC gtest_main PUBLIC gtest_main
) )
target_include_directories(dndTest01 target_include_directories(dndTestAcct
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt" "${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
...@@ -24,6 +24,6 @@ target_include_directories(dndTest01 ...@@ -24,6 +24,6 @@ target_include_directories(dndTest01
enable_testing() enable_testing()
add_test( add_test(
NAME dndTest01 NAME dndTestAcct
COMMAND dndTest01 COMMAND dndTestAcct
) )
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "deploy.h"
class DndTestAcct : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
static void SetUpTestSuite() {
const char* user = "root";
const char* pass = "taosdata";
const char* path = "/tmp/dndTestAcct";
const char* fqdn = "localhost";
uint16_t port = 9520;
pServer = createServer(path, fqdn, port);
ASSERT(pServer);
pClient = createClient(user, pass, fqdn, port);
}
static void TearDownTestSuite() {
dropServer(pServer);
dropClient(pClient);
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
};
SServer* DndTestAcct::pServer;
SClient* DndTestAcct::pClient;
int32_t DndTestAcct::connId;
TEST_F(DndTestAcct, CreateAcct) {
ASSERT_NE(pClient, nullptr);
SCreateAcctMsg* pReq = (SCreateAcctMsg*)rpcMallocCont(sizeof(SCreateAcctMsg));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateAcctMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_ACCT;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
}
TEST_F(DndTestAcct, AlterAcct) {
ASSERT_NE(pClient, nullptr);
SAlterAcctMsg* pReq = (SAlterAcctMsg*)rpcMallocCont(sizeof(SAlterAcctMsg));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterAcctMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_ACCT;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
}
TEST_F(DndTestAcct, DropAcct) {
ASSERT_NE(pClient, nullptr);
SDropAcctMsg* pReq = (SDropAcctMsg*)rpcMallocCont(sizeof(SDropAcctMsg));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropAcctMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_ACCT;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
}
TEST_F(DndTestAcct, ShowAcct) {
ASSERT_NE(pClient, nullptr);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_ACCT;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_MSG_TYPE);
}
\ No newline at end of file
add_executable(dndTestCluster "")
target_sources(dndTestCluster
PRIVATE
"cluster.cpp"
"../sut/deploy.cpp"
)
target_link_libraries(
dndTestCluster
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dndTestCluster
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
)
enable_testing()
add_test(
NAME dndTestCluster
COMMAND dndTestCluster
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "deploy.h"
class DndTestCluster : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
static void SetUpTestSuite() {
const char* user = "root";
const char* pass = "taosdata";
const char* path = "/tmp/dndTestCluster";
const char* fqdn = "localhost";
uint16_t port = 9521;
pServer = createServer(path, fqdn, port);
ASSERT(pServer);
pClient = createClient(user, pass, fqdn, port);
}
static void TearDownTestSuite() {
dropServer(pServer);
dropClient(pClient);
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
};
SServer* DndTestCluster::pServer;
SClient* DndTestCluster::pClient;
int32_t DndTestCluster::connId;
TEST_F(DndTestCluster, ShowCluster) {
ASSERT_NE(pClient, nullptr);
int32_t showId = 0;
{
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_CLUSTER;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
SShowRsp* pRsp = (SShowRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->showId = htonl(pRsp->showId);
STableMetaMsg* pMeta = &pRsp->tableMeta;
pMeta->contLen = htonl(pMeta->contLen);
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid);
showId = pRsp->showId;
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "show cluster");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 3);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "id");
pSchema = &pMeta->schema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "name");
pSchema = &pMeta->schema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "create_time");
}
{
SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pReq->showId = htonl(showId);
pReq->free = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SRetrieveTableMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->offset = htobe64(pRsp->offset);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
EXPECT_EQ(pRsp->numOfRows, 1);
EXPECT_EQ(pRsp->offset, 0);
EXPECT_EQ(pRsp->useconds, 0);
EXPECT_EQ(pRsp->completed, 1);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->compressed, 0);
EXPECT_EQ(pRsp->reserved, 0);
EXPECT_EQ(pRsp->compLen, 0);
char* pData = pRsp->data;
int32_t pos = 0;
int32_t id = *((int32_t*)(pData + pos));
pos += sizeof(int32_t);
int32_t nameLen = varDataLen(pData + pos);
pos += sizeof(VarDataLenT);
char* name = (char*)(pData + pos);
pos += TSDB_CLUSTER_ID_LEN;
int64_t create_time = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_NE(id, 0);
EXPECT_EQ(nameLen, 36);
EXPECT_STRNE(name, "");
EXPECT_GT(create_time, 0);
printf("--- id:%d nameLen:%d name:%s time:%" PRId64 " --- \n", id, nameLen, name, create_time);
}
}
\ No newline at end of file
add_executable(dndTestProfile "")
target_sources(dndTestProfile
PRIVATE
"profile.cpp"
"../sut/deploy.cpp"
)
target_link_libraries(
dndTestProfile
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dndTestProfile
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
)
enable_testing()
add_test(
NAME dndTestProfile
COMMAND dndTestProfile
)
此差异已折叠。
add_executable(dndTestShow "")
target_sources(dndTestShow
PRIVATE
"show.cpp"
"../sut/deploy.cpp"
)
target_link_libraries(
dndTestShow
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dndTestShow
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
)
enable_testing()
add_test(
NAME dndTestShow
COMMAND dndTestShow
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "deploy.h"
class DndTestShow : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
static void SetUpTestSuite() {
const char* user = "root";
const char* pass = "taosdata";
const char* path = "/tmp/dndTestShow";
const char* fqdn = "localhost";
uint16_t port = 9523;
pServer = createServer(path, fqdn, port);
ASSERT(pServer);
pClient = createClient(user, pass, fqdn, port);
}
static void TearDownTestSuite() {
dropServer(pServer);
dropClient(pClient);
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
};
SServer* DndTestShow::pServer;
SClient* DndTestShow::pClient;
int32_t DndTestShow::connId;
TEST_F(DndTestShow, SShowMsg_01) {
ASSERT_NE(pClient, nullptr);
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg));
pReq->pid = htonl(1234);
strcpy(pReq->app, "dndTestShow");
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SConnectMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CONNECT;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->connId = htonl(pRsp->connId);
EXPECT_EQ(pRsp->connId, 1);
connId = pRsp->connId;
}
TEST_F(DndTestShow, SShowMsg_02) {
ASSERT_NE(pClient, nullptr);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_MAX;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_MSG_TYPE);
}
TEST_F(DndTestShow, SShowMsg_03) {
ASSERT_NE(pClient, nullptr);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_START;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_MSG_TYPE);
}
TEST_F(DndTestShow, SShowMsg_04) {
ASSERT_NE(pClient, nullptr);
int32_t showId = 0;
{
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_CONNS;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
SShowRsp* pRsp = (SShowRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->showId = htonl(pRsp->showId);
STableMetaMsg* pMeta = &pRsp->tableMeta;
pMeta->contLen = htonl(pMeta->contLen);
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid);
showId = pRsp->showId;
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 7);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->schema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user");
pSchema = &pMeta->schema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "program");
pSchema = &pMeta->schema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "pid");
pSchema = &pMeta->schema[4];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "ip:port");
pSchema = &pMeta->schema[5];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "login_time");
pSchema = &pMeta->schema[6];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "last_access");
}
{
SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pReq->showId = htonl(showId);
pReq->free = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SRetrieveTableMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->offset = htobe64(pRsp->offset);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
EXPECT_EQ(pRsp->numOfRows, 1);
EXPECT_EQ(pRsp->offset, 0);
EXPECT_EQ(pRsp->useconds, 0);
EXPECT_EQ(pRsp->completed, 1);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->compressed, 0);
EXPECT_EQ(pRsp->reserved, 0);
EXPECT_EQ(pRsp->compLen, 0);
}
}
...@@ -15,8 +15,27 @@ ...@@ -15,8 +15,27 @@
#include "deploy.h" #include "deploy.h"
void initLog(char *path) { void initLog(const char* path) {
dDebugFlag = 0;
vDebugFlag = 0;
mDebugFlag = 207; mDebugFlag = 207;
cDebugFlag = 0;
jniDebugFlag = 0;
tmrDebugFlag = 0;
sdbDebugFlag = 0;
httpDebugFlag = 0;
mqttDebugFlag = 0;
monDebugFlag = 0;
uDebugFlag = 0;
rpcDebugFlag = 0;
odbcDebugFlag = 0;
qDebugFlag = 0;
wDebugFlag = 0;
sDebugFlag = 0;
tsdbDebugFlag = 0;
cqDebugFlag = 0;
debugFlag = 0;
char temp[PATH_MAX]; char temp[PATH_MAX];
snprintf(temp, PATH_MAX, "%s/taosdlog", path); snprintf(temp, PATH_MAX, "%s/taosdlog", path);
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) { if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
...@@ -32,7 +51,7 @@ void* runServer(void* param) { ...@@ -32,7 +51,7 @@ void* runServer(void* param) {
} }
} }
void initOption(SDnodeOpt* pOption, char *path) { void initOption(SDnodeOpt* pOption, const char* path, const char* fqdn, uint16_t port) {
pOption->sver = 1; pOption->sver = 1;
pOption->numOfCores = 1; pOption->numOfCores = 1;
pOption->numOfSupportMnodes = 1; pOption->numOfSupportMnodes = 1;
...@@ -44,19 +63,20 @@ void initOption(SDnodeOpt* pOption, char *path) { ...@@ -44,19 +63,20 @@ void initOption(SDnodeOpt* pOption, char *path) {
pOption->ratioOfQueryCores = 1; pOption->ratioOfQueryCores = 1;
pOption->maxShellConns = 1000; pOption->maxShellConns = 1000;
pOption->shellActivityTimer = 30; pOption->shellActivityTimer = 30;
pOption->serverPort = 9527; pOption->serverPort = port;
strcpy(pOption->dataDir, path); strcpy(pOption->dataDir, path);
strcpy(pOption->localEp, "localhost:9527"); snprintf(pOption->localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
strcpy(pOption->localFqdn, "localhost"); snprintf(pOption->localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
strcpy(pOption->firstEp, "localhost:9527"); snprintf(pOption->firstEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
}
SServer* createServer(const char* path, const char* fqdn, uint16_t port) {
taosRemoveDir(path); taosRemoveDir(path);
taosMkDir(path); taosMkDir(path);
} initLog(path);
SServer* createServer(char *path) {
SDnodeOpt option = {0}; SDnodeOpt option = {0};
initOption(&option, path); initOption(&option, path, fqdn, port);
SDnode* pDnode = dndInit(&option); SDnode* pDnode = dndInit(&option);
ASSERT(pDnode); ASSERT(pDnode);
...@@ -80,11 +100,11 @@ void dropServer(SServer* pServer) { ...@@ -80,11 +100,11 @@ void dropServer(SServer* pServer) {
void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SClient* pClient = (SClient*)parent; SClient* pClient = (SClient*)parent;
pClient->pRsp = pMsg; pClient->pRsp = pMsg;
//taosMsleep(1000000); // taosMsleep(1000000);
tsem_post(&pClient->sem); tsem_post(&pClient->sem);
} }
SClient* createClient(char *user, char *pass) { SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port) {
SClient* pClient = (SClient*)calloc(1, sizeof(SClient)); SClient* pClient = (SClient*)calloc(1, sizeof(SClient));
ASSERT(pClient); ASSERT(pClient);
...@@ -93,14 +113,14 @@ SClient* createClient(char *user, char *pass) { ...@@ -93,14 +113,14 @@ SClient* createClient(char *user, char *pass) {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C"; rpcInit.label = (char*)"DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processClientRsp; rpcInit.cfp = processClientRsp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000; rpcInit.idleTime = 30 * 1000;
rpcInit.user = user; rpcInit.user = (char*)user;
rpcInit.ckey = "key"; rpcInit.ckey = (char*)"key";
rpcInit.parent = pClient; rpcInit.parent = pClient;
rpcInit.secret = (char*)secretEncrypt; rpcInit.secret = (char*)secretEncrypt;
rpcInit.parent = pClient; rpcInit.parent = pClient;
...@@ -110,6 +130,8 @@ SClient* createClient(char *user, char *pass) { ...@@ -110,6 +130,8 @@ SClient* createClient(char *user, char *pass) {
ASSERT(pClient->clientRpc); ASSERT(pClient->clientRpc);
tsem_init(&pClient->sem, 0, 0); tsem_init(&pClient->sem, 0, 0);
strcpy(pClient->fqdn, fqdn);
pClient->port = port;
return pClient; return pClient;
} }
...@@ -123,8 +145,8 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) { ...@@ -123,8 +145,8 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) {
SEpSet epSet = {0}; SEpSet epSet = {0};
epSet.inUse = 0; epSet.inUse = 0;
epSet.numOfEps = 1; epSet.numOfEps = 1;
epSet.port[0] = 9527; epSet.port[0] = pClient->port;
strcpy(epSet.fqdn[0], "localhost"); strcpy(epSet.fqdn[0], pClient->fqdn);
rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL); rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL);
tsem_wait(&pClient->sem); tsem_wait(&pClient->sem);
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "trpc.h" #include "trpc.h"
#include "tthread.h" #include "tthread.h"
#include "ulog.h" #include "ulog.h"
#include "tdataformat.h"
typedef struct { typedef struct {
SDnode* pDnode; SDnode* pDnode;
...@@ -31,13 +32,15 @@ typedef struct { ...@@ -31,13 +32,15 @@ typedef struct {
} SServer; } SServer;
typedef struct { typedef struct {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
void* clientRpc; void* clientRpc;
SRpcMsg* pRsp; SRpcMsg* pRsp;
tsem_t sem; tsem_t sem;
} SClient; } SClient;
SServer* createServer(char* path); SServer* createServer(const char* path, const char* fqdn, uint16_t port);
void dropServer(SServer* pServer); void dropServer(SServer* pServer);
SClient* createClient(char *user, char *pass); SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
void dropClient(SClient* pClient); void dropClient(SClient* pClient);
void sendMsg(SClient* pClient, SRpcMsg* pMsg); void sendMsg(SClient* pClient, SRpcMsg* pMsg);
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "deploy.h"
class DndTest01 : public ::testing::Test {
protected:
void SetUp() override {
pServer = createServer("/tmp/dndTest01");
pClient = createClient("root", "taosdata");
}
void TearDown() override {
dropServer(pServer);
dropClient(pClient);
}
SServer* pServer;
SClient* pClient;
};
TEST_F(DndTest01, connectMsg) {
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg));
pReq->pid = htonl(1234);
strcpy(pReq->app, "test01");
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SConnectMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CONNECT;
sendMsg(pClient, &rpcMsg);
SConnectRsp* pRsp = (SConnectRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htonl(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
EXPECT_EQ(pRsp->acctId, 1);
EXPECT_GT(pRsp->clusterId, 0);
EXPECT_EQ(pRsp->connId, 1);
EXPECT_EQ(pRsp->superAuth, 1);
EXPECT_EQ(pRsp->readAuth, 1);
EXPECT_EQ(pRsp->writeAuth, 1);
EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9527);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
}
// TEST_F(DndTest01, heartbeatMsg) {
// SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg));
// pReq->connId = htonl(1);
// pReq->pid = htonl(1234);
// pReq->numOfQueries = htonl(0);
// pReq->numOfStreams = htonl(0);
// strcpy(pReq->app, "test01");
// SRpcMsg rpcMsg = {0};
// rpcMsg.pCont = pReq;
// rpcMsg.contLen = sizeof(SHeartBeatMsg);
// rpcMsg.msgType = TSDB_MSG_TYPE_HEARTBEAT;
// sendMsg(pClient, &rpcMsg);
// SHeartBeatRsp* pRsp = (SHeartBeatRsp*)pClient->pRsp;
// ASSERT(pRsp);
// pRsp->epSet.port[0] = htonl(pRsp->epSet.port[0]);
// EXPECT_EQ(htonl(pRsp->connId), 1);
// EXPECT_GT(htonl(pRsp->queryId), 0);
// EXPECT_GT(htonl(pRsp->streamId), 1);
// EXPECT_EQ(htonl(pRsp->totalDnodes), 1);
// EXPECT_EQ(htonl(pRsp->onlineDnodes), 1);
// EXPECT_EQ(pRsp->killConnection, 0);
// EXPECT_EQ(pRsp->epSet.inUse, 0);
// EXPECT_EQ(pRsp->epSet.numOfEps, 1);
// EXPECT_EQ(pRsp->epSet.port[0], 9527);
// EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
// }
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
int32_t mndInitCluster(SMnode *pMnode); int32_t mndInitCluster(SMnode *pMnode);
void mndCleanupCluster(SMnode *pMnode); void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -116,7 +116,7 @@ typedef struct STrans { ...@@ -116,7 +116,7 @@ typedef struct STrans {
typedef struct SClusterObj { typedef struct SClusterObj {
int32_t id; int32_t id;
char uid[TSDB_CLUSTER_ID_LEN]; char name[TSDB_CLUSTER_ID_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
} SClusterObj; } SClusterObj;
...@@ -296,7 +296,6 @@ typedef struct SShowObj { ...@@ -296,7 +296,6 @@ typedef struct SShowObj {
void *pIter; void *pIter;
void *pVgIter; void *pVgIter;
SMnode *pMnode; SMnode *pMnode;
SShowObj **ppShow;
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS];
......
...@@ -51,6 +51,15 @@ typedef struct { ...@@ -51,6 +51,15 @@ typedef struct {
SCacheObj *cache; SCacheObj *cache;
} SProfileMgmt; } SProfileMgmt;
typedef struct {
int8_t enable;
pthread_mutex_t lock;
pthread_cond_t cond;
volatile int32_t exit;
pthread_t thread;
char email[TSDB_FQDN_LEN];
} STelemMgmt;
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
...@@ -59,23 +68,18 @@ typedef struct SMnode { ...@@ -59,23 +68,18 @@ typedef struct SMnode {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer; tmr_h timer;
char *path; char *path;
SMnodeCfg cfg;
SSdb *pSdb; SSdb *pSdb;
SDnode *pDnode; SDnode *pDnode;
SArray *pSteps; SArray *pSteps;
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
STelemMgmt telemMgmt;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
int32_t sver;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone;
char *locale;
char *charset;
} SMnode; } SMnode;
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
......
...@@ -15,11 +15,52 @@ ...@@ -15,11 +15,52 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndCluster.h" #include "mndCluster.h"
#include "mndTrans.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h"
#define SDB_CLUSTER_VER 1 #define SDB_CLUSTER_VER 1
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster);
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndGetClusterMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupCluster(SMnode *pMnode) {}
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
SSdb *pSdb = pMnode->pSdb;
SClusterObj *pCluster = sdbAcquire(pSdb, SDB_CLUSTER, &pMnode->clusterId);
if (pCluster = NULL) {
return -1;
}
tstrncpy(clusterName, pCluster->name, len);
sdbRelease(pSdb, pCluster);
return 0;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj)); SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj));
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
...@@ -28,7 +69,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { ...@@ -28,7 +69,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SDB_SET_INT32(pRaw, dataPos, pCluster->id); SDB_SET_INT32(pRaw, dataPos, pCluster->id);
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime) SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime) SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime)
SDB_SET_BINARY(pRaw, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
return pRaw; return pRaw;
} }
...@@ -51,7 +92,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { ...@@ -51,7 +92,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id) SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime)
SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN) SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
return pRow; return pRow;
} }
...@@ -76,14 +117,14 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -76,14 +117,14 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj.createdTime = taosGetTimestampMs(); clusterObj.createdTime = taosGetTimestampMs();
clusterObj.updateTime = clusterObj.createdTime; clusterObj.updateTime = clusterObj.createdTime;
int32_t code = taosGetSystemUid(clusterObj.uid, TSDB_CLUSTER_ID_LEN); int32_t code = taosGetSystemUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
if (code != 0) { if (code != 0) {
strcpy(clusterObj.uid, "tdengine2.0"); strcpy(clusterObj.name, "tdengine2.0");
mError("failed to get uid from system, set to default val %s", clusterObj.uid); mError("failed to get name from system, set to default val %s", clusterObj.name);
} else { } else {
mDebug("cluster:%d, uid is %s", clusterObj.id, clusterObj.uid); mDebug("cluster:%d, name is %s", clusterObj.id, clusterObj.name);
} }
clusterObj.id = MurmurHash3_32(clusterObj.uid, TSDB_CLUSTER_ID_LEN); clusterObj.id = MurmurHash3_32(clusterObj.name, TSDB_CLUSTER_ID_LEN);
clusterObj.id = abs(clusterObj.id); clusterObj.id = abs(clusterObj.id);
pMnode->clusterId = clusterObj.id; pMnode->clusterId = clusterObj.id;
...@@ -95,85 +136,79 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -95,85 +136,79 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
strcpy(pMeta->tableFname, "show cluster");
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
// static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->numOfRows = 1;
// int32_t cols = 0; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
// SSchema *pSchema = pMeta->schema;
// pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "clusterId");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "create_time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
// pMeta->numOfColumns = htons(cols);
// strcpy(pMeta->tableFname, "show cluster");
// pShow->numOfColumns = cols;
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) {
// pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
// }
// pShow->numOfRows = 1;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
// return 0;
// }
// static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
// int32_t numOfRows = 0;
// int32_t cols = 0;
// char * pWrite;
// SClusterObj *pCluster = NULL;
// while (numOfRows < rows) { return 0;
// pShow->pIter = mnodeGetNextCluster(pShow->pIter, &pCluster); }
// if (pCluster == NULL) break;
// cols = 0; static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
char *pWrite;
SClusterObj *pCluster = NULL;
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; while (numOfRows < rows) {
// STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pCluster->uid, TSDB_CLUSTER_ID_LEN); pShow->pIter = sdbFetch(pSdb, SDB_CLUSTER, pShow->pIter, (void **)&pCluster);
// cols++; if (pShow->pIter == NULL) break;
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; cols = 0;
// *(int64_t *) pWrite = pCluster->createdTime;
// cols++;
// mnodeDecClusterRef(pCluster); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// numOfRows++; *(int32_t *)pWrite = pCluster->id;
// } cols++;
// mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// pShow->numOfReads += numOfRows; STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pCluster->name, TSDB_CLUSTER_ID_LEN);
// return numOfRows; cols++;
// }
// static void mnodeCancelGetNextCluster(void *pIter) { pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// sdbFreeIter(tsClusterSdb, pIter); *(int64_t *)pWrite = pCluster->createdTime;
// } cols++;
int32_t mndInitCluster(SMnode *pMnode) { sdbRelease(pSdb, pCluster);
SSdbTable table = {.sdbType = SDB_CLUSTER, numOfRows++;
.keyType = SDB_KEY_INT32, }
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
// mndAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
// mndAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters); pShow->numOfReads += numOfRows;
// mndAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster); return numOfRows;
return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupCluster(SMnode *pMnode) {} static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
...@@ -179,32 +179,33 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { ...@@ -179,32 +179,33 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
} }
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
if (pCfg->mnodeEqualVnodeNum != pMnode->mnodeEqualVnodeNum) { if (pCfg->mnodeEqualVnodeNum != pMnode->cfg.mnodeEqualVnodeNum) {
mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum, pMnode->mnodeEqualVnodeNum); mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum,
pMnode->cfg.mnodeEqualVnodeNum);
return DND_REASON_MN_EQUAL_VN_NOT_MATCH; return DND_REASON_MN_EQUAL_VN_NOT_MATCH;
} }
if (pCfg->statusInterval != pMnode->statusInterval) { if (pCfg->statusInterval != pMnode->cfg.statusInterval) {
mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->statusInterval); mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->cfg.statusInterval);
return DND_REASON_STATUS_INTERVAL_NOT_MATCH; return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
} }
int64_t checkTime = 0; int64_t checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
if ((0 != strcasecmp(pCfg->timezone, pMnode->timezone)) && (checkTime != pCfg->checkTime)) { if ((0 != strcasecmp(pCfg->timezone, pMnode->cfg.timezone)) && (checkTime != pCfg->checkTime)) {
mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, tsTimezone, mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, pMnode->cfg.timezone,
pCfg->checkTime, checkTime); pCfg->checkTime, checkTime);
return DND_REASON_TIME_ZONE_NOT_MATCH; return DND_REASON_TIME_ZONE_NOT_MATCH;
} }
if (0 != strcasecmp(pCfg->locale, pMnode->locale)) { if (0 != strcasecmp(pCfg->locale, pMnode->cfg.locale)) {
mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->locale); mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->cfg.locale);
return DND_REASON_LOCALE_NOT_MATCH; return DND_REASON_LOCALE_NOT_MATCH;
} }
if (0 != strcasecmp(pCfg->charset, pMnode->charset)) { if (0 != strcasecmp(pCfg->charset, pMnode->cfg.charset)) {
mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->charset); mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->cfg.charset);
return DND_REASON_CHARSET_NOT_MATCH; return DND_REASON_CHARSET_NOT_MATCH;
} }
...@@ -251,12 +252,12 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -251,12 +252,12 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
} }
} }
if (pStatus->sver != pMnode->sver) { if (pStatus->sver != pMnode->cfg.sver) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
} }
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->sver); mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
return TSDB_CODE_MND_INVALID_MSG_VERSION; return TSDB_CODE_MND_INVALID_MSG_VERSION;
} }
......
...@@ -16,19 +16,19 @@ ...@@ -16,19 +16,19 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndShow.h" #include "mndShow.h"
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg); static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg);
static int32_t mndProcessRetrieveMsg( SMnodeMsg *pMsg); static void mndFreeShowObj(SShowObj *pShow);
static bool mndCheckRetrieveFinished(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId);
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow);
static void mndFreeShowObj(void *ppShow);
static char *mndShowStr(int32_t showType); static char *mndShowStr(int32_t showType);
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg);
static bool mndCheckRetrieveFinished(SShowObj *pShow);
int32_t mndInitShow(SMnode *pMnode) { int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->cache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mndFreeShowObj, "show"); pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show");
if (pMgmt->cache == NULL) { if (pMgmt->cache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc show cache since %s", terrstr()); mError("failed to alloc show cache since %s", terrstr());
...@@ -48,47 +48,41 @@ void mndCleanupShow(SMnode *pMnode) { ...@@ -48,47 +48,41 @@ void mndCleanupShow(SMnode *pMnode) {
} }
} }
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) { static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppShow) {
mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow);
return 0;
}
return -1; int32_t showId = atomic_add_fetch_32(&pMgmt->showId, 1);
} if (showId == 0) atomic_add_fetch_32(&pMgmt->showId, 1);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowObj **ppShow = (SShowObj **)pShow->ppShow;
taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove);
mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove);
}
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) { int32_t size = sizeof(SShowObj) + pMsg->payloadLen;
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowObj *pShow = calloc(1, size);
int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000; if (pShow != NULL) {
pShow->id = showId;
TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow; pShow->pMnode = pMnode;
pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1); pShow->type = pMsg->type;
SShowObj **ppShow = pShow->payloadLen = pMsg->payloadLen;
taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan); memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN);
if (ppShow == NULL) { memcpy(pShow->payload, pMsg->payload, pMsg->payloadLen);
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to put into cache", pShow->id); mError("failed to process show-meta msg:%s since %s", mndShowStr(pMsg->type), terrstr());
return -1; return NULL;
} }
mTrace("show:%d, data:%p put into cache", pShow->id, ppShow); int32_t keepTime = pMnode->cfg.shellActivityTimer * 6 * 1000;
return 0; SShowObj *pShowRet = taosCachePut(pMgmt->cache, &showId, sizeof(int32_t), pShow, size, keepTime);
free(pShow);
if (pShowRet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to put into cache since %s", showId, terrstr());
return NULL;
} else {
mTrace("show:%d, data:%p created", showId, pShowRet);
return pShowRet;
}
} }
static void mndFreeShowObj(void *ppShow) { static void mndFreeShowObj(SShowObj *pShow) {
SShowObj *pShow = *(SShowObj **)ppShow;
SMnode *pMnode = pShow->pMnode; SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
...@@ -103,8 +97,32 @@ static void mndFreeShowObj(void *ppShow) { ...@@ -103,8 +97,32 @@ static void mndFreeShowObj(void *ppShow) {
} }
} }
mDebug("show:%d, data:%p destroyed", pShow->id, ppShow); mTrace("show:%d, data:%p destroyed", pShow->id, pShow);
tfree(pShow); }
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowObj *pShow = taosCacheAcquireByKey(pMgmt->cache, &showId, sizeof(int32_t));
if (pShow == NULL) {
mError("show:%d, already destroyed", showId);
return NULL;
}
mTrace("show:%d, data:%p acquired from cache", pShow->id, pShow);
return pShow;
}
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
if (pShow == NULL) return;
mTrace("show:%d, data:%p released from cache, force:%d", pShow->id, pShow, forceRemove);
// A bug in tcache.c
forceRemove = 0;
SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
} }
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
...@@ -112,7 +130,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { ...@@ -112,7 +130,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont; SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont;
int8_t type = pMsg->type; int8_t type = pMsg->type;
uint16_t payloadLen = htonl(pMsg->payloadLen); int16_t payloadLen = htonl(pMsg->payloadLen);
if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) { if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE; terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
...@@ -127,27 +145,13 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { ...@@ -127,27 +145,13 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
return -1; return -1;
} }
int32_t size = sizeof(SShowObj) + payloadLen; SShowObj *pShow = mndCreateShowObj(pMnode, pMsg);
SShowObj *pShow = calloc(1, size); if (pShow == NULL) {
if (pShow != NULL) {
pShow->pMnode = pMnode;
pShow->type = type;
pShow->payloadLen = payloadLen;
memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN);
memcpy(pShow->payload, pMsg->payload, payloadLen);
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr()); mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
return -1; return -1;
} }
if (mndPutShowObj(pMnode, pShow) == 0) { int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
free(pShow);
return -1;
}
size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SShowRsp *pRsp = rpcMallocCont(size); SShowRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseShowObj(pShow, true); mndReleaseShowObj(pShow, true);
...@@ -156,15 +160,14 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { ...@@ -156,15 +160,14 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
return -1; return -1;
} }
pRsp->qhandle = htobe64((uint64_t)pShow); int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta);
mDebug("show:%d, data:%p get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow,
int32_t code = (*metaFp)(pMnodeMsg,pShow, &pRsp->tableMeta); pShow->numOfRows, pShow->numOfColumns, mndShowStr(type), tstrerror(code));
mDebug("show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s", pShow->id, mndShowStr(type),
pShow->numOfRows, pShow->numOfColumns, tstrerror(code));
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
pMnodeMsg->pCont = pRsp; pMnodeMsg->pCont = pRsp;
pRsp->showId = htonl(pShow->id);
mndReleaseShowObj(pShow, false); mndReleaseShowObj(pShow, false);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
...@@ -182,14 +185,10 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -182,14 +185,10 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
int32_t rowsRead = 0; int32_t rowsRead = 0;
SRetrieveTableMsg *pRetrieve = pMnodeMsg->rpcMsg.pCont; SRetrieveTableMsg *pRetrieve = pMnodeMsg->rpcMsg.pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle); int32_t showId = htonl(pRetrieve->showId);
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
/* SShowObj *pShow = mndAcquireShowObj(pMnode, showId);
* in case of server restart, apps may hold qhandle created by server before if (pShow == NULL) {
* restart, which is actually invalid, therefore, signature check is required.
*/
if (mndAcquireShowObj(pMnode, pShow) != 0) {
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ; terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr()); mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr());
return -1; return -1;
...@@ -199,15 +198,16 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -199,15 +198,16 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
if (retrieveFp == NULL) { if (retrieveFp == NULL) {
mndReleaseShowObj(pShow, false); mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr()); mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr());
return -1; return -1;
} }
mDebug("show:%d, type:%s, start retrieve data, numOfReads:%d numOfRows:%d", pShow->id, mndShowStr(pShow->type), mDebug("show:%d, data:%p start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow,
pShow->numOfReads, pShow->numOfRows); pShow->numOfReads, pShow->numOfRows, mndShowStr(pShow->type));
if (mndCheckRetrieveFinished(pShow)) { if (mndCheckRetrieveFinished(pShow)) {
mDebug("show:%d, read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads, pShow->numOfRows); mDebug("show:%d, data:%p read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow, pShow->numOfReads,
pShow->numOfRows);
pShow->numOfReads = pShow->numOfRows; pShow->numOfReads = pShow->numOfRows;
} }
...@@ -230,7 +230,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -230,7 +230,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseShowObj(pShow, false); mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr()); mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr());
return -1; return -1;
} }
...@@ -239,20 +239,20 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -239,20 +239,20 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead); rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead);
} }
mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead); mDebug("show:%d, data:%p stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, pShow, rowsRead, rowsToRead);
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = (int16_t)htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
pMnodeMsg->pCont = pRsp; pMnodeMsg->pCont = pRsp;
pMnodeMsg->contLen = size; pMnodeMsg->contLen = size;
if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
pRsp->completed = 1; pRsp->completed = 1;
mDebug("%p, retrieve completed", pShow); mDebug("show:%d, data:%p retrieve completed", pShow->id, pShow);
mndReleaseShowObj(pShow, true); mndReleaseShowObj(pShow, true);
} else { } else {
mDebug("%p, retrieve not completed yet", pShow); mDebug("show:%d, data:%p retrieve not completed yet", pShow->id, pShow);
mndReleaseShowObj(pShow, false); mndReleaseShowObj(pShow, false);
} }
......
...@@ -15,27 +15,15 @@ ...@@ -15,27 +15,15 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndTelem.h" #include "mndTelem.h"
#include "tbuffer.h" #include "mndCluster.h"
#include "tglobal.h"
#include "mndSync.h" #include "mndSync.h"
#include "tbuffer.h"
#include "tversion.h"
#define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_SERVER "telemetry.taosdata.com"
#define TELEMETRY_PORT 80 #define TELEMETRY_PORT 80
#define REPORT_INTERVAL 86400 #define REPORT_INTERVAL 86400
/*
* sem_timedwait is NOT implemented on MacOSX
* thus we use pthread_mutex_t/pthread_cond_t to simulate
*/
static struct {
bool enable;
pthread_mutex_t lock;
pthread_cond_t cond;
volatile int32_t exit;
pthread_t thread;
char email[TSDB_FQDN_LEN];
} tsTelem;
static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); } static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); }
static void mndCloseObject(SBufferWriter* bw) { static void mndCloseObject(SBufferWriter* bw) {
...@@ -86,7 +74,7 @@ static void mndAddStringField(SBufferWriter* bw, const char* k, const char* v) { ...@@ -86,7 +74,7 @@ static void mndAddStringField(SBufferWriter* bw, const char* k, const char* v) {
tbufWriteChar(bw, ','); tbufWriteChar(bw, ',');
} }
static void mndAddCpuInfo(SBufferWriter* bw) { static void mndAddCpuInfo(SMnode* pMnode, SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
int32_t done = 0; int32_t done = 0;
...@@ -116,7 +104,7 @@ static void mndAddCpuInfo(SBufferWriter* bw) { ...@@ -116,7 +104,7 @@ static void mndAddCpuInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void mndAddOsInfo(SBufferWriter* bw) { static void mndAddOsInfo(SMnode* pMnode, SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
...@@ -142,7 +130,7 @@ static void mndAddOsInfo(SBufferWriter* bw) { ...@@ -142,7 +130,7 @@ static void mndAddOsInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void mndAddMemoryInfo(SBufferWriter* bw) { static void mndAddMemoryInfo(SMnode* pMnode, SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
...@@ -165,16 +153,21 @@ static void mndAddMemoryInfo(SBufferWriter* bw) { ...@@ -165,16 +153,21 @@ static void mndAddMemoryInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void mndAddVersionInfo(SBufferWriter* bw) { static void mndAddVersionInfo(SMnode* pMnode, SBufferWriter* bw) {
mndAddStringField(bw, "version", version); STelemMgmt* pMgmt = &pMnode->telemMgmt;
mndAddStringField(bw, "buildInfo", buildinfo);
mndAddStringField(bw, "gitInfo", gitinfo); char vstr[32] = {0};
mndAddStringField(bw, "email", tsTelem.email); taosVersionIntToStr(pMnode->cfg.sver, vstr, 32);
mndAddStringField(bw, "version", vstr);
mndAddStringField(bw, "buildInfo", pMnode->cfg.buildinfo);
mndAddStringField(bw, "gitInfo", pMnode->cfg.gitinfo);
mndAddStringField(bw, "email", pMgmt->email);
} }
static void mndAddRuntimeInfo(SBufferWriter* bw) { static void mndAddRuntimeInfo(SMnode* pMnode, SBufferWriter* bw) {
SMnodeLoad load = {0}; SMnodeLoad load = {0};
if (mndGetLoad(NULL, &load) != 0) { if (mndGetLoad(pMnode, &load) != 0) {
return; return;
} }
...@@ -190,11 +183,13 @@ static void mndAddRuntimeInfo(SBufferWriter* bw) { ...@@ -190,11 +183,13 @@ static void mndAddRuntimeInfo(SBufferWriter* bw) {
mndAddIntField(bw, "compStorage", load.compStorage); mndAddIntField(bw, "compStorage", load.compStorage);
} }
static void mndSendTelemetryReport() { static void mndSendTelemetryReport(SMnode* pMnode) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
char buf[128] = {0}; char buf[128] = {0};
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) { if (ip == 0xffffffff) {
mTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); mTrace("failed to get IP address of " TELEMETRY_SERVER " since :%s", strerror(errno));
return; return;
} }
SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
...@@ -203,19 +198,18 @@ static void mndSendTelemetryReport() { ...@@ -203,19 +198,18 @@ static void mndSendTelemetryReport() {
return; return;
} }
int32_t clusterId = 0; char clusterName[64] = {0};
char clusterIdStr[20] = {0}; mndGetClusterName(pMnode, clusterName, sizeof(clusterName));
snprintf(clusterIdStr, sizeof(clusterIdStr), "%d", clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false); SBufferWriter bw = tbufInitWriter(NULL, false);
mndBeginObject(&bw); mndBeginObject(&bw);
mndAddStringField(&bw, "instanceId", clusterIdStr); mndAddStringField(&bw, "instanceId", clusterName);
mndAddIntField(&bw, "reportVersion", 1); mndAddIntField(&bw, "reportVersion", 1);
mndAddOsInfo(&bw); mndAddOsInfo(pMnode, &bw);
mndAddCpuInfo(&bw); mndAddCpuInfo(pMnode, &bw);
mndAddMemoryInfo(&bw); mndAddMemoryInfo(pMnode, &bw);
mndAddVersionInfo(&bw); mndAddVersionInfo(pMnode, &bw);
mndAddRuntimeInfo(&bw); mndAddRuntimeInfo(pMnode, &bw);
mndCloseObject(&bw); mndCloseObject(&bw);
const char* header = const char* header =
...@@ -241,23 +235,26 @@ static void mndSendTelemetryReport() { ...@@ -241,23 +235,26 @@ static void mndSendTelemetryReport() {
} }
static void* mndTelemThreadFp(void* param) { static void* mndTelemThreadFp(void* param) {
SMnode* pMnode = param;
STelemMgmt* pMgmt = &pMnode->telemMgmt;
struct timespec end = {0}; struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end); clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("mnd-telem"); setThreadName("mnd-telem");
while (!tsTelem.exit) { while (!pMgmt->exit) {
int32_t r = 0; int32_t r = 0;
struct timespec ts = end; struct timespec ts = end;
pthread_mutex_lock(&tsTelem.lock); pthread_mutex_lock(&pMgmt->lock);
r = pthread_cond_timedwait(&tsTelem.cond, &tsTelem.lock, &ts); r = pthread_cond_timedwait(&pMgmt->cond, &pMgmt->lock, &ts);
pthread_mutex_unlock(&tsTelem.lock); pthread_mutex_unlock(&pMgmt->lock);
if (r == 0) break; if (r == 0) break;
if (r != ETIMEDOUT) continue; if (r != ETIMEDOUT) continue;
if (mndIsMaster(NULL)) { if (mndIsMaster(pMnode)) {
mndSendTelemetryReport(); mndSendTelemetryReport(pMnode);
} }
end.tv_sec += REPORT_INTERVAL; end.tv_sec += REPORT_INTERVAL;
} }
...@@ -265,35 +262,39 @@ static void* mndTelemThreadFp(void* param) { ...@@ -265,35 +262,39 @@ static void* mndTelemThreadFp(void* param) {
return NULL; return NULL;
} }
static void mndGetEmail(char* filepath) { static void mndGetEmail(SMnode* pMnode, char* filepath) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
int32_t fd = taosOpenFileRead(filepath); int32_t fd = taosOpenFileRead(filepath);
if (fd < 0) { if (fd < 0) {
return; return;
} }
if (taosReadFile(fd, (void*)tsTelem.email, TSDB_FQDN_LEN) < 0) { if (taosReadFile(fd, (void*)pMgmt->email, TSDB_FQDN_LEN) < 0) {
mError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); mError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
} }
taosCloseFile(fd); taosCloseFile(fd);
} }
int32_t mndInitTelem(SMnode *pMnode) { int32_t mndInitTelem(SMnode* pMnode) {
tsTelem.enable = tsEnableTelemetryReporting; STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!tsTelem.enable) return 0; pMgmt->enable = pMnode->cfg.enableTelem;
if (!pMgmt->enable) return 0;
tsTelem.exit = 0; pMgmt->exit = 0;
pthread_mutex_init(&tsTelem.lock, NULL); pthread_mutex_init(&pMgmt->lock, NULL);
pthread_cond_init(&tsTelem.cond, NULL); pthread_cond_init(&pMgmt->cond, NULL);
tsTelem.email[0] = 0; pMgmt->email[0] = 0;
mndGetEmail("/usr/local/taos/email"); mndGetEmail(pMnode, "/usr/local/taos/email");
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int32_t code = pthread_create(&tsTelem.thread, &attr, mndTelemThreadFp, NULL); int32_t code = pthread_create(&pMgmt->thread, &attr, mndTelemThreadFp, pMnode);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
if (code != 0) { if (code != 0) {
mTrace("failed to create telemetry thread since :%s", strerror(code)); mTrace("failed to create telemetry thread since :%s", strerror(code));
...@@ -303,18 +304,19 @@ int32_t mndInitTelem(SMnode *pMnode) { ...@@ -303,18 +304,19 @@ int32_t mndInitTelem(SMnode *pMnode) {
return 0; return 0;
} }
void mndCleanupTelem(SMnode *pMnode) { void mndCleanupTelem(SMnode* pMnode) {
if (!tsTelem.enable) return; STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!pMgmt->enable) return;
if (taosCheckPthreadValid(tsTelem.thread)) { if (taosCheckPthreadValid(pMgmt->thread)) {
pthread_mutex_lock(&tsTelem.lock); pthread_mutex_lock(&pMgmt->lock);
tsTelem.exit = 1; pMgmt->exit = 1;
pthread_cond_signal(&tsTelem.cond); pthread_cond_signal(&pMgmt->cond);
pthread_mutex_unlock(&tsTelem.lock); pthread_mutex_unlock(&pMgmt->lock);
pthread_join(tsTelem.thread, NULL); pthread_join(pMgmt->thread, NULL);
} }
pthread_mutex_destroy(&tsTelem.lock); pthread_mutex_destroy(&pMgmt->lock);
pthread_cond_destroy(&tsTelem.cond); pthread_cond_destroy(&pMgmt->cond);
} }
...@@ -203,22 +203,25 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -203,22 +203,25 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
pMnode->sver = pOption->sver; pMnode->cfg.sver = pOption->cfg.sver;
pMnode->statusInterval = pOption->statusInterval; pMnode->cfg.enableTelem = pOption->cfg.enableTelem;
pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum; pMnode->cfg.statusInterval = pOption->cfg.statusInterval;
pMnode->shellActivityTimer = pOption->shellActivityTimer; pMnode->cfg.mnodeEqualVnodeNum = pOption->cfg.mnodeEqualVnodeNum;
pMnode->timezone = strdup(pOption->timezone); pMnode->cfg.shellActivityTimer = pOption->cfg.shellActivityTimer;
pMnode->locale = strdup(pOption->locale); pMnode->cfg.timezone = strdup(pOption->cfg.timezone);
pMnode->charset = strdup(pOption->charset); pMnode->cfg.locale = strdup(pOption->cfg.locale);
pMnode->cfg.charset = strdup(pOption->cfg.charset);
pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo);
pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo);
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) { pMnode->cfg.statusInterval < 1 || pOption->cfg.mnodeEqualVnodeNum < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS; terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1; return -1;
} }
if (pMnode->timezone == NULL || pMnode->locale == NULL || pMnode->charset == NULL) { if (pMnode->cfg.timezone == NULL || pMnode->cfg.locale == NULL || pMnode->cfg.charset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -289,9 +292,11 @@ void mndClose(SMnode *pMnode) { ...@@ -289,9 +292,11 @@ void mndClose(SMnode *pMnode) {
mDebug("start to close mnode"); mDebug("start to close mnode");
mndCleanupSteps(pMnode, -1); mndCleanupSteps(pMnode, -1);
tfree(pMnode->path); tfree(pMnode->path);
tfree(pMnode->charset); tfree(pMnode->cfg.charset);
tfree(pMnode->locale); tfree(pMnode->cfg.locale);
tfree(pMnode->timezone); tfree(pMnode->cfg.timezone);
tfree(pMnode->cfg.gitinfo);
tfree(pMnode->cfg.buildinfo);
tfree(pMnode); tfree(pMnode);
mDebug("mnode is closed"); mDebug("mnode is closed");
} }
......
...@@ -86,6 +86,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID") ...@@ -86,6 +86,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string")
TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tdef.h"
#include "ulog.h"
int32_t taosVersionStrToInt(const char *vstr, int32_t *vint) {
if (vstr == NULL) {
terrno = TSDB_CODE_INVALID_VERSION_STRING;
return -1;
}
int32_t vnum[4] = {0};
int32_t len = strlen(vstr);
char tmp[16] = {0};
for (int32_t spos = 0, tpos = 0, vpos = 0; spos < len && vpos < 4; ++spos) {
if (vstr[spos] != '.') {
tmp[spos - tpos] = vstr[spos];
} else {
vnum[vpos] = atoi(tmp);
memset(tmp, 0, sizeof(tmp));
vpos++;
tpos = spos + 1;
}
}
if (vnum[0] <= 0) {
terrno = TSDB_CODE_INVALID_VERSION_STRING;
return -1;
}
*vint = vnum[0] * 1000000 + vnum[1] * 10000 + vnum[2] * 100 + vnum[3];
return 0;
}
int32_t taosVersionIntToStr(int32_t vint, char *vstr, int32_t len) {
int32_t s1 = (vint % 100000000) / 1000000;
int32_t s2 = (vint % 1000000) / 10000;
int32_t s3 = (vint % 10000) / 100;
int32_t s4 = vint % 100;
if (s1 <= 0) {
terrno = TSDB_CODE_INVALID_VERSION_NUMBER;
return -1;
}
snprintf(vstr, len, "%02d.%02d.%02d.%02d", s1, s2, s3, s4);
return 0;
}
int32_t taosCheckVersionCompatible(int32_t clientVer, int32_t serverVer, int32_t comparedSegments) {
switch (comparedSegments) {
case 4:
break;
case 3:
clientVer %= 100;
serverVer %= 100;
break;
case 2:
clientVer %= 10000;
serverVer %= 10000;
break;
case 1:
clientVer %= 1000000;
serverVer %= 1000000;
break;
default:
terrno = TSDB_CODE_INVALID_VERSION_NUMBER;
return -1;
}
if (clientVer == serverVer) {
return 0;
} else {
terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
return -1;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册