From 0b83e6746ed2d1927585b19bf69f1c176e27d2a2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Nov 2020 18:02:31 +0800 Subject: [PATCH] [TD-1876]: fix bugs in arithmetic expression in super table join query processing. --- src/client/src/tscSQLParser.c | 30 +++---- src/client/src/tscServer.c | 28 ++----- src/client/src/tscSubquery.c | 1 - src/query/inc/qTsbuf.h | 2 + src/query/src/qExecutor.c | 3 +- src/query/src/qTsbuf.c | 81 ++++++++++++++++--- src/query/tests/tsBufTest.cpp | 11 ++- .../script/general/parser/join_multivnode.sim | 2 + 8 files changed, 99 insertions(+), 59 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 55658ac446..27246aa7d3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -65,7 +65,6 @@ static bool validateTagParams(tFieldList* pTagsList, tFieldList* pFieldList, SSq static int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStrToken* tableName, int32_t* len); static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength); -static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName); static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult); static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, @@ -1625,12 +1624,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t } static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, - char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { + tSQLExprItem* item, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { const char* msg1 = "not support column types"; int16_t type = 0; int16_t bytes = 0; - char columnName[TSDB_COL_NAME_LEN] = {0}; int32_t functionID = cvtFunc.execFuncId; if (functionID == TSDB_FUNC_SPREAD) { @@ -1647,15 +1645,14 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS bytes = pSchema[pColIndex->columnIndex].bytes; } - if (aliasName != NULL) { - tstrncpy(columnName, aliasName, sizeof(columnName)); + SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false); + if (item->aliasName != NULL) { + tstrncpy(pExpr->aliasName, item->aliasName, tListLen(pExpr->aliasName)); } else { - getRevisedName(columnName, cvtFunc.originFuncId, sizeof(columnName) - 1, pSchema[pColIndex->columnIndex].name); + int32_t len = MIN(tListLen(pExpr->aliasName), item->pNode->token.n + 1); + tstrncpy(pExpr->aliasName, item->pNode->token.z, len); } - SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false); - tstrncpy(pExpr->aliasName, columnName, sizeof(pExpr->aliasName)); - if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) { pExpr->colInfo.flag |= TSDB_COL_NULL; } @@ -1674,7 +1671,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS // if it is not in the final result, do not add it SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); if (finalResult) { - insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); + insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->aliasName, pExpr); } else { tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0])); } @@ -1939,8 +1936,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (pParamElem->pNode->nSQLOptr == TK_ALL) { - // select table.* + if (pParamElem->pNode->nSQLOptr == TK_ALL) { // select table.* SStrToken tmpToken = pParamElem->pNode->colInfo; if (getTableIndexByName(&tmpToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { @@ -1952,7 +1948,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) { index.columnIndex = j; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex++, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } } @@ -1970,7 +1966,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex + i, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -2007,7 +2003,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { SColumnIndex index = {.tableIndex = j, .columnIndex = i}; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -2240,10 +2236,6 @@ void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLengt } } -void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName) { - snprintf(resultFieldName, maxLen, "%s(%s)", aAggs[functionId].aName, columnName); -} - static bool isTablenameToken(SStrToken* token) { SStrToken tmpToken = *token; SStrToken tableToken = {0}; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e678464932..779cee2163 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -878,37 +878,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // compressed ts block pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload)); - int32_t tsLen = 0; - int32_t numOfBlocks = 0; if (pQueryInfo->tsBuf != NULL) { int32_t vnodeId = htonl(pQueryMsg->head.vgId); - STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId); - assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent - - // todo refactor, extract method and put into tsBuf.c - if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) { - int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f)); - tscError("%p: fseek failed: %s", pSql, tstrerror(code)); - return code; - } - - size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f); - if (s != pBlockInfo->compLen) { - int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f)); - tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code)); + int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeId, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks); + if (code != TSDB_CODE_SUCCESS) { return code; } - pMsg += pBlockInfo->compLen; - tsLen = pBlockInfo->compLen; - numOfBlocks = pBlockInfo->numOfBlocks; - } + pMsg += pQueryMsg->tsLen; - pQueryMsg->tsLen = htonl(tsLen); - pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks); - if (pQueryInfo->tsBuf != NULL) { pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); + pQueryMsg->tsLen = htonl(pQueryMsg->tsLen); + pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); } int32_t msgLen = (int32_t)(pMsg - pCmd->payload); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 786e237a53..38b0388d35 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -189,7 +189,6 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf); if (el1.vnode < 0) { // no data exists, abort - completed = true; break; } } diff --git a/src/query/inc/qTsbuf.h b/src/query/inc/qTsbuf.h index 6c2a955f47..fc5efa069d 100644 --- a/src/query/inc/qTsbuf.h +++ b/src/query/inc/qTsbuf.h @@ -135,6 +135,8 @@ int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf); void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId); +int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks); + #ifdef __cplusplus } #endif diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8b12af7fa9..d65d01d155 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6373,12 +6373,13 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ SQuery *pQuery = pQInfo->runtimeEnv.pQuery; STSBuf *pTSBuf = NULL; - if (pQueryMsg->tsLen > 0) { // open new file to save the result + if (pQueryMsg->tsLen > 0) { // open new file to save the result char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset; pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder, vgId); tsBufResetPos(pTSBuf); bool ret = tsBufNextPos(pTSBuf); + UNUSED(ret); } diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c index ed7fa16cc4..82f36a8951 100644 --- a/src/query/src/qTsbuf.c +++ b/src/query/src/qTsbuf.c @@ -219,6 +219,15 @@ static void shrinkBuffer(STSList* ptsData) { } } +static int32_t getTagAreaLength(tVariant* pa) { + int32_t t = sizeof(pa->nLen) * 2 + sizeof(pa->nType); + if (pa->nType != TSDB_DATA_TYPE_NULL) { + t += pa->nLen; + } + + return t; +} + static void writeDataToDisk(STSBuf* pTSBuf) { if (pTSBuf->tsData.len == 0) { return; @@ -244,19 +253,27 @@ static void writeDataToDisk(STSBuf* pTSBuf) { */ int32_t metaLen = 0; metaLen += (int32_t)fwrite(&pBlock->tag.nType, 1, sizeof(pBlock->tag.nType), pTSBuf->f); - metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); + int32_t trueLen = pBlock->tag.nLen; if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { + metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f); } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { - metaLen += (int32_t)fwrite(&pBlock->tag.i64Key, 1, sizeof(int64_t), pTSBuf->f); + metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); + metaLen += (int32_t)fwrite(&pBlock->tag.i64Key, 1, (size_t) pBlock->tag.nLen, pTSBuf->f); + } else { + trueLen = 0; + metaLen += (int32_t)fwrite(&trueLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); } fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); - + + metaLen += fwrite(&trueLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); + assert(metaLen == getTagAreaLength(&pBlock->tag)); + int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; pTSBuf->fileSize += blockSize; @@ -291,17 +308,22 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { pBlock->padding = 0; pBlock->numOfElem = 0; + int32_t offset = -1; + if (order == TSDB_ORDER_DESC) { /* * set the right position for the reversed traverse, the reversed traverse is started from * the end of each comp data block */ - int32_t ret = fseek(pTSBuf->f, -(int32_t)(sizeof(pBlock->padding)), SEEK_CUR); - size_t sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f); + int32_t prev = -(int32_t) (sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen)); + int32_t ret = fseek(pTSBuf->f, prev, SEEK_CUR); + size_t sz = fread(&pBlock->padding, 1, sizeof(pBlock->padding), pTSBuf->f); + sz = fread(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); UNUSED(sz); - + pBlock->compLen = pBlock->padding; - int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); + + offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag); ret = fseek(pTSBuf->f, -offset, SEEK_CUR); UNUSED(ret); } @@ -320,7 +342,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { - sz = fread(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f); + sz = fread(&pBlock->tag.i64Key, (size_t) pBlock->tag.nLen, 1, pTSBuf->f); } sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); @@ -328,8 +350,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); UNUSED(sz); sz = fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); - UNUSED(sz); - + if (decomp) { pTSBuf->tsData.len = tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, @@ -338,11 +359,20 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { // read the comp length at the length of comp block sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f); + assert(pBlock->padding == pBlock->compLen); + + int32_t n = 0; + sz = fread(&n, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); + if (pBlock->tag.nType == TSDB_DATA_TYPE_NULL) { + assert(n == 0); + } else { + assert(n == pBlock->tag.nLen); + } + UNUSED(sz); // for backwards traverse, set the start position at the end of previous block if (order == TSDB_ORDER_DESC) { - int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); int32_t r = fseek(pTSBuf->f, -offset, SEEK_CUR); UNUSED(r); } @@ -479,7 +509,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int if (pTSBuf->cur.order == TSDB_ORDER_DESC) { STSBlock* pBlock = &pTSBuf->block; int32_t compBlockSize = - pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); + pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag); int32_t ret = fseek(pTSBuf->f, -compBlockSize, SEEK_CUR); UNUSED(ret); } @@ -507,7 +537,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo } if (tVariantCompare(&pTSBuf->block.tag, tag) == 0) { - return i; + return (pTSBuf->cur.order == TSDB_ORDER_ASC)? i: (pBlockInfo->numOfBlocks - (i + 1)); } } @@ -993,4 +1023,29 @@ void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) { for(int32_t i = 0; i < size; ++i) { (*vnodeId)[i] = pTSBuf->pData[i].info.vnode; } +} + +int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks) { + STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); + + *len = 0; + *numOfBlocks = 0; + + if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) { + int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f)); +// qError("%p: fseek failed: %s", pSql, tstrerror(code)); + return code; + } + + size_t s = fread(buf, 1, pBlockInfo->compLen, pTSBuf->f); + if (s != pBlockInfo->compLen) { + int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f)); +// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code)); + return code; + } + + *len = pBlockInfo->compLen; + *numOfBlocks = pBlockInfo->numOfBlocks; + + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/src/query/tests/tsBufTest.cpp b/src/query/tests/tsBufTest.cpp index 8cd3a9cbef..2892af3979 100644 --- a/src/query/tests/tsBufTest.cpp +++ b/src/query/tests/tsBufTest.cpp @@ -472,13 +472,20 @@ void mergeIdenticalVnodeBufferTest() { tsBufFlush(pTSBuf2); tsBufMerge(pTSBuf1, pTSBuf2); - EXPECT_EQ(pTSBuf1->numOfVnodes, 1); + EXPECT_EQ(pTSBuf1->numOfVnodes, 2); EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num); tsBufResetPos(pTSBuf1); + + int32_t count = 0; while (tsBufNextPos(pTSBuf1)) { STSElem elem = tsBufGetElem(pTSBuf1); - EXPECT_EQ(elem.vnode, 12); + + if (count++ < numOfTags * num) { + EXPECT_EQ(elem.vnode, 12); + } else { + EXPECT_EQ(elem.vnode, 77); + } printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); } diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 85d5c79d86..dc2c0fcf3e 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -316,6 +316,8 @@ if $data03 != 0 then endi select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt1.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); +select count(join_mt0.c1), first(join_mt0.c1)/count(*), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt0.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); +select last(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file -- GitLab