From 0f2c6f31810792e25be3b2c3ff90634f27f06e8d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Oct 2020 15:49:15 +0800 Subject: [PATCH] [td-1373] --- src/client/src/tscFunctionImpl.c | 4 +-- src/client/src/tscSubquery.c | 6 ++-- src/query/inc/qTsbuf.h | 4 +++ src/query/src/qExecutor.c | 8 +++-- src/query/src/qTsbuf.c | 52 ++++++++++++++++++++++++-------- 5 files changed, 54 insertions(+), 20 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 1b4f92d3fc..81893841f7 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -4025,11 +4025,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { // primary ts must be existed, so no need to check its existance if (pCtx->order == TSDB_ORDER_ASC) { - tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); + tsBufAppend(pTSbuf, pCtx->param[0].i64Key, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { char *d = GET_INPUT_CHAR_INDEX(pCtx, i); - tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, pCtx->param[0].i64Key, &pCtx->tag, d, TSDB_KEYSIZE); } } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2a684f8608..ca778d4f92 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -158,9 +158,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tsBufDestroy(pSupporter1->pTSBuf); tsBufDestroy(pSupporter2->pTSBuf); - tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal, - win->skey, win->ekey); + tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " + "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d", pSql, numOfInput1, numOfInput2, output1->numOfTotal, + output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1)); return output1->numOfTotal; } diff --git a/src/query/inc/qTsbuf.h b/src/query/inc/qTsbuf.h index 46e6f79014..be3d97dad7 100644 --- a/src/query/inc/qTsbuf.h +++ b/src/query/inc/qTsbuf.h @@ -136,6 +136,10 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur); */ void tsBufDisplay(STSBuf* pTSBuf); +int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf); + +void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId); + #ifdef __cplusplus } #endif diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3cb8db9faf..224ecfe08c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1005,9 +1005,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + SQInfo* pQInfo = GET_QINFO_ADDR(pQuery); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -1469,7 +1470,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl } void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, - SDataStatis *pStatis, void *param, int32_t colIndex) { + SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId) { int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId; int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId; @@ -1542,6 +1543,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY } } } + } else if (functionId == TSDB_FUNC_TS_COMP) { + pCtx->param[0].i64Key = vgId; + pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT; } #if defined(_DEBUG_VIEW) diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c index b264f6cdc9..30cf070c0a 100644 --- a/src/query/src/qTsbuf.c +++ b/src/query/src/qTsbuf.c @@ -561,6 +561,19 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1; } +static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) { + if (offset < 0 || offset >= getDataStartOffset()) { + return -1; + } + + if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) { + return -1; + } + + fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f); + return 0; +} + STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) { int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); if (j == -1) { @@ -915,19 +928,6 @@ static int32_t getDataStartOffset() { return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo); } -static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) { - if (offset < 0 || offset >= getDataStartOffset()) { - return -1; - } - - if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) { - return -1; - } - - fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f); - return 0; -} - // update prev vnode length info in file static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) { int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo); @@ -969,3 +969,29 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { pTSBuf->fileSize += getDataStartOffset(); return pTSBuf; } + +int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf) { + if (pTSBuf == NULL) { + return 0; + } + + return pTSBuf->pData->len; +} + +void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) { + int32_t size = tsBugGetNumOfVnodes(pTSBuf); + if (num != NULL) { + *num = size; + } + + *vnodeId = NULL; + if (size == 0) { + return; + } + + (*vnodeId) = malloc(tsBugGetNumOfVnodes(pTSBuf) * sizeof(int32_t)); + + for(int32_t i = 0; i < size; ++i) { + (*vnodeId)[i] = pTSBuf->pData[i].info.vnode; + } +} \ No newline at end of file -- GitLab