提交 c597de09 编写于 作者: X Xiaoyu Wang

merge main

......@@ -387,7 +387,7 @@ pipeline {
}
steps {
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
timeout(time: 55, unit: 'MINUTES'){
timeout(time: 75, unit: 'MINUTES'){
pre_test_win()
pre_test_build_win()
run_win_ctest()
......
......@@ -368,6 +368,12 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes
} SSortExecInfo;
typedef struct STUidTagInfo {
char* name;
uint64_t uid;
void* pTagVal;
} STUidTagInfo;
// stream special block column
#define START_TS_COLUMN_INDEX 0
......
......@@ -49,6 +49,7 @@ extern int32_t tsTagFilterResCacheSize;
// queue & threads
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;
extern int32_t tsNumOfCommitThreads;
extern int32_t tsNumOfTaskQueueThreads;
extern int32_t tsNumOfMnodeQueryThreads;
......@@ -86,9 +87,9 @@ extern int32_t tsTelemInterval;
extern char tsTelemServer[];
extern uint16_t tsTelemPort;
extern bool tsEnableCrashReport;
extern char* tsTelemUri;
extern char* tsClientCrashReportUri;
extern char* tsSvrCrashReportUri;
extern char *tsTelemUri;
extern char *tsClientCrashReportUri;
extern char *tsSvrCrashReportUri;
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
......@@ -159,6 +160,8 @@ extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
......
......@@ -1813,6 +1813,7 @@ typedef struct {
int8_t createStb;
uint64_t targetStbUid;
SArray* fillNullCols; // array of SColLocation
int64_t deleteMark;
int8_t igUpdate;
} SCMCreateStreamReq;
......
......@@ -132,14 +132,16 @@ typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t isPseudoFunc;// denote current function is pseudo function or not [added for perf reason]
uint8_t isNotNullFunc;// not return null value.
uint8_t scanFlag; // record current running step, default: 0
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput;
int32_t numOfParams;
int32_t offset;
SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries;
......@@ -152,7 +154,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
SSerializeDataHandle saveHandle;
int32_t exprIdx;
char udfName[TSDB_FUNC_NAME_LEN];
char *udfName;
} SqlFunctionCtx;
typedef struct tExprNode {
......
......@@ -114,6 +114,7 @@ int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVa
#if 0
char* streamStateSessionDump(SStreamState* pState);
char* streamStateIntervalDump(SStreamState* pState);
#endif
#ifdef __cplusplus
......
......@@ -633,6 +633,7 @@ typedef struct SStreamMeta {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
......@@ -644,7 +645,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
......
......@@ -36,7 +36,7 @@ extern "C" {
#define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION 20
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SNAPSHOT_WAIT_MS 1000 * 30
......
......@@ -112,7 +112,12 @@ typedef struct SRpcInit {
// fail fast fp
RpcFFfp ffp;
void *parent;
int32_t connLimitNum;
int32_t connLimitLock;
int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize;
void *parent;
} SRpcInit;
typedef struct {
......
......@@ -41,6 +41,7 @@ extern char tsSSE42Enable;
extern char tsAVXEnable;
extern char tsAVX2Enable;
extern char tsFMAEnable;
extern char tsTagFilterCache;
extern char configDir[];
extern char tsDataDir[];
......
......@@ -67,6 +67,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) //
#define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected"
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
//common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //
......
......@@ -43,6 +43,7 @@ typedef struct SArray {
* @return
*/
SArray* taosArrayInit(size_t size, size_t elemSize);
SArray* taosArrayInit_s(size_t size, size_t elemSize, size_t initialSize);
/**
*
......@@ -139,14 +140,6 @@ void* taosArrayGetLast(const SArray* pArray);
*/
size_t taosArrayGetSize(const SArray* pArray);
/**
* set the size of array
* @param pArray
* @param size size of the array
* @return
*/
void taosArraySetSize(SArray* pArray, size_t size);
/**
* insert data into array
* @param pArray
......
......@@ -282,8 +282,9 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_MGMT 1
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4)
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......@@ -413,7 +414,7 @@ typedef enum ELogicConditionType {
#ifdef WINDOWS
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
#else
#define TSDB_MAX_RPC_THREADS 20
#define TSDB_MAX_RPC_THREADS 10
#endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
......
......@@ -89,7 +89,7 @@ bool taosAssertRelease(bool condition);
// Disable all asserts that may compromise the performance.
#if defined DISABLE_ASSERT
#define ASSERT(condition)
#define ASSERTS(condition, ...)
#define ASSERTS(condition, ...) (0)
#else
#define ASSERTS(condition, ...) taosAssertDebug(condition, __FILE__, __LINE__, __VA_ARGS__)
#ifdef NDEBUG
......
......@@ -116,6 +116,7 @@ typedef struct SHNode {
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
uint32_t hashVal;
char data[];
} SHNode;
#pragma pack(pop)
......
......@@ -45,11 +45,25 @@ typedef struct STraceId {
#define TRACE_GET_MSGID(traceId) (traceId)->msgId
#define TRACE_TO_STR(traceId, buf) \
do { \
int64_t rootId = (traceId) != NULL ? (traceId)->rootId : 0; \
int64_t msgId = (traceId) != NULL ? (traceId)->msgId : 0; \
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", rootId, msgId); \
//#define TRACE_TO_STR(traceId, buf) \
// do { \
// int64_t rootId = (traceId) != NULL ? (traceId)->rootId : 0; \
// int64_t msgId = (traceId) != NULL ? (traceId)->msgId : 0; \
// sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", rootId, msgId); \
// } while (0)
#define TRACE_TO_STR(_traceId, _buf) \
do { \
int64_t rootId = (_traceId) != NULL ? (_traceId)->rootId : 0; \
int64_t msgId = (_traceId) != NULL ? (_traceId)->msgId : 0; \
char* _t = _buf; \
_t[0] = '0'; \
_t[1] = 'x'; \
_t += titoa(rootId, 16, &_t[2]); \
_t[0] = ':'; \
_t[1] = '0'; \
_t[2] = 'x'; \
_t += titoa(msgId, 16, &_t[3]); \
} while (0)
#ifdef __cplusplus
......
......@@ -46,6 +46,9 @@ char *paGetToken(char *src, char **token, int32_t *tokenLen);
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
int32_t tintToHex(uint64_t val, char hex[]);
int32_t titoa(uint64_t val, size_t radix, char str[]);
char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);
......
/*
xxHash - Extremely Fast Hash algorithm
Header File
Copyright (C) 2012-2016, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- xxHash source repository : https://github.com/Cyan4973/xxHash
*/
/* Notice extracted from xxHash homepage :
xxHash is an extremely fast Hash algorithm, running at RAM speed limits.
It also successfully passes all tests from the SMHasher suite.
Comparison (single thread, Windows Seven 32 bits, using SMHasher on a Core 2 Duo @3GHz)
Name Speed Q.Score Author
xxHash 5.4 GB/s 10
CrapWow 3.2 GB/s 2 Andrew
MumurHash 3a 2.7 GB/s 10 Austin Appleby
SpookyHash 2.0 GB/s 10 Bob Jenkins
SBox 1.4 GB/s 9 Bret Mulvey
Lookup3 1.2 GB/s 9 Bob Jenkins
SuperFastHash 1.2 GB/s 1 Paul Hsieh
CityHash64 1.05 GB/s 10 Pike & Alakuijala
FNV 0.55 GB/s 5 Fowler, Noll, Vo
CRC32 0.43 GB/s 9
MD5-32 0.33 GB/s 10 Ronald L. Rivest
SHA1-32 0.28 GB/s 10
Q.Score is a measure of quality of the hash function.
It depends on successfully passing SMHasher test set.
10 is a perfect score.
A 64-bit version, named XXH64, is available since r35.
It offers much better speed, but for 64-bit applications only.
Name Speed on 64 bits Speed on 32 bits
XXH64 13.8 GB/s 1.9 GB/s
XXH32 6.8 GB/s 6.0 GB/s
*/
#ifndef XXHASH_H_5627135585666179
#define XXHASH_H_5627135585666179 1
#if defined (__cplusplus)
extern "C" {
#endif
/* ****************************
* Definitions
******************************/
#include <stddef.h> /* size_t */
typedef enum { XXH_OK=0, XXH_ERROR } XXH_errorcode;
/* ****************************
* API modifier
******************************/
/** XXH_INLINE_ALL (and XXH_PRIVATE_API)
* This is useful to include xxhash functions in `static` mode
* in order to inline them, and remove their symbol from the public list.
* Inlining can offer dramatic performance improvement on small keys.
* Methodology :
* #define XXH_INLINE_ALL
* #include "xxhash.h"
* `xxhash.c` is automatically included.
* It's not useful to compile and link it as a separate module.
*/
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# ifndef XXH_STATIC_LINKING_ONLY
# define XXH_STATIC_LINKING_ONLY
# endif
# if defined(__GNUC__)
# define XXH_PUBLIC_API static __inline __attribute__((unused))
# elif defined (__cplusplus) || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */)
# define XXH_PUBLIC_API static inline
# elif defined(_MSC_VER)
# define XXH_PUBLIC_API static __inline
# else
/* this version may generate warnings for unused static functions */
# define XXH_PUBLIC_API static
# endif
#else
# define XXH_PUBLIC_API /* do nothing */
#endif /* XXH_INLINE_ALL || XXH_PRIVATE_API */
/*! XXH_NAMESPACE, aka Namespace Emulation :
*
* If you want to include _and expose_ xxHash functions from within your own library,
* but also want to avoid symbol collisions with other libraries which may also include xxHash,
*
* you can use XXH_NAMESPACE, to automatically prefix any public symbol from xxhash library
* with the value of XXH_NAMESPACE (therefore, avoid NULL and numeric values).
*
* Note that no change is required within the calling program as long as it includes `xxhash.h` :
* regular symbol name will be automatically translated by this header.
*/
#ifdef XXH_NAMESPACE
# define XXH_CAT(A,B) A##B
# define XXH_NAME2(A,B) XXH_CAT(A,B)
# define XXH_versionNumber XXH_NAME2(XXH_NAMESPACE, XXH_versionNumber)
# define XXH32 XXH_NAME2(XXH_NAMESPACE, XXH32)
# define XXH32_createState XXH_NAME2(XXH_NAMESPACE, XXH32_createState)
# define XXH32_freeState XXH_NAME2(XXH_NAMESPACE, XXH32_freeState)
# define XXH32_reset XXH_NAME2(XXH_NAMESPACE, XXH32_reset)
# define XXH32_update XXH_NAME2(XXH_NAMESPACE, XXH32_update)
# define XXH32_digest XXH_NAME2(XXH_NAMESPACE, XXH32_digest)
# define XXH32_copyState XXH_NAME2(XXH_NAMESPACE, XXH32_copyState)
# define XXH32_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH32_canonicalFromHash)
# define XXH32_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH32_hashFromCanonical)
# define XXH64 XXH_NAME2(XXH_NAMESPACE, XXH64)
# define XXH64_createState XXH_NAME2(XXH_NAMESPACE, XXH64_createState)
# define XXH64_freeState XXH_NAME2(XXH_NAMESPACE, XXH64_freeState)
# define XXH64_reset XXH_NAME2(XXH_NAMESPACE, XXH64_reset)
# define XXH64_update XXH_NAME2(XXH_NAMESPACE, XXH64_update)
# define XXH64_digest XXH_NAME2(XXH_NAMESPACE, XXH64_digest)
# define XXH64_copyState XXH_NAME2(XXH_NAMESPACE, XXH64_copyState)
# define XXH64_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH64_canonicalFromHash)
# define XXH64_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH64_hashFromCanonical)
#endif
/* *************************************
* Version
***************************************/
#define XXH_VERSION_MAJOR 0
#define XXH_VERSION_MINOR 6
#define XXH_VERSION_RELEASE 5
#define XXH_VERSION_NUMBER (XXH_VERSION_MAJOR *100*100 + XXH_VERSION_MINOR *100 + XXH_VERSION_RELEASE)
XXH_PUBLIC_API unsigned XXH_versionNumber (void);
/*-**********************************************************************
* 32-bit hash
************************************************************************/
typedef unsigned int XXH32_hash_t;
/*! XXH32() :
Calculate the 32-bit hash of sequence "length" bytes stored at memory address "input".
The memory between input & input+length must be valid (allocated and read-accessible).
"seed" can be used to alter the result predictably.
Speed on Core 2 Duo @ 3 GHz (single thread, SMHasher benchmark) : 5.4 GB/s */
XXH_PUBLIC_API XXH32_hash_t XXH32 (const void* input, size_t length, unsigned int seed);
/*====== Streaming ======*/
typedef struct XXH32_state_s XXH32_state_t; /* incomplete type */
XXH_PUBLIC_API XXH32_state_t* XXH32_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH32_freeState(XXH32_state_t* statePtr);
XXH_PUBLIC_API void XXH32_copyState(XXH32_state_t* dst_state, const XXH32_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH32_reset (XXH32_state_t* statePtr, unsigned int seed);
XXH_PUBLIC_API XXH_errorcode XXH32_update (XXH32_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH32_hash_t XXH32_digest (const XXH32_state_t* statePtr);
/*
* Streaming functions generate the xxHash of an input provided in multiple segments.
* Note that, for small input, they are slower than single-call functions, due to state management.
* For small inputs, prefer `XXH32()` and `XXH64()`, which are better optimized.
*
* XXH state must first be allocated, using XXH*_createState() .
*
* Start a new hash by initializing state with a seed, using XXH*_reset().
*
* Then, feed the hash state by calling XXH*_update() as many times as necessary.
* The function returns an error code, with 0 meaning OK, and any other value meaning there is an error.
*
* Finally, a hash value can be produced anytime, by using XXH*_digest().
* This function returns the nn-bits hash as an int or long long.
*
* It's still possible to continue inserting input into the hash state after a digest,
* and generate some new hashes later on, by calling again XXH*_digest().
*
* When done, free XXH state space if it was allocated dynamically.
*/
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[4]; } XXH32_canonical_t;
XXH_PUBLIC_API void XXH32_canonicalFromHash(XXH32_canonical_t* dst, XXH32_hash_t hash);
XXH_PUBLIC_API XXH32_hash_t XXH32_hashFromCanonical(const XXH32_canonical_t* src);
/* Default result type for XXH functions are primitive unsigned 32 and 64 bits.
* The canonical representation uses human-readable write convention, aka big-endian (large digits first).
* These functions allow transformation of hash result into and from its canonical format.
* This way, hash values can be written into a file / memory, and remain comparable on different systems and programs.
*/
#ifndef XXH_NO_LONG_LONG
/*-**********************************************************************
* 64-bit hash
************************************************************************/
typedef unsigned long long XXH64_hash_t;
/*! XXH64() :
Calculate the 64-bit hash of sequence of length "len" stored at memory address "input".
"seed" can be used to alter the result predictably.
This function runs faster on 64-bit systems, but slower on 32-bit systems (see benchmark).
*/
XXH_PUBLIC_API XXH64_hash_t XXH64 (const void* input, size_t length, unsigned long long seed);
/*====== Streaming ======*/
typedef struct XXH64_state_s XXH64_state_t; /* incomplete type */
XXH_PUBLIC_API XXH64_state_t* XXH64_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH64_freeState(XXH64_state_t* statePtr);
XXH_PUBLIC_API void XXH64_copyState(XXH64_state_t* dst_state, const XXH64_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH64_reset (XXH64_state_t* statePtr, unsigned long long seed);
XXH_PUBLIC_API XXH_errorcode XXH64_update (XXH64_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH64_hash_t XXH64_digest (const XXH64_state_t* statePtr);
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[8]; } XXH64_canonical_t;
XXH_PUBLIC_API void XXH64_canonicalFromHash(XXH64_canonical_t* dst, XXH64_hash_t hash);
XXH_PUBLIC_API XXH64_hash_t XXH64_hashFromCanonical(const XXH64_canonical_t* src);
#endif /* XXH_NO_LONG_LONG */
#ifdef XXH_STATIC_LINKING_ONLY
/* ================================================================================================
This section contains declarations which are not guaranteed to remain stable.
They may change in future versions, becoming incompatible with a different version of the library.
These declarations should only be used with static linking.
Never use them in association with dynamic linking !
=================================================================================================== */
/* These definitions are only present to allow
* static allocation of XXH state, on stack or in a struct for example.
* Never **ever** use members directly. */
#if !defined (__VMS) \
&& (defined (__cplusplus) \
|| (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */) )
# include <stdint.h>
struct XXH32_state_s {
uint32_t total_len_32;
uint32_t large_len;
uint32_t v1;
uint32_t v2;
uint32_t v3;
uint32_t v4;
uint32_t mem32[4];
uint32_t memsize;
uint32_t reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
struct XXH64_state_s {
uint64_t total_len;
uint64_t v1;
uint64_t v2;
uint64_t v3;
uint64_t v4;
uint64_t mem64[4];
uint32_t memsize;
uint32_t reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# else
struct XXH32_state_s {
unsigned total_len_32;
unsigned large_len;
unsigned v1;
unsigned v2;
unsigned v3;
unsigned v4;
unsigned mem32[4];
unsigned memsize;
unsigned reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
# ifndef XXH_NO_LONG_LONG /* remove 64-bit support */
struct XXH64_state_s {
unsigned long long total_len;
unsigned long long v1;
unsigned long long v2;
unsigned long long v3;
unsigned long long v4;
unsigned long long mem64[4];
unsigned memsize;
unsigned reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# endif
# endif
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# include "xxhash.c" /* include xxhash function bodies as `static`, for inlining */
#endif
#endif /* XXH_STATIC_LINKING_ONLY */
#if defined (__cplusplus)
}
#endif
#endif /* XXHASH_H_5627135585666179 */
......@@ -97,16 +97,14 @@ typedef struct {
typedef struct SQueryExecMetric {
int64_t start; // start timestamp, us
int64_t syntaxStart; // start to parse, us
int64_t syntaxEnd; // end to parse, us
int64_t ctgStart; // start to parse, us
int64_t ctgEnd; // end to parse, us
int64_t semanticEnd;
int64_t planEnd;
int64_t resultReady;
int64_t execEnd;
int64_t send; // start to send to server, us
int64_t rsp; // receive response from server, us
int64_t execStart; // start to parse, us
int64_t parseCostUs;
int64_t ctgCostUs;
int64_t analyseCostUs;
int64_t planCostUs;
int64_t execCostUs;
} SQueryExecMetric;
struct SAppInstInfo {
......
......@@ -82,28 +82,22 @@ static void deregisterRequest(SRequestObj *pRequest) {
"current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
"us, exec:%" PRId64 "us, stmtType:%d",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.execEnd - pRequest->metric.semanticEnd, pRequest->stmtType);
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, exec:%" PRId64 "us",
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
// pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd);
// atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
// tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
// pRequest->metric.ctgEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd,
// pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
if (pRequest->pQuery && pRequest->pQuery->pRoot) {
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
}
}
if (duration >= SLOW_QUERY_INTERVAL) {
......@@ -370,8 +364,6 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList);
qDestroyQuery(pRequest->pQuery);
nodesDestroyAllocator(pRequest->allocatorRefId);
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
......@@ -386,6 +378,9 @@ void doDestroyRequest(void *p) {
taosMemoryFree(pRequest->body.param);
}
qDestroyQuery(pRequest->pQuery);
nodesDestroyAllocator(pRequest->allocatorRefId);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
......
......@@ -323,7 +323,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
return;
}
int32_t code = qExecCommand(&pRequest->pTscObj->id ,pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp);
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
}
......@@ -465,7 +465,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
}
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
if(pResInfo == NULL || pSchema == NULL || numOfCols <= 0){
if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
return;
}
......@@ -479,7 +479,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
if(numOfCols != pResInfo->numOfCols){
if (numOfCols != pResInfo->numOfCols) {
tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
return;
}
......@@ -925,7 +925,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
removeMeta(pTscObj, pRequest->targetTableList);
}
pRequest->metric.execEnd = taosGetTimestampUs();
pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
int32_t code1 = handleQueryExecRsp(pRequest);
if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
pRequest->code = code1;
......@@ -1051,11 +1051,10 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
pRequest->body.subplanNum = pDag->numOfSubplans;
}
pRequest->metric.planEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " create query plan success, elapsed time:%.2f ms, 0x%" PRIx64, pRequest->self,
(pRequest->metric.planEnd - st) / 1000.0, pRequest->requestId);
}
pRequest->metric.execStart = taosGetTimestampUs();
pRequest->metric.planCostUs = pRequest->metric.execStart - st;
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
......@@ -1103,6 +1102,17 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
destorySqlCallbackWrapper(pWrapper);
}
if (pQuery->pRoot && !pRequest->inRetry) {
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
(0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
}
}
switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL:
asyncExecLocalCmd(pRequest, pQuery);
......@@ -1358,7 +1368,7 @@ int32_t doProcessMsgFromServer(void* param) {
SEpSet* pEpSet = arg->pEpset;
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
if(pMsg->info.ahandle == NULL){
if (pMsg->info.ahandle == NULL) {
tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......@@ -1374,24 +1384,12 @@ int32_t doProcessMsgFromServer(void* param) {
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
if (pRequest) {
if(pRequest->self != pSendInfo->requestObjRefId){
tscError("doProcessMsgFromServer pRequest->self:%"PRId64" != pSendInfo->requestObjRefId:%"PRId64, pRequest->self, pSendInfo->requestObjRefId);
if (pRequest->self != pSendInfo->requestObjRefId) {
tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
pRequest->self, pSendInfo->requestObjRefId);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
pRequest->metric.rsp = taosGetTimestampUs();
pTscObj = pRequest->pTscObj;
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
} else {
tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
}
}
}
......@@ -1523,7 +1521,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
}
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
if(pRequest == NULL){
if (pRequest == NULL) {
return NULL;
}
......@@ -1579,7 +1577,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
}
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
if(pRequest == NULL){
if (pRequest == NULL) {
return NULL;
}
......@@ -1645,8 +1643,11 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
char* pStart = pCol->offset[j] + pCol->pData;
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
if(len > bytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])){
tscError("doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + colLength[i]):%p", len, bytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
if (len > bytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
tscError(
"doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
"colLength[i]):%p",
len, bytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......@@ -1675,7 +1676,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length |
int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
if(ASSERT(numOfCols == cols)){
if (ASSERT(numOfCols == cols)) {
tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
return -1;
}
......@@ -1748,7 +1749,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
char* p = (char*)pResultInfo->pData;
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
if(dataLen <= 0){
if (dataLen <= 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......@@ -1758,7 +1759,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
int32_t totalLen = 0;
int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
if(ASSERT(numOfCols == cols)){
if (ASSERT(numOfCols == cols)) {
tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......@@ -1783,7 +1784,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t colLen = htonl(colLength[i]);
int32_t colLen1 = htonl(colLength1[i]);
if(ASSERT(colLen < dataLen)){
if (ASSERT(colLen < dataLen)) {
tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......@@ -1870,7 +1871,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4) {
if(ASSERT(numOfCols > 0 && pFields != NULL && pResultInfo != NULL)){
if (ASSERT(numOfCols > 0 && pFields != NULL && pResultInfo != NULL)) {
tscError("setResultDataPtr paras error");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......@@ -1902,8 +1903,9 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
int32_t cols = *(int32_t*)p;
p += sizeof(int32_t);
if(ASSERT(rows == numOfRows && cols == numOfCols)){
tscError("setResultDataPtr paras error:rows;%d numOfRows:%d cols:%d numOfCols:%d", rows, numOfRows, cols, numOfCols);
if (ASSERT(rows == numOfRows && cols == numOfCols)) {
tscError("setResultDataPtr paras error:rows;%d numOfRows:%d cols:%d numOfCols:%d", rows, numOfRows, cols,
numOfCols);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......@@ -1970,7 +1972,7 @@ char* getDbOfConnection(STscObj* pObj) {
}
void setConnectionDB(STscObj* pTscObj, const char* db) {
if(db == NULL || pTscObj == NULL){
if (db == NULL || pTscObj == NULL) {
tscError("setConnectionDB para is NULL");
return;
}
......@@ -1992,7 +1994,7 @@ void resetConnectDB(STscObj* pTscObj) {
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse) {
if(pResultInfo == NULL || pRsp == NULL){
if (pResultInfo == NULL || pRsp == NULL) {
tscError("setQueryResultFromRsp paras is null");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
......
......@@ -752,7 +752,8 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs();
int64_t analyseStart = taosGetTimestampUs();
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
if (code == TSDB_CODE_SUCCESS) {
......@@ -763,7 +764,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
}
}
pRequest->metric.semanticEnd = taosGetTimestampUs();
pRequest->metric.analyseCostUs = taosGetTimestampUs() - analyseStart;
if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) {
......@@ -775,10 +776,6 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
TSWAP(pRequest->tableList, (pQuery)->pTableList);
TSWAP(pRequest->targetTableList, (pQuery)->pTargetTableList);
double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd) / 1000.0;
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%" PRIx64,
pRequest->self, el, pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper);
} else {
destorySqlCallbackWrapper(pWrapper);
......@@ -843,7 +840,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs();
pRequest->metric.ctgCostUs += taosGetTimestampUs() - pRequest->metric.ctgStart;
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId,
tstrerror(code));
......@@ -954,7 +951,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
}
if (TSDB_CODE_SUCCESS == code) {
pRequest->metric.syntaxStart = taosGetTimestampUs();
int64_t syntaxStart = taosGetTimestampUs();
pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq));
if (pWrapper->pCatalogReq == NULL) {
......@@ -965,19 +962,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq);
}
pRequest->metric.syntaxEnd = taosGetTimestampUs();
}
if (TSDB_CODE_SUCCESS == code && !updateMetaForce) {
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
if (QUERY_NODE_INSERT_STMT == nodeType(pRequest->pQuery->pRoot)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == nodeType(pRequest->pQuery->pRoot)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
}
pRequest->metric.parseCostUs += taosGetTimestampUs() - syntaxStart;
}
if (TSDB_CODE_SUCCESS == code) {
pRequest->stmtType = pRequest->pQuery->pRoot->type;
phaseAsyncQuery(pWrapper);
} else {
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
......@@ -1004,7 +993,6 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pRequest->metric.resultReady = taosGetTimestampUs();
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId);
......
此差异已折叠。
此差异已折叠。
......@@ -20,16 +20,17 @@
#include "clientSml.h"
int32_t is_same_child_table_telnet(const void *a, const void *b){
int32_t is_same_child_table_telnet(const void *a, const void *b) {
SSmlLineInfo *t1 = (SSmlLineInfo *)a;
SSmlLineInfo *t2 = (SSmlLineInfo *)b;
// uError("is_same_child_table_telnet len:%d,%d %s,%s @@@ len:%d,%d %s,%s", t1->measureLen, t2->measureLen,
// t1->measure, t2->measure, t1->tagsLen, t2->tagsLen, t1->tags, t2->tags);
if(t1 == NULL || t2 == NULL || t1->measure == NULL || t2->measure == NULL
|| t1->tags == NULL || t2->tags == NULL)
// uError("is_same_child_table_telnet len:%d,%d %s,%s @@@ len:%d,%d %s,%s", t1->measureLen, t2->measureLen,
// t1->measure, t2->measure, t1->tagsLen, t2->tagsLen, t1->tags, t2->tags);
if (t1 == NULL || t2 == NULL || t1->measure == NULL || t2->measure == NULL || t1->tags == NULL || t2->tags == NULL)
return 1;
return (((t1->measureLen == t2->measureLen) && memcmp(t1->measure, t2->measure, t1->measureLen) == 0)
&& ((t1->tagsLen == t2->tagsLen) && memcmp(t1->tags, t2->tags, t1->tagsLen) == 0)) ? 0 : 1;
return (((t1->measureLen == t2->measureLen) && memcmp(t1->measure, t2->measure, t1->measureLen) == 0) &&
((t1->tagsLen == t2->tagsLen) && memcmp(t1->tags, t2->tags, t1->tagsLen) == 0))
? 0
: 1;
}
int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
......@@ -40,7 +41,7 @@ int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
return -1;
}
if (unlikely(len == 1 && data[0] == '0')) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
return taosGetTimestampNs() / smlFactorNS[toPrecision];
}
int8_t fromPrecision = smlGetTsTypeByLen(len);
if (unlikely(fromPrecision == -1)) {
......@@ -56,7 +57,6 @@ int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
return ts;
}
static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t *len) {
while (*sql < sqlEnd) {
if (unlikely((**sql != SPACE && !(*data)))) {
......@@ -70,7 +70,7 @@ static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t
}
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
if(is_same_child_table_telnet(elements, &info->preLine) == 0){
if (is_same_child_table_telnet(elements, &info->preLine) == 0) {
elements->measureTag = info->preLine.measureTag;
return TSDB_CODE_SUCCESS;
}
......@@ -82,15 +82,15 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
SArray *maxKVs = info->maxTagKVs;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(!isSameMeasure){
if (info->dataFormat) {
if (!isSameMeasure) {
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
if(unlikely(tmp == NULL)){
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if(pTableMeta == NULL){
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if (pTableMeta == NULL) {
info->dataFormat = false;
info->reRun = true;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
......@@ -101,23 +101,23 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
if (unlikely(taosArrayGetSize(superKV) == 0)) {
isSuperKVInit = false;
}
taosArraySetSize(maxKVs, 0);
taosArrayClear(maxKVs);
}
}else{
taosArraySetSize(maxKVs, 0);
} else {
taosArrayClear(maxKVs);
}
taosArraySetSize(preLineKV, 0);
taosArrayClear(preLineKV);
const char *sql = data;
while (sql < sqlEnd) {
JUMP_SPACE(sql, sqlEnd)
if (unlikely(*sql == '\0')) break;
const char *key = sql;
size_t keyLen = 0;
size_t keyLen = 0;
// parse key
while (sql < sqlEnd) {
......@@ -137,14 +137,14 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
// if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
// smlBuildInvalidDataMsg(msg, "dumplicate key", key);
// return TSDB_CODE_TSC_DUP_NAMES;
// }
// if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
// smlBuildInvalidDataMsg(msg, "dumplicate key", key);
// return TSDB_CODE_TSC_DUP_NAMES;
// }
// parse value
const char *value = sql;
size_t valueLen = 0;
size_t valueLen = 0;
while (sql < sqlEnd) {
// parse value
if (unlikely(*sql == SPACE)) {
......@@ -169,24 +169,25 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
SSmlKv kv = {.key = key, .keyLen = keyLen, .type = TSDB_DATA_TYPE_NCHAR, .value = value, .length = valueLen};
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
if (info->dataFormat) {
if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
info->dataFormat = false;
info->reRun = true;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(maxKVs))) {
if (isSameMeasure) {
if (unlikely(cnt >= taosArrayGetSize(maxKVs))) {
info->dataFormat = false;
info->reRun = true;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt);
if(unlikely(kv.length > maxKV->length)){
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if(unlikely(NULL == tableMeta)){
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
......@@ -195,49 +196,50 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if(unlikely(kv.length > maxKV->length)) {
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
}else{
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true;
if(unlikely(!IS_SAME_KEY)){
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(maxKVs, &kv);
}
}else{
} else {
taosArrayPush(maxKVs, &kv);
}
taosArrayPush(preLineKV, &kv);
cnt++;
}
elements->measureTag = (char*)taosMemoryMalloc(elements->measureLen + elements->tagsLen);
elements->measureTag = (char *)taosMemoryMalloc(elements->measureLen + elements->tagsLen);
memcpy(elements->measureTag, elements->measure, elements->measureLen);
memcpy(elements->measureTag + elements->measureLen, elements->tags, elements->tagsLen);
elements->measureTagsLen = elements->measureLen + elements->tagsLen;
SSmlTableInfo **tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
SSmlTableInfo **tmp =
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
SSmlTableInfo *tinfo = NULL;
if (unlikely(tmp == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
......@@ -258,10 +260,11 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
}
}
// SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
// *key = *elements;
// tinfo->key = key;
taosHashPut(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen, &tinfo, POINTER_BYTES);
// SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
// *key = *elements;
// tinfo->key = key;
taosHashPut(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen, &tinfo,
POINTER_BYTES);
tmp = &tinfo;
}
if (info->dataFormat) info->currTableDataCtx = (*tmp)->tableDataCtx;
......@@ -288,7 +291,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
}
bool needConverTime = false; // get TS before parse tag(get meta), so need conver time
if(info->dataFormat && info->currSTableMeta == NULL){
if (info->dataFormat && info->currSTableMeta == NULL) {
needConverTime = true;
}
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
......@@ -296,7 +299,11 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
SSmlKv kvTs = {.key = TS,
.keyLen = TS_LEN,
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
// parse value
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
......@@ -324,19 +331,19 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return ret;
}
if(unlikely(info->reRun)){
if (unlikely(info->reRun)) {
return TSDB_CODE_SUCCESS;
}
if(info->dataFormat){
if(needConverTime) {
if (info->dataFormat) {
if (needConverTime) {
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if(ret == TSDB_CODE_SUCCESS){
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArray(info->currTableDataCtx->pValues);
......@@ -344,8 +351,8 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
if(elements->colArray == NULL){
} else {
if (elements->colArray == NULL) {
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
......
此差异已折叠。
......@@ -162,6 +162,11 @@ void *queryThread(void *arg) {
}
static int32_t numOfThreads = 1;
void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
printf("success, code:%d\n", code);
}
} // namespace
int main(int argc, char** argv) {
......@@ -176,12 +181,12 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
TEST(clientCase, driverInit_Test) {
// taosInitGlobalCfg();
// taos_init();
}
TEST(testCase, connect_Test) {
TEST(clientCase, connect_Test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -190,8 +195,8 @@ TEST(testCase, connect_Test) {
}
taos_close(pConn);
}
#if 0
TEST(testCase, create_user_Test) {
TEST(clientCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -204,7 +209,7 @@ TEST(testCase, create_user_Test) {
taos_close(pConn);
}
TEST(testCase, create_account_Test) {
TEST(clientCase, create_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -217,7 +222,7 @@ TEST(testCase, create_account_Test) {
taos_close(pConn);
}
TEST(testCase, drop_account_Test) {
TEST(clientCase, drop_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -230,7 +235,7 @@ TEST(testCase, drop_account_Test) {
taos_close(pConn);
}
TEST(testCase, show_user_Test) {
TEST(clientCase, show_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -250,7 +255,7 @@ TEST(testCase, show_user_Test) {
taos_close(pConn);
}
TEST(testCase, drop_user_Test) {
TEST(clientCase, drop_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -263,7 +268,7 @@ TEST(testCase, drop_user_Test) {
taos_close(pConn);
}
TEST(testCase, show_db_Test) {
TEST(clientCase, show_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -282,7 +287,7 @@ TEST(testCase, show_db_Test) {
taos_close(pConn);
}
TEST(testCase, create_db_Test) {
TEST(clientCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -306,7 +311,7 @@ TEST(testCase, create_db_Test) {
taos_close(pConn);
}
TEST(testCase, create_dnode_Test) {
TEST(clientCase, create_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -325,7 +330,7 @@ TEST(testCase, create_dnode_Test) {
taos_close(pConn);
}
TEST(testCase, drop_dnode_Test) {
TEST(clientCase, drop_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -349,7 +354,7 @@ TEST(testCase, drop_dnode_Test) {
taos_close(pConn);
}
TEST(testCase, use_db_test) {
TEST(clientCase, use_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -367,7 +372,7 @@ TEST(testCase, use_db_test) {
taos_close(pConn);
}
// TEST(testCase, drop_db_test) {
// TEST(clientCase, drop_db_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
......@@ -389,7 +394,7 @@ TEST(testCase, use_db_test) {
// taos_close(pConn);
//}
TEST(testCase, create_stable_Test) {
TEST(clientCase, create_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -428,7 +433,7 @@ TEST(testCase, create_stable_Test) {
taos_close(pConn);
}
TEST(testCase, create_table_Test) {
TEST(clientCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -447,7 +452,7 @@ TEST(testCase, create_table_Test) {
taos_close(pConn);
}
TEST(testCase, create_ctable_Test) {
TEST(clientCase, create_ctable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -472,7 +477,7 @@ TEST(testCase, create_ctable_Test) {
taos_close(pConn);
}
TEST(testCase, show_stable_Test) {
TEST(clientCase, show_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != nullptr);
......@@ -497,7 +502,7 @@ TEST(testCase, show_stable_Test) {
taos_close(pConn);
}
TEST(testCase, show_vgroup_Test) {
TEST(clientCase, show_vgroup_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -529,7 +534,7 @@ TEST(testCase, show_vgroup_Test) {
taos_close(pConn);
}
TEST(testCase, create_multiple_tables) {
TEST(clientCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -600,7 +605,7 @@ TEST(testCase, create_multiple_tables) {
taos_close(pConn);
}
TEST(testCase, show_table_Test) {
TEST(clientCase, show_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -634,7 +639,7 @@ TEST(testCase, show_table_Test) {
taos_close(pConn);
}
//TEST(testCase, drop_stable_Test) {
//TEST(clientCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != nullptr);
//
......@@ -659,14 +664,14 @@ TEST(testCase, show_table_Test) {
// taos_close(pConn);
//}
TEST(testCase, generated_request_id_test) {
TEST(clientCase, generated_request_id_test) {
SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for (int32_t i = 0; i < 50000; ++i) {
uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v));
if (result != nullptr) {
printf("0x%lx, index:%d\n", v, i);
// printf("0x%llx, index:%d\n", v, i);
}
assert(result == nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0);
......@@ -675,7 +680,7 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup(phash);
}
TEST(testCase, insert_test) {
TEST(clientCase, insert_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -692,9 +697,8 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TEST(clientCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -752,8 +756,7 @@ TEST(testCase, projection_query_tables) {
taos_close(pConn);
}
#if 0
TEST(testCase, tsbs_perf_test) {
TEST(clientCase, tsbs_perf_test) {
TdThread qid[20] = {0};
for(int32_t i = 0; i < numOfThreads; ++i) {
......@@ -762,7 +765,7 @@ TEST(testCase, tsbs_perf_test) {
getchar();
}
TEST(testCase, projection_query_stables) {
TEST(clientCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -790,7 +793,7 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn);
}
TEST(testCase, agg_query_tables) {
TEST(clientCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -825,7 +828,7 @@ create table tm1 using m1 tags(2);
insert into tm0 values('2021-1-1 1:1:1.120', 1) ('2021-1-1 1:1:2.9', 2) tm1 values('2021-1-1 1:1:1.120', 11) ('2021-1-1 1:1:2.99', 22);
*/
TEST(testCase, async_api_test) {
TEST(clientCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -859,7 +862,7 @@ TEST(testCase, async_api_test) {
taos_close(pConn);
}
TEST(testCase, update_test) {
TEST(clientCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -895,6 +898,76 @@ TEST(testCase, update_test) {
}
}
#endif
TEST(clientCase, subscription_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create topic, code:%s", taos_errstr(pRes));
// taos_free_result(pRes);
// return;
// }
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "newabcdefgjhijlm__");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(pRes);
const char* dbName = tmq_get_db_name(pRes);
int32_t vgroupId = tmq_get_vgroup_id(pRes);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) break;
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("precision: %d, row content: %s\n", precision, buf);
}
}
// return rows;
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
#pragma GCC diagnostic pop
......@@ -1546,7 +1546,10 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
}
void colDataDestroy(SColumnInfoData* pColData) {
if (!pColData) return;
if (!pColData) {
return;
}
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
taosMemoryFreeClear(pColData->varmeta.offset);
} else {
......@@ -2525,8 +2528,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += sizeof(uint64_t);
if (pBlock->pDataBlock == NULL) {
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArraySetSize(pBlock->pDataBlock, numOfCols);
pBlock->pDataBlock = taosArrayInit_s(numOfCols, sizeof(SColumnInfoData), numOfCols);
}
for (int32_t i = 0; i < numOfCols; ++i) {
......
......@@ -41,6 +41,7 @@ bool tsPrintAuth = false;
// queue & threads
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 2000;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfMnodeQueryThreads = 4;
......@@ -54,7 +55,6 @@ int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1;
// sync raft
int32_t tsElectInterval = 25 * 1000;
int32_t tsHeartbeatInterval = 1000;
......@@ -140,6 +140,7 @@ int32_t tsMaxMemUsedByInsert = 1024;
float tsSelectivityRatio = 1.0;
int32_t tsTagFilterResCacheSize = 1024 * 10;
char tsTagFilterCache = 0;
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
......@@ -188,6 +189,7 @@ int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
char tsUdfdLdLibPath[512] = "";
bool tsDisableStream = false;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
......@@ -349,6 +351,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "AVX2", tsAVX2Enable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "FMA", tsFMAEnable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "SIMD-builtins", tsSIMDBuiltins, 0) != 0) return -1;
if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1;
#if !defined(_ALPINE)
......@@ -388,9 +391,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
......@@ -467,6 +473,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
}
......@@ -496,11 +504,19 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(tsCfg, "numOfRpcThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfRpcThreads = numOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
pItem->i32 = tsNumOfRpcThreads;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfRpcSessions");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfRpcSessions = 2000;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
pItem->i32 = tsNumOfRpcSessions;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfCommitThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfCommitThreads = numOfCores / 2;
......@@ -718,6 +734,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
......@@ -731,6 +748,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "SIMD-builtins")->bval;
tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
......@@ -767,6 +785,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
}
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
GRANT_CFG_GET;
return 0;
}
......@@ -973,6 +994,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
} else if (strcasecmp("numOfRpcThreads", name) == 0) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
} else if (strcasecmp("numOfRpcSessions", name) == 0) {
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
} else if (strcasecmp("numOfCommitThreads", name) == 0) {
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
} else if (strcasecmp("numOfMnodeReadThreads", name) == 0) {
......
......@@ -5495,6 +5495,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI16(&encoder, pCol->colId) < 0) return -1;
if (tEncodeI8(&encoder, pCol->type) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
tEndEncode(&encoder);
......@@ -5579,6 +5580,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
tEndDecode(&decoder);
......
......@@ -280,10 +280,19 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 1000; // interval threshold(ms)
rpcInit.failFastInterval = 5000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client");
......
......@@ -146,7 +146,9 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
// 3.0.20
if (sver >= 2) {
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
}
}
tEndDecode(pDecoder);
return 0;
......@@ -415,19 +417,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
return (void *)buf;
}
SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL;
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock);
pSubNew->vgNum = 0;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set hash free fp
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
SMqSubscribeObj *tNewSubscribeObj(const char* key) {
SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubObj == NULL) {
return NULL;
}
pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *));
memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubObj->lock);
pSubObj->vgNum = 0;
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
return pSubNew;
// TODO set hash free fp
/*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
return pSubObj;
}
SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
......
......@@ -882,6 +882,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
strcpy(dcfgReq.config, "resetlog");
} else if (strncasecmp(cfgReq.config, "monitor", 7) == 0) {
if (' ' != cfgReq.config[7] && 0 != cfgReq.config[7]) {
mError("dnode:%d, failed to config monitor since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
const char *value = cfgReq.value;
int32_t flag = atoi(value);
if (flag <= 0) {
......@@ -902,12 +908,18 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
int32_t optLen = strlen(optName);
if (strncasecmp(cfgReq.config, optName, optLen) != 0) continue;
if (' ' != cfgReq.config[optLen] && 0 != cfgReq.config[optLen]) {
mError("dnode:%d, failed to config since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
const char *value = cfgReq.value;
int32_t flag = atoi(value);
if (flag <= 0) {
flag = atoi(cfgReq.config + optLen + 1);
}
if (flag <= 0 || flag > 255) {
if (flag < 0 || flag > 255) {
mError("dnode:%d, failed to config %s since value:%d", cfgReq.dnodeId, optName, flag);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
......
......@@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
pObj->igCheckUpdate = pCreate->igUpdate;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
......@@ -342,9 +343,9 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
}
int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
if(numOfNULL > 0) {
if (numOfNULL > 0) {
pObj->outputSchema.nCols += numOfNULL;
SSchema* pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
if (!pFullSchema) {
goto FAIL;
}
......@@ -352,10 +353,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
int32_t nullIndex = 0;
int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation* pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
if (i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
......@@ -380,6 +381,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
};
......@@ -505,9 +507,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
createReq.pColumns = taosArrayInit_s(createReq.numOfColumns, sizeof(SField), createReq.numOfColumns);
// build fields
taosArraySetSize(createReq.pColumns, createReq.numOfColumns);
for (int32_t i = 0; i < createReq.numOfColumns; i++) {
SField *pField = taosArrayGet(createReq.pColumns, i);
tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
......@@ -518,8 +519,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
if (pStream->tagSchema.nCols == 0) {
createReq.numOfTags = 1;
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
taosArraySetSize(createReq.pTags, createReq.numOfTags);
createReq.pTags = taosArrayInit_s(createReq.numOfTags, sizeof(SField), 1);
// build tags
SField *pField = taosArrayGet(createReq.pTags, 0);
strcpy(pField->name, "group_id");
......@@ -528,8 +528,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
pField->bytes = 8;
} else {
createReq.numOfTags = pStream->tagSchema.nCols;
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
taosArraySetSize(createReq.pTags, createReq.numOfTags);
createReq.pTags = taosArrayInit_s(createReq.numOfTags, sizeof(SField), createReq.numOfTags);
for (int32_t i = 0; i < createReq.numOfTags; i++) {
SField *pField = taosArrayGet(createReq.pTags, i);
pField->bytes = pStream->tagSchema.pSchema[i].bytes;
......@@ -726,7 +725,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
// create stb for stream
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE &&
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
......
......@@ -966,7 +966,9 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
if (pShow->pIter == NULL) break;
if (pShow->pIter == NULL) {
break;
}
taosRLockLatch(&pSub->lock);
......@@ -1075,6 +1077,9 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
}
#endif
pBlock->info.rows = numOfRows;
taosRUnLockLatch(&pSub->lock);
sdbRelease(pSdb, pSub);
}
......
......@@ -107,8 +107,8 @@ void metaReaderClear(SMetaReader *pReader);
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList);
int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
......
......@@ -244,6 +244,7 @@ void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF,
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]);
void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]);
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]);
// SDelFile
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ==============================================================================================
......@@ -687,6 +688,7 @@ typedef struct SSttBlockLoadInfo {
int16_t *colIds;
int32_t numOfCols;
bool sttBlockLoaded;
int32_t numOfStt;
// keep the last access position, this position may be used to reduce the binary times for
// starting last block data for a new table
......@@ -752,7 +754,7 @@ bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
......
......@@ -122,7 +122,7 @@ typedef struct STbUidStore STbUidStore;
#define META_BEGIN_HEAP_NIL 2
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaClose(SMeta* pMeta);
int metaClose(SMeta** pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys);
TXN* metaGetTxn(SMeta* pMeta);
int metaCommit(SMeta* pMeta, TXN* txn);
......
......@@ -32,9 +32,9 @@ typedef struct SMetaStbStatsEntry {
} SMetaStbStatsEntry;
typedef struct STagFilterResEntry {
uint64_t suid; // uid for super table
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
uint32_t qTimes; // queried times for current super table
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
uint32_t hitTimes; // queried times for current super table
uint32_t accTime;
} STagFilterResEntry;
struct SMetaCache {
......@@ -55,6 +55,7 @@ struct SMetaCache {
// query cache
struct STagFilterResCache {
TdThreadMutex lock;
uint32_t accTimes;
SHashObj* pTableEntry;
SLRUCache* pUidResCache;
} sTagFilterResCache;
......@@ -132,6 +133,7 @@ int32_t metaCacheOpen(SMeta* pMeta) {
goto _err2;
}
pCache->sTagFilterResCache.accTimes = 0;
pCache->sTagFilterResCache.pTableEntry =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
......@@ -159,9 +161,9 @@ void metaCacheClose(SMeta* pMeta) {
entryCacheClose(pMeta);
statsCacheClose(pMeta);
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL;
......@@ -427,6 +429,32 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
return code;
}
static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInvalidRes, int32_t keyLen,
SLRUCache* pCache, uint64_t suid) {
SListIter iter = {0};
tdListInitIter((SList*)&(pEntry->list), &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
uint64_t buf[3];
buf[0] = suid;
int32_t len = sizeof(uint64_t) * tListLen(buf);
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&buf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pInvalidRes, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
}
return 0;
}
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) {
// generate the composed key for LRU cache
......@@ -434,16 +462,18 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint64_t buf[3] = {0};
uint32_t times = 0;
uint64_t buf[4];
*acquireRes = 0;
buf[0] = suid;
memcpy(&buf[1], pKey, keyLen);
buf[0] = (uint64_t)pTableMap;
buf[1] = suid;
memcpy(&buf[2], pKey, keyLen);
taosThreadMutexLock(pLock);
pMeta->pCache->sTagFilterResCache.accTimes += 1;
int32_t len = keyLen + sizeof(uint64_t);
int32_t len = keyLen + sizeof(uint64_t) * 2;
LRUHandle* pHandle = taosLRUCacheLookup(pCache, buf, len);
if (pHandle == NULL) {
taosThreadMutexUnlock(pLock);
......@@ -465,56 +495,69 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
// set the result into the buffer
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1);
(*pEntry)->hitTimes += 1;
uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("cache hit:%d, total acc:%d, rate:%.2f", (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
}
taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
static void freePayload(const void* key, size_t keyLen, void* value) {
if (value == NULL) {
return;
}
const uint64_t* p = key;
if (keyLen != sizeof(int64_t) * 4) {
metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
return;
}
// check if scanning all items are necessary or not
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
taosThreadMutexLock(pLock);
SHashObj* pHashObj = (SHashObj*)p[0];
STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
{
int64_t st = taosGetTimestampUs();
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&buf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pInvalidRes, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
uint64_t* digest = (uint64_t*)pNode->data;
if (digest[0] == p[2] && digest[1] == p[3]) {
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
taosMemoryFree(tmp);
int64_t et = taosGetTimestampUs();
metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
(et - st) / 1000.0);
break;
}
}
// remove the keys, of which query uid lists have been replaced already.
size_t s = taosArrayGetSize(pInvalidRes);
for (int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pInvalidRes, i);
tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
}
atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times
taosArrayDestroy(pInvalidRes);
taosThreadMutexUnlock(pLock);
}
return TSDB_CODE_SUCCESS;
taosMemoryFree(value);
}
static void freePayload(const void* key, size_t keyLen, void* value) {
if (value == NULL) {
return;
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosMemoryFree(value);
p->hitTimes = 0;
tdListInit(&p->list, keyLen);
taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
tdListAppend(&p->list, pKey);
return 0;
}
// check both the payload size and selectivity ratio
......@@ -540,45 +583,61 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
uint64_t buf[4] = {0};
buf[0] = (uint64_t)pTableEntry;
buf[1] = suid;
memcpy(&buf[2], pKey, keyLen);
ASSERT(keyLen == 16);
int32_t code = 0;
taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) {
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
p->qTimes = 0;
tdListInit(&p->list, keyLen);
taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
tdListAppend(&p->list, pKey);
code = addNewEntry(pTableEntry, pKey, keyLen, suid);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
} else {
tdListAppend(&(*pEntry)->list, pKey);
}
uint64_t buf[3] = {0};
buf[0] = suid;
memcpy(&buf[1], pKey, keyLen);
if (sizeof(uint64_t) + keyLen != 24) {
metaError("meta/cache: incorrect keyLen:%" PRId32 " length.", keyLen);
return TSDB_CODE_FAILED;
// check if it exists or not
size_t size = listNEles(&(*pEntry)->list);
if (size == 0) {
tdListAppend(&(*pEntry)->list, pKey);
} else {
SListNode* pNode = listHead(&(*pEntry)->list);
uint64_t* p = (uint64_t*)pNode->data;
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
// we have already found the existed items, no need to added to cache anymore.
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
} else { // not equal, append it
tdListAppend(&(*pEntry)->list, pKey);
}
}
}
// add to cache.
int32_t ret = taosLRUCacheInsert(pCache, buf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
taosLRUCacheInsert(pCache, buf, sizeof(uint64_t) * 2 + keyLen, pPayload, payloadLen, freePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
_end:
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d, ret:%d", TD_VID(pMeta->pVnode),
suid, (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry), ret);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
return TSDB_CODE_SUCCESS;
return code;
}
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
int32_t keyLen = sizeof(uint64_t) * 3;
uint64_t p[3] = {0};
p[0] = suid;
uint64_t p[4] = {0};
p[0] = (uint64_t)pMeta->pCache->sTagFilterResCache.pTableEntry;
p[1] = suid;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
......@@ -594,11 +653,11 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&p[1], pNode->data, 16);
memcpy(&p[2], pNode->data, 16);
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, keyLen);
}
(*pEntry)->qTimes = 0;
(*pEntry)->hitTimes = 0;
tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
......
......@@ -201,7 +201,8 @@ _err:
return -1;
}
int metaClose(SMeta *pMeta) {
int metaClose(SMeta **ppMeta) {
SMeta *pMeta = *ppMeta;
if (pMeta) {
if (pMeta->pEnv) metaAbort(pMeta);
if (pMeta->pCache) metaCacheClose(pMeta);
......@@ -221,7 +222,8 @@ int metaClose(SMeta *pMeta) {
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);
taosMemoryFreeClear(*ppMeta);
}
return 0;
......
......@@ -1357,13 +1357,14 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi
return ret;
}
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags) {
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList) {
const int32_t LIMIT = 128;
int32_t isLock = false;
int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
for (int i = 0; i < sz; i++) {
tb_uid_t *id = taosArrayGet(uidList, i);
STUidTagInfo *p = taosArrayGet(uidList, i);
if (i % LIMIT == 0) {
if (isLock) metaULock(pMeta);
......@@ -1372,51 +1373,72 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHas
isLock = true;
}
if (taosHashGet(tags, id, sizeof(tb_uid_t)) == NULL) {
// if (taosHashGet(tags, &p->uid, sizeof(tb_uid_t)) == NULL) {
void *val = NULL;
int32_t len = 0;
if (metaGetTableTagByUid(pMeta, suid, *id, &val, &len, false) == 0) {
taosHashPut(tags, id, sizeof(tb_uid_t), val, len);
if (metaGetTableTagByUid(pMeta, suid, p->uid, &val, &len, false) == 0) {
p->pTagVal = taosMemoryMalloc(len);
memcpy(p->pTagVal, val, len);
tdbFree(val);
} else {
metaError("vgId:%d, failed to table IDs, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
*id);
metaError("vgId:%d, failed to table tags, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
p->uid);
}
}
}
// }
if (isLock) metaULock(pMeta);
return 0;
}
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *pUidTagInfo) {
SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid, 1);
SHashObj *uHash = NULL;
size_t len = taosArrayGetSize(uidList); // len > 0 means there already have uids
if (len > 0) {
uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
for (int i = 0; i < len; i++) {
int64_t *uid = taosArrayGet(uidList, i);
taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
// If len > 0 means there already have uids, and we only want the
// tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept
// in the hash map, that may require a lot of memory
SHashObj *pSepecifiedUidMap = NULL;
size_t numOfElems = taosArrayGetSize(pUidTagInfo);
if (numOfElems > 0) {
pSepecifiedUidMap = taosHashInit(numOfElems / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
for (int i = 0; i < numOfElems; i++) {
STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, i);
taosHashPut(pSepecifiedUidMap, &pTagInfo->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
}
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
if (len > 0 && taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
continue;
} else if (len == 0) {
taosArrayPush(uidList, &id);
if (numOfElems == 0) { // all data needs to be added into the pUidTagInfo list
while (1) {
tb_uid_t uid = metaCtbCursorNext(pCur);
if (uid == 0) {
break;
}
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
taosArrayPush(pUidTagInfo, &info);
}
} else { // only the specified tables need to be added
while (1) {
tb_uid_t uid = metaCtbCursorNext(pCur);
if (uid == 0) {
break;
}
taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
int32_t *index = taosHashGet(pSepecifiedUidMap, &uid, sizeof(uint64_t));
if (index == NULL) {
continue;
}
STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, *index);
if (pTagInfo->pTagVal == NULL) {
pTagInfo->pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(pTagInfo->pTagVal, pCur->pVal, pCur->vLen);
}
}
}
taosHashCleanup(uHash);
taosHashCleanup(pSepecifiedUidMap);
metaCloseCtbCursor(pCur, 1);
return TSDB_CODE_SUCCESS;
}
......
......@@ -325,7 +325,7 @@ _exit:
}
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
int32_t code = 0;
int32_t code = 0;
#if 0
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
......@@ -559,7 +559,7 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
pFS->aQTaskInf = taosArrayInit_s(size, sizeof(SQTaskFile), size);
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -574,7 +574,6 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
}
}
taosArraySetSize(pFS->aQTaskInf, size);
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
_exit:
......@@ -640,9 +639,8 @@ int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) {
code = tdRSmaFSCreate(pFS, size);
TSDB_CHECK_CODE(code, lino, _exit);
taosArraySetSize(pFS->aQTaskInf, size);
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
taosArrayClear(pFS->aQTaskInf->pData);
taosArrayAddBatch(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size);
_exit:
if (code) {
......
此差异已折叠。
......@@ -209,6 +209,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
tEncoderInit(&encoder, buf, vlen);
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
......@@ -216,18 +218,26 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册