提交 e2e08176 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

...@@ -4,9 +4,9 @@ ExternalProject_Add(libuv ...@@ -4,9 +4,9 @@ ExternalProject_Add(libuv
GIT_REPOSITORY https://github.com/libuv/libuv.git GIT_REPOSITORY https://github.com/libuv/libuv.git
GIT_TAG v1.42.0 GIT_TAG v1.42.0
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/libuv" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/libuv"
BINARY_DIR "" BINARY_DIR "${CMAKE_CONTRIB_DIR}/libuv"
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
INSTALL_COMMAND "" INSTALL_COMMAND ""
TEST_COMMAND "" TEST_COMMAND ""
) )
\ No newline at end of file
...@@ -23,22 +23,18 @@ extern "C" { ...@@ -23,22 +23,18 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SBnode SBnode; typedef struct SBnode SBnode;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef struct { typedef struct {
int64_t numOfErrors; int64_t numOfErrors;
} SBnodeLoad; } SBnodeLoad;
typedef struct { typedef struct {
int32_t sver; int32_t sver;
} SBnodeCfg;
typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SBnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
SendReqToDnodeFp sendReqToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
......
...@@ -22,15 +22,39 @@ ...@@ -22,15 +22,39 @@
extern "C" { extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ---------------- */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
/* ------------------------ Environment ------------------ */
typedef struct {
int32_t sver;
int32_t numOfCores;
int16_t numOfCommitThreads;
int8_t enableTelem;
char timezone[TSDB_TIMEZONE_LEN];
char locale[TSDB_LOCALE_LEN];
char charset[TSDB_LOCALE_LEN];
char buildinfo[64];
char gitinfo[48];
} SDnodeEnvCfg;
/**
* @brief Initialize the environment
*
* @param pOption Option of the environment
* @return int32_t 0 for success and -1 for failure
*/
int32_t dndInit(const SDnodeEnvCfg *pCfg);
/**
* @brief clear the environment
*
*/
void dndCleanup();
/* ------------------------ SDnode ----------------------- */
typedef struct { typedef struct {
int32_t sver;
int32_t numOfCores;
int32_t numOfSupportVnodes; int32_t numOfSupportVnodes;
int16_t numOfCommitThreads;
int8_t enableTelem;
int32_t statusInterval; int32_t statusInterval;
float numOfThreadsPerCore; float numOfThreadsPerCore;
float ratioOfQueryCores; float ratioOfQueryCores;
...@@ -41,28 +65,22 @@ typedef struct { ...@@ -41,28 +65,22 @@ typedef struct {
char localEp[TSDB_EP_LEN]; char localEp[TSDB_EP_LEN];
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN]; char firstEp[TSDB_EP_LEN];
char timezone[TSDB_TIMEZONE_LEN]; } SDnodeObjCfg;
char locale[TSDB_LOCALE_LEN];
char charset[TSDB_LOCALE_LEN];
char buildinfo[64];
char gitinfo[48];
} SDnodeOpt;
/* ------------------------ SDnode ------------------------ */
/** /**
* @brief Initialize and start the dnode. * @brief Initialize and start the dnode.
* *
* @param pOption Option of the dnode. * @param pCfg Config of the dnode.
* @return SDnode* The dnode object. * @return SDnode* The dnode object.
*/ */
SDnode *dndInit(SDnodeOpt *pOption); SDnode *dndCreate(SDnodeObjCfg *pCfg);
/** /**
* @brief Stop and cleanup the dnode. * @brief Stop and cleanup the dnode.
* *
* @param pDnode The dnode object to close. * @param pDnode The dnode object to close.
*/ */
void dndCleanup(SDnode *pDnode); void dndClose(SDnode *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,9 +23,9 @@ extern "C" { ...@@ -23,9 +23,9 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SQnode SQnode; typedef struct SQnode SQnode;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef struct { typedef struct {
int64_t numOfStartTask; int64_t numOfStartTask;
...@@ -39,13 +39,9 @@ typedef struct { ...@@ -39,13 +39,9 @@ typedef struct {
} SQnodeLoad; } SQnodeLoad;
typedef struct { typedef struct {
int32_t sver; int32_t sver;
} SQnodeCfg;
typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SQnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
SendReqToDnodeFp sendReqToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
......
...@@ -23,22 +23,18 @@ extern "C" { ...@@ -23,22 +23,18 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SSnode SSnode; typedef struct SSnode SSnode;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef struct { typedef struct {
int64_t numOfErrors; int64_t numOfErrors;
} SSnodeLoad; } SSnodeLoad;
typedef struct { typedef struct {
int32_t sver; int32_t sver;
} SSnodeCfg;
typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SSnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
SendReqToDnodeFp sendReqToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
......
...@@ -43,15 +43,6 @@ typedef struct SField { ...@@ -43,15 +43,6 @@ typedef struct SField {
int32_t bytes; int32_t bytes;
} SField; } SField;
typedef struct SParseBasicCtx {
uint64_t requestId;
int32_t acctId;
const char *db;
void *pTransporter;
SEpSet mgmtEpSet;
struct SCatalog *pCatalog;
} SParseBasicCtx;
typedef struct SFieldInfo { typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result int16_t numOfOutput; // number of column in result
SField *final; SField *final;
......
...@@ -23,11 +23,17 @@ extern "C" { ...@@ -23,11 +23,17 @@ extern "C" {
#include "parsenodes.h" #include "parsenodes.h"
typedef struct SParseContext { typedef struct SParseContext {
SParseBasicCtx ctx; uint64_t requestId;
int32_t acctId;
const char *db;
void *pTransporter;
SEpSet mgmtEpSet;
const char *pSql; // sql string const char *pSql; // sql string
size_t sqlLen; // length of the sql string size_t sqlLen; // length of the sql string
char *pMsg; // extended error message if exists to help identifying the problem in sql statement. char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog;
} SParseContext; } SParseContext;
/** /**
......
...@@ -91,8 +91,10 @@ typedef struct SPhyNode { ...@@ -91,8 +91,10 @@ typedef struct SPhyNode {
typedef struct SScanPhyNode { typedef struct SScanPhyNode {
SPhyNode node; SPhyNode node;
uint64_t uid; // unique id of the table uint64_t uid; // unique id of the table
int8_t tableType; int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
} SScanPhyNode; } SScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode; typedef SScanPhyNode SSystemTableScanPhyNode;
......
...@@ -49,7 +49,7 @@ typedef struct { ...@@ -49,7 +49,7 @@ typedef struct {
} STierMeta; } STierMeta;
int tfsInit(SDiskCfg *pDiskCfg, int ndisk); int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
void tfsDestroy(); void tfsCleanup();
void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
void tfsGetMeta(SFSMeta *pMeta); void tfsGetMeta(SFSMeta *pMeta);
void tfsAllocDisk(int expLevel, int *level, int *id); void tfsAllocDisk(int expLevel, int *level, int *id);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_H_
#define _TD_TKV_H_
#if 0
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
// Types exported
typedef struct STkvDb STkvDb;
typedef struct STkvOpts STkvOpts;
typedef struct STkvCache STkvCache;
typedef struct STkvReadOpts STkvReadOpts;
typedef struct STkvWriteOpts STkvWriteOpts;
// DB operations
STkvDb *tkvOpen(const STkvOpts *options, const char *path);
void tkvClose(STkvDb *db);
void tkvPut(STkvDb *db, const STkvWriteOpts *, const char *key, size_t keylen, const char *val, size_t vallen);
char * tkvGet(STkvDb *db, const STkvReadOpts *, const char *key, size_t keylen, size_t *vallen);
void tkvCommit(STkvDb *db);
// DB options
STkvOpts *tkvOptsCreate();
void tkvOptsDestroy(STkvOpts *);
void tkvOptionsSetCache(STkvOpts *, STkvCache *);
void tkvOptsSetCreateIfMissing(STkvOpts *, unsigned char);
// DB cache
typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType;
STkvCache *tkvCacheCreate(size_t capacity, ETkvCacheType type);
void tkvCacheDestroy(STkvCache *);
// STkvReadOpts
STkvReadOpts *tkvReadOptsCreate();
void tkvReadOptsDestroy(STkvReadOpts *);
// STkvWriteOpts
STkvWriteOpts *tkvWriteOptsCreate();
void tkvWriteOptsDestroy(STkvWriteOpts *);
#ifdef __cplusplus
}
#endif
#endif
#endif /*_TD_TKV_H_*/
\ No newline at end of file
...@@ -70,6 +70,8 @@ int32_t* taosGetErrno(); ...@@ -70,6 +70,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108) #define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109) #define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A) #define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A)
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x010B)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112) #define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
......
...@@ -23,10 +23,23 @@ extern "C" { ...@@ -23,10 +23,23 @@ extern "C" {
#include "os.h" #include "os.h"
#include "talgo.h" #include "talgo.h"
#if 0
#define TARRAY(TYPE) \
struct { \
int32_t tarray_size_; \
int32_t tarray_neles_; \
struct TYPE* td_array_data_; \
}
#define TARRAY_SIZE(ARRAY) (ARRAY)->tarray_size_
#define TARRAY_NELES(ARRAY) (ARRAY)->tarray_neles_
#define TARRAY_ELE_AT(ARRAY, IDX) ((ARRAY)->td_array_data_ + idx)
#endif
#define TARRAY_MIN_SIZE 8 #define TARRAY_MIN_SIZE 8
#define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize)) #define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize))
#define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize) #define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize)
#define TARRAY_GET_START(array) ((array)->pData) #define TARRAY_GET_START(array) ((array)->pData)
typedef struct SArray { typedef struct SArray {
size_t size; size_t size;
...@@ -57,7 +70,7 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t tsize); ...@@ -57,7 +70,7 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t tsize);
* @param nEles * @param nEles
* @return * @return
*/ */
void *taosArrayAddBatch(SArray *pArray, const void *pData, int nEles); void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles);
/** /**
* *
...@@ -65,7 +78,7 @@ void *taosArrayAddBatch(SArray *pArray, const void *pData, int nEles); ...@@ -65,7 +78,7 @@ void *taosArrayAddBatch(SArray *pArray, const void *pData, int nEles);
* @param pData position array list * @param pData position array list
* @param numOfElems the number of removed position * @param numOfElems the number of removed position
*/ */
void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfElems); void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfElems);
/** /**
* *
...@@ -73,7 +86,7 @@ void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfEle ...@@ -73,7 +86,7 @@ void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfEle
* @param comparFn * @param comparFn
* @param fp * @param fp
*/ */
void taosArrayRemoveDuplicate(SArray *pArray, __compar_fn_t comparFn, void (*fp)(void*)); void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
/** /**
* add all element from the source array list into the destination * add all element from the source array list into the destination
...@@ -242,19 +255,18 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t ...@@ -242,19 +255,18 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t
*/ */
char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int flags); char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int flags);
/** /**
* sort the pointer data in the array * sort the pointer data in the array
* @param pArray * @param pArray
* @param compar * @param compar
* @param param * @param param
* @return * @return
*/ */
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void *param); void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_UTIL_ARRAY_H*/ #endif /*_TD_UTIL_ARRAY_H*/
...@@ -93,13 +93,12 @@ typedef struct SReqResultInfo { ...@@ -93,13 +93,12 @@ typedef struct SReqResultInfo {
const char *pData; const char *pData;
TAOS_FIELD *fields; TAOS_FIELD *fields;
uint32_t numOfCols; uint32_t numOfCols;
int32_t *length; int32_t *length;
TAOS_ROW row; TAOS_ROW row;
char **pCol; char **pCol;
uint32_t numOfRows; uint32_t numOfRows;
uint32_t current; uint32_t current;
bool completed;
} SReqResultInfo; } SReqResultInfo;
typedef struct SShowReqInfo { typedef struct SShowReqInfo {
...@@ -130,7 +129,6 @@ typedef struct SRequestObj { ...@@ -130,7 +129,6 @@ typedef struct SRequestObj {
char *msgBuf; char *msgBuf;
void *pInfo; // sql parse info, generated by parser module void *pInfo; // sql parse info, generated by parser module
int32_t code; int32_t code;
uint64_t affectedRows; // todo remove it
SQueryExecMetric metric; SQueryExecMetric metric;
SRequestSendRecvBody body; SRequestSendRecvBody body;
} SRequestObj; } SRequestObj;
......
...@@ -25,8 +25,9 @@ ...@@ -25,8 +25,9 @@
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
return false; return false;
} }
...@@ -150,23 +151,26 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { ...@@ -150,23 +151,26 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = { SParseContext cxt = {
.ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj), .pTransporter = pTscObj->pTransporter}, .requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = getConnectionDB(pTscObj),
.pTransporter = pTscObj->pTransporter,
.pSql = pRequest->sqlstr, .pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen, .sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
}; };
cxt.ctx.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.ctx.pCatalog); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tfree(cxt.ctx.db); tfree(cxt.db);
return code; return code;
} }
code = qParseQuerySql(&cxt, pQuery); code = qParseQuerySql(&cxt, pQuery);
tfree(cxt.ctx.db); tfree(cxt.db);
return code; return code;
} }
...@@ -212,6 +216,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) ...@@ -212,6 +216,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag)
SSubplan* pPlan = taosArrayGetP(pa, 0); SSubplan* pPlan = taosArrayGetP(pa, 0);
SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema); SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
setResSchemaInfo(pResInfo, pDataBlockSchema); setResSchemaInfo(pResInfo, pDataBlockSchema);
pRequest->type = TDMT_VND_QUERY; pRequest->type = TDMT_VND_QUERY;
} }
...@@ -228,7 +233,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlo ...@@ -228,7 +233,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlo
SSchema* pSchema = &pDataBlockSchema->pSchema[i]; SSchema* pSchema = &pDataBlockSchema->pSchema[i];
pResInfo->fields[i].bytes = pSchema->bytes; pResInfo->fields[i].bytes = pSchema->bytes;
pResInfo->fields[i].type = pSchema->type; pResInfo->fields[i].type = pSchema->type;
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
} }
} }
...@@ -245,8 +250,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { ...@@ -245,8 +250,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
} }
} }
pRequest->affectedRows = res.numOfRows; pRequest->body.resInfo.numOfRows = res.numOfRows;
return res.code; pRequest->code = res.code;
return pRequest->code;
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
...@@ -545,11 +551,15 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -545,11 +551,15 @@ void* doFetchRow(SRequestObj* pRequest) {
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
if (pRequest->type == TDMT_VND_QUERY) { if (pRequest->type == TDMT_VND_QUERY) {
pRequest->type = TDMT_VND_FETCH; // All data has returned to App already, no need to try again
if (pResultInfo->completed) {
return NULL;
}
scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
pResultInfo->current = 0;
if (pResultInfo->numOfRows <= pResultInfo->current) { if (pResultInfo->numOfRows == 0) {
return NULL; return NULL;
} }
...@@ -611,12 +621,23 @@ _return: ...@@ -611,12 +621,23 @@ _return:
return pResultInfo->row; return pResultInfo->row;
} }
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
if (pResInfo->row == NULL) {
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
}
}
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) { if (numOfRows == 0) {
return; return;
} }
// todo check for the failure of malloc
doPrepareResPtr(pResultInfo);
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pResultInfo->length[i] = pResultInfo->fields[i].bytes; pResultInfo->length[i] = pResultInfo->fields[i].bytes;
...@@ -642,3 +663,14 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { ...@@ -642,3 +663,14 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
pthread_mutex_unlock(&pTscObj->mutex); pthread_mutex_unlock(&pTscObj->mutex);
} }
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*) pRsp;
pResultInfo->pData = (void*) pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows);
pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1);
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}
\ No newline at end of file
...@@ -265,7 +265,13 @@ const char *taos_data_type(int type) { ...@@ -265,7 +265,13 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; } const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) { int taos_affected_rows(TAOS_RES *res) {
return ((SRequestObj*)res)->affectedRows; if (res == NULL) {
return 0;
}
SRequestObj* pRequest = (SRequestObj*) res;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfRows;
} }
int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; } int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; }
...@@ -154,9 +154,6 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -154,9 +154,6 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pResInfo->fields = pFields; pResInfo->fields = pFields;
pResInfo->numOfCols = pMetaMsg->numOfColumns; pResInfo->numOfCols = pMetaMsg->numOfColumns;
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
pRequest->body.showInfo.execId = pShow->showId; pRequest->body.showInfo.execId = pShow->showId;
......
...@@ -279,7 +279,7 @@ TEST(testCase, connect_Test) { ...@@ -279,7 +279,7 @@ TEST(testCase, connect_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
...@@ -292,26 +292,26 @@ TEST(testCase, connect_Test) { ...@@ -292,26 +292,26 @@ TEST(testCase, connect_Test) {
// //
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, create_ctable_Test) { TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
// } }
//
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
//
//TEST(testCase, show_stable_Test) { //TEST(testCase, show_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
...@@ -533,6 +533,7 @@ TEST(testCase, connect_Test) { ...@@ -533,6 +533,7 @@ TEST(testCase, connect_Test) {
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); // tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_close(pConn); // taos_close(pConn);
//} //}
//TEST(testCase, insert_test) { //TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr); // ASSERT_EQ(pConn, nullptr);
...@@ -550,49 +551,48 @@ TEST(testCase, connect_Test) { ...@@ -550,49 +551,48 @@ TEST(testCase, connect_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//#endif
TEST(testCase, projection_query_tables) { //TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); //// TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
//// if (taos_errno(pRes) != 0) {
//// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
//// taos_free_result(pRes);
//// return;
//// }
//
//// taos_free_result(pRes);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
//
//// pRes = taos_query(pConn, "create table m1 (ts timestamp, k int) tags(a int)");
// taos_free_result(pRes);
////
//// pRes = taos_query(pConn, "create table tu using m1 tags(1)");
//// taos_free_result(pRes);
////
//// pRes = taos_query(pConn, "insert into tu values(now, 1)");
//// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); // taos_free_result(pRes);
// return; // ASSERT_TRUE(false);
// } // }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
// pRes = taos_query(pConn, "create table m1 (ts timestamp, k int) tags(a int)");
taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "create table tu using m1 tags(1)"); // TAOS_ROW pRow = NULL;
// taos_free_result(pRes); // TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
// //
// pRes = taos_query(pConn, "insert into tu values(now, 1)");
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn);
pRes = taos_query(pConn, "select * from tu"); //}
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
...@@ -30,12 +30,7 @@ extern "C" { ...@@ -30,12 +30,7 @@ extern "C" {
#endif #endif
typedef struct SBnode { typedef struct SBnode {
int32_t dnodeId; SBnodeOpt opt;
int64_t clusterId;
SBnodeCfg cfg;
SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp;
} SBnode; } SBnode;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -28,11 +28,11 @@ static struct { ...@@ -28,11 +28,11 @@ static struct {
bool printAuth; bool printAuth;
bool printVersion; bool printVersion;
char configDir[PATH_MAX]; char configDir[PATH_MAX];
} global = {0}; } dmn = {0};
void dmnSigintHandle(int signum, void *info, void *ctx) { void dmnSigintHandle(int signum, void *info, void *ctx) {
uInfo("singal:%d is received", signum); uInfo("singal:%d is received", signum);
global.stop = true; dmn.stop = true;
} }
void dmnSetSignalHandle() { void dmnSetSignalHandle() {
...@@ -44,7 +44,7 @@ void dmnSetSignalHandle() { ...@@ -44,7 +44,7 @@ void dmnSetSignalHandle() {
} }
int dmnParseOption(int argc, char const *argv[]) { int dmnParseOption(int argc, char const *argv[]) {
tstrncpy(global.configDir, "/etc/taos", PATH_MAX); tstrncpy(dmn.configDir, "/etc/taos", PATH_MAX);
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
...@@ -53,19 +53,19 @@ int dmnParseOption(int argc, char const *argv[]) { ...@@ -53,19 +53,19 @@ int dmnParseOption(int argc, char const *argv[]) {
printf("config file path overflow"); printf("config file path overflow");
return -1; return -1;
} }
tstrncpy(global.configDir, argv[i], PATH_MAX); tstrncpy(dmn.configDir, argv[i], PATH_MAX);
} else { } else {
printf("'-c' requires a parameter, default is %s\n", configDir); printf("'-c' requires a parameter, default is %s\n", configDir);
return -1; return -1;
} }
} else if (strcmp(argv[i], "-C") == 0) { } else if (strcmp(argv[i], "-C") == 0) {
global.dumpConfig = true; dmn.dumpConfig = true;
} else if (strcmp(argv[i], "-k") == 0) { } else if (strcmp(argv[i], "-k") == 0) {
global.generateGrant = true; dmn.generateGrant = true;
} else if (strcmp(argv[i], "-A") == 0) { } else if (strcmp(argv[i], "-A") == 0) {
global.printAuth = true; dmn.printAuth = true;
} else if (strcmp(argv[i], "-V") == 0) { } else if (strcmp(argv[i], "-V") == 0) {
global.printVersion = true; dmn.printVersion = true;
} else { } else {
} }
} }
...@@ -92,7 +92,7 @@ void dmnPrintVersion() { ...@@ -92,7 +92,7 @@ void dmnPrintVersion() {
} }
int dmnReadConfig(const char *path) { int dmnReadConfig(const char *path) {
tstrncpy(configDir, global.configDir, PATH_MAX); tstrncpy(configDir, dmn.configDir, PATH_MAX);
taosInitGlobalCfg(); taosInitGlobalCfg();
taosReadGlobalLogCfg(); taosReadGlobalLogCfg();
...@@ -114,12 +114,12 @@ int dmnReadConfig(const char *path) { ...@@ -114,12 +114,12 @@ int dmnReadConfig(const char *path) {
} }
if (taosReadCfgFromFile() != 0) { if (taosReadCfgFromFile() != 0) {
uError("failed to read global config"); uError("failed to read config");
return -1; return -1;
} }
if (taosCheckAndPrintCfg() != 0) { if (taosCheckAndPrintCfg() != 0) {
uError("failed to check global config"); uError("failed to check config");
return -1; return -1;
} }
...@@ -131,38 +131,50 @@ void dmnDumpConfig() { taosDumpGlobalCfg(); } ...@@ -131,38 +131,50 @@ void dmnDumpConfig() { taosDumpGlobalCfg(); }
void dmnWaitSignal() { void dmnWaitSignal() {
dmnSetSignalHandle(); dmnSetSignalHandle();
while (!global.stop) { while (!dmn.stop) {
taosMsleep(100); taosMsleep(100);
} }
} }
void dmnInitOption(SDnodeOpt *pOption) { void dnmInitEnvCfg(SDnodeEnvCfg *pCfg) {
pOption->sver = 30000000; //3.0.0.0 pCfg->sver = 30000000; // 3.0.0.0
pOption->numOfCores = tsNumOfCores; pCfg->numOfCores = tsNumOfCores;
pOption->numOfSupportVnodes = tsNumOfSupportVnodes; pCfg->numOfCommitThreads = tsNumOfCommitThreads;
pOption->numOfCommitThreads = tsNumOfCommitThreads; pCfg->enableTelem = 0;
pOption->statusInterval = tsStatusInterval; tstrncpy(pCfg->timezone, tsTimezone, TSDB_TIMEZONE_LEN);
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; tstrncpy(pCfg->locale, tsLocale, TSDB_LOCALE_LEN);
pOption->ratioOfQueryCores = tsRatioOfQueryCores; tstrncpy(pCfg->charset, tsCharset, TSDB_LOCALE_LEN);
pOption->maxShellConns = tsMaxShellConns; tstrncpy(pCfg->buildinfo, buildinfo, 64);
pOption->shellActivityTimer = tsShellActivityTimer; tstrncpy(pCfg->gitinfo, gitinfo, 48);
pOption->serverPort = tsServerPort; }
tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN);
tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN); void dmnInitObjCfg(SDnodeObjCfg *pCfg) {
tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN); pCfg->numOfSupportVnodes = tsNumOfSupportVnodes;
tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN); pCfg->statusInterval = tsStatusInterval;
tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN); pCfg->numOfThreadsPerCore = tsNumOfThreadsPerCore;
tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN); pCfg->ratioOfQueryCores = tsRatioOfQueryCores;
tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN); pCfg->maxShellConns = tsMaxShellConns;
tstrncpy(pOption->buildinfo, buildinfo, 64); pCfg->shellActivityTimer = tsShellActivityTimer;
tstrncpy(pOption->gitinfo, gitinfo, 48); pCfg->serverPort = tsServerPort;
tstrncpy(pCfg->dataDir, tsDataDir, TSDB_FILENAME_LEN);
tstrncpy(pCfg->localEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(pCfg->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
tstrncpy(pCfg->firstEp, tsFirst, TSDB_EP_LEN);
} }
int dmnRunDnode() { int dmnRunDnode() {
SDnodeOpt option = {0}; SDnodeEnvCfg envCfg = {0};
dmnInitOption(&option); SDnodeObjCfg objCfg = {0};
dnmInitEnvCfg(&envCfg);
dmnInitObjCfg(&objCfg);
if (dndInit(&envCfg) != 0) {
uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
return -1;
}
SDnode *pDnode = dndInit(&option); SDnode *pDnode = dndCreate(&objCfg);
if (pDnode == NULL) { if (pDnode == NULL) {
uInfo("Failed to start TDengine, please check the log at %s", tsLogDir); uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
return -1; return -1;
...@@ -172,7 +184,8 @@ int dmnRunDnode() { ...@@ -172,7 +184,8 @@ int dmnRunDnode() {
dmnWaitSignal(); dmnWaitSignal();
uInfo("TDengine is shut down!"); uInfo("TDengine is shut down!");
dndCleanup(pDnode); dndClose(pDnode);
dndCleanup();
taosCloseLog(); taosCloseLog();
return 0; return 0;
} }
...@@ -182,21 +195,21 @@ int main(int argc, char const *argv[]) { ...@@ -182,21 +195,21 @@ int main(int argc, char const *argv[]) {
return -1; return -1;
} }
if (global.generateGrant) { if (dmn.generateGrant) {
dmnGenerateGrant(); dmnGenerateGrant();
return 0; return 0;
} }
if (global.printVersion) { if (dmn.printVersion) {
dmnPrintVersion(); dmnPrintVersion();
return 0; return 0;
} }
if (dmnReadConfig(global.configDir) != 0) { if (dmnReadConfig(dmn.configDir) != 0) {
return -1; return -1;
} }
if (global.dumpConfig) { if (dmn.dumpConfig) {
dmnDumpConfig(); dmnDumpConfig();
return 0; return 0;
} }
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitBnode(SDnode *pDnode); int32_t dndInitBnode(SDnode *pDnode);
void dndCleanupBnode(SDnode *pDnode); void dndCleanupBnode(SDnode *pDnode);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_ENV_H_
#define _TD_DND_ENV_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dndInt.h"
typedef struct {
EWorkerType type;
const char *name;
int32_t minNum;
int32_t maxNum;
void *queueFp;
SDnode *pDnode;
STaosQueue *queue;
union {
SWorkerPool pool;
SMWorkerPool mpool;
};
} SDnodeWorker;
typedef struct {
char *dnode;
char *mnode;
char *snode;
char *bnode;
char *vnodes;
} SDnodeDir;
typedef struct {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
int64_t dver;
int64_t rebootTime;
int64_t updateTime;
int8_t statusSent;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
} SDnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SMnode *pMnode;
SRWLatch latch;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
} SMnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SQnode *pQnode;
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
} SQnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SSnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SBnodeMgmt;
typedef struct {
SHashObj *hash;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
SWorkerPool queryPool;
SWorkerPool fetchPool;
SMWorkerPool syncPool;
SMWorkerPool writePool;
} SVnodesMgmt;
typedef struct {
void *serverRpc;
void *clientRpc;
DndMsgFp msgFp[TDMT_MAX];
} STransMgmt;
typedef struct SDnode {
EStat stat;
SDnodeObjCfg cfg;
SDnodeEnvCfg env;
SDnodeDir dir;
FileFd lockFd;
SDnodeMgmt dmgmt;
SMnodeMgmt mmgmt;
SQnodeMgmt qmgmt;
SSnodeMgmt smgmt;
SBnodeMgmt bmgmt;
SVnodesMgmt vmgmt;
STransMgmt tmgmt;
SStartupReq startup;
} SDnode;
typedef struct {
int8_t once;
SDnodeEnvCfg cfg;
} SDnodeEnv;
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_ENV_H_*/
\ No newline at end of file
...@@ -55,125 +55,12 @@ extern int32_t dDebugFlag; ...@@ -55,125 +55,12 @@ extern int32_t dDebugFlag;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef enum { DND_ENV_INIT = 0, DND_ENV_READY = 1, DND_ENV_CLEANUP = 2 } EEnvStat;
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef struct { EStat dndGetStat(SDnode *pDnode);
EWorkerType type; void dndSetStat(SDnode *pDnode, EStat stat);
const char *name; const char *dndStatStr(EStat stat);
int32_t minNum;
int32_t maxNum;
void *queueFp;
SDnode *pDnode;
STaosQueue *queue;
union {
SWorkerPool pool;
SMWorkerPool mpool;
};
} SDnodeWorker;
typedef struct {
char *dnode;
char *mnode;
char *snode;
char *bnode;
char *vnodes;
} SDnodeDir;
typedef struct {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
int64_t dver;
int64_t rebootTime;
int64_t updateTime;
int8_t statusSent;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
} SDnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SMnode *pMnode;
SRWLatch latch;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
} SMnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SQnode *pQnode;
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
} SQnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SSnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SBnodeMgmt;
typedef struct {
SHashObj *hash;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
SWorkerPool queryPool;
SWorkerPool fetchPool;
SMWorkerPool syncPool;
SMWorkerPool writePool;
} SVnodesMgmt;
typedef struct {
void *serverRpc;
void *clientRpc;
DndMsgFp msgFp[TDMT_MAX];
} STransMgmt;
typedef struct SDnode {
EStat stat;
SDnodeOpt opt;
SDnodeDir dir;
FileFd lockFd;
SDnodeMgmt dmgmt;
SMnodeMgmt mmgmt;
SQnodeMgmt qmgmt;
SSnodeMgmt smgmt;
SBnodeMgmt bmgmt;
SVnodesMgmt vmgmt;
STransMgmt tmgmt;
SStartupReq startup;
} SDnode;
EStat dndGetStat(SDnode *pDnode);
void dndSetStat(SDnode *pDnode, EStat stat);
char *dndStatStr(EStat stat);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitMgmt(SDnode *pDnode); int32_t dndInitMgmt(SDnode *pDnode);
void dndStopMgmt(SDnode *pDnode); void dndStopMgmt(SDnode *pDnode);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitMnode(SDnode *pDnode); int32_t dndInitMnode(SDnode *pDnode);
void dndCleanupMnode(SDnode *pDnode); void dndCleanupMnode(SDnode *pDnode);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitQnode(SDnode *pDnode); int32_t dndInitQnode(SDnode *pDnode);
void dndCleanupQnode(SDnode *pDnode); void dndCleanupQnode(SDnode *pDnode);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitSnode(SDnode *pDnode); int32_t dndInitSnode(SDnode *pDnode);
void dndCleanupSnode(SDnode *pDnode); void dndCleanupSnode(SDnode *pDnode);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitTrans(SDnode *pDnode); int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitVnodes(SDnode *pDnode); int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h" #include "dndEnv.h"
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp); int32_t maxNum, void *queueFp);
......
...@@ -179,7 +179,7 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { ...@@ -179,7 +179,7 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->sver = pDnode->env.sver;
} }
static int32_t dndOpenBnode(SDnode *pDnode) { static int32_t dndOpenBnode(SDnode *pDnode) {
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
#include "tfs.h" #include "tfs.h"
#include "wal.h" #include "wal.h"
static SDnodeEnv dndEnv = {0};
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
void dndSetStat(SDnode *pDnode, EStat stat) { void dndSetStat(SDnode *pDnode, EStat stat) {
...@@ -32,7 +34,7 @@ void dndSetStat(SDnode *pDnode, EStat stat) { ...@@ -32,7 +34,7 @@ void dndSetStat(SDnode *pDnode, EStat stat) {
pDnode->stat = stat; pDnode->stat = stat;
} }
char *dndStatStr(EStat stat) { const char *dndStatStr(EStat stat) {
switch (stat) { switch (stat) {
case DND_STAT_INIT: case DND_STAT_INIT:
return "init"; return "init";
...@@ -79,25 +81,26 @@ static FileFd dndCheckRunning(char *dataDir) { ...@@ -79,25 +81,26 @@ static FileFd dndCheckRunning(char *dataDir) {
return fd; return fd;
} }
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) { static int32_t dndCreateImp(SDnode *pDnode, SDnodeObjCfg *pCfg) {
pDnode->lockFd = dndCheckRunning(pOption->dataDir); pDnode->lockFd = dndCheckRunning(pCfg->dataDir);
if (pDnode->lockFd < 0) { if (pDnode->lockFd < 0) {
return -1; return -1;
} }
char path[PATH_MAX + 100]; char path[PATH_MAX + 100];
snprintf(path, sizeof(path), "%s%smnode", pOption->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%smnode", pCfg->dataDir, TD_DIRSEP);
pDnode->dir.mnode = tstrdup(path); pDnode->dir.mnode = tstrdup(path);
snprintf(path, sizeof(path), "%s%svnode", pOption->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%svnode", pCfg->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = tstrdup(path); pDnode->dir.vnodes = tstrdup(path);
snprintf(path, sizeof(path), "%s%sdnode", pOption->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%sdnode", pCfg->dataDir, TD_DIRSEP);
pDnode->dir.dnode = tstrdup(path); pDnode->dir.dnode = tstrdup(path);
snprintf(path, sizeof(path), "%s%ssnode", pOption->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%ssnode", pCfg->dataDir, TD_DIRSEP);
pDnode->dir.snode = tstrdup(path); pDnode->dir.snode = tstrdup(path);
snprintf(path, sizeof(path), "%s%sbnode", pOption->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%sbnode", pCfg->dataDir, TD_DIRSEP);
pDnode->dir.bnode = tstrdup(path); pDnode->dir.bnode = tstrdup(path);
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) { if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL ||
pDnode->dir.snode == NULL || pDnode->dir.bnode == NULL) {
dError("failed to malloc dir object"); dError("failed to malloc dir object");
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -133,11 +136,12 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) { ...@@ -133,11 +136,12 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
return -1; return -1;
} }
memcpy(&pDnode->opt, pOption, sizeof(SDnodeOpt)); memcpy(&pDnode->cfg, pCfg, sizeof(SDnodeObjCfg));
memcpy(&pDnode->env, &dndEnv.cfg, sizeof(SDnodeEnvCfg));
return 0; return 0;
} }
static void dndCleanupEnv(SDnode *pDnode) { static void dndCloseImp(SDnode *pDnode) {
tfree(pDnode->dir.mnode); tfree(pDnode->dir.mnode);
tfree(pDnode->dir.vnodes); tfree(pDnode->dir.vnodes);
tfree(pDnode->dir.dnode); tfree(pDnode->dir.dnode);
...@@ -149,126 +153,95 @@ static void dndCleanupEnv(SDnode *pDnode) { ...@@ -149,126 +153,95 @@ static void dndCleanupEnv(SDnode *pDnode) {
taosCloseFile(pDnode->lockFd); taosCloseFile(pDnode->lockFd);
pDnode->lockFd = 0; pDnode->lockFd = 0;
} }
taosStopCacheRefreshWorker();
} }
SDnode *dndInit(SDnodeOpt *pOption) { SDnode *dndCreate(SDnodeObjCfg *pCfg) {
taosIgnSIGPIPE(); dInfo("start to create dnode object");
taosBlockSIGPIPE();
taosResolveCRC();
SDnode *pDnode = calloc(1, sizeof(SDnode)); SDnode *pDnode = calloc(1, sizeof(SDnode));
if (pDnode == NULL) { if (pDnode == NULL) {
dError("failed to create dnode object");
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to create dnode object since %s", terrstr());
return NULL; return NULL;
} }
dInfo("start to initialize TDengine");
dndSetStat(pDnode, DND_STAT_INIT); dndSetStat(pDnode, DND_STAT_INIT);
if (dndInitEnv(pDnode, pOption) != 0) { if (dndCreateImp(pDnode, pCfg) != 0) {
dError("failed to init env"); dError("failed to init dnode dir since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL;
}
if (rpcInit() != 0) {
dError("failed to init rpc env");
dndCleanup(pDnode);
return NULL;
}
if (walInit() != 0) {
dError("failed to init wal env");
dndCleanup(pDnode);
return NULL; return NULL;
} }
SDiskCfg dCfg; SDiskCfg dCfg;
strcpy(dCfg.dir, pDnode->opt.dataDir); strcpy(dCfg.dir, pDnode->cfg.dataDir);
dCfg.level = 0; dCfg.level = 0;
dCfg.primary = 1; dCfg.primary = 1;
if (tfsInit(&dCfg, 1) != 0) { if (tfsInit(&dCfg, 1) != 0) {
dError("failed to init tfs env"); dError("failed to init tfs since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL;
}
SVnodeOpt vnodeOpt = {
.sver = pDnode->opt.sver,
.timezone = pDnode->opt.timezone,
.locale = pDnode->opt.locale,
.charset = pDnode->opt.charset,
.nthreads = pDnode->opt.numOfCommitThreads,
.putReqToVQueryQFp = dndPutReqToVQueryQ,
};
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode env");
dndCleanup(pDnode);
return NULL; return NULL;
} }
if (dndInitMgmt(pDnode) != 0) { if (dndInitMgmt(pDnode) != 0) {
dError("failed to init dnode"); dError("failed to init mgmt since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL; return NULL;
} }
if (dndInitVnodes(pDnode) != 0) { if (dndInitVnodes(pDnode) != 0) {
dError("failed to init vnodes"); dError("failed to init vnodes since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL; return NULL;
} }
if (dndInitQnode(pDnode) != 0) { if (dndInitQnode(pDnode) != 0) {
dError("failed to init qnode"); dError("failed to init qnode since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL; return NULL;
} }
if (dndInitSnode(pDnode) != 0) { if (dndInitSnode(pDnode) != 0) {
dError("failed to init snode"); dError("failed to init snode since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL; return NULL;
} }
if (dndInitBnode(pDnode) != 0) { if (dndInitBnode(pDnode) != 0) {
dError("failed to init bnode"); dError("failed to init bnode since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL; return NULL;
} }
if (dndInitMnode(pDnode) != 0) { if (dndInitMnode(pDnode) != 0) {
dError("failed to init mnode"); dError("failed to init mnode since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL; return NULL;
} }
if (dndInitTrans(pDnode) != 0) { if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport"); dError("failed to init transport since %s", terrstr());
dndCleanup(pDnode); dndClose(pDnode);
return NULL; return NULL;
} }
dndSetStat(pDnode, DND_STAT_RUNNING); dndSetStat(pDnode, DND_STAT_RUNNING);
dndSendStatusReq(pDnode); dndSendStatusReq(pDnode);
dndReportStartup(pDnode, "TDengine", "initialized successfully"); dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully, pDnode:%p", pDnode); dInfo("dnode object is created, data:%p", pDnode);
return pDnode; return pDnode;
} }
void dndCleanup(SDnode *pDnode) { void dndClose(SDnode *pDnode) {
if (pDnode == NULL) return; if (pDnode == NULL) return;
if (dndGetStat(pDnode) == DND_STAT_STOPPED) { if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("dnode is shutting down"); dError("dnode is shutting down, data:%p", pDnode);
return; return;
} }
dInfo("start to cleanup TDengine"); dInfo("start to close dnode, data:%p", pDnode);
dndSetStat(pDnode, DND_STAT_STOPPED); dndSetStat(pDnode, DND_STAT_STOPPED);
dndCleanupTrans(pDnode); dndCleanupTrans(pDnode);
dndStopMgmt(pDnode); dndStopMgmt(pDnode);
...@@ -278,12 +251,66 @@ void dndCleanup(SDnode *pDnode) { ...@@ -278,12 +251,66 @@ void dndCleanup(SDnode *pDnode) {
dndCleanupQnode(pDnode); dndCleanupQnode(pDnode);
dndCleanupVnodes(pDnode); dndCleanupVnodes(pDnode);
dndCleanupMgmt(pDnode); dndCleanupMgmt(pDnode);
vnodeClear(); tfsCleanup();
tfsDestroy();
walCleanUp();
rpcCleanup();
dndCleanupEnv(pDnode); dndCloseImp(pDnode);
free(pDnode); free(pDnode);
dInfo("TDengine is cleaned up successfully"); dInfo("dnode object is closed, data:%p", pDnode);
} }
int32_t dndInit(const SDnodeEnvCfg *pCfg) {
if (atomic_val_compare_exchange_8(&dndEnv.once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr());
return -1;
}
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
if (rpcInit() != 0) {
dError("failed to init rpc since %s", terrstr());
dndCleanup();
return -1;
}
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
dndCleanup();
return -1;
}
SVnodeOpt vnodeOpt = {
.sver = pCfg->sver,
.timezone = pCfg->timezone,
.locale = pCfg->locale,
.charset = pCfg->charset,
.nthreads = pCfg->numOfCommitThreads,
.putReqToVQueryQFp = dndPutReqToVQueryQ,
};
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr());
dndCleanup();
return NULL;
}
memcpy(&dndEnv.cfg, pCfg, sizeof(SDnodeEnvCfg));
dInfo("dnode env is initialized");
return 0;
}
void dndCleanup() {
if (atomic_val_compare_exchange_8(&dndEnv.once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up");
return;
}
walCleanUp();
vnodeCleanup();
rpcCleanup();
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
\ No newline at end of file
...@@ -86,7 +86,7 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -86,7 +86,7 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) { if (strcmp(epSet.fqdn[i], pDnode->cfg.localFqdn) == 0 && epSet.port[i] == pDnode->cfg.serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps; epSet.inUse = (i + 1) % epSet.numOfEps;
} }
...@@ -289,8 +289,8 @@ PRASE_DNODE_OVER: ...@@ -289,8 +289,8 @@ PRASE_DNODE_OVER:
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp); if (fp != NULL) fclose(fp);
if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->opt.localEp)) { if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->cfg.localEp)) {
dError("localEp %s different with %s and need reconfigured", pDnode->opt.localEp, pMgmt->file); dError("localEp %s different with %s and need reconfigured", pDnode->cfg.localEp, pMgmt->file);
return -1; return -1;
} }
...@@ -298,7 +298,7 @@ PRASE_DNODE_OVER: ...@@ -298,7 +298,7 @@ PRASE_DNODE_OVER:
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
pMgmt->dnodeEps->num = 1; pMgmt->dnodeEps->num = 1;
pMgmt->dnodeEps->eps[0].isMnode = 1; pMgmt->dnodeEps->eps[0].isMnode = 1;
taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port); taosGetFqdnPortFromEp(pDnode->cfg.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
} }
dndResetDnodes(pDnode, pMgmt->dnodeEps); dndResetDnodes(pDnode, pMgmt->dnodeEps);
...@@ -362,24 +362,24 @@ void dndSendStatusReq(SDnode *pDnode) { ...@@ -362,24 +362,24 @@ void dndSendStatusReq(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->opt.sver); pStatus->sver = htonl(pDnode->env.sver);
pStatus->dver = htobe64(pMgmt->dver); pStatus->dver = htobe64(pMgmt->dver);
pStatus->dnodeId = htonl(pMgmt->dnodeId); pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->rebootTime = htobe64(pMgmt->rebootTime); pStatus->rebootTime = htobe64(pMgmt->rebootTime);
pStatus->updateTime = htobe64(pMgmt->updateTime); pStatus->updateTime = htobe64(pMgmt->updateTime);
pStatus->numOfCores = htonl(pDnode->opt.numOfCores); pStatus->numOfCores = htonl(pDnode->env.numOfCores);
pStatus->numOfSupportVnodes = htonl(pDnode->opt.numOfSupportVnodes); pStatus->numOfSupportVnodes = htonl(pDnode->cfg.numOfSupportVnodes);
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); tstrncpy(pStatus->dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); pStatus->clusterCfg.statusInterval = htonl(pDnode->cfg.statusInterval);
pStatus->clusterCfg.checkTime = 0; pStatus->clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime); pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime);
tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); tstrncpy(pStatus->clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN);
tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN); tstrncpy(pStatus->clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN); tstrncpy(pStatus->clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
...@@ -485,7 +485,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -485,7 +485,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
static void *dnodeThreadRoutine(void *param) { static void *dnodeThreadRoutine(void *param) {
SDnode *pDnode = param; SDnode *pDnode = param;
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int32_t ms = pDnode->opt.statusInterval * 1000; int32_t ms = pDnode->cfg.statusInterval * 1000;
while (true) { while (true) {
pthread_testcancel(); pthread_testcancel();
......
...@@ -247,7 +247,7 @@ static bool dndNeedDeployMnode(SDnode *pDnode) { ...@@ -247,7 +247,7 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
return false; return false;
} }
if (strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) != 0) { if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) {
return false; return false;
} }
...@@ -266,15 +266,15 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -266,15 +266,15 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ; pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->env.sver;
pOption->cfg.enableTelem = pDnode->opt.enableTelem; pOption->cfg.enableTelem = pDnode->env.enableTelem;
pOption->cfg.statusInterval = pDnode->opt.statusInterval; pOption->cfg.statusInterval = pDnode->cfg.statusInterval;
pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer; pOption->cfg.shellActivityTimer = pDnode->cfg.shellActivityTimer;
pOption->cfg.timezone = pDnode->opt.timezone; pOption->cfg.timezone = pDnode->env.timezone;
pOption->cfg.charset = pDnode->opt.charset; pOption->cfg.charset = pDnode->env.charset;
pOption->cfg.locale = pDnode->opt.locale; pOption->cfg.locale = pDnode->env.locale;
pOption->cfg.gitinfo = pDnode->opt.gitinfo; pOption->cfg.gitinfo = pDnode->env.gitinfo;
pOption->cfg.buildinfo = pDnode->opt.buildinfo; pOption->cfg.buildinfo = pDnode->env.buildinfo;
} }
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
...@@ -283,8 +283,8 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -283,8 +283,8 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->selfIndex = 0; pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pDnode->opt.serverPort; pReplica->port = pDnode->cfg.serverPort;
memcpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
......
...@@ -185,7 +185,7 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { ...@@ -185,7 +185,7 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->sver = pDnode->env.sver;
} }
static int32_t dndOpenQnode(SDnode *pDnode) { static int32_t dndOpenQnode(SDnode *pDnode) {
......
...@@ -179,7 +179,7 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { ...@@ -179,7 +179,7 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->sver = pDnode->env.sver;
} }
static int32_t dndOpenSnode(SDnode *pDnode) { static int32_t dndOpenSnode(SDnode *pDnode) {
......
...@@ -176,7 +176,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -176,7 +176,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
rpcInit.cfp = dndProcessResponse; rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; rpcInit.idleTime = pDnode->cfg.shellActivityTimer * 1000;
rpcInit.user = INTERNAL_USER; rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY; rpcInit.ckey = INTERNAL_CKEY;
rpcInit.secret = INTERNAL_SECRET; rpcInit.secret = INTERNAL_SECRET;
...@@ -325,20 +325,20 @@ static int32_t dndInitServer(SDnode *pDnode) { ...@@ -325,20 +325,20 @@ static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
dndInitMsgFp(pMgmt); dndInitMsgFp(pMgmt);
int32_t numOfThreads = (int32_t)((pDnode->opt.numOfCores * pDnode->opt.numOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((pDnode->env.numOfCores * pDnode->cfg.numOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
} }
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->opt.serverPort; rpcInit.localPort = pDnode->cfg.serverPort;
rpcInit.label = "DND-S"; rpcInit.label = "DND-S";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dndProcessRequest; rpcInit.cfp = dndProcessRequest;
rpcInit.sessions = pDnode->opt.maxShellConns; rpcInit.sessions = pDnode->cfg.maxShellConns;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; rpcInit.idleTime = pDnode->cfg.shellActivityTimer * 1000;
rpcInit.afp = dndRetrieveUserAuthInfo; rpcInit.afp = dndRetrieveUserAuthInfo;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
......
...@@ -420,7 +420,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { ...@@ -420,7 +420,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt->totalVnodes = numOfVnodes; pMgmt->totalVnodes = numOfVnodes;
int32_t threadNum = pDnode->opt.numOfCores; int32_t threadNum = pDnode->env.numOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1; int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
...@@ -904,11 +904,11 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { ...@@ -904,11 +904,11 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
int32_t maxFetchThreads = 4; int32_t maxFetchThreads = 4;
int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->opt.numOfCores); int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->env.numOfCores);
int32_t minQueryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1); int32_t minQueryThreads = MAX((int32_t)(pDnode->env.numOfCores * pDnode->cfg.ratioOfQueryCores), 1);
int32_t maxQueryThreads = minQueryThreads; int32_t maxQueryThreads = minQueryThreads;
int32_t maxWriteThreads = MAX(pDnode->opt.numOfCores, 1); int32_t maxWriteThreads = MAX(pDnode->env.numOfCores, 1);
int32_t maxSyncThreads = MAX(pDnode->opt.numOfCores / 2, 1); int32_t maxSyncThreads = MAX(pDnode->env.numOfCores / 2, 1);
SWorkerPool *pPool = &pMgmt->queryPool; SWorkerPool *pPool = &pMgmt->queryPool;
pPool->name = "vnode-query"; pPool->name = "vnode-query";
......
...@@ -24,7 +24,7 @@ class TestServer { ...@@ -24,7 +24,7 @@ class TestServer {
bool DoStart(); bool DoStart();
private: private:
SDnodeOpt BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp); SDnodeObjCfg BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
private: private:
SDnode* pDnode; SDnode* pDnode;
......
...@@ -22,30 +22,27 @@ void* serverLoop(void* param) { ...@@ -22,30 +22,27 @@ void* serverLoop(void* param) {
} }
} }
SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) { SDnodeObjCfg TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SDnodeOpt option = {0}; SDnodeObjCfg cfg = {0};
option.sver = 1; cfg.numOfSupportVnodes = 16;
option.numOfCores = 1; cfg.statusInterval = 1;
option.numOfSupportVnodes = 16; cfg.numOfThreadsPerCore = 1;
option.numOfCommitThreads = 1; cfg.ratioOfQueryCores = 1;
option.statusInterval = 1; cfg.maxShellConns = 1000;
option.numOfThreadsPerCore = 1; cfg.shellActivityTimer = 30;
option.ratioOfQueryCores = 1; cfg.serverPort = port;
option.maxShellConns = 1000; strcpy(cfg.dataDir, path);
option.shellActivityTimer = 30; snprintf(cfg.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
option.serverPort = port; snprintf(cfg.localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
strcpy(option.dataDir, path); snprintf(cfg.firstEp, TSDB_EP_LEN, "%s", firstEp);
snprintf(option.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port); return cfg;
snprintf(option.localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
snprintf(option.firstEp, TSDB_EP_LEN, "%s", firstEp);
return option;
} }
bool TestServer::DoStart() { bool TestServer::DoStart() {
SDnodeOpt option = BuildOption(path, fqdn, port, firstEp); SDnodeObjCfg cfg = BuildOption(path, fqdn, port, firstEp);
taosMkDir(path); taosMkDir(path);
pDnode = dndInit(&option); pDnode = dndCreate(&cfg);
if (pDnode != NULL) { if (pDnode != NULL) {
return false; return false;
} }
...@@ -81,7 +78,7 @@ void TestServer::Stop() { ...@@ -81,7 +78,7 @@ void TestServer::Stop() {
} }
if (pDnode != NULL) { if (pDnode != NULL) {
dndCleanup(pDnode); dndClose(pDnode);
pDnode = NULL; pDnode = NULL;
} }
} }
...@@ -43,6 +43,11 @@ void Testbase::InitLog(const char* path) { ...@@ -43,6 +43,11 @@ void Testbase::InitLog(const char* path) {
} }
void Testbase::Init(const char* path, int16_t port) { void Testbase::Init(const char* path, int16_t port) {
SDnodeEnvCfg cfg = {0};
cfg.numOfCommitThreads = 1;
cfg.numOfCores = 1;
dndInit(&cfg);
char fqdn[] = "localhost"; char fqdn[] = "localhost";
char firstEp[TSDB_EP_LEN] = {0}; char firstEp[TSDB_EP_LEN] = {0};
snprintf(firstEp, TSDB_EP_LEN, "%s:%u", fqdn, port); snprintf(firstEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
...@@ -56,6 +61,7 @@ void Testbase::Init(const char* path, int16_t port) { ...@@ -56,6 +61,7 @@ void Testbase::Init(const char* path, int16_t port) {
void Testbase::Cleanup() { void Testbase::Cleanup() {
server.Stop(); server.Stop();
client.Cleanup(); client.Cleanup();
dndCleanup();
} }
void Testbase::Restart() { server.Restart(); } void Testbase::Restart() { server.Restart(); }
......
...@@ -29,12 +29,7 @@ extern "C" { ...@@ -29,12 +29,7 @@ extern "C" {
#endif #endif
typedef struct SQnode { typedef struct SQnode {
int32_t dnodeId; SQnodeOpt opt;
int64_t clusterId;
SQnodeCfg cfg;
SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp;
} SQnode; } SQnode;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -29,12 +29,7 @@ extern "C" { ...@@ -29,12 +29,7 @@ extern "C" {
#endif #endif
typedef struct SSnode { typedef struct SSnode {
int32_t dnodeId; SSnodeOpt cfg;
int64_t clusterId;
SSnodeCfg cfg;
SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp;
} SSnode; } SSnode;
#ifdef __cplusplus #ifdef __cplusplus
......
add_subdirectory(meta) aux_source_directory(src/meta META_SRC)
add_subdirectory(tq) aux_source_directory(src/tq TQ_SRC)
add_subdirectory(tsdb) aux_source_directory(src/tsdb TSDB_SRC)
add_subdirectory(impl) aux_source_directory(src/vnd VND_SRC)
\ No newline at end of file list(APPEND
VNODE_SRC
${META_SRC}
${TQ_SRC}
${TSDB_SRC}
${VND_SRC}
)
add_library(vnode STATIC ${VNODE_SRC})
target_include_directories(
vnode
PUBLIC inc
PRIVATE src/inc
)
target_link_libraries(
vnode
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC transport
PUBLIC bdb
PUBLIC tfs
PUBLIC wal
PUBLIC qworker
)
if(${BUILD_TEST})
# add_subdirectory(test)
endif(${BUILD_TEST})
# Vnode API test
add_executable(vnodeApiTests "")
target_sources(vnodeApiTests
PRIVATE
"vnodeApiTests.cpp"
)
target_link_libraries(vnodeApiTests vnode gtest gtest_main)
add_test(
NAME vnode_api_tests
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/vnodeApiTests
)
\ No newline at end of file
// https://stackoverflow.com/questions/8565666/benchmarking-with-googletest
// https://github.com/google/benchmark
\ No newline at end of file
/**
* @file vnodeApiTests.cpp
* @author hzcheng (hzcheng@taosdata.com)
* @brief VNODE module API tests
* @version 0.1
* @date 2021-12-13
*
* @copyright Copyright (c) 2021
*
*/
#include <gtest/gtest.h>
#include <iostream>
#include "vnode.h"
static STSchema *vtCreateBasicSchema() {
STSchemaBuilder sb;
STSchema * pSchema = NULL;
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
for (int i = 1; i < 10; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, i, 0);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static STSchema *vtCreateBasicTagSchema() {
STSchemaBuilder sb;
STSchema * pSchema = NULL;
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
for (int i = 10; i < 12; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_BINARY, i, 20);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static SKVRow vtCreateBasicTag() {
SKVRowBuilder rb;
SKVRow pTag;
tdInitKVRowBuilder(&rb);
for (int i = 0; i < 2; i++) {
void *pVal = malloc(sizeof(VarDataLenT) + strlen("foo"));
varDataLen(pVal) = strlen("foo");
memcpy(varDataVal(pVal), "foo", strlen("foo"));
tdAddColToKVRow(&rb, i, TSDB_DATA_TYPE_BINARY, pVal);
free(pVal);
}
pTag = tdGetKVRowFromBuilder(&rb);
tdDestroyKVRowBuilder(&rb);
return pTag;
}
static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
SRpcMsg * pMsg;
STSchema *pSchema;
STSchema *pTagSchema;
int zs;
void * pBuf;
pSchema = vtCreateBasicSchema();
pTagSchema = vtCreateBasicTagSchema();
SVnodeReq vCreateSTbReq;
vnodeSetCreateStbReq(&vCreateSTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_VND_CREATE_STB);
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
pMsg->msgType = TDMT_VND_CREATE_STB;
pMsg->contLen = zs;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_VND_CREATE_STB);
META_CLEAR_TB_CFG(&vCreateSTbReq);
tdFreeSchema(pSchema);
tdFreeSchema(pTagSchema);
*ppMsg = pMsg;
}
static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
SRpcMsg *pMsg;
int tz;
SKVRow pTag = vtCreateBasicTag();
SVnodeReq vCreateCTbReq;
vnodeSetCreateCtbReq(&vCreateCTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
pMsg->msgType = TDMT_VND_CREATE_TABLE;
pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
void *pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
META_CLEAR_TB_CFG(&vCreateCTbReq);
free(pTag);
*ppMsg = pMsg;
}
static void vtBuildCreateNtbReq(char *tbname, SRpcMsg **ppMsg) {
// TODO
}
static void vtBuildSubmitReq(SRpcMsg **ppMsg) {
SRpcMsg * pMsg;
SSubmitMsg *pSubmitMsg;
SSubmitBlk *pSubmitBlk;
int tz = 1024; // TODO
pMsg = (SRpcMsg *)malloc(sizeof(*pMsg) + tz);
pMsg->msgType = TDMT_VND_SUBMIT;
pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
// For submit msg header
pSubmitMsg = (SSubmitMsg *)(pMsg->pCont);
// pSubmitMsg->header.contLen = 0;
// pSubmitMsg->header.vgId = 0;
// pSubmitMsg->length = 0;
pSubmitMsg->numOfBlocks = 1;
// For submit blk
pSubmitBlk = (SSubmitBlk *)(pSubmitMsg->blocks);
pSubmitBlk->uid = 0;
pSubmitBlk->tid = 0;
pSubmitBlk->padding = 0;
pSubmitBlk->sversion = 0;
pSubmitBlk->dataLen = 0;
pSubmitBlk->numOfRows = 0;
// For row batch
*ppMsg = pMsg;
}
static void vtClearMsgBatch(SArray *pMsgArr) {
SRpcMsg *pMsg;
for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgArr, i);
free(pMsg);
}
taosArrayClear(pMsgArr);
}
static void vtProcessAndApplyReqs(SVnode *pVnode, SArray *pMsgArr) {
int rcode;
SRpcMsg *pReq;
SRpcMsg *pRsp;
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) {
pReq = *(SRpcMsg **)taosArrayGet(pMsgArr, i);
rcode = vnodeApplyWMsg(pVnode, pReq, NULL);
GTEST_ASSERT_EQ(rcode, 0);
}
}
TEST(vnodeApiTest, vnode_simple_create_table_test) {
tb_uid_t suid = 1638166374163;
SRpcMsg *pMsg;
SArray * pMsgArr = NULL;
SVnode * pVnode;
int rcode;
int ntables = 1000000;
int batch = 10;
char tbname[128];
pMsgArr = (SArray *)taosArrayInit(batch, sizeof(pMsg));
vnodeDestroy("vnode1");
GTEST_ASSERT_GE(vnodeInit(2), 0);
// CREATE AND OPEN A VNODE
pVnode = vnodeOpen("vnode1", NULL);
ASSERT_NE(pVnode, nullptr);
// CREATE A SUPER TABLE
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// CREATE A LOT OF CHILD TABLES
for (int i = 0; i < ntables / batch; i++) {
// Build request batch
for (int j = 0; j < batch; j++) {
sprintf(tbname, "ct%d", i * batch + j + 1);
vtBuildCreateCtbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
}
// Process request batch
vtProcessAndApplyReqs(pVnode, pMsgArr);
// Clear request batch
vtClearMsgBatch(pMsgArr);
}
// CLOSE THE VNODE
vnodeClose(pVnode);
vnodeClear();
taosArrayDestroy(pMsgArr);
}
TEST(vnodeApiTest, vnode_simple_insert_test) {
const char *vname = "vnode2";
char tbname[128];
tb_uid_t suid = 1638166374163;
SRpcMsg * pMsg;
SArray * pMsgArr;
int rcode;
SVnode * pVnode;
int batch = 1;
int loop = 1000000;
pMsgArr = (SArray *)taosArrayInit(0, sizeof(pMsg));
vnodeDestroy(vname);
GTEST_ASSERT_GE(vnodeInit(2), 0);
// Open a vnode
pVnode = vnodeOpen(vname, NULL);
GTEST_ASSERT_NE(pVnode, nullptr);
// 1. CREATE A SUPER TABLE
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// 2. CREATE A CHILD TABLE
sprintf(tbname, "t0");
vtBuildCreateCtbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// 3. WRITE A LOT OF TIME-SERIES DATA
for (int j = 0; j < loop; j++) {
for (int i = 0; i < batch; i++) {
vtBuildSubmitReq(&pMsg);
taosArrayPush(pMsgArr, &pMsg);
}
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
}
// Close the vnode
vnodeClose(pVnode);
vnodeClear();
taosArrayDestroy(pMsgArr);
}
\ No newline at end of file
...@@ -89,10 +89,10 @@ typedef struct { ...@@ -89,10 +89,10 @@ typedef struct {
int vnodeInit(const SVnodeOpt *pOption); int vnodeInit(const SVnodeOpt *pOption);
/** /**
* @brief clear a vnode * @brief Cleanup the vnode module
* *
*/ */
void vnodeClear(); void vnodeCleanup();
/** /**
* @brief Open a VNODE. * @brief Open a VNODE.
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaDef.h"
#include "sqlite3.h"
struct SMetaDB {
sqlite3 *pDB;
};
int metaOpenDB(SMeta *pMeta) {
char dir[128];
int rc;
char *err = NULL;
pMeta->pDB = (SMetaDB *)calloc(1, sizeof(SMetaDB));
if (pMeta->pDB == NULL) {
// TODO: handle error
return -1;
}
sprintf(dir, "%s/meta.db", pMeta->path);
rc = sqlite3_open(dir, &(pMeta->pDB->pDB));
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to open meta.db\n");
}
// For all tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS tb ("
" tbname VARCHAR(256) NOT NULL UNIQUE,"
" tb_uid INTEGER NOT NULL UNIQUE "
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table tb since %s\n", err);
}
// For super tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS stb ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256) NOT NULL UNIQUE,"
" tb_schema BLOB NOT NULL,"
" tag_schema BLOB NOT NULL"
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table stb since %s\n", err);
}
// For normal tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS ntb ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256) NOT NULL,"
" tb_schema BLOB NOT NULL"
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table ntb since %s\n", err);
}
sqlite3_exec(pMeta->pDB->pDB, "BEGIN;", NULL, NULL, &err);
tfree(err);
return 0;
}
void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) {
sqlite3_exec(pMeta->pDB->pDB, "COMMIT;", NULL, NULL, NULL);
sqlite3_close(pMeta->pDB->pDB);
free(pMeta->pDB);
pMeta->pDB = NULL;
}
// TODO
}
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbCfg) {
char sql[256];
char * err = NULL;
int rc;
tb_uid_t uid;
sqlite3_stmt *stmt;
char buf[256];
void * pBuf;
switch (pTbCfg->type) {
case META_SUPER_TABLE:
uid = pTbCfg->stbCfg.suid;
sprintf(sql,
"INSERT INTO tb VALUES (\'%s\', %" PRIu64
");"
"CREATE TABLE IF NOT EXISTS stb_%" PRIu64
" ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256),"
" tag1 INTEGER);",
pTbCfg->name, uid, uid);
rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
if (rc != SQLITE_OK) {
printf("failed to create normal table since %s\n", err);
}
sprintf(sql, "INSERT INTO stb VALUES (%" PRIu64 ", %s, ?, ?)", uid, pTbCfg->name);
sqlite3_prepare_v2(pMeta->pDB->pDB, sql, -1, &stmt, NULL);
pBuf = buf;
tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pSchema);
sqlite3_bind_blob(stmt, 1, buf, POINTER_DISTANCE(pBuf, buf), NULL);
pBuf = buf;
tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pTagSchema);
sqlite3_bind_blob(stmt, 2, buf, POINTER_DISTANCE(pBuf, buf), NULL);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
#if 0
sprintf(sql,
"INSERT INTO tb VALUES (?, ?);"
// "INSERT INTO stb VALUES (?, ?, ?, ?);"
// "CREATE TABLE IF NOT EXISTS stb_%" PRIu64
// " ("
// " tb_uid INTEGER NOT NULL UNIQUE,"
// " tbname VARCHAR(256),"
// " tag1 INTEGER);"
,
uid);
rc = sqlite3_prepare_v2(pMeta->pDB->pDB, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
return -1;
}
sqlite3_bind_text(stmt, 1, pTbCfg->name, -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(stmt, 2, uid);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
// sqlite3_bind_int64(stmt, 3, uid);
// sqlite3_bind_text(stmt, 4, pTbCfg->name, -1, SQLITE_TRANSIENT);
// pBuf = buf;
// tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pSchema);
// sqlite3_bind_blob(stmt, 5, buf, POINTER_DISTANCE(pBuf, buf), NULL);
// pBuf = buf;
// tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pTagSchema);
// sqlite3_bind_blob(stmt, 6, buf, POINTER_DISTANCE(pBuf, buf), NULL);
rc = sqliteVjj3_step(stmt);
if (rc != SQLITE_OK) {
printf("failed to create normal table since %s\n", sqlite3_errmsg(pMeta->pDB->pDB));
}
sqlite3_finalize(stmt);
#endif
break;
case META_NORMAL_TABLE:
// uid = metaGenerateUid(pMeta);
// sprintf(sql,
// "INSERT INTO tb VALUES (\'%s\', %" PRIu64
// ");"
// "INSERT INTO ntb VALUES (%" PRIu64 ", \'%s\', );",
// pTbCfg->name, uid, uid, pTbCfg->name, );
// rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
// if (rc != SQLITE_OK) {
// printf("failed to create normal table since %s\n", err);
// }
break;
case META_CHILD_TABLE:
#if 0
uid = metaGenerateUid(pMeta);
// sprintf(sql, "INSERT INTO tb VALUES (\'%s\', %" PRIu64
// ");"
// "INSERT INTO stb_%" PRIu64 " VALUES (%" PRIu64 ", \'%s\', );");
rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
if (rc != SQLITE_OK) {
printf("failed to create child table since %s\n", err);
}
#endif
break;
default:
break;
}
tfree(err);
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
/* TODO */
return 0;
}
\ No newline at end of file
# add_executable(metaTest "")
# target_sources(metaTest
# PRIVATE
# "../src/metaMain.c"
# "../src/metaUid.c"
# "metaTests.cpp"
# )
# target_include_directories(metaTest
# PUBLIC
# "${CMAKE_SOURCE_DIR}/include/server/vnode/meta"
# "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# )
# target_link_libraries(metaTest
# os
# util
# common
# gtest_main
# tkv
# )
# enable_testing()
# add_test(
# NAME meta_test
# COMMAND metaTest
# )
#if 0
#include <gtest/gtest.h>
#include <string.h>
#include <iostream>
#include "meta.h"
static STSchema *metaGetSimpleSchema() {
STSchema * pSchema = NULL;
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, 1, 4);
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static SKVRow metaGetSimpleTags() {
SKVRowBuilder kvrb = {0};
SKVRow row;
tdInitKVRowBuilder(&kvrb);
int64_t ts = 1634287978000;
int32_t a = 10;
tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_TIMESTAMP, (void *)(&ts));
tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_INT, (void *)(&a));
row = tdGetKVRowFromBuilder(&kvrb);
tdDestroyKVRowBuilder(&kvrb);
return row;
}
TEST(MetaTest, DISABLED_meta_create_1m_normal_tables_test) {
// Open Meta
SMeta *meta = metaOpen(NULL, NULL);
std::cout << "Meta is opened!" << std::endl;
// Create 1000000 normal tables
META_TABLE_OPTS_DECLARE(tbOpts);
STSchema *pSchema = metaGetSimpleSchema();
char tbname[128];
for (size_t i = 0; i < 1000000; i++) {
sprintf(tbname, "ntb%ld", i);
metaNormalTableOptsInit(&tbOpts, tbname, pSchema);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
}
tdFreeSchema(pSchema);
// Close Meta
metaClose(meta);
std::cout << "Meta is closed!" << std::endl;
// Destroy Meta
metaDestroy("meta");
std::cout << "Meta is destroyed!" << std::endl;
}
TEST(MetaTest, meta_create_1m_child_tables_test) {
// Open Meta
SMeta *meta = metaOpen(NULL);
std::cout << "Meta is opened!" << std::endl;
// Create a super tables
tb_uid_t uid = 477529885843758ul;
META_TABLE_OPTS_DECLARE(tbOpts);
STSchema *pSchema = metaGetSimpleSchema();
STSchema *pTagSchema = metaGetSimpleSchema();
metaSuperTableOptsInit(&tbOpts, "st", uid, pSchema, pTagSchema);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
tdFreeSchema(pSchema);
tdFreeSchema(pTagSchema);
// Create 1000000 child tables
char name[128];
SKVRow row = metaGetSimpleTags();
for (size_t i = 0; i < 1000000; i++) {
sprintf(name, "ctb%ld", i);
metaChildTableOptsInit(&tbOpts, name, uid, row);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
}
kvRowFree(row);
// Close Meta
metaClose(meta);
std::cout << "Meta is closed!" << std::endl;
// Destroy Meta
metaDestroy("meta");
std::cout << "Meta is destroyed!" << std::endl;
}
#endif
\ No newline at end of file
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#define _TD_VNODE_DEF_H_ #define _TD_VNODE_DEF_H_
#include "mallocator.h" #include "mallocator.h"
#include "sync.h" // #include "sync.h"
#include "tcoding.h" #include "tcoding.h"
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "vnode.h" #include "vnode.h"
#include "meta.h" #include "meta.h"
#include "sync.h" // #include "sync.h"
#include "tlog.h" #include "tlog.h"
#include "tq.h" #include "tq.h"
#include "tsdb.h" #include "tsdb.h"
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#ifndef _TD_VNODE_SYNC_H_ #ifndef _TD_VNODE_SYNC_H_
#define _TD_VNODE_SYNC_H_ #define _TD_VNODE_SYNC_H_
#include "sync.h" // #include "sync.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -13,7 +13,9 @@ ...@@ -13,7 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifdef USE_INVERTED_INDEX
#include "index.h" #include "index.h"
#endif
#include "metaDef.h" #include "metaDef.h"
struct SMetaIdx { struct SMetaIdx {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册