提交 6788a422 编写于 作者: S slguan

Fix the issue ##110, unsorted timestamp in one request may cause crash

上级 95b157c0
......@@ -41,10 +41,12 @@ typedef struct SParsedColElem {
} SParsedColElem;
typedef struct SParsedDataColInfo {
int32_t numOfCols;
int32_t numOfParsedCols;
bool ordered; // denote if the timestamp in one data block ordered or not
int16_t numOfCols;
int16_t numOfAssignedCols;
SParsedColElem elems[TSDB_MAX_COLUMNS];
bool hasVal[TSDB_MAX_COLUMNS];
int64_t prevTimestamp;
} SParsedDataColInfo;
SInsertedDataBlocks* tscCreateDataBlock(int32_t size);
......
......@@ -98,7 +98,6 @@ int tsParseTime(char* value, int32_t valuelen, int64_t* time, char** next, char*
char* pTokenEnd = *next;
tscGetToken(pTokenEnd, &token, &tokenlen);
if (tokenlen == 0 && strlen(value) == 0) {
INVALID_SQL_RET_MSG(error, "missing time stamp");
}
......@@ -365,7 +364,7 @@ int tsParseOneRowData(char** str, char* payload, SSchema schema[], SParsedDataCo
/* 1. set the parsed value from sql string */
int32_t rowSize = 0;
for (int i = 0; i < spd->numOfParsedCols; ++i) {
for (int i = 0; i < spd->numOfAssignedCols; ++i) {
/* the start position in data block buffer of current value in sql */
char* start = payload + spd->elems[i].offset;
int16_t colIndex = spd->elems[i].colIndex;
......@@ -396,10 +395,20 @@ int tsParseOneRowData(char** str, char* payload, SSchema schema[], SParsedDataCo
if (ret != 0) {
return -1; // NOTE: here 0 mean error!
}
// once the data block is disordered, we do NOT keep previous timestamp any more
if (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && spd->ordered) {
TSKEY k = *(TSKEY*)start;
if (k < spd->prevTimestamp) {
spd->ordered = false;
}
spd->prevTimestamp = k;
}
}
/*2. set the null value for the rest columns */
if (spd->numOfParsedCols < spd->numOfCols) {
if (spd->numOfAssignedCols < spd->numOfCols) {
char* ptr = payload;
for (int32_t i = 0; i < spd->numOfCols; ++i) {
......@@ -417,6 +426,12 @@ int tsParseOneRowData(char** str, char* payload, SSchema schema[], SParsedDataCo
return rowSize;
}
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
TSKEY left = GET_INT64_VAL(lhs);
TSKEY right = GET_INT64_VAL(rhs);
DEFAULT_COMP(left, right);
}
int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMeterMeta, int maxRows,
SParsedDataColInfo* spd, char* error) {
char* token;
......@@ -427,7 +442,7 @@ int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMete
int16_t numOfRows = 0;
pDataBlock->size += sizeof(SShellSubmitBlock);
if (spd->hasVal[0] == false) {
if (!spd->hasVal[0]) {
sprintf(error, "primary timestamp column can not be null");
return -1;
}
......@@ -442,8 +457,8 @@ int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMete
maxRows += tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize);
}
int32_t len = tsParseOneRowData(str, pDataBlock->pData + pDataBlock->size, pSchema, spd, error,
pMeterMeta->precision);
int32_t len =
tsParseOneRowData(str, pDataBlock->pData + pDataBlock->size, pSchema, spd, error, pMeterMeta->precision);
if (len <= 0) {
setErrMsg(error, *str);
return -1;
......@@ -462,10 +477,9 @@ int tsParseValues(char** str, SInsertedDataBlocks* pDataBlock, SMeterMeta* pMete
if (numOfRows <= 0) {
strcpy(error, "no any data points");
return -1;
} else {
return numOfRows;
}
return numOfRows;
}
static void appendDataBlock(SDataBlockList* pList, SInsertedDataBlocks* pBlocks) {
......@@ -480,9 +494,11 @@ static void appendDataBlock(SDataBlockList* pList, SInsertedDataBlocks* pBlocks)
pList->pData[pList->nSize++] = pBlocks;
}
static void tscSetAllColumnsHasValue(SParsedDataColInfo* spd, SSchema* pSchema, int32_t numOfCols) {
static void tscSetAssignedColumnInfo(SParsedDataColInfo* spd, SSchema* pSchema, int16_t numOfCols) {
spd->ordered = true;
spd->prevTimestamp = INT64_MIN;
spd->numOfCols = numOfCols;
spd->numOfParsedCols = numOfCols;
spd->numOfAssignedCols = numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
spd->hasVal[i] = true;
......@@ -551,6 +567,13 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, SSqlRes* pRes, void* pDataB
return TSDB_CODE_INVALID_SQL;
}
// data block is disordered, sort it in ascending order
if (!spd->ordered) {
char* pBlockData = dataBuf->pData + startPos + sizeof(SShellSubmitBlock);
qsort(pBlockData, numOfRows, pMeterMeta->rowSize, rowDataCompar);
spd->ordered = true;
}
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)(dataBuf->pData + startPos);
tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);
dataBuf->numOfMeters += 1;
......@@ -559,7 +582,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, SSqlRes* pRes, void* pDataB
* the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS, which is
* actually returned from server.
*
* * NOTE:
* NOTE:
* The better way is to use a local variable to store the number of rows that
* has been extracted from sql expression string, and avoid to do the invalid write check
*/
......@@ -712,8 +735,6 @@ static int32_t tscParseSqlForCreateTableOnDemand(char** sqlstr, SSqlObj* pSql) {
* @return
*/
int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlObj* pSql) {
const int32_t RESERVED_SIZE = 1024;
pCmd->command = TSDB_SQL_INSERT;
pCmd->isInsertFromFile = -1;
pCmd->count = 0;
......@@ -789,7 +810,7 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO
SParsedDataColInfo spd = {0};
SSchema* pSchema = tsGetSchema(pCmd->pMeterMeta);
tscSetAllColumnsHasValue(&spd, pSchema, pCmd->pMeterMeta->numOfColumns);
tscSetAssignedColumnInfo(&spd, pSchema, pCmd->pMeterMeta->numOfColumns);
if (pCmd->isInsertFromFile == -1) {
pCmd->isInsertFromFile = 0;
......@@ -828,10 +849,9 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO
goto _error_clean;
}
// char fname[TSDB_FILENAME_LEN] = "\0";
char* fname = malloc(idlen + 1);
memset(fname, 0, idlen + 1);
char* fname = calloc(1, idlen + 1);
memcpy(fname, id, idlen);
wordexp_t full_path;
if (wordexp(fname, &full_path, 0) != 0) {
code = TSDB_CODE_INVALID_SQL;
......@@ -886,7 +906,7 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO
// todo speedup by using hash list
for (int32_t t = 0; t < pMeterMeta->numOfColumns; ++t) {
if (strncmp(id, pSchema[t].name, idlen) == 0 && strlen(pSchema[t].name) == idlen) {
SParsedColElem* pElem = &spd.elems[spd.numOfParsedCols++];
SParsedColElem* pElem = &spd.elems[spd.numOfAssignedCols++];
pElem->offset = offset[t];
pElem->colIndex = t;
......@@ -909,7 +929,7 @@ int tsParseInsertStatement(SSqlCmd* pCmd, char* str, char* acct, char* db, SSqlO
}
}
if (spd.numOfParsedCols == 0 || spd.numOfParsedCols > pMeterMeta->numOfColumns) {
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > pMeterMeta->numOfColumns) {
code = TSDB_CODE_INVALID_SQL;
sprintf(pCmd->payload, "column name expected");
goto _error_clean;
......@@ -1059,12 +1079,12 @@ static int tscInsertDataFromFile(SSqlObj* pSql, FILE* fp) {
SParsedDataColInfo spd = {0};
SSchema* pSchema = tsGetSchema(pCmd->pMeterMeta);
tscSetAllColumnsHasValue(&spd, pSchema, pCmd->pMeterMeta->numOfColumns);
tscSetAssignedColumnInfo(&spd, pSchema, pCmd->pMeterMeta->numOfColumns);
while ((readLen = getline(&line, &n, fp)) != -1) {
// line[--readLen] = '\0';
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0;
if (readLen <= 0 ) continue;
if (readLen <= 0) continue;
char* lineptr = line;
strtolower(line, line);
......
......@@ -127,6 +127,8 @@ extern "C" {
#define TSDB_CODE_BATCH_SIZE_TOO_BIG 104
#define TSDB_CODE_TIMESTAMP_OUT_OF_RANGE 105
#define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode
#define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered
#define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered
// message type
#define TSDB_MSG_TYPE_REG 1
......
......@@ -230,5 +230,7 @@ char *tsError[] = {"success",
"session not ready",
"batch size too big",
"timestamp out of range",
"invalid query message"
"invalid query message",
"timestamp disordered in cache block",
"timestamp disordered in file block"
};
......@@ -553,8 +553,10 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
return 0;
}
if (QUERY_IS_ASC_QUERY(pQuery)) {
TSKEY startkey = vnodeGetTSInCacheBlock(pCacheBlock, 0);
TSKEY endkey = vnodeGetTSInCacheBlock(pCacheBlock, numOfPoints - 1);
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (endkey < pQuery->ekey) {
numOfReads = maxReads;
} else {
......@@ -563,7 +565,6 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
numOfReads = (lastPos >= 0) ? lastPos + 1 : 0;
}
} else {
TSKEY startkey = vnodeGetTSInCacheBlock(pCacheBlock, 0);
if (startkey > pQuery->ekey) {
numOfReads = maxReads;
} else {
......@@ -595,8 +596,7 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
int16_t type = GET_COLUMN_TYPE(pQuery, col);
pData = pQuery->sdata[col]->data + pQuery->pointsOffset * bytes;
/* this column is absent from current block, fill this block with null
* value */
/* this column is absent from current block, fill this block with null value */
if (colIdx < 0 || colIdx >= pObj->numOfColumns ||
pObj->schema[colIdx].colId != pQuery->pSelectExpr[col].pBase.colInfo.colId) { // set null
setNullN(pData, type, bytes, pCacheBlock->numOfPoints);
......@@ -611,8 +611,7 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
int16_t colIdx = pQuery->pFilterInfo[k].pFilter.colIdx;
if (colIdx < 0) {
/* current data has not specified column */
if (colIdx < 0) { // current data has not specified column
pQuery->pFilterInfo[k].pData = NULL;
} else {
pQuery->pFilterInfo[k].pData = pCacheBlock->offset[colIdx];
......@@ -625,7 +624,12 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
for (int32_t j = startPos; j < pCacheBlock->numOfPoints; ++j) {
TSKEY key = vnodeGetTSInCacheBlock(pCacheBlock, j);
assert(key >= pQuery->skey);
if (key < startkey || key > endkey) {
dError("vid:%d sid:%d id:%s, timestamp in cache slot is disordered. slot:%d, pos:%d, ts:%lld, block "
"range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startkey, endkey);
tfree(ids);
return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED;
}
if (key > pQuery->ekey) {
break;
......@@ -645,7 +649,12 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
startPos = pQuery->pos;
for (int32_t j = startPos; j >= 0; --j) {
TSKEY key = vnodeGetTSInCacheBlock(pCacheBlock, j);
assert(key <= pQuery->skey);
if (key < startkey || key > endkey) {
dError("vid:%d sid:%d id:%s, timestamp in cache slot is disordered. slot:%d, pos:%d, ts:%lld, block "
"range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startkey, endkey);
tfree(ids);
return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED;
}
if (key < pQuery->ekey) {
break;
......
......@@ -1517,10 +1517,11 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
* we allocate tsData buffer with twice size of the other ordinary pQuery->sdata.
* Otherwise, the query function may over-write buffer area while retrieve function has not packed the results into
* message to send to client yet.
*
* So the startPositionFactor is needed to denote which half part is used to store the result, and which
* part is available for keep data during query process.
* Note: the startPositionFactor must be used in conjunction with
* pQuery->pointsOffset
*
* Note: the startPositionFactor must be used in conjunction with pQuery->pointsOffset
*/
int32_t startPositionFactor = 1;
if (pQuery->colList[0].colIdx == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
......@@ -1537,8 +1538,10 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
int maxReads = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->numOfPoints - pQuery->pos : pQuery->pos + 1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
TSKEY startKey = vnodeGetTSInDataBlock(pQuery, 0, startPositionFactor);
TSKEY endKey = vnodeGetTSInDataBlock(pQuery, pBlock->numOfPoints - 1, startPositionFactor);
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (endKey < pQuery->ekey) {
numOfReads = maxReads;
} else {
......@@ -1548,7 +1551,6 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
numOfReads = (lastPos >= 0) ? lastPos + 1 : 0;
}
} else {
TSKEY startKey = vnodeGetTSInDataBlock(pQuery, 0, startPositionFactor);
if (startKey > pQuery->ekey) {
numOfReads = maxReads;
} else {
......@@ -1601,7 +1603,12 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
for (int32_t j = startPos; j < pBlock->numOfPoints; j -= step) {
TSKEY key = vnodeGetTSInDataBlock(pQuery, j, startPositionFactor);
assert(key >= pQuery->skey);
if (key < startKey || key > endKey) {
dError("vid:%d sid:%d id:%s, timestamp in file block disordered. slot:%d, pos:%d, ts:%lld, block "
"range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startKey, endKey);
tfree(ids);
return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED;
}
// out of query range, quit
if (key > pQuery->ekey) {
......@@ -1621,7 +1628,12 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
} else {
for (int32_t j = pQuery->pos; j >= 0; --j) {
TSKEY key = vnodeGetTSInDataBlock(pQuery, j, startPositionFactor);
assert(key <= pQuery->skey);
if (key < startKey || key > endKey) {
dError("vid:%d sid:%d id:%s, timestamp in file block disordered. slot:%d, pos:%d, ts:%lld, block "
"range:%lld-%lld", pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startKey, endKey);
tfree(ids);
return -TSDB_CODE_FILE_BLOCK_TS_DISORDERED;
}
// out of query range, quit
if (key < pQuery->ekey) {
......
......@@ -507,7 +507,6 @@ void vnodeQueryData(SSchedMsg *pMsg) {
pQuery->pointsToRead = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
pQuery->pointsOffset = pQInfo->bufIndex * pQuery->pointsToRead;
// dTrace("id:%s, start to query data", pQInfo->pObj->meterId);
int64_t st = taosGetTimestampUs();
while (1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册