提交 8007cc55 编写于 作者: C Cary Xu

not perform SDataRow/SKVRow convert anymore, utilize boundNullLen to...

not perform SDataRow/SKVRow convert anymore, utilize boundNullLen to facilitate the memRowType predict
上级 b1573409
...@@ -115,9 +115,9 @@ typedef struct SParsedDataColInfo { ...@@ -115,9 +115,9 @@ typedef struct SParsedDataColInfo {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfBound; int16_t numOfBound;
uint16_t flen; // TODO: get from STSchema uint16_t flen; // TODO: get from STSchema
uint16_t allNullLen; // TODO: get from STSchema uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
uint16_t extendedVarLen; uint16_t extendedVarLen;
uint16_t boundColsLen; // len of bound columns uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
int32_t * boundedColumns; // bound column idx according to schema int32_t * boundedColumns; // bound column idx according to schema
SBoundColumn * cols; SBoundColumn * cols;
SBoundIdxInfo *colIdxInfo; SBoundIdxInfo *colIdxInfo;
...@@ -205,9 +205,6 @@ static FORCE_INLINE void tscAppendMemRowColValEx(SMemRow row, const void *value, ...@@ -205,9 +205,6 @@ static FORCE_INLINE void tscAppendMemRowColValEx(SMemRow row, const void *value,
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen, int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
uint8_t compareStat) { uint8_t compareStat) {
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset); tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
if (compareStat == ROW_COMPARE_NEED) {
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
}
} }
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
SName tableName; SName tableName;
...@@ -529,17 +526,6 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { ...@@ -529,17 +526,6 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen; return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
} }
static FORCE_INLINE bool checkConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
if (isDataRow(row)) {
if (isConvertToKvRow(kvLen, dataLen)) {
return true;
}
} else if (isConvertToDataRow(kvLen, dataLen)) {
return true;
}
return false;
}
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) { static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
memRowSetType(row, memRowType); memRowSetType(row, memRowType);
if (isDataRowT(memRowType)) { if (isDataRowT(memRowType)) {
...@@ -639,8 +625,7 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; ...@@ -639,8 +625,7 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str, static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str,
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId, bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId) {
int32_t *dataLen, int32_t *kvLen, uint8_t compareStat) {
int64_t iv; int64_t iv;
int32_t ret; int32_t ret;
char * endptr = NULL; char * endptr = NULL;
...@@ -652,26 +637,22 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -652,26 +637,22 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
switch (pSchema->type) { switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) { if (strncmp(pToken->z, "true", pToken->n) == 0) {
tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &TRUE_VALUE, true, colId, pSchema->type, toffset);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) { } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, &FALSE_VALUE, true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z); return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
} }
} else if (pToken->type == TK_INTEGER) { } else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10); iv = strtoll(pToken->z, NULL, 10);
tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset, tdAppendMemRowColVal(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
dataLen, kvLen, compareStat);
} else if (pToken->type == TK_FLOAT) { } else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL); double dv = strtod(pToken->z, NULL);
tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset, tdAppendMemRowColVal(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
dataLen, kvLen, compareStat);
} else { } else {
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z); return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
} }
...@@ -681,8 +662,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -681,8 +662,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -692,15 +672,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -692,15 +672,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
uint8_t tmpVal = (uint8_t)iv; uint8_t tmpVal = (uint8_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -710,15 +689,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -710,15 +689,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
uint8_t tmpVal = (uint8_t)iv; uint8_t tmpVal = (uint8_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -728,15 +706,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -728,15 +706,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
int16_t tmpVal = (int16_t)iv; int16_t tmpVal = (int16_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -746,15 +723,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -746,15 +723,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
uint16_t tmpVal = (uint16_t)iv; uint16_t tmpVal = (uint16_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -764,15 +740,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -764,15 +740,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
int32_t tmpVal = (int32_t)iv; int32_t tmpVal = (int32_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -782,15 +757,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -782,15 +757,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
uint32_t tmpVal = (uint32_t)iv; uint32_t tmpVal = (uint32_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -799,14 +773,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -799,14 +773,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
} }
tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &iv, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -816,14 +789,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -816,14 +789,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
uint64_t tmpVal = (uint64_t)iv; uint64_t tmpVal = (uint64_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
double dv; double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
...@@ -836,14 +808,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -836,14 +808,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
} }
float tmpVal = (float)dv; float tmpVal = (float)dv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
double dv; double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
...@@ -854,15 +825,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -854,15 +825,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
} }
tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &dv, true, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) { if (pToken->type == TK_NULL) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { // too long values will return invalid sql, not be truncated automatically } else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z); return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
...@@ -870,14 +840,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -870,14 +840,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); // STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
char *rowEnd = memRowEnd(row); char *rowEnd = memRowEnd(row);
STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n); STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
} }
break; break;
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) { if (pToken->type == TK_NULL) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} else { } else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0; int32_t output = 0;
...@@ -889,7 +858,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -889,7 +858,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, buf, pToken->z); return tscInvalidOperationMsg(msg, buf, pToken->z);
} }
varDataSetLen(rowEnd, output); varDataSetLen(rowEnd, output);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
} }
break; break;
...@@ -898,17 +867,16 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok ...@@ -898,17 +867,16 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
if (primaryKey) { if (primaryKey) {
// When building SKVRow primaryKey, we should not skip even with NULL value. // When building SKVRow primaryKey, we should not skip even with NULL value.
int64_t tmpVal = 0; int64_t tmpVal = 0;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} else { } else {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
compareStat);
} }
} else { } else {
int64_t tmpVal; int64_t tmpVal;
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) { if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
} }
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} }
break; break;
......
...@@ -50,52 +50,13 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SPars ...@@ -50,52 +50,13 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SPars
} }
} }
// default compareStat is ROW_COMPARE_NO_NEED uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen;
if (pColInfo->numOfBound == 0) { // file input uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen;
pBuilder->memRowType = SMEM_ROW_DATA; if (isConvertToKVRow(kvLen, dataLen)) {
return TSDB_CODE_SUCCESS;
} else if (pColInfo->extendedVarLen == 0) {
// all columns are NoVarType, we can calculate the memRowType precisely
uint32_t dataLen = pColInfo->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundColsLen;
if (isConvertToKvRow(kvLen, dataLen)) {
pBuilder->memRowType = SMEM_ROW_KV; pBuilder->memRowType = SMEM_ROW_KV;
} else { } else {
pBuilder->memRowType = SMEM_ROW_DATA; pBuilder->memRowType = SMEM_ROW_DATA;
} }
return TSDB_CODE_SUCCESS;
} else {
float boundRatio = ((float)pColInfo->numOfBound / (float)pColInfo->numOfCols);
if (boundRatio < KVRatioKV) {
pBuilder->memRowType = SMEM_ROW_KV;
return TSDB_CODE_SUCCESS;
} else if (boundRatio > KVRatioData) {
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
}
pBuilder->compareStat = ROW_COMPARE_NEED;
if (boundRatio < KVRatioPredict) {
pBuilder->memRowType = SMEM_ROW_KV;
} else {
pBuilder->memRowType = SMEM_ROW_DATA;
}
}
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx);
if (nRows > 0) {
pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo));
if (pBuilder->rowInfo == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int i = 0; i < nRows; ++i) {
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen;
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -455,18 +416,17 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) { ...@@ -455,18 +416,17 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *convertOffset, int32_t rowSize, int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t rowSize, char *tmpTokenBuf,
char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { SInsertStatementParam *pInsertParam) {
int32_t tsc_index = 0; int32_t tsc_index = 0;
SStrToken sToken = {0}; SStrToken sToken = {0};
char *row = pDataBlocks->pData + (pDataBlocks->size - (*convertOffset) * rowSize); // skip the SSubmitBlk header char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
STableMeta * pTableMeta = pDataBlocks->pTableMeta; STableMeta * pTableMeta = pDataBlocks->pTableMeta;
SSchema * schema = tscGetTableSchema(pTableMeta); SSchema * schema = tscGetTableSchema(pTableMeta);
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder; SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t kvLen = pBuilder->kvRowInitLen;
bool isParseBindParam = false; bool isParseBindParam = false;
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound); initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
...@@ -543,8 +503,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i ...@@ -543,8 +503,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int16_t colId = -1; int16_t colId = -1;
tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId); tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, int32_t ret =
colId, &dataLen, &kvLen, pBuilder->compareStat); tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
...@@ -559,14 +519,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i ...@@ -559,14 +519,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
} }
if (!isParseBindParam) { if (!isParseBindParam) {
// 2. check and set convert flag // set the null value for the columns that do not assign values
bool isNeedConvertRow = false; if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) {
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
isNeedConvertRow = checkConvertMemRow(row, dataLen, kvLen);
}
// 3. set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow) {
SDataRow dataRow = memRowDataBody(row); SDataRow dataRow = memRowDataBody(row);
for (int32_t i = 0; i < spd->numOfCols; ++i) { for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) { if (spd->cols[i].valStat == VAL_STAT_NONE) {
...@@ -574,18 +528,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i ...@@ -574,18 +528,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
} }
} }
} }
// 4. perform the convert
if (isNeedConvertRow) {
// put converted row to next location to minimize the memcpy
pDataBlocks->ordered = false;
++(*convertOffset);
convertSMemRow(row + (*convertOffset) * rowSize, row, pDataBlocks);
// revise the predicted memRowType for batch insert
pBuilder->memRowType = pBuilder->memRowType == SMEM_ROW_DATA ? SMEM_ROW_KV : SMEM_ROW_DATA;
} else {
*convertOffset = 0;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -642,15 +584,13 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn ...@@ -642,15 +584,13 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) { if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) {
return code; return code;
} }
int32_t convertOffset = 0;
while (1) { while (1) {
tsc_index = 0; tsc_index = 0;
sToken = tStrGetToken(*str, &tsc_index, false); sToken = tStrGetToken(*str, &tsc_index, false);
if (sToken.n == 0 || sToken.type != TK_LP) break; if (sToken.n == 0 || sToken.type != TK_LP) break;
*str += tsc_index; *str += tsc_index;
// allocate 1 more row size to facilitate the SDataRow/SKVRow convert if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
if ((*numOfRows + 1) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
int32_t tSize; int32_t tSize;
code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize); code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize);
if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
...@@ -658,11 +598,11 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn ...@@ -658,11 +598,11 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
ASSERT(tSize > maxRows); // 1 more row allocated ASSERT(tSize >= maxRows);
maxRows = tSize; maxRows = tSize;
} }
code = tsParseOneRow(str, pDataBlock, precision, &convertOffset, extendedRowSize, tmpTokenBuf, pInsertParam); code = tsParseOneRow(str, pDataBlock, precision, extendedRowSize, tmpTokenBuf, pInsertParam);
if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
...@@ -679,10 +619,6 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn ...@@ -679,10 +619,6 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
(*numOfRows)++; (*numOfRows)++;
} }
if (convertOffset) {
char *convertedSMemRow = pDataBlock->pData + pDataBlock->size;
memcpy(convertedSMemRow - convertOffset * extendedRowSize, convertedSMemRow, (size_t)memRowTLen(convertedSMemRow));
}
if ((*numOfRows) <= 0) { if ((*numOfRows) <= 0) {
strcpy(pInsertParam->msg, "no any data points"); strcpy(pInsertParam->msg, "no any data points");
...@@ -725,7 +661,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32 ...@@ -725,7 +661,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
pColInfo->boundedColumns[i] = i; pColInfo->boundedColumns[i] = i;
} }
pColInfo->allNullLen += pColInfo->flen; pColInfo->allNullLen += pColInfo->flen;
pColInfo->boundColsLen = pColInfo->flen; pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen
pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT)); pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
} }
...@@ -1229,7 +1165,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat ...@@ -1229,7 +1165,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
int32_t nCols = pColInfo->numOfCols; int32_t nCols = pColInfo->numOfCols;
pColInfo->numOfBound = 0; pColInfo->numOfBound = 0;
pColInfo->boundColsLen = 0; pColInfo->boundNullLen = 0;
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols); memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols);
for (int32_t i = 0; i < nCols; ++i) { for (int32_t i = 0; i < nCols; ++i) {
pColInfo->cols[i].valStat = VAL_STAT_NONE; pColInfo->cols[i].valStat = VAL_STAT_NONE;
...@@ -1279,7 +1215,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat ...@@ -1279,7 +1215,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t; pColInfo->boundedColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound; ++pColInfo->numOfBound;
pColInfo->boundColsLen += pSchema[t].bytes; switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
break;
}
findColumnIndex = true; findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) { if (isOrdered && (lastColIdx > t)) {
isOrdered = false; isOrdered = false;
...@@ -1303,7 +1249,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat ...@@ -1303,7 +1249,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t; pColInfo->boundedColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound; ++pColInfo->numOfBound;
pColInfo->boundColsLen += pSchema[t].bytes; switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
break;
}
findColumnIndex = true; findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) { if (isOrdered && (lastColIdx > t)) {
isOrdered = false; isOrdered = false;
...@@ -1756,10 +1712,8 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1756,10 +1712,8 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error; goto _error;
} }
// insert from .csv means full and ordered columns, thus use SDataRow all the time and no need to covert // insert from .csv means full and ordered columns, thus use SDataRow all the time
ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType && ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType);
ROW_COMPARE_NO_NEED == pTableDataBlock->rowBuilder.compareStat);
int32_t convertOffset = 0;
while ((readLen = tgetline(&line, &n, fp)) != -1) { while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0; line[--readLen] = 0;
...@@ -1772,8 +1726,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1772,8 +1726,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
char *lineptr = line; char *lineptr = line;
strtolower(line, line); strtolower(line, line);
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &convertOffset, extendedRowSize, tokenBuf, code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, extendedRowSize, tokenBuf, pInsertParam);
pInsertParam);
if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) { if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code; pSql->res.code = code;
break; break;
......
...@@ -610,9 +610,6 @@ typedef void *SMemRow; ...@@ -610,9 +610,6 @@ typedef void *SMemRow;
#define SMEM_ROW_DATA 0x0U // SDataRow #define SMEM_ROW_DATA 0x0U // SDataRow
#define SMEM_ROW_KV 0x01U // SKVRow #define SMEM_ROW_KV 0x01U // SKVRow
#define KVRatioKV (0.2f) // all bool
#define KVRatioPredict (0.4f)
#define KVRatioData (0.75f) // all bigint
#define KVRatioConvert (0.9f) #define KVRatioConvert (0.9f)
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01) #define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
...@@ -622,8 +619,7 @@ typedef void *SMemRow; ...@@ -622,8 +619,7 @@ typedef void *SMemRow;
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r)) #define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
#define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01)) #define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01))
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r)) #define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
#define isConvertToKvRow(k, d) ((k) < ((d)*KVRatioConvert)) #define isConvertToKVRow(k, d) ((k) < ((d)*KVRatioConvert))
#define isConvertToDataRow(k, d) ((k) > (d))
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag #define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
#define memRowKvBody(r) \ #define memRowKvBody(r) \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册