未验证 提交 c90e2aa7 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20069 from taosdata/feature/3_liaohj

fix(query): fix error in simd.
/*
xxHash - Extremely Fast Hash algorithm
Header File
Copyright (C) 2012-2016, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- xxHash source repository : https://github.com/Cyan4973/xxHash
*/
/* Notice extracted from xxHash homepage :
xxHash is an extremely fast Hash algorithm, running at RAM speed limits.
It also successfully passes all tests from the SMHasher suite.
Comparison (single thread, Windows Seven 32 bits, using SMHasher on a Core 2 Duo @3GHz)
Name Speed Q.Score Author
xxHash 5.4 GB/s 10
CrapWow 3.2 GB/s 2 Andrew
MumurHash 3a 2.7 GB/s 10 Austin Appleby
SpookyHash 2.0 GB/s 10 Bob Jenkins
SBox 1.4 GB/s 9 Bret Mulvey
Lookup3 1.2 GB/s 9 Bob Jenkins
SuperFastHash 1.2 GB/s 1 Paul Hsieh
CityHash64 1.05 GB/s 10 Pike & Alakuijala
FNV 0.55 GB/s 5 Fowler, Noll, Vo
CRC32 0.43 GB/s 9
MD5-32 0.33 GB/s 10 Ronald L. Rivest
SHA1-32 0.28 GB/s 10
Q.Score is a measure of quality of the hash function.
It depends on successfully passing SMHasher test set.
10 is a perfect score.
A 64-bit version, named XXH64, is available since r35.
It offers much better speed, but for 64-bit applications only.
Name Speed on 64 bits Speed on 32 bits
XXH64 13.8 GB/s 1.9 GB/s
XXH32 6.8 GB/s 6.0 GB/s
*/
#ifndef XXHASH_H_5627135585666179
#define XXHASH_H_5627135585666179 1
#if defined (__cplusplus)
extern "C" {
#endif
/* ****************************
* Definitions
******************************/
#include <stddef.h> /* size_t */
typedef enum { XXH_OK=0, XXH_ERROR } XXH_errorcode;
/* ****************************
* API modifier
******************************/
/** XXH_INLINE_ALL (and XXH_PRIVATE_API)
* This is useful to include xxhash functions in `static` mode
* in order to inline them, and remove their symbol from the public list.
* Inlining can offer dramatic performance improvement on small keys.
* Methodology :
* #define XXH_INLINE_ALL
* #include "xxhash.h"
* `xxhash.c` is automatically included.
* It's not useful to compile and link it as a separate module.
*/
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# ifndef XXH_STATIC_LINKING_ONLY
# define XXH_STATIC_LINKING_ONLY
# endif
# if defined(__GNUC__)
# define XXH_PUBLIC_API static __inline __attribute__((unused))
# elif defined (__cplusplus) || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */)
# define XXH_PUBLIC_API static inline
# elif defined(_MSC_VER)
# define XXH_PUBLIC_API static __inline
# else
/* this version may generate warnings for unused static functions */
# define XXH_PUBLIC_API static
# endif
#else
# define XXH_PUBLIC_API /* do nothing */
#endif /* XXH_INLINE_ALL || XXH_PRIVATE_API */
/*! XXH_NAMESPACE, aka Namespace Emulation :
*
* If you want to include _and expose_ xxHash functions from within your own library,
* but also want to avoid symbol collisions with other libraries which may also include xxHash,
*
* you can use XXH_NAMESPACE, to automatically prefix any public symbol from xxhash library
* with the value of XXH_NAMESPACE (therefore, avoid NULL and numeric values).
*
* Note that no change is required within the calling program as long as it includes `xxhash.h` :
* regular symbol name will be automatically translated by this header.
*/
#ifdef XXH_NAMESPACE
# define XXH_CAT(A,B) A##B
# define XXH_NAME2(A,B) XXH_CAT(A,B)
# define XXH_versionNumber XXH_NAME2(XXH_NAMESPACE, XXH_versionNumber)
# define XXH32 XXH_NAME2(XXH_NAMESPACE, XXH32)
# define XXH32_createState XXH_NAME2(XXH_NAMESPACE, XXH32_createState)
# define XXH32_freeState XXH_NAME2(XXH_NAMESPACE, XXH32_freeState)
# define XXH32_reset XXH_NAME2(XXH_NAMESPACE, XXH32_reset)
# define XXH32_update XXH_NAME2(XXH_NAMESPACE, XXH32_update)
# define XXH32_digest XXH_NAME2(XXH_NAMESPACE, XXH32_digest)
# define XXH32_copyState XXH_NAME2(XXH_NAMESPACE, XXH32_copyState)
# define XXH32_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH32_canonicalFromHash)
# define XXH32_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH32_hashFromCanonical)
# define XXH64 XXH_NAME2(XXH_NAMESPACE, XXH64)
# define XXH64_createState XXH_NAME2(XXH_NAMESPACE, XXH64_createState)
# define XXH64_freeState XXH_NAME2(XXH_NAMESPACE, XXH64_freeState)
# define XXH64_reset XXH_NAME2(XXH_NAMESPACE, XXH64_reset)
# define XXH64_update XXH_NAME2(XXH_NAMESPACE, XXH64_update)
# define XXH64_digest XXH_NAME2(XXH_NAMESPACE, XXH64_digest)
# define XXH64_copyState XXH_NAME2(XXH_NAMESPACE, XXH64_copyState)
# define XXH64_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH64_canonicalFromHash)
# define XXH64_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH64_hashFromCanonical)
#endif
/* *************************************
* Version
***************************************/
#define XXH_VERSION_MAJOR 0
#define XXH_VERSION_MINOR 6
#define XXH_VERSION_RELEASE 5
#define XXH_VERSION_NUMBER (XXH_VERSION_MAJOR *100*100 + XXH_VERSION_MINOR *100 + XXH_VERSION_RELEASE)
XXH_PUBLIC_API unsigned XXH_versionNumber (void);
/*-**********************************************************************
* 32-bit hash
************************************************************************/
typedef unsigned int XXH32_hash_t;
/*! XXH32() :
Calculate the 32-bit hash of sequence "length" bytes stored at memory address "input".
The memory between input & input+length must be valid (allocated and read-accessible).
"seed" can be used to alter the result predictably.
Speed on Core 2 Duo @ 3 GHz (single thread, SMHasher benchmark) : 5.4 GB/s */
XXH_PUBLIC_API XXH32_hash_t XXH32 (const void* input, size_t length, unsigned int seed);
/*====== Streaming ======*/
typedef struct XXH32_state_s XXH32_state_t; /* incomplete type */
XXH_PUBLIC_API XXH32_state_t* XXH32_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH32_freeState(XXH32_state_t* statePtr);
XXH_PUBLIC_API void XXH32_copyState(XXH32_state_t* dst_state, const XXH32_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH32_reset (XXH32_state_t* statePtr, unsigned int seed);
XXH_PUBLIC_API XXH_errorcode XXH32_update (XXH32_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH32_hash_t XXH32_digest (const XXH32_state_t* statePtr);
/*
* Streaming functions generate the xxHash of an input provided in multiple segments.
* Note that, for small input, they are slower than single-call functions, due to state management.
* For small inputs, prefer `XXH32()` and `XXH64()`, which are better optimized.
*
* XXH state must first be allocated, using XXH*_createState() .
*
* Start a new hash by initializing state with a seed, using XXH*_reset().
*
* Then, feed the hash state by calling XXH*_update() as many times as necessary.
* The function returns an error code, with 0 meaning OK, and any other value meaning there is an error.
*
* Finally, a hash value can be produced anytime, by using XXH*_digest().
* This function returns the nn-bits hash as an int or long long.
*
* It's still possible to continue inserting input into the hash state after a digest,
* and generate some new hashes later on, by calling again XXH*_digest().
*
* When done, free XXH state space if it was allocated dynamically.
*/
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[4]; } XXH32_canonical_t;
XXH_PUBLIC_API void XXH32_canonicalFromHash(XXH32_canonical_t* dst, XXH32_hash_t hash);
XXH_PUBLIC_API XXH32_hash_t XXH32_hashFromCanonical(const XXH32_canonical_t* src);
/* Default result type for XXH functions are primitive unsigned 32 and 64 bits.
* The canonical representation uses human-readable write convention, aka big-endian (large digits first).
* These functions allow transformation of hash result into and from its canonical format.
* This way, hash values can be written into a file / memory, and remain comparable on different systems and programs.
*/
#ifndef XXH_NO_LONG_LONG
/*-**********************************************************************
* 64-bit hash
************************************************************************/
typedef unsigned long long XXH64_hash_t;
/*! XXH64() :
Calculate the 64-bit hash of sequence of length "len" stored at memory address "input".
"seed" can be used to alter the result predictably.
This function runs faster on 64-bit systems, but slower on 32-bit systems (see benchmark).
*/
XXH_PUBLIC_API XXH64_hash_t XXH64 (const void* input, size_t length, unsigned long long seed);
/*====== Streaming ======*/
typedef struct XXH64_state_s XXH64_state_t; /* incomplete type */
XXH_PUBLIC_API XXH64_state_t* XXH64_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH64_freeState(XXH64_state_t* statePtr);
XXH_PUBLIC_API void XXH64_copyState(XXH64_state_t* dst_state, const XXH64_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH64_reset (XXH64_state_t* statePtr, unsigned long long seed);
XXH_PUBLIC_API XXH_errorcode XXH64_update (XXH64_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH64_hash_t XXH64_digest (const XXH64_state_t* statePtr);
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[8]; } XXH64_canonical_t;
XXH_PUBLIC_API void XXH64_canonicalFromHash(XXH64_canonical_t* dst, XXH64_hash_t hash);
XXH_PUBLIC_API XXH64_hash_t XXH64_hashFromCanonical(const XXH64_canonical_t* src);
#endif /* XXH_NO_LONG_LONG */
#ifdef XXH_STATIC_LINKING_ONLY
/* ================================================================================================
This section contains declarations which are not guaranteed to remain stable.
They may change in future versions, becoming incompatible with a different version of the library.
These declarations should only be used with static linking.
Never use them in association with dynamic linking !
=================================================================================================== */
/* These definitions are only present to allow
* static allocation of XXH state, on stack or in a struct for example.
* Never **ever** use members directly. */
#if !defined (__VMS) \
&& (defined (__cplusplus) \
|| (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */) )
# include <stdint.h>
struct XXH32_state_s {
uint32_t total_len_32;
uint32_t large_len;
uint32_t v1;
uint32_t v2;
uint32_t v3;
uint32_t v4;
uint32_t mem32[4];
uint32_t memsize;
uint32_t reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
struct XXH64_state_s {
uint64_t total_len;
uint64_t v1;
uint64_t v2;
uint64_t v3;
uint64_t v4;
uint64_t mem64[4];
uint32_t memsize;
uint32_t reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# else
struct XXH32_state_s {
unsigned total_len_32;
unsigned large_len;
unsigned v1;
unsigned v2;
unsigned v3;
unsigned v4;
unsigned mem32[4];
unsigned memsize;
unsigned reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
# ifndef XXH_NO_LONG_LONG /* remove 64-bit support */
struct XXH64_state_s {
unsigned long long total_len;
unsigned long long v1;
unsigned long long v2;
unsigned long long v3;
unsigned long long v4;
unsigned long long mem64[4];
unsigned memsize;
unsigned reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# endif
# endif
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# include "xxhash.c" /* include xxhash function bodies as `static`, for inlining */
#endif
#endif /* XXH_STATIC_LINKING_ONLY */
#if defined (__cplusplus)
}
#endif
#endif /* XXHASH_H_5627135585666179 */
...@@ -458,6 +458,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -458,6 +458,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pOffset->val = pVg->currentOffset; pOffset->val = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId); int32_t groupLen = strlen(tmq->groupId);
...@@ -471,11 +472,13 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -471,11 +472,13 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) { if (buf == NULL) {
taosMemoryFree(pOffset); taosMemoryFree(pOffset);
return -1; return -1;
} }
((SMsgHead*)buf)->vgId = htonl(pVg->vgId); ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
...@@ -492,6 +495,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -492,6 +495,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
taosMemoryFree(buf); taosMemoryFree(buf);
return -1; return -1;
} }
pParam->params = pParamSet; pParam->params = pParamSet;
pParam->pOffset = pOffset; pParam->pOffset = pOffset;
...@@ -503,14 +507,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -503,14 +507,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
taosMemoryFree(pParam); taosMemoryFree(pParam);
return -1; return -1;
} }
pMsgSendInfo->msgInfo = (SDataBuf){ pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf, .pData = buf,
.len = sizeof(SMsgHead) + len, .len = sizeof(SMsgHead) + len,
.handle = NULL, .handle = NULL,
}; };
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey, SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse];
pVg->vgId, pOffset->val.version); tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);
// TODO: put into cb // TODO: put into cb
pVg->committedOffset = pVg->currentOffset; pVg->committedOffset = pVg->currentOffset;
...@@ -637,15 +643,16 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, ...@@ -637,15 +643,16 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
// init as 1 to prevent concurrency issue // init as 1 to prevent concurrency issue
pParamSet->waitingRspNum = 1; pParamSet->waitingRspNum = 1;
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%"PRIx64" start to commit offset for %d topics", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) { for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId,
pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue; continue;
} }
...@@ -1085,7 +1092,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1085,7 +1092,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
tNameExtractFullName(&name, topicFName); tNameExtractFullName(&name, topicFName);
tscDebug("consumer:0x%"PRIx64", subscribe topic: %s", tmq->consumerId, topicFName); tscDebug("consumer:0x%"PRIx64" subscribe topic: %s", tmq->consumerId, topicFName);
taosArrayPush(req.topicNames, &topicFName); taosArrayPush(req.topicNames, &topicFName);
} }
...@@ -1398,7 +1405,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1398,7 +1405,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
} }
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
return set; return set;
} }
...@@ -1548,7 +1555,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1548,7 +1555,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async); tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%"PRIx64, tmq->consumerId, async, tmq->consumerId);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
...@@ -1759,6 +1766,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1759,6 +1766,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
while (1) { while (1) {
SMqRspWrapper* rspWrapper = NULL; SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) { if (rspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&rspWrapper);
...@@ -1881,7 +1889,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1881,7 +1889,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed // in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId);
return NULL; return NULL;
} }
......
...@@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) { ...@@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "newabcdefgjhijlm__"); tmq_conf_set(conf, "group.id", "consumer_group");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
......
...@@ -327,23 +327,26 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha ...@@ -327,23 +327,26 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
// calculate the cumulative sum (prefix sum) for each number // calculate the cumulative sum (prefix sum) for each number
// decode[0] = prev_value + final[0] // decode[0] = prev_value + final[0]
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1] // decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
// decode[2] = decode[1] + final[1] -----> prev_value + final[0] + final[1] + final[2] // decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[1] -----> prev_value + final[0] + final[1] + final[2] + final[3] // decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
// 1, 2, 3, 4 // 1, 2, 3, 4
//+ 0, 1, 2, 3 //+ 0, 1, 0, 3
// 1, 3, 5, 7 // 1, 3, 3, 7
// shift and add for the first round // shift and add for the first round
__m128i prev = _mm_set1_epi64x(prev_value); __m128i prev = _mm_set1_epi64x(prev_value);
delta = _mm256_add_epi64(delta, _mm256_slli_si256(delta, 8)); __m256i x = _mm256_slli_si256(delta, 8);
delta = _mm256_add_epi64(delta, x);
_mm256_storeu_si256((__m256i *)&p[_pos], delta); _mm256_storeu_si256((__m256i *)&p[_pos], delta);
// 1, 3, 5, 7 // 1, 3, 3, 7
//+ 0, 0, 1, 3 //+ 0, 0, 3, 3
// 1, 3, 6, 10 // 1, 3, 6, 10
// shift and add operation for the second round // shift and add operation for the second round
__m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]); __m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), firstPart); __m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
firstPart = _mm_add_epi64(firstPart, prev); firstPart = _mm_add_epi64(firstPart, prev);
secPart = _mm_add_epi64(secPart, prev); secPart = _mm_add_epi64(secPart, prev);
...@@ -353,15 +356,18 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha ...@@ -353,15 +356,18 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
shiftBits = _mm256_add_epi64(shiftBits, inc); shiftBits = _mm256_add_epi64(shiftBits, inc);
prev_value = p[_pos + 3]; prev_value = p[_pos + 3];
// uDebug("_pos:%d %"PRId64", %"PRId64", %"PRId64", %"PRId64, _pos, p[_pos], p[_pos+1], p[_pos+2], p[_pos+3]);
_pos += 4; _pos += 4;
} }
// handle the remain value // handle the remain value
for (int32_t i = 0; i < remain; i++) { for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit))) & mask); zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value; p[_pos++] = prev_value;
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
v += bit; v += bit;
} }
} else { } else {
...@@ -370,6 +376,8 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha ...@@ -370,6 +376,8 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value; p[_pos++] = prev_value;
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
v += bit; v += bit;
} }
} }
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册