提交 c03f7aa7 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

...@@ -46,7 +46,7 @@ ENDIF () ...@@ -46,7 +46,7 @@ ENDIF ()
IF (TD_WINDOWS) IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}") MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/w /D_WIN32") SET(COMMON_FLAGS "/w /D_WIN32 /Zi")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO") SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900)) # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18") # SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
......
...@@ -100,8 +100,10 @@ endif(${BUILD_WITH_NURAFT}) ...@@ -100,8 +100,10 @@ endif(${BUILD_WITH_NURAFT})
# addr2line # addr2line
if(${BUILD_ADDR2LINE}) if(${BUILD_ADDR2LINE})
if(NOT ${TD_WINDOWS})
cat("${TD_SUPPORT_DIR}/libdwarf_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/libdwarf_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/addr2line_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/addr2line_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(NOT ${TD_WINDOWS})
endif(${BUILD_ADDR2LINE}) endif(${BUILD_ADDR2LINE})
# download dependencies # download dependencies
...@@ -335,6 +337,7 @@ endif(${BUILD_WITH_SQLITE}) ...@@ -335,6 +337,7 @@ endif(${BUILD_WITH_SQLITE})
# addr2line # addr2line
if(${BUILD_ADDR2LINE}) if(${BUILD_ADDR2LINE})
if(NOT ${TD_WINDOWS})
check_include_file( "sys/types.h" HAVE_SYS_TYPES_H) check_include_file( "sys/types.h" HAVE_SYS_TYPES_H)
check_include_file( "sys/stat.h" HAVE_SYS_STAT_H ) check_include_file( "sys/stat.h" HAVE_SYS_STAT_H )
check_include_file( "inttypes.h" HAVE_INTTYPES_H ) check_include_file( "inttypes.h" HAVE_INTTYPES_H )
...@@ -374,6 +377,7 @@ if(${BUILD_ADDR2LINE}) ...@@ -374,6 +377,7 @@ if(${BUILD_ADDR2LINE})
add_library(addr2line STATIC "addr2line/addr2line.c") add_library(addr2line STATIC "addr2line/addr2line.c")
target_link_libraries(addr2line PUBLIC libdwarf dl z) target_link_libraries(addr2line PUBLIC libdwarf dl z)
target_include_directories(addr2line PUBLIC "libdwarf/src/lib/libdwarf" ) target_include_directories(addr2line PUBLIC "libdwarf/src/lib/libdwarf" )
endif(NOT ${TD_WINDOWS})
endif(${BUILD_ADDR2LINE}) endif(${BUILD_ADDR2LINE})
......
...@@ -18,7 +18,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx" ...@@ -18,7 +18,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx"
`TDengine.Connector` 是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。 `TDengine.Connector` 是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。
`TDengine.Connector` 连接器支持通过 TDengine 客户端驱动(taosc)建立与 TDengine 运行实例的连接,提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [RESTful APIs](https://docs.taosdata.com//reference/restful-api/) 文档自行编写。 `TDengine.Connector` 连接器支持通过 TDengine 客户端驱动(taosc)建立与 TDengine 运行实例的连接,提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [REST API](/reference/rest-api/) 文档自行编写。
本文介绍如何在 Linux 或 Windows 环境中安装 `TDengine.Connector`,并通过 `TDengine.Connector` 连接 TDengine 集群,进行数据写入、查询等基本操作。 本文介绍如何在 Linux 或 Windows 环境中安装 `TDengine.Connector`,并通过 `TDengine.Connector` 连接 TDengine 集群,进行数据写入、查询等基本操作。
......
...@@ -19,7 +19,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx" ...@@ -19,7 +19,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx"
`TDengine.Connector` is a C# language connector provided by TDengine that allows C# developers to develop C# applications that access TDengine cluster data. `TDengine.Connector` is a C# language connector provided by TDengine that allows C# developers to develop C# applications that access TDengine cluster data.
The `TDengine.Connector` connector supports connect to TDengine instances via the TDengine client driver (taosc), providing data writing, querying, subscription, schemaless writing, bind interface, etc. The `TDengine.Connector` currently does not provide a REST connection interface. Developers can write their RESTful application by referring to the [RESTful APIs](https://docs.taosdata.com//reference/restful-api/) documentation. The `TDengine.Connector` connector supports connect to TDengine instances via the TDengine client driver (taosc), providing data writing, querying, subscription, schemaless writing, bind interface, etc. The `TDengine.Connector` currently does not provide a REST connection interface. Developers can write their RESTful application by referring to the [REST API](/reference/rest-api/) documentation.
This article describes how to install `TDengine.Connector` in a Linux or Windows environment and connect to TDengine clusters via `TDengine.Connector` to perform basic operations such as data writing and querying. This article describes how to install `TDengine.Connector` in a Linux or Windows environment and connect to TDengine clusters via `TDengine.Connector` to perform basic operations such as data writing and querying.
......
...@@ -244,12 +244,12 @@ typedef struct { ...@@ -244,12 +244,12 @@ typedef struct {
const void* pMsg; const void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
// for debug // for debug
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); int32_t tPrintFixedSchemaSubmitReq(SSubmitReq* pReq, STSchema* pSchema);
typedef struct { typedef struct {
int32_t code; int32_t code;
...@@ -1697,7 +1697,7 @@ int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam); ...@@ -1697,7 +1697,7 @@ int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam);
// TDMT_VND_CREATE_STB ============== // TDMT_VND_CREATE_STB ==============
typedef struct SVCreateStbReq { typedef struct SVCreateStbReq {
const char* name; char* name;
tb_uid_t suid; tb_uid_t suid;
int8_t rollup; int8_t rollup;
SSchemaWrapper schema; SSchemaWrapper schema;
...@@ -1710,7 +1710,7 @@ int tDecodeSVCreateStbReq(SDecoder* pCoder, SVCreateStbReq* pReq); ...@@ -1710,7 +1710,7 @@ int tDecodeSVCreateStbReq(SDecoder* pCoder, SVCreateStbReq* pReq);
// TDMT_VND_DROP_STB ============== // TDMT_VND_DROP_STB ==============
typedef struct SVDropStbReq { typedef struct SVDropStbReq {
const char* name; char* name;
tb_uid_t suid; tb_uid_t suid;
} SVDropStbReq; } SVDropStbReq;
...@@ -1723,13 +1723,13 @@ typedef struct SVCreateTbReq { ...@@ -1723,13 +1723,13 @@ typedef struct SVCreateTbReq {
int32_t flags; int32_t flags;
tb_uid_t uid; tb_uid_t uid;
int64_t ctime; int64_t ctime;
const char* name; char* name;
int32_t ttl; int32_t ttl;
int8_t type; int8_t type;
union { union {
struct { struct {
tb_uid_t suid; tb_uid_t suid;
const uint8_t* pTag; uint8_t* pTag;
} ctb; } ctb;
struct { struct {
SSchemaWrapper schema; SSchemaWrapper schema;
...@@ -1777,7 +1777,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc ...@@ -1777,7 +1777,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc
// TDMT_VND_DROP_TABLE ================= // TDMT_VND_DROP_TABLE =================
typedef struct { typedef struct {
const char* name; char* name;
int8_t igNotExists; int8_t igNotExists;
} SVDropTbReq; } SVDropTbReq;
...@@ -1809,9 +1809,9 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp); ...@@ -1809,9 +1809,9 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
// TDMT_VND_ALTER_TABLE ===================== // TDMT_VND_ALTER_TABLE =====================
typedef struct { typedef struct {
const char* tbName; char* tbName;
int8_t action; int8_t action;
const char* colName; char* colName;
// TSDB_ALTER_TABLE_ADD_COLUMN // TSDB_ALTER_TABLE_ADD_COLUMN
int8_t type; int8_t type;
int8_t flags; int8_t flags;
...@@ -1820,17 +1820,17 @@ typedef struct { ...@@ -1820,17 +1820,17 @@ typedef struct {
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES // TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
int32_t colModBytes; int32_t colModBytes;
// TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME // TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
const char* colNewName; char* colNewName;
// TSDB_ALTER_TABLE_UPDATE_TAG_VAL // TSDB_ALTER_TABLE_UPDATE_TAG_VAL
const char* tagName; char* tagName;
int8_t isNull; int8_t isNull;
uint32_t nTagVal; uint32_t nTagVal;
const uint8_t* pTagVal; uint8_t* pTagVal;
// TSDB_ALTER_TABLE_UPDATE_OPTIONS // TSDB_ALTER_TABLE_UPDATE_OPTIONS
int8_t updateTTL; int8_t updateTTL;
int32_t newTTL; int32_t newTTL;
int8_t updateComment; int8_t updateComment;
const char* newComment; char* newComment;
} SVAlterTbReq; } SVAlterTbReq;
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
...@@ -2020,7 +2020,7 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { ...@@ -2020,7 +2020,7 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp); int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp); int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
void tFreeSClientHbBatchRsp(SClientHbBatchRsp *pBatchRsp); void tFreeSClientHbBatchRsp(SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) { static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1; if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
...@@ -2267,8 +2267,8 @@ typedef struct { ...@@ -2267,8 +2267,8 @@ typedef struct {
int64_t interval; int64_t interval;
int64_t offset; // use unit by precision of DB int64_t offset; // use unit by precision of DB
int64_t sliding; int64_t sliding;
const char* expr; // sma expression char* expr; // sma expression
const char* tagsFilter; char* tagsFilter;
} STSma; // Time-range-wise SMA } STSma; // Time-range-wise SMA
typedef STSma SVCreateTSmaReq; typedef STSma SVCreateTSmaReq;
...@@ -2600,7 +2600,7 @@ typedef struct { ...@@ -2600,7 +2600,7 @@ typedef struct {
int64_t uid; int64_t uid;
int32_t sver; int32_t sver;
uint32_t nData; uint32_t nData;
const uint8_t* pData; uint8_t* pData;
SVCreateTbReq cTbReq; SVCreateTbReq cTbReq;
} SVSubmitBlk; } SVSubmitBlk;
......
...@@ -46,24 +46,34 @@ typedef enum { ...@@ -46,24 +46,34 @@ typedef enum {
AUTH_TYPE_OTHER, AUTH_TYPE_OTHER,
} AUTH_TYPE; } AUTH_TYPE;
typedef struct SUserAuthInfo {
char user[TSDB_USER_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
AUTH_TYPE type;
} SUserAuthInfo;
typedef struct SCatalogReq { typedef struct SCatalogReq {
SArray *pTableName; // element is SNAME SArray *pTableMeta; // element is SNAME
SArray *pUdf; // udf name SArray *pDbVgroup; // element is db full name
SArray *pTableHash; // element is SNAME
SArray *pUdf; // element is udf name
SArray *pDbCfg; // element is db full name
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
bool qNodeRequired; // valid qnode bool qNodeRequired; // valid qnode
} SCatalogReq; } SCatalogReq;
typedef struct SMetaData { typedef struct SMetaData {
SArray *pTableMeta; // STableMeta array SArray *pTableMeta; // SArray<STableMeta>
SArray *pVgroupInfo; // SVgroupInfo list SArray *pDbVgroup; // SArray<SArray<SVgroupInfo>*>
SArray *pUdfList; // udf info list SArray *pTableHash; // SArray<SVgroupInfo>
SArray *pQnodeList; // qnode list, SArray<SQueryNodeAddr> SArray *pUdfList; // SArray<SFuncInfo>
SArray *pDbCfg; // SArray<SDbCfgInfo>
SArray *pIndex; // SArray<SIndexInfo>
SArray *pUser; // SArray<bool>
SArray *pQnodeList; // SArray<SQueryNodeAddr>
} SMetaData; } SMetaData;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SCatalogCfg { typedef struct SCatalogCfg {
uint32_t maxTblCacheNum; uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum; uint32_t maxDBCacheNum;
...@@ -88,6 +98,11 @@ typedef struct SDbVgVersion { ...@@ -88,6 +98,11 @@ typedef struct SDbVgVersion {
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SDbVgVersion; } SDbVgVersion;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SUserAuthVersion { typedef struct SUserAuthVersion {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
int32_t version; int32_t version;
...@@ -96,6 +111,8 @@ typedef struct SUserAuthVersion { ...@@ -96,6 +111,8 @@ typedef struct SUserAuthVersion {
typedef SDbCfgRsp SDbCfgInfo; typedef SDbCfgRsp SDbCfgInfo;
typedef SUserIndexRsp SIndexInfo; typedef SUserIndexRsp SIndexInfo;
typedef void (*catalogCallback)(SMetaData* pResult, void* param, int32_t code);
int32_t catalogInit(SCatalogCfg *cfg); int32_t catalogInit(SCatalogCfg *cfg);
/** /**
...@@ -131,7 +148,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d ...@@ -131,7 +148,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId); int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName); int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName);
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid); int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid);
...@@ -241,9 +258,9 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_ ...@@ -241,9 +258,9 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
......
...@@ -39,7 +39,7 @@ typedef struct { ...@@ -39,7 +39,7 @@ typedef struct {
} SEncoder; } SEncoder;
typedef struct { typedef struct {
const uint8_t* data; uint8_t* data;
uint32_t size; uint32_t size;
uint32_t pos; uint32_t pos;
SCoderMem* mList; SCoderMem* mList;
...@@ -120,7 +120,7 @@ static int32_t tEncodeCStrWithLen(SEncoder* pCoder, const char* val, uint32_t le ...@@ -120,7 +120,7 @@ static int32_t tEncodeCStrWithLen(SEncoder* pCoder, const char* val, uint32_t le
static int32_t tEncodeCStr(SEncoder* pCoder, const char* val); static int32_t tEncodeCStr(SEncoder* pCoder, const char* val);
/* ------------------------ DECODE ------------------------ */ /* ------------------------ DECODE ------------------------ */
void tDecoderInit(SDecoder* pCoder, const uint8_t* data, uint32_t size); void tDecoderInit(SDecoder* pCoder, uint8_t* data, uint32_t size);
void tDecoderClear(SDecoder* SDecoder); void tDecoderClear(SDecoder* SDecoder);
int32_t tStartDecode(SDecoder* pCoder); int32_t tStartDecode(SDecoder* pCoder);
void tEndDecode(SDecoder* pCoder); void tEndDecode(SDecoder* pCoder);
...@@ -141,9 +141,9 @@ static int32_t tDecodeU64v(SDecoder* pCoder, uint64_t* val); ...@@ -141,9 +141,9 @@ static int32_t tDecodeU64v(SDecoder* pCoder, uint64_t* val);
static int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val); static int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val);
static int32_t tDecodeFloat(SDecoder* pCoder, float* val); static int32_t tDecodeFloat(SDecoder* pCoder, float* val);
static int32_t tDecodeDouble(SDecoder* pCoder, double* val); static int32_t tDecodeDouble(SDecoder* pCoder, double* val);
static int32_t tDecodeBinary(SDecoder* pCoder, const uint8_t** val, uint32_t* len); static int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len);
static int32_t tDecodeCStrAndLen(SDecoder* pCoder, const char** val, uint32_t* len); static int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len);
static int32_t tDecodeCStr(SDecoder* pCoder, const char** val); static int32_t tDecodeCStr(SDecoder* pCoder, char** val);
static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val); static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val);
/* ------------------------ IMPL ------------------------ */ /* ------------------------ IMPL ------------------------ */
...@@ -317,7 +317,7 @@ static FORCE_INLINE int32_t tDecodeI16v(SDecoder* pCoder, int16_t* val) { ...@@ -317,7 +317,7 @@ static FORCE_INLINE int32_t tDecodeI16v(SDecoder* pCoder, int16_t* val) {
if (tDecodeU16v(pCoder, &tval) < 0) { if (tDecodeU16v(pCoder, &tval) < 0) {
return -1; return -1;
} }
*val = ZIGZAGD(int16_t, tval); if (val) *val = ZIGZAGD(int16_t, tval);
return 0; return 0;
} }
...@@ -331,7 +331,7 @@ static FORCE_INLINE int32_t tDecodeI32v(SDecoder* pCoder, int32_t* val) { ...@@ -331,7 +331,7 @@ static FORCE_INLINE int32_t tDecodeI32v(SDecoder* pCoder, int32_t* val) {
if (tDecodeU32v(pCoder, &tval) < 0) { if (tDecodeU32v(pCoder, &tval) < 0) {
return -1; return -1;
} }
*val = ZIGZAGD(int32_t, tval); if (val) *val = ZIGZAGD(int32_t, tval);
return 0; return 0;
} }
...@@ -345,7 +345,7 @@ static FORCE_INLINE int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val) { ...@@ -345,7 +345,7 @@ static FORCE_INLINE int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val) {
if (tDecodeU64v(pCoder, &tval) < 0) { if (tDecodeU64v(pCoder, &tval) < 0) {
return -1; return -1;
} }
*val = ZIGZAGD(int64_t, tval); if (val) *val = ZIGZAGD(int64_t, tval);
return 0; return 0;
} }
...@@ -377,7 +377,7 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) { ...@@ -377,7 +377,7 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) {
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, const uint8_t** val, uint32_t* len) { static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) {
if (tDecodeU32v(pCoder, len) < 0) return -1; if (tDecodeU32v(pCoder, len) < 0) return -1;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1; if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1;
...@@ -389,19 +389,19 @@ static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, const uint8_t** val, ...@@ -389,19 +389,19 @@ static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, const uint8_t** val,
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeCStrAndLen(SDecoder* pCoder, const char** val, uint32_t* len) { static FORCE_INLINE int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len) {
if (tDecodeBinary(pCoder, (const uint8_t**)val, len) < 0) return -1; if (tDecodeBinary(pCoder, (uint8_t**)val, len) < 0) return -1;
(*len) -= 1; (*len) -= 1;
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeCStr(SDecoder* pCoder, const char** val) { static FORCE_INLINE int32_t tDecodeCStr(SDecoder* pCoder, char** val) {
uint32_t len; uint32_t len;
return tDecodeCStrAndLen(pCoder, val, &len); return tDecodeCStrAndLen(pCoder, val, &len);
} }
static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) { static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) {
const char* pStr; char* pStr;
uint32_t len; uint32_t len;
if (tDecodeCStrAndLen(pCoder, &pStr, &len) < 0) return -1; if (tDecodeCStrAndLen(pCoder, &pStr, &len) < 0) return -1;
......
此差异已折叠。
...@@ -121,7 +121,7 @@ struct SAppInstInfo { ...@@ -121,7 +121,7 @@ struct SAppInstInfo {
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
SInstanceSummary summary; SInstanceSummary summary;
SList* pConnList; // STscObj linked list SList* pConnList; // STscObj linked list
int64_t clusterId; uint64_t clusterId;
void* pTransporter; void* pTransporter;
SAppHbMgr* pAppHbMgr; SAppHbMgr* pAppHbMgr;
}; };
...@@ -286,6 +286,8 @@ void initMsgHandleFp(); ...@@ -286,6 +286,8 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType); uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
......
...@@ -565,10 +565,32 @@ const char *taos_get_server_info(TAOS *taos) { ...@@ -565,10 +565,32 @@ const char *taos_get_server_info(TAOS *taos) {
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
if (taos == NULL || sql == NULL) { if (taos == NULL || sql == NULL) {
// todo directly call fp fp(param, NULL, TSDB_CODE_INVALID_PARA);
return;
}
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
size_t sqlLen = strlen(sql);
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = launchQuery(taos, sql, sqlLen);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
code = refreshMeta(taos, pRequest);
if (code) {
pRequest->code = code;
break;
}
destroyRequest(pRequest);
} }
taos_query_l(taos, sql, (int32_t)strlen(sql)); fp(param, pRequest, code);
} }
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
......
...@@ -125,10 +125,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -125,10 +125,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) { if (usedbRsp.vgVersion >= 0) {
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) { if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1));
tstrerror(code1));
} else { } else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
} }
...@@ -158,7 +158,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -158,7 +158,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
taosMemoryFreeClear(output.dbVgroup); taosMemoryFreeClear(output.dbVgroup);
tscError("failed to build use db output since %s", terrstr()); tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup) { } else if (output.dbVgroup) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#undef TD_MSG_SEG_CODE_ #undef TD_MSG_SEG_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1; return -1;
...@@ -102,7 +102,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -102,7 +102,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
} }
} }
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) { int32_t tPrintFixedSchemaSubmitReq(SSubmitReq *pReq, STSchema *pTschema) {
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1; if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
while (true) { while (true) {
......
...@@ -113,6 +113,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -113,6 +113,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
SRpcMsg rsp = {.info = pMsg->info}; SRpcMsg rsp = {.info = pMsg->info};
vnodePreprocessReq(pVnode->pImpl, pMsg);
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false); int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId); dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId);
......
...@@ -51,7 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); ...@@ -51,7 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
...@@ -126,7 +126,7 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); ...@@ -126,7 +126,7 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList); int32_t tqReadHandleRemoveTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
...@@ -177,7 +177,7 @@ struct SMetaEntry { ...@@ -177,7 +177,7 @@ struct SMetaEntry {
int64_t version; int64_t version;
int8_t type; int8_t type;
tb_uid_t uid; tb_uid_t uid;
const char *name; char *name;
union { union {
struct { struct {
SSchemaWrapper schema; SSchemaWrapper schema;
...@@ -187,7 +187,7 @@ struct SMetaEntry { ...@@ -187,7 +187,7 @@ struct SMetaEntry {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
tb_uid_t suid; tb_uid_t suid;
const uint8_t *pTags; uint8_t *pTags;
} ctbEntry; } ctbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
......
...@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep ...@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb); int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
......
...@@ -30,9 +30,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -30,9 +30,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int vLen = 0; int vLen = 0;
const void *pKey = NULL; const void *pKey = NULL;
const void *pVal = NULL; const void *pVal = NULL;
void * pBuf = NULL; void *pBuf = NULL;
int32_t szBuf = 0; int32_t szBuf = 0;
void * p = NULL; void *p = NULL;
SMetaReader mr = {0}; SMetaReader mr = {0};
// validate req // validate req
...@@ -71,9 +71,9 @@ _err: ...@@ -71,9 +71,9 @@ _err:
} }
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
TBC * pNameIdxc = NULL; TBC *pNameIdxc = NULL;
TBC * pUidIdxc = NULL; TBC *pUidIdxc = NULL;
TBC * pCtbIdxc = NULL; TBC *pCtbIdxc = NULL;
SCtbIdxKey *pCtbIdxKey; SCtbIdxKey *pCtbIdxKey;
const void *pKey = NULL; const void *pKey = NULL;
int nKey; int nKey;
...@@ -134,8 +134,8 @@ _err: ...@@ -134,8 +134,8 @@ _err:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0}; SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0}; SMetaEntry nStbEntry = {0};
TBC * pUidIdxc = NULL; TBC *pUidIdxc = NULL;
TBC * pTbDbc = NULL; TBC *pTbDbc = NULL;
const void *pData; const void *pData;
int nData; int nData;
int64_t oversion; int64_t oversion;
...@@ -165,7 +165,9 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -165,7 +165,9 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
ASSERT(ret == 0); ASSERT(ret == 0);
tDecoderInit(&dc, pData, nData); oStbEntry.pBuf = taosMemoryMalloc(nData);
memcpy(oStbEntry.pBuf, pData, nData);
tDecoderInit(&dc, oStbEntry.pBuf, nData);
metaDecodeEntry(&dc, &oStbEntry); metaDecodeEntry(&dc, &oStbEntry);
nStbEntry.version = version; nStbEntry.version = version;
...@@ -193,6 +195,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -193,6 +195,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
// update uid index // update uid index
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0); tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
metaULock(pMeta); metaULock(pMeta);
tDecoderClear(&dc); tDecoderClear(&dc);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
...@@ -220,9 +223,6 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { ...@@ -220,9 +223,6 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
} else {
pReq->uid = tGenIdPI64();
pReq->ctime = taosGetTimestampMs();
} }
metaReaderClear(&mr); metaReaderClear(&mr);
...@@ -256,9 +256,9 @@ _err: ...@@ -256,9 +256,9 @@ _err:
} }
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) { int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC * pTbDbc = NULL; TBC *pTbDbc = NULL;
TBC * pUidIdxc = NULL; TBC *pUidIdxc = NULL;
TBC * pNameIdxc = NULL; TBC *pNameIdxc = NULL;
const void *pData; const void *pData;
int nData; int nData;
tb_uid_t uid; tb_uid_t uid;
...@@ -377,14 +377,14 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi ...@@ -377,14 +377,14 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
} }
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
void * pVal = NULL; void *pVal = NULL;
int nVal = 0; int nVal = 0;
const void * pData = NULL; const void *pData = NULL;
int nData = 0; int nData = 0;
int ret = 0; int ret = 0;
tb_uid_t uid; tb_uid_t uid;
int64_t oversion; int64_t oversion;
SSchema * pColumn = NULL; SSchema *pColumn = NULL;
SMetaEntry entry = {0}; SMetaEntry entry = {0};
SSchemaWrapper *pSchema; SSchemaWrapper *pSchema;
int c; int c;
...@@ -420,7 +420,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -420,7 +420,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
// get table entry // get table entry
SDecoder dc = {0}; SDecoder dc = {0};
tDecoderInit(&dc, pData, nData); entry.pBuf = taosMemoryMalloc(nData);
memcpy(entry.pBuf, pData, nData);
tDecoderInit(&dc, entry.pBuf, nData);
ret = metaDecodeEntry(&dc, &entry); ret = metaDecodeEntry(&dc, &entry);
ASSERT(ret == 0); ASSERT(ret == 0);
...@@ -530,7 +532,7 @@ _err: ...@@ -530,7 +532,7 @@ _err:
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry ctbEntry = {0}; SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0}; SMetaEntry stbEntry = {0};
void * pVal = NULL; void *pVal = NULL;
int nVal = 0; int nVal = 0;
int ret; int ret;
int c; int c;
...@@ -561,7 +563,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -561,7 +563,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
oversion = *(int64_t *)pData; oversion = *(int64_t *)pData;
// search table.db // search table.db
TBC * pTbDbc = NULL; TBC *pTbDbc = NULL;
SDecoder dc1 = {0}; SDecoder dc1 = {0};
SDecoder dc2 = {0}; SDecoder dc2 = {0};
...@@ -585,7 +587,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -585,7 +587,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaDecodeEntry(&dc2, &stbEntry); metaDecodeEntry(&dc2, &stbEntry);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema * pColumn = NULL; SSchema *pColumn = NULL;
int32_t iCol = 0; int32_t iCol = 0;
for (;;) { for (;;) {
pColumn = NULL; pColumn = NULL;
...@@ -681,8 +683,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { ...@@ -681,8 +683,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey; STbDbKey tbDbKey;
void * pKey = NULL; void *pKey = NULL;
void * pVal = NULL; void *pVal = NULL;
int kLen = 0; int kLen = 0;
int vLen = 0; int vLen = 0;
SEncoder coder = {0}; SEncoder coder = {0};
...@@ -797,14 +799,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) { ...@@ -797,14 +799,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
} }
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
void * pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
STbDbKey tbDbKey = {0}; STbDbKey tbDbKey = {0};
SMetaEntry stbEntry = {0}; SMetaEntry stbEntry = {0};
STagIdxKey * pTagIdxKey = NULL; STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey; int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0]; const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void * pTagData = NULL; // const void *pTagData = NULL; //
SDecoder dc = {0}; SDecoder dc = {0};
// get super table // get super table
...@@ -846,7 +848,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -846,7 +848,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SEncoder coder = {0}; SEncoder coder = {0};
void * pVal = NULL; void *pVal = NULL;
int vLen = 0; int vLen = 0;
int rcode = 0; int rcode = 0;
SSkmDbKey skmDbKey = {0}; SSkmDbKey skmDbKey = {0};
......
...@@ -217,7 +217,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -217,7 +217,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (tDecodeIsEnd(&dc)) break; if (tDecodeIsEnd(&dc)) break;
// decode row // decode row
if (tDecodeBinary(&dc, (const uint8_t **)&tRow.pRow, &tRow.szRow) < 0) { if (tDecodeBinary(&dc, (uint8_t **)&tRow.pRow, &tRow.szRow) < 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -273,7 +273,7 @@ static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pR ...@@ -273,7 +273,7 @@ static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pR
static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) { static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) {
if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1; if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1;
if (tDecodeBinary(pDecoder, (const uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1; if (tDecodeBinary(pDecoder, (uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1;
return 0; return 0;
} }
......
...@@ -85,7 +85,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro ...@@ -85,7 +85,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
return 0; return 0;
} }
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) { int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta; // STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
...@@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) { ...@@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
return -1; return -1;
} }
} }
} }
if (terrno != TSDB_CODE_SUCCESS) return -1; if (terrno != TSDB_CODE_SUCCESS) return -1;
......
...@@ -24,26 +24,62 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -24,26 +24,62 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
#if 0 SDecoder dc = {0};
SRpcMsg *pMsg;
SRpcMsg *pRpc;
*version = pVnode->state.processed; switch (pMsg->msgType) {
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { case TDMT_VND_CREATE_TABLE: {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); int64_t ctime = taosGetTimestampMs();
pRpc = pMsg; int32_t nReqs;
// set request version tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { tStartDecode(&dc);
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
return -1; tDecodeI32v(&dc, &nReqs);
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
tb_uid_t uid = tGenIdPI64();
tStartDecode(&dc);
tDecodeI32v(&dc, NULL);
*(int64_t *)(dc.data + dc.pos) = uid;
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
tEndDecode(&dc);
}
tEndDecode(&dc);
tDecoderClear(&dc);
} break;
case TDMT_VND_SUBMIT: {
SSubmitMsgIter msgIter = {0};
SSubmitReq *pSubmitReq = (SSubmitReq *)pMsg->pCont;
SSubmitBlk *pBlock = NULL;
int64_t ctime = taosGetTimestampMs();
tInitSubmitMsgIter(pSubmitReq, &msgIter);
for (;;) {
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (msgIter.schemaLen > 0) {
tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
tStartDecode(&dc);
tDecodeI32v(&dc, NULL);
*(int64_t *)(dc.data + dc.pos) = tGenIdPI64();
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
tEndDecode(&dc);
tDecoderClear(&dc);
} }
} }
walFsync(pVnode->pWal, false); } break;
default:
break;
}
#endif
return 0; return 0;
} }
...@@ -675,7 +711,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -675,7 +711,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
goto _exit; goto _exit;
} }
for (int i = 0;;) { for (;;) {
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break; if (pBlock == NULL) break;
......
...@@ -58,6 +58,17 @@ enum { ...@@ -58,6 +58,17 @@ enum {
CTG_ACT_MAX CTG_ACT_MAX
}; };
typedef enum {
CTG_TASK_GET_QNODE = 0,
CTG_TASK_GET_DB_VGROUP,
CTG_TASK_GET_DB_CFG,
CTG_TASK_GET_TB_META,
CTG_TASK_GET_TB_HASH,
CTG_TASK_GET_INDEX,
CTG_TASK_GET_UDF,
CTG_TASK_GET_USER,
} CTG_TASK_TYPE;
typedef struct SCtgDebug { typedef struct SCtgDebug {
bool lockEnable; bool lockEnable;
bool cacheEnable; bool cacheEnable;
...@@ -66,6 +77,43 @@ typedef struct SCtgDebug { ...@@ -66,6 +77,43 @@ typedef struct SCtgDebug {
uint32_t showCachePeriodSec; uint32_t showCachePeriodSec;
} SCtgDebug; } SCtgDebug;
typedef struct SCtgTbCacheInfo {
bool inCache;
uint64_t dbId;
uint64_t suid;
int32_t tbType;
} SCtgTbCacheInfo;
typedef struct SCtgTbMetaCtx {
SCtgTbCacheInfo tbInfo;
SName* pName;
int32_t flag;
} SCtgTbMetaCtx;
typedef struct SCtgDbVgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbVgCtx;
typedef struct SCtgDbCfgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbCfgCtx;
typedef struct SCtgTbHashCtx {
char dbFName[TSDB_DB_FNAME_LEN];
SName* pName;
} SCtgTbHashCtx;
typedef struct SCtgIndexCtx {
char indexFName[TSDB_INDEX_FNAME_LEN];
} SCtgIndexCtx;
typedef struct SCtgUdfCtx {
char udfName[TSDB_FUNC_NAME_LEN];
} SCtgUdfCtx;
typedef struct SCtgUserCtx {
SUserAuthInfo user;
} SCtgUserCtx;
typedef struct SCtgTbMetaCache { typedef struct SCtgTbMetaCache {
SRWLatch stbLock; SRWLatch stbLock;
...@@ -113,6 +161,55 @@ typedef struct SCatalog { ...@@ -113,6 +161,55 @@ typedef struct SCatalog {
SCtgRentMgmt stbRent; SCtgRentMgmt stbRent;
} SCatalog; } SCatalog;
typedef struct SCtgJob {
int64_t refId;
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
int32_t rspCode;
uint64_t queryId;
SCatalog* pCtg;
void* pTrans;
const SEpSet* pMgmtEps;
void* userParam;
catalogCallback userFp;
int32_t tbMetaNum;
int32_t tbHashNum;
int32_t dbVgNum;
int32_t udfNum;
int32_t qnodeNum;
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
int32_t reqType;
void* lastOut;
void* out;
char* target;
} SCtgMsgCtx;
typedef struct SCtgTask {
CTG_TASK_TYPE type;
int32_t taskId;
SCtgJob *pJob;
void* taskCtx;
SCtgMsgCtx msgCtx;
void* res;
} SCtgTask;
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
typedef struct SCtgAsyncFps {
ctgLanchTaskFp launchFp;
ctgHandleTaskMsgRspFp handleRspFp;
ctgDumpTaskResFp dumpResFp;
} SCtgAsyncFps;
typedef struct SCtgApiStat { typedef struct SCtgApiStat {
#ifdef WINDOWS #ifdef WINDOWS
...@@ -214,6 +311,7 @@ typedef struct SCtgQueue { ...@@ -214,6 +311,7 @@ typedef struct SCtgQueue {
typedef struct SCatalogMgmt { typedef struct SCatalogMgmt {
bool exit; bool exit;
int32_t jobPool;
SRWLatch lock; SRWLatch lock;
SCtgQueue queue; SCtgQueue queue;
TdThread updateThread; TdThread updateThread;
...@@ -327,10 +425,80 @@ typedef struct SCtgAction { ...@@ -327,10 +425,80 @@ typedef struct SCtgAction {
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0) #define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) #define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_PARAMS SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
extern void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p); #define CTG_PARAMS_LIST() pCtg, pTrans, pMgmtEps
extern void ctgdShowClusterCache(SCatalog* pCtg);
extern int32_t ctgdShowCacheInfo(void); void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void ctgdShowClusterCache(SCatalog* pCtg);
int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTb(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTb(SCtgMetaAction *action);
int32_t ctgActUpdateUser(SCtgMetaAction *action);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param);
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
void ctgFreeJob(void* job);
void ctgFreeHandle(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2);
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;
extern SCtgAsyncFps gCtgAsyncFps[];
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_CATALOG_REMOTE_H_
#define _TD_CATALOG_REMOTE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SCtgTaskCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
int32_t reqType;
} SCtgTaskCallbackParam;
#ifdef __cplusplus
}
#endif
#endif /*_TD_CATALOG_REMOTE_H_*/
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
...@@ -40,10 +40,8 @@ ...@@ -40,10 +40,8 @@
namespace { namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
bool *inCache, int32_t flag, uint64_t *dbId);
extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action); extern "C" int32_t ctgActUpdateTb(SCtgMetaAction *action);
extern "C" int32_t ctgdEnableDebug(char *option); extern "C" int32_t ctgdEnableDebug(char *option);
extern "C" int32_t ctgdGetStatNum(char *option, void *res); extern "C" int32_t ctgdGetStatNum(char *option, void *res);
...@@ -52,7 +50,7 @@ void ctgTestSetRspCTableMeta(); ...@@ -52,7 +50,7 @@ void ctgTestSetRspCTableMeta();
void ctgTestSetRspSTableMeta(); void ctgTestSetRspSTableMeta();
void ctgTestSetRspMultiSTableMeta(); void ctgTestSetRspMultiSTableMeta();
extern "C" SCatalogMgmt gCtgMgmt; //extern "C" SCatalogMgmt gCtgMgmt;
enum { enum {
CTGT_RSP_VGINFO = 1, CTGT_RSP_VGINFO = 1,
...@@ -859,8 +857,12 @@ void *ctgTestGetCtableMetaThread(void *param) { ...@@ -859,8 +857,12 @@ void *ctgTestGetCtableMetaThread(void *param) {
strcpy(cn.dbname, "db1"); strcpy(cn.dbname, "db1");
strcpy(cn.tname, ctgTestCTablename); strcpy(cn.tname, ctgTestCTablename);
SCtgTbMetaCtx ctx = {0};
ctx.pName = &cn;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
while (!ctgTestStop) { while (!ctgTestStop) {
code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &inCache, 0, NULL); code = ctgReadTbMetaFromCache(pCtg, &ctx, &tbMeta);
if (code || !inCache) { if (code || !inCache) {
assert(0); assert(0);
} }
...@@ -899,7 +901,7 @@ void *ctgTestSetCtableMetaThread(void *param) { ...@@ -899,7 +901,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
msg->output = output; msg->output = output;
action.data = msg; action.data = msg;
code = ctgActUpdateTbl(&action); code = ctgActUpdateTb(&action);
if (code) { if (code) {
assert(0); assert(0);
} }
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <executorimpl.h>
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
......
...@@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
......
...@@ -66,22 +66,18 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) { ...@@ -66,22 +66,18 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
} }
static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) { static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
SFuncInfo* pInfo = NULL; SFuncInfo funcInfo = {0};
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &pInfo); int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &funcInfo);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
if (NULL == pInfo) {
snprintf(pParam->pErrBuf, pParam->errBufLen, "Invalid function name: %s", pFunc->functionName);
return TSDB_CODE_FUNC_INVALID_FUNTION;
}
pFunc->funcType = FUNCTION_TYPE_UDF; pFunc->funcType = FUNCTION_TYPE_UDF;
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == pInfo->funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID; pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
pFunc->node.resType.type = pInfo->outputType; pFunc->node.resType.type = funcInfo.outputType;
pFunc->node.resType.bytes = pInfo->outputLen; pFunc->node.resType.bytes = funcInfo.outputLen;
pFunc->udfBufSize = pInfo->bufSize; pFunc->udfBufSize = funcInfo.bufSize;
tFreeSFuncInfo(pInfo); tFreeSFuncInfo(&funcInfo);
taosMemoryFree(pInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -72,12 +72,20 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { ...@@ -72,12 +72,20 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
if (tsProcPath == NULL) { if (tsProcPath == NULL) {
path[0] = '.'; path[0] = '.';
#ifdef WINDOWS
GetModuleFileName(NULL, path, PATH_MAX);
taosDirName(path);
#endif
} else { } else {
strncpy(path, tsProcPath, strlen(tsProcPath)); strncpy(path, tsProcPath, strlen(tsProcPath));
taosDirName(path); taosDirName(path);
} }
#ifdef WINDOWS #ifdef WINDOWS
if (strlen(path)==0) {
strcat(path, "udfd.exe"); strcat(path, "udfd.exe");
} else {
strcat(path, "\\udfd.exe");
}
#else #else
strcat(path, "/udfd"); strcat(path, "/udfd");
#endif #endif
......
...@@ -204,7 +204,7 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -204,7 +204,7 @@ TEST_F(ParserInitialATest, alterTable) {
} }
}; };
auto setAlterTagFunc = [&](const char* pTbname, const char* pTagName, const uint8_t* pNewVal, uint32_t bytes) { auto setAlterTagFunc = [&](const char* pTbname, const char* pTagName, uint8_t* pNewVal, uint32_t bytes) {
memset(&expect, 0, sizeof(SVAlterTbReq)); memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname); expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_TAG_VAL; expect.action = TSDB_ALTER_TABLE_UPDATE_TAG_VAL;
...@@ -215,7 +215,7 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -215,7 +215,7 @@ TEST_F(ParserInitialATest, alterTable) {
expect.pTagVal = pNewVal; expect.pTagVal = pNewVal;
}; };
auto setAlterOptionsFunc = [&](const char* pTbname, int32_t ttl, const char* pComment = nullptr) { auto setAlterOptionsFunc = [&](const char* pTbname, int32_t ttl, char* pComment = nullptr) {
memset(&expect, 0, sizeof(SVAlterTbReq)); memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname); expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_OPTIONS; expect.action = TSDB_ALTER_TABLE_UPDATE_OPTIONS;
...@@ -240,7 +240,7 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -240,7 +240,7 @@ TEST_F(ParserInitialATest, alterTable) {
void* pBuf = POINTER_SHIFT(pVgData->pData, sizeof(SMsgHead)); void* pBuf = POINTER_SHIFT(pVgData->pData, sizeof(SMsgHead));
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, (const uint8_t*)pBuf, pVgData->size); tDecoderInit(&coder, (uint8_t*)pBuf, pVgData->size);
ASSERT_EQ(tDecodeSVAlterTbReq(&coder, &req), TSDB_CODE_SUCCESS); ASSERT_EQ(tDecodeSVAlterTbReq(&coder, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.tbName), std::string(expect.tbName)); ASSERT_EQ(std::string(req.tbName), std::string(expect.tbName));
...@@ -274,7 +274,7 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -274,7 +274,7 @@ TEST_F(ParserInitialATest, alterTable) {
setAlterOptionsFunc("t1", 10, nullptr); setAlterOptionsFunc("t1", 10, nullptr);
run("ALTER TABLE t1 TTL 10"); run("ALTER TABLE t1 TTL 10");
setAlterOptionsFunc("t1", -1, "test"); setAlterOptionsFunc("t1", -1, (char*)"test");
run("ALTER TABLE t1 COMMENT 'test'"); run("ALTER TABLE t1 COMMENT 'test'");
setAlterColFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, "cc1", TSDB_DATA_TYPE_BIGINT); setAlterColFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, "cc1", TSDB_DATA_TYPE_BIGINT);
...@@ -290,7 +290,7 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -290,7 +290,7 @@ TEST_F(ParserInitialATest, alterTable) {
run("ALTER TABLE t1 RENAME COLUMN c1 cc1"); run("ALTER TABLE t1 RENAME COLUMN c1 cc1");
int32_t val = 10; int32_t val = 10;
setAlterTagFunc("st1s1", "tag1", (const uint8_t*)&val, sizeof(val)); setAlterTagFunc("st1s1", "tag1", (uint8_t*)&val, sizeof(val));
run("ALTER TABLE st1s1 SET TAG tag1=10"); run("ALTER TABLE st1s1 SET TAG tag1=10");
// todo // todo
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "planTestUtil.h" #include "planTestUtil.h"
#include <getopt.h>
#include <algorithm> #include <algorithm>
#include <array> #include <array>
......
...@@ -528,20 +528,18 @@ int32_t qwDropTask(QW_FPARAMS_DEF) { ...@@ -528,20 +528,18 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
} }
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qTaskInfo_t *taskHandle = &ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
if (TASK_TYPE_TEMP == ctx->taskType) { if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
if (ctx->explain) { if (ctx->explain) {
SExplainExecInfo *execInfo = NULL; SExplainExecInfo *execInfo = NULL;
int32_t resNum = 0; int32_t resNum = 0;
QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo)); QW_ERR_RET(qGetExplainExecInfo(taskHandle, &resNum, &execInfo));
SRpcHandleInfo connInfo = ctx->ctrlConnInfo; SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL; connInfo.ahandle = NULL;
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum)); QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
} }
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -554,17 +552,22 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -554,17 +552,22 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
uint64_t useconds = 0; uint64_t useconds = 0;
int32_t i = 0; int32_t i = 0;
int32_t execNum = 0; int32_t execNum = 0;
qTaskInfo_t *taskHandle = &ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle; DataSinkHandle sinkHandle = ctx->sinkHandle;
while (true) { while (true) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
code = qExecTask(*taskHandle, &pRes, &useconds); pRes = NULL;
// if *taskHandle is NULL, it's killed right now
if (taskHandle) {
code = qExecTask(taskHandle, &pRes, &useconds);
if (code) { if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_RET(code);
} }
}
++execNum; ++execNum;
......
...@@ -132,7 +132,7 @@ typedef struct SSchLevel { ...@@ -132,7 +132,7 @@ typedef struct SSchLevel {
int32_t taskSucceed; int32_t taskSucceed;
int32_t taskNum; int32_t taskNum;
int32_t taskLaunchedNum; int32_t taskLaunchedNum;
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl int32_t taskDoneNum;
SArray *subTasks; // Element is SQueryTask SArray *subTasks; // Element is SQueryTask
} SSchLevel; } SSchLevel;
...@@ -175,11 +175,13 @@ typedef struct SSchJob { ...@@ -175,11 +175,13 @@ typedef struct SSchJob {
SArray *levels; // starting from 0. SArray<SSchLevel> SArray *levels; // starting from 0. SArray<SSchLevel>
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx; int32_t levelIdx;
SEpSet dataSrcEps; SEpSet dataSrcEps;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx; SExplainCtx *explainCtx;
int8_t status; int8_t status;
...@@ -200,7 +202,7 @@ typedef struct SSchJob { ...@@ -200,7 +202,7 @@ typedef struct SSchJob {
extern SSchedulerMgmt schMgmt; extern SSchedulerMgmt schMgmt;
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) #define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
...@@ -223,7 +225,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -223,7 +225,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) #define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
...@@ -261,7 +263,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task); ...@@ -261,7 +263,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId); SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId); int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchLevel *pLevel); void schFreeFlowCtrl(SSchJob *pJob);
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
......
...@@ -19,13 +19,13 @@ ...@@ -19,13 +19,13 @@
#include "catalog.h" #include "catalog.h"
#include "tref.h" #include "tref.h"
void schFreeFlowCtrl(SSchLevel *pLevel) { void schFreeFlowCtrl(SSchJob *pJob) {
if (NULL == pLevel->flowCtrl) { if (NULL == pJob->flowCtrl) {
return; return;
} }
SSchFlowControl *ctrl = NULL; SSchFlowControl *ctrl = NULL;
void *pIter = taosHashIterate(pLevel->flowCtrl, NULL); void *pIter = taosHashIterate(pJob->flowCtrl, NULL);
while (pIter) { while (pIter) {
ctrl = (SSchFlowControl *)pIter; ctrl = (SSchFlowControl *)pIter;
...@@ -33,11 +33,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) { ...@@ -33,11 +33,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) {
taosArrayDestroy(ctrl->taskList); taosArrayDestroy(ctrl->taskList);
} }
pIter = taosHashIterate(pLevel->flowCtrl, pIter); pIter = taosHashIterate(pJob->flowCtrl, pIter);
} }
taosHashCleanup(pLevel->flowCtrl); taosHashCleanup(pJob->flowCtrl);
pLevel->flowCtrl = NULL; pJob->flowCtrl = NULL;
} }
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
...@@ -47,9 +47,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { ...@@ -47,9 +47,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
} }
int32_t sum = 0; int32_t sum = 0;
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
for (int32_t i = 0; i < pLevel->taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
sum += pTask->plan->execNodeStat.tableNum; sum += pTask->plan->execNodeStat.tableNum;
} }
...@@ -59,9 +59,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { ...@@ -59,9 +59,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pJob->flowCtrl = taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pLevel->flowCtrl) { if (NULL == pJob->flowCtrl) {
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum); SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -78,7 +78,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { ...@@ -78,7 +78,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0; int32_t code = 0;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) { if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
...@@ -110,11 +110,11 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { ...@@ -110,11 +110,11 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
do { do {
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) { if (NULL == ctrl) {
SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1}; SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};
code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl)); code = taosHashPut(pJob->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
if (code) { if (code) {
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
continue; continue;
...@@ -273,10 +273,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { ...@@ -273,10 +273,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SSchLevel *pLevel = pTask->level;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) { if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "ttime.h" #include "ttime.h"
using namespace std; using namespace std;
#define MAX_NUM_SCALABLE_BF 100000
TEST(TD_STREAM_UPDATE_TEST, update) { TEST(TD_STREAM_UPDATE_TEST, update) {
int64_t interval = 20 * 1000; int64_t interval = 20 * 1000;
...@@ -91,11 +92,11 @@ TEST(TD_STREAM_UPDATE_TEST, update) { ...@@ -91,11 +92,11 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
} }
SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1); SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1);
GTEST_ASSERT_EQ(pSU4->watermark, 120 * pSU4->interval); GTEST_ASSERT_EQ(pSU4->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE); GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE);
SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
GTEST_ASSERT_EQ(pSU5->watermark, 120 * pSU4->interval); GTEST_ASSERT_EQ(pSU5->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE); GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE);
......
...@@ -411,7 +411,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { ...@@ -411,7 +411,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
} }
uint32_t len; uint32_t len;
char* data = NULL; char* data = NULL;
if (tDecodeBinary(&decoder, (const uint8_t**)(&data), &len) < 0) { if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
return NULL; return NULL;
} }
assert(len = pMsg->dataLen); assert(len = pMsg->dataLen);
...@@ -670,7 +670,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { ...@@ -670,7 +670,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
} }
uint32_t len; uint32_t len;
char* data = NULL; char* data = NULL;
if (tDecodeBinary(&decoder, (const uint8_t**)(&data), &len) < 0) { if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
return NULL; return NULL;
} }
assert(len = pMsg->dataLen); assert(len = pMsg->dataLen);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Subproject commit 0aad27d725f4ee6b18daf1db0c07d933aed16eea Subproject commit 788929bdc475d264d8306ceff30f7df006fd18d8
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册