提交 3155e725 编写于 作者: C Cary Xu

enhance

上级 1c0e89de
......@@ -448,370 +448,6 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
return TSDB_CODE_SUCCESS;
}
static int32_t tsParseOneColumnOld(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str,
bool primaryKey, int16_t timePrec) {
int64_t iv;
int32_t ret;
char * endptr = NULL;
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z);
}
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) {
*((uint8_t *)payload) = TSDB_DATA_BOOL_NULL;
} else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
*(uint8_t *)payload = TSDB_TRUE;
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
*(uint8_t *)payload = TSDB_FALSE;
} else {
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10);
*(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
*(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
} else {
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
}
}
break;
}
case TSDB_DATA_TYPE_TINYINT:
if (isNullStr(pToken)) {
*((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return tscInvalidOperationMsg(msg, "data overflow", pToken->z);
}
*((uint8_t *)payload) = (uint8_t)iv;
}
break;
case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) {
*((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z);
}
*((uint8_t *)payload) = (uint8_t)iv;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) {
*((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z);
}
*((int16_t *)payload) = (int16_t)iv;
}
break;
case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) {
*((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z);
}
*((uint16_t *)payload) = (uint16_t)iv;
}
break;
case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) {
*((int32_t *)payload) = TSDB_DATA_INT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return tscInvalidOperationMsg(msg, "int data overflow", pToken->z);
}
*((int32_t *)payload) = (int32_t)iv;
}
break;
case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) {
*((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z);
}
*((uint32_t *)payload) = (uint32_t)iv;
}
break;
case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) {
*((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
}
*((int64_t *)payload) = iv;
}
break;
case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) {
*((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z);
}
*((uint64_t *)payload) = iv;
}
break;
case TSDB_DATA_TYPE_FLOAT:
if (isNullStr(pToken)) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
isnan(dv)) {
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
}
// *((float *)payload) = (float)dv;
SET_FLOAT_VAL(payload, dv);
}
break;
case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
}
*((double *)payload) = dv;
}
break;
case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
}
STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
}
break;
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return tscInvalidOperationMsg(msg, buf, pToken->z);
}
varDataSetLen(payload, output);
}
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (pToken->type == TK_NULL) {
if (primaryKey) {
*((int64_t *)payload) = 0;
} else {
*((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
}
} else {
int64_t temp;
if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
}
*((int64_t *)payload) = temp;
}
break;
}
}
return TSDB_CODE_SUCCESS;
}
static int tsParseOneRowOld(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len,
char *tmpTokenBuf, SInsertStatementParam *pInsertParam) {
int32_t index = 0;
SStrToken sToken = {0};
char * payload = pDataBlocks->pData + pDataBlocks->size;
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta);
// 1. set the parsed value from sql string
int32_t rowSize = 0;
for (int i = 0; i < spd->numOfBound; ++i) {
// the start position in data block buffer of current value in sql
int32_t colIndex = spd->boundedColumns[i];
char * start = payload + spd->cols[colIndex].offset;
SSchema *pSchema = &schema[colIndex];
rowSize += pSchema->bytes;
index = 0;
sToken = tStrGetToken(*str, &index, true);
*str += index;
if (sToken.type == TK_QUESTION) {
if (pInsertParam->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
return tscSQLSyntaxErrMsg(pInsertParam->msg, "? only allowed in binding insertion", *str);
}
uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
continue;
}
strcpy(pInsertParam->msg, "client out of memory");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int16_t type = sToken.type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(sToken.n == 0) || (type == TK_RP)) {
return tscSQLSyntaxErrMsg(pInsertParam->msg, "invalid data or symbol", sToken.z);
}
// Remove quotation marks
if (TK_STRING == sToken.type) {
// delete escape character: \\, \', \"
char delim = sToken.z[0];
int32_t cnt = 0;
int32_t j = 0;
if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) {
return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z);
}
for (uint32_t k = 1; k < sToken.n - 1; ++k) {
if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) {
tmpTokenBuf[j] = sToken.z[k + 1];
cnt++;
j++;
k++;
continue;
}
tmpTokenBuf[j] = sToken.z[k];
j++;
}
tmpTokenBuf[j] = 0;
sToken.z = tmpTokenBuf;
sToken.n -= 2 + cnt;
}
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t ret = tsParseOneColumnOld(pSchema, &sToken, start, pInsertParam->msg, str, isPrimaryKey, timePrec);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
// 2. set the null value for the columns that do not assign values
if (spd->numOfBound < spd->numOfCols) {
char *ptr = payload;
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) { // current column do not have any value to insert, set it to null
if (schema[i].type == TSDB_DATA_TYPE_BINARY) {
varDataSetLen(ptr, sizeof(int8_t));
*(uint8_t *)varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
} else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) {
varDataSetLen(ptr, sizeof(int32_t));
*(uint32_t *)varDataVal(ptr) = TSDB_DATA_NCHAR_NULL;
} else {
setNull(ptr, schema[i].type, schema[i].bytes);
}
}
ptr += schema[i].bytes;
}
rowSize = (int32_t)(ptr - payload);
}
*len = rowSize;
return TSDB_CODE_SUCCESS;
}
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf,
SInsertStatementParam *pInsertParam) {
int32_t index = 0;
......@@ -1036,6 +672,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
}
}
#if 1
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
pColInfo->numOfCols = numOfCols;
pColInfo->numOfBound = numOfCols;
......@@ -1071,6 +708,30 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
pColInfo->allNullLen += pColInfo->flen;
pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}
#endif
#if 0
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
pColInfo->numOfCols = numOfCols;
pColInfo->numOfBound = numOfCols;
pColInfo->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode
pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));
pColInfo->colIdxInfo = NULL;
pColInfo->flen = 0;
pColInfo->allNullLen = 0;
for (int32_t i = 0; i < pColInfo->numOfCols; ++i) {
if (i > 0) {
pColInfo->cols[i].offset = pSchema[i - 1].bytes + pColInfo->cols[i - 1].offset;
}
pColInfo->cols[i].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[i] = i;
}
}
#endif
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
......@@ -1172,13 +833,12 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
// allocate memory
size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
size_t nRealAlloc = nAlloc + 10 * sizeof(SBlockKeyTuple);
char * tmp = trealloc(pBlkKeyInfo->pKeyTuple, nRealAlloc);
char *tmp = trealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp;
pBlkKeyInfo->maxBytesAlloc = (int32_t)nRealAlloc;
pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
}
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
......@@ -2061,7 +1721,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
pInsertParam->payloadType = PAYLOAD_TYPE_RAW;
// pInsertParam->payloadType = PAYLOAD_TYPE_RAW;
destroyTableNameList(pInsertParam);
pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks);
......@@ -2103,7 +1763,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
strtolower(line, line);
int32_t len = 0;
code = tsParseOneRowOld(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam);
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam);
if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册