diff --git a/src/client/inc/tscJoinProcess.h b/src/client/inc/tscJoinProcess.h index 34764e4db62469af14592a026015c88b53a03fa5..01a92c36b19dfcffc9d4650512781cc4d0120ba1 100644 --- a/src/client/inc/tscJoinProcess.h +++ b/src/client/inc/tscJoinProcess.h @@ -47,14 +47,14 @@ typedef struct STSList { typedef struct STSRawBlock { int32_t vnode; - int64_t tag; + tVariant tag; TSKEY* ts; int32_t len; } STSRawBlock; typedef struct STSElem { TSKEY ts; - int64_t tag; + tVariant tag; int32_t vnode; } STSElem; @@ -66,7 +66,7 @@ typedef struct STSCursor { } STSCursor; typedef struct STSBlock { - int64_t tag; // tag value + tVariant tag; // tag value int32_t numOfElem; // number of elements int32_t compLen; // size after compressed int32_t padding; // 0xFFFFFFFF by default, after the payload @@ -123,7 +123,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ void* tsBufDestory(STSBuf* pTSBuf); -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId); @@ -134,7 +134,7 @@ void tsBufResetPos(STSBuf* pTSBuf); STSElem tsBufGetElem(STSBuf* pTSBuf); bool tsBufNextPos(STSBuf* pTSBuf); -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag); +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag); STSCursor tsBufGetCursor(STSBuf* pTSBuf); void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 328526f17fdbe512a5e592ef792d3fdf724817d3..f893d33454a8c6a00ba87476e3363798cb62d047 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -4335,11 +4335,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { // primary ts must be existed, so no need to check its existance if (pCtx->order == TSQL_SO_ASC) { - tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, input, pCtx->size * TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { char *d = GET_INPUT_CHAR_INDEX(pCtx, i); - tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, d, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, pCtx->tag, d, TSDB_KEYSIZE); } } @@ -4359,7 +4359,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { STSBuf *pTSbuf = pInfo->pTSBuf; - tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, pData, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, pCtx->tag, pData, TSDB_KEYSIZE); SET_VAL(pCtx, pCtx->size, 1); pResInfo->hasResult = DATA_SET_FLAG; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 3d2f7949f251330372480b9dfb74eded9bd5967c..f220970da94d2d969551fbc98ecb37a42809e17b 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -82,14 +82,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor // for debug purpose tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); #endif - - if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) { + int32_t ret = tVariantCompare(&elem1.tag,&elem2.tag ); + if (ret < 0 || (ret == 0 && doCompare(order, elem1.ts, elem2.ts))) { if (!tsBufNextPos(pSupporter1->pTSBuf)) { break; } numOfInput1++; - } else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem2.ts, elem1.ts))) { + } else if (ret > 0 || (ret == 0 && doCompare(order, elem2.ts, elem1.ts))) { if (!tsBufNextPos(pSupporter2->pTSBuf)) { break; } @@ -870,6 +870,11 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { return NULL; } + pTSBuf->block.tag.pz = malloc(MEM_TAG_SIZE); //Need to define the length + if (pTSBuf->block.tag.pz == NULL) { + tsBufDestory(pTSBuf); + return NULL; + } pTSBuf->fileSize += getDataStartOffset(); return pTSBuf; } @@ -1005,6 +1010,7 @@ void* tsBufDestory(STSBuf* pTSBuf) { tfree(pTSBuf->pData); tfree(pTSBuf->block.payload); + tfree(pTSBuf->block.tag.pz); fclose(pTSBuf->f); @@ -1098,7 +1104,9 @@ static void writeDataToDisk(STSBuf* pTSBuf) { * * both side has the compressed length is used to support load data forwards/backwords. */ - fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); + fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); + fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); + fwrite(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f); fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); @@ -1107,7 +1115,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) { fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); - int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; + int32_t blockSize = sizeof(pBlock->tag.nType) + sizeof(pBlock->tag.nLen) + pBlock->tag.nLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; pTSBuf->fileSize += blockSize; pTSBuf->tsData.len = 0; @@ -1150,11 +1158,13 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f); pBlock->compLen = pBlock->padding; - int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); + int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag.nType) + sizeof(pBlock->tag.nLen) + pBlock->tag.nLen ; fseek(pTSBuf->f, -offset, SEEK_CUR); } - fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); + fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); + fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); + fread(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f); fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); @@ -1212,9 +1222,10 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { return TSDB_CODE_SUCCESS; } -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) { +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len) { STSVnodeBlockInfoEx* pBlockInfo = NULL; STSList* ptsData = &pTSBuf->tsData; + int32_t tagEqual = 0; if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) { writeDataToDisk(pTSBuf); @@ -1226,15 +1237,17 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData } assert(pBlockInfo->info.vnode == vnodeId); + tagEqual = tVariantCompare(&pTSBuf->block.tag,&tag); - if (pTSBuf->block.tag != tag && ptsData->len > 0) { + if (tagEqual !=0 && ptsData->len > 0) { // new arrived data with different tags value, save current value into disk first writeDataToDisk(pTSBuf); } else { expandBuffer(ptsData, len); } - pTSBuf->block.tag = tag; + //pTSBuf->block.tag = tag; + tVariantAssign(&pTSBuf->block.tag,&tag); memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); // todo check return value @@ -1315,7 +1328,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int return 0; } -static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) { +static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant tag) { bool decomp = false; int64_t offset = 0; @@ -1334,7 +1347,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo return -1; } - if (pTSBuf->block.tag == tag) { + if (0 == tVariantCompare(&pTSBuf->block.tag,&tag)) { return i; } } @@ -1513,7 +1526,9 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode; elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); - elem1.tag = pBlock->tag; + elem1.tag.nType = pBlock->tag.nType; + elem1.tag.nLen = pBlock->tag.nLen; + elem1.tag.pz = pBlock->tag.pz; return elem1; } @@ -1644,7 +1659,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ return pTSBuf; } -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) { +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag) { STSElem elem = {.vnode = -1}; if (pTSBuf == NULL) { @@ -1723,7 +1738,7 @@ void tsBufDisplay(STSBuf* pTSBuf) { while (tsBufNextPos(pTSBuf)) { STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, *(int64_t*) elem.tag, elem.ts); + printf("%d-%" PRId64 "\n", elem.vnode, elem.ts); } pTSBuf->cur.order = old; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index cee3998017523b09cba96bc86b70a057321b5261..45a0bbee0fb276d0c88e6e5c24d69fc042511127 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3199,10 +3199,10 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum } else if (pLeftIndex->tableIndex == rightIndex.tableIndex) { invalidSqlErrMsg(pQueryInfo->msg, msg4); return false; - } else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) { + } /*else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) { invalidSqlErrMsg(pQueryInfo->msg, msg6); return false; - } + }*/ // table to table/ super table to super table are allowed if (UTIL_METER_IS_SUPERTABLE(pLeftMeterMeta) != UTIL_METER_IS_SUPERTABLE(pRightMeterMeta)) { diff --git a/src/inc/ttypes.h b/src/inc/ttypes.h index db6490f8404f2b9c0be4f83ce0391ec4dad39a81..0118c12b747377443136ac04d6e144cd583e43ed 100644 --- a/src/inc/ttypes.h +++ b/src/inc/ttypes.h @@ -75,6 +75,8 @@ void tVariantDestroy(tVariant *pV); void tVariantAssign(tVariant *pDst, const tVariant *pSrc); +int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc); + int32_t tVariantToString(tVariant *pVar, char *dst); int32_t tVariantDump(tVariant *pVariant, char *payload, char type); diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 0b636db110b9816e14a32cf1de8a3a6b913cdf65..3aa020ad7f050129b4099ff0f0fda4c431eac65e 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -198,7 +198,7 @@ typedef struct SMeterQueryInfo { int64_t ekey; int32_t numOfRes; int16_t queryRangeSet; // denote if the query range is set, only available for interval query - int64_t tag; + tVariant tag; STSCursor cur; int32_t sid; // for retrieve the page id list diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index e69b5a5b8cf0cc49ada67f44ec5d4eb17b7eff57..8d72095de93691de3d794c1247f3eaa05392474d 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2557,7 +2557,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; // compare tag first - if (pCtx[0].tag.i64Key != elem.tag) { + if (0 != tVariantCompare(&pCtx[0].tag,&elem.tag)) { return TS_JOIN_TAG_NOT_EQUALS; } @@ -7461,7 +7461,8 @@ int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, S // both the master and supplement scan needs to set the correct ts comp start position if (pRuntimeEnv->pTSBuf != NULL) { if (pMeterQueryInfo->cur.vnodeIndex == -1) { - pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; + //pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; + tVariantAssign(&pMeterQueryInfo->tag,&pRuntimeEnv->pCtx[0].tag); tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pMeterQueryInfo->tag); diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index ae51365918b142e392dcffa27a6b071543f3d02e..3c05f40188592e249cb47c778b7436fe17ef213e 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -500,7 +500,8 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool * if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vnodeIndex == -1) { - int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; + tVariant tag; + tVariantAssign(&tag,&pRuntimeEnv->pCtx[0].tag); STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag); // failed to find data with the specified tag value diff --git a/src/util/src/ttypes.c b/src/util/src/ttypes.c index ae994cb77b7cdb27f3e857115d6d1db7df9bd9b0..86c78c028391986368d65880f3a8c877f074adc7 100644 --- a/src/util/src/ttypes.c +++ b/src/util/src/ttypes.c @@ -184,6 +184,34 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { memcpy(pDst->pz, pSrc->pz, len); } } +/* compare two tVariant, if same, return 0; else return nonezero */ +int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc) { + if (pSrc == NULL || pDst == NULL) return 1; + if (pSrc->nType != pDst->nType) return 1; + switch (pSrc->nType) { + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TIMESTAMP: + if (pSrc->i64Key > pDst->i64Key){ + return 1; + }else if (pSrc->i64Key < pDst->i64Key) { + return -1; + }else { + return 0; + } + + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + return strncasecmp(pSrc->pz,pDst->pz,pSrc->nLen); + default: + return 1; + } +} int32_t tVariantToString(tVariant *pVar, char *dst) { if (pVar == NULL || dst == NULL) return 0;