提交 55a4d262 编写于 作者: C Cary Xu

not perform SDataRow/SKVRow convert anymore, utilize boundNullLen to...

not perform SDataRow/SKVRow convert anymore, utilize boundNullLen to facilitate the memRowType predict
上级 55d4e40d
......@@ -117,7 +117,7 @@ typedef struct SParsedDataColInfo {
uint16_t flen; // TODO: get from STSchema
uint16_t allNullLen; // TODO: get from STSchema
uint16_t extendedVarLen;
uint16_t boundColsLen; // len of bound columns
uint16_t boundNullLen; // bound column len with all NULL value
int32_t * boundedColumns; // bound column idx according to schema
SBoundColumn * cols;
SBoundIdxInfo *colIdxInfo;
......@@ -201,20 +201,12 @@ static FORCE_INLINE void tscAppendMemRowColVal(SMemRow row, const void *value, b
int8_t colType, int32_t toffset, SMemRowBuilder *pBuilder,
int32_t rowNum) {
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
SMemRowInfo *pRowInfo = pBuilder->rowInfo + rowNum;
tdGetColAppendDeltaLen(value, colType, &pRowInfo->dataLen, &pRowInfo->kvLen);
}
}
// Applicable to consume by one row
static FORCE_INLINE void tscAppendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
uint8_t compareStat) {
int8_t colType, int32_t toffset) {
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
if (compareStat == ROW_COMPARE_NEED) {
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
}
}
typedef struct STableDataBlocks {
SName tableName;
......@@ -531,17 +523,6 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
}
static FORCE_INLINE bool checkConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
if (isDataRow(row)) {
if (isConvertToKvRow(kvLen, dataLen)) {
return true;
}
} else if (isConvertToDataRow(kvLen, dataLen)) {
return true;
}
return false;
}
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
memRowSetType(row, memRowType);
if (isDataRowT(memRowType)) {
......@@ -641,8 +622,7 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str,
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId,
int32_t *dataLen, int32_t *kvLen, uint8_t compareStat) {
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId) {
int64_t iv;
int32_t ret;
char * endptr = NULL;
......@@ -654,26 +634,22 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset);
} else {
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10);
tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
} else {
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
}
......@@ -683,8 +659,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
case TSDB_DATA_TYPE_TINYINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -694,15 +669,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint8_t tmpVal = (uint8_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -712,15 +686,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint8_t tmpVal = (uint8_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -730,15 +703,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
int16_t tmpVal = (int16_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -748,15 +720,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint16_t tmpVal = (uint16_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -766,15 +737,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
int32_t tmpVal = (int32_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -784,15 +754,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint32_t tmpVal = (uint32_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -801,14 +770,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
}
tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -818,14 +786,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint64_t tmpVal = (uint64_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_FLOAT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
......@@ -838,14 +805,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
float tmpVal = (float)dv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
......@@ -856,15 +822,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
}
tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
......@@ -872,14 +837,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
char *rowEnd = memRowEnd(row);
STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
......@@ -891,7 +855,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, buf, pToken->z);
}
varDataSetLen(rowEnd, output);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset);
}
break;
......@@ -900,17 +864,16 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
if (primaryKey) {
// When building SKVRow primaryKey, we should not skip even with NULL value.
int64_t tmpVal = 0;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
} else {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
}
} else {
int64_t tmpVal;
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
}
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
......
......@@ -50,51 +50,12 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColIn
}
}
// default compareStat is ROW_COMPARE_NO_NEED
if (pColInfo->numOfBound == 0) { // file input
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
} else if (pColInfo->extendedVarLen == 0) {
// all columns are NoVarType, we can calculate the memRowType precisely
uint32_t dataLen = pColInfo->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundColsLen;
if (isConvertToKvRow(kvLen, dataLen)) {
pBuilder->memRowType = SMEM_ROW_KV;
} else {
pBuilder->memRowType = SMEM_ROW_DATA;
}
return TSDB_CODE_SUCCESS;
uint32_t dataLen = pColInfo->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen;
if (isConvertToKvRow(kvLen, dataLen)) {
pBuilder->memRowType = SMEM_ROW_KV;
} else {
float boundRatio = ((float)pColInfo->numOfBound / (float)pColInfo->numOfCols);
if (boundRatio < KVRatioKV) {
pBuilder->memRowType = SMEM_ROW_KV;
return TSDB_CODE_SUCCESS;
} else if (boundRatio > KVRatioData) {
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
}
pBuilder->compareStat = ROW_COMPARE_NEED;
if (boundRatio < KVRatioPredict) {
pBuilder->memRowType = SMEM_ROW_KV;
} else {
pBuilder->memRowType = SMEM_ROW_DATA;
}
}
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx);
if (nRows > 0) {
pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo));
if (pBuilder->rowInfo == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int i = 0; i < nRows; ++i) {
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen;
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
}
pBuilder->memRowType = SMEM_ROW_DATA;
}
return TSDB_CODE_SUCCESS;
......@@ -453,18 +414,16 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
return TSDB_CODE_SUCCESS;
}
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *convertOffset, int32_t rowSize,
char *tmpTokenBuf, SInsertStatementParam *pInsertParam) {
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t rowSize, char *tmpTokenBuf,
SInsertStatementParam *pInsertParam) {
int32_t index = 0;
SStrToken sToken = {0};
char *row = pDataBlocks->pData + (pDataBlocks->size - (*convertOffset) * rowSize); // skip the SSubmitBlk header
char * row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
SSchema * schema = tscGetTableSchema(pTableMeta);
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t kvLen = pBuilder->kvRowInitLen;
bool isParseBindParam = false;
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
......@@ -541,8 +500,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int16_t colId = -1;
tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset,
colId, &dataLen, &kvLen, pBuilder->compareStat);
int32_t ret =
tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -557,14 +516,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
}
if (!isParseBindParam) {
// 2. check and set convert flag
bool isNeedConvertRow = false;
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
isNeedConvertRow = checkConvertMemRow(row, dataLen, kvLen);
}
// 3. set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow) {
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) {
SDataRow dataRow = memRowDataBody(row);
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) {
......@@ -572,18 +525,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
}
}
}
// 4. perform the convert
if (isNeedConvertRow) {
// put converted row to next location to minimize the memcpy
pDataBlocks->ordered = false;
++(*convertOffset);
convertSMemRow(row + (*convertOffset) * rowSize, row, pDataBlocks);
// revise the predicted memRowType for batch insert
pBuilder->memRowType = pBuilder->memRowType == SMEM_ROW_DATA ? SMEM_ROW_KV : SMEM_ROW_DATA;
} else {
*convertOffset = 0;
}
}
return TSDB_CODE_SUCCESS;
......@@ -640,15 +581,13 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) {
return code;
}
int32_t convertOffset = 0;
while (1) {
index = 0;
sToken = tStrGetToken(*str, &index, false);
if (sToken.n == 0 || sToken.type != TK_LP) break;
*str += index;
// allocate 1 more row size to facilitate the SDataRow/SKVRow convert
if ((*numOfRows + 1) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
int32_t tSize;
code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize);
if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
......@@ -656,11 +595,11 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ASSERT(tSize > maxRows); // 1 more row allocated
ASSERT(tSize >= maxRows);
maxRows = tSize;
}
code = tsParseOneRow(str, pDataBlock, precision, &convertOffset, extendedRowSize, tmpTokenBuf, pInsertParam);
code = tsParseOneRow(str, pDataBlock, precision, extendedRowSize, tmpTokenBuf, pInsertParam);
if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
......@@ -677,10 +616,6 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
(*numOfRows)++;
}
if (convertOffset) {
char *convertedSMemRow = pDataBlock->pData + pDataBlock->size;
memcpy(convertedSMemRow - convertOffset * extendedRowSize, convertedSMemRow, (size_t)memRowTLen(convertedSMemRow));
}
if ((*numOfRows) <= 0) {
strcpy(pInsertParam->msg, "no any data points");
......@@ -723,7 +658,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
pColInfo->boundedColumns[i] = i;
}
pColInfo->allNullLen += pColInfo->flen;
pColInfo->boundColsLen = pColInfo->flen;
pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen
pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}
......@@ -1247,7 +1182,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
int32_t nCols = pColInfo->numOfCols;
pColInfo->numOfBound = 0;
pColInfo->boundColsLen = 0;
pColInfo->boundNullLen = 0;
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols);
for (int32_t i = 0; i < nCols; ++i) {
pColInfo->cols[i].valStat = VAL_STAT_NONE;
......@@ -1305,7 +1240,18 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound;
pColInfo->boundColsLen += pSchema[t].bytes;
// N.B. make sure sizeof(VarDataOffsetT) == sizeof(SColIdx)
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
break;
}
findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) {
isOrdered = false;
......@@ -1329,7 +1275,18 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound;
pColInfo->boundColsLen += pSchema[t].bytes;
// N.B. make sure sizeof(VarDataOffsetT) == sizeof(SColIdx)
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
break;
}
findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) {
isOrdered = false;
......@@ -1788,10 +1745,9 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error;
}
// insert from .csv means full and ordered columns, thus use SDataRow all the time and no need to covert
ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType &&
ROW_COMPARE_NO_NEED == pTableDataBlock->rowBuilder.compareStat);
int32_t convertOffset = 0;
// insert from .csv means full and ordered columns, thus use SDataRow all the time
ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType);
while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
......@@ -1804,8 +1760,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
char *lineptr = line;
strtolower(line, line);
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &convertOffset, extendedRowSize, tokenBuf,
pInsertParam);
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, extendedRowSize, tokenBuf, pInsertParam);
if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
break;
......
......@@ -610,9 +610,6 @@ typedef void *SMemRow;
#define SMEM_ROW_DATA 0x0U // SDataRow
#define SMEM_ROW_KV 0x01U // SKVRow
#define KVRatioKV (0.2f) // all bool
#define KVRatioPredict (0.4f)
#define KVRatioData (0.75f) // all bigint
#define KVRatioConvert (0.9f)
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
......@@ -724,31 +721,6 @@ static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value
return len;
}
/**
* 1. calculate the delta of AllNullLen for SDataRow.
* 2. calculate the real len for SKVRow.
*/
static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colType, int32_t *dataLen, int32_t *kvLen) {
switch (colType) {
case TSDB_DATA_TYPE_BINARY: {
int32_t varLen = varDataLen(value);
*dataLen += (varLen - CHAR_BYTES);
*kvLen += (varLen + sizeof(SColIdx));
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t varLen = varDataLen(value);
*dataLen += (varLen - TSDB_NCHAR_SIZE);
*kvLen += (varLen + sizeof(SColIdx));
break;
}
default: {
*kvLen += (TYPE_BYTES[colType] + sizeof(SColIdx));
break;
}
}
}
typedef struct {
int16_t colId;
uint8_t colType;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册