taosdata / TDengine — commit a7fe2cb3
Authored Feb 21, 2023 by dapan1121

Merge remote-tracking branch 'origin/main' into fix/TS-2687
Parents: 6e0a3494, 0b0af524

Showing 15 changed files with 146 additions and 1457 deletions (+146, -1457)
include/util/tworker.h                  +3    -3
include/util/xxhash.h (deleted)         +0    -328
source/client/src/clientHb.c            +1    -1
source/client/src/clientTmq.c           +17   -9
source/client/test/clientTests.cpp      +1    -1
source/dnode/mnode/impl/inc/mndInt.h    +1    -1
source/dnode/qnode/inc/qndInt.h         +1    -1
source/dnode/vnode/src/inc/vnodeInt.h   +1    -1
source/libs/qworker/inc/qwInt.h         +2    -0
source/libs/qworker/src/qwDbg.c         +10   -2
source/libs/qworker/src/qworker.c       +52   -38
source/libs/transport/src/transCli.c    +29   -22
source/util/src/tcompression.c          +17   -9
source/util/src/tworker.c               +11   -11
source/util/src/xxhash.c (deleted)      +0    -1030
include/util/tworker.h

@@ -26,12 +26,12 @@ extern "C" {
 typedef struct SQWorkerPool SQWorkerPool;
 typedef struct SWWorkerPool SWWorkerPool;

-typedef struct SQWorker {
+typedef struct SQueueWorker {
   int32_t  id;      // worker id
   int64_t  pid;     // thread pid
   TdThread thread;  // thread id
   void    *pool;
-} SQWorker;
+} SQueueWorker;

 typedef struct SQWorkerPool {
   int32_t max;  // max number of workers
@@ -39,7 +39,7 @@ typedef struct SQWorkerPool {
   int32_t       num;  // current number of workers
   STaosQset    *qset;
   const char   *name;
-  SQWorker     *workers;
+  SQueueWorker *workers;
   TdThreadMutex mutex;
 } SQWorkerPool;
include/util/xxhash.h (deleted, 100644 → 0)
/*
xxHash - Extremely Fast Hash algorithm
Header File
Copyright (C) 2012-2016, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- xxHash source repository : https://github.com/Cyan4973/xxHash
*/
/* Notice extracted from xxHash homepage :
xxHash is an extremely fast Hash algorithm, running at RAM speed limits.
It also successfully passes all tests from the SMHasher suite.
Comparison (single thread, Windows Seven 32 bits, using SMHasher on a Core 2 Duo @3GHz)
Name Speed Q.Score Author
xxHash 5.4 GB/s 10
CrapWow 3.2 GB/s 2 Andrew
MumurHash 3a 2.7 GB/s 10 Austin Appleby
SpookyHash 2.0 GB/s 10 Bob Jenkins
SBox 1.4 GB/s 9 Bret Mulvey
Lookup3 1.2 GB/s 9 Bob Jenkins
SuperFastHash 1.2 GB/s 1 Paul Hsieh
CityHash64 1.05 GB/s 10 Pike & Alakuijala
FNV 0.55 GB/s 5 Fowler, Noll, Vo
CRC32 0.43 GB/s 9
MD5-32 0.33 GB/s 10 Ronald L. Rivest
SHA1-32 0.28 GB/s 10
Q.Score is a measure of quality of the hash function.
It depends on successfully passing SMHasher test set.
10 is a perfect score.
A 64-bit version, named XXH64, is available since r35.
It offers much better speed, but for 64-bit applications only.
Name Speed on 64 bits Speed on 32 bits
XXH64 13.8 GB/s 1.9 GB/s
XXH32 6.8 GB/s 6.0 GB/s
*/
#ifndef XXHASH_H_5627135585666179
#define XXHASH_H_5627135585666179 1
#if defined (__cplusplus)
extern "C" {
#endif


/* ****************************
*  Definitions
******************************/
#include <stddef.h>   /* size_t */
typedef enum { XXH_OK=0, XXH_ERROR } XXH_errorcode;
/* ****************************
* API modifier
******************************/
/** XXH_INLINE_ALL (and XXH_PRIVATE_API)
* This is useful to include xxhash functions in `static` mode
* in order to inline them, and remove their symbol from the public list.
* Inlining can offer dramatic performance improvement on small keys.
* Methodology :
* #define XXH_INLINE_ALL
* #include "xxhash.h"
* `xxhash.c` is automatically included.
* It's not useful to compile and link it as a separate module.
*/
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# ifndef XXH_STATIC_LINKING_ONLY
# define XXH_STATIC_LINKING_ONLY
# endif
# if defined(__GNUC__)
# define XXH_PUBLIC_API static __inline __attribute__((unused))
#    elif defined (__cplusplus) || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */)
# define XXH_PUBLIC_API static inline
# elif defined(_MSC_VER)
# define XXH_PUBLIC_API static __inline
# else
/* this version may generate warnings for unused static functions */
# define XXH_PUBLIC_API static
# endif
#else
# define XXH_PUBLIC_API
/* do nothing */
#endif
/* XXH_INLINE_ALL || XXH_PRIVATE_API */
/*! XXH_NAMESPACE, aka Namespace Emulation :
*
* If you want to include _and expose_ xxHash functions from within your own library,
* but also want to avoid symbol collisions with other libraries which may also include xxHash,
*
* you can use XXH_NAMESPACE, to automatically prefix any public symbol from xxhash library
* with the value of XXH_NAMESPACE (therefore, avoid NULL and numeric values).
*
* Note that no change is required within the calling program as long as it includes `xxhash.h` :
* regular symbol name will be automatically translated by this header.
*/
#ifdef XXH_NAMESPACE
# define XXH_CAT(A,B) A##B
# define XXH_NAME2(A,B) XXH_CAT(A,B)
# define XXH_versionNumber XXH_NAME2(XXH_NAMESPACE, XXH_versionNumber)
# define XXH32 XXH_NAME2(XXH_NAMESPACE, XXH32)
# define XXH32_createState XXH_NAME2(XXH_NAMESPACE, XXH32_createState)
# define XXH32_freeState XXH_NAME2(XXH_NAMESPACE, XXH32_freeState)
# define XXH32_reset XXH_NAME2(XXH_NAMESPACE, XXH32_reset)
# define XXH32_update XXH_NAME2(XXH_NAMESPACE, XXH32_update)
# define XXH32_digest XXH_NAME2(XXH_NAMESPACE, XXH32_digest)
# define XXH32_copyState XXH_NAME2(XXH_NAMESPACE, XXH32_copyState)
# define XXH32_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH32_canonicalFromHash)
# define XXH32_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH32_hashFromCanonical)
# define XXH64 XXH_NAME2(XXH_NAMESPACE, XXH64)
# define XXH64_createState XXH_NAME2(XXH_NAMESPACE, XXH64_createState)
# define XXH64_freeState XXH_NAME2(XXH_NAMESPACE, XXH64_freeState)
# define XXH64_reset XXH_NAME2(XXH_NAMESPACE, XXH64_reset)
# define XXH64_update XXH_NAME2(XXH_NAMESPACE, XXH64_update)
# define XXH64_digest XXH_NAME2(XXH_NAMESPACE, XXH64_digest)
# define XXH64_copyState XXH_NAME2(XXH_NAMESPACE, XXH64_copyState)
# define XXH64_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH64_canonicalFromHash)
# define XXH64_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH64_hashFromCanonical)
#endif
/* *************************************
* Version
***************************************/
#define XXH_VERSION_MAJOR 0
#define XXH_VERSION_MINOR 6
#define XXH_VERSION_RELEASE 5
#define XXH_VERSION_NUMBER (XXH_VERSION_MAJOR *100*100 + XXH_VERSION_MINOR *100 + XXH_VERSION_RELEASE)
XXH_PUBLIC_API unsigned XXH_versionNumber (void);
/*-**********************************************************************
* 32-bit hash
************************************************************************/
typedef unsigned int XXH32_hash_t;
/*! XXH32() :
Calculate the 32-bit hash of sequence "length" bytes stored at memory address "input".
The memory between input & input+length must be valid (allocated and read-accessible).
"seed" can be used to alter the result predictably.
Speed on Core 2 Duo @ 3 GHz (single thread, SMHasher benchmark) : 5.4 GB/s */
XXH_PUBLIC_API XXH32_hash_t XXH32 (const void* input, size_t length, unsigned int seed);
/*====== Streaming ======*/
typedef struct XXH32_state_s XXH32_state_t;   /* incomplete type */
XXH_PUBLIC_API XXH32_state_t* XXH32_createState(void);
XXH_PUBLIC_API XXH_errorcode  XXH32_freeState(XXH32_state_t* statePtr);
XXH_PUBLIC_API void XXH32_copyState(XXH32_state_t* dst_state, const XXH32_state_t* src_state);

XXH_PUBLIC_API XXH_errorcode XXH32_reset  (XXH32_state_t* statePtr, unsigned int seed);
XXH_PUBLIC_API XXH_errorcode XXH32_update (XXH32_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH32_hash_t  XXH32_digest (const XXH32_state_t* statePtr);
/*
* Streaming functions generate the xxHash of an input provided in multiple segments.
* Note that, for small input, they are slower than single-call functions, due to state management.
* For small inputs, prefer `XXH32()` and `XXH64()`, which are better optimized.
*
* XXH state must first be allocated, using XXH*_createState() .
*
* Start a new hash by initializing state with a seed, using XXH*_reset().
*
* Then, feed the hash state by calling XXH*_update() as many times as necessary.
* The function returns an error code, with 0 meaning OK, and any other value meaning there is an error.
*
* Finally, a hash value can be produced anytime, by using XXH*_digest().
* This function returns the nn-bits hash as an int or long long.
*
* It's still possible to continue inserting input into the hash state after a digest,
* and generate some new hashes later on, by calling again XXH*_digest().
*
* When done, free XXH state space if it was allocated dynamically.
*/
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[4]; } XXH32_canonical_t;

XXH_PUBLIC_API void XXH32_canonicalFromHash(XXH32_canonical_t* dst, XXH32_hash_t hash);
XXH_PUBLIC_API XXH32_hash_t XXH32_hashFromCanonical(const XXH32_canonical_t* src);
/* Default result type for XXH functions are primitive unsigned 32 and 64 bits.
* The canonical representation uses human-readable write convention, aka big-endian (large digits first).
* These functions allow transformation of hash result into and from its canonical format.
* This way, hash values can be written into a file / memory, and remain comparable on different systems and programs.
*/
#ifndef XXH_NO_LONG_LONG
/*-**********************************************************************
* 64-bit hash
************************************************************************/
typedef unsigned long long XXH64_hash_t;
/*! XXH64() :
Calculate the 64-bit hash of sequence of length "len" stored at memory address "input".
"seed" can be used to alter the result predictably.
This function runs faster on 64-bit systems, but slower on 32-bit systems (see benchmark).
*/
XXH_PUBLIC_API XXH64_hash_t XXH64 (const void* input, size_t length, unsigned long long seed);
/*====== Streaming ======*/
typedef struct XXH64_state_s XXH64_state_t;   /* incomplete type */
XXH_PUBLIC_API XXH64_state_t* XXH64_createState(void);
XXH_PUBLIC_API XXH_errorcode  XXH64_freeState(XXH64_state_t* statePtr);
XXH_PUBLIC_API void XXH64_copyState(XXH64_state_t* dst_state, const XXH64_state_t* src_state);

XXH_PUBLIC_API XXH_errorcode XXH64_reset  (XXH64_state_t* statePtr, unsigned long long seed);
XXH_PUBLIC_API XXH_errorcode XXH64_update (XXH64_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH64_hash_t  XXH64_digest (const XXH64_state_t* statePtr);
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[8]; } XXH64_canonical_t;

XXH_PUBLIC_API void XXH64_canonicalFromHash(XXH64_canonical_t* dst, XXH64_hash_t hash);
XXH_PUBLIC_API XXH64_hash_t XXH64_hashFromCanonical(const XXH64_canonical_t* src);

#endif  /* XXH_NO_LONG_LONG */
#ifdef XXH_STATIC_LINKING_ONLY
/* ================================================================================================
This section contains declarations which are not guaranteed to remain stable.
They may change in future versions, becoming incompatible with a different version of the library.
These declarations should only be used with static linking.
Never use them in association with dynamic linking !
=================================================================================================== */
/* These definitions are only present to allow
* static allocation of XXH state, on stack or in a struct for example.
* Never **ever** use members directly. */
#if !defined (__VMS) \
   && (defined (__cplusplus) \
   || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */) )
# include <stdint.h>
struct XXH32_state_s {
   uint32_t total_len_32;
   uint32_t large_len;
   uint32_t v1;
   uint32_t v2;
   uint32_t v3;
   uint32_t v4;
   uint32_t mem32[4];
   uint32_t memsize;
   uint32_t reserved;   /* never read nor write, might be removed in a future version */
};   /* typedef'd to XXH32_state_t */

struct XXH64_state_s {
   uint64_t total_len;
   uint64_t v1;
   uint64_t v2;
   uint64_t v3;
   uint64_t v4;
   uint64_t mem64[4];
   uint32_t memsize;
   uint32_t reserved[2];  /* never read nor write, might be removed in a future version */
};   /* typedef'd to XXH64_state_t */
# else
struct XXH32_state_s {
   unsigned total_len_32;
   unsigned large_len;
   unsigned v1;
   unsigned v2;
   unsigned v3;
   unsigned v4;
   unsigned mem32[4];
   unsigned memsize;
   unsigned reserved;   /* never read nor write, might be removed in a future version */
};   /* typedef'd to XXH32_state_t */

#   ifndef XXH_NO_LONG_LONG  /* remove 64-bit support */
struct XXH64_state_s {
   unsigned long long total_len;
   unsigned long long v1;
   unsigned long long v2;
   unsigned long long v3;
   unsigned long long v4;
   unsigned long long mem64[4];
   unsigned memsize;
   unsigned reserved[2];     /* never read nor write, might be removed in a future version */
};    /* typedef'd to XXH64_state_t */
#   endif
# endif
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# include "xxhash.c"
/* include xxhash function bodies as `static`, for inlining */
#endif
#endif
/* XXH_STATIC_LINKING_ONLY */
#if defined (__cplusplus)
}
#endif
#endif
/* XXHASH_H_5627135585666179 */
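The streaming declarations above follow a create / reset / update / digest life cycle. A minimal sketch of that usage, assuming an upstream xxHash build is available to compile against (this commit removes TDengine's bundled copy, so the header would have to come from the xxHash project itself):

#include <stdio.h>
#include <string.h>
#include "xxhash.h"

int main(void) {
  XXH32_state_t *state = XXH32_createState();   /* allocate streaming state */
  if (state == NULL) return 1;

  XXH32_reset(state, 0);                         /* start a new hash with seed 0 */
  const char *part1 = "hello ", *part2 = "world";
  XXH32_update(state, part1, strlen(part1));     /* feed input in segments */
  XXH32_update(state, part2, strlen(part2));
  XXH32_hash_t streamed = XXH32_digest(state);   /* digest can be taken at any point */

  /* the one-shot API over the same bytes yields the same value */
  XXH32_hash_t oneshot = XXH32("hello world", 11, 0);
  printf("streaming: %08x  one-shot: %08x\n", streamed, oneshot);

  XXH32_freeState(state);
  return 0;
}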
source/client/src/clientHb.c

@@ -347,7 +347,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
       continue;
     }

-    if (pRequest->killed) {
+    if (pRequest->killed || 0 == pRequest->body.queryJob) {
       releaseRequest(*rid);
       pIter = taosHashIterate(pObj->pRequests, pIter);
       continue;
source/client/src/clientTmq.c

@@ -458,6 +458,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
   }
   pOffset->val = pVg->currentOffset;

   int32_t groupLen = strlen(tmq->groupId);
@@ -471,11 +472,13 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
   if (code < 0) {
     return -1;
   }
   void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
   if (buf == NULL) {
     taosMemoryFree(pOffset);
     return -1;
   }
   ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
   void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
@@ -492,6 +495,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
     taosMemoryFree(buf);
     return -1;
   }
   pParam->params = pParamSet;
   pParam->pOffset = pOffset;
@@ -503,14 +507,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
     taosMemoryFree(pParam);
     return -1;
   }
   pMsgSendInfo->msgInfo = (SDataBuf){
       .pData = buf,
       .len = sizeof(SMsgHead) + len,
       .handle = NULL,
   };
-  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey,
-           pVg->vgId, pOffset->val.version);
+  SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse];
+  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d", tmq->consumerId,
+           pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);

   // TODO: put into cb
   pVg->committedOffset = pVg->currentOffset;
@@ -637,15 +643,16 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
   // init as 1 to prevent concurrency issue
   pParamSet->waitingRspNum = 1;

-  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
+  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
+  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
+
+  for (int32_t i = 0; i < numOfTopics; i++) {
     SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
     int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);
     for (int32_t j = 0; j < numOfVgroups; j++) {
       SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
       if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
+        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId,
+                 pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version);
         if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
           continue;
         }
@@ -1085,7 +1092,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
     }

     tNameExtractFullName(&name, topicFName);
-    tscDebug("consumer:0x%" PRIx64 ", subscribe topic: %s", tmq->consumerId, topicFName);
+    tscDebug("consumer:0x%" PRIx64 " subscribe topic: %s", tmq->consumerId, topicFName);

     taosArrayPush(req.topicNames, &topicFName);
   }
@@ -1398,7 +1405,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
   }

   atomic_store_32(&tmq->epoch, epoch);
-  tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId);
+  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
   return set;
 }
@@ -1548,7 +1555,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
   sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

   SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
-  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async);
+  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async, tmq->consumerId);

   int64_t transporterId = 0;
   asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
@@ -1759,6 +1766,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
   while (1) {
     SMqRspWrapper* rspWrapper = NULL;
     taosGetQitem(tmq->qall, (void**)&rspWrapper);
     if (rspWrapper == NULL) {
       taosReadAllQitems(tmq->mqueue, tmq->qall);
       taosGetQitem(tmq->qall, (void**)&rspWrapper);
@@ -1881,7 +1889,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
   // in no topic status, delayed task also need to be processed
   if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
-    tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId);
+    tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId);
     return NULL;
   }
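Several of the hunks above only move or drop a comma that sat inside the format literal. Because the PRIx64 / PRId64 macros expand to string literals that are concatenated with the surrounding pieces, punctuation placement inside those pieces changes the logged text, not the arguments. A small standalone illustration of the same concatenation pattern (plain printf stands in for tscDebug):

#include <inttypes.h>
#include <stdio.h>

int main(void) {
  uint64_t consumerId = 0x1234abcdULL;
  int64_t  version = 42, prev = 41;

  /* "consumer:0x%" PRIx64 " topic:%s ..." concatenates into a single format string */
  printf("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 "\n",
         consumerId, "tp1", 3, version, prev);
  return 0;
}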
source/client/test/clientTests.cpp

@@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
   tmq_conf_t* conf = tmq_conf_new();
   tmq_conf_set(conf, "enable.auto.commit", "true");
   tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
-  tmq_conf_set(conf, "group.id", "newabcdefgjhijlm__");
+  tmq_conf_set(conf, "group.id", "consumer_group");
   tmq_conf_set(conf, "td.connect.user", "root");
   tmq_conf_set(conf, "td.connect.pass", "taosdata");
   tmq_conf_set(conf, "auto.offset.reset", "earliest");
source/dnode/mnode/impl/inc/mndInt.h

@@ -58,7 +58,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode);
 typedef void (*MndCleanupFp)(SMnode *pMnode);
 typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
 typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
-typedef struct SQWorker SQHandle;
+typedef struct SQueueWorker SQHandle;

 typedef struct {
   const char *name;
source/dnode/qnode/inc/qndInt.h

@@ -29,7 +29,7 @@
 extern "C" {
 #endif

-typedef struct SQWorker SQHandle;
+typedef struct SQueueWorker SQHandle;

 typedef struct SQnode {
   int32_t qndId;
source/dnode/vnode/src/inc/vnodeInt.h

@@ -58,7 +58,7 @@ typedef struct STQ STQ;
 typedef struct SVState         SVState;
 typedef struct SVStatis        SVStatis;
 typedef struct SVBufPool       SVBufPool;
-typedef struct SQWorker        SQHandle;
+typedef struct SQueueWorker    SQHandle;
 typedef struct STsdbKeepCfg    STsdbKeepCfg;
 typedef struct SMetaSnapReader SMetaSnapReader;
 typedef struct SMetaSnapWriter SMetaSnapWriter;
source/libs/qworker/inc/qwInt.h

@@ -76,6 +76,7 @@ typedef struct SQWDebug {
   bool lockEnable;
   bool statusEnable;
   bool dumpEnable;
+  bool forceStop;
   bool sleepSimulate;
   bool deadSimulate;
   bool redirectSimulate;
@@ -248,6 +249,7 @@ typedef struct SQWorkerMgmt {
 #define QW_QUERY_RUNNING(ctx)     (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
 #define QW_FETCH_RUNNING(ctx)     ((ctx)->inFetch)
+#define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1)

 #define QW_SET_QTID(id, qId, tId, eId) \
   do {                                 \
source/libs/qworker/src/qwDbg.c

@@ -9,11 +9,13 @@
 #include "tmsg.h"
 #include "tname.h"

-SQWDebug gQWDebug = {.statusEnable = true,
+SQWDebug gQWDebug = {.lockEnable = false,
+                     .statusEnable = true,
                      .dumpEnable = false,
                      .redirectSimulate = false,
                      .deadSimulate = false,
-                     .sleepSimulate = false};
+                     .sleepSimulate = false,
+                     .forceStop = false};

 int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
   if (!gQWDebug.statusEnable) {
@@ -306,6 +308,12 @@ int32_t qwDbgEnableDebug(char *option) {
     return TSDB_CODE_SUCCESS;
   }

+  if (0 == strcasecmp(option, "forceStop")) {
+    gQWDebug.forceStop = true;
+    qError("qw forceStop debug enabled");
+    return TSDB_CODE_SUCCESS;
+  }
+
   qError("invalid qw debug option:%s", option);
   return TSDB_CODE_APP_ERROR;
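The new branch reuses the strcasecmp-based option matching of the existing toggles, and the flag it sets is only consumed by the query-worker heartbeat timer (see qworker.c below). A self-contained sketch of that toggle pattern, with stand-in names rather than the real TDengine globals:

#include <stdbool.h>
#include <stdio.h>
#include <strings.h>  /* strcasecmp */

static bool gForceStop = false;  /* stands in for gQWDebug.forceStop */

static int enableDebug(const char *option) {
  if (0 == strcasecmp(option, "forceStop")) {
    gForceStop = true;           /* later polled by the periodic heartbeat timer */
    printf("qw forceStop debug enabled\n");
    return 0;
  }
  printf("invalid qw debug option:%s\n", option);
  return -1;
}

int main(void) { return enableDebug("forceStop"); }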
source/libs/qworker/src/qworker.c

@@ -18,6 +18,51 @@ SQWorkerMgmt gQwMgmt = {
     .qwNum = 0,
 };

+int32_t qwStopAllTasks(SQWorker *mgmt) {
+  uint64_t qId, tId, sId;
+  int32_t  eId;
+  int64_t  rId = 0;
+
+  void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
+  while (pIter) {
+    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
+    void       *key = taosHashGetKey(pIter, NULL);
+    QW_GET_QTID(key, qId, tId, eId);
+
+    QW_LOCK(QW_WRITE, &ctx->lock);
+
+    sId = ctx->sId;
+
+    QW_TASK_DLOG_E("start to force stop task");
+
+    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
+      QW_TASK_WLOG_E("task already dropping");
+      QW_UNLOCK(QW_WRITE, &ctx->lock);
+
+      pIter = taosHashIterate(mgmt->ctxHash, pIter);
+      continue;
+    }
+
+    if (QW_QUERY_RUNNING(ctx)) {
+      qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
+      QW_TASK_DLOG_E("task running, async killed");
+    } else if (QW_FETCH_RUNNING(ctx)) {
+      QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
+      QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
+      QW_TASK_DLOG_E("task fetching, update drop received");
+    } else {
+      qwDropTask(QW_FPARAMS());
+    }
+
+    QW_UNLOCK(QW_WRITE, &ctx->lock);
+
+    pIter = taosHashIterate(mgmt->ctxHash, pIter);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
 int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
   int32_t         code = 0;
   SSchedulerHbRsp rsp = {0};
@@ -973,6 +1018,10 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
   qwDbgDumpMgmtInfo(mgmt);

+  if (gQWDebug.forceStop) {
+    (void)qwStopAllTasks(mgmt);
+  }
+
   QW_LOCK(QW_READ, &mgmt->schLock);

   int32_t schNum = taosHashGetSize(mgmt->schHash);
@@ -1087,6 +1136,7 @@ _return:
   QW_RET(TSDB_CODE_SUCCESS);
 }

 int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
   if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
     qError("invalid param to init qworker");
@@ -1185,46 +1235,10 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
   SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

   QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));

-  uint64_t qId, tId, sId;
-  int32_t  eId;
-  int64_t  rId = 0;
-
   atomic_store_8(&mgmt->nodeStopped, 1);

-  void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
-  while (pIter) {
-    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
-    void       *key = taosHashGetKey(pIter, NULL);
-    QW_GET_QTID(key, qId, tId, eId);
-
-    QW_LOCK(QW_WRITE, &ctx->lock);
-
-    sId = ctx->sId;
-
-    QW_TASK_DLOG_E("start to force stop task");
-
-    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
-      QW_TASK_WLOG_E("task already dropping");
-      QW_UNLOCK(QW_WRITE, &ctx->lock);
-
-      pIter = taosHashIterate(mgmt->ctxHash, pIter);
-      continue;
-    }
-
-    if (QW_QUERY_RUNNING(ctx)) {
-      qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
-    } else if (QW_FETCH_RUNNING(ctx)) {
-      QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
-      QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
-    } else {
-      qwDropTask(QW_FPARAMS());
-    }
-
-    QW_UNLOCK(QW_WRITE, &ctx->lock);
-
-    pIter = taosHashIterate(mgmt->ctxHash, pIter);
-  }
+  (void)qwStopAllTasks(mgmt);
 }

 void qWorkerDestroy(void **qWorkerMgmt) {
source/libs/transport/src/transCli.c

@@ -727,7 +727,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
   QUEUE_INIT(&conn->q);
   conn->hostThrd = pThrd;
   conn->status = ConnNormal;
-  conn->broken = 0;
+  conn->broken = false;
   transRefCliHandle(conn);

   atomic_add_fetch_32(&pThrd->connCount, 1);
@@ -997,6 +997,11 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
   taosMemoryFree(pBatch);
 }
 static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
+  if (pThrd->quit == true) {
+    cliDestroyBatch(pBatch);
+    return;
+  }
+
   if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
     return;
   }
@@ -1082,17 +1087,23 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
   if (status != 0) {
     tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
            p->wLen, p->batchSize, uv_err_name(status));
-    cliHandleExcept(conn);
+
+    if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);
+
     cliHandleBatchReq(nxtBatch, thrd);
   } else {
     tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
            p->batchSize);
-    if (nxtBatch != NULL) {
-      conn->pBatch = nxtBatch;
-      cliSendBatch(conn);
+    if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
+      if (nxtBatch != NULL) {
+        conn->pBatch = nxtBatch;
+        cliSendBatch(conn);
+      } else {
+        addConnToPool(thrd->pool, conn);
+      }
     } else {
-      addConnToPool(thrd->pool, conn);
+      cliDestroyBatch(nxtBatch);
+      // conn release by other callback
     }
   }
@@ -1454,6 +1465,11 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
     QUEUE_REMOVE(h);
     SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

+    if (pMsg->type == Quit) {
+      pThrd->stopMsg = pMsg;
+      continue;
+    }
+
     (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
     count++;
@@ -1485,6 +1501,12 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
     QUEUE_REMOVE(h);
     SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

+    if (pMsg->type == Quit) {
+      pThrd->stopMsg = pMsg;
+      continue;
+    }
+
     if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
       STransConnCtx* pCtx = pMsg->ctx;
@@ -1582,7 +1604,6 @@ static void cliAsyncCb(uv_async_t* handle) {
   SCliThrd* pThrd = item->pThrd;
   STrans*   pTransInst = pThrd->pTransInst;
-  SCliMsg*  pMsg = NULL;

   // batch process to avoid to lock/unlock frequently
   queue wq;
   taosThreadMutexLock(&item->mtx);
@@ -2285,22 +2306,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
     transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
     return TSDB_CODE_RPC_BROKEN_LINK;
   }
-  /*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) {
-    char     key[TSDB_FQDN_LEN + 64] = {0};
-    char*    ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet);
-    uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet);
-    CONN_CONSTRUCT_HASH_KEY(key, ip, port);
-    int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
-    if (val != NULL && *val >= pTransInst->connLimitNum) {
-      transFreeMsg(pReq->pCont);
-      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
-      return TSDB_CODE_RPC_MAX_SESSIONS;
-    }
-  }*/
   TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

   STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
   pCtx->epSet = *pEpSet;
   pCtx->ahandle = pReq->info.ahandle;
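Both dequeue loops above now skip a Quit message during the drain and remember it in pThrd->stopMsg instead of dispatching it immediately. A tiny standalone sketch of that defer-the-quit-item pattern (names are illustrative, not the transport module's own):

#include <stdio.h>

typedef enum { Normal, Quit } MsgType;

int main(void) {
  MsgType queue[] = {Normal, Quit, Normal, Normal};
  int     n = (int)(sizeof(queue) / sizeof(queue[0]));
  int     quitDeferred = 0;

  for (int i = 0; i < n; i++) {
    if (queue[i] == Quit) {            /* mirrors: pThrd->stopMsg = pMsg; continue; */
      quitDeferred = 1;
      continue;
    }
    printf("dispatch msg %d\n", i);    /* mirrors: (*cliAsyncHandle[pMsg->type])(pMsg, pThrd) */
  }
  if (quitDeferred) printf("handle deferred quit after the drain\n");
  return 0;
}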
source/util/src/tcompression.c

@@ -327,23 +327,26 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
         // calculate the cumulative sum (prefix sum) for each number
         // decode[0] = prev_value + final[0]
         // decode[1] = decode[0] + final[1]   -----> prev_value + final[0] + final[1]
-        // decode[2] = decode[1] + final[1]   -----> prev_value + final[0] + final[1] + final[2]
-        // decode[3] = decode[2] + final[1]   -----> prev_value + final[0] + final[1] + final[2] + final[3]
+        // decode[2] = decode[1] + final[2]   -----> prev_value + final[0] + final[1] + final[2]
+        // decode[3] = decode[2] + final[3]   -----> prev_value + final[0] + final[1] + final[2] + final[3]

         //  1, 2, 3, 4
-        //+ 0, 1, 2, 3
-        //  1, 3, 5, 7
+        //+ 0, 1, 0, 3
+        //  1, 3, 3, 7
         // shift and add for the first round
         __m128i prev = _mm_set1_epi64x(prev_value);
-        delta = _mm256_add_epi64(delta, _mm256_slli_si256(delta, 8));
+        __m256i x = _mm256_slli_si256(delta, 8);
+
+        delta = _mm256_add_epi64(delta, x);
         _mm256_storeu_si256((__m256i *)&p[_pos], delta);

-        //  1, 3, 5, 7
-        //+ 0, 0, 1, 3
+        //  1, 3, 3, 7
+        //+ 0, 0, 3, 3
         //  1, 3, 6, 10
         // shift and add operation for the second round
         __m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
-        __m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), firstPart);
+        __m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
+        __m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
         firstPart = _mm_add_epi64(firstPart, prev);
         secPart = _mm_add_epi64(secPart, prev);
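The corrected comments reflect that _mm256_slli_si256 shifts within each 128-bit lane, so the first round only produces lane-local partial sums (1, 3, 3, 7 in the example), and the second round has to broadcast the low half's last partial sum (p[_pos + 1]) into the upper half rather than reuse firstPart. A scalar sketch of the arithmetic the fixed code performs for one group of four decoded deltas (standalone, not TDengine code):

#include <stdint.h>
#include <stdio.h>

/* prefix sums of d[0..3] on top of prev, computed the way the AVX2 path does */
static void prefix_sum4(int64_t prev, const int64_t d[4], int64_t out[4]) {
  int64_t lane0 = d[0];          /* round 1: lane-local shift+add -> d0, d0+d1 | d2, d2+d3 */
  int64_t lane1 = d[0] + d[1];
  int64_t lane2 = d[2];
  int64_t lane3 = d[2] + d[3];

  out[0] = prev + lane0;
  out[1] = prev + lane1;
  out[2] = prev + lane1 + lane2; /* round 2: broadcast lane1 (p[_pos + 1]) into the upper half */
  out[3] = prev + lane1 + lane3;
}

int main(void) {
  int64_t d[4] = {1, 2, 3, 4}, out[4];
  prefix_sum4(0, d, out);
  printf("%lld %lld %lld %lld\n", (long long)out[0], (long long)out[1],
         (long long)out[2], (long long)out[3]);
  return 0;  /* prints 1 3 6 10, matching the comment's target row */
}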
@@ -353,15 +356,18 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
           shiftBits = _mm256_add_epi64(shiftBits, inc);
           prev_value = p[_pos + 3];
+          //          uDebug("_pos:%d %"PRId64", %"PRId64", %"PRId64", %"PRId64, _pos, p[_pos], p[_pos+1], p[_pos+2], p[_pos+3]);
           _pos += 4;
         }

         // handle the remain value
         for (int32_t i = 0; i < remain; i++) {
-          zigzag_value = ((w >> (v + (batch * bit))) & mask);
+          zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
           prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);

           p[_pos++] = prev_value;
+          //          uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
           v += bit;
         }
       } else {
@@ -370,6 +376,8 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
           prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
           p[_pos++] = prev_value;
+          //          uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
           v += bit;
         }
       }
source/util/src/tworker.c

@@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param);
 int32_t tQWorkerInit(SQWorkerPool *pool) {
   pool->qset = taosOpenQset();
-  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
+  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
   if (pool->workers == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
@@ -31,7 +31,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
   (void)taosThreadMutexInit(&pool->mutex, NULL);

   for (int32_t i = 0; i < pool->max; ++i) {
-    SQWorker *worker = pool->workers + i;
+    SQueueWorker *worker = pool->workers + i;
     worker->id = i;
     worker->pool = pool;
   }
@@ -42,14 +42,14 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
 void tQWorkerCleanup(SQWorkerPool *pool) {
   for (int32_t i = 0; i < pool->max; ++i) {
-    SQWorker *worker = pool->workers + i;
+    SQueueWorker *worker = pool->workers + i;
     if (taosCheckPthreadValid(worker->thread)) {
       taosQsetThreadResume(pool->qset);
     }
   }

   for (int32_t i = 0; i < pool->max; ++i) {
-    SQWorker *worker = pool->workers + i;
+    SQueueWorker *worker = pool->workers + i;
     if (taosCheckPthreadValid(worker->thread)) {
       uInfo("worker:%s:%d is stopping", pool->name, worker->id);
       taosThreadJoin(worker->thread, NULL);
@@ -65,7 +65,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
   uInfo("worker:%s is closed", pool->name);
 }

-static void *tQWorkerThreadFp(SQWorker *worker) {
+static void *tQWorkerThreadFp(SQueueWorker *worker) {
   SQWorkerPool *pool = worker->pool;
   SQueueInfo    qinfo = {0};
   void         *msg = NULL;
@@ -106,7 +106,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
   // spawn a thread to process queue
   if (pool->num < pool->max) {
     do {
-      SQWorker *worker = pool->workers + pool->num;
+      SQueueWorker *worker = pool->workers + pool->num;

       TdThreadAttr thAttr;
       taosThreadAttrInit(&thAttr);
@@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
 int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
   pool->qset = taosOpenQset();
-  pool->workers = taosArrayInit(2, sizeof(SQWorker *));
+  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
   if (pool->workers == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
@@ -153,14 +153,14 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
 void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
   int32_t size = taosArrayGetSize(pool->workers);
   for (int32_t i = 0; i < size; ++i) {
-    SQWorker *worker = taosArrayGetP(pool->workers, i);
+    SQueueWorker *worker = taosArrayGetP(pool->workers, i);
     if (taosCheckPthreadValid(worker->thread)) {
       taosQsetThreadResume(pool->qset);
     }
   }

   for (int32_t i = 0; i < size; ++i) {
-    SQWorker *worker = taosArrayGetP(pool->workers, i);
+    SQueueWorker *worker = taosArrayGetP(pool->workers, i);
     if (taosCheckPthreadValid(worker->thread)) {
       uInfo("worker:%s:%d is stopping", pool->name, worker->id);
       taosThreadJoin(worker->thread, NULL);
@@ -177,7 +177,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
   uInfo("worker:%s is closed", pool->name);
 }

-static void *tAutoQWorkerThreadFp(SQWorker *worker) {
+static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
   SAutoQWorkerPool *pool = worker->pool;
   SQueueInfo        qinfo = {0};
   void             *msg = NULL;
@@ -222,7 +222,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
   // spawn a thread to process queue
   while (curWorkerNum < dstWorkerNum) {
-    SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker));
+    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
     if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
       uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
       taosMemoryFree(worker);
source/util/src/xxhash.c (deleted, 100644 → 0)
/*
* xxHash - Fast Hash algorithm
* Copyright (C) 2012-2016, Yann Collet
*
* BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* You can contact the author at :
* - xxHash homepage: http://www.xxhash.com
* - xxHash source repository : https://github.com/Cyan4973/xxHash
*/
/* *************************************
* Tuning parameters
***************************************/
/*!XXH_FORCE_MEMORY_ACCESS :
* By default, access to unaligned memory is controlled by `memcpy()`, which is safe and portable.
* Unfortunately, on some target/compiler combinations, the generated assembly is sub-optimal.
* The below switch allow to select different access method for improved performance.
* Method 0 (default) : use `memcpy()`. Safe and portable.
* Method 1 : `__packed` statement. It depends on compiler extension (ie, not portable).
* This method is safe if your compiler supports it, and *generally* as fast or faster than `memcpy`.
* Method 2 : direct access. This method doesn't depend on compiler but violate C standard.
* It can generate buggy code on targets which do not support unaligned memory accesses.
* But in some circumstances, it's the only known way to get the most performance (ie GCC + ARMv6)
* See http://stackoverflow.com/a/32095106/646947 for details.
* Prefer these methods in priority order (0 > 1 > 2)
*/
#ifndef XXH_FORCE_MEMORY_ACCESS
/* can be defined externally, on command line for example */
# if defined(__GNUC__) && ( defined(__ARM_ARCH_6__) || defined(__ARM_ARCH_6J__) \
|| defined(__ARM_ARCH_6K__) || defined(__ARM_ARCH_6Z__) \
|| defined(__ARM_ARCH_6ZK__) || defined(__ARM_ARCH_6T2__) )
# define XXH_FORCE_MEMORY_ACCESS 2
# elif (defined(__INTEL_COMPILER) && !defined(_WIN32)) || \
(defined(__GNUC__) && ( defined(__ARM_ARCH_7__) || defined(__ARM_ARCH_7A__) \
|| defined(__ARM_ARCH_7R__) || defined(__ARM_ARCH_7M__) \
|| defined(__ARM_ARCH_7S__) ))
# define XXH_FORCE_MEMORY_ACCESS 1
# endif
#endif
/*!XXH_ACCEPT_NULL_INPUT_POINTER :
* If input pointer is NULL, xxHash default behavior is to dereference it, triggering a segfault.
* When this macro is enabled, xxHash actively checks input for null pointer.
* It it is, result for null input pointers is the same as a null-length input.
*/
#ifndef XXH_ACCEPT_NULL_INPUT_POINTER
/* can be defined externally */
# define XXH_ACCEPT_NULL_INPUT_POINTER 0
#endif
/*!XXH_FORCE_NATIVE_FORMAT :
* By default, xxHash library provides endian-independent Hash values, based on little-endian convention.
* Results are therefore identical for little-endian and big-endian CPU.
* This comes at a performance cost for big-endian CPU, since some swapping is required to emulate little-endian format.
* Should endian-independence be of no importance for your application, you may set the #define below to 1,
* to improve speed for Big-endian CPU.
* This option has no impact on Little_Endian CPU.
*/
#ifndef XXH_FORCE_NATIVE_FORMAT
/* can be defined externally */
# define XXH_FORCE_NATIVE_FORMAT 0
#endif
/*!XXH_FORCE_ALIGN_CHECK :
* This is a minor performance trick, only useful with lots of very small keys.
* It means : check for aligned/unaligned input.
* The check costs one initial branch per hash;
* set it to 0 when the input is guaranteed to be aligned,
* or when alignment doesn't matter for performance.
*/
#ifndef XXH_FORCE_ALIGN_CHECK
/* can be defined externally */
# if defined(__i386) || defined(_M_IX86) || defined(__x86_64__) || defined(_M_X64)
# define XXH_FORCE_ALIGN_CHECK 0
# else
# define XXH_FORCE_ALIGN_CHECK 1
# endif
#endif
/* *************************************
* Includes & Memory related functions
***************************************/
/*! Modify the local functions below should you wish to use some other memory routines
* for malloc(), free() */
#include <stdlib.h>
static void* XXH_malloc(size_t s) { return malloc(s); }
static void  XXH_free  (void* p)  { free(p); }

/*! and for memcpy() */
#include <string.h>
static void* XXH_memcpy(void* dest, const void* src, size_t size) { return memcpy(dest,src,size); }
#include <assert.h>
/* assert */
#define XXH_STATIC_LINKING_ONLY
#include "xxhash.h"
/* *************************************
* Compiler Specific Options
***************************************/
#ifdef _MSC_VER
/* Visual Studio */
# pragma warning(disable : 4127)
/* disable: C4127: conditional expression is constant */
# define FORCE_INLINE static __forceinline
#else
# if defined (__cplusplus) || defined (__STDC_VERSION__) && __STDC_VERSION__ >= 199901L
/* C99 */
# ifdef __GNUC__
# define FORCE_INLINE static inline __attribute__((always_inline))
# else
# define FORCE_INLINE static inline
# endif
# else
# define FORCE_INLINE static
# endif
/* __STDC_VERSION__ */
#endif
/* *************************************
* Basic Types
***************************************/
#ifndef MEM_MODULE
# if !defined (__VMS) \
   && (defined (__cplusplus) \
   || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */) )
#   include <stdint.h>
    typedef uint8_t  BYTE;
    typedef uint16_t U16;
    typedef uint32_t U32;
# else
    typedef unsigned char  BYTE;
    typedef unsigned short U16;
    typedef unsigned int   U32;
# endif
#endif
#if (defined(XXH_FORCE_MEMORY_ACCESS) && (XXH_FORCE_MEMORY_ACCESS==2))
/* Force direct memory access. Only works on CPU which support unaligned memory access in hardware */
static U32 XXH_read32(const void* memPtr) { return *(const U32*) memPtr; }
#elif (defined(XXH_FORCE_MEMORY_ACCESS) && (XXH_FORCE_MEMORY_ACCESS==1))
/* __pack instructions are safer, but compiler specific, hence potentially problematic for some compilers */
/* currently only defined for gcc and icc */
typedef union { U32 u32; } __attribute__((packed)) unalign;
static U32 XXH_read32(const void* ptr) { return ((const unalign*)ptr)->u32; }
#else
/* portable and safe solution. Generally efficient.
* see : http://stackoverflow.com/a/32095106/646947
*/
static U32 XXH_read32(const void* memPtr)
{
    U32 val;
    memcpy(&val, memPtr, sizeof(val));
    return val;
}
#endif
/* XXH_FORCE_DIRECT_MEMORY_ACCESS */
/* ****************************************
* Compiler-specific Functions and Macros
******************************************/
#define XXH_GCC_VERSION (__GNUC__ * 100 + __GNUC_MINOR__)
/* Note : although _rotl exists for minGW (GCC under windows), performance seems poor */
#if defined(_MSC_VER)
# define XXH_rotl32(x,r) _rotl(x,r)
# define XXH_rotl64(x,r) _rotl64(x,r)
#else
# define XXH_rotl32(x,r) ((x << r) | (x >> (32 - r)))
# define XXH_rotl64(x,r) ((x << r) | (x >> (64 - r)))
#endif
#if defined(_MSC_VER)
/* Visual Studio */
# define XXH_swap32 _byteswap_ulong
#elif XXH_GCC_VERSION >= 403
# define XXH_swap32 __builtin_bswap32
#else
static U32 XXH_swap32 (U32 x)
{
    return  ((x << 24) & 0xff000000 ) |
            ((x <<  8) & 0x00ff0000 ) |
            ((x >>  8) & 0x0000ff00 ) |
            ((x >> 24) & 0x000000ff );
}
#endif
/* *************************************
* Architecture Macros
***************************************/
typedef enum { XXH_bigEndian=0, XXH_littleEndian=1 } XXH_endianess;

/* XXH_CPU_LITTLE_ENDIAN can be defined externally, for example on the compiler command line */
#ifndef XXH_CPU_LITTLE_ENDIAN
static int XXH_isLittleEndian(void)
{
    const union { U32 u; BYTE c[4]; } one = { 1 };   /* don't use static : performance detrimental */
    return one.c[0];
}
# define XXH_CPU_LITTLE_ENDIAN XXH_isLittleEndian()
#endif
/* ***************************
* Memory reads
*****************************/
typedef enum { XXH_aligned, XXH_unaligned } XXH_alignment;
FORCE_INLINE U32 XXH_readLE32_align(const void* ptr, XXH_endianess endian, XXH_alignment align)
{
    if (align==XXH_unaligned)
        return endian==XXH_littleEndian ? XXH_read32(ptr) : XXH_swap32(XXH_read32(ptr));
    else
        return endian==XXH_littleEndian ? *(const U32*)ptr : XXH_swap32(*(const U32*)ptr);
}

FORCE_INLINE U32 XXH_readLE32(const void* ptr, XXH_endianess endian)
{
    return XXH_readLE32_align(ptr, endian, XXH_unaligned);
}

static U32 XXH_readBE32(const void* ptr)
{
    return XXH_CPU_LITTLE_ENDIAN ? XXH_swap32(XXH_read32(ptr)) : XXH_read32(ptr);
}
/* *************************************
* Macros
***************************************/
#define XXH_STATIC_ASSERT(c) { enum { XXH_sa = 1/(int)(!!(c)) }; }
/* use after variable declarations */
XXH_PUBLIC_API unsigned XXH_versionNumber (void) { return XXH_VERSION_NUMBER; }
/* *******************************************************************
* 32-bit hash functions
*********************************************************************/
static const U32 PRIME32_1 = 2654435761U;
static const U32 PRIME32_2 = 2246822519U;
static const U32 PRIME32_3 = 3266489917U;
static const U32 PRIME32_4 =  668265263U;
static const U32 PRIME32_5 =  374761393U;

static U32 XXH32_round(U32 seed, U32 input)
{
    seed += input * PRIME32_2;
    seed  = XXH_rotl32(seed, 13);
    seed *= PRIME32_1;
    return seed;
}

/* mix all bits */
static U32 XXH32_avalanche(U32 h32)
{
    h32 ^= h32 >> 15;
    h32 *= PRIME32_2;
    h32 ^= h32 >> 13;
    h32 *= PRIME32_3;
    h32 ^= h32 >> 16;
    return(h32);
}
#define XXH_get32bits(p) XXH_readLE32_align(p, endian, align)
static U32
XXH32_finalize(U32 h32, const void* ptr, size_t len,
               XXH_endianess endian, XXH_alignment align)
{
    const BYTE* p = (const BYTE*)ptr;

#define PROCESS1               \
    h32 += (*p++) * PRIME32_5; \
    h32 = XXH_rotl32(h32, 11) * PRIME32_1 ;

#define PROCESS4                         \
    h32 += XXH_get32bits(p) * PRIME32_3; \
    p+=4;                                \
    h32 = XXH_rotl32(h32, 17) * PRIME32_4 ;

    switch(len&15)  /* or switch(bEnd - p) */
    {
      case 12:      PROCESS4;
                    /* fallthrough */
      case 8:       PROCESS4;
                    /* fallthrough */
      case 4:       PROCESS4;
                    return XXH32_avalanche(h32);

      case 13:      PROCESS4;
                    /* fallthrough */
      case 9:       PROCESS4;
                    /* fallthrough */
      case 5:       PROCESS4;
                    PROCESS1;
                    return XXH32_avalanche(h32);

      case 14:      PROCESS4;
                    /* fallthrough */
      case 10:      PROCESS4;
                    /* fallthrough */
      case 6:       PROCESS4;
                    PROCESS1;
                    PROCESS1;
                    return XXH32_avalanche(h32);

      case 15:      PROCESS4;
                    /* fallthrough */
      case 11:      PROCESS4;
                    /* fallthrough */
      case 7:       PROCESS4;
                    /* fallthrough */
      case 3:       PROCESS1;
                    /* fallthrough */
      case 2:       PROCESS1;
                    /* fallthrough */
      case 1:       PROCESS1;
                    /* fallthrough */
      case 0:       return XXH32_avalanche(h32);
    }
    assert(0);
    return h32;   /* reaching this point is deemed impossible */
}
FORCE_INLINE U32
XXH32_endian_align(const void* input, size_t len, U32 seed,
                   XXH_endianess endian, XXH_alignment align)
{
    const BYTE* p = (const BYTE*)input;
    const BYTE* bEnd = p + len;
    U32 h32;

#if defined(XXH_ACCEPT_NULL_INPUT_POINTER) && (XXH_ACCEPT_NULL_INPUT_POINTER>=1)
    if (p==NULL) {
        len=0;
        bEnd=p=(const BYTE*)(size_t)16;
    }
#endif

    if (len>=16) {
        const BYTE* const limit = bEnd - 15;
        U32 v1 = seed + PRIME32_1 + PRIME32_2;
        U32 v2 = seed + PRIME32_2;
        U32 v3 = seed + 0;
        U32 v4 = seed - PRIME32_1;

        do {
            v1 = XXH32_round(v1, XXH_get32bits(p)); p+=4;
            v2 = XXH32_round(v2, XXH_get32bits(p)); p+=4;
            v3 = XXH32_round(v3, XXH_get32bits(p)); p+=4;
            v4 = XXH32_round(v4, XXH_get32bits(p)); p+=4;
        } while (p < limit);

        h32 = XXH_rotl32(v1, 1)  + XXH_rotl32(v2, 7)
            + XXH_rotl32(v3, 12) + XXH_rotl32(v4, 18);
    } else {
        h32  = seed + PRIME32_5;
    }

    h32 += (U32)len;

    return XXH32_finalize(h32, p, len&15, endian, align);
}
XXH_PUBLIC_API unsigned int XXH32 (const void* input, size_t len, unsigned int seed)
{
#if 0
    /* Simple version, good for code maintenance, but unfortunately slow for small inputs */
    XXH32_state_t state;
    XXH32_reset(&state, seed);
    XXH32_update(&state, input, len);
    return XXH32_digest(&state);
#else
    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;

    if (XXH_FORCE_ALIGN_CHECK) {
        if ((((size_t)input) & 3) == 0) {   /* Input is 4-bytes aligned, leverage the speed benefit */
            if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
                return XXH32_endian_align(input, len, seed, XXH_littleEndian, XXH_aligned);
            else
                return XXH32_endian_align(input, len, seed, XXH_bigEndian, XXH_aligned);
    }   }

    if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
        return XXH32_endian_align(input, len, seed, XXH_littleEndian, XXH_unaligned);
    else
        return XXH32_endian_align(input, len, seed, XXH_bigEndian, XXH_unaligned);
#endif
}
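/* Usage sketch (not part of the original file), relying only on the public one-shot prototype
 * declared in xxhash.h. Kept under "#if 0" so it does not affect the library build; the message
 * and seed are arbitrary example values. */
#if 0
#include <stdio.h>
static void XXH32_oneshot_example(void)
{
    const char msg[] = "hello, xxhash";
    unsigned int const seed = 0;                        /* arbitrary example seed */
    unsigned int const h = XXH32(msg, sizeof(msg)-1, seed);
    printf("XXH32 = 0x%08x\n", h);                      /* same result on any endianness by default */
}
#endif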
/*====== Hash streaming ======*/
XXH_PUBLIC_API XXH32_state_t* XXH32_createState(void)
{
    return (XXH32_state_t*)XXH_malloc(sizeof(XXH32_state_t));
}
XXH_PUBLIC_API XXH_errorcode XXH32_freeState(XXH32_state_t* statePtr)
{
    XXH_free(statePtr);
    return XXH_OK;
}

XXH_PUBLIC_API void XXH32_copyState(XXH32_state_t* dstState, const XXH32_state_t* srcState)
{
    memcpy(dstState, srcState, sizeof(*dstState));
}

XXH_PUBLIC_API XXH_errorcode XXH32_reset(XXH32_state_t* statePtr, unsigned int seed)
{
    XXH32_state_t state;   /* using a local state to memcpy() in order to avoid strict-aliasing warnings */
    memset(&state, 0, sizeof(state));
    state.v1 = seed + PRIME32_1 + PRIME32_2;
    state.v2 = seed + PRIME32_2;
    state.v3 = seed + 0;
    state.v4 = seed - PRIME32_1;
    /* do not write into reserved, planned to be removed in a future version */
    memcpy(statePtr, &state, sizeof(state) - sizeof(state.reserved));
    return XXH_OK;
}
FORCE_INLINE XXH_errorcode
XXH32_update_endian(XXH32_state_t* state, const void* input, size_t len, XXH_endianess endian)
{
    if (input==NULL)
#if defined(XXH_ACCEPT_NULL_INPUT_POINTER) && (XXH_ACCEPT_NULL_INPUT_POINTER>=1)
        return XXH_OK;
#else
        return XXH_ERROR;
#endif

    {   const BYTE* p = (const BYTE*)input;
        const BYTE* const bEnd = p + len;

        state->total_len_32 += (unsigned)len;
        state->large_len |= (len>=16) | (state->total_len_32>=16);

        if (state->memsize + len < 16)  {   /* fill in tmp buffer */
            XXH_memcpy((BYTE*)(state->mem32) + state->memsize, input, len);
            state->memsize += (unsigned)len;
            return XXH_OK;
        }

        if (state->memsize) {   /* some data left from previous update */
            XXH_memcpy((BYTE*)(state->mem32) + state->memsize, input, 16-state->memsize);
            {   const U32* p32 = state->mem32;
                state->v1 = XXH32_round(state->v1, XXH_readLE32(p32, endian)); p32++;
                state->v2 = XXH32_round(state->v2, XXH_readLE32(p32, endian)); p32++;
                state->v3 = XXH32_round(state->v3, XXH_readLE32(p32, endian)); p32++;
                state->v4 = XXH32_round(state->v4, XXH_readLE32(p32, endian));
            }
            p += 16-state->memsize;
            state->memsize = 0;
        }

        if (p <= bEnd-16) {
            const BYTE* const limit = bEnd - 16;
            U32 v1 = state->v1;
            U32 v2 = state->v2;
            U32 v3 = state->v3;
            U32 v4 = state->v4;

            do {
                v1 = XXH32_round(v1, XXH_readLE32(p, endian)); p+=4;
                v2 = XXH32_round(v2, XXH_readLE32(p, endian)); p+=4;
                v3 = XXH32_round(v3, XXH_readLE32(p, endian)); p+=4;
                v4 = XXH32_round(v4, XXH_readLE32(p, endian)); p+=4;
            } while (p<=limit);

            state->v1 = v1;
            state->v2 = v2;
            state->v3 = v3;
            state->v4 = v4;
        }

        if (p < bEnd) {
            XXH_memcpy(state->mem32, p, (size_t)(bEnd-p));
            state->memsize = (unsigned)(bEnd-p);
        }
    }

    return XXH_OK;
}
XXH_PUBLIC_API XXH_errorcode XXH32_update (XXH32_state_t* state_in, const void* input, size_t len)
{
    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;

    if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
        return XXH32_update_endian(state_in, input, len, XXH_littleEndian);
    else
        return XXH32_update_endian(state_in, input, len, XXH_bigEndian);
}
FORCE_INLINE U32
XXH32_digest_endian (const XXH32_state_t* state, XXH_endianess endian)
{
    U32 h32;

    if (state->large_len) {
        h32 = XXH_rotl32(state->v1, 1)
            + XXH_rotl32(state->v2, 7)
            + XXH_rotl32(state->v3, 12)
            + XXH_rotl32(state->v4, 18);
    } else {
        h32 = state->v3 /* == seed */ + PRIME32_5;
    }

    h32 += state->total_len_32;

    return XXH32_finalize(h32, state->mem32, state->memsize, endian, XXH_aligned);
}

XXH_PUBLIC_API unsigned int XXH32_digest (const XXH32_state_t* state_in)
{
    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;

    if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
        return XXH32_digest_endian(state_in, XXH_littleEndian);
    else
        return XXH32_digest_endian(state_in, XXH_bigEndian);
}
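/* Streaming usage sketch (not part of the original file): the state-based API above lets a
 * caller feed data in chunks and read the digest at the end. Error checks are omitted for
 * brevity, the seed and function name are illustrative, and the block is disabled with "#if 0". */
#if 0
static unsigned int XXH32_stream_example(const void* part1, size_t len1,
                                         const void* part2, size_t len2)
{
    XXH32_state_t* const st = XXH32_createState();
    XXH32_reset(st, 0);                 /* seed 0, as an example */
    XXH32_update(st, part1, len1);      /* chunks may arrive across any number of calls */
    XXH32_update(st, part2, len2);
    {   unsigned int const h = XXH32_digest(st);   /* digest can be taken at any point */
        XXH32_freeState(st);
        return h;
    }
}
#endif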
/*====== Canonical representation ======*/
/*! Default XXH result types are basic unsigned 32 and 64 bits.
* The canonical representation follows human-readable write convention, aka big-endian (large digits first).
* These functions allow transformation of hash result into and from its canonical format.
* This way, hash values can be written into a file or buffer, remaining comparable across different systems.
*/
XXH_PUBLIC_API void XXH32_canonicalFromHash(XXH32_canonical_t* dst, XXH32_hash_t hash)
{
    XXH_STATIC_ASSERT(sizeof(XXH32_canonical_t) == sizeof(XXH32_hash_t));
    if (XXH_CPU_LITTLE_ENDIAN) hash = XXH_swap32(hash);
    memcpy(dst, &hash, sizeof(*dst));
}

XXH_PUBLIC_API XXH32_hash_t XXH32_hashFromCanonical(const XXH32_canonical_t* src)
{
    return XXH_readBE32(src);
}
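/* Canonical-form sketch (not part of the original file): storing a hash as big-endian canonical
 * bytes keeps it comparable across machines; reading it back recovers the native value. The
 * function name is illustrative and the block is disabled with "#if 0". */
#if 0
static int XXH32_canonical_roundtrip_example(XXH32_hash_t h)
{
    XXH32_canonical_t buf;                        /* 4 big-endian bytes, safe to write to a file or socket */
    XXH32_canonicalFromHash(&buf, h);
    return XXH32_hashFromCanonical(&buf) == h;    /* round-trip preserves the value */
}
#endif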
#ifndef XXH_NO_LONG_LONG
/* *******************************************************************
* 64-bit hash functions
*********************************************************************/
/*====== Memory access ======*/
#ifndef MEM_MODULE
# define MEM_MODULE
# if !defined (__VMS) \
   && (defined (__cplusplus) \
   || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */ ) )
#   include <stdint.h>
    typedef uint64_t U64;
# else
    /* if compiler doesn't support unsigned long long, replace by another 64-bit type */
    typedef unsigned long long U64;
# endif
#endif
#if (defined(XXH_FORCE_MEMORY_ACCESS) && (XXH_FORCE_MEMORY_ACCESS==2))
/* Force direct memory access. Only works on CPU which support unaligned memory access in hardware */
static U64 XXH_read64(const void* memPtr) { return *(const U64*) memPtr; }

#elif (defined(XXH_FORCE_MEMORY_ACCESS) && (XXH_FORCE_MEMORY_ACCESS==1))

/* __pack instructions are safer, but compiler specific, hence potentially problematic for some compilers */
/* currently only defined for gcc and icc */
typedef union { U32 u32; U64 u64; } __attribute__((packed)) unalign64;
static U64 XXH_read64(const void* ptr) { return ((const unalign64*)ptr)->u64; }

#else

/* portable and safe solution. Generally efficient.
 * see : http://stackoverflow.com/a/32095106/646947
 */
static U64 XXH_read64(const void* memPtr)
{
    U64 val;
    memcpy(&val, memPtr, sizeof(val));
    return val;
}
#endif
/* XXH_FORCE_DIRECT_MEMORY_ACCESS */
#if defined(_MSC_VER)
/* Visual Studio */
# define XXH_swap64 _byteswap_uint64
#elif XXH_GCC_VERSION >= 403
# define XXH_swap64 __builtin_bswap64
#else
static U64 XXH_swap64 (U64 x)
{
    return  ((x << 56) & 0xff00000000000000ULL) |
            ((x << 40) & 0x00ff000000000000ULL) |
            ((x << 24) & 0x0000ff0000000000ULL) |
            ((x <<  8) & 0x000000ff00000000ULL) |
            ((x >>  8) & 0x00000000ff000000ULL) |
            ((x >> 24) & 0x0000000000ff0000ULL) |
            ((x >> 40) & 0x000000000000ff00ULL) |
            ((x >> 56) & 0x00000000000000ffULL);
}
#endif
FORCE_INLINE U64 XXH_readLE64_align(const void* ptr, XXH_endianess endian, XXH_alignment align)
{
    if (align==XXH_unaligned)
        return endian==XXH_littleEndian ? XXH_read64(ptr) : XXH_swap64(XXH_read64(ptr));
    else
        return endian==XXH_littleEndian ? *(const U64*)ptr : XXH_swap64(*(const U64*)ptr);
}

FORCE_INLINE U64 XXH_readLE64(const void* ptr, XXH_endianess endian)
{
    return XXH_readLE64_align(ptr, endian, XXH_unaligned);
}

static U64 XXH_readBE64(const void* ptr)
{
    return XXH_CPU_LITTLE_ENDIAN ? XXH_swap64(XXH_read64(ptr)) : XXH_read64(ptr);
}
/*====== xxh64 ======*/
static const U64 PRIME64_1 = 11400714785074694791ULL;
static const U64 PRIME64_2 = 14029467366897019727ULL;
static const U64 PRIME64_3 =  1609587929392839161ULL;
static const U64 PRIME64_4 =  9650029242287828579ULL;
static const U64 PRIME64_5 =  2870177450012600261ULL;

static U64 XXH64_round(U64 acc, U64 input)
{
    acc += input * PRIME64_2;
    acc  = XXH_rotl64(acc, 31);
    acc *= PRIME64_1;
    return acc;
}

static U64 XXH64_mergeRound(U64 acc, U64 val)
{
    val  = XXH64_round(0, val);
    acc ^= val;
    acc  = acc * PRIME64_1 + PRIME64_4;
    return acc;
}

static U64 XXH64_avalanche(U64 h64)
{
    h64 ^= h64 >> 33;
    h64 *= PRIME64_2;
    h64 ^= h64 >> 29;
    h64 *= PRIME64_3;
    h64 ^= h64 >> 32;
    return h64;
}
#define XXH_get64bits(p) XXH_readLE64_align(p, endian, align)
static U64
XXH64_finalize(U64 h64, const void* ptr, size_t len,
               XXH_endianess endian, XXH_alignment align)
{
    const BYTE* p = (const BYTE*)ptr;

#define PROCESS1_64            \
    h64 ^= (*p++) * PRIME64_5; \
    h64 = XXH_rotl64(h64, 11) * PRIME64_1;

#define PROCESS4_64                                 \
    h64 ^= (U64)(XXH_get32bits(p)) * PRIME64_1;     \
    p+=4;                                           \
    h64 = XXH_rotl64(h64, 23) * PRIME64_2 + PRIME64_3;

#define PROCESS8_64 {                                    \
    U64 const k1 = XXH64_round(0, XXH_get64bits(p));     \
    p+=8;                                                \
    h64 ^= k1;                                           \
    h64 = XXH_rotl64(h64,27) * PRIME64_1 + PRIME64_4;    \
}

    switch(len&31) {
      case 24:      PROCESS8_64;
                    /* fallthrough */
      case 16:      PROCESS8_64;
                    /* fallthrough */
      case  8:      PROCESS8_64;
                    return XXH64_avalanche(h64);

      case 28:      PROCESS8_64;
                    /* fallthrough */
      case 20:      PROCESS8_64;
                    /* fallthrough */
      case 12:      PROCESS8_64;
                    /* fallthrough */
      case  4:      PROCESS4_64;
                    return XXH64_avalanche(h64);

      case 25:      PROCESS8_64;
                    /* fallthrough */
      case 17:      PROCESS8_64;
                    /* fallthrough */
      case  9:      PROCESS8_64;
                    PROCESS1_64;
                    return XXH64_avalanche(h64);

      case 29:      PROCESS8_64;
                    /* fallthrough */
      case 21:      PROCESS8_64;
                    /* fallthrough */
      case 13:      PROCESS8_64;
                    /* fallthrough */
      case  5:      PROCESS4_64;
                    PROCESS1_64;
                    return XXH64_avalanche(h64);

      case 26:      PROCESS8_64;
                    /* fallthrough */
      case 18:      PROCESS8_64;
                    /* fallthrough */
      case 10:      PROCESS8_64;
                    PROCESS1_64;
                    PROCESS1_64;
                    return XXH64_avalanche(h64);

      case 30:      PROCESS8_64;
                    /* fallthrough */
      case 22:      PROCESS8_64;
                    /* fallthrough */
      case 14:      PROCESS8_64;
                    PROCESS1_64;
                    PROCESS1_64;
                    return XXH64_avalanche(h64);

      case 27:      PROCESS8_64;
                    /* fallthrough */
      case 19:      PROCESS8_64;
                    /* fallthrough */
      case 11:      PROCESS8_64;
                    PROCESS1_64;
                    PROCESS1_64;
                    PROCESS1_64;
                    return XXH64_avalanche(h64);

      case 31:      PROCESS8_64;
                    /* fallthrough */
      case 23:      PROCESS8_64;
                    /* fallthrough */
      case 15:      PROCESS8_64;
                    /* fallthrough */
      case  7:      PROCESS4_64;
                    /* fallthrough */
      case  3:      PROCESS1_64;
                    /* fallthrough */
      case  2:      PROCESS1_64;
                    /* fallthrough */
      case  1:      PROCESS1_64;
                    /* fallthrough */
      case  0:      return XXH64_avalanche(h64);
    }

    /* impossible to reach */
    assert(0);
    return 0;  /* unreachable, but some compilers complain without it */
}
FORCE_INLINE U64
XXH64_endian_align(const void* input, size_t len, U64 seed,
                   XXH_endianess endian, XXH_alignment align)
{
    const BYTE* p = (const BYTE*)input;
    const BYTE* bEnd = p + len;
    U64 h64;

#if defined(XXH_ACCEPT_NULL_INPUT_POINTER) && (XXH_ACCEPT_NULL_INPUT_POINTER>=1)
    if (p==NULL) {
        len=0;
        bEnd=p=(const BYTE*)(size_t)32;
    }
#endif

    if (len>=32) {
        const BYTE* const limit = bEnd - 32;
        U64 v1 = seed + PRIME64_1 + PRIME64_2;
        U64 v2 = seed + PRIME64_2;
        U64 v3 = seed + 0;
        U64 v4 = seed - PRIME64_1;

        do {
            v1 = XXH64_round(v1, XXH_get64bits(p)); p+=8;
            v2 = XXH64_round(v2, XXH_get64bits(p)); p+=8;
            v3 = XXH64_round(v3, XXH_get64bits(p)); p+=8;
            v4 = XXH64_round(v4, XXH_get64bits(p)); p+=8;
        } while (p<=limit);

        h64 = XXH_rotl64(v1, 1) + XXH_rotl64(v2, 7) + XXH_rotl64(v3, 12) + XXH_rotl64(v4, 18);
        h64 = XXH64_mergeRound(h64, v1);
        h64 = XXH64_mergeRound(h64, v2);
        h64 = XXH64_mergeRound(h64, v3);
        h64 = XXH64_mergeRound(h64, v4);

    } else {
        h64  = seed + PRIME64_5;
    }

    h64 += (U64) len;

    return XXH64_finalize(h64, p, len, endian, align);
}
XXH_PUBLIC_API unsigned long long XXH64 (const void* input, size_t len, unsigned long long seed)
{
#if 0
    /* Simple version, good for code maintenance, but unfortunately slow for small inputs */
    XXH64_state_t state;
    XXH64_reset(&state, seed);
    XXH64_update(&state, input, len);
    return XXH64_digest(&state);
#else
    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;

    if (XXH_FORCE_ALIGN_CHECK) {
        if ((((size_t)input) & 7)==0) {  /* Input is aligned, let's leverage the speed advantage */
            if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
                return XXH64_endian_align(input, len, seed, XXH_littleEndian, XXH_aligned);
            else
                return XXH64_endian_align(input, len, seed, XXH_bigEndian, XXH_aligned);
    }   }

    if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
        return XXH64_endian_align(input, len, seed, XXH_littleEndian, XXH_unaligned);
    else
        return XXH64_endian_align(input, len, seed, XXH_bigEndian, XXH_unaligned);
#endif
}
/*====== Hash Streaming ======*/
XXH_PUBLIC_API XXH64_state_t* XXH64_createState(void)
{
    return (XXH64_state_t*)XXH_malloc(sizeof(XXH64_state_t));
}
XXH_PUBLIC_API XXH_errorcode XXH64_freeState(XXH64_state_t* statePtr)
{
    XXH_free(statePtr);
    return XXH_OK;
}

XXH_PUBLIC_API void XXH64_copyState(XXH64_state_t* dstState, const XXH64_state_t* srcState)
{
    memcpy(dstState, srcState, sizeof(*dstState));
}

XXH_PUBLIC_API XXH_errorcode XXH64_reset(XXH64_state_t* statePtr, unsigned long long seed)
{
    XXH64_state_t state;   /* using a local state to memcpy() in order to avoid strict-aliasing warnings */
    memset(&state, 0, sizeof(state));
    state.v1 = seed + PRIME64_1 + PRIME64_2;
    state.v2 = seed + PRIME64_2;
    state.v3 = seed + 0;
    state.v4 = seed - PRIME64_1;
    /* do not write into reserved, planned to be removed in a future version */
    memcpy(statePtr, &state, sizeof(state) - sizeof(state.reserved));
    return XXH_OK;
}
FORCE_INLINE XXH_errorcode
XXH64_update_endian (XXH64_state_t* state, const void* input, size_t len, XXH_endianess endian)
{
    if (input==NULL)
#if defined(XXH_ACCEPT_NULL_INPUT_POINTER) && (XXH_ACCEPT_NULL_INPUT_POINTER>=1)
        return XXH_OK;
#else
        return XXH_ERROR;
#endif

    {   const BYTE* p = (const BYTE*)input;
        const BYTE* const bEnd = p + len;

        state->total_len += len;

        if (state->memsize + len < 32) {  /* fill in tmp buffer */
            XXH_memcpy(((BYTE*)state->mem64) + state->memsize, input, len);
            state->memsize += (U32)len;
            return XXH_OK;
        }

        if (state->memsize) {   /* tmp buffer is full */
            XXH_memcpy(((BYTE*)state->mem64) + state->memsize, input, 32-state->memsize);
            state->v1 = XXH64_round(state->v1, XXH_readLE64(state->mem64+0, endian));
            state->v2 = XXH64_round(state->v2, XXH_readLE64(state->mem64+1, endian));
            state->v3 = XXH64_round(state->v3, XXH_readLE64(state->mem64+2, endian));
            state->v4 = XXH64_round(state->v4, XXH_readLE64(state->mem64+3, endian));
            p += 32-state->memsize;
            state->memsize = 0;
        }

        if (p+32 <= bEnd) {
            const BYTE* const limit = bEnd - 32;
            U64 v1 = state->v1;
            U64 v2 = state->v2;
            U64 v3 = state->v3;
            U64 v4 = state->v4;

            do {
                v1 = XXH64_round(v1, XXH_readLE64(p, endian)); p+=8;
                v2 = XXH64_round(v2, XXH_readLE64(p, endian)); p+=8;
                v3 = XXH64_round(v3, XXH_readLE64(p, endian)); p+=8;
                v4 = XXH64_round(v4, XXH_readLE64(p, endian)); p+=8;
            } while (p<=limit);

            state->v1 = v1;
            state->v2 = v2;
            state->v3 = v3;
            state->v4 = v4;
        }

        if (p < bEnd) {
            XXH_memcpy(state->mem64, p, (size_t)(bEnd-p));
            state->memsize = (unsigned)(bEnd-p);
        }
    }

    return XXH_OK;
}
XXH_PUBLIC_API XXH_errorcode XXH64_update (XXH64_state_t* state_in, const void* input, size_t len)
{
    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;

    if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
        return XXH64_update_endian(state_in, input, len, XXH_littleEndian);
    else
        return XXH64_update_endian(state_in, input, len, XXH_bigEndian);
}
FORCE_INLINE U64 XXH64_digest_endian (const XXH64_state_t* state, XXH_endianess endian)
{
    U64 h64;

    if (state->total_len >= 32) {
        U64 const v1 = state->v1;
        U64 const v2 = state->v2;
        U64 const v3 = state->v3;
        U64 const v4 = state->v4;

        h64 = XXH_rotl64(v1, 1) + XXH_rotl64(v2, 7) + XXH_rotl64(v3, 12) + XXH_rotl64(v4, 18);
        h64 = XXH64_mergeRound(h64, v1);
        h64 = XXH64_mergeRound(h64, v2);
        h64 = XXH64_mergeRound(h64, v3);
        h64 = XXH64_mergeRound(h64, v4);
    } else {
        h64  = state->v3 /*seed*/ + PRIME64_5;
    }

    h64 += (U64) state->total_len;

    return XXH64_finalize(h64, state->mem64, (size_t)state->total_len, endian, XXH_aligned);
}

XXH_PUBLIC_API unsigned long long XXH64_digest (const XXH64_state_t* state_in)
{
    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;

    if ((endian_detected==XXH_littleEndian) || XXH_FORCE_NATIVE_FORMAT)
        return XXH64_digest_endian(state_in, XXH_littleEndian);
    else
        return XXH64_digest_endian(state_in, XXH_bigEndian);
}
/*====== Canonical representation ======*/
XXH_PUBLIC_API void XXH64_canonicalFromHash(XXH64_canonical_t* dst, XXH64_hash_t hash)
{
    XXH_STATIC_ASSERT(sizeof(XXH64_canonical_t) == sizeof(XXH64_hash_t));
    if (XXH_CPU_LITTLE_ENDIAN) hash = XXH_swap64(hash);
    memcpy(dst, &hash, sizeof(*dst));
}

XXH_PUBLIC_API XXH64_hash_t XXH64_hashFromCanonical(const XXH64_canonical_t* src)
{
    return XXH_readBE64(src);
}
#endif
/* XXH_NO_LONG_LONG */