提交 ee470a94 编写于 作者: H Haojun Liao

other: merge 3.0

......@@ -121,7 +121,7 @@ def pre_test_win(){
set
date /t
time /t
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug || exit 0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
......
......@@ -46,11 +46,17 @@ ENDIF ()
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/W3 /D_WIN32")
SET(COMMON_FLAGS "/w /D_WIN32")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
# ENDIF ()
IF (CMAKE_DEPFILE_FLAGS_C)
SET(CMAKE_DEPFILE_FLAGS_C "")
ENDIF ()
IF (CMAKE_DEPFILE_FLAGS_CXX)
SET(CMAKE_DEPFILE_FLAGS_CXX "")
ENDIF ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMMON_FLAGS}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}")
......
......@@ -46,6 +46,18 @@ IF(${TD_WINDOWS})
ON
)
option(
BUILD_TEST
"If build unit tests using googletest"
OFF
)
ELSE ()
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ENDIF ()
option(
......@@ -54,12 +66,6 @@ option(
OFF
)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
option(
BUILD_WITH_LEVELDB
"If build with leveldb"
......
......@@ -1648,6 +1648,15 @@ typedef struct {
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
int8_t igNotExists;
} SMDropCgroupReq;
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
......
......@@ -236,6 +236,7 @@ typedef struct SSelectStmt {
bool isTimeOrderQuery;
bool hasAggFuncs;
bool hasRepeatScanFuncs;
bool hasNonstdSQLFunc;
} SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
......@@ -350,9 +351,6 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsRegularOp(const SOperatorNode* pOp);
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
void* nodesGetValueFromNode(SValueNode* pNode);
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value);
char* nodesGetStrValueFromNode(SValueNode* pNode);
......
......@@ -107,6 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef);
taosFreeQitem(pDataSubmit);
}
}
......@@ -279,6 +280,12 @@ typedef struct {
SArray* res; // SArray<SSDataBlock>
} SStreamSinkReq;
typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SStreamTaskRunReq;
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
......
......@@ -20,20 +20,23 @@
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
//#include <tdatablock.h>
#include "cJSON.h"
#include "tdef.h"
//#include "taosdef.h"
//#include "trpc.h"
//#include "wal.h"
#include "tmsgcb.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SyncTerm;
typedef struct SSyncNode SSyncNode;
typedef struct SSyncBuffer SSyncBuffer;
typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;
typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101,
......@@ -41,6 +44,17 @@ typedef enum {
TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;
typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
} ESyncProposeCode;
typedef enum {
TAOS_SYNC_FSM_CB_SUCCESS = 0,
TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
} ESyncFsmCbCode;
typedef struct SNodeInfo {
uint16_t nodePort;
char nodeFqdn[TSDB_FQDN_LEN];
......@@ -58,11 +72,6 @@ typedef struct SSnapshot {
SyncTerm lastApplyTerm;
} SSnapshot;
typedef enum {
TAOS_SYNC_FSM_CB_SUCCESS = 0,
TAOS_SYNC_FSM_CB_OTHER_ERROR,
} ESyncFsmCbCode;
typedef struct SFsmCbMeta {
SyncIndex index;
bool isWeak;
......@@ -71,27 +80,15 @@ typedef struct SFsmCbMeta {
uint64_t seqNum;
} SFsmCbMeta;
struct SRpcMsg;
typedef struct SRpcMsg SRpcMsg;
typedef struct SSyncFSM {
void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM;
struct SSyncRaftEntry;
typedef struct SSyncRaftEntry SSyncRaftEntry;
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
// abstract definition of log store in raft
// SWal implements it
typedef struct SSyncLogStore {
......@@ -120,11 +117,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore;
struct SWal;
typedef struct SWal SWal;
struct SEpSet;
typedef struct SEpSet SEpSet;
typedef struct SSyncInfo {
SyncGroupId vgId;
......@@ -132,12 +124,9 @@ typedef struct SSyncInfo {
char path[TSDB_FILENAME_LEN];
SWal* pWal;
SSyncFSM* pFsm;
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo;
int32_t syncInit();
......@@ -152,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid);
typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER,
TAOS_SYNC_PROPOSE_OTHER_ERROR,
} ESyncProposeCode;
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
extern int32_t sDebugFlag;
//-----------------------------------------
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
struct SSyncBuffer;
typedef struct SSyncBuffer SSyncBuffer;
//-----------------------------------------
const char* syncStr(ESyncState state);
#ifdef __cplusplus
......
......@@ -20,13 +20,7 @@
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
//#include <tdatablock.h>
#include "cJSON.h"
//#include "taosdef.h"
#include "trpc.h"
//#include "wal.h"
// ------------------ ds -------------------
typedef struct SRaftId {
......@@ -35,16 +29,12 @@ typedef struct SRaftId {
} SRaftId;
// ------------------ control -------------------
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
void syncSetQ(int64_t rid, void* queueHandle);
void syncSetRpc(int64_t rid, void* rpcHandle);
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
char* sync2SimpleStr(int64_t rid);
// set timer ms
......
......@@ -38,7 +38,7 @@ typedef struct {
typedef struct SRpcHandleInfo {
// rpc info
void *handle; // rpc handle returned to app
void * handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
......@@ -49,13 +49,13 @@ typedef struct SRpcHandleInfo {
void *node; // node mgmt handle
// resp info
void *rsp;
void * rsp;
int32_t rspLen;
} SRpcHandleInfo;
typedef struct SRpcMsg {
tmsg_t msgType;
void *pCont;
void * pCont;
int32_t contLen;
int32_t code;
SRpcHandleInfo info;
......@@ -63,11 +63,6 @@ typedef struct SRpcMsg {
} SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
///
// // SRpcMsg code
// REDIERE,
// NOT READY, EpSet
typedef bool (*RpcRfp)(int32_t code);
typedef struct SRpcInit {
......@@ -81,17 +76,10 @@ typedef struct SRpcInit {
// the following is for client app ecurity only
char *user; // user name
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app
RpcCfp cfp;
// call back to retrieve the client auth info, for server app only
RpcAfp afp;
// user defined retry func
RpcRfp rfp;
......
......@@ -649,6 +649,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_TBNAME TAOS_DEF_ERROR_CODE(0, 0x264C)
#define TSDB_CODE_PAR_INVALID_FUNCTION_NAME TAOS_DEF_ERROR_CODE(0, 0x264D)
#define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E)
#define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F)
#define TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY TAOS_DEF_ERROR_CODE(0, 0x2650)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -67,9 +67,9 @@ fi
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
cp ${compile_dir}/../src/inc/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../src/inc/taosdef.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../src/inc/taoserror.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/util/taoserror.h ${pkg_dir}${install_home_path}/include
cp -r ${top_dir}/examples/* ${pkg_dir}${install_home_path}/examples
#cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector
#cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector
......
#!/bin/bash
#
# Generate the tar.gz package for linux os
# Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os
set -e
#set -x
# release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
# -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
# -V [stable | beta]
# -l [full | lite]
# -s [static | dynamic]
# -d [taos | ...]
# -n [2.0.0.3]
# -m [2.0.0.0]
# -H [ false | true]
# set parameters by default value
version="3.0.0.0"
verMode=edge # [cluster, edge]
verType=stable # [stable, beta]
cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...]
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
pagMode=full # [full | lite]
soMode=dynamic # [static | dynamic]
dbName=taos # [taos | ...]
allocator=glibc # [glibc | jemalloc]
verNumber=""
verNumberComp="2.0.0.0"
httpdBuild=false
while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do
case $arg in
v)
#echo "verMode=$OPTARG"
verMode=$(echo $OPTARG)
;;
V)
#echo "verType=$OPTARG"
verType=$(echo $OPTARG)
;;
c)
#echo "cpuType=$OPTARG"
cpuType=$(echo $OPTARG)
;;
l)
#echo "pagMode=$OPTARG"
pagMode=$(echo $OPTARG)
;;
s)
#echo "soMode=$OPTARG"
soMode=$(echo $OPTARG)
;;
d)
#echo "dbName=$OPTARG"
dbName=$(echo $OPTARG)
;;
a)
#echo "allocator=$OPTARG"
allocator=$(echo $OPTARG)
;;
n)
#echo "verNumber=$OPTARG"
verNumber=$(echo $OPTARG)
;;
m)
#echo "verNumberComp=$OPTARG"
verNumberComp=$(echo $OPTARG)
;;
o)
#echo "osType=$OPTARG"
osType=$(echo $OPTARG)
;;
H)
#echo "httpdBuild=$OPTARG"
httpdBuild=$(echo $OPTARG)
;;
h)
echo "Usage: $(basename $0) -v [cluster | edge] "
echo " -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] "
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
echo " -V [stable | beta] "
echo " -l [full | lite] "
echo " -a [glibc | jemalloc] "
echo " -s [static | dynamic] "
echo " -d [taos | ...] "
echo " -n [version number] "
echo " -m [compatible version number] "
echo " -H [false | true] "
exit 0
;;
?) #unknow option
echo "unkonw argument"
exit 1
;;
esac
done
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} allocator=${allocator} verNumber=${verNumber} verNumberComp=${verNumberComp} httpdBuild=${httpdBuild}"
curr_dir=$(pwd)
script_dir="$(dirname $(readlink -f $0))"
top_dir="$(readlink -f ${script_dir}/..)"
if [ "$osType" == "Darwin" ]; then
script_dir=$(dirname $0)
cd ${script_dir}
script_dir="$(pwd)"
top_dir=${script_dir}/..
else
script_dir="$(dirname $(readlink -f $0))"
top_dir="$(readlink -f ${script_dir}/..)"
fi
csudo=""
if command -v sudo > /dev/null; then
csudo="sudo "
fi
function is_valid_version() {
[ -z $1 ] && return 1 || :
rx='^([0-9]+\.){3}(\*|[0-9]+)$'
if [[ $1 =~ $rx ]]; then
return 0
fi
return 1
}
function vercomp() {
if [[ $1 == $2 ]]; then
echo 0
exit 0
fi
local IFS=.
local i ver1=($1) ver2=($2)
# fill empty fields in ver1 with zeros
for ((i = ${#ver1[@]}; i < ${#ver2[@]}; i++)); do
ver1[i]=0
done
for ((i = 0; i < ${#ver1[@]}; i++)); do
if [[ -z ${ver2[i]} ]]; then
# fill empty fields in ver2 with zeros
ver2[i]=0
fi
if ((10#${ver1[i]} > 10#${ver2[i]})); then
echo 1
exit 0
fi
if ((10#${ver1[i]} < 10#${ver2[i]})); then
echo 2
exit 0
fi
done
echo 0
}
# 1. check version information
if ( (! is_valid_version $verNumber) || (! is_valid_version $verNumberComp) || [[ "$(vercomp $verNumber $verNumberComp)" == '2' ]]); then
echo "please enter correct version"
exit 0
fi
echo "=======================new version number: ${verNumber}======================================"
echo "=======================new version number: ${verNumber}, compatible version: ${verNumberComp}======================================"
build_time=$(date +"%F %R")
echo "script_dir: ${script_dir}"
echo "top_dir: ${top_dir}"
# get commint id from git
gitinfo=$(git rev-parse --verify HEAD)
cd ${top_dir}
# git checkout -- .
# git checkout 3.0
# git pull || :
if [[ "$verMode" == "cluster" ]]; then
enterprise_dir="${top_dir}/../enterprise"
cd ${enterprise_dir}
gitinfoOfInternal=$(git rev-parse --verify HEAD)
else
gitinfoOfInternal=NULL
fi
echo "curr_dir: ${curr_dir}"
cd "${curr_dir}"
# 2. cmake executable file
compile_dir="${top_dir}/debug"
# if [ -d ${compile_dir} ]; then
# rm -rf ${compile_dir}
# fi
mkdir -p ${compile_dir}
if [ -d ${compile_dir} ]; then
${csudo}rm -rf ${compile_dir}
fi
if [ "$osType" != "Darwin" ]; then
${csudo}mkdir -p ${compile_dir}
else
mkdir -p ${compile_dir}
fi
cd ${compile_dir}
echo "compile_dir: ${compile_dir}"
if [[ "$allocator" == "jemalloc" ]]; then
allocator_macro="-DJEMALLOC_ENABLED=true"
else
allocator_macro=""
fi
cmake .. -DBUILD_TOOLS=true
make -j32
if [[ "$dbName" != "taos" ]]; then
source ${enterprise_dir}/packaging/oem/sed_$dbName.sh
replace_community_$dbName
fi
release_dir="${top_dir}/release"
if [ -d ${release_dir} ]; then
rm -rf ${release_dir}
if [[ "$httpdBuild" == "true" ]]; then
BUILD_HTTP=true
else
BUILD_HTTP=false
fi
mkdir -p ${release_dir}
cd ${release_dir}
if [[ "$verMode" == "cluster" ]]; then
BUILD_HTTP=internal
fi
install_dir="${release_dir}/TDengine-server-${version}"
mkdir -p ${install_dir}
mkdir -p ${install_dir}/bin
mkdir -p ${install_dir}/lib
mkdir -p ${install_dir}/inc
if [[ "$pagMode" == "full" ]]; then
BUILD_TOOLS=true
else
BUILD_TOOLS=false
fi
# check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]]; then
if [ "$verMode" != "cluster" ]; then
# community-version compile
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
else
if [[ "$dbName" != "taos" ]]; then
replace_enterprise_$dbName
fi
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
fi
else
echo "input cpuType=${cpuType} error!!!"
exit 1
fi
install_files="${script_dir}/tools/install.sh"
chmod a+x ${script_dir}/tools/install.sh || :
cp ${install_files} ${install_dir}
CORES=$(grep -c ^processor /proc/cpuinfo)
header_files="${top_dir}/include/client/taos.h ${top_dir}/include/util/taoserror.h"
cp ${header_files} ${install_dir}/inc
if [[ "$allocator" == "jemalloc" ]]; then
# jemalloc need compile first, so disable parallel build
make -j ${CORES} && ${csudo}make install
else
make -j ${CORES} && ${csudo}make install
fi
cd ${curr_dir}
bin_files="${compile_dir}/build/bin/taosd ${compile_dir}/build/bin/taos ${compile_dir}/build/bin/create_table ${compile_dir}/build/bin/tmq_sim ${script_dir}/tools/remove.sh ${compile_dir}/build/bin/taosBenchmark ${compile_dir}/build/bin/taosdump"
cp -rf ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
# 3. Call the corresponding script for packaging
if [ "$osType" != "Darwin" ]; then
if [[ "$verMode" != "cluster" ]] && [[ "$pagMode" == "full" ]] && [[ "$cpuType" == "x64" ]] && [[ "$dbName" == "taos" ]]; then
ret='0'
command -v dpkg >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then
${csudo}rm -rf ${output_dir}
fi
${csudo}mkdir -p ${output_dir}
cd ${script_dir}/deb
${csudo}./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
cp ${compile_dir}/build/lib/libtaos.so ${install_dir}/lib/
cp ${compile_dir}/build/lib/libavro* ${install_dir}/lib/ > /dev/null || echo -e "failed to copy avro libraries"
cp -rf ${compile_dir}/build/lib/pkgconfig ${install_dir}/lib/ > /dev/null || echo -e "failed to copy pkgconfig directory"
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
${csudo}./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
echo "==========dpkg command not exist, so not release deb package!!!"
fi
ret='0'
command -v rpmbuild >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then
${csudo}rm -rf ${output_dir}
fi
${csudo}mkdir -p ${output_dir}
cd ${script_dir}/rpm
${csudo}./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
#cp ${compile_dir}/source/dnode/mnode/impl/libmnode.so ${install_dir}/lib/
#cp ${compile_dir}/source/dnode/qnode/libqnode.so ${install_dir}/lib/
#cp ${compile_dir}/source/dnode/snode/libsnode.so ${install_dir}/lib/
#cp ${compile_dir}/source/dnode/bnode/libbnode.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/wal/libwal.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/scheduler/libscheduler.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/planner/libplanner.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/parser/libparser.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/qcom/libqcom.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/transport/libtransport.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/function/libfunction.so ${install_dir}/lib/
#cp ${compile_dir}/source/common/libcommon.so ${install_dir}/lib/
#cp ${compile_dir}/source/os/libos.so ${install_dir}/lib/
#cp ${compile_dir}/source/dnode/mnode/sdb/libsdb.so ${install_dir}/lib/
#cp ${compile_dir}/source/libs/catalog/libcatalog.so ${install_dir}/lib/
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then
cd ${top_dir}/tools/taos-tools/packaging/rpm
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
pkg_name=${install_dir}-Linux-x64
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g')
${csudo}./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
fi
tar -zcv -f "$(basename ${pkg_name}).tar.gz" $(basename ${install_dir}) --remove-files || :
echo "====do tar.gz package for all systems===="
cd ${script_dir}/tools
${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName}
${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName}
# ${csudo}./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
else
# only make client for Darwin
cd ${script_dir}/tools
./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName}
fi
......@@ -74,9 +74,9 @@ if [ -f %{_compiledir}/build/bin/taosadapter ]; then
cp %{_compiledir}/build/bin/taosadapter %{buildroot}%{homepath}/bin ||:
fi
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taosdef.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include
#cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector
#cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector
#cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector
......
......@@ -485,6 +485,17 @@ function install_service() {
# fi
}
function install_config() {
if [ ! -f ${cfg_install_dir}/${configFile} ]; then
${csudo}mkdir -p ${cfg_install_dir}
[ -f ${script_dir}/cfg/${configFile} ] && ${csudo}cp ${script_dir}/cfg/${configFile} ${cfg_install_dir}
${csudo}chmod 644 ${cfg_install_dir}/*
fi
${csudo}cp -f ${script_dir}/cfg/${configFile} ${install_main_dir}/cfg/${configFile}.org
${csudo}ln -s ${cfg_install_dir}/${configFile} ${install_main_dir}/cfg
}
function install_TDengine() {
# Start to install
echo -e "${GREEN}Start to install TDengine...${NC}"
......@@ -500,7 +511,7 @@ function install_TDengine() {
# For installing new
install_bin
install_service
#install_config
install_config
# Ask if to start the service
#echo
......@@ -539,7 +550,7 @@ function install_TDengine() {
echo
else # Only install client
install_bin
#install_config
install_config
echo
echo -e "\033[44;32;1mTDengine client is installed successfully!${NC}"
fi
......
......@@ -17,7 +17,6 @@ serverName="taosd"
clientName="taos"
uninstallScript="rmtaos"
configFile="taos.cfg"
tarName="taos.tar.gz"
osType=Linux
pagMode=full
......@@ -243,12 +242,6 @@ function install_examples() {
function update_TDengine() {
# Start to update
if [ ! -e ${tarName} ]; then
echo "File ${tarName} does not exist"
exit 1
fi
tar -zxf ${tarName}
echo -e "${GREEN}Start to update ${productName} client...${NC}"
# Stop the client shell if running
if pidof ${clientName} &> /dev/null; then
......@@ -271,18 +264,10 @@ function update_TDengine() {
echo
echo -e "\033[44;32;1m${productName} client is updated successfully!${NC}"
rm -rf $(tar -tf ${tarName})
}
function install_TDengine() {
# Start to install
if [ ! -e ${tarName} ]; then
echo "File ${tarName} does not exist"
exit 1
fi
tar -zxf ${tarName}
echo -e "${GREEN}Start to install ${productName} client...${NC}"
install_main_path
......
......@@ -22,7 +22,7 @@ productName="TDengine"
# create compressed install file.
build_dir="${compile_dir}/build"
code_dir="${top_dir}/src"
code_dir="${top_dir}"
release_dir="${top_dir}/release"
#package_name='linux'
......@@ -36,7 +36,7 @@ fi
bin_files="${build_dir}/bin/tarbitrator ${script_dir}/remove_arbi.sh"
install_files="${script_dir}/install_arbi.sh"
#header_files="${code_dir}/inc/taos.h ${code_dir}/inc/taosdef.h ${code_dir}/inc/taoserror.h"
#header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h"
init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord
init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord
......
......@@ -32,7 +32,7 @@ fi
# create compressed install file.
build_dir="${compile_dir}/build"
code_dir="${top_dir}/src"
code_dir="${top_dir}"
release_dir="${top_dir}/release"
#package_name='linux'
......@@ -62,7 +62,7 @@ else
lib_files="${build_dir}/lib/libtaos.${version}.dylib"
fi
header_files="${code_dir}/inc/taos.h ${code_dir}/inc/taosdef.h ${code_dir}/inc/taoserror.h"
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h"
if [ "$dbName" != "taos" ]; then
cfg_dir="${top_dir}/../enterprise/packaging/cfg"
else
......@@ -152,7 +152,7 @@ if [[ $productName == "TDengine" ]]; then
cp -r ${examples_dir}/go ${install_dir}/examples
cp -r ${examples_dir}/nodejs ${install_dir}/examples
cp -r ${examples_dir}/C# ${install_dir}/examples
mkdir -p ${install_dir}/examples/taosbenchmark-json && cp ${examples_dir}/../src/kit/taos-tools/example/* ${install_dir}/examples/taosbenchmark-json
mkdir -p ${install_dir}/examples/taosbenchmark-json && cp ${examples_dir}/../tools/taos-tools/example/* ${install_dir}/examples/taosbenchmark-json
fi
if [ "$verMode" == "cluster" ]; then
......@@ -199,8 +199,8 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
else
echo "WARNING: go connector not found, please check if want to use it!"
fi
cp -r ${connector_dir}/python ${install_dir}/connector
cp -r ${connector_dir}/nodejs ${install_dir}/connector
cp -r ${connector_dir}/python ${install_dir}/connector || :
cp -r ${connector_dir}/nodejs ${install_dir}/connector || :
fi
# Copy release note
# cp ${script_dir}/release_note ${install_dir}
......
......@@ -33,7 +33,7 @@ defaultPasswd="taosdata"
# create compressed install file.
build_dir="${compile_dir}/build"
code_dir="${top_dir}/src"
code_dir="${top_dir}"
release_dir="${top_dir}/release"
#package_name='linux'
......@@ -43,8 +43,8 @@ else
install_dir="${release_dir}/${productName}-server-${version}"
fi
if [ -d ${top_dir}/src/kit/taos-tools/packaging/deb ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/deb
if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taostools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
......@@ -94,7 +94,7 @@ else
fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
header_files="${code_dir}/inc/taos.h ${code_dir}/inc/taosdef.h ${code_dir}/inc/taoserror.h"
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h"
if [ "$dbName" != "taos" ]; then
cfg_dir="${top_dir}/../enterprise/packaging/cfg"
......@@ -103,7 +103,7 @@ else
fi
install_files="${script_dir}/install.sh"
nginx_dir="${code_dir}/../../enterprise/src/plugins/web"
nginx_dir="${top_dir}/../enterprise/src/plugins/web"
init_file_deb=${script_dir}/../deb/taosd
init_file_rpm=${script_dir}/../rpm/taosd
......@@ -162,8 +162,8 @@ if [ -n "${taostools_bin_files}" ]; then
&& cp ${taostools_bin_files} ${taostools_install_dir}/bin \
&& chmod a+x ${taostools_install_dir}/bin/* || :
if [ -f ${top_dir}/src/kit/taos-tools/packaging/tools/install-taostools.sh ]; then
cp ${top_dir}/src/kit/taos-tools/packaging/tools/install-taostools.sh \
if [ -f ${top_dir}/tools/taos-tools/packaging/tools/install-taostools.sh ]; then
cp ${top_dir}/tools/taos-tools/packaging/tools/install-taostools.sh \
${taostools_install_dir}/ > /dev/null \
&& chmod a+x ${taostools_install_dir}/install-taostools.sh \
|| echo -e "failed to copy install-taostools.sh"
......@@ -171,8 +171,8 @@ if [ -n "${taostools_bin_files}" ]; then
echo -e "install-taostools.sh not found"
fi
if [ -f ${top_dir}/src/kit/taos-tools/packaging/tools/uninstall-taostools.sh ]; then
cp ${top_dir}/src/kit/taos-tools/packaging/tools/uninstall-taostools.sh \
if [ -f ${top_dir}/tools/taos-tools/packaging/tools/uninstall-taostools.sh ]; then
cp ${top_dir}/tools/taos-tools/packaging/tools/uninstall-taostools.sh \
${taostools_install_dir}/ > /dev/null \
&& chmod a+x ${taostools_install_dir}/uninstall-taostools.sh \
|| echo -e "failed to copy uninstall-taostools.sh"
......@@ -288,7 +288,7 @@ if [[ $dbName == "taos" ]]; then
cp -r ${examples_dir}/go ${install_dir}/examples
cp -r ${examples_dir}/nodejs ${install_dir}/examples
cp -r ${examples_dir}/C# ${install_dir}/examples
mkdir -p ${install_dir}/examples/taosbenchmark-json && cp ${examples_dir}/../src/kit/taos-tools/example/* ${install_dir}/examples/taosbenchmark-json
mkdir -p ${install_dir}/examples/taosbenchmark-json && cp ${examples_dir}/../tools/taos-tools/example/* ${install_dir}/examples/taosbenchmark-json
fi
fi
......
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
......@@ -60,7 +60,7 @@ static void registerRequest(SRequestObj *pRequest) {
static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL);
STscObj *pTscObj = pRequest->pTscObj;
STscObj * pTscObj = pRequest->pTscObj;
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
......@@ -91,7 +91,6 @@ static bool clientRpcRfp(int32_t code) {
}
}
// TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit;
......@@ -105,10 +104,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = (char *)auth;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
tscError("failed to init connection to server");
......@@ -318,7 +313,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0;
}
SConfig *pCfg = taosGetCfg();
SConfig * pCfg = taosGetCfg();
SConfigItem *pItem = NULL;
switch (option) {
......
......@@ -23,6 +23,8 @@
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "cJSON.h"
#include "tdataformat.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
......@@ -268,7 +270,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
}
......@@ -339,12 +341,11 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
}
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = pRsp->pBlocks + i;
SSubmitBlkRsp* blk = pRsp->pBlocks + i;
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver);
}
} else if (TDMT_VND_QUERY == pRequest->type) {
}
SCatalog* pCatalog = NULL;
......@@ -369,7 +370,6 @@ void freeRequestRes(SRequestObj* pRequest, void* res) {
if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) {
}
}
......@@ -805,6 +805,101 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS;
}
static char* parseTagDatatoJson(void *p){
char* string = NULL;
cJSON *json = cJSON_CreateObject();
if (json == NULL)
{
goto end;
}
int16_t nCols = kvRowNCols(p);
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
SColIdx * pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0){
if(*val == TSDB_DATA_TYPE_NULL){
string = taosMemoryCalloc(1, 8);
sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(string, strlen(varDataVal(string)));
goto end;
}
continue;
}
// json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
// json value
val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
if(type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL;
if (varDataLen(realData) > 0){
char *tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
taosMemoryFree(tagJsonValue);
goto end;
}
value = cJSON_CreateString(tagJsonValue);
taosMemoryFree(tagJsonValue);
if (value == NULL)
{
goto end;
}
}else if(varDataLen(realData) == 0){
value = cJSON_CreateString("");
}else{
ASSERT(0);
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_DOUBLE){
double jsonVd = *(double*)(realData);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
}else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else{
ASSERT(0);
}
}
string = cJSON_PrintUnformatted(json);
end:
cJSON_Delete(json);
return string;
}
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
......@@ -835,9 +930,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
}
if (type == TSDB_DATA_TYPE_JSON) {
}else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -850,6 +943,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart;
char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
......@@ -857,15 +951,9 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
int32_t length =
taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
varDataVal(jsonInnerData));
length = 0;
}
varDataSetLen(dst, length);
char *jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
......@@ -1022,7 +1110,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
SRpcInit rpcInit = {0};
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t*)("_pwd"), strlen("_pwd"), pass);
rpcInit.label = "CHK";
rpcInit.numOfThreads = 1;
rpcInit.cfp = NULL;
......@@ -1030,9 +1117,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "_dnd";
rpcInit.ckey = "_key";
rpcInit.spi = 1;
rpcInit.secret = pass;
clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) {
......
......@@ -67,8 +67,10 @@ for (int i = 1; i < keyLen; ++i) { \
#define TS "_ts"
#define TS_LEN 3
#define TAG "_tagNone"
#define TAG_LEN 8
#define TAG "_tag"
#define TAG_LEN 4
#define TAG_VALUE "NULL"
#define TAG_VALUE_LEN 4
#define VALUE "value"
#define VALUE_LEN 5
......@@ -598,25 +600,33 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
kvVal->type = TSDB_DATA_TYPE_FLOAT;
kvVal->f = (float)result;
}else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){
if(result >= (double)INT64_MAX){
kvVal->i = INT64_MAX;
}else if(result <= (double)INT64_MIN){
kvVal->i = INT64_MIN;
}else{
kvVal->i = result;
if(smlDoubleToInt64OverFlow(result)){
errno = 0;
int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
if(errno == ERANGE){
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_BIGINT;
kvVal->i = tmp;
return true;
}
kvVal->type = TSDB_DATA_TYPE_BIGINT;
kvVal->i = (int64_t)result;
}else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){
if(result < 0){
smlBuildInvalidDataMsg(msg, "unsigned big int is too large, out of precision", pVal);
if(result >= (double)UINT64_MAX || result < 0){
errno = 0;
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
if(errno == ERANGE || result < 0){
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
return false;
}
if(result >= (double)UINT64_MAX){
kvVal->u = UINT64_MAX;
}else{
kvVal->u = result;
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
kvVal->u = tmp;
return true;
}
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
kvVal->u = result;
}else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){
if(!IS_VALID_INT(result)){
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
......@@ -1103,8 +1113,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
kv->keyLen = VALUE_LEN;
kv->value = value;
kv->length = valueLen;
if(!smlParseValue(kv, &info->msgBuf) || kv->type == TSDB_DATA_TYPE_BINARY
|| kv->type == TSDB_DATA_TYPE_NCHAR || kv->type == TSDB_DATA_TYPE_BOOL){
if(!smlParseValue(kv, &info->msgBuf)){
return TSDB_CODE_SML_INVALID_DATA;
}
......@@ -1124,8 +1133,8 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *c
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
kv->key = TAG;
kv->keyLen = TAG_LEN;
kv->value = TAG;
kv->length = TAG_LEN;
kv->value = TAG_VALUE;
kv->length = TAG_VALUE_LEN;
kv->type = TSDB_DATA_TYPE_NCHAR;
if(cols) taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
......@@ -2264,6 +2273,7 @@ static int32_t smlParseLine(SSmlHandle *info, char* lines[], int numLines){
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
return code;
}
return code;
}
for (int32_t i = 0; i < numLines; ++i) {
......
......@@ -1435,7 +1435,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while (1) {
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, wait_time);
if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
if (rspObj) {
......
......@@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -605,7 +606,7 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result(pRes);
for(int32_t i = 0; i < 10000000; i += 20) {
for(int32_t i = 0; i < 100000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
......@@ -625,7 +626,7 @@ TEST(testCase, projection_query_tables) {
printf("start to insert next table\n");
for(int32_t i = 0; i < 10000000; i += 20) {
for(int32_t i = 0; i < 100000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
......@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn);
}
#endif
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......
......@@ -208,6 +208,7 @@ TEST(testCase, smlParseCols_Error_Test) {
memcpy(sql, data[i], len + 1);
SArray *cols = taosArrayInit(8, POINTER_BYTES);
int32_t ret = smlParseCols(sql, len, cols, NULL, false, dumplicateKey, &msgBuf);
printf("i:%d\n",i);
ASSERT_NE(ret, TSDB_CODE_SUCCESS);
taosHashClear(dumplicateKey);
taosMemoryFree(sql);
......@@ -272,11 +273,11 @@ TEST(testCase, smlParseCols_tag_Test) {
// nchar
kv = (SSmlKv *)taosArrayGetP(cols, 0);
ASSERT_EQ(strncasecmp(kv->key, TAG, strlen(TAG)), 0);
ASSERT_EQ(kv->keyLen, strlen(TAG));
ASSERT_EQ(strncasecmp(kv->key, TAG, TAG_LEN), 0);
ASSERT_EQ(kv->keyLen, TAG_LEN);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
ASSERT_EQ(kv->length, strlen(TAG));
ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0);
ASSERT_EQ(kv->length, TAG_LEN);
ASSERT_EQ(strncasecmp(kv->value, TAG_VALUE, TAG_VALUE_LEN), 0);
taosMemoryFree(kv);
taosArrayDestroy(cols);
......@@ -506,7 +507,7 @@ TEST(testCase, smlProcess_influx_Test) {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608403000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609404000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619405000000000",
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 145160640600000000",
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606406000000000",
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606407000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609408000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629409000000000",
......@@ -745,7 +746,7 @@ TEST(testCase, smlProcess_json1_Test) {
" }\n"
" }\n"
"]";
int ret = smlProcess(info, (char **)(&sql), -1);
int ret = smlProcess(info, (char **)(&sql), 1);
ASSERT_EQ(ret, 0);
// case 1
......@@ -1221,3 +1222,55 @@ TEST(testCase, sml_TD15662_Test) {
taos_free_result(res);
}
TEST(testCase, sml_TD15735_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[1] = {
"{'metric': 'pekoiw', 'timestamp': {'value': 1626006833639000000, 'type': 'ns'}, 'value': {'value': False, 'type': 'bool'}, 'tags': {'t0': {'value': True, 'type': 'bool'}, 't1': {'value': 127, 'type': 'tinyint'}, 't2': {'value': 32767, 'type': 'smallint'}, 't3': {'value': 2147483647, 'type': 'int'}, 't4': {'value': 9223372036854775807, 'type': 'bigint'}, 't5': {'value': 11.12345027923584, 'type': 'float'}, 't6': {'value': 22.123456789, 'type': 'double'}, 't7': {'value': 'binaryTagValue', 'type': 'binary'}, 't8': {'value': 'ncharTagValue', 'type': 'nchar'}}}",
};
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, sml_TD15742_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists TD15742");
taos_free_result(pRes);
pRes = taos_query(taos, "use TD15742");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"zgzbix 1626006833641 False id=zgzbix_992_38861 t0=t t1=127i8 t2=32767i16 t3=2147483647i32 t4=9223372036854775807i64 t5=11.12345f32 t6=22.123456789f64 t7=\"binaryTagValue\" t8=L\"ncharTagValue\"",
};
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
}
\ No newline at end of file
......@@ -122,10 +122,14 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) {
dataLen = kvRowLen(pData + CHAR_BYTES);
} else {
ASSERT(0);
}
dataLen += CHAR_BYTES;
}
......
......@@ -40,11 +40,11 @@ bool tsPrintAuth = false;
// multi process
int32_t tsMultiProcess = 0;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 128;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsNumOfShmThreads = 1;
// queue & threads
......@@ -380,11 +380,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
......
......@@ -2627,6 +2627,35 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
return 0;
}
int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->topic) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->topic) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
......
......@@ -36,9 +36,9 @@ typedef struct SBnodeMgmt {
// bmHandle.c
SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg);
// bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt);
......
......@@ -18,7 +18,7 @@
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pMgmt, &bmInfo);
dmGetMonitorSystemInfo(&bmInfo.sys);
......@@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo);
return 0;
}
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateBnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropBnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -41,18 +41,18 @@ typedef struct SMnodeMgmt {
// mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
// mmHandle.c
SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
// mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
......@@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -104,7 +104,7 @@ _OVER:
return code;
}
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
......@@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica);
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pReq != NULL) {
pReplica = &pReq->replicas[i];
if (pMsg != NULL) {
pReplica = &pMsg->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
......
......@@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pMgmt, &mmInfo);
dmGetMonitorSystemInfo(&mmInfo.sys);
......@@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo);
return 0;
}
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads);
......@@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
return 0;
}
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropMnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
return 0;
}
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) {
return -1;
}
......@@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
}
bool deployed = true;
if (mmWriteFile(pMgmt, pReq, deployed) != 0) {
if (mmWriteFile(pMgmt, pMsg, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
......@@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt;
bool deployed = false;
......
......@@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
}
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = {
......
......@@ -39,7 +39,7 @@ typedef struct SQnodeMgmt {
SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
// qmWorker.c
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
......@@ -18,7 +18,7 @@
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pMgmt, &qmInfo);
dmGetMonitorSystemInfo(&qmInfo.sys);
......@@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo);
return 0;
}
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateQnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropQnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -40,7 +40,7 @@ typedef struct SSnodeMgmt {
SArray *smGetMsgHandles();
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
// smWorker.c
int32_t smStartWorker(SSnodeMgmt *pMgmt);
......
......@@ -18,7 +18,7 @@
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonSmInfo smInfo = {0};
smGetMonitorInfo(pMgmt, &smInfo);
dmGetMonitorSystemInfo(&smInfo.sys);
......@@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonSmInfo(&smInfo);
return 0;
}
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateSnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropSnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
// vmHandle.c
SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
......
......@@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
taosArrayDestroy(pVloads);
}
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pMgmt, &vmInfo);
dmGetMonitorSystemInfo(&vmInfo.sys);
......@@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo);
return 0;
}
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads);
......@@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonVloadInfo(&vloads);
return 0;
}
......@@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
}
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SCreateVnodeReq createReq = {0};
int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -242,9 +241,8 @@ _OVER:
}
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -22,21 +22,19 @@
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {
.code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
.info = pMsg->info,
};
tmsgSendRsp(&rsp);
}
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->msgType;
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(msgType));
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
switch (msgType) {
switch (pMsg->msgType) {
case TDMT_MON_VM_INFO:
code = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
break;
......@@ -54,7 +52,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dError("msg:%p, not processed in vnode queue", pMsg);
}
if (msgType & 1u) {
if (IsReq(pMsg)) {
if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
}
......@@ -96,7 +94,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
......@@ -116,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
SRpcMsg rsp = {.info = pMsg->info, .pCont = NULL, .contLen = 0};
SRpcMsg rsp = {.info = pMsg->info};
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
......@@ -126,12 +123,10 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp);
} else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
// ok
// send response in applyQ
} else {
assert(0);
......@@ -150,16 +145,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
// init response rpc msg
rsp.code = 0;
rsp.pCont = NULL;
rsp.contLen = 0;
SRpcMsg rsp = {0};
// get original rpc msg
assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG);
......@@ -178,7 +170,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rpcFreeCont(originalRpcMsg.pCont);
// if leader, send response
// if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
if (pMsg->info.handle != NULL) {
rsp.info = pMsg->info;
tmsgSendRsp(&rsp);
......@@ -191,21 +182,19 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
// todo
SRpcMsg *pRsp = NULL;
int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
if (ret != 0) {
// if leader, send response
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
if (code != 0) {
if (pMsg->info.handle != NULL) {
SRpcMsg rsp = {0};
rsp.code = terrno;
rsp.info = pMsg->info;
dTrace("msg:%p, process sync queue error since code:%s", pMsg, terrstr());
SRpcMsg rsp = {
.code = (terrno < 0) ? terrno : code,
.info = pMsg->info,
};
dTrace("msg:%p, failed to process sync queue since %s", pMsg, terrstr());
tmsgSendRsp(&rsp);
}
}
......@@ -217,9 +206,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, get from vnode-merge queue", pMsg);
......@@ -309,7 +298,6 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
......@@ -317,14 +305,17 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
int32_t code = 0;
if (pMsg != NULL) {
if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
code = -1;
} else {
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
switch (qtype) {
case WRITE_QUEUE:
......@@ -429,7 +420,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return -1;
}
dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
dDebug("vgId:%d, queue is alloced", pVnode->vgId);
return 0;
}
......@@ -446,7 +437,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pQueryQ = NULL;
pVnode->pFetchQ = NULL;
pVnode->pMergeQ = NULL;
dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
dDebug("vgId:%d, queue is freed", pVnode->vgId);
}
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
......@@ -497,7 +488,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start mnode vnode-monitor worker since %s", terrstr());
dError("failed to start vnode-monitor worker since %s", terrstr());
return -1;
}
......
......@@ -161,10 +161,12 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
if (dmMarkWrapper(pWrapper) == 0) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
} else if (pWrapper->pMgmt != NULL) {
mmGetMnodeLoads(pWrapper->pMgmt, pInfo);
}
dmReleaseWrapper(pWrapper);
}
}
......@@ -103,7 +103,7 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg
return -1;
}
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) {
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0 && pMsg->info.noResp == 0) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
taosThreadMutexUnlock(&queue->mutex);
return -1;
......
......@@ -49,9 +49,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
SDnodeTrans * pTrans = &pDnode->trans;
int32_t code = -1;
SRpcMsg *pMsg = NULL;
SRpcMsg * pMsg = NULL;
bool needRelease = false;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
SMgmtWrapper *pWrapper = NULL;
......@@ -179,11 +179,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SArray *pArray = (*pWrapper->func.getHandlesFp)();
SArray * pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId;
......@@ -200,94 +200,54 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0;
}
static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SMEpSet msg = {.epSet = epSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
SRpcMsg rsp = {
.code = TSDB_CODE_RPC_REDIRECT,
.info = pReq->info,
.contLen = len,
};
rsp.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rsp.pCont, len, &msg);
rpcSendResponse(&rsp);
}
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) {
pRsp->code = TSDB_CODE_NODE_OFFLINE;
rpcFreeCont(pReq->pCont);
pReq->pCont = NULL;
} else {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
}
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) {
rpcFreeCont(pReq->pCont);
pReq->pCont = NULL;
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
terrno = TSDB_CODE_NODE_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle);
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
return -1;
} else {
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
return 0;
}
}
static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
} else {
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else {
rpcSendResponse(pMsg);
}
}
}
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
SRpcMsg rsp = {0};
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(len);
rsp.contLen = len;
tSerializeSMEpSet(rsp.pCont, len, &msg);
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.info = pMsg->info;
rpcSendResponse(&rsp);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else {
rpcRegisterBrokenLinkArg(pMsg);
}
......@@ -316,15 +276,9 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY;
rpcInit.spi = 1;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
rpcInit.secret = pass;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client");
......@@ -389,3 +343,34 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
};
return msgCb;
}
static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = epSet};
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
......@@ -48,10 +48,10 @@ void TestClient::DoInit() {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.user = (char*)this->user;
rpcInit.ckey = (char*)"key";
// rpcInit.ckey = (char*)"key";
rpcInit.parent = this;
rpcInit.secret = (char*)secretEncrypt;
rpcInit.spi = 1;
// rpcInit.secret = (char*)secretEncrypt;
// rpcInit.spi = 1;
clientRpc = rpcOpen(&rpcInit);
ASSERT(clientRpc);
......
......@@ -848,7 +848,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id
// client id
char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
varDataSetLen(clientId, strlen(varDataVal(clientId)));
......
......@@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config);
tmsgSendReq(&epSet, &rpcMsg);
return 0;
mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, rpcMsg.info.ahandle);
return tmsgSendReq(&epSet, &rpcMsg);
}
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->info.ahandle);
mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle);
return TSDB_CODE_SUCCESS;
}
......
......@@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg;
extern const SVnodeCfg vnodeCfgDefault;
int vnodeInit(int nthreads);
int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup();
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
......@@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry;
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader);
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int metaReadNext(SMetaReader *pReader);
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
#if 1 // refact APIs below (TODO)
......@@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int metaTbCursorNext(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur);
#endif
// tsdb
......@@ -124,12 +124,13 @@ typedef struct STqReadHandle STqReadHandle;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList);
int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int32_t *pNumOfRows, int16_t *pNumOfCols);
......@@ -208,15 +209,15 @@ struct SMetaReader {
SDecoder coder;
SMetaEntry me;
void *pBuf;
int szBuf;
int32_t szBuf;
};
struct SMTbCursor {
TBC *pDbc;
void *pKey;
void *pVal;
int kLen;
int vLen;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
};
......
......@@ -163,6 +163,7 @@ typedef struct {
int8_t withSchema;
int8_t withTag;
char* qmsg;
SHashObj* pDropTbUid;
STqPushHandle pushHandle;
// SRWLatch lock;
SWalReadHandle* pWalReader;
......
......@@ -24,7 +24,6 @@
extern "C" {
#endif
// vnodeDebug ====================
// clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
......@@ -34,17 +33,17 @@ extern "C" {
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
// vnodeCfg ====================
// vnodeCfg.c
extern const SVnodeCfg vnodeCfgDefault;
int vnodeCheckCfg(const SVnodeCfg*);
int vnodeEncodeConfig(const void* pObj, SJson* pJson);
int vnodeDecodeConfig(const SJson* pJson, void* pObj);
int32_t vnodeCheckCfg(const SVnodeCfg*);
int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
// vnodeModule ====================
int vnodeScheduleTask(int (*execute)(void*), void* arg);
// vnodeModule.c
int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg);
// vnodeBufPool ====================
// vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode {
SVBufPoolNode* prev;
......@@ -62,38 +61,29 @@ struct SVBufPool {
SVBufPoolNode node;
};
int vnodeOpenBufPool(SVnode* pVnode, int64_t size);
int vnodeCloseBufPool(SVnode* pVnode);
int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
int32_t vnodeCloseBufPool(SVnode* pVnode);
void vnodeBufPoolReset(SVBufPool* pPool);
// vnodeQuery ====================
int vnodeQueryOpen(SVnode* pVnode);
// vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ====================
int vnodeBegin(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode);
int vnodeCommit(SVnode* pVnode);
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode);
int32_t vnodeCommit(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode);
// vnodeCommit ====================
// vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode);
void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeSyncSetQ(SVnode* pVnode, void* qHandle);
void vnodeSyncSetRpc(SVnode* pVnode, void* rpcHandle);
int32_t vnodeSyncEqMsg(void* qHandle, SRpcMsg* pMsg);
int32_t vnodeSendMsg(void* rpcHandle, const SEpSet* pEpSet, SRpcMsg* pMsg);
void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
SSyncFSM* syncVnodeMakeFsm();
#ifdef __cplusplus
}
......
......@@ -82,7 +82,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
......@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
......@@ -118,7 +118,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
......
......@@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) {
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
......@@ -67,10 +67,10 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
if(!pME->smaEntry.tsma) {
if (!pME->smaEntry.tsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......
......@@ -255,7 +255,7 @@ _err:
return -1;
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pNameIdxc = NULL;
......@@ -336,6 +336,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
if (type == TSDB_CHILD_TABLE) {
ctime = me.ctbEntry.ctime;
suid = me.ctbEntry.suid;
taosArrayPush(tbUids, &me.uid);
} else if (type == TSDB_NORMAL_TABLE) {
ctime = me.ntbEntry.ctime;
suid = 0;
......
......@@ -104,19 +104,28 @@ static void tdSRowDemo() {
taosMemoryFree(pTSChema);
}
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL;
STqExec* pExec = NULL;
while (1) {
pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break;
pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) continue;
if (pExec->subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) {
int32_t sz = taosArrayGetSize(tbUidList);
for (int32_t i = 0; i < sz; i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
}
} else {
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true);
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
ASSERT(code == 0);
}
}
}
return 0;
}
......@@ -582,7 +591,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) {
while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
......@@ -915,9 +924,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL;
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
SReadHandle handle = {
.reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta,
......@@ -925,9 +935,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
};
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]);
}
} else {
pExec->task[i] = NULL;
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0;
......@@ -1045,6 +1058,57 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
return 0;
}
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) {
SStreamDataSubmit* pSubmit = NULL;
// build data
pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pSubmit == NULL) return -1;
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
if (pSubmit->dataRef == NULL) goto FAIL;
*pSubmit->dataRef = 1;
pSubmit->data = data;
pSubmit->type = STREAM_INPUT__DATA_BLOCK;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK) {
streamEnqueueDataSubmit(pTask, pSubmit);
// TODO cal back pressure
}
// check run
int8_t execStatus = atomic_load_8(&pTask->status);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
SStreamTaskRunReq* pReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
if (pReq == NULL) continue;
// TODO: do we need htonl?
pReq->head.vgId = pTq->pVnode->config.vgId;
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
SRpcMsg msg = {
.msgType = 0,
.pCont = pReq,
.contLen = sizeof(SStreamTaskRunReq),
};
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
}
}
streamDataSubmitRefDec(pSubmit);
return 0;
FAIL:
if (pSubmit) {
if (pSubmit->dataRef) {
taosMemoryFree(pSubmit->dataRef);
}
taosFreeQitem(pSubmit);
}
return -1;
}
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msg, &req);
......
......@@ -34,21 +34,11 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break;
// pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
// pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
// pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
// pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
// pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
// pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
}
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
......@@ -64,22 +54,28 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
}
if (pHandle->pBlock == NULL) return false;
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
if (pHandle->tbIdHash == NULL) {
return true;
}
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
/*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/
/*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/
/*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/
/*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/
return true;
/*} else {*/
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
}
}
return false;
}
bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
ASSERT(pHandle->tbIdHash == NULL);
void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret == NULL) {
return true;
}
}
return false;
......
......@@ -180,8 +180,6 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) {
vnodeSyncSetQ(pVnode, NULL);
vnodeSyncSetRpc(pVnode, NULL);
vnodeSyncStart(pVnode);
return 0;
}
......
......@@ -118,13 +118,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
break;
}
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
// commit if need
if (vnodeShouldCommit(pVnode)) {
vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
......@@ -386,7 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tDecoderClear(&decoder);
tqUpdateTbUidList(pVnode->pTq, tbUids);
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
tdUpdateTbUidList(pVnode->pSma, pStore);
tdUidStoreFree(pStore);
......@@ -517,6 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SDecoder decoder = {0};
SEncoder encoder = {0};
int ret;
SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL;
......@@ -533,13 +534,16 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
}
// process req
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
for (int iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0};
/* code */
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
if (ret < 0) {
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
dropTbRsp.code = TSDB_CODE_SUCCESS;
......@@ -553,7 +557,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
taosArrayPush(rsp.pArray, &dropTbRsp);
}
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
_exit:
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
......@@ -675,7 +682,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
msgIter.uid = createTbReq.uid;
if (createTbReq.type == TSDB_CHILD_TABLE) {
......
......@@ -13,90 +13,62 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnd.h"
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo;
syncInfo.vgId = pVnode->config.vgId;
SSyncCfg *pCfg = &(syncInfo.syncCfg);
pCfg->replicaNum = pVnode->config.syncCfg.replicaNum;
pCfg->myIndex = pVnode->config.syncCfg.myIndex;
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
syncInfo.pWal = pVnode->pWal;
syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
syncInfo.rpcClient = NULL;
syncInfo.FpSendMsg = vnodeSendMsg;
syncInfo.queue = NULL;
syncInfo.FpEqMsg = vnodeSyncEqMsg;
SSyncInfo syncInfo = {
.vgId = pVnode->config.vgId,
.syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal,
.msgcb = NULL,
.FpSendMsg = vnodeSyncSendMsg,
.FpEqMsg = vnodeSyncEqMsg,
};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
pVnode->sync = syncOpen(&syncInfo);
assert(pVnode->sync > 0);
if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
return -1;
}
// for test
setPingTimerMS(pVnode->sync, 3000);
setElectTimerMS(pVnode->sync, 500);
setHeartbeatTimerMS(pVnode->sync, 100);
return 0;
}
int32_t vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncStart(SVnode *pVnode) {
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync);
return 0;
}
void vnodeSyncClose(SVnode *pVnode) {
// stop by ref id
syncStop(pVnode->sync);
}
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle) { syncSetQ(pVnode->sync, (void *)(&(pVnode->msgCb))); }
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle) { syncSetRpc(pVnode->sync, (void *)(&(pVnode->msgCb))); }
int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg) {
int32_t ret = 0;
SMsgCb *pMsgCb = qHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
tmsgPutToQueue(qHandle, SYNC_QUEUE, pMsg);
} else {
vError("vnodeSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
}
return ret;
}
int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t ret = 0;
SMsgCb *pMsgCb = rpcHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
pMsg->info.noResp = 1;
tmsgSendReq(pEpSet, pMsg);
} else {
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
}
return ret;
}
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SVnode *pVnode = (SVnode *)(pFsm->data);
vnodeGetSnapshot(pVnode, pSnapshot);
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
/*
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = 0;
pSnapshot->lastApplyTerm = 0;
*/
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0;
}
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot;
SSnapshot snapshot = {0};
pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex;
}
......@@ -147,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
}
}
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
......@@ -155,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitCb;
pFsm->FpPreCommitCb = vnodeSyncPreCommitCb;
pFsm->FpRollBackCb = vnodeSyncRollBackCb;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb;
pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
return pFsm;
}
\ No newline at end of file
......@@ -300,10 +300,26 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
const char* p = NULL;
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
p = data;
}else{
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL));
}
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
taosMemoryFree((void*)p);
}
}
}
......@@ -1587,10 +1603,23 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR(str, mr.me.name);
colDataAppend(pDst, count, str, false);
} else { // it is a tag value
if(pDst->info.type == TSDB_DATA_TYPE_JSON){
const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return NULL;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
colDataAppend(pDst, count, data, false);
taosMemoryFree(data);
}else{
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
}
}
}
count += 1;
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
......
......@@ -120,7 +120,13 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx);
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t tailFunction(SqlFunctionCtx* pCtx);
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
//int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
//int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册