提交 1c52e659 编写于 作者: F freemine

1. add batch-insert

2. allow buffer-type conversion within taos
上级 e7fb3285
......@@ -139,7 +139,7 @@ static int normalStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -213,7 +213,7 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
case TSDB_DATA_TYPE_NULL:
taosStringBuilderAppendNull(&sb);
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
......@@ -266,6 +266,387 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
return TSDB_CODE_SUCCESS;
}
if (1) {
// allow user bind param data with different type
short size = 0;
union {
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
unsigned char buf[32*1024];
} u;
switch (param->type) {
case TSDB_DATA_TYPE_BOOL: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BOOL: {
u.v1 = *(int8_t*)bind->buffer;
if (u.v1==0 || u.v1==1) break;
} break;
case TSDB_DATA_TYPE_TINYINT: {
u.v1 = *(int8_t*)bind->buffer;
if (u.v1==0 || u.v1==1) break;
} break;
case TSDB_DATA_TYPE_SMALLINT: {
u.v1 = *(int16_t*)bind->buffer;
if (u.v1==0 || u.v1==1) break;
} break;
case TSDB_DATA_TYPE_INT: {
u.v1 = *(int32_t*)bind->buffer;
if (u.v1==0 || u.v1==1) break;
} break;
case TSDB_DATA_TYPE_BIGINT: {
u.v1 = *(int64_t*)bind->buffer;
if (u.v1==0 || u.v1==1) break;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
// "0", "1" convertible
if (strncmp((const char*)bind->buffer, "0", *bind->length)==0) {
u.v1 = 0;
break;
}
if (strncmp((const char*)bind->buffer, "1", *bind->length)==0) {
u.v1 = 1;
break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
memcpy(data + param->offset, &u.v1, sizeof(u.v1));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_TINYINT: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = *(int8_t*)bind->buffer;
u.v1 = v;
if (v >= SCHAR_MIN && v <= SCHAR_MAX) break;
} break;
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)bind->buffer;
u.v1 = v;
if (v >= SCHAR_MIN && v <= SCHAR_MAX) break;
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)bind->buffer;
u.v1 = v;
if (v >= SCHAR_MIN && v <= SCHAR_MAX) break;
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)bind->buffer;
u.v1 = v;
if (v >= SCHAR_MIN && v <= SCHAR_MAX) break;
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
int64_t v;
int n,r;
r = sscanf((const char*)bind->buffer, "%ld%n", &v, &n);
if (r==1 && n==strlen((const char*)bind->buffer)) {
u.v1 = v;
if (v >= SCHAR_MIN && v <= SCHAR_MAX) break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
memcpy(data + param->offset, &u.v1, sizeof(u.v1));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_SMALLINT: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT: {
int v = *(int16_t*)bind->buffer;
u.v2 = v;
} break;
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)bind->buffer;
u.v2 = v;
if (v >= SHRT_MIN && v <= SHRT_MAX) break;
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)bind->buffer;
u.v2 = v;
if (v >= SHRT_MIN && v <= SHRT_MAX) break;
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
int64_t v;
int n,r;
r = sscanf((const char*)bind->buffer, "%ld%n", &v, &n);
if (r==1 && n==strlen((const char*)bind->buffer)) {
u.v2 = v;
if (v >= SHRT_MIN && v <= SHRT_MAX) break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
memcpy(data + param->offset, &u.v2, sizeof(u.v2));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_INT: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT: {
u.v4 = *(int32_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)bind->buffer;
u.v4 = v;
if (v >= INT_MIN && v <= INT_MAX) break;
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
int64_t v;
int n,r;
r = sscanf((const char*)bind->buffer, "%ld%n", &v, &n);
if (r==1 && n==strlen((const char*)bind->buffer)) {
u.v4 = v;
if (v >= INT_MIN && v <= INT_MAX) break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
memcpy(data + param->offset, &u.v2, sizeof(u.v2));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_FLOAT: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
u.f4 = *(int8_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_SMALLINT: {
u.f4 = *(int16_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_INT: {
u.f4 = *(int32_t*)bind->buffer;
// shall we check equality?
} break;
case TSDB_DATA_TYPE_BIGINT: {
u.f4 = *(int64_t*)bind->buffer;
// shall we check equality?
} break;
case TSDB_DATA_TYPE_FLOAT: {
u.f4 = *(float*)bind->buffer;
} break;
case TSDB_DATA_TYPE_DOUBLE: {
u.f4 = *(float*)bind->buffer;
// shall we check equality?
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
float v;
int n,r;
r = sscanf((const char*)bind->buffer, "%f%n", &v, &n);
if (r==1 && n==strlen((const char*)bind->buffer)) {
u.f4 = v;
break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
memcpy(data + param->offset, &u.f4, sizeof(u.f4));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_BIGINT: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
u.v8 = *(int8_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_SMALLINT: {
u.v8 = *(int16_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_INT: {
u.v8 = *(int32_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_BIGINT: {
u.v8 = *(int64_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
int64_t v;
int n,r;
r = sscanf((const char*)bind->buffer, "%ld%n", &v, &n);
if (r==1 && n==strlen((const char*)bind->buffer)) {
u.v8 = v;
break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
memcpy(data + param->offset, &u.v8, sizeof(u.v8));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_DOUBLE: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
u.f8 = *(int8_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_SMALLINT: {
u.f8 = *(int16_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_INT: {
u.f8 = *(int32_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_BIGINT: {
u.f8 = *(int64_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_FLOAT: {
u.f8 = *(float*)bind->buffer;
} break;
case TSDB_DATA_TYPE_DOUBLE: {
u.f8 = *(double*)bind->buffer;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
double v;
int n,r;
r = sscanf((const char*)bind->buffer, "%lf%n", &v, &n);
if (r==1 && n==strlen((const char*)bind->buffer)) {
u.v8 = v;
break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
} break;
memcpy(data + param->offset, &u.f8, sizeof(u.f8));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_TIMESTAMP: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_TIMESTAMP: {
u.v8 = *(int64_t*)bind->buffer;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
// is this the correct way to call taosParseTime?
if (taosParseTime(bind->buffer, &u.v8, *bind->length, 3, tsDaylight) == TSDB_CODE_SUCCESS) {
break;
}
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_DOUBLE:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
} break;
memcpy(data + param->offset, &u.v8, sizeof(u.v8));
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_BINARY: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_BINARY: {
if ((*bind->length) > (uintptr_t)param->bytes) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
size = (short)*bind->length;
STR_WITH_SIZE_TO_VARSTR(data + param->offset, bind->buffer, size);
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_NCHAR:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
} break;
case TSDB_DATA_TYPE_NCHAR: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_NCHAR: {
size_t output = 0;
if (!taosMbsToUcs4(bind->buffer, *bind->length, varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
varDataSetLen(data + param->offset, output);
return TSDB_CODE_SUCCESS;
} break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BINARY:
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
} break;
default: {
return TSDB_CODE_TSC_INVALID_VALUE;
} break;
}
}
if (bind->buffer_type != param->type) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -299,12 +680,12 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
size = (short)*bind->length;
STR_WITH_SIZE_TO_VARSTR(data + param->offset, bind->buffer, size);
return TSDB_CODE_SUCCESS;
case TSDB_DATA_TYPE_NCHAR: {
size_t output = 0;
if (!taosMbsToUcs4(bind->buffer, *bind->length, varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
}
varDataSetLen(data + param->offset, output);
return TSDB_CODE_SUCCESS;
}
......@@ -358,7 +739,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
}
// actual work of all data blocks is done, update block size and numOfRows.
// note we don't do this block by block during the binding process, because
// note we don't do this block by block during the binding process, because
// we cannot recover if something goes wrong.
pCmd->batchSize = binded * 2 + 1;
......@@ -405,7 +786,7 @@ static int insertStmtReset(STscStmt* pStmt) {
}
}
pCmd->batchSize = 0;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
pTableMetaInfo->vgroupIndex = 0;
return TSDB_CODE_SUCCESS;
......@@ -447,7 +828,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
pRes->qhandle = 0;
pSql->cmd.insertType = 0;
......@@ -508,35 +889,35 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
SSqlObj* pSql = pStmt->pSql;
size_t sqlLen = strlen(sql);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->param = (void*) pSql;
pSql->fp = waitForQueryRsp;
pSql->cmd.insertType = TSDB_QUERY_TYPE_STMT_INSERT;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p failed to malloc payload buffer", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
free(pCmd->payload);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pRes->qhandle = 0;
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sql);
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
if (tscIsInsertData(pSql->sqlstr)) {
if (tscIsInsertData(pSql->sqlstr)) {
pStmt->isInsert = true;
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
......@@ -548,7 +929,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
tsem_wait(&pSql->rspSem);
return pSql->res.code;
}
return code;
}
......@@ -665,7 +1046,9 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
*nums = pCmd->numOfParams;
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_TSC_APP_ERROR;
SNormalStmt* normal = &pStmt->normal;
*nums = normal->numParams;
return TSDB_CODE_SUCCESS;
}
}
......
此差异已折叠。
......@@ -99,6 +99,18 @@ const char* sql_c_type(int type) {
}
}
int is_valid_sql_c_type(int type) {
const char *ctype = sql_c_type(type);
if (strcmp(ctype, "UNKNOWN")==0) return 0;
return 1;
}
int is_valid_sql_sql_type(int type) {
const char *sqltype = sql_sql_type(type);
if (strcmp(sqltype, "UNKNOWN")==0) return 0;
return 1;
}
int string_conv(const char *fromcode, const char *tocode,
const unsigned char *src, size_t sbytes,
unsigned char *dst, size_t dbytes,
......
......@@ -47,6 +47,8 @@ do { \
const char* sql_sql_type(int type);
const char* sql_c_type(int type);
int is_valid_sql_c_type(int type);
int is_valid_sql_sql_type(int type);
int string_conv(const char *fromcode, const char *tocode,
const unsigned char *src, size_t sbytes,
......
......@@ -43,10 +43,13 @@ cursor = cnxn.cursor()
cursor.execute("insert into db.t values('2020-10-13 06:44:00', 1, 127, 32767, 32768, 32769, 123.456, 789.987, 'hello', 'world')")
cursor.close()
print("hhhhhhhhhhhhhhhhhhhhhh")
cursor = cnxn.cursor()
cursor.execute("insert into db.t values(?,?,?,?,?,?,?,?,?,?)", "2020-10-13 07:06:00", 0, 127, 32767, 32768, 32769, 123.456, 789.987, "hel后lo", "wo哈rld");
cursor.execute("insert into db.t values(?,?,?,?,?,?,?,?,?,?)", "2020-10-13 07:06:00", 0, 229, 32767, 32768, 32769, 123.456, 789.987, "hel后lo", "wo哈rld");
cursor.close()
print("hhhhhhhhhhhhhhhhhhhhhh")
cursor = cnxn.cursor()
cursor.execute("SELECT * from db.t")
row = cursor.fetchone()
......@@ -79,3 +82,34 @@ while row:
row = cursor.fetchone()
cursor.close()
cursor = cnxn.cursor()
cursor.execute("create table db.v (ts timestamp, v1 tinyint)")
cursor.close()
params = [ ('A', 1), ('B', 2), ('C', 3) ]
params = [ ('A', 1), ('B', 2), ('C', 3) ]
params = [ ('2020-10-16 00:00:00', 1),
('2020-10-16 00:00:01', 4),
('2020-10-16 00:00:02', 5),
('2020-10-16 00:00:03', 6) ]
cursor = cnxn.cursor()
cursor.fast_executemany = True
cursor.executemany("insert into db.v values (?, ?)", params)
cursor.close()
cursor = cnxn.cursor()
cursor.execute("SELECT * from db.v")
row = cursor.fetchone()
while row:
print(row)
row = cursor.fetchone()
cursor.close()
cursor = cnxn.cursor()
cursor.execute("SELECT * from db.v where v1 > ?", 4)
row = cursor.fetchone()
while row:
print(row)
row = cursor.fetchone()
cursor.close()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册