From 9cdc251d3df9e6b284d16905c7b58b6b3fd4f0db Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 4 May 2020 22:18:18 +0800 Subject: [PATCH] [td-168] fix bug in handling the new string --- src/client/src/tscFunctionImpl.c | 8 +++-- src/client/src/tscLocal.c | 58 +++++--------------------------- src/client/src/tscSQLParser.c | 2 +- src/common/inc/tdataformat.h | 19 ++++++----- src/dnode/src/dnodeWrite.c | 2 +- src/inc/taosdef.h | 1 + src/mnode/inc/mgmtDef.h | 1 + src/mnode/src/mgmtDb.c | 22 +++++++----- src/mnode/src/mgmtTable.c | 28 ++++++++------- src/query/inc/qtsbuf.h | 2 +- src/query/src/qtsbuf.c | 38 ++++++++++----------- src/query/src/queryExecutor.c | 18 +++++----- src/tsdb/src/tsdbRead.c | 57 +++++++++++++++++++++---------- 13 files changed, 126 insertions(+), 130 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 9067be6325..caaedca1f2 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -1844,6 +1844,7 @@ static void last_row_function(SQLFunctionCtx *pCtx) { assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType); SResultInfo *pResInfo = GET_RES_INFO(pCtx); + pResInfo->hasResult = DATA_SET_FLAG; SLastrowInfo *pInfo = (SLastrowInfo *)pResInfo->interResultBuf; pInfo->ts = pCtx->param[0].i64Key; @@ -1863,14 +1864,17 @@ static void last_row_function(SQLFunctionCtx *pCtx) { static void last_row_finalizer(SQLFunctionCtx *pCtx) { // do nothing at the first stage + SResultInfo *pResInfo = GET_RES_INFO(pCtx); if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { - SResultInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); return; } } else { - // do nothing + if (pResInfo->hasResult != DATA_SET_FLAG) { + setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + return; + } } GET_RES_INFO(pCtx)->numOfRes = 1; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 3dfac55adb..600ef88344 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -132,7 +132,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { for (int32_t i = 0; i < numOfRows; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i; - STR_TO_VARSTR(dst, pSchema[i].name); + STR_WITH_MAXSIZE_TO_VARSTR(dst, pSchema[i].name, TSDB_COL_NAME_LEN); char *type = tDataTypeDesc[pSchema[i].type].aName; @@ -155,8 +155,8 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3); if (i >= tscGetNumOfColumns(pMeta) && tscGetNumOfTags(pMeta) != 0) { - strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i, "tag", - strlen("tag") + 1); + char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i; + STR_WITH_SIZE_TO_VARSTR(output, "TAG", 3); } } @@ -169,13 +169,15 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { for (int32_t i = numOfRows; i < totalNumOfRows; ++i) { // field name TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); - strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i, pSchema[i].name, - TSDB_COL_NAME_LEN); + char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i; + STR_WITH_MAXSIZE_TO_VARSTR(output, pSchema[i].name, TSDB_COL_NAME_LEN); // type name pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1); char *type = tDataTypeDesc[pSchema[i].type].aName; - strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i, type, pField->bytes); + + output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i; + STR_WITH_MAXSIZE_TO_VARSTR(output, type, pField->bytes); // type length int32_t bytes = pSchema[i].bytes; @@ -189,49 +191,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { // tag value pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3); char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i; - - if (isNull(pTagValue, pSchema[i].type)) { - sprintf(target, "%s", TSDB_DATA_NULL_STR); - } else { - switch (pSchema[i].type) { - case TSDB_DATA_TYPE_BINARY: - /* binary are not null-terminated string */ - strncpy(target, pTagValue, pSchema[i].bytes); - break; - case TSDB_DATA_TYPE_NCHAR: - taosUcs4ToMbs(pTagValue, pSchema[i].bytes, target); - break; - case TSDB_DATA_TYPE_FLOAT: { - float fv = 0; - fv = GET_FLOAT_VAL(pTagValue); - sprintf(target, "%f", fv); - } break; - case TSDB_DATA_TYPE_DOUBLE: { - double dv = 0; - dv = GET_DOUBLE_VAL(pTagValue); - sprintf(target, "%lf", dv); - } break; - case TSDB_DATA_TYPE_TINYINT: - sprintf(target, "%d", *(int8_t *)pTagValue); - break; - case TSDB_DATA_TYPE_SMALLINT: - sprintf(target, "%d", *(int16_t *)pTagValue); - break; - case TSDB_DATA_TYPE_INT: - sprintf(target, "%d", *(int32_t *)pTagValue); - break; - case TSDB_DATA_TYPE_BIGINT: - sprintf(target, "%" PRId64 "", *(int64_t *)pTagValue); - break; - case TSDB_DATA_TYPE_BOOL: { - char *val = (*((int8_t *)pTagValue) == 0) ? "false" : "true"; - sprintf(target, "%s", val); - break; - } - default: - break; - } - } + STR_WITH_SIZE_TO_VARSTR(target, "TAG", 3); pTagValue += pSchema[i].bytes; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e6fd9e47ee..cf4aa3a50d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1427,7 +1427,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI } if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN}; + SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE}; strcpy(colSchema.name, TSQL_TBNAME_L); pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY; diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 27faf41cf9..78b6cb73b2 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -26,15 +26,18 @@ extern "C" { #endif -#define VARSTR_HEADER_SIZE sizeof(int16_t) -#define STR_TO_VARSTR(x, str) do {int16_t __len = strlen(str); \ - *(int16_t*)(x) = __len; \ +#define STR_TO_VARSTR(x, str) do {VarDataLenT __len = strlen(str); \ + *(VarDataLenT*)(x) = __len; \ strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), __len);} while(0); - -#define STR_TO_VARSTR_WITH_SIZE(x, str, _size) do {\ - int16_t __len = strnlen((str), (_size)); \ - *(int16_t*)(x) = __len; \ - strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), __len);\ + +#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\ + char* _e = stpncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_maxs));\ + *(VarDataLenT*)(x) = _e - (x);\ +} while(0) + +#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\ + *(VarDataLenT*)(x) = (_size); \ + strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_size));\ } while(0); // ----------------- TSDB COLUMN DEFINITION diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 39757c690f..babbcf4ae8 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -228,7 +228,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { int32_t num = taosGetQueueNumber(pWorker->qset); if (num > 0) { - usleep(30000); + usleep(30); sched_yield(); } else { taosFreeQall(pWorker->qall); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 65c3efd022..aa4889ec9d 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -43,6 +43,7 @@ typedef int16_t VarDataLenT; // this data type is internally used only in 'in' query to hold the values #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) +#define VARSTR_HEADER_SIZE sizeof(VarDataLenT) // Bytes for each type. extern const int32_t TYPE_BYTES[11]; diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index f6a85ec237..8e5240910d 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -66,6 +66,7 @@ typedef struct SMnodeObj { SDnodeObj *pDnode; } SMnodeObj; +// todo use dynamic length string typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index ee7d92ea68..9cc7e200d1 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -23,6 +23,7 @@ #include "ttime.h" #include "tname.h" #include "tbalance.h" +#include "tdataformat.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtAcct.h" @@ -431,7 +432,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; - pShow->bytes[cols] = TSDB_DB_NAME_LEN; + pShow->bytes[cols] = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); pSchema[cols].bytes = htons(pShow->bytes[cols]); @@ -439,7 +440,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create time"); + strcpy(pSchema[cols].name, "created_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -586,11 +587,9 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char* name = mgmtGetDbStr(pDb->name); - *(int16_t*) pWrite = strnlen(name, TSDB_DB_NAME_LEN); - pWrite += sizeof(int16_t); // todo refactor - strncpy(pWrite, mgmtGetDbStr(pDb->name), TSDB_DB_NAME_LEN); + char* name = mgmtGetDbStr(pDb->name); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, TSDB_DB_NAME_LEN); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -626,7 +625,10 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * #endif pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - sprintf(pWrite, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep); + + char tmp[128] = {0}; + size_t n = sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep); + STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, n); cols++; #ifndef __CLOUD_VERSION__ @@ -674,7 +676,11 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pDb->status != TSDB_DB_STATUS_READY ? "dropping" : "ready"); + if (pDb->status == TSDB_DB_STATUS_READY) { + STR_WITH_SIZE_TO_VARSTR(pWrite, "ready", 5); + } else { + STR_WITH_SIZE_TO_VARSTR(pWrite, "dropping", 8); + } cols++; numOfRows++; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 7804cd7342..53a208745c 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -40,6 +40,7 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" #include "tcompare.h" +#include "tdataformat.h" static void * tsChildTableSdb; static void * tsSuperTableSdb; @@ -624,6 +625,7 @@ void mgmtCleanUpTables() { mgmtCleanUpSuperTables(); } +// todo move to name.h, add length of table name static void mgmtExtractTableName(char* tableId, char* name) { int pos = -1; int num = 0; @@ -1056,7 +1058,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, int32_t cols = 0; SSchema *pSchema = pMeta->schema; - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); pSchema[cols].bytes = htons(pShow->bytes[cols]); @@ -1064,7 +1066,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create_time"); + strcpy(pSchema[cols].name, "created_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -2014,15 +2016,15 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void int32_t cols = 0; SSchema *pSchema = pMeta->schema; - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "table name"); + strcpy(pSchema[cols].name, "table_name"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create time"); + strcpy(pSchema[cols].name, "created_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -2032,9 +2034,9 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "stable name"); + strcpy(pSchema[cols].name, "stable_name"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -2098,11 +2100,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - int16_t len = strnlen(tableName, TSDB_DB_NAME_LEN); - *(int16_t*) pWrite = len; - pWrite += sizeof(int16_t); // todo refactor - - strncpy(pWrite, tableName, len); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, TSDB_TABLE_NAME_LEN); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -2119,9 +2117,13 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + + memset(tableName, 0, tListLen(tableName)); if (pTable->info.type == TSDB_CHILD_TABLE) { - mgmtExtractTableName(pTable->superTable->info.tableId, pWrite); + mgmtExtractTableName(pTable->superTable->info.tableId, tableName); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, TSDB_TABLE_NAME_LEN); } + cols++; numOfRows++; diff --git a/src/query/inc/qtsbuf.h b/src/query/inc/qtsbuf.h index 8e014e5feb..c83c3dbe25 100644 --- a/src/query/inc/qtsbuf.h +++ b/src/query/inc/qtsbuf.h @@ -48,7 +48,7 @@ typedef struct STSElem { } STSElem; typedef struct STSCursor { - int32_t vnodeIndex; + int32_t vgroupIndex; int32_t blockIndex; int32_t tsIndex; uint32_t order; diff --git a/src/query/src/qtsbuf.c b/src/query/src/qtsbuf.c index 1d5c4f2d9d..555ccb7318 100644 --- a/src/query/src/qtsbuf.c +++ b/src/query/src/qtsbuf.c @@ -482,7 +482,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex } STSCursor* pCur = &pTSBuf->cur; - if (pCur->vnodeIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) || + if (pCur->vgroupIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) || (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) { int32_t i = 0; bool decomp = false; @@ -517,7 +517,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); - pCur->vnodeIndex = vnodeIndex; + pCur->vgroupIndex = vnodeIndex; pCur->blockIndex = blockIndex; pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1; @@ -554,7 +554,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) { STSCursor* pCur = &pTSBuf->cur; // get the first/last position according to traverse order - if (pCur->vnodeIndex == -1) { + if (pCur->vgroupIndex == -1) { if (pCur->order == TSDB_ORDER_ASC) { tsBufGetBlock(pTSBuf, 0, 0); @@ -569,9 +569,9 @@ bool tsBufNextPos(STSBuf* pTSBuf) { assert(pTSBuf->numOfVnodes > 0); int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; - pCur->vnodeIndex = vnodeIndex; + pCur->vgroupIndex = vnodeIndex; - int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; + int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode; STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); int32_t blockIndex = pBlockInfo->numOfBlocks - 1; @@ -594,14 +594,14 @@ bool tsBufNextPos(STSBuf* pTSBuf) { if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) { - int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; + int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode; STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) || (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { - if ((pCur->vnodeIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSDB_ORDER_ASC) || - (pCur->vnodeIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { - pCur->vnodeIndex = -1; + if ((pCur->vgroupIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSDB_ORDER_ASC) || + (pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { + pCur->vgroupIndex = -1; return false; } @@ -610,11 +610,11 @@ bool tsBufNextPos(STSBuf* pTSBuf) { } int32_t blockIndex = pCur->order == TSDB_ORDER_ASC ? 0 : pBlockInfo->numOfBlocks - 1; - tsBufGetBlock(pTSBuf, pCur->vnodeIndex + step, blockIndex); + tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex); break; } else { - tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex + step); + tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step); break; } } else { @@ -631,7 +631,7 @@ void tsBufResetPos(STSBuf* pTSBuf) { return; } - pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vnodeIndex = -1, .order = pTSBuf->cur.order}; + pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order}; } STSElem tsBufGetElem(STSBuf* pTSBuf) { @@ -642,13 +642,13 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { } STSCursor* pCur = &pTSBuf->cur; - if (pCur != NULL && pCur->vnodeIndex < 0) { + if (pCur != NULL && pCur->vgroupIndex < 0) { return elem1; } STSBlock* pBlock = &pTSBuf->block; - elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode; + elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode; elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); elem1.tag = pBlock->tag; @@ -804,7 +804,7 @@ STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) { return elem; } - pCur->vnodeIndex = j; + pCur->vgroupIndex = j; pCur->blockIndex = blockIndex; tsBufGetBlock(pTSBuf, j, blockIndex); @@ -812,7 +812,7 @@ STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) { } STSCursor tsBufGetCursor(STSBuf* pTSBuf) { - STSCursor c = {.vnodeIndex = -1}; + STSCursor c = {.vgroupIndex = -1}; if (pTSBuf == NULL) { return c; } @@ -825,9 +825,9 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) { return; } - // assert(pCur->vnodeIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0); - if (pCur->vnodeIndex != -1) { - tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex); + // assert(pCur->vgroupIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0); + if (pCur->vgroupIndex != -1) { + tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex); } pTSBuf->cur = *pCur; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index f0f96c15d5..ade2d273ff 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -27,6 +27,7 @@ #include "tscompression.h" #include "ttime.h" #include "tscUtil.h" // todo move the function to common module +#include "tdataformat.h" #define DEFAULT_INTERN_BUF_SIZE 16384L @@ -3517,7 +3518,7 @@ STableQueryInfo *createTableQueryInfoImpl(SQueryRuntimeEnv *pRuntimeEnv, STableI pTableQueryInfo->lastKey = win.skey; pTableQueryInfo->id = tableId; - pTableQueryInfo->cur.vnodeIndex = -1; + pTableQueryInfo->cur.vgroupIndex = -1; initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT); return pTableQueryInfo; @@ -3551,7 +3552,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQ pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; pTableQueryInfo->cur.order = pTableQueryInfo->cur.order ^ 1u; - pTableQueryInfo->cur.vnodeIndex = -1; + pTableQueryInfo->cur.vgroupIndex = -1; } void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { @@ -3630,7 +3631,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *p // both the master and supplement scan needs to set the correct ts comp start position if (pRuntimeEnv->pTSBuf != NULL) { - if (pTableQueryInfo->cur.vnodeIndex == -1) { + if (pTableQueryInfo->cur.vgroupIndex == -1) { pTableQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pTableQueryInfo->tag); @@ -4243,7 +4244,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTSBuf = param; - pRuntimeEnv->cur.vnodeIndex = -1; + pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->stableQuery = isSTableQuery; if (param != NULL) { @@ -4461,7 +4462,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { taosArrayDestroy(g1); if (pRuntimeEnv->pTSBuf != NULL) { - if (pRuntimeEnv->cur.vnodeIndex == -1) { + if (pRuntimeEnv->cur.vgroupIndex == -1) { int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag); @@ -6302,11 +6303,8 @@ static void buildTagQueryResult(SQInfo* pQInfo) { if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { tsdbGetTableName(pQInfo->tsdb, &item->id, &data); - char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + sizeof(int16_t)); - *(int16_t*) dst = strnlen(data, TSDB_TABLE_NAME_LEN); - dst += sizeof(int16_t); - - strncpy(dst, data, TSDB_TABLE_NAME_LEN); + char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); + STR_WITH_MAXSIZE_TO_VARSTR(dst, data, TSDB_TABLE_NAME_LEN); tfree(data); } else {// todo refactor, return the true length of binary|nchar data diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index e5744ab2ee..6d571bd035 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -35,8 +35,9 @@ enum { }; enum { - TSDB_QUERY_TYPE_ALL_ROWS = 1, - TSDB_QUERY_TYPE_LAST_ROW = 2, + TSDB_QUERY_TYPE_ALL = 1, + TSDB_QUERY_TYPE_LAST = 2, + TSDB_QUERY_TYPE_EXTERNAL = 3, }; typedef struct SField { @@ -58,7 +59,7 @@ typedef struct SDataBlockLoadInfo { } SDataBlockLoadInfo; typedef struct SLoadCompBlockInfo { - int32_t sid; /* meter sid */ + int32_t sid; /* table sid */ int32_t fileId; int32_t fileListIndex; } SLoadCompBlockInfo; @@ -92,7 +93,7 @@ typedef struct SBlockOrderSupporter { int32_t numOfTables; STableBlockInfo** pDataBlockInfo; int32_t* blockIndexArray; - int32_t* numOfBlocksPerMeter; + int32_t* numOfBlocksPerTable; } SBlockOrderSupporter; typedef struct STsdbQueryHandle { @@ -144,6 +145,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable pQueryHandle->order = pCond->order; pQueryHandle->window = pCond->twindow; pQueryHandle->pTsdb = tsdb; + pQueryHandle->type = TSDB_QUERY_TYPE_ALL; tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); pQueryHandle->cur.fid = -1; @@ -204,13 +206,23 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList) { STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList); - pQueryHandle->type = TSDB_QUERY_TYPE_LAST_ROW; + pQueryHandle->type = TSDB_QUERY_TYPE_LAST; pQueryHandle->order = TSDB_ORDER_DESC; changeQueryHandleForLastrowQuery(pQueryHandle); return pQueryHandle; } +TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList) { + STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList); + + pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; + pQueryHandle->order = TSDB_ORDER_ASC; + +// changeQueryHandleForLastrowQuery(pQueryHandle); + return pQueryHandle; +} + static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); @@ -689,7 +701,7 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { } static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) { - tfree(pSupporter->numOfBlocksPerMeter); + tfree(pSupporter->numOfBlocksPerTable); tfree(pSupporter->blockIndexArray); for (int32_t i = 0; i < numOfTables; ++i) { @@ -708,10 +720,10 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex]; int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex]; - if (leftTableBlockIndex > pSupporter->numOfBlocksPerMeter[leftTableIndex]) { + if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) { /* left block is empty */ return 1; - } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerMeter[rightTableIndex]) { + } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) { /* right block is empty */ return -1; } @@ -743,11 +755,11 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO SBlockOrderSupporter sup = {0}; sup.numOfTables = numOfTables; - sup.numOfBlocksPerMeter = calloc(1, sizeof(int32_t) * numOfTables); + sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables); sup.blockIndexArray = calloc(1, sizeof(int32_t) * numOfTables); sup.pDataBlockInfo = calloc(1, POINTER_BYTES * numOfTables); - if (sup.numOfBlocksPerMeter == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) { + if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) { cleanBlockOrderSupporter(&sup, 0); return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -761,7 +773,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO } SCompBlock* pBlock = pTableCheck->pCompInfo->blocks; - sup.numOfBlocksPerMeter[numOfQualTables] = pTableCheck->numOfBlocks; + sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); if (buf == NULL) { @@ -779,7 +791,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO pBlockInfoEx->pTableCheckInfo = pTableCheck; // pBlockInfoEx->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index - // pBlockInfoEx->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original meter + // pBlockInfoEx->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table cnt++; } @@ -788,7 +800,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO uTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables); - assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pMeterDataInfo[j]->numOfBlocks may be 0 + assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 sup.numOfTables = numOfQualTables; SLoserTreeInfo* pTree = NULL; @@ -808,8 +820,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfoEx[index]; // set data block index overflow, in order to disable the offset comparator - if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerMeter[pos]) { - sup.blockIndexArray[pos] = sup.numOfBlocksPerMeter[pos] + 1; + if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) { + sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1; } tLoserTreeAdjust(pTree, pos + sup.numOfTables); @@ -843,10 +855,14 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { break; } - assert(numOfBlocks >= 0); uTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks, numOfTables, pQueryHandle->pFileGroup->fileId); + assert(numOfBlocks >= 0); + if (numOfBlocks == 0) { + continue; + } + // todo return error code to query engine if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) { break; @@ -966,17 +982,22 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { // todo consider the query time window, current last_row does not apply the query time window size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - TSKEY key = INT64_MIN; + TSKEY key = 0; int32_t index = -1; for(int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - if (pCheckInfo->pTableObj->lastKey > key) { + if (pCheckInfo->pTableObj->lastKey > key) { //todo lastKey should not be 0 by default key = pCheckInfo->pTableObj->lastKey; index = i; } } + // todo, there are no data in all the tables. opt performance + if (index == -1) { + return; + } + // erase all other elements in array list, todo refactor size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { -- GitLab