From 66802544aa8724623ea71e303715c4640cff73eb Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 21 Apr 2020 18:24:18 +0800 Subject: [PATCH] [Tbase-1279] support binary tag join. --- src/client/inc/tscJoinProcess.h | 6 +- src/client/src/tscFunctionImpl.c | 6 +- src/client/src/tscJoinProcess.c | 71 ++++++++++++++--------- src/inc/ttypes.h | 2 +- src/system/detail/src/vnodeQueryImpl.c | 2 +- src/system/detail/src/vnodeQueryProcess.c | 6 +- src/util/src/ttypes.c | 10 +++- 7 files changed, 65 insertions(+), 38 deletions(-) diff --git a/src/client/inc/tscJoinProcess.h b/src/client/inc/tscJoinProcess.h index 01a92c36b1..4e97dfad4b 100644 --- a/src/client/inc/tscJoinProcess.h +++ b/src/client/inc/tscJoinProcess.h @@ -33,7 +33,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter); -#define MEM_BUF_SIZE (1<<20) +#define MEM_BUF_SIZE (1u<<20) #define TS_COMP_BLOCK_PADDING 0xFFFFFFFF #define TS_COMP_FILE_MAGIC 0x87F5EC4C #define TS_COMP_FILE_VNODE_MAX 512 @@ -123,7 +123,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ void* tsBufDestory(STSBuf* pTSBuf); -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len); +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId); @@ -134,7 +134,7 @@ void tsBufResetPos(STSBuf* pTSBuf); STSElem tsBufGetElem(STSBuf* pTSBuf); bool tsBufNextPos(STSBuf* pTSBuf); -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag); +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag); STSCursor tsBufGetCursor(STSBuf* pTSBuf); void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index f893d33454..fdadced667 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -4335,11 +4335,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { // primary ts must be existed, so no need to check its existance if (pCtx->order == TSQL_SO_ASC) { - tsBufAppend(pTSbuf, 0, pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { char *d = GET_INPUT_CHAR_INDEX(pCtx, i); - tsBufAppend(pTSbuf, 0, pCtx->tag, d, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE); } } @@ -4359,7 +4359,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { STSBuf *pTSbuf = pInfo->pTSBuf; - tsBufAppend(pTSbuf, 0, pCtx->tag, pData, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, 0, &pCtx->tag, pData, TSDB_KEYSIZE); SET_VAL(pCtx, pCtx->size, 1); pResInfo->hasResult = DATA_SET_FLAG; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index f220970da9..eb33e50150 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -109,8 +109,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor *et = elem1.ts; } - tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); - tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); + tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); } else { pLimit->offset -= 1; } @@ -869,12 +869,7 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { tsBufDestory(pTSBuf); return NULL; } - - pTSBuf->block.tag.pz = malloc(MEM_TAG_SIZE); //Need to define the length - if (pTSBuf->block.tag.pz == NULL) { - tsBufDestory(pTSBuf); - return NULL; - } + pTSBuf->fileSize += getDataStartOffset(); return pTSBuf; } @@ -1010,7 +1005,8 @@ void* tsBufDestory(STSBuf* pTSBuf) { tfree(pTSBuf->pData); tfree(pTSBuf->block.payload); - tfree(pTSBuf->block.tag.pz); + + tVariantDestroy(&pTSBuf->block.tag); fclose(pTSBuf->f); @@ -1094,8 +1090,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) { tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload, pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); - UNUSED(r); + /*int64_t r =*/ fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); /* * format for output data: @@ -1105,8 +1100,14 @@ static void writeDataToDisk(STSBuf* pTSBuf) { * both side has the compressed length is used to support load data forwards/backwords. */ fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); - fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); - fwrite(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f); + fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); + + if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { + fwrite(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); + } else { + fwrite(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f); + } + fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); @@ -1145,10 +1146,13 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { STSBlock* pBlock = &pTSBuf->block; // clear the memory buffer + tVariant t = pBlock->tag; void* tmp = pBlock->payload; memset(pBlock, 0, sizeof(STSBlock)); + pBlock->payload = tmp; - + pBlock->tag = t; + if (order == TSQL_SO_DESC) { /* * set the right position for the reversed traverse, the reversed traverse is started from @@ -1164,7 +1168,20 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); - fread(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f); + + // NOTE: mix types tags are not supported + if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { + char* tp = realloc(pBlock->tag.pz, pBlock->tag.nLen + 1); + assert(tp != NULL); + + memset(tp, 0, pBlock->tag.nLen + 1); + pBlock->tag.pz = tp; + + fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); + } else { + fread(&pBlock->tag.i64Key, pBlock->tag.nLen, 1, pTSBuf->f); + } + fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); @@ -1222,10 +1239,11 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { return TSDB_CODE_SUCCESS; } -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len) { +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len) { STSVnodeBlockInfoEx* pBlockInfo = NULL; - STSList* ptsData = &pTSBuf->tsData; - int32_t tagEqual = 0; + + STSList* ptsData = &pTSBuf->tsData; + int32_t tagEqual = 0; if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) { writeDataToDisk(pTSBuf); @@ -1237,17 +1255,18 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pDat } assert(pBlockInfo->info.vnode == vnodeId); - tagEqual = tVariantCompare(&pTSBuf->block.tag,&tag); + tagEqual = tVariantCompare(&pTSBuf->block.tag, tag); - if (tagEqual !=0 && ptsData->len > 0) { + if (tagEqual != 0 && ptsData->len > 0) { // new arrived data with different tags value, save current value into disk first writeDataToDisk(pTSBuf); } else { expandBuffer(ptsData, len); } - - //pTSBuf->block.tag = tag; - tVariantAssign(&pTSBuf->block.tag,&tag); + + tVariantDestroy(&pTSBuf->block.tag); + tVariantAssign(&pTSBuf->block.tag, tag); + memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); // todo check return value @@ -1328,7 +1347,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int return 0; } -static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant tag) { +static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant* tag) { bool decomp = false; int64_t offset = 0; @@ -1347,7 +1366,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo return -1; } - if (0 == tVariantCompare(&pTSBuf->block.tag,&tag)) { + if (0 == tVariantCompare(&pTSBuf->block.tag, tag)) { return i; } } @@ -1659,7 +1678,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ return pTSBuf; } -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag) { +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) { STSElem elem = {.vnode = -1}; if (pTSBuf == NULL) { diff --git a/src/inc/ttypes.h b/src/inc/ttypes.h index 0118c12b74..c985f56f10 100644 --- a/src/inc/ttypes.h +++ b/src/inc/ttypes.h @@ -75,7 +75,7 @@ void tVariantDestroy(tVariant *pV); void tVariantAssign(tVariant *pDst, const tVariant *pSrc); -int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc); +int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc); int32_t tVariantToString(tVariant *pVar, char *dst); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 8d72095de9..334e9af008 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -7464,7 +7464,7 @@ int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, S //pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; tVariantAssign(&pMeterQueryInfo->tag,&pRuntimeEnv->pCtx[0].tag); - tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pMeterQueryInfo->tag); + tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pMeterQueryInfo->tag); // keep the cursor info of current meter pMeterQueryInfo->cur = pRuntimeEnv->pTSBuf->cur; diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 3c05f40188..766419dd22 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -500,9 +500,9 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool * if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vnodeIndex == -1) { - tVariant tag; - tVariantAssign(&tag,&pRuntimeEnv->pCtx[0].tag); - STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag); + tVariant tag = {0}; + tVariantAssign(&tag, &pRuntimeEnv->pCtx[0].tag); + STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &tag); // failed to find data with the specified tag value if (elem.vnode < 0) { diff --git a/src/util/src/ttypes.c b/src/util/src/ttypes.c index 86c78c0283..78f8595e77 100644 --- a/src/util/src/ttypes.c +++ b/src/util/src/ttypes.c @@ -114,27 +114,33 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: { pVar->i64Key = GET_INT8_VAL(pz); + pVar->nLen = sizeof(int64_t); break; } case TSDB_DATA_TYPE_SMALLINT: { pVar->i64Key = GET_INT16_VAL(pz); + pVar->nLen = sizeof(int64_t); break; } case TSDB_DATA_TYPE_INT: { pVar->i64Key = GET_INT32_VAL(pz); + pVar->nLen = sizeof(int64_t); break; } case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: { pVar->i64Key = GET_INT64_VAL(pz); + pVar->nLen = sizeof(int64_t); break; } case TSDB_DATA_TYPE_DOUBLE: { pVar->dKey = GET_DOUBLE_VAL(pz); + pVar->nLen = sizeof(int64_t); break; } case TSDB_DATA_TYPE_FLOAT: { pVar->dKey = GET_FLOAT_VAL(pz); + pVar->nLen = sizeof(int64_t); break; } case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length @@ -181,11 +187,13 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { } pDst->pz = calloc(1, len); + printf("==============alloc assign:%p", pDst->pz); + memcpy(pDst->pz, pSrc->pz, len); } } /* compare two tVariant, if same, return 0; else return nonezero */ -int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc) { +int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc) { if (pSrc == NULL || pDst == NULL) return 1; if (pSrc->nType != pDst->nType) return 1; switch (pSrc->nType) { -- GitLab