未验证 提交 03aa20e2 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8238 from taosdata/szhou/feature/affectedrows

add affected row to stmt api and accumulate it in schemaless internal…
...@@ -67,6 +67,8 @@ typedef struct { ...@@ -67,6 +67,8 @@ typedef struct {
SMLProtocolType protocol; SMLProtocolType protocol;
SMLTimeStampType tsType; SMLTimeStampType tsType;
SHashObj* smlDataToSchema; SHashObj* smlDataToSchema;
int64_t affectedRows;
} SSmlLinesInfo; } SSmlLinesInfo;
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info); int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info);
......
...@@ -761,7 +761,7 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam ...@@ -761,7 +761,7 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code)); tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt));
taos_stmt_close(stmt); taos_stmt_close(stmt);
return code; return code;
} }
...@@ -771,7 +771,11 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam ...@@ -771,7 +771,11 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
do { do {
code = taos_stmt_set_tbname(stmt, cTableName); code = taos_stmt_set_tbname(stmt, cTableName);
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, tstrerror(code)); tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt); taos_stmt_close(stmt);
return code; return code;
} }
...@@ -781,13 +785,21 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam ...@@ -781,13 +785,21 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i); TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i);
code = taos_stmt_bind_param(stmt, colsBinds); code = taos_stmt_bind_param(stmt, colsBinds);
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code)); tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt); taos_stmt_close(stmt);
return code; return code;
} }
code = taos_stmt_add_batch(stmt); code = taos_stmt_add_batch(stmt);
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, tstrerror(code)); tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt); taos_stmt_close(stmt);
return code; return code;
} }
...@@ -795,8 +807,9 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam ...@@ -795,8 +807,9 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
code = taos_stmt_execute(stmt); code = taos_stmt_execute(stmt);
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, tstrerror(code), try); tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try);
} }
tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt));
tryAgain = false; tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
...@@ -825,6 +838,8 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam ...@@ -825,6 +838,8 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
} }
} while (tryAgain); } while (tryAgain);
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt); taos_stmt_close(stmt);
return code; return code;
...@@ -1069,6 +1084,8 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine ...@@ -1069,6 +1084,8 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
info->affectedRows = 0;
tscDebug("SML:0x%"PRIx64" build data point schemas", info->id); tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema> SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
code = buildDataPointSchemas(points, numPoint, stableSchemas, info); code = buildDataPointSchemas(points, numPoint, stableSchemas, info);
......
...@@ -78,6 +78,8 @@ typedef struct STscStmt { ...@@ -78,6 +78,8 @@ typedef struct STscStmt {
SSqlObj* pSql; SSqlObj* pSql;
SMultiTbStmt mtb; SMultiTbStmt mtb;
SNormalStmt normal; SNormalStmt normal;
int numOfRows;
} STscStmt; } STscStmt;
#define STMT_RET(c) do { \ #define STMT_RET(c) do { \
...@@ -1212,6 +1214,8 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -1212,6 +1214,8 @@ static int insertStmtExecute(STscStmt* stmt) {
// wait for the callback function to post the semaphore // wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem); tsem_wait(&pSql->rspSem);
stmt->numOfRows += pSql->res.numOfRows;
// data block reset // data block reset
pCmd->batchSize = 0; pCmd->batchSize = 0;
for(int32_t i = 0; i < pCmd->insertParam.numOfTables; ++i) { for(int32_t i = 0; i < pCmd->insertParam.numOfTables; ++i) {
...@@ -1285,6 +1289,8 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { ...@@ -1285,6 +1289,8 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
code = pStmt->pSql->res.code; code = pStmt->pSql->res.code;
pStmt->numOfRows += pStmt->pSql->res.numOfRows;
insertBatchClean(pStmt); insertBatchClean(pStmt);
return code; return code;
...@@ -1521,6 +1527,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -1521,6 +1527,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
pStmt->pSql = pSql; pStmt->pSql = pSql;
pStmt->last = STMT_INIT; pStmt->last = STMT_INIT;
pStmt->numOfRows = 0;
registerSqlObj(pSql); registerSqlObj(pSql);
return pStmt; return pStmt;
...@@ -1564,9 +1571,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -1564,9 +1571,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
} }
pRes->qId = 0; pRes->qId = 0;
pRes->numOfRows = 1; pRes->numOfRows = 0;
registerSqlObj(pSql);
strtolower(pSql->sqlstr, sql); strtolower(pSql->sqlstr, sql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
...@@ -1981,6 +1986,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) { ...@@ -1981,6 +1986,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
} else { } else {
taosReleaseRef(tscObjRef, pStmt->pSql->self); taosReleaseRef(tscObjRef, pStmt->pSql->self);
pStmt->pSql = taos_query((TAOS*)pStmt->taos, sql); pStmt->pSql = taos_query((TAOS*)pStmt->taos, sql);
pStmt->numOfRows += taos_affected_rows(pStmt->pSql);
ret = taos_errno(pStmt->pSql); ret = taos_errno(pStmt->pSql);
free(sql); free(sql);
} }
...@@ -1989,6 +1995,17 @@ int taos_stmt_execute(TAOS_STMT* stmt) { ...@@ -1989,6 +1995,17 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
STMT_RET(ret); STMT_RET(ret);
} }
int taos_stmt_affected_rows(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt == NULL) {
tscError("statement is invalid");
return 0;
}
return pStmt->numOfRows;
}
TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) { TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) {
if (stmt == NULL) { if (stmt == NULL) {
tscError("statement is invalid."); tscError("statement is invalid.");
......
...@@ -141,6 +141,7 @@ DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIN ...@@ -141,6 +141,7 @@ DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIN
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx); DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt); DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt);
......
...@@ -184,6 +184,10 @@ void verify_prepare(TAOS* taos) { ...@@ -184,6 +184,10 @@ void verify_prepare(TAOS* taos) {
taos_stmt_close(stmt); taos_stmt_close(stmt);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int affectedRows = taos_stmt_affected_rows(stmt);
printf("sucessfully inserted %d rows\n", affectedRows);
taos_stmt_close(stmt); taos_stmt_close(stmt);
// query the records // query the records
...@@ -400,6 +404,9 @@ void verify_prepare2(TAOS* taos) { ...@@ -400,6 +404,9 @@ void verify_prepare2(TAOS* taos) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int affectedRows = taos_stmt_affected_rows(stmt);
printf("sucessfully inserted %d rows\n", affectedRows);
taos_stmt_close(stmt); taos_stmt_close(stmt);
// query the records // query the records
...@@ -784,6 +791,10 @@ void verify_prepare3(TAOS* taos) { ...@@ -784,6 +791,10 @@ void verify_prepare3(TAOS* taos) {
taos_stmt_close(stmt); taos_stmt_close(stmt);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int affectedRows = taos_stmt_affected_rows(stmt);
printf("successfully inserted %d rows\n", affectedRows);
taos_stmt_close(stmt); taos_stmt_close(stmt);
// query the records // query the records
......
...@@ -120,6 +120,10 @@ int stmt_scol_func1(TAOS_STMT *stmt) { ...@@ -120,6 +120,10 @@ int stmt_scol_func1(TAOS_STMT *stmt) {
exit(1); exit(1);
} }
int affectedRows = taos_stmt_affected_rows(stmt);
if (affectedRows != 100) {
printf("failed to insert 100 rows");
}
return 0; return 0;
} }
......
...@@ -46,6 +46,7 @@ void taos_stmt_init_test() { ...@@ -46,6 +46,7 @@ void taos_stmt_init_test() {
} }
stmt = taos_stmt_init(taos); stmt = taos_stmt_init(taos);
assert(stmt != NULL); assert(stmt != NULL);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_close(stmt) == 0); assert(taos_stmt_close(stmt) == 0);
printf("finish taos_stmt_init test\n"); printf("finish taos_stmt_init test\n");
} }
...@@ -127,6 +128,7 @@ void taos_stmt_set_tbname_test() { ...@@ -127,6 +128,7 @@ void taos_stmt_set_tbname_test() {
assert(taos_stmt_set_tbname(stmt, name) == 0); assert(taos_stmt_set_tbname(stmt, name) == 0);
free(name); free(name);
free(stmt_sql); free(stmt_sql);
assert(taos_stmt_affected_rows(stmt) == 0);
taos_stmt_close(stmt); taos_stmt_close(stmt);
printf("finish taos_stmt_set_tbname test\n"); printf("finish taos_stmt_set_tbname test\n");
} }
...@@ -166,6 +168,7 @@ void taos_stmt_set_tbname_tags_test() { ...@@ -166,6 +168,7 @@ void taos_stmt_set_tbname_tags_test() {
free(stmt_sql); free(stmt_sql);
free(name); free(name);
free(tags); free(tags);
assert(taos_stmt_affected_rows(stmt) == 0);
taos_stmt_close(stmt); taos_stmt_close(stmt);
printf("finish taos_stmt_set_tbname_tags test\n"); printf("finish taos_stmt_set_tbname_tags test\n");
} }
...@@ -194,8 +197,10 @@ void taos_stmt_set_sub_tbname_test() { ...@@ -194,8 +197,10 @@ void taos_stmt_set_sub_tbname_test() {
assert(taos_stmt_set_sub_tbname(stmt, name) != 0); assert(taos_stmt_set_sub_tbname(stmt, name) != 0);
sprintf(name, "tb"); sprintf(name, "tb");
assert(taos_stmt_set_sub_tbname(stmt, name) == 0); assert(taos_stmt_set_sub_tbname(stmt, name) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_load_table_info(taos, "super, tb") == 0); assert(taos_load_table_info(taos, "super, tb") == 0);
assert(taos_stmt_set_sub_tbname(stmt, name) == 0); assert(taos_stmt_set_sub_tbname(stmt, name) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
free(name); free(name);
free(stmt_sql); free(stmt_sql);
assert(taos_stmt_close(stmt) == 0); assert(taos_stmt_close(stmt) == 0);
...@@ -238,6 +243,7 @@ void taos_stmt_bind_param_test() { ...@@ -238,6 +243,7 @@ void taos_stmt_bind_param_test() {
assert(taos_stmt_bind_param(stmt, params) != 0); assert(taos_stmt_bind_param(stmt, params) != 0);
assert(taos_stmt_set_tbname(stmt, "super") == 0); assert(taos_stmt_set_tbname(stmt, "super") == 0);
assert(taos_stmt_bind_param(stmt, params) == 0); assert(taos_stmt_bind_param(stmt, params) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
free(params); free(params);
free(stmt_sql); free(stmt_sql);
taos_stmt_close(stmt); taos_stmt_close(stmt);
...@@ -249,6 +255,7 @@ void taos_stmt_bind_single_param_batch_test() { ...@@ -249,6 +255,7 @@ void taos_stmt_bind_single_param_batch_test() {
TAOS_STMT * stmt = NULL; TAOS_STMT * stmt = NULL;
TAOS_MULTI_BIND *bind = NULL; TAOS_MULTI_BIND *bind = NULL;
assert(taos_stmt_bind_single_param_batch(stmt, bind, 0) != 0); assert(taos_stmt_bind_single_param_batch(stmt, bind, 0) != 0);
assert(taos_stmt_affected_rows(stmt) == 0);
printf("finish taos_stmt_bind_single_param_batch test\n"); printf("finish taos_stmt_bind_single_param_batch test\n");
} }
...@@ -257,6 +264,7 @@ void taos_stmt_bind_param_batch_test() { ...@@ -257,6 +264,7 @@ void taos_stmt_bind_param_batch_test() {
TAOS_STMT * stmt = NULL; TAOS_STMT * stmt = NULL;
TAOS_MULTI_BIND *bind = NULL; TAOS_MULTI_BIND *bind = NULL;
assert(taos_stmt_bind_param_batch(stmt, bind) != 0); assert(taos_stmt_bind_param_batch(stmt, bind) != 0);
assert(taos_stmt_affected_rows(stmt) == 0);
printf("finish taos_stmt_bind_param_batch test\n"); printf("finish taos_stmt_bind_param_batch test\n");
} }
...@@ -293,10 +301,14 @@ void taos_stmt_add_batch_test() { ...@@ -293,10 +301,14 @@ void taos_stmt_add_batch_test() {
params[1].length = &params[1].buffer_length; params[1].length = &params[1].buffer_length;
params[1].is_null = NULL; params[1].is_null = NULL;
assert(taos_stmt_set_tbname(stmt, "super") == 0); assert(taos_stmt_set_tbname(stmt, "super") == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_bind_param(stmt, params) == 0); assert(taos_stmt_bind_param(stmt, params) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_add_batch(stmt) == 0); assert(taos_stmt_add_batch(stmt) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
free(params); free(params);
free(stmt_sql); free(stmt_sql);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_close(stmt) == 0); assert(taos_stmt_close(stmt) == 0);
printf("finish taos_stmt_add_batch test\n"); printf("finish taos_stmt_add_batch test\n");
} }
...@@ -317,10 +329,13 @@ void taos_stmt_execute_test() { ...@@ -317,10 +329,13 @@ void taos_stmt_execute_test() {
stmt = taos_stmt_init(taos); stmt = taos_stmt_init(taos);
assert(stmt != NULL); assert(stmt != NULL);
assert(taos_stmt_execute(stmt) != 0); assert(taos_stmt_execute(stmt) != 0);
assert(taos_stmt_affected_rows(stmt) == 0);
char *stmt_sql = calloc(1, 1000); char *stmt_sql = calloc(1, 1000);
sprintf(stmt_sql, "insert into ? values (?,?)"); sprintf(stmt_sql, "insert into ? values (?,?)");
assert(taos_stmt_prepare(stmt, stmt_sql, 0) == 0); assert(taos_stmt_prepare(stmt, stmt_sql, 0) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_execute(stmt) != 0); assert(taos_stmt_execute(stmt) != 0);
assert(taos_stmt_affected_rows(stmt) == 0);
TAOS_BIND *params = calloc(2, sizeof(TAOS_BIND)); TAOS_BIND *params = calloc(2, sizeof(TAOS_BIND));
int64_t ts = (int64_t)1591060628000; int64_t ts = (int64_t)1591060628000;
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
...@@ -335,11 +350,17 @@ void taos_stmt_execute_test() { ...@@ -335,11 +350,17 @@ void taos_stmt_execute_test() {
params[1].length = &params[1].buffer_length; params[1].length = &params[1].buffer_length;
params[1].is_null = NULL; params[1].is_null = NULL;
assert(taos_stmt_set_tbname(stmt, "super") == 0); assert(taos_stmt_set_tbname(stmt, "super") == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_execute(stmt) != 0); assert(taos_stmt_execute(stmt) != 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_bind_param(stmt, params) == 0); assert(taos_stmt_bind_param(stmt, params) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_execute(stmt) != 0); assert(taos_stmt_execute(stmt) != 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_add_batch(stmt) == 0); assert(taos_stmt_add_batch(stmt) == 0);
assert(taos_stmt_affected_rows(stmt) == 0);
assert(taos_stmt_execute(stmt) == 0); assert(taos_stmt_execute(stmt) == 0);
assert(taos_stmt_affected_rows(stmt) == 1);
free(params); free(params);
free(stmt_sql); free(stmt_sql);
assert(taos_stmt_close(stmt) == 0); assert(taos_stmt_close(stmt) == 0);
......
...@@ -229,6 +229,14 @@ int main(int argc, char *argv[]) { ...@@ -229,6 +229,14 @@ int main(int argc, char *argv[]) {
PRINT_SUCCESS PRINT_SUCCESS
printf("Successfully execute insert statement.\n"); printf("Successfully execute insert statement.\n");
int affectedRows = taos_stmt_affected_rows(stmt);
printf("Successfully inserted %d rows\n", affectedRows);
if (affectedRows != 10) {
PRINT_ERROR
printf("failed to insert 10 rows\n");
exit(EXIT_FAILURE);
}
taos_stmt_close(stmt); taos_stmt_close(stmt);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
check_result(taos, i, 1); check_result(taos, i, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册