提交 8be56fb7 编写于 作者: H Haojun Liao

[td-1148] enable the binary tag to be used as the join condition.

上级 179f933e
...@@ -3900,11 +3900,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { ...@@ -3900,11 +3900,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
// primary ts must be existed, so no need to check its existance // primary ts must be existed, so no need to check its existance
if (pCtx->order == TSDB_ORDER_ASC) { if (pCtx->order == TSDB_ORDER_ASC) {
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, input, pCtx->size * TSDB_KEYSIZE); tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
} else { } else {
for (int32_t i = pCtx->size - 1; i >= 0; --i) { for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *d = GET_INPUT_CHAR_INDEX(pCtx, i); char *d = GET_INPUT_CHAR_INDEX(pCtx, i);
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, d, TSDB_KEYSIZE); tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE);
} }
} }
...@@ -3923,7 +3923,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -3923,7 +3923,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) {
STSBuf *pTSbuf = pInfo->pTSBuf; STSBuf *pTSbuf = pInfo->pTSBuf;
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, pData, TSDB_KEYSIZE); tsBufAppend(pTSbuf, 0, &pCtx->tag, pData, TSDB_KEYSIZE);
SET_VAL(pCtx, pCtx->size, 1); SET_VAL(pCtx, pCtx->size, 1);
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
......
...@@ -3064,7 +3064,6 @@ static int32_t getColumnQueryCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQ ...@@ -3064,7 +3064,6 @@ static int32_t getColumnQueryCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQ
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* pExpr) {
const char* msg1 = "invalid join query condition"; const char* msg1 = "invalid join query condition";
const char* msg2 = "join on binary/nchar not supported";
const char* msg3 = "type of join columns must be identical"; const char* msg3 = "type of join columns must be identical";
const char* msg4 = "invalid column name in join condition"; const char* msg4 = "invalid column name in join condition";
...@@ -3108,10 +3107,6 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* ...@@ -3108,10 +3107,6 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr*
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
if (pTagSchema1->type == TSDB_DATA_TYPE_BINARY || pTagSchema1->type == TSDB_DATA_TYPE_NCHAR) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
pTagCond->joinInfo.hasJoin = true; pTagCond->joinInfo.hasJoin = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -93,13 +93,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -93,13 +93,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tscInfo("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); tscInfo("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag);
#endif #endif
if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem1.ts, elem2.ts))) { int32_t res = tVariantCompare(&elem1.tag, &elem2.tag);
if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) { if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break; break;
} }
numOfInput1++; numOfInput1++;
} else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem2.ts, elem1.ts))) { } else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) { if (!tsBufNextPos(pSupporter2->pTSBuf)) {
break; break;
} }
...@@ -119,8 +120,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -119,8 +120,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
win->ekey = elem1.ts; win->ekey = elem1.ts;
} }
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else { } else {
pLimit->offset -= 1; pLimit->offset -= 1;
} }
......
...@@ -46,6 +46,8 @@ void tVariantDestroy(tVariant *pV); ...@@ -46,6 +46,8 @@ void tVariantDestroy(tVariant *pV);
void tVariantAssign(tVariant *pDst, const tVariant *pSrc); void tVariantAssign(tVariant *pDst, const tVariant *pSrc);
int32_t tVariantCompare(const tVariant* p1, const tVariant* p2);
int32_t tVariantToString(tVariant *pVar, char *dst); int32_t tVariantToString(tVariant *pVar, char *dst);
int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix); int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix);
......
...@@ -12,12 +12,10 @@ ...@@ -12,12 +12,10 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "tvariant.h" #include "tvariant.h"
#include "hash.h" #include "hash.h"
#include "hashfunc.h"
#include "os.h"
#include "hash.h"
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
#include "tstoken.h" #include "tstoken.h"
...@@ -172,6 +170,36 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { ...@@ -172,6 +170,36 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
} }
} }
int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) {
assert((p1->nType != TSDB_DATA_TYPE_NULL) || (p2->nType != TSDB_DATA_TYPE_NULL));
switch (p1->nType) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
if (p1->nLen == p2->nLen) {
return memcmp(p1->pz, p2->pz, p1->nLen);
} else {
return p1->nLen > p2->nLen? 1:-1;
}
};
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
if (p1->dKey == p2->dKey) {
return 0;
} else {
return p1->dKey > p2->dKey? 1:-1;
}
default:
if (p1->i64Key == p2->i64Key) {
return 0;
} else {
return p1->i64Key > p2->i64Key? 1:-1;
}
}
}
int32_t tVariantToString(tVariant *pVar, char *dst) { int32_t tVariantToString(tVariant *pVar, char *dst) {
if (pVar == NULL || dst == NULL) return 0; if (pVar == NULL || dst == NULL) return 0;
......
...@@ -98,7 +98,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct ...@@ -98,7 +98,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
TSKEY lastKey; TSKEY lastKey;
int32_t groupIndex; // group id in table list int32_t groupIndex; // group id in table list
int16_t queryRangeSet; // denote if the query range is set, only available for interval query int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag; tVariant tag;
STimeWindow win; STimeWindow win;
STSCursor cur; STSCursor cur;
void* pTable; // for retrieve the page id list void* pTable; // for retrieve the page id list
......
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "tvariant.h"
#define MEM_BUF_SIZE (1 << 20) #define MEM_BUF_SIZE (1 << 20)
#define TS_COMP_FILE_MAGIC 0x87F5EC4C #define TS_COMP_FILE_MAGIC 0x87F5EC4C
...@@ -42,9 +43,9 @@ typedef struct STSRawBlock { ...@@ -42,9 +43,9 @@ typedef struct STSRawBlock {
} STSRawBlock; } STSRawBlock;
typedef struct STSElem { typedef struct STSElem {
TSKEY ts; TSKEY ts;
int64_t tag; tVariant tag;
int32_t vnode; int32_t vnode;
} STSElem; } STSElem;
typedef struct STSCursor { typedef struct STSCursor {
...@@ -55,11 +56,11 @@ typedef struct STSCursor { ...@@ -55,11 +56,11 @@ typedef struct STSCursor {
} STSCursor; } STSCursor;
typedef struct STSBlock { typedef struct STSBlock {
int64_t tag; // tag value tVariant tag; // tag value
int32_t numOfElem; // number of elements int32_t numOfElem; // number of elements
int32_t compLen; // size after compressed int32_t compLen; // size after compressed
int32_t padding; // 0xFFFFFFFF by default, after the payload int32_t padding; // 0xFFFFFFFF by default, after the payload
char* payload; // actual data that is compressed char* payload; // actual data that is compressed
} STSBlock; } STSBlock;
/* /*
...@@ -109,7 +110,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ ...@@ -109,7 +110,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
void* tsBufDestroy(STSBuf* pTSBuf); void* tsBufDestroy(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
STSBuf* tsBufClone(STSBuf* pTSBuf); STSBuf* tsBufClone(STSBuf* pTSBuf);
...@@ -122,7 +123,7 @@ void tsBufResetPos(STSBuf* pTSBuf); ...@@ -122,7 +123,7 @@ void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf); STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf); bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag); STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf); STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
......
...@@ -1103,7 +1103,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { ...@@ -1103,7 +1103,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
// compare tag first // compare tag first
if (pCtx[0].tag.i64Key != elem.tag) { if (tVariantCompare(&pCtx[0].tag, &elem.tag) != 0) {
return TS_JOIN_TAG_NOT_EQUALS; return TS_JOIN_TAG_NOT_EQUALS;
} }
...@@ -3644,9 +3644,8 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ ...@@ -3644,9 +3644,8 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ
// both the master and supplement scan needs to set the correct ts comp start position // both the master and supplement scan needs to set the correct ts comp start position
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
if (pTableQueryInfo->cur.vgroupIndex == -1) { if (pTableQueryInfo->cur.vgroupIndex == -1) {
pTableQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; tVariantAssign(&pTableQueryInfo->tag, &pRuntimeEnv->pCtx[0].tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pTableQueryInfo->tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pTableQueryInfo->tag);
// keep the cursor info of current meter // keep the cursor info of current meter
pTableQueryInfo->cur = pRuntimeEnv->pTSBuf->cur; pTableQueryInfo->cur = pRuntimeEnv->pTSBuf->cur;
...@@ -4501,8 +4500,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4501,8 +4500,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vgroupIndex == -1) { if (pRuntimeEnv->cur.vgroupIndex == -1) {
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pRuntimeEnv->pCtx[0].tag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag);
// failed to find data with the specified tag value // failed to find data with the specified tag value
if (elem.vnode < 0) { if (elem.vnode < 0) {
......
...@@ -8,7 +8,6 @@ static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInf ...@@ -8,7 +8,6 @@ static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInf
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf); static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
/** /**
* todo error handling * todo error handling
* support auto closeable tmp file * support auto closeable tmp file
...@@ -225,11 +224,12 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -225,11 +224,12 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
} }
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
STSList* pTsData = &pTSBuf->tsData;
pBlock->numOfElem = pTSBuf->tsData.len / TSDB_KEYSIZE;
pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE;
pBlock->compLen = pBlock->compLen =
tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload, tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len/TSDB_KEYSIZE, pBlock->payload, pTsData->allocSize,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);
UNUSED(r); UNUSED(r);
...@@ -241,13 +241,18 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -241,13 +241,18 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
* *
* both side has the compressed length is used to support load data forwards/backwords. * both side has the compressed length is used to support load data forwards/backwords.
*/ */
fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
fwrite(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
fwrite(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f);
}
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
...@@ -298,9 +303,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -298,9 +303,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
ret = fseek(pTSBuf->f, -offset, SEEK_CUR); ret = fseek(pTSBuf->f, -offset, SEEK_CUR);
UNUSED(ret); UNUSED(ret);
} }
size_t sz = fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
UNUSED(sz); fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
// NOTE: mix types tags are not supported
size_t sz = 0;
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
char* tp = realloc(pBlock->tag.pz, pBlock->tag.nLen + 1);
assert(tp != NULL);
memset(tp, 0, pBlock->tag.nLen + 1);
pBlock->tag.pz = tp;
sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
sz = fread(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f);
}
sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
UNUSED(sz); UNUSED(sz);
sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
...@@ -361,7 +381,7 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { ...@@ -361,7 +381,7 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) { void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len) {
STSVnodeBlockInfoEx* pBlockInfo = NULL; STSVnodeBlockInfoEx* pBlockInfo = NULL;
STSList* ptsData = &pTSBuf->tsData; STSList* ptsData = &pTSBuf->tsData;
...@@ -375,15 +395,15 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData ...@@ -375,15 +395,15 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData
} }
assert(pBlockInfo->info.vnode == vnodeId); assert(pBlockInfo->info.vnode == vnodeId);
if (pTSBuf->block.tag != tag && ptsData->len > 0) { if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
// new arrived data with different tags value, save current value into disk first // new arrived data with different tags value, save current value into disk first
writeDataToDisk(pTSBuf); writeDataToDisk(pTSBuf);
} else { } else {
expandBuffer(ptsData, len); expandBuffer(ptsData, len);
} }
pTSBuf->block.tag = tag; tVariantAssign(&pTSBuf->block.tag, tag);
memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);
// todo check return value // todo check return value
...@@ -465,7 +485,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int ...@@ -465,7 +485,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
return 0; return 0;
} }
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) { static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant* tag) {
bool decomp = false; bool decomp = false;
int64_t offset = 0; int64_t offset = 0;
...@@ -484,7 +504,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo ...@@ -484,7 +504,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
return -1; return -1;
} }
if (pTSBuf->block.tag == tag) { if (tVariantCompare(&pTSBuf->block.tag, tag) == 0) {
return i; return i;
} }
} }
...@@ -669,8 +689,8 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { ...@@ -669,8 +689,8 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode; elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
elem1.tag = pBlock->tag; tVariantAssign(&elem1.tag, &pBlock->tag);
return elem1; return elem1;
} }
...@@ -800,7 +820,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ ...@@ -800,7 +820,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
return pTSBuf; return pTSBuf;
} }
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) { STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) {
STSElem elem = {.vnode = -1}; STSElem elem = {.vnode = -1};
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
...@@ -881,7 +901,9 @@ void tsBufDisplay(STSBuf* pTSBuf) { ...@@ -881,7 +901,9 @@ void tsBufDisplay(STSBuf* pTSBuf) {
while (tsBufNextPos(pTSBuf)) { while (tsBufNextPos(pTSBuf)) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag, elem.ts); if (elem.tag.nType == TSDB_DATA_TYPE_BIGINT) {
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
}
} }
pTSBuf->cur.order = old; pTSBuf->cur.order = old;
......
...@@ -32,14 +32,16 @@ void simpleTest() { ...@@ -32,14 +32,16 @@ void simpleTest() {
// write 10 ts points // write 10 ts points
int32_t num = 10; int32_t num = 10;
int64_t tag = 1; tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
t.i64Key = 1;
int64_t* list = createTsList(10, 10000000, 30); int64_t* list = createTsList(10, 10000000, 30);
tsBufAppend(pTSBuf, 0, tag, (const char*)list, num * sizeof(int64_t)); tsBufAppend(pTSBuf, 0, &t, (const char*)list, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num); EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num);
EXPECT_EQ(pTSBuf->block.tag, tag); EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
EXPECT_EQ(pTSBuf->numOfVnodes, 1); EXPECT_EQ(pTSBuf->numOfVnodes, 1);
tsBufFlush(pTSBuf); tsBufFlush(pTSBuf);
...@@ -57,14 +59,16 @@ void largeTSTest() { ...@@ -57,14 +59,16 @@ void largeTSTest() {
// write 10 ts points // write 10 ts points
int32_t num = 1000000; int32_t num = 1000000;
int64_t tag = 1; tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
t.i64Key = 1;
int64_t* list = createTsList(num, 10000000, 30); int64_t* list = createTsList(num, 10000000, 30);
tsBufAppend(pTSBuf, 0, tag, (const char*)list, num * sizeof(int64_t)); tsBufAppend(pTSBuf, 0, &t, (const char*)list, num * sizeof(int64_t));
// the data has been flush to disk, no data in cache // the data has been flush to disk, no data in cache
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
EXPECT_EQ(pTSBuf->block.tag, tag); EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
EXPECT_EQ(pTSBuf->numOfVnodes, 1); EXPECT_EQ(pTSBuf->numOfVnodes, 1);
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
...@@ -80,14 +84,18 @@ void multiTagsTest() { ...@@ -80,14 +84,18 @@ void multiTagsTest() {
STSBuf* pTSBuf = tsBufCreate(true, TSDB_ORDER_ASC); STSBuf* pTSBuf = tsBufCreate(true, TSDB_ORDER_ASC);
int32_t num = 10000; int32_t num = 10000;
int64_t tag = 1; tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
int64_t start = 10000000; int64_t start = 10000000;
int32_t numOfTags = 50; int32_t numOfTags = 50;
int32_t step = 30; int32_t step = 30;
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
tsBufAppend(pTSBuf, 0, i, (const char*)list, num * sizeof(int64_t)); t.i64Key = i;
tsBufAppend(pTSBuf, 0, &t, (const char*)list, num * sizeof(int64_t));
free(list); free(list);
start += step * num; start += step * num;
...@@ -96,7 +104,7 @@ void multiTagsTest() { ...@@ -96,7 +104,7 @@ void multiTagsTest() {
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
EXPECT_EQ(pTSBuf->numOfVnodes, 1); EXPECT_EQ(pTSBuf->numOfVnodes, 1);
tsBufFlush(pTSBuf); tsBufFlush(pTSBuf);
...@@ -118,9 +126,14 @@ void multiVnodeTagsTest() { ...@@ -118,9 +126,14 @@ void multiVnodeTagsTest() {
for (int32_t j = 0; j < 20; ++j) { for (int32_t j = 0; j < 20; ++j) {
// vnodeId:0 // vnodeId:0
start = 10000000; start = 10000000;
tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); t.i64Key = i;
tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t));
free(list); free(list);
start += step * num; start += step * num;
...@@ -131,11 +144,11 @@ void multiVnodeTagsTest() { ...@@ -131,11 +144,11 @@ void multiVnodeTagsTest() {
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
tsBufFlush(pTSBuf); tsBufFlush(pTSBuf);
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
...@@ -157,9 +170,14 @@ void loadDataTest() { ...@@ -157,9 +170,14 @@ void loadDataTest() {
for (int32_t j = 0; j < numOfVnode; ++j) { for (int32_t j = 0; j < numOfVnode; ++j) {
// vnodeId:0 // vnodeId:0
int64_t start = 10000000; int64_t start = 10000000;
tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); t.i64Key = i;
tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t));
printf("%d - %" PRIu64 "\n", i, list[0]); printf("%d - %" PRIu64 "\n", i, list[0]);
free(list); free(list);
...@@ -172,11 +190,11 @@ void loadDataTest() { ...@@ -172,11 +190,11 @@ void loadDataTest() {
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->block.tag, numOfTags - 1); EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
tsBufFlush(pTSBuf); tsBufFlush(pTSBuf);
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
...@@ -230,16 +248,21 @@ void TSTraverse() { ...@@ -230,16 +248,21 @@ void TSTraverse() {
for (int32_t j = 0; j < numOfVnode; ++j) { for (int32_t j = 0; j < numOfVnode; ++j) {
// vnodeId:0 // vnodeId:0
int64_t start = 10000000; int64_t start = 10000000;
tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); t.i64Key = i;
tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t));
printf("%d - %d - %" PRIu64 ", %" PRIu64 "\n", j, i, list[0], list[num - 1]); printf("%d - %d - %" PRIu64 ", %" PRIu64 "\n", j, i, list[0], list[num - 1]);
free(list); free(list);
start += step * num; start += step * num;
list = createTsList(num, start, step); list = createTsList(num, start, step);
tsBufAppend(pTSBuf, j, i, (const char*)list, num * sizeof(int64_t)); tsBufAppend(pTSBuf, j, &t, (const char*)list, num * sizeof(int64_t));
printf("%d - %d - %" PRIu64 ", %" PRIu64 "\n", j, i, list[0], list[num - 1]); printf("%d - %d - %" PRIu64 ", %" PRIu64 "\n", j, i, list[0], list[num - 1]);
free(list); free(list);
...@@ -272,12 +295,16 @@ void TSTraverse() { ...@@ -272,12 +295,16 @@ void TSTraverse() {
int32_t startVnode = 1; int32_t startVnode = 1;
int32_t startTag = 2; int32_t startTag = 2;
tsBufGetElemStartPos(pTSBuf, startVnode, startTag); tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
t.i64Key = startTag;
tsBufGetElemStartPos(pTSBuf, startVnode, &t);
int32_t totalOutput = 10; int32_t totalOutput = 10;
while (1) { while (1) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts); printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) { if (!tsBufNextPos(pTSBuf)) {
break; break;
...@@ -286,7 +313,9 @@ void TSTraverse() { ...@@ -286,7 +313,9 @@ void TSTraverse() {
if (--totalOutput <= 0) { if (--totalOutput <= 0) {
totalOutput = 10; totalOutput = 10;
tsBufGetElemStartPos(pTSBuf, startVnode, --startTag); startTag -= 1;
t.i64Key = startTag;
tsBufGetElemStartPos(pTSBuf, startVnode, &t);
if (startTag == 0) { if (startTag == 0) {
startVnode -= 1; startVnode -= 1;
...@@ -316,13 +345,14 @@ void TSTraverse() { ...@@ -316,13 +345,14 @@ void TSTraverse() {
startVnode = 1; startVnode = 1;
startTag = 2; startTag = 2;
t.i64Key = startTag;
tsBufGetElemStartPos(pTSBuf, startVnode, startTag); tsBufGetElemStartPos(pTSBuf, startVnode, &t);
totalOutput = 10; totalOutput = 10;
while (1) { while (1) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts); printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) { if (!tsBufNextPos(pTSBuf)) {
break; break;
...@@ -331,7 +361,9 @@ void TSTraverse() { ...@@ -331,7 +361,9 @@ void TSTraverse() {
if (--totalOutput <= 0) { if (--totalOutput <= 0) {
totalOutput = 10; totalOutput = 10;
tsBufGetElemStartPos(pTSBuf, startVnode, --startTag); startTag -= 1;
t.i64Key = startTag;
tsBufGetElemStartPos(pTSBuf, startVnode, &t);
if (startTag < 0) { if (startTag < 0) {
startVnode -= 1; startVnode -= 1;
...@@ -375,12 +407,17 @@ void mergeDiffVnodeBufferTest() { ...@@ -375,12 +407,17 @@ void mergeDiffVnodeBufferTest() {
int32_t num = 1000; int32_t num = 1000;
int32_t numOfTags = 10; int32_t numOfTags = 10;
tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
// vnodeId:0 // vnodeId:0
int64_t start = 10000000; int64_t start = 10000000;
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
tsBufAppend(pTSBuf1, 0, i, (const char*)list, num * sizeof(int64_t)); t.i64Key = i;
tsBufAppend(pTSBuf2, 0, i, (const char*)list, num * sizeof(int64_t));
tsBufAppend(pTSBuf1, 0, &t, (const char*)list, num * sizeof(int64_t));
tsBufAppend(pTSBuf2, 0, &t, (const char*)list, num * sizeof(int64_t));
free(list); free(list);
...@@ -403,6 +440,9 @@ void mergeIdenticalVnodeBufferTest() { ...@@ -403,6 +440,9 @@ void mergeIdenticalVnodeBufferTest() {
STSBuf* pTSBuf1 = tsBufCreate(true, TSDB_ORDER_ASC); STSBuf* pTSBuf1 = tsBufCreate(true, TSDB_ORDER_ASC);
STSBuf* pTSBuf2 = tsBufCreate(true, TSDB_ORDER_ASC); STSBuf* pTSBuf2 = tsBufCreate(true, TSDB_ORDER_ASC);
tVariant t = {0};
t.nType = TSDB_DATA_TYPE_BIGINT;
int32_t step = 30; int32_t step = 30;
int32_t num = 1000; int32_t num = 1000;
int32_t numOfTags = 10; int32_t numOfTags = 10;
...@@ -411,17 +451,21 @@ void mergeIdenticalVnodeBufferTest() { ...@@ -411,17 +451,21 @@ void mergeIdenticalVnodeBufferTest() {
int64_t start = 10000000; int64_t start = 10000000;
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
t.i64Key = i;
tsBufAppend(pTSBuf1, 12, i, (const char*)list, num * sizeof(int64_t)); tsBufAppend(pTSBuf1, 12, &t, (const char*)list, num * sizeof(int64_t));
free(list); free(list);
start += step * num; start += step * num;
} }
for (int32_t i = numOfTags; i < numOfTags * 2; ++i) { for (int32_t i = numOfTags; i < numOfTags * 2; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
tsBufAppend(pTSBuf2, 77, i, (const char*)list, num * sizeof(int64_t)); t.i64Key = i;
tsBufAppend(pTSBuf2, 77, &t, (const char*)list, num * sizeof(int64_t));
free(list); free(list);
start += step * num; start += step * num;
...@@ -438,7 +482,7 @@ void mergeIdenticalVnodeBufferTest() { ...@@ -438,7 +482,7 @@ void mergeIdenticalVnodeBufferTest() {
STSElem elem = tsBufGetElem(pTSBuf1); STSElem elem = tsBufGetElem(pTSBuf1);
EXPECT_EQ(elem.vnode, 12); EXPECT_EQ(elem.vnode, 12);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts); printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
} }
tsBufDestroy(pTSBuf1); tsBufDestroy(pTSBuf1);
...@@ -446,6 +490,8 @@ void mergeIdenticalVnodeBufferTest() { ...@@ -446,6 +490,8 @@ void mergeIdenticalVnodeBufferTest() {
} }
} // namespace } // namespace
//TODO add binary tag value test case
TEST(testCase, tsBufTest) { TEST(testCase, tsBufTest) {
simpleTest(); simpleTest();
largeTSTest(); largeTSTest();
...@@ -453,7 +499,7 @@ TEST(testCase, tsBufTest) { ...@@ -453,7 +499,7 @@ TEST(testCase, tsBufTest) {
multiVnodeTagsTest(); multiVnodeTagsTest();
loadDataTest(); loadDataTest();
invalidFileTest(); invalidFileTest();
// randomIncTsTest(); // randomIncTsTest();
TSTraverse(); TSTraverse();
mergeDiffVnodeBufferTest(); mergeDiffVnodeBufferTest();
mergeIdenticalVnodeBufferTest(); mergeIdenticalVnodeBufferTest();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册