diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f1b620176d53f24ede3d7a39214a809f54b3fe80..3c1d43d688c57afaee1e8b8fbce33b6eda2da122 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -317,6 +317,7 @@ typedef struct SSqlObj { SRpcIpSet ipList; char freed : 4; char listed : 4; + uint32_t insertType; tsem_t rspSem; SSqlCmd cmd; SSqlRes res; @@ -402,6 +403,7 @@ void tscCloseTscObj(STscObj *pObj); TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos); +void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 96837e4dd47045617e50f0a0e9eeb6982f3453b5..18f987216025bb41033a4e43b6af9fd8e389ffc0 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -482,6 +482,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } } else { code = tsParseSql(pSql, false); + if ((pQueryInfo->type & TSDB_QUERY_TYPE_STMT_INSERT) == TSDB_QUERY_TYPE_STMT_INSERT) { + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + code = tscGetTableMeta(pSql, pTableMetaInfo); + assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL); + (*pSql->fp)(pSql->param, NULL, code); + return; + } + if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; } } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 79872e22c8bf88e13e45714bbeeceec9b7cf6914..d914f392fddf395329de96f7112706ef3097406a 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1312,6 +1312,7 @@ int tsParseInsertSql(SSqlObj *pSql) { tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, pSql->insertType); sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); if (sToken.type != TK_INTO) { @@ -1339,7 +1340,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { * Set the fp before parse the sql string, in case of getTableMeta failed, in which * the error handle callback function can rightfully restore the user-defined callback function (fp). */ - if (initialParse) { + if (initialParse && (pSql->insertType != TSDB_QUERY_TYPE_STMT_INSERT)) { pSql->fetchFp = pSql->fp; pSql->fp = (void(*)())tscHandleMultivnodeInsert; } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 5aaa53c549c06d1937c7a49bb2794838433790fc..d66e40b20f972b62cbbbb10985443a4aac6b416f 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "os.h" #include "taos.h" #include "tsclient.h" @@ -20,6 +21,7 @@ #include "taosmsg.h" #include "tstrbuild.h" #include "tscLog.h" +#include "tscSubquery.h" int tsParseInsertSql(SSqlObj *pSql); int taos_query_imp(STscObj* pObj, SSqlObj* pSql); @@ -262,7 +264,11 @@ static char* normalStmtBuildSql(STscStmt* stmt) { static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) { if (bind->is_null != NULL && *(bind->is_null)) { - setNull(data, param->type, param->bytes); + if (param->type == TSDB_DATA_TYPE_BINARY || param->type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(data + param->offset, param->type); + } else { + setNull(data + param->offset, param->type, param->bytes); + } return TSDB_CODE_SUCCESS; } @@ -297,14 +303,17 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) { return TSDB_CODE_INVALID_VALUE; } size = (short)*bind->length; - break; + STR_WITH_SIZE_TO_VARSTR(data + param->offset, bind->buffer, size); + return TSDB_CODE_SUCCESS; - case TSDB_DATA_TYPE_NCHAR: - if (!taosMbsToUcs4(bind->buffer, *bind->length, data + param->offset, param->bytes, NULL)) { + case TSDB_DATA_TYPE_NCHAR: { + size_t output = 0; + if (!taosMbsToUcs4(bind->buffer, *bind->length, varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) { return TSDB_CODE_INVALID_VALUE; - } + } + varDataSetLen(data + param->offset, output); return TSDB_CODE_SUCCESS; - + } default: assert(false); return TSDB_CODE_INVALID_VALUE; @@ -383,14 +392,6 @@ static int insertStmtAddBatch(STscStmt* stmt) { return TSDB_CODE_SUCCESS; } -static int insertStmtPrepare(STscStmt* stmt) { - SSqlObj *pSql = stmt->pSql; - pSql->cmd.numOfParams = 0; - pSql->cmd.batchSize = 0; - - return tsParseInsertSql(pSql); -} - static int insertStmtReset(STscStmt* pStmt) { SSqlCmd* pCmd = &pStmt->pSql->cmd; if (pCmd->batchSize > 2) { @@ -451,14 +452,16 @@ static int insertStmtExecute(STscStmt* stmt) { pRes->qhandle = 0; + pSql->insertType = 0; + pSql->fetchFp = waitForQueryRsp; + pSql->fp = (void(*)())tscHandleMultivnodeInsert; + tscDoQuery(pSql); - // tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); - if (pRes->code != TSDB_CODE_SUCCESS) { - tscPartiallyFreeSqlObj(pSql); - } + // wait for the callback function to post the semaphore + tsem_wait(&pSql->rspSem); + return pSql->res.code; - return pRes->code; } //////////////////////////////////////////////////////////////////////////////// @@ -478,6 +481,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { tscError("failed to allocate memory for statement"); return NULL; } + pStmt->taos = pObj; SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { @@ -488,8 +492,10 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { } tsem_init(&pSql->rspSem, 0, 0); - pSql->signature = pSql; - pSql->pTscObj = pObj; + pSql->signature = pSql; + pSql->pTscObj = pObj; + pSql->pTscObj->pSql = pSql; + pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pStmt->pSql = pSql; return pStmt; @@ -497,22 +503,55 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { STscStmt* pStmt = (STscStmt*)stmt; - if (length == 0) { - length = strlen(sql); + + if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) { + terrno = TSDB_CODE_DISCONNECTED; + return TSDB_CODE_DISCONNECTED; + } + + SSqlObj* pSql = pStmt->pSql; + size_t sqlLen = strlen(sql); + + //doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + pSql->param = (void*)pStmt->taos; + pSql->fp = waitForQueryRsp; + pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT; + + if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { + tscError("%p failed to malloc payload buffer", pSql); + return TSDB_CODE_CLI_OUT_OF_MEMORY; } - char* sqlstr = (char*)malloc(length + 1); - if (sqlstr == NULL) { - tscError("failed to malloc sql string buffer"); + + pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1); + + if (pSql->sqlstr == NULL) { + tscError("%p failed to malloc sql string buffer", pSql); + free(pCmd->payload); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - memcpy(sqlstr, sql, length); - sqlstr[length] = 0; - strtolower(sqlstr, sqlstr); + + pRes->qhandle = 0; + pRes->numOfRows = 1; + + strtolower(pSql->sqlstr, sql); + tscDump("%p SQL: %s", pSql, pSql->sqlstr); - pStmt->pSql->sqlstr = sqlstr; - if (tscIsInsertData(sqlstr)) { + if (tscIsInsertData(pSql->sqlstr)) { pStmt->isInsert = true; - return insertStmtPrepare(pStmt); + + pSql->cmd.numOfParams = 0; + pSql->cmd.batchSize = 0; + + int32_t code = tsParseSql(pSql, true); + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + // wait for the callback function to post the semaphore + tsem_wait(&pSql->rspSem); + return pSql->res.code; + } + + return code; } pStmt->isInsert = false; @@ -574,7 +613,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) { } else { tfree(pStmt->pSql->sqlstr); pStmt->pSql->sqlstr = sql; - ret = taos_query_imp(pStmt->taos, pStmt->pSql); + ret = taos_query(pStmt->taos, pStmt->pSql->sqlstr); } } return ret; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index a4cbd7f7ec16e15f38dc29fac2b87413ae13896d..a8e4a077eaf185311f1b7b2abaa8c7c81e078b32 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -264,7 +264,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { return pRes->code; } -static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { +void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { assert(param != NULL); SSqlObj *pSql = ((STscObj *)param)->pSql; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 2aca057ba73666f3b3c4bbf4c56ddce27ce066fe..7ec9aef2957b8837f2efd520035d49518f4877b6 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -331,6 +331,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u #define TSDB_QUERY_TYPE_INSERT 0x100u // insert type #define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u +#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) diff --git a/tests/examples/c/prepare.c b/tests/examples/c/prepare.c new file mode 100644 index 0000000000000000000000000000000000000000..cb45e8620d126ee280aeea586baeab01cd0dccc4 --- /dev/null +++ b/tests/examples/c/prepare.c @@ -0,0 +1,195 @@ +// TAOS standard API example. The same syntax as MySQL, but only a subet +// to compile: gcc -o prepare prepare.c -ltaos + +#include +#include +#include + +#include "taos.h" + + +void taosMsleep(int mseconds); + +int main(int argc, char *argv[]) +{ + TAOS *taos; + TAOS_RES *result; + TAOS_STMT *stmt; + + // connect to server + if (argc < 2) { + printf("please input server ip \n"); + return 0; + } + + // init TAOS + taos_init(); + + taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); + exit(1); + } + + taos_query(taos, "drop database demo"); + if (taos_query(taos, "create database demo") != 0) { + printf("failed to create database, reason:%s\n", taos_errstr(taos)); + exit(1); + } + + taos_query(taos, "use demo"); + + + // create table + const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))"; + if (taos_query(taos, sql) != 0) { + printf("failed to create table, reason:%s\n", taos_errstr(taos)); + exit(1); + } + + // sleep for one second to make sure table is created on data node + // taosMsleep(1000); + + // insert 10 records + struct { + int64_t ts; + int8_t b; + int8_t v1; + int16_t v2; + int32_t v4; + int64_t v8; + float f4; + double f8; + char bin[40]; + char blob[80]; + } v = {0}; + + stmt = taos_stmt_init(taos); + TAOS_BIND params[10]; + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(v.ts); + params[0].buffer = &v.ts; + params[0].length = ¶ms[0].buffer_length; + params[0].is_null = NULL; + + params[1].buffer_type = TSDB_DATA_TYPE_BOOL; + params[1].buffer_length = sizeof(v.b); + params[1].buffer = &v.b; + params[1].length = ¶ms[1].buffer_length; + params[1].is_null = NULL; + + params[2].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[2].buffer_length = sizeof(v.v1); + params[2].buffer = &v.v1; + params[2].length = ¶ms[2].buffer_length; + params[2].is_null = NULL; + + params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[3].buffer_length = sizeof(v.v2); + params[3].buffer = &v.v2; + params[3].length = ¶ms[3].buffer_length; + params[3].is_null = NULL; + + params[4].buffer_type = TSDB_DATA_TYPE_INT; + params[4].buffer_length = sizeof(v.v4); + params[4].buffer = &v.v4; + params[4].length = ¶ms[4].buffer_length; + params[4].is_null = NULL; + + params[5].buffer_type = TSDB_DATA_TYPE_BIGINT; + params[5].buffer_length = sizeof(v.v8); + params[5].buffer = &v.v8; + params[5].length = ¶ms[5].buffer_length; + params[5].is_null = NULL; + + params[6].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[6].buffer_length = sizeof(v.f4); + params[6].buffer = &v.f4; + params[6].length = ¶ms[6].buffer_length; + params[6].is_null = NULL; + + params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE; + params[7].buffer_length = sizeof(v.f8); + params[7].buffer = &v.f8; + params[7].length = ¶ms[7].buffer_length; + params[7].is_null = NULL; + + params[8].buffer_type = TSDB_DATA_TYPE_BINARY; + params[8].buffer_length = sizeof(v.bin); + params[8].buffer = v.bin; + params[8].length = ¶ms[8].buffer_length; + params[8].is_null = NULL; + + strcpy(v.blob, "一二三四五六七八九十"); + params[9].buffer_type = TSDB_DATA_TYPE_NCHAR; + params[9].buffer_length = strlen(v.blob); + params[9].buffer = v.blob; + params[9].length = ¶ms[9].buffer_length; + params[9].is_null = NULL; + + int is_null = 1; + + sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); + } + v.ts = 1591060628000; + for (int i = 0; i < 10; ++i) { + v.ts += 1; + for (int j = 1; j < 10; ++j) { + params[j].is_null = ((i == j) ? &is_null : 0); + } + v.b = (int8_t)i % 2; + v.v1 = (int8_t)i; + v.v2 = (int16_t)(i * 2); + v.v4 = (int32_t)(i * 4); + v.v8 = (int64_t)(i * 8); + v.f4 = (float)(i * 40); + v.f8 = (double)(i * 80); + for (int j = 0; j < sizeof(v.bin) - 1; ++j) { + v.bin[j] = (char)(i + '0'); + } + + taos_stmt_bind_param(stmt, params); + taos_stmt_add_batch(stmt); + } + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute insert statement.\n"); + exit(1); + } + taos_stmt_close(stmt); + printf("==== success inset data ====.\n"); + + // query the records + stmt = taos_stmt_init(taos); + taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0); + v.v1 = 5; + v.v2 = 15; + taos_stmt_bind_param(stmt, params + 2); + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute select statement.\n"); + exit(1); + } + + result = taos_stmt_use_result(stmt); + + TAOS_ROW row; + int rows = 0; + int num_fields = taos_num_fields(result); + TAOS_FIELD *fields = taos_fetch_fields(result); + char temp[256]; + + // fetch the records row by row + while ((row = taos_fetch_row(result))) { + rows++; + taos_print_row(temp, row, fields, num_fields); + printf("%s\n", temp); + } + + taos_free_result(result); + taos_stmt_close(stmt); + + return getchar(); +} +