From 8be56fb73c8573bd9113908607909988a2f5d3a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 15 Aug 2020 11:35:04 +0800 Subject: [PATCH] [td-1148] enable the binary tag to be used as the join condition. --- src/client/src/tscFunctionImpl.c | 6 +- src/client/src/tscSQLParser.c | 5 -- src/client/src/tscSubquery.c | 9 +-- src/common/inc/tvariant.h | 2 + src/common/src/tvariant.c | 34 +++++++++- src/query/inc/qExecutor.h | 2 +- src/query/inc/qTsbuf.h | 21 ++++--- src/query/src/qExecutor.c | 10 ++- src/query/src/qTsbuf.c | 66 +++++++++++++------- src/query/tests/tsBufTest.cpp | 104 ++++++++++++++++++++++--------- 10 files changed, 176 insertions(+), 83 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 5c708dffee..205be5e3a6 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3900,11 +3900,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { // primary ts must be existed, so no need to check its existance if (pCtx->order == TSDB_ORDER_ASC) { - tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, input, pCtx->size * TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { char *d = GET_INPUT_CHAR_INDEX(pCtx, i); - tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, d, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE); } } @@ -3923,7 +3923,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { STSBuf *pTSbuf = pInfo->pTSBuf; - tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, pData, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, &pCtx->tag, pData, TSDB_KEYSIZE); SET_VAL(pCtx, pCtx->size, 1); pResInfo->hasResult = DATA_SET_FLAG; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f933366711..43a43b6d31 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3064,7 +3064,6 @@ static int32_t getColumnQueryCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { const char* msg1 = "invalid join query condition"; - const char* msg2 = "join on binary/nchar not supported"; const char* msg3 = "type of join columns must be identical"; const char* msg4 = "invalid column name in join condition"; @@ -3108,10 +3107,6 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } - if (pTagSchema1->type == TSDB_DATA_TYPE_BINARY || pTagSchema1->type == TSDB_DATA_TYPE_NCHAR) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - pTagCond->joinInfo.hasJoin = true; return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 75644c355c..eedb229e1c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -93,13 +93,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tscInfo("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); #endif - if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem1.ts, elem2.ts))) { + int32_t res = tVariantCompare(&elem1.tag, &elem2.tag); + if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) { if (!tsBufNextPos(pSupporter1->pTSBuf)) { break; } numOfInput1++; - } else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem2.ts, elem1.ts))) { + } else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) { if (!tsBufNextPos(pSupporter2->pTSBuf)) { break; } @@ -119,8 +120,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ win->ekey = elem1.ts; } - tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); - tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); + tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); } else { pLimit->offset -= 1; } diff --git a/src/common/inc/tvariant.h b/src/common/inc/tvariant.h index 4fd6ea5541..211635b4af 100644 --- a/src/common/inc/tvariant.h +++ b/src/common/inc/tvariant.h @@ -46,6 +46,8 @@ void tVariantDestroy(tVariant *pV); void tVariantAssign(tVariant *pDst, const tVariant *pSrc); +int32_t tVariantCompare(const tVariant* p1, const tVariant* p2); + int32_t tVariantToString(tVariant *pVar, char *dst); int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix); diff --git a/src/common/src/tvariant.c b/src/common/src/tvariant.c index 5460d21252..33f8eb127f 100644 --- a/src/common/src/tvariant.c +++ b/src/common/src/tvariant.c @@ -12,12 +12,10 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "os.h" #include "tvariant.h" #include "hash.h" -#include "hashfunc.h" -#include "os.h" -#include "hash.h" #include "taos.h" #include "taosdef.h" #include "tstoken.h" @@ -172,6 +170,36 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { } } +int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) { + assert((p1->nType != TSDB_DATA_TYPE_NULL) || (p2->nType != TSDB_DATA_TYPE_NULL)); + + switch (p1->nType) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: { + if (p1->nLen == p2->nLen) { + return memcmp(p1->pz, p2->pz, p1->nLen); + } else { + return p1->nLen > p2->nLen? 1:-1; + } + }; + + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + if (p1->dKey == p2->dKey) { + return 0; + } else { + return p1->dKey > p2->dKey? 1:-1; + } + + default: + if (p1->i64Key == p2->i64Key) { + return 0; + } else { + return p1->i64Key > p2->i64Key? 1:-1; + } + } +} + int32_t tVariantToString(tVariant *pVar, char *dst) { if (pVar == NULL || dst == NULL) return 0; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b5487561b2..d824a269c9 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -98,7 +98,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct TSKEY lastKey; int32_t groupIndex; // group id in table list int16_t queryRangeSet; // denote if the query range is set, only available for interval query - int64_t tag; + tVariant tag; STimeWindow win; STSCursor cur; void* pTable; // for retrieve the page id list diff --git a/src/query/inc/qTsbuf.h b/src/query/inc/qTsbuf.h index 59b224e096..f6d672b26d 100644 --- a/src/query/inc/qTsbuf.h +++ b/src/query/inc/qTsbuf.h @@ -22,6 +22,7 @@ extern "C" { #include "os.h" #include "taosdef.h" +#include "tvariant.h" #define MEM_BUF_SIZE (1 << 20) #define TS_COMP_FILE_MAGIC 0x87F5EC4C @@ -42,9 +43,9 @@ typedef struct STSRawBlock { } STSRawBlock; typedef struct STSElem { - TSKEY ts; - int64_t tag; - int32_t vnode; + TSKEY ts; + tVariant tag; + int32_t vnode; } STSElem; typedef struct STSCursor { @@ -55,11 +56,11 @@ typedef struct STSCursor { } STSCursor; typedef struct STSBlock { - int64_t tag; // tag value - int32_t numOfElem; // number of elements - int32_t compLen; // size after compressed - int32_t padding; // 0xFFFFFFFF by default, after the payload - char* payload; // actual data that is compressed + tVariant tag; // tag value + int32_t numOfElem; // number of elements + int32_t compLen; // size after compressed + int32_t padding; // 0xFFFFFFFF by default, after the payload + char* payload; // actual data that is compressed } STSBlock; /* @@ -109,7 +110,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ void* tsBufDestroy(STSBuf* pTSBuf); -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); STSBuf* tsBufClone(STSBuf* pTSBuf); @@ -122,7 +123,7 @@ void tsBufResetPos(STSBuf* pTSBuf); STSElem tsBufGetElem(STSBuf* pTSBuf); bool tsBufNextPos(STSBuf* pTSBuf); -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag); +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag); STSCursor tsBufGetCursor(STSBuf* pTSBuf); void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index c8efee03cd..52408f78bf 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1103,7 +1103,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; // compare tag first - if (pCtx[0].tag.i64Key != elem.tag) { + if (tVariantCompare(&pCtx[0].tag, &elem.tag) != 0) { return TS_JOIN_TAG_NOT_EQUALS; } @@ -3644,9 +3644,8 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ // both the master and supplement scan needs to set the correct ts comp start position if (pRuntimeEnv->pTSBuf != NULL) { if (pTableQueryInfo->cur.vgroupIndex == -1) { - pTableQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; - - tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pTableQueryInfo->tag); + tVariantAssign(&pTableQueryInfo->tag, &pRuntimeEnv->pCtx[0].tag); + tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pTableQueryInfo->tag); // keep the cursor info of current meter pTableQueryInfo->cur = pRuntimeEnv->pTSBuf->cur; @@ -4501,8 +4500,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vgroupIndex == -1) { - int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; - STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag); + STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pRuntimeEnv->pCtx[0].tag); // failed to find data with the specified tag value if (elem.vnode < 0) { diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c index 25eb33ff7d..eb47a1b7a1 100644 --- a/src/query/src/qTsbuf.c +++ b/src/query/src/qTsbuf.c @@ -8,7 +8,6 @@ static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInf static STSBuf* allocResForTSBuf(STSBuf* pTSBuf); static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); - /** * todo error handling * support auto closeable tmp file @@ -225,11 +224,12 @@ static void writeDataToDisk(STSBuf* pTSBuf) { } STSBlock* pBlock = &pTSBuf->block; - - pBlock->numOfElem = pTSBuf->tsData.len / TSDB_KEYSIZE; + STSList* pTsData = &pTSBuf->tsData; + + pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE; pBlock->compLen = - tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload, - pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); + tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len/TSDB_KEYSIZE, pBlock->payload, pTsData->allocSize, + TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); UNUSED(r); @@ -241,13 +241,18 @@ static void writeDataToDisk(STSBuf* pTSBuf) { * * both side has the compressed length is used to support load data forwards/backwords. */ - fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); + fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); + fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); + + if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { + fwrite(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); + } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { + fwrite(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f); + } + fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); - fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); - fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); - fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; @@ -298,9 +303,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ret = fseek(pTSBuf->f, -offset, SEEK_CUR); UNUSED(ret); } - - size_t sz = fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); - UNUSED(sz); + + fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); + fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); + + // NOTE: mix types tags are not supported + size_t sz = 0; + if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { + char* tp = realloc(pBlock->tag.pz, pBlock->tag.nLen + 1); + assert(tp != NULL); + + memset(tp, 0, pBlock->tag.nLen + 1); + pBlock->tag.pz = tp; + + sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); + } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { + sz = fread(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f); + } + sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); UNUSED(sz); sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); @@ -361,7 +381,7 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { return TSDB_CODE_SUCCESS; } -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) { +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len) { STSVnodeBlockInfoEx* pBlockInfo = NULL; STSList* ptsData = &pTSBuf->tsData; @@ -375,15 +395,15 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData } assert(pBlockInfo->info.vnode == vnodeId); - - if (pTSBuf->block.tag != tag && ptsData->len > 0) { + + if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) { // new arrived data with different tags value, save current value into disk first writeDataToDisk(pTSBuf); } else { expandBuffer(ptsData, len); } - pTSBuf->block.tag = tag; + tVariantAssign(&pTSBuf->block.tag, tag); memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); // todo check return value @@ -465,7 +485,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int return 0; } -static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) { +static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant* tag) { bool decomp = false; int64_t offset = 0; @@ -484,7 +504,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo return -1; } - if (pTSBuf->block.tag == tag) { + if (tVariantCompare(&pTSBuf->block.tag, tag) == 0) { return i; } } @@ -669,8 +689,8 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode; elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); - elem1.tag = pBlock->tag; - + tVariantAssign(&elem1.tag, &pBlock->tag); + return elem1; } @@ -800,7 +820,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ return pTSBuf; } -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) { +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) { STSElem elem = {.vnode = -1}; if (pTSBuf == NULL) { @@ -881,7 +901,9 @@ void tsBufDisplay(STSBuf* pTSBuf) { while (tsBufNextPos(pTSBuf)) { STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag, elem.ts); + if (elem.tag.nType == TSDB_DATA_TYPE_BIGINT) { + printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag.i64Key, elem.ts); + } } pTSBuf->cur.order = old; diff --git a/src/query/tests/tsBufTest.cpp b/src/query/tests/tsBufTest.cpp index f8738eec9c..b78c5314f2 100644 --- a/src/query/tests/tsBufTest.cpp +++ b/src/query/tests/tsBufTest.cpp @@ -32,14 +32,16 @@ void simpleTest() { // write 10 ts points int32_t num = 10; - int64_t tag = 1; + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + t.i64Key = 1; int64_t* list = createTsList(10, 10000000, 30); - tsBufAppend(pTSBuf, 0, tag, (const char*)list, num * sizeof(int64_t)); + tsBufAppend(pTSBuf, 0, &t, (const char*)list, num * sizeof(int64_t)); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num); - EXPECT_EQ(pTSBuf->block.tag, tag); + EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0); EXPECT_EQ(pTSBuf->numOfVnodes, 1); tsBufFlush(pTSBuf); @@ -57,14 +59,16 @@ void largeTSTest() { // write 10 ts points int32_t num = 1000000; - int64_t tag = 1; + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + t.i64Key = 1; int64_t* list = createTsList(num, 10000000, 30); - tsBufAppend(pTSBuf, 0, tag, (const char*)list, num * sizeof(int64_t)); + tsBufAppend(pTSBuf, 0, &t, (const char*)list, num * sizeof(int64_t)); // the data has been flush to disk, no data in cache EXPECT_EQ(pTSBuf->tsData.len, 0); - EXPECT_EQ(pTSBuf->block.tag, tag); + EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0); EXPECT_EQ(pTSBuf->numOfVnodes, 1); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); @@ -80,14 +84,18 @@ void multiTagsTest() { STSBuf* pTSBuf = tsBufCreate(true, TSDB_ORDER_ASC); int32_t num = 10000; - int64_t tag = 1; + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + int64_t start = 10000000; int32_t numOfTags = 50; int32_t step = 30; for (int32_t i = 0; i < numOfTags; ++i) { int64_t* list = createTsList(num, start, step); - tsBufAppend(pTSBuf, 0, i, (const char*)list, num * sizeof(int64_t)); + t.i64Key = i; + + tsBufAppend(pTSBuf, 0, &t, (const char*)list, num * sizeof(int64_t)); free(list); start += step * num; @@ -96,7 +104,7 @@ void multiTagsTest() { EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); - EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); + EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1); EXPECT_EQ(pTSBuf->numOfVnodes, 1); tsBufFlush(pTSBuf); @@ -118,9 +126,14 @@ void multiVnodeTagsTest() { for (int32_t j = 0; j < 20; ++j) { // vnodeId:0 start = 10000000; + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + for (int32_t i = 0; i < numOfTags; ++i) { int64_t* list = createTsList(num, start, step); - tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); + t.i64Key = i; + + tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t)); free(list); start += step * num; @@ -131,11 +144,11 @@ void multiVnodeTagsTest() { EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); - EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); + EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); - EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); + EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1); tsBufFlush(pTSBuf); EXPECT_EQ(pTSBuf->tsData.len, 0); @@ -157,9 +170,14 @@ void loadDataTest() { for (int32_t j = 0; j < numOfVnode; ++j) { // vnodeId:0 int64_t start = 10000000; + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + for (int32_t i = 0; i < numOfTags; ++i) { int64_t* list = createTsList(num, start, step); - tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); + t.i64Key = i; + + tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t)); printf("%d - %" PRIu64 "\n", i, list[0]); free(list); @@ -172,11 +190,11 @@ void loadDataTest() { EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); - EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); + EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); - EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); + EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1); tsBufFlush(pTSBuf); EXPECT_EQ(pTSBuf->tsData.len, 0); @@ -230,16 +248,21 @@ void TSTraverse() { for (int32_t j = 0; j < numOfVnode; ++j) { // vnodeId:0 int64_t start = 10000000; + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + for (int32_t i = 0; i < numOfTags; ++i) { int64_t* list = createTsList(num, start, step); - tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); + t.i64Key = i; + + tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t)); printf("%d - %d - %" PRIu64 ", %" PRIu64 "\n", j, i, list[0], list[num - 1]); free(list); start += step * num; list = createTsList(num, start, step); - tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); + tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t)); printf("%d - %d - %" PRIu64 ", %" PRIu64 "\n", j, i, list[0], list[num - 1]); free(list); @@ -272,12 +295,16 @@ void TSTraverse() { int32_t startVnode = 1; int32_t startTag = 2; - tsBufGetElemStartPos(pTSBuf, startVnode, startTag); + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + t.i64Key = startTag; + + tsBufGetElemStartPos(pTSBuf, startVnode, &t); int32_t totalOutput = 10; while (1) { STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts); + printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts); if (!tsBufNextPos(pTSBuf)) { break; @@ -286,7 +313,9 @@ void TSTraverse() { if (--totalOutput <= 0) { totalOutput = 10; - tsBufGetElemStartPos(pTSBuf, startVnode, --startTag); + startTag -= 1; + t.i64Key = startTag; + tsBufGetElemStartPos(pTSBuf, startVnode, &t); if (startTag == 0) { startVnode -= 1; @@ -316,13 +345,14 @@ void TSTraverse() { startVnode = 1; startTag = 2; + t.i64Key = startTag; - tsBufGetElemStartPos(pTSBuf, startVnode, startTag); + tsBufGetElemStartPos(pTSBuf, startVnode, &t); totalOutput = 10; while (1) { STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts); + printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts); if (!tsBufNextPos(pTSBuf)) { break; @@ -331,7 +361,9 @@ void TSTraverse() { if (--totalOutput <= 0) { totalOutput = 10; - tsBufGetElemStartPos(pTSBuf, startVnode, --startTag); + startTag -= 1; + t.i64Key = startTag; + tsBufGetElemStartPos(pTSBuf, startVnode, &t); if (startTag < 0) { startVnode -= 1; @@ -375,12 +407,17 @@ void mergeDiffVnodeBufferTest() { int32_t num = 1000; int32_t numOfTags = 10; + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + // vnodeId:0 int64_t start = 10000000; for (int32_t i = 0; i < numOfTags; ++i) { int64_t* list = createTsList(num, start, step); - tsBufAppend(pTSBuf1, 0, i, (const char*)list, num * sizeof(int64_t)); - tsBufAppend(pTSBuf2, 0, i, (const char*)list, num * sizeof(int64_t)); + t.i64Key = i; + + tsBufAppend(pTSBuf1, 0, &t, (const char*)list, num * sizeof(int64_t)); + tsBufAppend(pTSBuf2, 0, &t, (const char*)list, num * sizeof(int64_t)); free(list); @@ -403,6 +440,9 @@ void mergeIdenticalVnodeBufferTest() { STSBuf* pTSBuf1 = tsBufCreate(true, TSDB_ORDER_ASC); STSBuf* pTSBuf2 = tsBufCreate(true, TSDB_ORDER_ASC); + tVariant t = {0}; + t.nType = TSDB_DATA_TYPE_BIGINT; + int32_t step = 30; int32_t num = 1000; int32_t numOfTags = 10; @@ -411,17 +451,21 @@ void mergeIdenticalVnodeBufferTest() { int64_t start = 10000000; for (int32_t i = 0; i < numOfTags; ++i) { int64_t* list = createTsList(num, start, step); + t.i64Key = i; - tsBufAppend(pTSBuf1, 12, i, (const char*)list, num * sizeof(int64_t)); + tsBufAppend(pTSBuf1, 12, &t, (const char*)list, num * sizeof(int64_t)); free(list); start += step * num; } + + for (int32_t i = numOfTags; i < numOfTags * 2; ++i) { int64_t* list = createTsList(num, start, step); - tsBufAppend(pTSBuf2, 77, i, (const char*)list, num * sizeof(int64_t)); + t.i64Key = i; + tsBufAppend(pTSBuf2, 77, &t, (const char*)list, num * sizeof(int64_t)); free(list); start += step * num; @@ -438,7 +482,7 @@ void mergeIdenticalVnodeBufferTest() { STSElem elem = tsBufGetElem(pTSBuf1); EXPECT_EQ(elem.vnode, 12); - printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts); + printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts); } tsBufDestroy(pTSBuf1); @@ -446,6 +490,8 @@ void mergeIdenticalVnodeBufferTest() { } } // namespace + +//TODO add binary tag value test case TEST(testCase, tsBufTest) { simpleTest(); largeTSTest(); @@ -453,7 +499,7 @@ TEST(testCase, tsBufTest) { multiVnodeTagsTest(); loadDataTest(); invalidFileTest(); - // randomIncTsTest(); +// randomIncTsTest(); TSTraverse(); mergeDiffVnodeBufferTest(); mergeIdenticalVnodeBufferTest(); -- GitLab