Unverified commit cb35b453, authored by H Haojun Liao, committed by GitHub

Merge pull request #19870 from taosdata/feature/3_liaohj

other: merge opt to main.
......@@ -367,6 +367,12 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes
} SSortExecInfo;
typedef struct STUidTagInfo {
char* name;
uint64_t uid;
void* pTagVal;
} STUidTagInfo;
// stream special block column
#define START_TS_COLUMN_INDEX 0
......
......@@ -132,14 +132,16 @@ typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t isPseudoFunc;  // denotes whether the current function is a pseudo function [added for perf reasons]
uint8_t isNotNullFunc; // the function never returns a null value
uint8_t scanFlag; // record current running step, default: 0
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput;
int32_t numOfParams;
int32_t offset;
SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries;
......@@ -152,7 +154,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
SSerializeDataHandle saveHandle;
int32_t exprIdx;
char udfName[TSDB_FUNC_NAME_LEN];
char *udfName;
} SqlFunctionCtx;
typedef struct tExprNode {
......
......@@ -41,6 +41,7 @@ extern char tsSSE42Enable;
extern char tsAVXEnable;
extern char tsAVX2Enable;
extern char tsFMAEnable;
extern char tsTagFilterCache;
extern char configDir[];
extern char tsDataDir[];
......
......@@ -53,6 +53,7 @@ typedef struct SArray {
* @return
*/
SArray* taosArrayInit(size_t size, size_t elemSize);
SArray* taosArrayInit_s(size_t size, size_t elemSize, size_t initialSize);
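/* Illustrative sketch only: the body of taosArrayInit_s is not part of this diff. Judging from the
 * call sites in this commit, which replace taosArrayInit() + taosArraySetSize() pairs, it
 * presumably allocates the array and pre-populates `initialSize` zeroed elements:
 *
 *   SArray* taosArrayInit_s(size_t size, size_t elemSize, size_t initialSize) {
 *     SArray* pArray = taosArrayInit(size < initialSize ? initialSize : size, elemSize);
 *     if (pArray == NULL) return NULL;
 *     memset(pArray->pData, 0, elemSize * initialSize);  // elements start zero-initialized
 *     pArray->size = initialSize;                        // logical size is set up front
 *     return pArray;
 *   }
 */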
/**
*
......@@ -149,14 +150,6 @@ void* taosArrayGetLast(const SArray* pArray);
*/
size_t taosArrayGetSize(const SArray* pArray);
/**
* set the size of array
* @param pArray
* @param size size of the array
* @return
*/
void taosArraySetSize(SArray* pArray, size_t size);
/**
* insert data into array
* @param pArray
......
......@@ -89,7 +89,7 @@ bool taosAssertRelease(bool condition);
// Disable all asserts that may compromise the performance.
#if defined DISABLE_ASSERT
#define ASSERT(condition)
#define ASSERTS(condition, ...)
#define ASSERTS(condition, ...) (0)
#else
#define ASSERTS(condition, ...) taosAssertDebug(condition, __FILE__, __LINE__, __VA_ARGS__)
#ifdef NDEBUG
......
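The ASSERTS hunk above now expands to (0) when DISABLE_ASSERT is defined. A plausible reason, inferred rather than stated in this diff, is that ASSERTS (unlike ASSERT) is used as an expression, so an empty expansion would not compile at such call sites. A hypothetical call pattern, assuming taosAssertDebug reports the assertion result through its return value:

    // hypothetical caller, not taken from the TDengine sources
    int32_t checkHandle(void* pHandle) {
      if (ASSERTS(pHandle != NULL, "invalid handle")) {
        // with DISABLE_ASSERT the condition expands to (0), so this branch still compiles
        // but is never taken; with asserts enabled it depends on taosAssertDebug's return value
        return -1;
      }
      return 0;
    }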
......@@ -116,6 +116,7 @@ typedef struct SHNode {
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
uint32_t hashVal;
char data[];
} SHNode;
#pragma pack(pop)
......
......@@ -45,11 +45,25 @@ typedef struct STraceId {
#define TRACE_GET_MSGID(traceId) (traceId)->msgId
#define TRACE_TO_STR(traceId, buf) \
//#define TRACE_TO_STR(traceId, buf) \
// do { \
// int64_t rootId = (traceId) != NULL ? (traceId)->rootId : 0; \
// int64_t msgId = (traceId) != NULL ? (traceId)->msgId : 0; \
// sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", rootId, msgId); \
// } while (0)
#define TRACE_TO_STR(_traceId, _buf) \
do { \
int64_t rootId = (traceId) != NULL ? (traceId)->rootId : 0; \
int64_t msgId = (traceId) != NULL ? (traceId)->msgId : 0; \
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", rootId, msgId); \
int64_t rootId = (_traceId) != NULL ? (_traceId)->rootId : 0; \
int64_t msgId = (_traceId) != NULL ? (_traceId)->msgId : 0; \
char* _t = _buf; \
_t[0] = '0'; \
_t[1] = 'x'; \
_t += titoa(rootId, 16, &_t[2]); \
_t[0] = ':'; \
_t[1] = '0'; \
_t[2] = 'x'; \
_t += titoa(msgId, 16, &_t[3]); \
} while (0)
#ifdef __cplusplus
......
......@@ -46,6 +46,9 @@ char *paGetToken(char *src, char **token, int32_t *tokenLen);
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
int32_t tintToHex(uint64_t val, char hex[]);
int32_t titoa(uint64_t val, size_t radix, char str[]);
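/* Illustrative sketch only: titoa() is declared here but implemented elsewhere. Based on the call
 * sites added in this commit (TRACE_TO_STR and getFileNamePrefix advance their write pointer by
 * the return value), it presumably formats `val` in the given radix into `str` and returns the
 * number of characters written, roughly like:
 *
 *   int32_t titoa(uint64_t val, size_t radix, char str[]) {
 *     static const char digits[] = "0123456789abcdef";
 *     char    tmp[65];
 *     int32_t n = 0;
 *     do {
 *       tmp[n++] = digits[val % radix];
 *       val /= radix;
 *     } while (val > 0);
 *     for (int32_t i = 0; i < n; ++i) {   // digits were produced in reverse order
 *       str[i] = tmp[n - 1 - i];
 *     }
 *     return n;
 *   }
 */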
char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);
......
/*
xxHash - Extremely Fast Hash algorithm
Header File
Copyright (C) 2012-2016, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- xxHash source repository : https://github.com/Cyan4973/xxHash
*/
/* Notice extracted from xxHash homepage :
xxHash is an extremely fast Hash algorithm, running at RAM speed limits.
It also successfully passes all tests from the SMHasher suite.
Comparison (single thread, Windows Seven 32 bits, using SMHasher on a Core 2 Duo @3GHz)
Name            Speed       Q.Score   Author
xxHash          5.4 GB/s     10
CrapWow         3.2 GB/s      2       Andrew
MumurHash 3a    2.7 GB/s     10       Austin Appleby
SpookyHash      2.0 GB/s     10       Bob Jenkins
SBox            1.4 GB/s      9       Bret Mulvey
Lookup3         1.2 GB/s      9       Bob Jenkins
SuperFastHash   1.2 GB/s      1       Paul Hsieh
CityHash64      1.05 GB/s    10       Pike & Alakuijala
FNV             0.55 GB/s     5       Fowler, Noll, Vo
CRC32           0.43 GB/s     9
MD5-32          0.33 GB/s    10       Ronald L. Rivest
SHA1-32         0.28 GB/s    10
Q.Score is a measure of quality of the hash function.
It depends on successfully passing SMHasher test set.
10 is a perfect score.
A 64-bit version, named XXH64, is available since r35.
It offers much better speed, but for 64-bit applications only.
Name     Speed on 64 bits    Speed on 32 bits
XXH64       13.8 GB/s            1.9 GB/s
XXH32        6.8 GB/s            6.0 GB/s
*/
#ifndef XXHASH_H_5627135585666179
#define XXHASH_H_5627135585666179 1
#if defined (__cplusplus)
extern "C" {
#endif
/* ****************************
* Definitions
******************************/
#include <stddef.h> /* size_t */
typedef enum { XXH_OK=0, XXH_ERROR } XXH_errorcode;
/* ****************************
* API modifier
******************************/
/** XXH_INLINE_ALL (and XXH_PRIVATE_API)
* This is useful to include xxhash functions in `static` mode
* in order to inline them, and remove their symbol from the public list.
* Inlining can offer dramatic performance improvement on small keys.
* Methodology :
* #define XXH_INLINE_ALL
* #include "xxhash.h"
* `xxhash.c` is automatically included.
* It's not useful to compile and link it as a separate module.
*/
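/* A minimal usage sketch of the inlining mode described above (illustrative only):
 *
 *   #define XXH_INLINE_ALL
 *   #include "xxhash.h"            // xxhash.c is pulled in automatically; do not link it separately
 *
 *   static unsigned hashKey(const void* key, size_t len) {
 *     return XXH32(key, len, 0);   // all XXH symbols are now static to this translation unit
 *   }
 */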
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# ifndef XXH_STATIC_LINKING_ONLY
# define XXH_STATIC_LINKING_ONLY
# endif
# if defined(__GNUC__)
# define XXH_PUBLIC_API static __inline __attribute__((unused))
# elif defined (__cplusplus) || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */)
# define XXH_PUBLIC_API static inline
# elif defined(_MSC_VER)
# define XXH_PUBLIC_API static __inline
# else
/* this version may generate warnings for unused static functions */
# define XXH_PUBLIC_API static
# endif
#else
# define XXH_PUBLIC_API /* do nothing */
#endif /* XXH_INLINE_ALL || XXH_PRIVATE_API */
/*! XXH_NAMESPACE, aka Namespace Emulation :
*
* If you want to include _and expose_ xxHash functions from within your own library,
* but also want to avoid symbol collisions with other libraries which may also include xxHash,
*
* you can use XXH_NAMESPACE, to automatically prefix any public symbol from xxhash library
* with the value of XXH_NAMESPACE (therefore, avoid NULL and numeric values).
*
* Note that no change is required within the calling program as long as it includes `xxhash.h` :
* regular symbol name will be automatically translated by this header.
*/
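/* Illustrative example of the namespace emulation described above; the TD_ prefix is only a
 * sample value. Public symbols are renamed (XXH32 becomes TD_XXH32, etc.), while calling code
 * that includes xxhash.h stays unchanged:
 *
 *   #define XXH_NAMESPACE TD_
 *   #include "xxhash.h"
 *
 *   static unsigned hashBuf(const void* buf, size_t len) {
 *     return XXH32(buf, len, 0);   // transparently expands to TD_XXH32(buf, len, 0)
 *   }
 */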
#ifdef XXH_NAMESPACE
# define XXH_CAT(A,B) A##B
# define XXH_NAME2(A,B) XXH_CAT(A,B)
# define XXH_versionNumber XXH_NAME2(XXH_NAMESPACE, XXH_versionNumber)
# define XXH32 XXH_NAME2(XXH_NAMESPACE, XXH32)
# define XXH32_createState XXH_NAME2(XXH_NAMESPACE, XXH32_createState)
# define XXH32_freeState XXH_NAME2(XXH_NAMESPACE, XXH32_freeState)
# define XXH32_reset XXH_NAME2(XXH_NAMESPACE, XXH32_reset)
# define XXH32_update XXH_NAME2(XXH_NAMESPACE, XXH32_update)
# define XXH32_digest XXH_NAME2(XXH_NAMESPACE, XXH32_digest)
# define XXH32_copyState XXH_NAME2(XXH_NAMESPACE, XXH32_copyState)
# define XXH32_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH32_canonicalFromHash)
# define XXH32_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH32_hashFromCanonical)
# define XXH64 XXH_NAME2(XXH_NAMESPACE, XXH64)
# define XXH64_createState XXH_NAME2(XXH_NAMESPACE, XXH64_createState)
# define XXH64_freeState XXH_NAME2(XXH_NAMESPACE, XXH64_freeState)
# define XXH64_reset XXH_NAME2(XXH_NAMESPACE, XXH64_reset)
# define XXH64_update XXH_NAME2(XXH_NAMESPACE, XXH64_update)
# define XXH64_digest XXH_NAME2(XXH_NAMESPACE, XXH64_digest)
# define XXH64_copyState XXH_NAME2(XXH_NAMESPACE, XXH64_copyState)
# define XXH64_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH64_canonicalFromHash)
# define XXH64_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH64_hashFromCanonical)
#endif
/* *************************************
* Version
***************************************/
#define XXH_VERSION_MAJOR 0
#define XXH_VERSION_MINOR 6
#define XXH_VERSION_RELEASE 5
#define XXH_VERSION_NUMBER (XXH_VERSION_MAJOR *100*100 + XXH_VERSION_MINOR *100 + XXH_VERSION_RELEASE)
XXH_PUBLIC_API unsigned XXH_versionNumber (void);
/*-**********************************************************************
* 32-bit hash
************************************************************************/
typedef unsigned int XXH32_hash_t;
/*! XXH32() :
Calculate the 32-bit hash of sequence "length" bytes stored at memory address "input".
The memory between input & input+length must be valid (allocated and read-accessible).
"seed" can be used to alter the result predictably.
Speed on Core 2 Duo @ 3 GHz (single thread, SMHasher benchmark) : 5.4 GB/s */
XXH_PUBLIC_API XXH32_hash_t XXH32 (const void* input, size_t length, unsigned int seed);
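/* One-shot usage sketch (illustrative only): hashing a buffer held entirely in memory.
 *
 *   const char data[] = "hello";
 *   XXH32_hash_t h = XXH32(data, sizeof(data) - 1, 0);   // seed 0; same input + seed => same hash
 */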
/*====== Streaming ======*/
typedef struct XXH32_state_s XXH32_state_t; /* incomplete type */
XXH_PUBLIC_API XXH32_state_t* XXH32_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH32_freeState(XXH32_state_t* statePtr);
XXH_PUBLIC_API void XXH32_copyState(XXH32_state_t* dst_state, const XXH32_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH32_reset (XXH32_state_t* statePtr, unsigned int seed);
XXH_PUBLIC_API XXH_errorcode XXH32_update (XXH32_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH32_hash_t XXH32_digest (const XXH32_state_t* statePtr);
/*
* Streaming functions generate the xxHash of an input provided in multiple segments.
* Note that, for small input, they are slower than single-call functions, due to state management.
* For small inputs, prefer `XXH32()` and `XXH64()`, which are better optimized.
*
* XXH state must first be allocated, using XXH*_createState() .
*
* Start a new hash by initializing state with a seed, using XXH*_reset().
*
* Then, feed the hash state by calling XXH*_update() as many times as necessary.
* The function returns an error code, with 0 meaning OK, and any other value meaning there is an error.
*
* Finally, a hash value can be produced anytime, by using XXH*_digest().
* This function returns the nn-bits hash as an int or long long.
*
* It's still possible to continue inserting input into the hash state after a digest,
* and generate some new hashes later on, by calling again XXH*_digest().
*
* When done, free XXH state space if it was allocated dynamically.
*/
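/* Streaming usage sketch following the steps above (illustrative only; chunk1/chunk2 stand for
 * arbitrary input segments):
 *
 *   XXH32_state_t* state = XXH32_createState();
 *   XXH32_reset(state, 0);                          // start a new hash with seed 0
 *   XXH32_update(state, chunk1, chunk1Len);         // feed input in as many segments as needed
 *   XXH32_update(state, chunk2, chunk2Len);
 *   XXH32_hash_t h = XXH32_digest(state);           // produce the hash; more updates may follow
 *   XXH32_freeState(state);                         // release the dynamically allocated state
 */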
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[4]; } XXH32_canonical_t;
XXH_PUBLIC_API void XXH32_canonicalFromHash(XXH32_canonical_t* dst, XXH32_hash_t hash);
XXH_PUBLIC_API XXH32_hash_t XXH32_hashFromCanonical(const XXH32_canonical_t* src);
/* Default result type for XXH functions are primitive unsigned 32 and 64 bits.
* The canonical representation uses human-readable write convention, aka big-endian (large digits first).
* These functions allow transformation of hash result into and from its canonical format.
* This way, hash values can be written into a file / memory, and remain comparable on different systems and programs.
*/
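/* Canonical-form usage sketch (illustrative only; buf/len stand for arbitrary input): convert a
 * hash to its big-endian canonical byte layout before writing it out, so it stays comparable
 * across platforms.
 *
 *   XXH32_hash_t h = XXH32(buf, len, 0);
 *   XXH32_canonical_t canon;
 *   XXH32_canonicalFromHash(&canon, h);             // canon.digest[] is portable, big-endian
 *   // ... store canon.digest somewhere; later:
 *   XXH32_hash_t restored = XXH32_hashFromCanonical(&canon);
 */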
#ifndef XXH_NO_LONG_LONG
/*-**********************************************************************
* 64-bit hash
************************************************************************/
typedef unsigned long long XXH64_hash_t;
/*! XXH64() :
Calculate the 64-bit hash of sequence of length "len" stored at memory address "input".
"seed" can be used to alter the result predictably.
This function runs faster on 64-bit systems, but slower on 32-bit systems (see benchmark).
*/
XXH_PUBLIC_API XXH64_hash_t XXH64 (const void* input, size_t length, unsigned long long seed);
/*====== Streaming ======*/
typedef struct XXH64_state_s XXH64_state_t; /* incomplete type */
XXH_PUBLIC_API XXH64_state_t* XXH64_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH64_freeState(XXH64_state_t* statePtr);
XXH_PUBLIC_API void XXH64_copyState(XXH64_state_t* dst_state, const XXH64_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH64_reset (XXH64_state_t* statePtr, unsigned long long seed);
XXH_PUBLIC_API XXH_errorcode XXH64_update (XXH64_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH64_hash_t XXH64_digest (const XXH64_state_t* statePtr);
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[8]; } XXH64_canonical_t;
XXH_PUBLIC_API void XXH64_canonicalFromHash(XXH64_canonical_t* dst, XXH64_hash_t hash);
XXH_PUBLIC_API XXH64_hash_t XXH64_hashFromCanonical(const XXH64_canonical_t* src);
#endif /* XXH_NO_LONG_LONG */
#ifdef XXH_STATIC_LINKING_ONLY
/* ================================================================================================
This section contains declarations which are not guaranteed to remain stable.
They may change in future versions, becoming incompatible with a different version of the library.
These declarations should only be used with static linking.
Never use them in association with dynamic linking !
=================================================================================================== */
/* These definitions are only present to allow
* static allocation of XXH state, on stack or in a struct for example.
* Never **ever** use members directly. */
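/* Static-allocation sketch enabled by the definitions below (illustrative only; data/len stand
 * for arbitrary input): the state lives on the stack and is driven only through the public API,
 * without touching its members.
 *
 *   #define XXH_STATIC_LINKING_ONLY
 *   #include "xxhash.h"
 *
 *   XXH32_state_t state;                      // statically allocated, size known from this section
 *   XXH32_reset(&state, 0);
 *   XXH32_update(&state, data, len);
 *   XXH32_hash_t h = XXH32_digest(&state);    // no createState/freeState needed
 */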
#if !defined (__VMS) \
&& (defined (__cplusplus) \
|| (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */) )
# include <stdint.h>
struct XXH32_state_s {
uint32_t total_len_32;
uint32_t large_len;
uint32_t v1;
uint32_t v2;
uint32_t v3;
uint32_t v4;
uint32_t mem32[4];
uint32_t memsize;
uint32_t reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
struct XXH64_state_s {
uint64_t total_len;
uint64_t v1;
uint64_t v2;
uint64_t v3;
uint64_t v4;
uint64_t mem64[4];
uint32_t memsize;
uint32_t reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# else
struct XXH32_state_s {
unsigned total_len_32;
unsigned large_len;
unsigned v1;
unsigned v2;
unsigned v3;
unsigned v4;
unsigned mem32[4];
unsigned memsize;
unsigned reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
# ifndef XXH_NO_LONG_LONG /* remove 64-bit support */
struct XXH64_state_s {
unsigned long long total_len;
unsigned long long v1;
unsigned long long v2;
unsigned long long v3;
unsigned long long v4;
unsigned long long mem64[4];
unsigned memsize;
unsigned reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# endif
# endif
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# include "xxhash.c" /* include xxhash function bodies as `static`, for inlining */
#endif
#endif /* XXH_STATIC_LINKING_ONLY */
#if defined (__cplusplus)
}
#endif
#endif /* XXHASH_H_5627135585666179 */
......@@ -1546,7 +1546,10 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
}
void colDataDestroy(SColumnInfoData* pColData) {
if (!pColData) return;
if (!pColData) {
return;
}
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
taosMemoryFreeClear(pColData->varmeta.offset);
} else {
......@@ -2353,8 +2356,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += sizeof(uint64_t);
if (pBlock->pDataBlock == NULL) {
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArraySetSize(pBlock->pDataBlock, numOfCols);
pBlock->pDataBlock = taosArrayInit_s(numOfCols, sizeof(SColumnInfoData), numOfCols);
}
for (int32_t i = 0; i < numOfCols; ++i) {
......
......@@ -140,6 +140,7 @@ int32_t tsMaxMemUsedByInsert = 1024;
float tsSelectivityRatio = 1.0;
int32_t tsTagFilterResCacheSize = 1024 * 10;
char tsTagFilterCache = 0;
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
......@@ -351,6 +352,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "AVX2", tsAVX2Enable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "FMA", tsFMAEnable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "SIMD-builtins", tsSIMDBuiltins, 0) != 0) return -1;
if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt64(pCfg, "streamMax", tsStreamMax, 0, INT64_MAX, 1) != 0) return -1;
......@@ -731,6 +733,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "SIMD-builtins")->bval;
tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
......
......@@ -53,12 +53,20 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}
#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}
#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);}
#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);}
#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}
#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}
//#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}
//#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}
//#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);}
//#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);}
//#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}
//#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}
// TODO: disable it temporarily
#define dGFatal(param, ...)
#define dGError(param, ...)
#define dGWarn(param, ...)
#define dGInfo(param, ...)
#define dGDebug(param, ...)
#define dGTrace(param, ...)
// clang-format on
......
......@@ -479,9 +479,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.numOfTags = 1; // group id
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
createReq.pColumns = taosArrayInit_s(createReq.numOfColumns, sizeof(SField), createReq.numOfColumns);
// build fields
taosArraySetSize(createReq.pColumns, createReq.numOfColumns);
for (int32_t i = 0; i < createReq.numOfColumns; i++) {
SField *pField = taosArrayGet(createReq.pColumns, i);
tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
......@@ -489,8 +488,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
pField->type = pStream->outputSchema.pSchema[i].type;
pField->bytes = pStream->outputSchema.pSchema[i].bytes;
}
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
taosArraySetSize(createReq.pTags, 1);
createReq.pTags = taosArrayInit_s(createReq.numOfTags, sizeof(SField), 1);
// build tags
SField *pField = taosArrayGet(createReq.pTags, 0);
strcpy(pField->name, "group_id");
......
......@@ -104,8 +104,8 @@ void metaReaderClear(SMetaReader *pReader);
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList);
int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
......
......@@ -236,6 +236,7 @@ void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF,
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]);
void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]);
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]);
// SDelFile
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ==============================================================================================
......@@ -646,6 +647,7 @@ typedef struct SSttBlockLoadInfo {
int16_t *colIds;
int32_t numOfCols;
bool sttBlockLoaded;
int32_t numOfStt;
// keep the last access position; it may be used to reduce the number of binary searches when
// locating the starting last-block data for a new table
......@@ -711,7 +713,7 @@ bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
......
......@@ -32,9 +32,9 @@ typedef struct SMetaStbStatsEntry {
} SMetaStbStatsEntry;
typedef struct STagFilterResEntry {
uint64_t suid; // uid for super table
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
uint32_t qTimes; // queried times for current super table
uint32_t hitTimes; // cache hit times for the current super table
uint32_t accTime;
} STagFilterResEntry;
struct SMetaCache {
......@@ -55,6 +55,7 @@ struct SMetaCache {
// query cache
struct STagFilterResCache {
TdThreadMutex lock;
uint32_t accTimes;
SHashObj* pTableEntry;
SLRUCache* pUidResCache;
} sTagFilterResCache;
......@@ -132,6 +133,7 @@ int32_t metaCacheOpen(SMeta* pMeta) {
goto _err2;
}
pCache->sTagFilterResCache.accTimes = 0;
pCache->sTagFilterResCache.pTableEntry =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
......@@ -159,9 +161,9 @@ void metaCacheClose(SMeta* pMeta) {
entryCacheClose(pMeta);
statsCacheClose(pMeta);
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL;
......@@ -424,6 +426,31 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
return code;
}
static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInvalidRes, int32_t keyLen, SLRUCache* pCache, uint64_t suid) {
SListIter iter = {0};
tdListInitIter((SList*)&(pEntry->list), &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
uint64_t buf[3];
buf[0] = suid;
int32_t len = sizeof(uint64_t) * tListLen(buf);
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&buf[1], pNode->data, keyLen);
// check whether it still exists in the LRU cache, and remove it from the linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
if (pRes == NULL) { // record the item so it can be removed from the linked list
taosArrayPush(pInvalidRes, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
}
return 0;
}
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) {
// generate the composed key for LRU cache
......@@ -431,16 +458,18 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint64_t buf[3] = {0};
uint32_t times = 0;
uint64_t buf[4];
*acquireRes = 0;
buf[0] = suid;
memcpy(&buf[1], pKey, keyLen);
buf[0] = (uint64_t) pTableMap;
buf[1] = suid;
memcpy(&buf[2], pKey, keyLen);
taosThreadMutexLock(pLock);
pMeta->pCache->sTagFilterResCache.accTimes += 1;
int32_t len = keyLen + sizeof(uint64_t);
int32_t len = keyLen + sizeof(uint64_t) * 2;
LRUHandle* pHandle = taosLRUCacheLookup(pCache, buf, len);
if (pHandle == NULL) {
taosThreadMutexUnlock(pLock);
......@@ -458,56 +487,68 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
// set the result into the buffer
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1);
(*pEntry)->hitTimes += 1;
uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("cache hit:%d, total acc:%d, rate:%.2f", (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
}
taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
// check if scanning all items are necessary or not
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
taosThreadMutexLock(pLock);
static void freePayload(const void* key, size_t keyLen, void* value) {
if (value == NULL) {
return;
}
const uint64_t* p = key;
if (keyLen != sizeof(int64_t) * 4) {
metaError("key length is invalid, length:%d, expect:%d", (int32_t) keyLen, (int32_t) sizeof(uint64_t)*2);
return;
}
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
SHashObj* pHashObj = (SHashObj*)p[0];
STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
{
int64_t st = taosGetTimestampUs();
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&buf[1], pNode->data, keyLen);
uint64_t* digest = (uint64_t*)pNode->data;
if (digest[0] == p[2] && digest[1] == p[3]) {
tdListPopNode(&((*pEntry)->list), pNode);
// check whether it still exists in the LRU cache, and remove it from the linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pInvalidRes, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
int64_t et = taosGetTimestampUs();
metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
(et - st)/1000.0);
return;
}
}
// remove the keys, of which query uid lists have been replaced already.
size_t s = taosArrayGetSize(pInvalidRes);
for (int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pInvalidRes, i);
tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
}
atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times
taosArrayDestroy(pInvalidRes);
taosThreadMutexUnlock(pLock);
}
return TSDB_CODE_SUCCESS;
taosMemoryFree(value);
}
static void freePayload(const void* key, size_t keyLen, void* value) {
if (value == NULL) {
return;
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosMemoryFree(value);
p->hitTimes = 0;
tdListInit(&p->list, keyLen);
taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
tdListAppend(&p->list, pKey);
return 0;
}
// check both the payload size and selectivity ratio
......@@ -533,42 +574,61 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
uint64_t buf[4] = {0};
buf[0] = (uint64_t) pTableEntry;
buf[1] = suid;
memcpy(&buf[2], pKey, keyLen);
ASSERT(keyLen == 16);
int32_t code = 0;
taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) {
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
p->qTimes = 0;
tdListInit(&p->list, keyLen);
taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
tdListAppend(&p->list, pKey);
code = addNewEntry(pTableEntry, pKey, keyLen, suid);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
} else {
// check if it exists or not
size_t size = listNEles(&(*pEntry)->list);
if (size == 0) {
tdListAppend(&(*pEntry)->list, pKey);
} else {
SListNode* pNode = listHead(&(*pEntry)->list);
uint64_t* p = (uint64_t*)pNode->data;
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
// the item already exists, so there is no need to add it to the cache again.
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
} else { // not equal, append it
tdListAppend(&(*pEntry)->list, pKey);
}
uint64_t buf[3] = {0};
buf[0] = suid;
memcpy(&buf[1], pKey, keyLen);
ASSERT(sizeof(uint64_t) + keyLen == 24);
}
}
// add to cache.
taosLRUCacheInsert(pCache, buf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
taosLRUCacheInsert(pCache, buf, sizeof(uint64_t)*2 + keyLen, pPayload, payloadLen, freePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
_end:
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
return TSDB_CODE_SUCCESS;
return code;
}
// remove the LRU cache entries that are invalidated by tag-value updates, or by creating or dropping child tables
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
int32_t keyLen = sizeof(uint64_t) * 3;
uint64_t p[3] = {0};
p[0] = suid;
uint64_t p[4] = {0};
p[0] = (uint64_t) pMeta->pCache->sTagFilterResCache.pTableEntry;
p[1] = suid;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
......@@ -584,11 +644,11 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&p[1], pNode->data, 16);
memcpy(&p[2], pNode->data, 16);
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, keyLen);
}
(*pEntry)->qTimes = 0;
(*pEntry)->hitTimes = 0;
tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
......
......@@ -1371,13 +1371,14 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi
return ret;
}
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags) {
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList) {
const int32_t LIMIT = 128;
int32_t isLock = false;
int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
for (int i = 0; i < sz; i++) {
tb_uid_t *id = taosArrayGet(uidList, i);
STUidTagInfo *p = taosArrayGet(uidList, i);
if (i % LIMIT == 0) {
if (isLock) metaULock(pMeta);
......@@ -1386,51 +1387,72 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHas
isLock = true;
}
if (taosHashGet(tags, id, sizeof(tb_uid_t)) == NULL) {
// if (taosHashGet(tags, &p->uid, sizeof(tb_uid_t)) == NULL) {
void *val = NULL;
int32_t len = 0;
if (metaGetTableTagByUid(pMeta, suid, *id, &val, &len, false) == 0) {
taosHashPut(tags, id, sizeof(tb_uid_t), val, len);
if (metaGetTableTagByUid(pMeta, suid, p->uid, &val, &len, false) == 0) {
p->pTagVal = taosMemoryMalloc(len);
memcpy(p->pTagVal, val, len);
tdbFree(val);
} else {
metaError("vgId:%d, failed to table IDs, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
*id);
}
metaError("vgId:%d, failed to table tags, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
p->uid);
}
}
// }
if (isLock) metaULock(pMeta);
return 0;
}
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *pUidTagInfo) {
SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid, 1);
SHashObj *uHash = NULL;
size_t len = taosArrayGetSize(uidList); // len > 0 means there already have uids
if (len > 0) {
uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
for (int i = 0; i < len; i++) {
int64_t *uid = taosArrayGet(uidList, i);
taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
// If numOfElems > 0, uids are already provided and we only want the tags of those specified
// tables. Otherwise, the tags of all child tables are retrieved and kept in the result list,
// which may require a lot of memory.
SHashObj *pSepecifiedUidMap = NULL;
size_t numOfElems = taosArrayGetSize(pUidTagInfo);
if (numOfElems > 0) {
pSepecifiedUidMap = taosHashInit(numOfElems / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
for (int i = 0; i < numOfElems; i++) {
STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, i);
taosHashPut(pSepecifiedUidMap, &pTagInfo->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
}
if (numOfElems == 0) { // all data needs to be added into the pUidTagInfo list
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
tb_uid_t uid = metaCtbCursorNext(pCur);
if (uid == 0) {
break;
}
if (len > 0 && taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
taosArrayPush(pUidTagInfo, &info);
}
} else { // only the specified tables need to be added
while (1) {
tb_uid_t uid = metaCtbCursorNext(pCur);
if (uid == 0) {
break;
}
int32_t *index = taosHashGet(pSepecifiedUidMap, &uid, sizeof(uint64_t));
if (index == NULL) {
continue;
} else if (len == 0) {
taosArrayPush(uidList, &id);
}
taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, *index);
if (pTagInfo->pTagVal == NULL) {
pTagInfo->pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(pTagInfo->pTagVal, pCur->pVal, pCur->vLen);
}
}
}
taosHashCleanup(uHash);
taosHashCleanup(pSepecifiedUidMap);
metaCloseCtbCursor(pCur, 1);
return TSDB_CODE_SUCCESS;
}
......
......@@ -181,7 +181,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
}
}
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0);
int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
if (p->pLoadInfo == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -92,24 +92,56 @@ static int32_t tGetSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
}
// EXPOSED APIS ==================================================
static char* getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t commitId, char fname[]) {
const char* p1 = tfsGetDiskPath(pTsdb->pVnode->pTfs, did);
int32_t len = strlen(p1);
char* p = memcpy(fname, p1, len);
p += len;
*(p++) = TD_DIRSEP[0];
len = strlen(pTsdb->path);
memcpy(p, pTsdb->path, len);
p += len;
*(p++) = TD_DIRSEP[0];
*(p++) = 'v';
p += titoa(TD_VID(pTsdb->pVnode), 10, p);
*(p++) = 'f';
p += titoa(fid, 10, p);
memcpy(p, "ver", 3);
p += 3;
p += titoa(commitId, 10, p);
return p;
}
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pHeadF->commitID, ".head");
char* p = getFileNamePrefix(pTsdb, did, fid, pHeadF->commitID, fname);
memcpy(p, ".head", 5);
p[5] = 0;
}
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data");
char* p = getFileNamePrefix(pTsdb, did, fid, pDataF->commitID, fname);
memcpy(p, ".data", 5);
p[5] = 0;
}
void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSttF->commitID, ".stt");
char* p = getFileNamePrefix(pTsdb, did, fid, pSttF->commitID, fname);
memcpy(p, ".stt", 4);
p[4] = 0;
}
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSmaF->commitID, ".sma");
char* p = getFileNamePrefix(pTsdb, did, fid, pSmaF->commitID, fname);
memcpy(p, ".sma", 4);
p[4] = 0;
}
bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFile1->commitID == pDelFile2->commitID; }
......
......@@ -31,14 +31,16 @@ struct SLDataIter {
SSttBlockLoadInfo *pBlockLoadInfo;
};
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(TSDB_MAX_STT_TRIGGER, sizeof(SSttBlockLoadInfo));
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfSttTrigger) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < TSDB_MAX_STT_TRIGGER; ++i) {
pLoadInfo->numOfStt = numOfSttTrigger;
for (int32_t i = 0; i < numOfSttTrigger; ++i) {
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
pLoadInfo[i].currentLoadBlockIndex = 1;
......@@ -63,7 +65,7 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList,
}
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
for (int32_t i = 0; i < TSDB_MAX_STT_TRIGGER; ++i) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
......@@ -77,14 +79,14 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
}
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
for (int32_t i = 0; i < TSDB_MAX_STT_TRIGGER; ++i) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
*el += pLoadInfo[i].elapsedTime;
*blocks += pLoadInfo[i].loadBlocks;
}
}
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
for (int32_t i = 0; i < TSDB_MAX_STT_TRIGGER; ++i) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
......
......@@ -47,6 +47,9 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
taosMemoryFree(pFD);
goto _exit;
}
// do not check the file size when reading data files.
if (flag != TD_FILE_READ) {
if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD->pBuf);
......@@ -54,8 +57,11 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
taosMemoryFree(pFD);
goto _exit;
}
ASSERT(pFD->szFile % szPage == 0);
pFD->szFile = pFD->szFile / szPage;
}
*ppFD = pFD;
_exit:
......@@ -103,7 +109,7 @@ _exit:
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
int32_t code = 0;
ASSERT(pgno <= pFD->szFile);
// ASSERT(pgno <= pFD->szFile);
// seek
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
......@@ -175,7 +181,7 @@ static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
int64_t bOffset = fOffset % pFD->szPage;
ASSERT(pgno && pgno <= pFD->szFile);
// ASSERT(pgno && pgno <= pFD->szFile);
ASSERT(bOffset < szPgCont);
while (n < size) {
......
......@@ -1054,9 +1054,7 @@ static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pS
i2++;
}
taosArraySetSize(pSkyline, TARRAY_ELEM_IDX(pSkyline, pItem));
_exit:
pSkyline->size = TARRAY_ELEM_IDX(pSkyline, pItem);
return code;
}
......
......@@ -300,7 +300,7 @@ typedef struct SCtgSubRes {
ctgSubTaskCbFp fp;
} SCtgSubRes;
typedef struct SCtgTask {
struct SCtgTask {
CTG_TASK_TYPE type;
int32_t taskId;
SCtgJob* pJob;
......@@ -313,7 +313,7 @@ typedef struct SCtgTask {
SRWLatch lock;
SArray* pParents;
SCtgSubRes subRes;
} SCtgTask;
};
typedef struct SCtgTaskReq {
SCtgTask* pTask;
......
......@@ -1707,9 +1707,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
pTask->msgCtxs = taosArrayInit_s(pCtx->fetchNum, sizeof(SCtgMsgCtx), pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
......@@ -1844,7 +1842,10 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0);
baseResIdx += taosArrayGetSize(pReq->pTables);
taosArraySetSize(pCtx->pResList, baseResIdx);
int32_t inc = baseResIdx - taosArrayGetSize(pCtx->pResList);
for(int32_t j = 0; j < inc; ++j) {
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
}
}
}
......@@ -1856,8 +1857,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
pTask->msgCtxs = taosArrayInit_s(pCtx->fetchNum, sizeof(SCtgMsgCtx), pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
......
......@@ -2480,20 +2480,20 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosArrayPush(ctx->pResList, &(SMetaData){0});
}
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName *pName = taosArrayGet(pList, i);
pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
continue;
}
......@@ -2503,7 +2503,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
continue;
}
......@@ -2576,7 +2576,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta);
continue;
......@@ -2588,7 +2588,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
taosHashRelease(dbCache->stbCache, stName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta);
continue;
......@@ -2603,7 +2603,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
taosHashRelease(dbCache->tbCache, pCache);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta);
......@@ -2619,7 +2619,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
nctx.tbInfo.suid);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta);
......
......@@ -44,6 +44,8 @@
typedef struct SGroupResInfo {
int32_t index;
SArray* pRows; // SArray<SResKeyPos>
char* pBuf;
bool freeItem;
} SGroupResInfo;
typedef struct SResultRow {
......@@ -115,10 +117,6 @@ struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t i
static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) {
SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
if (NULL == bufPage) {
return NULL;
}
if (forUpdate) {
setBufPageDirty(bufPage, true);
}
......
(The diff for this file is collapsed and not shown.)
......@@ -176,10 +176,12 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
// set the number of rows in current disk page
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
memset((char*) pResultRow, 0, interBufSize);
pResultRow->pageId = pageId;
pResultRow->offset = (int32_t)pData->num;
*currentPageId = pageId;
*currentPageId = pageId;
pData->num += interBufSize;
return pResultRow;
}
......@@ -363,7 +365,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
pCtx[k].input.colDataSMAIsSet = false;
}
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
if (pCtx[k].isPseudoFunc) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
......@@ -817,7 +819,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
continue;
}
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
if (pCtx[i].isPseudoFunc) {
continue;
}
......@@ -1063,7 +1065,7 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu
pRow->numOfRows = pResInfo->numOfRes;
}
if (fmIsNotNullOutputFunc(pCtx[j].functionId)) {
if (pCtx[j].isNotNullFunc) {
returnNotNull = true;
}
}
......@@ -1187,10 +1189,16 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
releaseBufPage(pBuf, page);
if (pBlock->info.rows <= 0 || pRow->numOfRows > pBlock->info.capacity) {
qError("error in copy data to ssdatablock, existed rows in block:%d, rows in pRow:%d, capacity:%d, %s",
pBlock->info.rows, pRow->numOfRows, pBlock->info.capacity, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
} else {
break;
}
}
pGroupResInfo->index += 1;
doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
......@@ -1730,12 +1738,12 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) {
int32_t code = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
// _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1;
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = tSimpleHashInit(100, hashFn);
pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -1820,6 +1828,10 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
taosMemoryFree(pCtx[i].input.pData);
taosMemoryFree(pCtx[i].input.pColumnDataAgg);
if (pCtx[i].udfName != NULL) {
taosMemoryFree(pCtx[i].udfName);
}
}
taosMemoryFreeClear(pCtx);
......@@ -1950,6 +1962,22 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
char* p = taosMemoryMalloc(64);
int32_t offset = 6;
memcpy(p, "TID:0x", offset);
offset += tintToHex(taskId, &p[offset]);
memcpy(&p[offset], " QID:0x", 7);
offset += 7;
offset += tintToHex(queryId, &p[offset]);
p[offset] = 0;
return p;
}
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (pTaskInfo == NULL) {
......@@ -1960,16 +1988,13 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->schemaInfo.dbname = strdup(dbFName);
pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model;
pTaskInfo->pTableInfoList = tableListCreate();
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
pTaskInfo->id.str = p;
pTaskInfo->id.queryId = queryId;
pTaskInfo->id.str = buildTaskId(taskId, queryId);
return pTaskInfo;
}
......
......@@ -847,6 +847,7 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
if (newPos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
......@@ -854,6 +855,7 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
taosMemoryFree(newPos);
}
return TSDB_CODE_SUCCESS;
}
......@@ -2567,6 +2569,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
while (1) {
......@@ -2877,6 +2880,8 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t nums) {
for (int i = 0; i < nums; i++) {
pDummy[i].functionId = pCtx[i].functionId;
pDummy[i].isNotNullFunc = pCtx[i].isNotNullFunc;
pDummy[i].isPseudoFunc = pCtx[i].isPseudoFunc;
}
}
......@@ -3404,9 +3409,11 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
}
}
// the allocated memory comes from outer function.
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
pGroupResInfo->pRows = pArrayList;
pGroupResInfo->index = 0;
pGroupResInfo->pBuf = NULL;
}
void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroupResInfo* pGroupResInfo,
......@@ -3417,8 +3424,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
cleanupGroupResInfo(pGroupResInfo);
return;
}
......@@ -4753,6 +4759,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
......@@ -4812,6 +4819,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
taosArraySort(pUpdated, resultrowComparAsc);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
taosHashCleanup(pUpdatedMap);
......
......@@ -123,8 +123,6 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
}
static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) {
ASSERT(pPage != NULL && pNode != NULL && pBucket->size >= 1);
int32_t len = GET_LHASH_NODE_LEN(pNode);
char* p = (char*)pNode + len;
......@@ -301,8 +299,6 @@ void* tHashCleanup(SLHashObj* pHashObj) {
}
int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data, size_t size) {
ASSERT(pHashObj != NULL && key != NULL);
if (pHashObj->bits == 0) {
SLHashBucket* pBucket = pHashObj->pBucket[0];
doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size);
......@@ -363,14 +359,12 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
if (v1 != splitBucketId) { // place it into the new bucket
ASSERT(v1 == newBucketId);
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,
GET_LHASH_NODE_KEY(pNode), pNode->dataLen);
doRemoveFromBucket(p, pNode, pBucket);
} else {
// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
pStart += nodeSize;
}
......@@ -385,7 +379,6 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
}
char* tHashGet(SLHashObj* pHashObj, const void* key, size_t keyLen) {
ASSERT(pHashObj != NULL && key != NULL && keyLen > 0);
int32_t hashv = pHashObj->hashFn(key, keyLen);
int32_t bucketId = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
......
......@@ -789,18 +789,47 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
pEntryInfo->isNullRes = (pEntryInfo->numOfRes == 0) ? 1 : 0;
if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) {
// NOTE: do not change this; it is written this way for performance reasons
if (!pEntryInfo->isNullRes) {
switch (pCol->info.type) {
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT:
((int64_t*)pCol->pData)[currentRow] = pRes->v;
// colDataAppendInt64(pCol, currentRow, &pRes->v);
break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT:
colDataAppendInt32(pCol, currentRow, (int32_t*)&pRes->v);
break;
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT:
colDataAppendInt16(pCol, currentRow, (int16_t*)&pRes->v);
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
colDataAppendInt8(pCol, currentRow, (int8_t*)&pRes->v);
break;
case TSDB_DATA_TYPE_DOUBLE:
colDataAppendDouble(pCol, currentRow, (double*)&pRes->v);
break;
case TSDB_DATA_TYPE_FLOAT: {
float v = GET_FLOAT_VAL(&pRes->v);
colDataAppend(pCol, currentRow, (const char*)&v, pEntryInfo->isNullRes);
colDataAppendFloat(pCol, currentRow, &v);
break;
}
}
} else {
colDataAppend(pCol, currentRow, (const char*)&pRes->v, pEntryInfo->isNullRes);
colDataAppendNULL(pCol, currentRow);
}
if (pCtx->subsidiaries.num > 0) {
if (pEntryInfo->numOfRes > 0) {
code = setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
} else {
code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
}
}
return code;
}
......
......@@ -61,6 +61,8 @@
} \
}
#define GET_INVOKE_INTRINSIC_THRESHOLD(_bits, _bytes) ((_bits) / ((_bytes) << 3u))
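// e.g., GET_INVOKE_INTRINSIC_THRESHOLD(256, 4) = 256 / (4 << 3) = 8, the number of 4-byte values held by one 256-bit register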
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256;
......@@ -700,8 +702,29 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct
}
}
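// saves the tuple (source row) that produced the current extreme value when selectivity/subsidiary columns are present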
static int32_t saveRelatedTuple(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, int32_t index, void* tval) {
SColumnInfoData* pCol = pInput->pData[0];
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo);
int32_t code = 0;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
return code;
}
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) {
int32_t numOfElems = 0;
int32_t code = TSDB_CODE_SUCCESS;
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
......@@ -719,6 +742,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
// data in the current data block is qualified for the query
if (pInput->colDataSMAIsSet) {
numOfElems = pInput->numOfRows - pAgg->numOfNull;
if (numOfElems == 0) {
goto _over;
......@@ -734,15 +758,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
pBuf->v = GET_INT64_VAL(tval);
}
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
code = saveRelatedTuple(pCtx, pInput, index, tval);
} else {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t prev = 0;
......@@ -751,15 +767,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_INT64_VAL(&pBuf->v) = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
code = saveRelatedTuple(pCtx, pInput, index, tval);
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t prev = 0;
......@@ -768,15 +776,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_UINT64_VAL(&pBuf->v) = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
code = saveRelatedTuple(pCtx, pInput, index, tval);
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double prev = 0;
......@@ -785,15 +785,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_DOUBLE_VAL(&pBuf->v) = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
code = saveRelatedTuple(pCtx, pInput, index, tval);
}
} else if (type == TSDB_DATA_TYPE_FLOAT) {
float prev = 0;
......@@ -802,16 +794,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
float val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_FLOAT_VAL(&pBuf->v) = val;
}
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = saveRelatedTuple(pCtx, pInput, index, tval);
}
}
}
......@@ -825,14 +808,51 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
int32_t numOfRows = pInput->numOfRows;
int32_t end = start + numOfRows;
if (pCol->hasNull || numOfRows < 32 || pCtx->subsidiaries.num > 0) {
// clang-format off
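// minimum row count per type before the vectorized path is taken; the numeric entries equal GET_INVOKE_INTRINSIC_THRESHOLD(256, type bytes)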
int32_t threshold[] = {
//NULL, BOOL, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, VARCHAR, TIMESTAMP, NCHAR,
INT32_MAX, INT32_MAX, 32, 16, 8, 4, 8, 4, INT32_MAX, INT32_MAX, INT32_MAX,
// UTINYINT,USMALLINT, UINT, UBIGINT, JSON, VARBINARY, DECIMAL, BLOB, MEDIUMBLOB, BINARY
32, 16, 8, 4, INT32_MAX, INT32_MAX, INT32_MAX, INT32_MAX, INT32_MAX, INT32_MAX,
};
// clang-format on
if (pCol->hasNull || numOfRows < threshold[pCol->info.type] || pCtx->subsidiaries.num > 0) {
int32_t i = findFirstValPosition(pCol, start, numOfRows);
if ((i < end) && (!pBuf->assign)) {
memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes);
char* p = pCol->pData + pCol->info.bytes * i;
switch (type) {
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT:
pBuf->v = *(int64_t*)p;
break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT:
pBuf->v = *(int32_t*)p;
break;
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT:
pBuf->v = *(int16_t*)p;
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
pBuf->v = *(int8_t*)p;
break;
case TSDB_DATA_TYPE_FLOAT: {
*(float*)&pBuf->v = *(float*)p;
break;
}
default:
memcpy(&pBuf->v, p, pCol->info.bytes);
break;
}
if (pCtx->subsidiaries.num > 0) {
int32_t code = saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
code = saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -849,7 +869,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
} else {
numOfElems = numOfRows;
switch (pCol->info.type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
handleInt8Col(pCol->pData, start, numOfRows, pBuf, isMinFunc, true);
......@@ -898,13 +918,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
_over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos);
code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pBuf->nullTupleSaved = true;
}
*nElems = numOfElems;
return TSDB_CODE_SUCCESS;
return code;
}
......@@ -347,8 +347,6 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataT
* in the memory bucket, we only accept a data array list
*/
int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
ASSERT(pBucket != NULL && data != NULL && size > 0);
int32_t count = 0;
int32_t bytes = pBucket->bytes;
for (int32_t i = 0; i < size; ++i) {
......
......@@ -812,7 +812,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArraySetSize(block->pDataBlock, 1);
taosArrayPush(block->pDataBlock, &(SColumnInfoData){0});
SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
SUdfColumnMeta *meta = &udfCol->colMeta;
col->info.precision = meta->precision;
......
......@@ -17,11 +17,10 @@
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock));
SArray* pArray = taosArrayInit_s(blockNum, sizeof(SSDataBlock), blockNum);
if (pArray == NULL) {
return -1;
}
taosArraySetSize(pArray, blockNum);
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
......@@ -49,7 +48,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
if (pArray == NULL) {
return -1;
}
taosArraySetSize(pArray, 1);
taosArrayPush(pArray, &(SSDataBlock){0});
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockDecode(pDataBlock, pRetrieve->data);
......
......@@ -759,28 +759,30 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
// deserialize
SArray* pArray = pWal->fileInfoSet;
taosArrayEnsureCap(pArray, sz);
SWalFileInfo* pData = pArray->pData;
for (int i = 0; i < sz; i++) {
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
pInfoJson = cJSON_GetArrayItem(pFiles, i);
if (!pInfoJson) goto _err;
SWalFileInfo* pInfo = &pData[i];
SWalFileInfo info = {0};
pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
if (!pField) goto _err;
pInfo->firstVer = atoll(cJSON_GetStringValue(pField));
info.firstVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "lastVer");
if (!pField) goto _err;
pInfo->lastVer = atoll(cJSON_GetStringValue(pField));
info.lastVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "createTs");
if (!pField) goto _err;
pInfo->createTs = atoll(cJSON_GetStringValue(pField));
info.createTs = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "closeTs");
if (!pField) goto _err;
pInfo->closeTs = atoll(cJSON_GetStringValue(pField));
info.closeTs = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "fileSize");
if (!pField) goto _err;
pInfo->fileSize = atoll(cJSON_GetStringValue(pField));
info.fileSize = atoll(cJSON_GetStringValue(pField));
taosArrayPush(pArray, &info);
}
taosArraySetSize(pArray, sz);
pWal->fileInfoSet = pArray;
pWal->writeCur = sz - 1;
cJSON_Delete(pRoot);
......
......@@ -122,16 +122,16 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// delete files in descending order
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int i = fileSetSize - 1; i >= pWal->writeCur + 1; i--) {
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
SWalFileInfo* pInfo = taosArrayPop(pWal->fileInfoSet);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr);
}
// pop from fileInfoSet
taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1);
}
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
......
......@@ -48,6 +48,26 @@ SArray* taosArrayInit(size_t size, size_t elemSize) {
return pArray;
}
SArray* taosArrayInit_s(size_t size, size_t elemSize, size_t initialSize) {
SArray* pArray = taosMemoryMalloc(sizeof(SArray));
if (pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pArray->size = initialSize;
pArray->pData = taosMemoryCalloc(initialSize, elemSize);
if (pArray->pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pArray);
return NULL;
}
pArray->capacity = initialSize;
pArray->elemSize = elemSize;
return pArray;
}
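A minimal usage sketch for the new helper above (illustrative only; the element type, count, and loop body are assumptions, not part of this change). taosArrayInit_s pre-sizes the array, so callers can index elements right away instead of pairing taosArrayInit with the removed taosArraySetSize:
// pre-size an array with 4 zero-initialized SSDataBlock slots
SArray* pBlocks = taosArrayInit_s(4, sizeof(SSDataBlock), 4);
if (pBlocks != NULL) {
  for (int32_t i = 0; i < 4; ++i) {
    SSDataBlock* pBlock = taosArrayGet(pBlocks, i);  // valid immediately: size is already 4
    // ... fill pBlock here ...
  }
  taosArrayDestroy(pBlocks);
}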
static int32_t taosArrayResize(SArray* pArray) {
assert(pArray->size >= pArray->capacity);
......@@ -223,7 +243,13 @@ void* taosArrayGetP(const SArray* pArray, size_t index) {
return *p;
}
void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); }
void* taosArrayGetLast(const SArray* pArray) {
if (pArray->size == 0) {
return NULL;
}
return TARRAY_GET_ELEM(pArray, pArray->size - 1);
}
size_t taosArrayGetSize(const SArray* pArray) {
if (pArray == NULL) {
......@@ -232,11 +258,6 @@ size_t taosArrayGetSize(const SArray* pArray) {
return pArray->size;
}
void taosArraySetSize(SArray* pArray, size_t size) {
assert(size <= pArray->capacity);
pArray->size = size;
}
void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
if (pArray == NULL || pData == NULL) {
return NULL;
......
......@@ -228,6 +228,7 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char
}
int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t word_length = 0;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
......@@ -263,8 +264,9 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int32_t _pos = 0;
int64_t prev_value = 0;
#if __AVX2__
while (1) {
if (count == nelements) break;
if (_pos == nelements) break;
uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
......@@ -274,93 +276,216 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int32_t elems = selector_to_elems[(int32_t)selector];
// Optimize performance by removing the repeated per-value switch operation.
int32_t v = 0;
uint64_t zigzag_value;
int32_t v = 4;
uint64_t zigzag_value = 0;
uint64_t mask = INT64MASK(bit);
switch (type) {
case TSDB_DATA_TYPE_BIGINT: {
for (int32_t i = 0; i < elems; i++) {
int64_t* p = (int64_t*) output;
int32_t gRemainder = (nelements - _pos);
int32_t num = (gRemainder > elems)? elems:gRemainder;
int32_t batch = num >> 2;
int32_t remain = num & 0x03;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
if (tsAVX2Enable && tsSIMDBuiltins) {
for (int32_t i = 0; i < batch; ++i) {
__m256i prev = _mm256_set1_epi64x(prev_value);
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
_pos += 4;
}
for (int32_t i = 0; i < remain; ++i) {
p[_pos++] = prev_value;
}
} else {
zigzag_value = ((w >> (4 + v)) & INT64MASK(bit));
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = prev_value;
v += bit;
}
}
} else {
if (tsAVX2Enable && tsSIMDBuiltins) {
__m256i base = _mm256_set1_epi64x(w);
__m256i maskVal = _mm256_set1_epi64x(mask);
int64_t diff = ZIGZAG_DECODE(int64_t, zigzag_value);
int64_t curr_value = diff + prev_value;
prev_value = curr_value;
__m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4);
__m256i inc = _mm256_set1_epi64x(bit << 2);
*((int64_t *)output + _pos) = (int64_t)curr_value;
_pos++;
for (int32_t i = 0; i < batch; ++i) {
__m256i after = _mm256_srlv_epi64(base, shiftBits);
__m256i zigzagVal = _mm256_and_si256(after, maskVal);
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);
// get the four zigzag values here
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);
// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prev_value + final[0]
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
// 1, 2, 3, 4
//+ 0, 1, 2, 3
// 1, 3, 5, 7
// shift and add for the first round
__m128i prev = _mm_set1_epi64x(prev_value);
delta = _mm256_add_epi64(delta, _mm256_slli_si256(delta, 8));
_mm256_storeu_si256((__m256i *)&p[_pos], delta);
// 1, 3, 5, 7
//+ 0, 0, 1, 3
// 1, 3, 6, 10
// shift and add operation for the second round
__m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), firstPart);
firstPart = _mm_add_epi64(firstPart, prev);
secPart = _mm_add_epi64(secPart, prev);
// store the results back to memory
_mm_storeu_si128((__m128i *)&p[_pos], firstPart);
_mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);
shiftBits = _mm256_add_epi64(shiftBits, inc);
prev_value = p[_pos + 3];
_pos += 4;
}
// handle the remaining values
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit))) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
v += bit;
if ((++count) == nelements) break;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
v += bit;
}
}
}
} break;
case TSDB_DATA_TYPE_INT: {
for (int32_t i = 0; i < elems; i++) {
int32_t* p = (int32_t*) output;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
} else {
zigzag_value = ((w >> (4 + v)) & INT64MASK(bit));
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int32_t)prev_value;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
int64_t diff = ZIGZAG_DECODE(int64_t, zigzag_value);
int64_t curr_value = diff + prev_value;
prev_value = curr_value;
*((int32_t *)output + _pos) = (int32_t)curr_value;
_pos++;
p[_pos++] = (int32_t)prev_value;
v += bit;
if ((++count) == nelements) break;
}
}
} break;
case TSDB_DATA_TYPE_SMALLINT: {
for (int32_t i = 0; i < elems; i++) {
int16_t* p = (int16_t*) output;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int16_t)prev_value;
}
} else {
zigzag_value = ((w >> (4 + v)) & INT64MASK(bit));
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int16_t)prev_value;
v += bit;
}
}
} break;
int64_t diff = ZIGZAG_DECODE(int64_t, zigzag_value);
int64_t curr_value = diff + prev_value;
prev_value = curr_value;
case TSDB_DATA_TYPE_TINYINT: {
int8_t *p = (int8_t *)output;
*((int16_t *)output + _pos) = (int16_t)curr_value;
_pos++;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int8_t)prev_value;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int8_t)prev_value;
v += bit;
if ((++count) == nelements) break;
}
}
} break;
}
ip += LONG_BYTES;
}
return nelements * word_length;
#else
while (1) {
if (count == nelements) break;
uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
int32_t elems = selector_to_elems[(int32_t)selector];
case TSDB_DATA_TYPE_TINYINT: {
for (int32_t i = 0; i < elems; i++) {
uint64_t zigzag_value;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
} else {
zigzag_value = ((w >> (4 + v)) & INT64MASK(bit));
zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
}
int64_t diff = ZIGZAG_DECODE(int64_t, zigzag_value);
int64_t curr_value = diff + prev_value;
prev_value = curr_value;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)output + _pos) = (int64_t)curr_value;
_pos++;
break;
case TSDB_DATA_TYPE_INT:
*((int32_t *)output + _pos) = (int32_t)curr_value;
_pos++;
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)output + _pos) = (int16_t)curr_value;
_pos++;
break;
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)output + _pos) = (int8_t)curr_value;
_pos++;
v += bit;
if ((++count) == nelements) break;
break;
default:
perror("Wrong integer types.\n");
return -1;
}
} break;
count++;
if (count == nelements) break;
}
ip += LONG_BYTES;
}
return nelements * word_length;
#endif
}
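For reference, a scalar sketch of the zigzag decoding that both the SIMD and non-SIMD paths above perform; it simply restates the ZIGZAG_DECODE macro, and the function name is invented for illustration:
// zigzag encoding maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
// decoding reverses that mapping and accumulates the delta onto the previous value
static int64_t zigzagDecodeStep(uint64_t zigzag_value, int64_t prev_value) {
  int64_t delta = (int64_t)(zigzag_value >> 1) ^ -((int64_t)(zigzag_value & 1));
  return prev_value + delta;  // the running sum reconstructs the original sequence
}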
/* ----------------------------------------------Bool Compression
......
......@@ -421,7 +421,11 @@ int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, voi
}
void *taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void **d, int32_t *size, bool addRef) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
if (pHashObj == NULL || keyLen == 0 || key == NULL) {
return NULL;
}
if ((atomic_load_64((int64_t *)&pHashObj->size) == 0)) {
return NULL;
}
......
......@@ -17,6 +17,7 @@
#include "tcompare.h"
#include "thash.h"
#include "types.h"
#include "xxhash.h"
#define ROTL32(x, r) ((x) << (r) | (x) >> (32u - (r)))
......@@ -49,6 +50,11 @@ uint32_t taosDJB2Hash(const char *key, uint32_t len) {
return hash;
}
uint32_t xxHash(const char *key, uint32_t len) {
int32_t seed = 0xcc9e2d51;
return XXH32(key, len, seed);
}
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
const uint8_t *data = (const uint8_t *)key;
const int32_t nblocks = len >> 2u;
......@@ -192,8 +198,6 @@ _hash_fn_t taosGetDefaultHashFunction(int32_t type) {
fn = taosIntHash_64;
break;
case TSDB_DATA_TYPE_BINARY:
fn = MurmurHash3_32;
break;
case TSDB_DATA_TYPE_NCHAR:
fn = MurmurHash3_32;
break;
......
......@@ -325,11 +325,10 @@ int32_t tjsonToTArray(const SJson* pJson, const char* pName, FToObject func, SAr
const cJSON* jArray = tjsonGetObjectItem(pJson, pName);
int32_t size = tjsonGetArraySize(jArray);
if (size > 0) {
*pArray = taosArrayInit(size, itemSize);
*pArray = taosArrayInit_s(size, itemSize, size);
if (NULL == *pArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArraySetSize(*pArray, size);
for (int32_t i = 0; i < size; ++i) {
int32_t code = func(tjsonGetArrayItem(jArray, i), taosArrayGet(*pArray, i));
if (TSDB_CODE_SUCCESS != code) {
......
......@@ -2,7 +2,7 @@
#include "tpagedbuf.h"
#include "taoserror.h"
#include "tcompression.h"
#include "thash.h"
#include "tsimplehash.h"
#include "tlog.h"
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
......@@ -38,7 +38,7 @@ struct SDiskbasedBuf {
int32_t inMemPages; // numOfPages that are allocated in memory
SList* freePgList; // free page list
SArray* pIdList; // page id list
SHashObj* all;
SSHashObj*all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
......@@ -374,12 +374,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
goto _error;
}
pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES
if (pPBuf->assistBuf == NULL) {
goto _error;
}
pPBuf->all = taosHashInit(10, fn, true, false);
pPBuf->all = tSimpleHashInit(64, fn);
if (pPBuf->all == NULL) {
goto _error;
}
......@@ -441,7 +436,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
}
// add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
tSimpleHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
pBuf->totalBufSize += pBuf->pageSize;
}
......@@ -466,7 +461,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
pBuf->statis.getPages += 1;
SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
SPageInfo** pi = tSimpleHashGet(pBuf->all, &id, sizeof(int32_t));
if (pi == NULL || *pi == NULL) {
uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
......@@ -615,7 +610,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
taosArrayDestroy(pBuf->emptyDummyIdList);
taosArrayDestroy(pBuf->pFree);
taosHashCleanup(pBuf->all);
tSimpleHashCleanup(pBuf->all);
taosMemoryFreeClear(pBuf->id);
taosMemoryFreeClear(pBuf->assistBuf);
......@@ -641,7 +636,12 @@ void setBufPageDirty(void* pPage, bool dirty) {
ppi->dirty = dirty;
}
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) { pBuf->comp = comp; }
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
pBuf->comp = comp;
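// lazily allocate the assistant buffer only when on-disk compression is actually enabled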
if (comp && (pBuf->assistBuf == NULL)) {
pBuf->assistBuf = taosMemoryMalloc(pBuf->pageSize + 2); // EXTRA BYTES
}
}
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
SPageInfo* ppi = getPageInfoFromPayload(pPage);
......@@ -704,7 +704,7 @@ void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
taosArrayClear(pBuf->emptyDummyIdList);
taosArrayClear(pBuf->pFree);
taosHashClear(pBuf->all);
tSimpleHashClear(pBuf->all);
pBuf->numOfPages = 0; // all pages are in buffer in the first place
pBuf->totalBufSize = 0;
......
......@@ -18,12 +18,13 @@
#include "tlog.h"
#include "tdef.h"
#define DEFAULT_BUF_PAGE_SIZE 1024
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
#define HASH_MAX_CAPACITY (1024 * 1024 * 16L)
#define SHASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
#define GET_SHASH_NODE_KEY(_n, _dl) ((char *)(_n) + sizeof(SHNode) + (_dl))
#define GET_SHASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHNode))
#define GET_SHASH_NODE_DATA(_n) (((SHNode*)_n)->data)
#define GET_SHASH_NODE_KEY(_n, _dl) ((char*)GET_SHASH_NODE_DATA(_n) + (_dl))
#define HASH_INDEX(v, c) ((v) & ((c)-1))
......@@ -38,6 +39,8 @@ struct SSHashObj {
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
SArray* pHashNodeBuf;// hash node allocation buffer, 1k size of each page by default
int32_t offset; // allocation offset in current page
};
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
......@@ -57,17 +60,20 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
capacity = 4;
}
SSHashObj *pHashObj = (SSHashObj *)taosMemoryCalloc(1, sizeof(SSHashObj));
SSHashObj *pHashObj = (SSHashObj *)taosMemoryMalloc(sizeof(SSHashObj));
if (!pHashObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// the max number of slots is not defined by the user
pHashObj->hashFp = fn;
pHashObj->capacity = taosHashCapacity((int32_t)capacity);
pHashObj->equalFp = memcmp;
pHashObj->hashFp = fn;
pHashObj->pHashNodeBuf = taosArrayInit(10, sizeof(void*));
pHashObj->offset = 0;
pHashObj->size = 0;
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
if (!pHashObj->hashList) {
......@@ -75,6 +81,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
return pHashObj;
}
......@@ -82,19 +89,53 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
if (!pHashObj) {
return 0;
}
return (int32_t)atomic_load_64((int64_t *)&pHashObj->size);
return (int32_t) pHashObj->size;
}
static void* doInternalAlloc(SSHashObj* pHashObj, int32_t size) {
#if 0
void** p = taosArrayGetLast(pHashObj->pHashNodeBuf);
if (p == NULL || (pHashObj->offset + size) > DEFAULT_BUF_PAGE_SIZE) {
// let's allocate one new page
int32_t allocSize = TMAX(size, DEFAULT_BUF_PAGE_SIZE);
void* pNewPage = taosMemoryMalloc(allocSize);
if (pNewPage == NULL) {
return NULL;
}
// if the allocated buffer page is greater than DEFAULT_BUF_PAGE_SIZE,
// pHashObj->offset will always stay greater than DEFAULT_BUF_PAGE_SIZE, which means the
// current buffer page is full and a new buffer page needs to be allocated.
pHashObj->offset = size;
taosArrayPush(pHashObj->pHashNodeBuf, &pNewPage);
return pNewPage;
} else {
void* pPos = (char*)(*p) + pHashObj->offset;
pHashObj->offset += size;
return pPos;
}
#else
return taosMemoryMalloc(size);
#endif
}
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *data, size_t dataLen, uint32_t hashVal) {
SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dataLen);
static SHNode *doCreateHashNode(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen,
uint32_t hashVal) {
SHNode *pNewNode = doInternalAlloc(pHashObj, sizeof(SHNode) + keyLen + dataLen);
if (!pNewNode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pNewNode->keyLen = keyLen;
pNewNode->dataLen = dataLen;
pNewNode->next = NULL;
if (data) memcpy(GET_SHASH_NODE_DATA(pNewNode), data, dataLen);
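// cache the hash value so a later resize can re-bucket nodes without rehashing their keys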
pNewNode->hashVal = hashVal;
if (data) {
memcpy(GET_SHASH_NODE_DATA(pNewNode), data, dataLen);
}
memcpy(GET_SHASH_NODE_KEY(pNewNode, dataLen), key, keyLen);
return pNewNode;
}
......@@ -111,7 +152,7 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
return;
}
int64_t st = taosGetTimestampUs();
// int64_t st = taosGetTimestampUs();
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, POINTER_BYTES * newCapacity);
if (!pNewEntryList) {
uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
......@@ -134,10 +175,7 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
SHNode *pPrev = NULL;
while (pNode != NULL) {
void *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pNode->keyLen);
int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity);
int32_t newIdx = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
pNext = pNode->next;
if (newIdx != idx) {
if (!pPrev) {
......@@ -156,8 +194,7 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
}
}
int64_t et = taosGetTimestampUs();
// int64_t et = taosGetTimestampUs();
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms",
// (int32_t)pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
......@@ -179,13 +216,13 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
SHNode *pNode = pHashObj->hashList[slot];
if (!pNode) {
SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
SHNode *pNewNode = doCreateHashNode(pHashObj, key, keyLen, data, dataLen, hashVal);
if (!pNewNode) {
return -1;
}
pHashObj->hashList[slot] = pNewNode;
atomic_add_fetch_64(&pHashObj->size, 1);
pHashObj->size += 1;
return 0;
}
......@@ -197,13 +234,13 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
}
if (!pNode) {
SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
SHNode *pNewNode = doCreateHashNode(pHashObj, key, keyLen, data, dataLen, hashVal);
if (!pNewNode) {
return -1;
}
pNewNode->next = pHashObj->hashList[slot];
pHashObj->hashList[slot] = pNewNode;
atomic_add_fetch_64(&pHashObj->size, 1);
pHashObj->size += 1;
} else if (data) { // update data
memcpy(GET_SHASH_NODE_DATA(pNode), data, dataLen);
}
......@@ -270,7 +307,7 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
pPrev->next = pNode->next;
}
FREE_HASH_NODE(pNode);
atomic_sub_fetch_64(&pHashObj->size, 1);
pHashObj->size -= 1;
code = TSDB_CODE_SUCCESS;
break;
}
......@@ -305,7 +342,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
}
FREE_HASH_NODE(pNode);
atomic_sub_fetch_64(&pHashObj->size, 1);
pHashObj->size -= 1;
break;
}
pPrev = pNode;
......@@ -315,6 +352,10 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
return TSDB_CODE_SUCCESS;
}
static void destroyItems(void* pItem) {
taosMemoryFree(*(void**)pItem);
}
void tSimpleHashClear(SSHashObj *pHashObj) {
if (!pHashObj || taosHashTableEmpty(pHashObj)) {
return;
......@@ -332,9 +373,13 @@ void tSimpleHashClear(SSHashObj *pHashObj) {
FREE_HASH_NODE(pNode);
pNode = pNext;
}
pHashObj->hashList[i] = NULL;
}
atomic_store_64(&pHashObj->size, 0);
taosArrayClearEx(pHashObj->pHashNodeBuf, destroyItems);
pHashObj->offset = 0;
pHashObj->size = 0;
}
void tSimpleHashCleanup(SSHashObj *pHashObj) {
......@@ -343,6 +388,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
}
tSimpleHashClear(pHashObj);
taosArrayDestroy(pHashObj->pHashNodeBuf);
taosMemoryFreeClear(pHashObj->hashList);
taosMemoryFree(pHashObj);
}
......
......@@ -159,10 +159,6 @@ char *strtolower(char *dst, const char *src) {
int32_t esc = 0;
char quote = 0, *p = dst, c;
if (ASSERTS(dst != NULL, "dst is NULL")) {
return NULL;
}
for (c = *src++; c; c = *src++) {
if (esc) {
esc = 0;
......@@ -188,10 +184,6 @@ char *strntolower(char *dst, const char *src, int32_t n) {
int32_t esc = 0;
char quote = 0, *p = dst, c;
if (ASSERTS(dst != NULL, "dst is NULL")) {
return NULL;
}
if (n == 0) {
*p = 0;
return dst;
......@@ -219,11 +211,6 @@ char *strntolower(char *dst, const char *src, int32_t n) {
char *strntolower_s(char *dst, const char *src, int32_t n) {
char *p = dst, c;
if (ASSERTS(dst != NULL, "dst is NULL")) {
return NULL;
}
if (n == 0) {
return NULL;
}
......@@ -333,6 +320,50 @@ char *strbetween(char *string, char *begin, char *end) {
return result;
}
int32_t tintToHex(uint64_t val, char hex[]) {
const char hexstr[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
int32_t j = 0, k = 0;
if (val == 0) {
hex[j++] = hexstr[0];
return j;
}
// skip the leading zero nibbles
while((val & (((uint64_t)0xfL) << ((15 - k) * 4))) == 0) {
k += 1;
}
for (j = 0; k < 16; ++k, ++j) {
hex[j] = hexstr[(val & (((uint64_t)0xfL) << ((15 - k) * 4))) >> (15 - k) * 4];
}
return j;
}
int32_t titoa(uint64_t val, size_t radix, char str[]) {
if (radix < 2 || radix > 16) {
return 0;
}
const char* s = "0123456789abcdef";
char buf[65] = {0};
int32_t i = 0;
uint64_t v = val;
while(v > 0) {
buf[i++] = s[v % radix];
v /= radix;
}
// reverse order
for(int32_t j = 0; j < i; ++j) {
str[j] = buf[i - j - 1];
}
return i;
}
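Illustrative expectations for the two helpers above, inferred from the code rather than taken from any test (buffer names and values are assumptions):
char hex[17] = {0};
int32_t n1 = tintToHex(255, hex);   // n1 == 2, hex == "ff" (leading zero nibbles are skipped)
char str[65] = {0};
int32_t n2 = titoa(255, 16, str);   // n2 == 2, str == "ff"; a radix outside [2, 16] returns 0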
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]) {
int32_t i;
char hexval[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
......
This diff has been collapsed.
This diff has been collapsed.
......@@ -351,7 +351,7 @@ sql_error select last_row(*) from (select * from nest_tb0) having c1 > 0
print ===========>td-4805
sql_error select tbname, i from (select * from nest_tb0) group by i;
sql select count(*),c1 from (select * from nest_tb0) where c1 < 2 group by c1;
sql select count(*),c1 from (select * from nest_tb0) where c1 < 2 group by c1 order by c1;
if $rows != 2 then
return -1
endi
......
This diff has been collapsed.