From 6788a4222c90bfb388883cf640a3b2102e24402d Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 20 Jul 2019 17:17:10 +0800 Subject: [PATCH] Fix the issue ##110, unsorted timestamp in one request may cause crash --- src/client/inc/tscUtil.h | 6 ++- src/client/src/tscParseInsert.c | 74 +++++++++++++++++++++------------ src/inc/taosmsg.h | 2 + src/rpc/src/tstring.c | 4 +- src/system/src/vnodeCache.c | 25 +++++++---- src/system/src/vnodeFile.c | 24 ++++++++--- src/system/src/vnodeRead.c | 1 - 7 files changed, 91 insertions(+), 45 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 8055ba2f92..7d470c35a4 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -41,10 +41,12 @@ typedef struct SParsedColElem { } SParsedColElem; typedef struct SParsedDataColInfo { - int32_t numOfCols; - int32_t numOfParsedCols; + bool ordered; // denote if the timestamp in one data block ordered or not + int16_t numOfCols; + int16_t numOfAssignedCols; SParsedColElem elems[TSDB_MAX_COLUMNS]; bool hasVal[TSDB_MAX_COLUMNS]; + int64_t prevTimestamp; } SParsedDataColInfo; SInsertedDataBlocks* tscCreateDataBlock(int32_t size); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 792ecaf9ee..471ceac4fd 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -98,7 +98,6 @@ int tsParseTime(char* value, int32_t valuelen, int64_t* time, char** next, char* char* pTokenEnd = *next; tscGetToken(pTokenEnd, &token, &tokenlen); - if (tokenlen == 0 && strlen(value) == 0) { INVALID_SQL_RET_MSG(error, "missing time stamp"); } @@ -171,7 +170,7 @@ int tsParseTime(char* value, int32_t valuelen, int64_t* time, char** next, char* } int32_t tsParseOneColumnData(SSchema* pSchema, char* value, int valuelen, char* payload, char* msg, char** str, - bool primaryKey, int16_t timePrec) { + bool primaryKey, int16_t timePrec) { int64_t temp; int32_t nullInt = *(int32_t*)TSDB_DATA_NULL_STR_L; char* endptr = NULL; @@ -359,13 +358,13 @@ static void setErrMsg(char* msg, char* sql) { } int tsParseOneRowData(char** str, char* payload, SSchema schema[], SParsedDataColInfo* spd, char* error, - int16_t timePrec) { + int16_t timePrec) { char* value = NULL; int valuelen = 0; /* 1. set the parsed value from sql string */ int32_t rowSize = 0; - for (int i = 0; i < spd->numOfParsedCols; ++i) { + for (int i = 0; i < spd->numOfAssignedCols; ++i) { /* the start position in data block buffer of current value in sql */ char* start = payload + spd->elems[i].offset; int16_t colIndex = spd->elems[i].colIndex; @@ -392,14 +391,24 @@ int tsParseOneRowData(char** str, char* payload, SSchema schema[], SParsedDataCo } int32_t ret = tsParseOneColumnData(&schema[colIndex], value, valuelen, start, error, str, - colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX, timePrec); + colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX, timePrec); if (ret != 0) { return -1; // NOTE: here 0 mean error! } + + // once the data block is disordered, we do NOT keep previous timestamp any more + if (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && spd->ordered) { + TSKEY k = *(TSKEY*)start; + if (k < spd->prevTimestamp) { + spd->ordered = false; + } + + spd->prevTimestamp = k; + } } /*2. set the null value for the rest columns */ - if (spd->numOfParsedCols < spd->numOfCols) { + if (spd->numOfAssignedCols < spd->numOfCols) { char* ptr = payload; for (int32_t i = 0; i < spd->numOfCols; ++i) { @@ -417,6 +426,12 @@ int tsParseOneRowData(char** str, char* payload, SSchema schema[], SParsedDataCo return rowSize; } +static int32_t rowDataCompar(const void* lhs, const void* rhs) { + TSKEY left = GET_INT64_VAL(lhs); + TSKEY right = GET_INT64_VAL(rhs); + DEFAULT_COMP(left, right); +} + int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMeterMeta, int maxRows, SParsedDataColInfo* spd, char* error) { char* token; @@ -427,7 +442,7 @@ int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMete int16_t numOfRows = 0; pDataBlock->size += sizeof(SShellSubmitBlock); - if (spd->hasVal[0] == false) { + if (!spd->hasVal[0]) { sprintf(error, "primary timestamp column can not be null"); return -1; } @@ -442,8 +457,8 @@ int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMete maxRows += tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize); } - int32_t len = tsParseOneRowData(str, pDataBlock->pData + pDataBlock->size, pSchema, spd, error, - pMeterMeta->precision); + int32_t len = + tsParseOneRowData(str, pDataBlock->pData + pDataBlock->size, pSchema, spd, error, pMeterMeta->precision); if (len <= 0) { setErrMsg(error, *str); return -1; @@ -462,10 +477,9 @@ int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMete if (numOfRows <= 0) { strcpy(error, "no any data points"); - return -1; - } else { - return numOfRows; } + + return numOfRows; } static void appendDataBlock(SDataBlockList* pList, SInsertedDataBlocks* pBlocks) { @@ -480,9 +494,11 @@ static void appendDataBlock(SDataBlockList* pList, SInsertedDataBlocks* pBlocks) pList->pData[pList->nSize++] = pBlocks; } -static void tscSetAllColumnsHasValue(SParsedDataColInfo* spd, SSchema* pSchema, int32_t numOfCols) { +static void tscSetAssignedColumnInfo(SParsedDataColInfo* spd, SSchema* pSchema, int16_t numOfCols) { + spd->ordered = true; + spd->prevTimestamp = INT64_MIN; spd->numOfCols = numOfCols; - spd->numOfParsedCols = numOfCols; + spd->numOfAssignedCols = numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { spd->hasVal[i] = true; @@ -522,7 +538,7 @@ void tsSetBlockInfo(SShellSubmitBlock* pBlocks, const SMeterMeta* pMeterMeta, in } static int32_t doParseInsertStatement(SSqlCmd* pCmd, SSqlRes* pRes, void* pDataBlockHashList, char** str, - SParsedDataColInfo* spd) { + SParsedDataColInfo* spd) { SMeterMeta* pMeterMeta = pCmd->pMeterMeta; int32_t numOfRows = 0; @@ -551,6 +567,13 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, SSqlRes* pRes, void* pDataB return TSDB_CODE_INVALID_SQL; } + // data block is disordered, sort it in ascending order + if (!spd->ordered) { + char* pBlockData = dataBuf->pData + startPos + sizeof(SShellSubmitBlock); + qsort(pBlockData, numOfRows, pMeterMeta->rowSize, rowDataCompar); + spd->ordered = true; + } + SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)(dataBuf->pData + startPos); tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows); dataBuf->numOfMeters += 1; @@ -559,7 +582,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, SSqlRes* pRes, void* pDataB * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS, which is * actually returned from server. * - * * NOTE: + * NOTE: * The better way is to use a local variable to store the number of rows that * has been extracted from sql expression string, and avoid to do the invalid write check */ @@ -654,7 +677,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char** sqlstr, SSqlObj* pSql) { } code = tsParseOneColumnData(&pTagSchema[numOfTagValues], id, idlen, tagVal, pCmd->payload, &sql, false, - pCmd->pMeterMeta->precision); + pCmd->pMeterMeta->precision); if (code != TSDB_CODE_SUCCESS) { setErrMsg(pCmd->payload, sql); return TSDB_CODE_INVALID_SQL; @@ -712,8 +735,6 @@ static int32_t tscParseSqlForCreateTableOnDemand(char** sqlstr, SSqlObj* pSql) { * @return */ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlObj* pSql) { - const int32_t RESERVED_SIZE = 1024; - pCmd->command = TSDB_SQL_INSERT; pCmd->isInsertFromFile = -1; pCmd->count = 0; @@ -789,7 +810,7 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO SParsedDataColInfo spd = {0}; SSchema* pSchema = tsGetSchema(pCmd->pMeterMeta); - tscSetAllColumnsHasValue(&spd, pSchema, pCmd->pMeterMeta->numOfColumns); + tscSetAssignedColumnInfo(&spd, pSchema, pCmd->pMeterMeta->numOfColumns); if (pCmd->isInsertFromFile == -1) { pCmd->isInsertFromFile = 0; @@ -828,10 +849,9 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO goto _error_clean; } - // char fname[TSDB_FILENAME_LEN] = "\0"; - char* fname = malloc(idlen + 1); - memset(fname, 0, idlen + 1); + char* fname = calloc(1, idlen + 1); memcpy(fname, id, idlen); + wordexp_t full_path; if (wordexp(fname, &full_path, 0) != 0) { code = TSDB_CODE_INVALID_SQL; @@ -886,7 +906,7 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO // todo speedup by using hash list for (int32_t t = 0; t < pMeterMeta->numOfColumns; ++t) { if (strncmp(id, pSchema[t].name, idlen) == 0 && strlen(pSchema[t].name) == idlen) { - SParsedColElem* pElem = &spd.elems[spd.numOfParsedCols++]; + SParsedColElem* pElem = &spd.elems[spd.numOfAssignedCols++]; pElem->offset = offset[t]; pElem->colIndex = t; @@ -909,7 +929,7 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO } } - if (spd.numOfParsedCols == 0 || spd.numOfParsedCols > pMeterMeta->numOfColumns) { + if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > pMeterMeta->numOfColumns) { code = TSDB_CODE_INVALID_SQL; sprintf(pCmd->payload, "column name expected"); goto _error_clean; @@ -1059,12 +1079,12 @@ static int tscInsertDataFromFile(SSqlObj* pSql, FILE* fp) { SParsedDataColInfo spd = {0}; SSchema* pSchema = tsGetSchema(pCmd->pMeterMeta); - tscSetAllColumnsHasValue(&spd, pSchema, pCmd->pMeterMeta->numOfColumns); + tscSetAssignedColumnInfo(&spd, pSchema, pCmd->pMeterMeta->numOfColumns); while ((readLen = getline(&line, &n, fp)) != -1) { // line[--readLen] = '\0'; if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0; - if (readLen <= 0 ) continue; + if (readLen <= 0) continue; char* lineptr = line; strtolower(line, line); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c25664a522..2187bb4d87 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -127,6 +127,8 @@ extern "C" { #define TSDB_CODE_BATCH_SIZE_TOO_BIG 104 #define TSDB_CODE_TIMESTAMP_OUT_OF_RANGE 105 #define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode +#define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered +#define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered // message type #define TSDB_MSG_TYPE_REG 1 diff --git a/src/rpc/src/tstring.c b/src/rpc/src/tstring.c index f646fcb1f5..d76f3055a4 100644 --- a/src/rpc/src/tstring.c +++ b/src/rpc/src/tstring.c @@ -230,5 +230,7 @@ char *tsError[] = {"success", "session not ready", "batch size too big", "timestamp out of range", - "invalid query message" + "invalid query message", + "timestamp disordered in cache block", + "timestamp disordered in file block" }; diff --git a/src/system/src/vnodeCache.c b/src/system/src/vnodeCache.c index c67f548bee..81ce27fd83 100644 --- a/src/system/src/vnodeCache.c +++ b/src/system/src/vnodeCache.c @@ -553,8 +553,10 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) { return 0; } + TSKEY startkey = vnodeGetTSInCacheBlock(pCacheBlock, 0); + TSKEY endkey = vnodeGetTSInCacheBlock(pCacheBlock, numOfPoints - 1); + if (QUERY_IS_ASC_QUERY(pQuery)) { - TSKEY endkey = vnodeGetTSInCacheBlock(pCacheBlock, numOfPoints - 1); if (endkey < pQuery->ekey) { numOfReads = maxReads; } else { @@ -563,7 +565,6 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) { numOfReads = (lastPos >= 0) ? lastPos + 1 : 0; } } else { - TSKEY startkey = vnodeGetTSInCacheBlock(pCacheBlock, 0); if (startkey > pQuery->ekey) { numOfReads = maxReads; } else { @@ -595,8 +596,7 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) { int16_t type = GET_COLUMN_TYPE(pQuery, col); pData = pQuery->sdata[col]->data + pQuery->pointsOffset * bytes; - /* this column is absent from current block, fill this block with null - * value */ + /* this column is absent from current block, fill this block with null value */ if (colIdx < 0 || colIdx >= pObj->numOfColumns || pObj->schema[colIdx].colId != pQuery->pSelectExpr[col].pBase.colInfo.colId) { // set null setNullN(pData, type, bytes, pCacheBlock->numOfPoints); @@ -611,8 +611,7 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { int16_t colIdx = pQuery->pFilterInfo[k].pFilter.colIdx; - if (colIdx < 0) { - /* current data has not specified column */ + if (colIdx < 0) { // current data has not specified column pQuery->pFilterInfo[k].pData = NULL; } else { pQuery->pFilterInfo[k].pData = pCacheBlock->offset[colIdx]; @@ -625,7 +624,12 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) { if (QUERY_IS_ASC_QUERY(pQuery)) { for (int32_t j = startPos; j < pCacheBlock->numOfPoints; ++j) { TSKEY key = vnodeGetTSInCacheBlock(pCacheBlock, j); - assert(key >= pQuery->skey); + if (key < startkey || key > endkey) { + dError("vid:%d sid:%d id:%s, timestamp in cache slot is disordered. slot:%d, pos:%d, ts:%lld, block " + "range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startkey, endkey); + tfree(ids); + return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED; + } if (key > pQuery->ekey) { break; @@ -645,7 +649,12 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) { startPos = pQuery->pos; for (int32_t j = startPos; j >= 0; --j) { TSKEY key = vnodeGetTSInCacheBlock(pCacheBlock, j); - assert(key <= pQuery->skey); + if (key < startkey || key > endkey) { + dError("vid:%d sid:%d id:%s, timestamp in cache slot is disordered. slot:%d, pos:%d, ts:%lld, block " + "range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startkey, endkey); + tfree(ids); + return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED; + } if (key < pQuery->ekey) { break; diff --git a/src/system/src/vnodeFile.c b/src/system/src/vnodeFile.c index 13efa77c9d..e3806ed456 100644 --- a/src/system/src/vnodeFile.c +++ b/src/system/src/vnodeFile.c @@ -1517,10 +1517,11 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) { * we allocate tsData buffer with twice size of the other ordinary pQuery->sdata. * Otherwise, the query function may over-write buffer area while retrieve function has not packed the results into * message to send to client yet. + * * So the startPositionFactor is needed to denote which half part is used to store the result, and which * part is available for keep data during query process. - * Note: the startPositionFactor must be used in conjunction with - * pQuery->pointsOffset + * + * Note: the startPositionFactor must be used in conjunction with pQuery->pointsOffset */ int32_t startPositionFactor = 1; if (pQuery->colList[0].colIdx == PRIMARYKEY_TIMESTAMP_COL_INDEX) { @@ -1537,8 +1538,10 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) { int maxReads = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->numOfPoints - pQuery->pos : pQuery->pos + 1; + TSKEY startKey = vnodeGetTSInDataBlock(pQuery, 0, startPositionFactor); + TSKEY endKey = vnodeGetTSInDataBlock(pQuery, pBlock->numOfPoints - 1, startPositionFactor); + if (QUERY_IS_ASC_QUERY(pQuery)) { - TSKEY endKey = vnodeGetTSInDataBlock(pQuery, pBlock->numOfPoints - 1, startPositionFactor); if (endKey < pQuery->ekey) { numOfReads = maxReads; } else { @@ -1548,7 +1551,6 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) { numOfReads = (lastPos >= 0) ? lastPos + 1 : 0; } } else { - TSKEY startKey = vnodeGetTSInDataBlock(pQuery, 0, startPositionFactor); if (startKey > pQuery->ekey) { numOfReads = maxReads; } else { @@ -1601,7 +1603,12 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) { if (QUERY_IS_ASC_QUERY(pQuery)) { for (int32_t j = startPos; j < pBlock->numOfPoints; j -= step) { TSKEY key = vnodeGetTSInDataBlock(pQuery, j, startPositionFactor); - assert(key >= pQuery->skey); + if (key < startKey || key > endKey) { + dError("vid:%d sid:%d id:%s, timestamp in file block disordered. slot:%d, pos:%d, ts:%lld, block " + "range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startKey, endKey); + tfree(ids); + return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED; + } // out of query range, quit if (key > pQuery->ekey) { @@ -1621,7 +1628,12 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) { } else { for (int32_t j = pQuery->pos; j >= 0; --j) { TSKEY key = vnodeGetTSInDataBlock(pQuery, j, startPositionFactor); - assert(key <= pQuery->skey); + if (key < startKey || key > endKey) { + dError("vid:%d sid:%d id:%s, timestamp in file block disordered. slot:%d, pos:%d, ts:%lld, block " + "range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startKey, endKey); + tfree(ids); + return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED; + } // out of query range, quit if (key < pQuery->ekey) { diff --git a/src/system/src/vnodeRead.c b/src/system/src/vnodeRead.c index 5ecb63c6f6..ae38eea3d7 100644 --- a/src/system/src/vnodeRead.c +++ b/src/system/src/vnodeRead.c @@ -507,7 +507,6 @@ void vnodeQueryData(SSchedMsg *pMsg) { pQuery->pointsToRead = vnodeList[pObj->vnode].cfg.rowsInFileBlock; pQuery->pointsOffset = pQInfo->bufIndex * pQuery->pointsToRead; - // dTrace("id:%s, start to query data", pQInfo->pObj->meterId); int64_t st = taosGetTimestampUs(); while (1) { -- GitLab