提交 4e902908 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/tq

......@@ -706,41 +706,31 @@ typedef struct {
} SStatusRsp;
typedef struct {
uint32_t vgId;
int32_t dbCfgVersion;
int32_t maxTables;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t commitTime;
int32_t fsyncPeriod;
int8_t precision;
int8_t compression;
int8_t walLevel;
int8_t vgReplica;
int8_t wals;
int8_t quorum;
int8_t update;
int8_t cacheLastRow;
int32_t vgCfgVersion;
int8_t dbReplica;
int8_t dbType;
int8_t reserved[8];
} SVnodeCfg;
typedef struct {
int32_t nodeId;
char nodeEp[TSDB_EP_LEN];
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
} SVnodeDesc;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
SVnodeCfg cfg;
uint32_t vgId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t fsyncPeriod;
int8_t reserved[16];
int8_t precision;
int8_t compression;
int8_t cacheLastRow;
int8_t update;
int8_t walLevel;
int8_t replica;
int8_t quorum;
int8_t selfIndex;
SVnodeDesc nodes[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg;
......
......@@ -308,7 +308,7 @@ static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSche
SET_DOUBLE_PTR(pData, value);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (pSrcSchema->columns[srcIdx].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (pSrcSchema->columns[srcIdx].colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
*(TSKEY *)pData = tdGetKey(*(TKEY *)value);
} else {
*(TSKEY *)pData = *(TSKEY *)value;
......
......@@ -23,6 +23,8 @@ extern "C" {
#include "taosdef.h"
#include "taosmsg.h"
#define TIME_IS_VAR_DURATION(_t) ((_t) == 'n' || (_t) == 'y' || (_t) == 'N' || (_t) == 'Y')
/*
* @return timestamp decided by global conf variable, tsTimePrecision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
......@@ -50,7 +52,6 @@ void deltaToUtcInitOnce();
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
#ifdef __cplusplus
}
#endif
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TFUNCTION_H
#define TDENGINE_TFUNCTION_H
#ifndef TDENGINE_FUNCTION_H
#define TDENGINE_FUNCTION_H
#ifdef __cplusplus
extern "C" {
......@@ -24,6 +24,8 @@ extern "C" {
#include "tvariant.h"
#include "tbuffer.h"
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define FUNCTION_SCALAR 1
#define FUNCTION_AGG 2
......@@ -184,6 +186,25 @@ typedef struct SResultDataInfo {
int32_t intermediateBytes;
} SResultDataInfo;
typedef struct SMultiFunctionsDesc {
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
bool orderProjectQuery;
bool stateWindow;
bool globalMerge;
bool multigroupResult;
bool blockDistribution;
bool timewindow;
bool topbotQuery;
bool interpQuery;
} SMultiFunctionsDesc;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable);
......@@ -199,8 +220,10 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct
const char* qGetFunctionName(int32_t functionId);
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TFUNCTION_H
#endif // TDENGINE_FUNCTION_H
......@@ -24,6 +24,7 @@ extern "C" {
#include "common.h"
#include "tname.h"
#include "tvariant.h"
#include "function.h"
typedef struct SColumn {
uint64_t tableUid;
......@@ -130,20 +131,6 @@ typedef struct STableMetaInfo {
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SQueryAttrInfo {
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
bool orderProjectQuery;
bool stateWindow;
bool globalMerge;
bool multigroupResult;
} SQueryAttrInfo;
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
......@@ -177,7 +164,6 @@ typedef struct SQueryStmtInfo {
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
bool distinct; // distinct tag or not
bool onlyHasTagCond;
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
......@@ -186,7 +172,7 @@ typedef struct SQueryStmtInfo {
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
struct SQueryStmtInfo *pDownstream;
int32_t havingFieldNum;
SQueryAttrInfo info;
SMultiFunctionsDesc info;
} SQueryStmtInfo;
typedef struct SColumnIndex {
......
......@@ -22,11 +22,10 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
#include "wal.h"
typedef int64_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef int32_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SSyncTerm;
typedef enum {
......@@ -41,41 +40,42 @@ typedef struct {
} SSyncBuffer;
typedef struct {
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
SyncNodeId nodeId;
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo;
typedef struct {
int selfIndex;
int nNode;
SNodeInfo* nodeInfo;
int32_t selfIndex;
int32_t replica;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
} SSyncCluster;
typedef struct {
int32_t selfIndex;
int nNode;
SNodeInfo* node;
ESyncRole* role;
int32_t selfIndex;
int32_t replica;
SNodeInfo node[TSDB_MAX_REPLICA];
ESyncRole role[TSDB_MAX_REPLICA];
} SNodesRole;
typedef struct SSyncFSM {
void* pData;
// apply committed log, bufs will be free by raft module
int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
// cluster commit callback
int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
// fsm return snapshot in ppBuf, bufs will be free by raft module
// TODO: getSnapshot SHOULD be async?
int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast);
int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast);
// fsm apply snapshot with pBuf data
int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast);
int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast);
// call when restore snapshot and log done
int (*onRestoreDone)(struct SSyncFSM* fsm);
int32_t (*onRestoreDone)(struct SSyncFSM* fsm);
void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
......@@ -83,52 +83,79 @@ typedef struct SSyncFSM {
} SSyncFSM;
typedef struct SSyncLogStore {
void* pData;
// write log with given index
int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf);
// mark log with given index has been commtted
int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index);
// prune log before given index
int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index);
// rollback log after given index
int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index);
} SSyncLogStore;
typedef struct SSyncServerState {
SNodeInfo voteFor;
SSyncTerm term;
SyncNodeId voteFor;
SSyncTerm term;
} SSyncServerState;
typedef struct SSyncClusterConfig {
// Log index number of current cluster config.
SyncIndex index;
// Log index number of previous cluster config.
SyncIndex prevIndex;
// current cluster
const SSyncCluster* cluster;
} SSyncClusterConfig;
typedef struct SStateManager {
void* pData;
void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state);
int32_t (*saveServerState)(struct SStateManager* stateMng, SSyncServerState* state);
const SSyncServerState* (*readServerState)(struct SStateManager* stateMng);
int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state);
void (*saveCluster)(struct SStateManager* stateMng, const SSyncCluster* cluster);
// void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
const SSyncCluster* (*readCluster)(struct SStateManager* stateMng);
// const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
} SStateManager;
typedef struct {
SyncGroupId vgId;
twalh walHandle;
SyncIndex snapshotIndex;
SSyncCluster syncCfg;
SSyncFSM fsm;
SyncGroupId vgId;
SyncIndex snapshotIndex;
SSyncCluster syncCfg;
SSyncFSM fsm;
SSyncLogStore logStore;
SStateManager stateManager;
} SSyncInfo;
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
int32_t syncInit();
void syncCleanUp();
SyncNodeId syncStart(const SSyncInfo*);
void syncStop(SyncNodeId);
SSyncNode* syncStart(const SSyncInfo*);
void syncReconfig(const SSyncNode*, const SSyncCluster*);
void syncStop(const SSyncNode*);
int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncAddNode(SyncNodeId nodeId, const SNodeInfo *pNode);
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
int32_t syncRemoveNode(SyncNodeId nodeId, const SNodeInfo *pNode);
// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
extern int32_t syncDebugFlag;
extern int32_t syncDebugFlag;
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_H*/
#endif /*_TD_LIBS_SYNC_H*/
......@@ -44,41 +44,41 @@ typedef struct {
EWalType walLevel; // wal level
} SWalCfg;
typedef void * twalh; // WAL HANDLE
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
struct SWal;
typedef struct SWal SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
//module initialization
int32_t walInit();
void walCleanUp();
// module initialization
int32_t walInit();
void walCleanUp();
//handle open and ctl
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh);
// handle open and ctl
SWal *walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
//write
//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(twalh, void* body, int32_t bodyLen);
int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize);
// write
// int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen);
int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
//apis for lifecycle management
void walFsync(twalh, bool force);
int32_t walCommit(twalh, int64_t ver);
//truncate after
int32_t walRollback(twalh, int64_t ver);
//notify that previous log can be pruned safely
int32_t walPrune(twalh, int64_t ver);
// apis for lifecycle management
void walFsync(SWal *, bool force);
int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous log can be pruned safely
int32_t walPrune(SWal *, int64_t ver);
//read
int32_t walRead(twalh, SWalHead **, int64_t ver);
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum);
// read
int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
//lifecycle check
int32_t walFirstVer(twalh);
int32_t walPersistedVer(twalh);
int32_t walLastVer(twalh);
//int32_t walDataCorrupted(twalh);
// lifecycle check
int32_t walFirstVer(SWal *);
int32_t walPersistedVer(SWal *);
int32_t walLastVer(SWal *);
// int32_t walDataCorrupted(SWal*);
#ifdef __cplusplus
}
......
......@@ -67,6 +67,11 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
*/
void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
/**
* Report the startup progress.
*/
void dnodeReportStartup(char *name, char *desc);
#ifdef __cplusplus
}
#endif
......
......@@ -46,6 +46,11 @@ typedef struct {
*/
void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
/**
* Report the startup progress.
*/
void (*ReportStartup)(char *name, char *desc);
} SVnodeFp;
typedef struct {
......
......@@ -233,11 +233,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) //"Missing data file")
#define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) //"Out of memory")
#define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) //"Unexpected generic error in vnode")
#define TSDB_CODE_VND_INVALID_VRESION_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid version file")
#define TSDB_CODE_VND_IS_FULL TAOS_DEF_ERROR_CODE(0, 0x050B) //"Database memory is full for commit failed")
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full for waiting commit")
#define TSDB_CODE_VND_INVALID_CFG_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid config file)
#define TSDB_CODE_VND_INVALID_TERM_FILE TAOS_DEF_ERROR_CODE(0, 0x050B) //"Invalid term file")
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full")
#define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) //"Database is dropping")
#define TSDB_CODE_VND_IS_BALANCING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is balancing")
#define TSDB_CODE_VND_IS_UPDATING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is updating")
#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) //"Database is closing")
#define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended")
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
......
......@@ -303,7 +303,7 @@ do { \
#define TSDB_MAX_FIELD_LEN 16384
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
#define PRIMARYKEY_TIMESTAMP_COL_INDEX 0
#define PRIMARYKEY_TIMESTAMP_COL_ID 0
#define TSDB_MAX_RPC_THREADS 5
......@@ -382,6 +382,9 @@ do { \
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK };
#ifdef __cplusplus
}
#endif
......
......@@ -48,7 +48,7 @@
* An encoding of midnight at the end of the day as 24:00:00 - ie. midnight
* tomorrow - (allowable under ISO 8601) is supported.
*/
int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
static int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
const unsigned int day, const unsigned int hour,
const unsigned int min, const unsigned int sec, int64_t time_zone)
{
......@@ -79,19 +79,18 @@ void deltaToUtcInitOnce() {
(void)strptime("1970-01-01 00:00:00", (const char *)("%Y-%m-%d %H:%M:%S"), &tm);
m_deltaUtc = (int64_t)mktime(&tm);
//printf("====delta:%lld\n\n", seconds);
return;
}
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec);
static char* forwardToTimeStringEnd(char* str);
static bool checkTzPresent(char *str, int32_t len);
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = {
parseLocaltime,
parseLocaltimeWithDst
parseLocaltimeDst
};
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
......@@ -116,8 +115,8 @@ bool checkTzPresent(char *str, int32_t len) {
}
c--;
}
return false;
return false;
}
char* forwardToTimeStringEnd(char* str) {
......@@ -344,7 +343,7 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return 0;
}
int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) {
int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
*time = 0;
struct tm tm = {0};
tm.tm_isdst = -1;
......
......@@ -7,6 +7,6 @@ target_include_directories(
)
target_link_libraries(
function
PRIVATE os util common
function
PRIVATE os util common
)
\ No newline at end of file
......@@ -56,8 +56,6 @@ typedef struct SResultRowCellInfo {
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100
enum {
......
......@@ -513,7 +513,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) {
* @return
*/
int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
return BLK_DATA_NO_NEEDED;
} else {
return BLK_DATA_STATIS_NEEDED;
......@@ -2303,10 +2303,10 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
tValuePair **tvp = pRes->res;
// user specify the order of output by sort the result according to timestamp
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_ID) {
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} else /*if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/ {
} else /*if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_ID)*/ {
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
}
......
......@@ -128,18 +128,26 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) {
return;
}
if ((*pExpr)->nodeType == TEXPR_BINARYEXPR_NODE) {
int32_t type = (*pExpr)->nodeType;
if (type == TEXPR_BINARYEXPR_NODE) {
doExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
doExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
if (fp != NULL) {
fp((*pExpr)->_node.info);
}
} else if ((*pExpr)->nodeType == TEXPR_VALUE_NODE) {
} else if (type == TEXPR_UNARYEXPR_NODE) {
doExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
if (fp != NULL) {
fp((*pExpr)->_node.info);
}
assert((*pExpr)->_node.pRight == NULL);
} else if (type == TEXPR_VALUE_NODE) {
taosVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TEXPR_COL_NODE) {
} else if (type == TEXPR_COL_NODE) {
free((*pExpr)->pSchema);
}
......
......@@ -52,7 +52,7 @@ bool isTagsQuery(SArray* pFunctionIdList) {
int16_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
// "select count(tbname)" query
// if (functId == FUNCTION_COUNT && pExpr->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
// if (functId == FUNCTION_COUNT && pExpr->base.colpDesc->colId == TSDB_TBNAME_COLUMN_INDEX) {
// continue;
// }
......@@ -80,19 +80,6 @@ bool isTagsQuery(SArray* pFunctionIdList) {
// return false;
//}
bool isBlockInfoQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_BLKINFO) {
return true;
}
}
return false;
}
bool isProjectionQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
......@@ -101,8 +88,12 @@ bool isProjectionQuery(SArray* pFunctionIdList) {
continue;
}
if (f != FUNCTION_PRJ && f != FUNCTION_TAGPRJ && f != FUNCTION_TAG &&
f != FUNCTION_TS && f != FUNCTION_ARITHM && f != FUNCTION_DIFF &&
if (f != FUNCTION_PRJ &&
f != FUNCTION_TAGPRJ &&
f != FUNCTION_TAG &&
f != FUNCTION_TS &&
f != FUNCTION_ARITHM &&
f != FUNCTION_DIFF &&
f != FUNCTION_DERIVATIVE) {
return false;
}
......@@ -111,7 +102,7 @@ bool isProjectionQuery(SArray* pFunctionIdList) {
return true;
}
bool isDiffDerivQuery(SArray* pFunctionIdList) {
bool isDiffDerivativeQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
......@@ -127,7 +118,7 @@ bool isDiffDerivQuery(SArray* pFunctionIdList) {
return false;
}
bool isPointInterpQuery(SArray* pFunctionIdList) {
bool isInterpQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
......@@ -264,8 +255,6 @@ bool needReverseScan(SArray* pFunctionIdList) {
}
bool isSimpleAggregateRv(SArray* pFunctionIdList) {
assert(0);
// if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) {
// return false;
// }
......@@ -380,33 +369,17 @@ bool isProjectionQueryOnSTable(SArray* pFunctionIdList, int32_t tableIndex) {
}
bool hasTagValOutput(SArray* pFunctionIdList) {
// size_t numOfExprs = getNumOfExprs(pQueryInfo);
// SExprInfo* pExpr1 = getExprInfo(pQueryInfo, 0);
//
// if (numOfExprs == 1 && pExpr1->base.functionId == FUNCTION_TS_COMP) {
size_t size = taosArrayGetSize(pFunctionIdList);
// if (numOfExprs == 1 && pExpr1->base.functionId == FUNCTION_TS_COMP) {
// return true;
// }
//
// for (int32_t i = 0; i < numOfExprs; ++i) {
// SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
// if (pExpr == NULL) {
// continue;
// }
//
// // ts_comp column required the tag value for join filter
// if (TSDB_COL_IS_TAG(pExpr->base.colInfo.flag)) {
// return true;
// }
// }
return false;
}
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = *(int16_t*) taosArrayGet(pFunctionIdList, i);
bool timeWindowInterpoRequired(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TWA || f == FUNCTION_INTERP) {
// ts_comp column required the tag value for join filter
if (functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
return true;
}
}
......@@ -414,8 +387,28 @@ bool timeWindowInterpoRequired(SArray* pFunctionIdList) {
return false;
}
//SQueryAttrInfo setQueryType(SArray* pFunctionIdList) {
// assert(pFunctionIdList != NULL);
//
//bool timeWindowInterpoRequired(SArray* pFunctionIdList) {
// int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
// for (int32_t i = 0; i < num; ++i) {
// int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
// if (f == FUNCTION_TWA || f == FUNCTION_INTERP) {
// return true;
// }
// }
//
//}
\ No newline at end of file
// return false;
//}
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) {
assert(pFunctionIdList != NULL);
pDesc->blockDistribution = isBlockDistQuery(pFunctionIdList);
if (pDesc->blockDistribution) {
return;
}
pDesc->projectionQuery = isProjectionQuery(pFunctionIdList);
pDesc->onlyTagQuery = isTagsQuery(pFunctionIdList);
pDesc->interpQuery = isInterpQuery(pFunctionIdList);
}
......@@ -298,7 +298,7 @@ void* destroyCreateTableSql(SCreateTableSql* pCreate);
void setDropFuncInfo(SSqlInfo *pInfo, int32_t type, SToken* pToken);
void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SToken *pName, SToken *pPath, SField *output, SToken* bufSize, int32_t funcType);
void SqlInfoDestroy(SSqlInfo *pInfo);
void destroySqlInfo(SSqlInfo *pInfo);
void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...);
void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SToken* pToken, SToken* existsCheck,int16_t dbType,int16_t tableType);
......
......@@ -73,7 +73,9 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf);
int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
void initQueryInfo(SQueryStmtInfo* pQueryInfo);
SQueryStmtInfo* createQueryInfo();
void destroyQueryInfo(SQueryStmtInfo* pQueryInfo);
int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
......@@ -87,6 +89,12 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
*/
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMetaInfo, char* msg, int32_t msgBufLen);
/**
* Destroy the meta data request structure.
* @param pMetaInfo
*/
void qParserClearupMetaRequestInfo(SMetaReq* pMetaInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -30,9 +30,7 @@ SSchema *getTableTagSchema(const STableMeta* pTableMeta);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo);
//SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, struct tExprNode* pParamExpr, SSchema* pResSchema, int16_t interSize);
SExprInfo* createBinaryExprInfo(struct tExprNode* pNode, SSchema* pResSchema);
void destroyExprInfoList();
void addExprInfo(SQueryStmtInfo* pQueryInfo, int32_t index, SExprInfo* pExprInfo);
void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize);
......@@ -42,9 +40,11 @@ int32_t copyAllExprInfo(SArray* dst, const SArray* src, bool deepcopy);
void addExprInfoParam(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t getExprFunctionId(SExprInfo *pExprInfo);
void cleanupFieldInfo(SFieldInfo* pFieldInfo);
STableComInfo getTableInfo(const STableMeta* pTableMeta);
SArray* extractFunctionIdList(SArray* pExprInfoList);
#ifdef __cplusplus
}
......
......@@ -242,7 +242,6 @@ tSqlExpr *tSqlExprClone(tSqlExpr *pSrc) {
}
void tSqlExprCompact(tSqlExpr **pExpr) {
if (*pExpr == NULL || tSqlExprIsParentOfLeaf(*pExpr)) {
return;
}
......@@ -770,8 +769,11 @@ void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SToken *pName, SToken *pPa
}
}
void SqlInfoDestroy(SSqlInfo *pInfo) {
if (pInfo == NULL) return;;
void destroySqlInfo(SSqlInfo *pInfo) {
if (pInfo == NULL) {
return;
}
taosArrayDestroy(pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(pInfo->list);
......
......@@ -185,4 +185,13 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet
}
return code;
}
\ No newline at end of file
}
void qParserClearupMetaRequestInfo(SMetaReq* pMetaReq) {
if (pMetaReq == NULL) {
return;
}
taosArrayDestroy(pMetaReq->pTableName);
taosArrayDestroy(pMetaReq->pUdf);
}
......@@ -576,13 +576,6 @@ TAOS_FIELD* getFieldInfo(SFieldInfo* pFieldInfo, int32_t index) {
return &((SInternalField*)TARRAY_GET_ELEM(pFieldInfo->internalField, index))->field;
}
int16_t getFieldInfoOffset(SQueryStmtInfo* pQueryInfo, int32_t index) {
SInternalField* pInfo = getInternalField(&pQueryInfo->fieldsInfo, index);
assert(pInfo != NULL && pInfo->pExpr->pExpr == NULL);
return 0;
// return pInfo->pExpr->base.offset;
}
int32_t fieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize) {
assert(pFieldInfo1 != NULL && pFieldInfo2 != NULL);
......@@ -780,8 +773,8 @@ SColumn* columnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid
}
SColumn* insertPrimaryTsColumn(SArray* pColumnList, uint64_t tableUid) {
SSchema s = {.type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = TSDB_KEYSIZE, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
return columnListInsert(pColumnList, PRIMARYKEY_TIMESTAMP_COL_INDEX, tableUid, &s);
SSchema s = {.type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = TSDB_KEYSIZE, .colId = PRIMARYKEY_TIMESTAMP_COL_ID};
return columnListInsert(pColumnList, PRIMARYKEY_TIMESTAMP_COL_ID, tableUid, &s);
}
void columnCopy(SColumn* pDest, const SColumn* pSrc);
......
#include "queryInfoUtil.h"
#include <function.h>
#include "astGenerator.h"
#include "function.h"
#include "os.h"
......@@ -55,7 +56,6 @@ SSchema* getTableTagSchema(const STableMeta* pTableMeta) {
}
static tExprNode* createUnaryFunctionExprNode(int32_t functionId, SSchema* pSchema, tExprNode* pColumnNode) {
if (pColumnNode == NULL) {
pColumnNode = calloc(1, sizeof(tExprNode));
pColumnNode->nodeType = TEXPR_COL_NODE;
......@@ -167,6 +167,10 @@ SExprInfo* getExprInfo(SQueryStmtInfo* pQueryInfo, int32_t index) {
void destroyExprInfo(SExprInfo* pExprInfo) {
tExprTreeDestroy(pExprInfo->pExpr, NULL);
for(int32_t i = 0; i < pExprInfo->base.numOfParams; ++i) {
taosVariantDestroy(&pExprInfo->base.param[i]);
}
tfree(pExprInfo);
}
......@@ -192,6 +196,11 @@ void addExprInfoParam(SSqlExpr* pExpr, char* argument, int32_t type, int32_t byt
assert(pExpr->numOfParams <= 3);
}
int32_t getExprFunctionId(SExprInfo *pExprInfo) {
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
return pExprInfo->pExpr->_node.functionId;
}
void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
assert(dst != NULL && src != NULL);
......@@ -284,62 +293,11 @@ int32_t getResRowLength(SArray* pExprList) {
return size;
}
static void freeQueryInfoImpl(SQueryStmtInfo* pQueryInfo) {
cleanupTagCond(&pQueryInfo->tagCond);
cleanupColumnCond(&pQueryInfo->colCond);
cleanupFieldInfo(&pQueryInfo->fieldsInfo);
dropAllExprInfo(pQueryInfo->exprList);
pQueryInfo->exprList = NULL;
if (pQueryInfo->exprList1 != NULL) {
dropAllExprInfo(pQueryInfo->exprList1);
pQueryInfo->exprList1 = NULL;
}
columnListDestroy(pQueryInfo->colList);
pQueryInfo->colList = NULL;
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
pQueryInfo->groupbyExpr.columnInfo = NULL;
}
pQueryInfo->fillType = 0;
tfree(pQueryInfo->fillVal);
tfree(pQueryInfo->buf);
taosArrayDestroy(pQueryInfo->pUpstream);
pQueryInfo->pUpstream = NULL;
pQueryInfo->bufLen = 0;
}
void freeQueryInfo(SQueryStmtInfo* pQueryInfo, bool removeCachedMeta, uint64_t id) {
while(pQueryInfo != NULL) {
SQueryStmtInfo* p = pQueryInfo->sibling;
size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pUpstream);
for(int32_t i = 0; i < numOfUpstream; ++i) {
SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i);
freeQueryInfoImpl(pUpQueryInfo);
clearAllTableMetaInfo(pUpQueryInfo, removeCachedMeta, id);
tfree(pUpQueryInfo);
}
freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, removeCachedMeta, id);
tfree(pQueryInfo);
pQueryInfo = p;
}
}
SArray* extractFunctionIdList(SArray* pExprInfoList) {
assert(pExprInfoList != NULL);
size_t len = taosArrayGetSize(pExprInfoList);
SArray* p = taosArrayInit(len, sizeof(int16_t));
SArray* p = taosArrayInit(len, sizeof(int32_t));
for(int32_t i = 0; i < len; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pExprInfoList, i);
taosArrayPush(p, &pExprInfo->pExpr->_node.functionId);
......
......@@ -5,14 +5,14 @@ MESSAGE(STATUS "build parser unit test")
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(astTest ${SOURCE_LIST})
ADD_EXECUTABLE(parserTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
astTest
PUBLIC os util common parser catalog transport gtest
parserTest
PUBLIC os util common parser catalog transport gtest function
)
TARGET_INCLUDE_DIRECTORIES(
astTest
parserTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/parser/inc"
)
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <function.h>
#include <gtest/gtest.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
......@@ -65,61 +66,64 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
}
}
//TEST(testCase, validateAST_test) {
// SSqlInfo info1 = doGenerateAST("select a a1111, a+b + 22, tbname from `t.1abc` where ts<now+2h and `col` < 20 + 99");
// ASSERT_EQ(info1.valid, true);
//
// char msg[128] = {0};
// SMsgBuf buf;
// buf.len = 128;
// buf.buf = msg;
//
// SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
// ASSERT_EQ(code, 0);
//
// SMetaReq req = {0};
// int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = (SQueryStmtInfo*)calloc(1, sizeof(SQueryStmtInfo));
// initQueryInfo(pQueryInfo);
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
//
// SArray* pExprList = pQueryInfo->exprList;
// ASSERT_EQ(taosArrayGetSize(pExprList), 3);
//
// SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 0);
// ASSERT_EQ(p1->base.uid, 110);
// ASSERT_EQ(p1->base.numOfParams, 0);
// ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_INT);
// ASSERT_STRCASEEQ(p1->base.resSchema.name, "a1111");
// ASSERT_STRCASEEQ(p1->base.colInfo.name, "t.1abc.a");
TEST(testCase, validateAST_test) {
SSqlInfo info1 = doGenerateAST("select a a1111, a+b + 22, tbname from `t.1abc` where ts<now+2h and `col` < 20 + 99");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf buf;
buf.len = 128;
buf.buf = msg;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
SQueryStmtInfo* pQueryInfo = createQueryInfo();
setTableMetaInfo(pQueryInfo, &req);
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
SArray* pExprList = pQueryInfo->exprList;
ASSERT_EQ(taosArrayGetSize(pExprList), 3);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 0);
ASSERT_EQ(p1->base.uid, 110);
ASSERT_EQ(p1->base.numOfParams, 0);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_INT);
ASSERT_STRCASEEQ(p1->base.resSchema.name, "a1111");
ASSERT_STRCASEEQ(p1->base.colInfo.name, "t.1abc.a");
ASSERT_EQ(p1->base.colInfo.colId, 1);
ASSERT_EQ(p1->base.colInfo.flag, TSDB_COL_NORMAL);
ASSERT_STRCASEEQ(p1->base.token, "a");
ASSERT_EQ(taosArrayGetSize(pExprList), 3);
SExprInfo* p2 = (SExprInfo*) taosArrayGetP(pExprList, 1);
ASSERT_EQ(p2->base.uid, 0);
ASSERT_EQ(p2->base.numOfParams, 1); // it is the serialized binary string of expression.
ASSERT_EQ(p2->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_STRCASEEQ(p2->base.resSchema.name, "a+b + 22");
// ASSERT_STRCASEEQ(p2->base.colInfo.name, "t.1abc.a");
// ASSERT_EQ(p1->base.colInfo.colId, 1);
// ASSERT_EQ(p1->base.colInfo.flag, TSDB_COL_NORMAL);
// ASSERT_STRCASEEQ(p1->base.token, "a");
//
// ASSERT_EQ(taosArrayGetSize(pExprList), 3);
//
// SExprInfo* p2 = (SExprInfo*) taosArrayGetP(pExprList, 1);
// ASSERT_EQ(p2->base.uid, 0);
// ASSERT_EQ(p2->base.numOfParams, 1); // it is the serialized binary string of expression.
// ASSERT_EQ(p2->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
// ASSERT_STRCASEEQ(p2->base.resSchema.name, "a+b + 22");
//
//// ASSERT_STRCASEEQ(p2->base.colInfo.name, "t.1abc.a");
//// ASSERT_EQ(p1->base.colInfo.colId, 1);
//// ASSERT_EQ(p1->base.colInfo.flag, TSDB_COL_NORMAL);
// ASSERT_STRCASEEQ(p2->base.token, "a+b + 22");
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 3);
//}
//
ASSERT_STRCASEEQ(p2->base.token, "a+b + 22");
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 3);
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);
destroySqlInfo(&info1);
}
//TEST(testCase, function_Test) {
// SSqlInfo info1 = doGenerateAST("select count(a) from `t.1abc`");
// ASSERT_EQ(info1.valid, true);
......@@ -138,8 +142,7 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = (SQueryStmtInfo*)calloc(1, sizeof(SQueryStmtInfo));
// initQueryInfo(pQueryInfo);
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
......@@ -161,6 +164,10 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 2);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 1);
//
// destroyQueryInfo(pQueryInfo);
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
//
//TEST(testCase, function_Test2) {
......@@ -181,8 +188,7 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = (SQueryStmtInfo*)calloc(1, sizeof(SQueryStmtInfo));
// initQueryInfo(pQueryInfo);
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
......@@ -204,6 +210,10 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 2);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 1);
//
// destroyQueryInfo(pQueryInfo);
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
//
//TEST(testCase, function_Test3) {
......@@ -224,8 +234,7 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = (SQueryStmtInfo*)calloc(1, sizeof(SQueryStmtInfo));
// initQueryInfo(pQueryInfo);
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
......@@ -246,6 +255,10 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
// ASSERT_EQ(p1->base.interBytes, 24);
//
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 4);
//
// destroyQueryInfo(pQueryInfo);
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
//
//TEST(testCase, function_Test4) {
......@@ -266,8 +279,7 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = (SQueryStmtInfo*)calloc(1, sizeof(SQueryStmtInfo));
// initQueryInfo(pQueryInfo);
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
......@@ -289,6 +301,10 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 1);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 1);
//
// destroyQueryInfo(pQueryInfo);
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
//
//TEST(testCase, function_Test5) {
......@@ -309,8 +325,7 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = (SQueryStmtInfo*)calloc(1, sizeof(SQueryStmtInfo));
// initQueryInfo(pQueryInfo);
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
......@@ -333,46 +348,63 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 1);
//
// destroyQueryInfo(pQueryInfo);
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
TEST(testCase, function_Test6) {
SSqlInfo info1 = doGenerateAST("select sum(a+b) as a1, first(b*a) from `t.1abc`");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf buf;
buf.len = 128;
buf.buf = msg;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
SQueryStmtInfo* pQueryInfo = (SQueryStmtInfo*)calloc(1, sizeof(SQueryStmtInfo));
initQueryInfo(pQueryInfo);
setTableMetaInfo(pQueryInfo, &req);
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList;
ASSERT_EQ(taosArrayGetSize(pExprList), 2);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 0);
ASSERT_EQ(p1->base.uid, 110);
ASSERT_EQ(p1->base.numOfParams, 0);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_STRCASEEQ(p1->base.resSchema.name, "a1");
ASSERT_EQ(p1->base.colInfo.flag, TSDB_COL_NORMAL);
ASSERT_STRCASEEQ(p1->base.token, "sum(a+b)");
ASSERT_EQ(p1->base.interBytes, 16);
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
}
\ No newline at end of file
//
//TEST(testCase, function_Test6) {
// SSqlInfo info1 = doGenerateAST("select sum(a+b) as a1, first(b*a) from `t.1abc` interval(10s, 1s)");
// ASSERT_EQ(info1.valid, true);
//
// char msg[128] = {0};
// SMsgBuf buf;
// buf.len = 128;
// buf.buf = msg;
//
// SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
// ASSERT_EQ(code, 0);
//
// SMetaReq req = {0};
// int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
// ASSERT_EQ(ret, 0);
//
// SArray* pExprList = pQueryInfo->exprList;
// ASSERT_EQ(taosArrayGetSize(pExprList), 2);
//
// SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 0);
// ASSERT_EQ(p1->base.uid, 110);
// ASSERT_EQ(p1->base.numOfParams, 0);
// ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
// ASSERT_STRCASEEQ(p1->base.resSchema.name, "a1");
// ASSERT_EQ(p1->base.colInfo.flag, TSDB_COL_NORMAL);
// ASSERT_STRCASEEQ(p1->base.token, "sum(a+b)");
// ASSERT_EQ(p1->base.interBytes, 16);
// ASSERT_EQ(p1->pExpr->nodeType, TEXPR_UNARYEXPR_NODE);
// ASSERT_EQ(p1->pExpr->_node.functionId, FUNCTION_SUM);
// ASSERT_TRUE(p1->pExpr->_node.pRight == NULL);
//
// tExprNode* pParam = p1->pExpr->_node.pLeft;
//
// ASSERT_EQ(pParam->nodeType, TEXPR_BINARYEXPR_NODE);
// ASSERT_EQ(pParam->_node.optr, TSDB_BINARY_OP_ADD);
// ASSERT_EQ(pParam->_node.pLeft->nodeType, TEXPR_COL_NODE);
// ASSERT_EQ(pParam->_node.pRight->nodeType, TEXPR_COL_NODE);
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
//
// destroyQueryInfo(pQueryInfo);
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
\ No newline at end of file
......@@ -667,51 +667,59 @@ TEST(testCase, isValidNumber_test) {
EXPECT_EQ(tGetNumericStringType(&t1), TK_FLOAT);
}
TEST(testCase, generateAST_test) {
SSqlInfo info = doGenerateAST("select * from t1 where ts < now");
ASSERT_EQ(info.valid, true);
SSqlInfo info1 = doGenerateAST("select * from `t.1abc` where ts<now+2h and col < 20+99");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf msgBuf = {0};
msgBuf.buf = msg;
msgBuf.len = 128;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &msgBuf);
ASSERT_EQ(code, 0);
SSqlInfo info2 = doGenerateAST("select * from abc where ts<now+2");
SSqlNode* pNode2 = (SSqlNode*) taosArrayGetP(((SArray*)info2.list), 0);
code = evaluateSqlNode(pNode2, TSDB_TIME_PRECISION_MILLI, &msgBuf);
ASSERT_NE(code, 0);
}
TEST(testCase, evaluateAST_test) {
SSqlInfo info1 = doGenerateAST("select a, b+22 from `t.1abc` where ts<now+2h and `col` < 20 + 99");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf msgBuf = {0};
msgBuf.buf = msg;
msgBuf.len = 128;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &msgBuf);
ASSERT_EQ(code, 0);
}
TEST(testCase, extractMeta_test) {
SSqlInfo info1 = doGenerateAST("select a, b+22 from `t.1abc` where ts<now+2h and `col` < 20 + 99");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMetaReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
}
//TEST(testCase, generateAST_test) {
// SSqlInfo info = doGenerateAST("select * from t1 where ts < now");
// ASSERT_EQ(info.valid, true);
//
// SSqlInfo info1 = doGenerateAST("select * from `t.1abc` where ts<now+2h and col < 20+99");
// ASSERT_EQ(info1.valid, true);
//
// char msg[128] = {0};
//
// SMsgBuf msgBuf = {0};
// msgBuf.buf = msg;
// msgBuf.len = 128;
//
// SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &msgBuf);
// ASSERT_EQ(code, 0);
//
// SSqlInfo info2 = doGenerateAST("select * from abc where ts<now+2");
// SSqlNode* pNode2 = (SSqlNode*) taosArrayGetP(((SArray*)info2.list), 0);
// code = evaluateSqlNode(pNode2, TSDB_TIME_PRECISION_MILLI, &msgBuf);
// ASSERT_NE(code, 0);
//
// destroySqlInfo(&info);
// destroySqlInfo(&info1);
// destroySqlInfo(&info2);
//}
//
//TEST(testCase, evaluateAST_test) {
// SSqlInfo info1 = doGenerateAST("select a, b+22 from `t.1abc` where ts<now+2h and `col` < 20 + 99");
// ASSERT_EQ(info1.valid, true);
//
// char msg[128] = {0};
// SMsgBuf msgBuf = {0};
// msgBuf.buf = msg;
// msgBuf.len = 128;
//
// SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &msgBuf);
// ASSERT_EQ(code, 0);
// destroySqlInfo(&info1);
//}
//
//TEST(testCase, extractMeta_test) {
// SSqlInfo info1 = doGenerateAST("select a, b+22 from `t.1abc` where ts<now+2h and `col` < 20 + 99");
// ASSERT_EQ(info1.valid, true);
//
// char msg[128] = {0};
// SMetaReq req = {0};
// int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
......@@ -15,5 +15,12 @@
#include "sync.h"
int32_t syncInit() {return 0;}
void syncCleanUp() {}
\ No newline at end of file
int32_t syncInit() { return 0; }
void syncCleanUp() {}
SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; }
void syncStop(const SSyncNode* pNode) {}
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
\ No newline at end of file
......@@ -15,5 +15,22 @@
#include "wal.h"
int32_t walInit() {return 0;}
void walCleanUp() {}
\ No newline at end of file
int32_t walInit() { return 0; }
void walCleanUp() {}
SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; }
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
void walClose(SWal *pWal) {}
void walFsync(SWal *pWal, bool force) {}
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {}
int32_t walCommit(SWal *pWal, int64_t ver) { return 0; }
int32_t walRollback(SWal *pWal, int64_t ver) { return 0; }
int32_t walPrune(SWal *pWal, int64_t ver) { return 0; }
\ No newline at end of file
......@@ -37,7 +37,7 @@ EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; }
static void dnodeReportStartup(char *name, char *desc) {
void dnodeReportStartup(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
......@@ -58,6 +58,7 @@ static int32_t dnodeInitVnode() {
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.ReportStartup = dnodeReportStartup;
return vnodeInit(para);
}
......
......@@ -24,7 +24,7 @@ extern "C" {
tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId();
char *mnodeGetClusterId();
int64_t mnodeGetClusterId();
EMnStatus mnodeGetStatus();
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
......
......@@ -202,12 +202,13 @@ static void mnodeSendTelemetryReport() {
return;
}
char clusterId[TSDB_CLUSTER_ID_LEN] = {0};
mnodeGetClusterId(clusterId);
int64_t clusterId = mnodeGetClusterId();
char clusterIdStr[20] = {0};
snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false);
mnodeBeginObject(&bw);
mnodeAddStringField(&bw, "instanceId", clusterId);
mnodeAddStringField(&bw, "instanceId", clusterIdStr);
mnodeAddIntField(&bw, "reportVersion", 1);
mnodeAddOsInfo(&bw);
mnodeAddCpuInfo(&bw);
......
......@@ -39,7 +39,7 @@
static struct {
int32_t state;
int32_t dnodeId;
char clusterId[TSDB_CLUSTER_ID_LEN];
int64_t clusterId;
tmr_h timer;
SMnodeFp fp;
SSteps * steps1;
......@@ -50,7 +50,7 @@ tmr_h mnodeGetTimer() { return tsMint.timer; }
int32_t mnodeGetDnodeId() { return tsMint.dnodeId; }
char *mnodeGetClusterId() { return tsMint.clusterId; }
int64_t mnodeGetClusterId() { return tsMint.clusterId; }
EMnStatus mnodeGetStatus() { return tsMint.state; }
......@@ -71,12 +71,14 @@ int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; }
static int32_t mnodeSetPara(SMnodePara para) {
tsMint.fp = para.fp;
tsMint.dnodeId = para.dnodeId;
strncpy(tsMint.clusterId, para.clusterId, TSDB_CLUSTER_ID_LEN);
tsMint.clusterId = para.clusterId;
if (tsMint.fp.SendMsgToDnode == NULL) return -1;
if (tsMint.fp.SendMsgToMnode == NULL) return -1;
if (tsMint.fp.SendRedirectMsg == NULL) return -1;
if (tsMint.fp.GetDnodeEp == NULL) return -1;
if (tsMint.dnodeId < 0) return -1;
if (tsMint.clusterId < 0) return -1;
return 0;
}
......@@ -141,7 +143,7 @@ static void mnodeCleanupStep2() { taosStepCleanup(tsMint.steps2); }
static bool mnodeNeedDeploy() {
if (tsMint.dnodeId > 0) return false;
if (tsMint.clusterId[0] != 0) return false;
if (tsMint.clusterId > 0) return false;
if (strcmp(tsFirst, tsLocalEp) != 0) return false;
return true;
}
......@@ -154,7 +156,7 @@ int32_t mnodeDeploy() {
tsMint.state = MN_STATUS_INIT;
}
if (tsMint.dnodeId <= 0 || tsMint.clusterId[0] == 0) {
if (tsMint.dnodeId <= 0 || tsMint.clusterId <= 0) {
mError("failed to deploy mnode since cluster not ready");
return TSDB_CODE_MND_NOT_READY;
}
......
......@@ -21,8 +21,10 @@ extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeReadCfg(SVnode *pVnode);
int32_t vnodeWriteCfg(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState);
int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState);
#ifdef __cplusplus
}
......
......@@ -16,11 +16,12 @@
#ifndef _TD_VNODE_INT_H_
#define _TD_VNODE_INT_H_
#include "os.h"
#include "amalloc.h"
#include "meta.h"
#include "os.h"
#include "sync.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tlog.h"
#include "tq.h"
#include "tqueue.h"
......@@ -43,57 +44,70 @@ extern int32_t vDebugFlag;
#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct STsdbCfg {
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
uint8_t precision; // time resolution
int8_t compression;
int8_t cacheLastRow;
int8_t update;
} STsdbCfg;
typedef struct SMetaCfg {
} SMetaCfg;
typedef struct SVnodeCfg {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int8_t dropped;
int8_t quorum;
SWalCfg wal;
STsdbCfg tsdb;
SMetaCfg meta;
SSyncCluster sync;
} SVnodeCfg;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
SMemAllocator *allocator;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
twalh pWal;
SyncNodeId syncNode;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
SWalCfg walCfg;
SSyncCluster syncCfg;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t status;
int8_t role;
int8_t accessState;
int8_t dropped;
pthread_mutex_t statusMutex;
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
SMemAllocator *allocator;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
SWal *pWal;
void *pQuery;
SSyncNode *pSync;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
SVnodeCfg cfg;
SSyncServerState term;
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t role;
int8_t accessState;
int8_t dropped;
int8_t status;
pthread_mutex_t statusMutex;
} SVnode;
typedef struct {
int32_t len;
void * rsp;
void * qhandle; // used by query and retrieve msg
void *rsp;
void *qhandle; // used by query and retrieve msg
} SVnRsp;
void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
void vnodeCleanUp(SVnode *pVnode);
void vnodeDestroy(SVnode *pVnode);
int32_t vnodeCompact(int32_t vgId);
void vnodeBackup(int32_t vgId);
void vnodeGetStatus(struct SStatusMsg *status);
SVnode *vnodeAcquire(int32_t vgId);
SVnode *vnodeAcquireNotClose(int32_t vgId);
void vnodeRelease(SVnode *pVnode);
void vnodeReportStartup(char *name, char *desc);
#ifdef __cplusplus
}
......
......@@ -13,23 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_MGMT_MSG_H_
#define _TD_VNODE_MGMT_MSG_H_
#ifndef _TD_VNODE_MAIN_H_
#define _TD_VNODE_MAIN_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg);
int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg);
int32_t vnodeInitMain();
void vnodeCleanupMain();
SVnode *vnodeAcquireInAllState(int32_t vgId);
SVnode *vnodeAcquire(int32_t vgId);
void vnodeRelease(SVnode *pVnode);
int32_t vnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeAlterVnode(SVnode *pVnode, SVnodeCfg *pCfg);
int32_t vnodeDropVnode(SVnode *pVnode);
int32_t vnodeSyncVnode(SVnode *pVnode);
int32_t vnodeCompactVnode(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_MGMT_H_*/
#endif /*_TD_VNODE_MAIN_H_*/
......@@ -21,6 +21,14 @@ extern "C" {
#endif
#include "vnodeInt.h"
typedef struct {
SVnode *pVnode;
SRpcMsg rpcMsg;
char pCont[];
} SVnMgmtMsg;
int32_t vnodeInitMgmt();
void vnodeCleanupMgmt();
void vnodeProcessMgmtMsg(SRpcMsg *pMsg);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_STATUS_H_
#define _TD_VNODE_STATUS_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3
} EVnodeStatus;
// vnodeStatus
extern char* vnodeStatus[];
bool vnodeSetInitStatus(SVnode* pVnode);
bool vnodeSetReadyStatus(SVnode* pVnode);
bool vnodeSetClosingStatus(SVnode* pVnode);
bool vnodeSetUpdatingStatus(SVnode* pVnode);
bool vnodeInInitStatus(SVnode* pVnode);
bool vnodeInReadyStatus(SVnode* pVnode);
bool vnodeInClosingStatus(SVnode* pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_STATUS_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_VERSION_H_
#define _TD_VNODE_VERSION_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeReadVersion(SVnode *pVnode);
int32_t vnodeSaveVersion(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_VERSION_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_WORKER_H_
#define _TD_VNODE_WORKER_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitWorker();
void vnodeCleanupWorker();
void vnodeProcessCleanupTask(SVnode *pVnode);
void vnodeProcessDestroyTask(SVnode *pVnode);
void vnodeProcessBackupTask(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_WORKER_H_*/
\ No newline at end of file
此差异已折叠。
此差异已折叠。
......@@ -15,21 +15,185 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "vnodeMain.h"
#include "vnodeMgmt.h"
#include "vnodeMgmtMsg.h"
typedef struct {
SRpcMsg rpcMsg;
char pCont[];
} SVnMgmtMsg;
static struct {
SWorkerPool pool;
taos_queue pQueue;
SWorkerPool createPool;
taos_queue createQueue;
SWorkerPool workerPool;
taos_queue workerQueue;
int32_t (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
} tsVmgmt = {0};
static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
*vgId = htonl(pCreate->vgId);
pCfg->dropped = 0;
pCfg->quorum = pCreate->quorum;
tstrncpy(pCfg->db, pCreate->db, sizeof(pCfg->db));
pCfg->tsdb.cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCfg->tsdb.totalBlocks = htonl(pCreate->totalBlocks);
pCfg->tsdb.daysPerFile = htonl(pCreate->daysPerFile);
pCfg->tsdb.daysToKeep1 = htonl(pCreate->daysToKeep1);
pCfg->tsdb.daysToKeep2 = htonl(pCreate->daysToKeep2);
pCfg->tsdb.daysToKeep0 = htonl(pCreate->daysToKeep0);
pCfg->tsdb.minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCfg->tsdb.maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
pCfg->tsdb.precision = pCreate->precision;
pCfg->tsdb.compression = pCreate->compression;
pCfg->tsdb.cacheLastRow = pCreate->cacheLastRow;
pCfg->tsdb.update = pCreate->update;
pCfg->wal.fsyncPeriod = htonl(pCreate->fsyncPeriod);
pCfg->wal.walLevel = pCreate->walLevel;
pCfg->sync.replica = pCreate->replica;
pCfg->sync.selfIndex = pCreate->selfIndex;
for (int32_t j = 0; j < pCreate->replica; ++j) {
pCfg->sync.nodeInfo[j].nodePort = htons(pCreate->nodes[j].port);
tstrncpy(pCfg->sync.nodeInfo[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN);
}
return 0;
}
static int32_t vnodeProcessCreateVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
int32_t code = vnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
if (code != 0) {
vError("failed to parse create vnode msg since %s", tstrerror(code));
}
vDebug("vgId:%d, create vnode req is received", vgId);
SVnode *pVnode = vnodeAcquireInAllState(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, already exist, return success", vgId);
vnodeRelease(pVnode);
return code;
}
code = vnodeCreateVnode(vgId, &vnodeCfg);
if (code != 0) {
vError("vgId:%d, failed to create vnode since %s", vgId, tstrerror(code));
}
return code;
}
static int32_t vnodeProcessAlterVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
int32_t code = vnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
if (code != 0) {
vError("failed to parse create vnode msg since %s", tstrerror(code));
}
vDebug("vgId:%d, alter vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
return code;
}
code = vnodeAlterVnode(pVnode, &vnodeCfg);
if (code != 0) {
vError("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return pDrop;
}
static int32_t vnodeProcessSyncVnodeReq(SRpcMsg *rpcMsg) {
SSyncVnodeMsg *pSync = (SSyncVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pSync->vgId;
vDebug("vgId:%d, sync vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to sync since %s", vgId, tstrerror(code));
return code;
}
code = vnodeSyncVnode(pVnode);
if (code != 0) {
vError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pCompact->vgId;
vDebug("vgId:%d, compact vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to compact since %s", vgId, tstrerror(code));
return code;
}
code = vnodeCompactVnode(pVnode);
if (code != 0) {
vError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pDrop->vgId;
vDebug("vgId:%d, drop vnode req is received", vgId);
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
code = terrno;
vDebug("vgId:%d, failed to drop since %s", vgId, tstrerror(code));
return code;
}
code = vnodeDropVnode(pVnode);
if (code != 0) {
vError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code));
}
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg) {
vError("alter stream msg not processed");
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}
static int32_t vnodeProcessMgmtStart(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype) {
SRpcMsg *pMsg = &pMgmt->rpcMsg;
int32_t msgType = pMsg->msgType;
......@@ -43,27 +207,21 @@ static int32_t vnodeProcessMgmtStart(void *unused, SVnMgmtMsg *pMgmt, int32_t qt
}
}
static void vnodeSendMgmtEnd(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype, int32_t code) {
static void vnodeProcessMgmtEnd(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype, int32_t code) {
SRpcMsg *pMsg = &pMgmt->rpcMsg;
SRpcMsg rsp = {0};
vTrace("msg:%p, is processed, result:%s", pMgmt, tstrerror(code));
rsp.code = code;
vTrace("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
if (rsp.code != TSDB_CODE_DND_ACTION_IN_PROGRESS) {
rsp.handle = pMsg->handle;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
}
taosFreeQitem(pMsg);
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
taosFreeQitem(pMgmt);
}
static void vnodeInitMgmtReqFp() {
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessCreateVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessAlterVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessSyncVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE]= vnodeProcessCompactVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessDropVnodeMsg;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessCreateVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessAlterVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessSyncVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessCompactVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessDropVnodeReq;
tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessAlterStreamReq;
}
......@@ -75,14 +233,18 @@ static int32_t vnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsVmgmt.pQueue, TAOS_QTYPE_RPC, pMgmt);
return TSDB_CODE_SUCCESS;
if (pMsg->msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE) {
return taosWriteQitem(tsVmgmt.createQueue, TAOS_QTYPE_RPC, pMgmt);
} else {
return taosWriteQitem(tsVmgmt.workerQueue, TAOS_QTYPE_RPC, pMgmt);
}
}
void vnodeProcessMgmtMsg(SRpcMsg *pMsg) {
int32_t code = vnodeWriteToMgmtQueue(pMsg);
if (code != TSDB_CODE_SUCCESS) {
vError("msg, ahandle:%p type:%s not processed since %s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code));
SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rsp);
}
......@@ -93,25 +255,41 @@ void vnodeProcessMgmtMsg(SRpcMsg *pMsg) {
int32_t vnodeInitMgmt() {
vnodeInitMgmtReqFp();
SWorkerPool *pPool = &tsVmgmt.pool;
pPool->name = "vmgmt";
SWorkerPool *pPool = &tsVmgmt.createPool;
pPool->name = "vnode-mgmt-create";
pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart;
pPool->endFp = (ProcessEndFp)vnodeSendMgmtEnd;
pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd;
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
tsVmgmt.pQueue = tWorkerAllocQueue(pPool, NULL);
tsVmgmt.createQueue = tWorkerAllocQueue(pPool, NULL);
vInfo("vmgmt is initialized, max worker %d", pPool->max);
pPool = &tsVmgmt.workerPool;
pPool->name = "vnode-mgmt-worker";
pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart;
pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd;
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
tsVmgmt.workerQueue = tWorkerAllocQueue(pPool, NULL);
vInfo("vmgmt is initialized");
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupMgmt() {
tWorkerFreeQueue(&tsVmgmt.pool, tsVmgmt.pQueue);
tWorkerCleanup(&tsVmgmt.pool);
tsVmgmt.pQueue = NULL;
tWorkerFreeQueue(&tsVmgmt.createPool, tsVmgmt.createQueue);
tWorkerCleanup(&tsVmgmt.createPool);
tsVmgmt.createQueue = NULL;
tWorkerFreeQueue(&tsVmgmt.workerPool, tsVmgmt.workerQueue);
tWorkerCleanup(&tsVmgmt.workerPool);
tsVmgmt.createQueue = NULL;
vInfo("vmgmt is closed");
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "vnodeMgmtMsg.h"
static SCreateVnodeMsg* vnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.dbCfgVersion = htonl(pCreate->cfg.dbCfgVersion);
pCreate->cfg.vgCfgVersion = htonl(pCreate->cfg.vgCfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
for (int32_t j = 0; j < pCreate->cfg.vgReplica; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
}
return pCreate;
}
int32_t vnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = vnodeParseVnodeMsg(rpcMsg);
SVnode *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
} else {
vDebug("vgId:%d, create vnode msg is received", pCreate->cfg.vgId);
return vnodeCreate(pCreate);
}
}
int32_t vnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = vnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquireNotClose(pAlter->cfg.vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pAlter);
vnodeRelease(pVnode);
return code;
} else {
vInfo("vgId:%d, vnode not exist, can't alter it", pAlter->cfg.vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
}
int32_t vnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
SSyncVnodeMsg *pSyncVnode = rpcMsg->pCont;
pSyncVnode->vgId = htonl(pSyncVnode->vgId);
return vnodeSync(pSyncVnode->vgId);
}
int32_t vnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompactVnode = rpcMsg->pCont;
pCompactVnode->vgId = htonl(pCompactVnode->vgId);
return vnodeCompact(pCompactVnode->vgId);
}
int32_t vnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return vnodeDrop(pDrop->vgId);
}
int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg) { return 0; }
......@@ -14,14 +14,9 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
// #include "query.h"
#include "vnodeMain.h"
#include "vnodeRead.h"
#include "vnodeReadMsg.h"
#include "vnodeStatus.h"
static struct {
SWorkerPool query;
......@@ -50,11 +45,6 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen,
}
#endif
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, failed to write into vread queue, vnode status is %s", pVnode->vgId, vnodeStatus[pVnode->status]);
return TSDB_CODE_APP_NOT_READY;
}
int32_t size = sizeof(SReadMsg) + contLen;
SReadMsg *pRead = taosAllocateQitem(size);
if (pRead == NULL) {
......@@ -119,7 +109,7 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) {
pHead->contLen = htonl(pHead->contLen);
assert(pHead->contLen > 0);
SVnode *pVnode = vnodeAcquireNotClose(pHead->vgId);
SVnode *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) {
code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
......
......@@ -14,11 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
// #include "query.h"
#include "vnodeStatus.h"
#include "vnodeMain.h"
#include "vnodeRead.h"
#include "vnodeReadMsg.h"
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册