提交 696f1e12 编写于 作者: C Cary Xu

[TS-854]<fix>:generate final row from origin

上级 e7b7b86a
......@@ -531,14 +531,15 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
}
static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
static FORCE_INLINE bool checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
if (isDataRow(row)) {
if (kvLen < (dataLen * KVRatioConvert)) {
memRowSetConvert(row);
return true;
}
} else if (kvLen > dataLen) {
memRowSetConvert(row);
return true;
}
return false;
}
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
......
......@@ -444,12 +444,14 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
return TSDB_CODE_SUCCESS;
}
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf,
SInsertStatementParam *pInsertParam) {
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, bool *isConverted, int32_t rowSize,
char *tmpTokenBuf, SInsertStatementParam *pInsertParam) {
int32_t index = 0;
SStrToken sToken = {0};
char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header
char *row = pDataBlocks->pData +
((*isConverted) ? (pDataBlocks->size - rowSize) : pDataBlocks->size); // skip the SSubmitBlk header
*isConverted = false;
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
......@@ -550,12 +552,13 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
if (!isParseBindParam) {
// 2. check and set convert flag
bool isNeedConvertRow = false;
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
checkAndConvertMemRow(row, dataLen, kvLen);
isNeedConvertRow = checkAndConvertMemRow(row, dataLen, kvLen);
}
// 3. set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) {
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow) {
SDataRow dataRow = memRowDataBody(row);
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) {
......@@ -563,9 +566,14 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
}
}
}
}
*len = getExtendedRowSize(pDataBlocks);
// 4. perform the convert
if (isNeedConvertRow) {
// put converted row to next location to minimize the memcpy
convertSMemRow(row + rowSize, row, pDataBlocks);
*isConverted = true;
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -623,13 +631,15 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
pDataBlock->boundColumnInfo.allNullLen))) {
return code;
}
bool isConverted = false;
while (1) {
index = 0;
sToken = tStrGetToken(*str, &index, false);
if (sToken.n == 0 || sToken.type != TK_LP) break;
*str += index;
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
// allocate 1 more row size to facilitate the SDataRow/SKVRow convert
if ((*numOfRows + 1) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
int32_t tSize;
code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize);
if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
......@@ -637,17 +647,16 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ASSERT(tSize >= maxRows);
ASSERT(tSize > maxRows); // 1 more row allocated
maxRows = tSize;
}
int32_t len = 0;
code = tsParseOneRow(str, pDataBlock, precision, &len, tmpTokenBuf, pInsertParam);
code = tsParseOneRow(str, pDataBlock, precision, &isConverted, extendedRowSize, tmpTokenBuf, pInsertParam);
if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
pDataBlock->size += len;
pDataBlock->size += extendedRowSize;
index = 0;
sToken = tStrGetToken(*str, &index, false);
......@@ -659,6 +668,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
(*numOfRows)++;
}
if (isConverted) {
void *convertedSMemRow = pDataBlock->pData + pDataBlock->size;
memcpy(convertedSMemRow - extendedRowSize, convertedSMemRow, memRowTLen(convertedSMemRow));
}
if ((*numOfRows) <= 0) {
strcpy(pInsertParam->msg, "no any data points");
......@@ -1754,13 +1767,19 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error;
}
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows);
int32_t extendedRowSize = getExtendedRowSize(pTableDataBlock);
tscAllocateMemIfNeed(pTableDataBlock, extendedRowSize, &maxRows);
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
if (tokenBuf == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
--maxRows; // 1 more row needed to facilitate the SDataRow/SKVRow convert
ASSERT(maxRows > 0);
bool isConverted = false;
while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
......@@ -1773,19 +1792,23 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
char *lineptr = line;
strtolower(line, line);
int32_t len = 0;
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam);
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &isConverted, extendedRowSize, tokenBuf,
pInsertParam);
if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
break;
}
pTableDataBlock->size += len;
pTableDataBlock->size += extendedRowSize;
if (++count >= maxRows) {
break;
}
}
if (isConverted) {
void *convertedSMemRow = pTableDataBlock->pData + pTableDataBlock->size;
memcpy(convertedSMemRow - extendedRowSize, convertedSMemRow, memRowTLen(convertedSMemRow));
}
tfree(tokenBuf);
tfree(line);
......
......@@ -2019,19 +2019,12 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
char* payload = (blkKeyTuple + i)->payloadAddr;
if (isNeedConvertRow(payload)) {
convertSMemRow(pDataBlock, payload, pTableDataBlock);
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
} else {
TDRowTLenT rowTLen = memRowTLen(payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
}
}
int32_t len = pBlock->dataLen + pBlock->schemaLen;
pBlock->dataLen = htonl(pBlock->dataLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册