diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index dc1bc35fc09a0852ca37493aeeef2e1692f55faf..7794e3190cb0a679def32db34b91cef41b804cd2 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -276,6 +276,60 @@ static char* normalStmtBuildSql(STscStmt* stmt) { return taosStringBuilderGetResult(&sb, NULL); } +static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { + SParsedDataColInfo* spd = &pBlock->boundColumnInfo; + int32_t offset = 0; + SSchema *schema = (SSchema*)pBlock->pTableMeta->schema; + + for (int32_t i = 0; i < spd->numOfCols; ++i) { + if (!spd->cols[i].hasVal) { // current column do not have any value to insert, set it to null + for (int32_t n = 0; n < rowNum; ++n) { + char *ptr = pBlock->pData + sizeof(SSubmitBlk) + pBlock->rowSize * n + offset; + + if (schema[i].type == TSDB_DATA_TYPE_BINARY) { + varDataSetLen(ptr, sizeof(int8_t)); + *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL; + } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) { + varDataSetLen(ptr, sizeof(int32_t)); + *(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL; + } else { + setNull(ptr, schema[i].type, schema[i].bytes); + } + } + } + + offset += schema[i].bytes; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t fillTablesColumnsNull(SSqlObj* pSql) { + SSqlCmd* pCmd = &pSql->cmd; + + STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL); + + STableDataBlocks* pOneTableBlock = *p; + while(pOneTableBlock) { + SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; + if (pBlocks->numOfRows > 0 && pOneTableBlock->boundColumnInfo.numOfBound < pOneTableBlock->boundColumnInfo.numOfCols) { + fillColumnsNull(pOneTableBlock, pBlocks->numOfRows); + } + + p = taosHashIterate(pCmd->pTableBlockHashList, p); + if (p == NULL) { + break; + } + + pOneTableBlock = *p; + } + + return TSDB_CODE_SUCCESS; +} + + + //////////////////////////////////////////////////////////////////////////////// // functions for insertion statement preparation static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { @@ -1027,6 +1081,8 @@ static int insertStmtExecute(STscStmt* stmt) { pBlk->uid = pTableMeta->id.uid; pBlk->tid = pTableMeta->id.tid; + fillTablesColumnsNull(stmt->pSql); + int code = tscMergeTableDataBlocks(stmt->pSql, false); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1120,10 +1176,15 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry - if (taosHashGetSize(pStmt->pSql->cmd.pTableBlockHashList) > 0) { // merge according to vgId - if ((code = tscMergeTableDataBlocks(pStmt->pSql, false)) != TSDB_CODE_SUCCESS) { - return code; - } + if (taosHashGetSize(pStmt->pSql->cmd.pTableBlockHashList) <= 0) { // merge according to vgId + tscError("0x%"PRIx64" no data block to insert", pStmt->pSql->self); + return TSDB_CODE_TSC_APP_ERROR; + } + + fillTablesColumnsNull(pStmt->pSql); + + if ((code = tscMergeTableDataBlocks(pStmt->pSql, false)) != TSDB_CODE_SUCCESS) { + return code; } code = tscHandleMultivnodeInsert(pStmt->pSql); diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 8f1337486e736cbb7f322609b8bf7ab71d1eb693..db48632cb7814229db4a973cc6b41e00d6e8ada4 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -28,6 +28,445 @@ unsigned long long getCurrentTime(){ + + +int stmt_scol_func1(TAOS_STMT *stmt) { + struct { + int64_t ts; + int8_t b; + int8_t v1; + int16_t v2; + int32_t v4; + int64_t v8; + float f4; + double f8; + char bin[40]; + char blob[80]; + } v = {0}; + + TAOS_BIND params[10]; + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(v.ts); + params[0].buffer = &v.ts; + params[0].length = ¶ms[0].buffer_length; + params[0].is_null = NULL; + + params[1].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[1].buffer_length = sizeof(v.v1); + params[1].buffer = &v.v1; + params[1].length = ¶ms[1].buffer_length; + params[1].is_null = NULL; + + params[2].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[2].buffer_length = sizeof(v.v2); + params[2].buffer = &v.v2; + params[2].length = ¶ms[2].buffer_length; + params[2].is_null = NULL; + + params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[3].buffer_length = sizeof(v.f4); + params[3].buffer = &v.f4; + params[3].length = ¶ms[3].buffer_length; + params[3].is_null = NULL; + + params[4].buffer_type = TSDB_DATA_TYPE_BINARY; + params[4].buffer_length = sizeof(v.bin); + params[4].buffer = v.bin; + params[4].length = ¶ms[4].buffer_length; + params[4].is_null = NULL; + + params[5].buffer_type = TSDB_DATA_TYPE_BINARY; + params[5].buffer_length = sizeof(v.bin); + params[5].buffer = v.bin; + params[5].length = ¶ms[5].buffer_length; + params[5].is_null = NULL; + + char *sql = "insert into ? (ts, v1,v2,f4,bin,bin2) values(?,?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); + } + + for (int zz = 0; zz < 10; zz++) { + char buf[32]; + sprintf(buf, "m%d", zz); + code = taos_stmt_set_tbname(stmt, buf); + if (code != 0){ + printf("failed to execute taos_stmt_set_tbname. code:0x%x\n", code); + exit(1); + } + v.ts = 1591060628000 + zz * 10; + for (int i = 0; i < 10; ++i) { + v.ts += 1; + + v.b = (int8_t)(i+zz*10) % 2; + v.v1 = (int8_t)(i+zz*10); + v.v2 = (int16_t)((i+zz*10) * 2); + v.v4 = (int32_t)((i+zz*10) * 4); + v.v8 = (int64_t)((i+zz*10) * 8); + v.f4 = (float)((i+zz*10) * 40); + v.f8 = (double)((i+zz*10) * 80); + for (int j = 0; j < sizeof(v.bin) - 1; ++j) { + v.bin[j] = (char)((i)%10 + '0'); + } + + taos_stmt_bind_param(stmt, params); + taos_stmt_add_batch(stmt); + } + } + + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute insert statement.\n"); + exit(1); + } + + return 0; +} + + + +int stmt_scol_func2(TAOS_STMT *stmt) { + struct { + int64_t ts; + int8_t b; + int8_t v1; + int16_t v2; + int32_t v4; + int64_t v8; + float f4; + double f8; + char bin[40]; + char blob[80]; + } v = {0}; + + TAOS_BIND params[10]; + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(v.ts); + params[0].buffer = &v.ts; + params[0].length = ¶ms[0].buffer_length; + params[0].is_null = NULL; + + params[1].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[1].buffer_length = sizeof(v.v1); + params[1].buffer = &v.v1; + params[1].length = ¶ms[1].buffer_length; + params[1].is_null = NULL; + + params[2].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[2].buffer_length = sizeof(v.v2); + params[2].buffer = &v.v2; + params[2].length = ¶ms[2].buffer_length; + params[2].is_null = NULL; + + params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[3].buffer_length = sizeof(v.f4); + params[3].buffer = &v.f4; + params[3].length = ¶ms[3].buffer_length; + params[3].is_null = NULL; + + params[4].buffer_type = TSDB_DATA_TYPE_BINARY; + params[4].buffer_length = sizeof(v.bin); + params[4].buffer = v.bin; + params[4].length = ¶ms[4].buffer_length; + params[4].is_null = NULL; + + params[5].buffer_type = TSDB_DATA_TYPE_BINARY; + params[5].buffer_length = sizeof(v.bin); + params[5].buffer = v.bin; + params[5].length = ¶ms[5].buffer_length; + params[5].is_null = NULL; + + char *sql = "insert into m0 (ts, v1,v2,f4,bin,bin2) values(?,?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); + } + + for (int zz = 0; zz < 10; zz++) { + v.ts = 1591060628000 + zz * 10; + for (int i = 0; i < 10; ++i) { + v.ts += 1; + + v.b = (int8_t)(i+zz*10) % 2; + v.v1 = (int8_t)(i+zz*10); + v.v2 = (int16_t)((i+zz*10) * 2); + v.v4 = (int32_t)((i+zz*10) * 4); + v.v8 = (int64_t)((i+zz*10) * 8); + v.f4 = (float)((i+zz*10) * 40); + v.f8 = (double)((i+zz*10) * 80); + for (int j = 0; j < sizeof(v.bin) - 1; ++j) { + v.bin[j] = (char)((i)%10 + '0'); + } + + taos_stmt_bind_param(stmt, params); + taos_stmt_add_batch(stmt); + } + } + + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute insert statement.\n"); + exit(1); + } + + return 0; +} + + + + +//300 tables 60 records +int stmt_scol_func3(TAOS_STMT *stmt) { + struct { + int64_t *ts; + int8_t b[60]; + int8_t v1[60]; + int16_t v2[60]; + int32_t v4[60]; + int64_t v8[60]; + float f4[60]; + double f8[60]; + char bin[60][40]; + } v = {0}; + + v.ts = malloc(sizeof(int64_t) * 900000 * 60); + + int *lb = malloc(60 * sizeof(int)); + + TAOS_MULTI_BIND *params = calloc(1, sizeof(TAOS_MULTI_BIND) * 900000*10); + char* is_null = malloc(sizeof(char) * 60); + char* no_null = malloc(sizeof(char) * 60); + + for (int i = 0; i < 60; ++i) { + lb[i] = 40; + no_null[i] = 0; + is_null[i] = (i % 10 == 2) ? 1 : 0; + v.b[i] = (int8_t)(i % 2); + v.v1[i] = (int8_t)((i+1) % 2); + v.v2[i] = (int16_t)i; + v.v4[i] = (int32_t)(i+1); + v.v8[i] = (int64_t)(i+2); + v.f4[i] = (float)(i+3); + v.f8[i] = (double)(i+4); + memset(v.bin[i], '0'+i%10, 40); + } + + for (int i = 0; i < 9000000; i+=10) { + params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[i+0].buffer_length = sizeof(int64_t); + params[i+0].buffer = &v.ts[10*i/10]; + params[i+0].length = NULL; + params[i+0].is_null = no_null; + params[i+0].num = 10; + + params[i+1].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[i+1].buffer_length = sizeof(int8_t); + params[i+1].buffer = v.v1; + params[i+1].length = NULL; + params[i+1].is_null = no_null; + params[i+1].num = 10; + + params[i+2].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[i+2].buffer_length = sizeof(int16_t); + params[i+2].buffer = v.v2; + params[i+2].length = NULL; + params[i+2].is_null = no_null; + params[i+2].num = 10; + + params[i+3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[i+3].buffer_length = sizeof(float); + params[i+3].buffer = v.f4; + params[i+3].length = NULL; + params[i+3].is_null = no_null; + params[i+3].num = 10; + + params[i+4].buffer_type = TSDB_DATA_TYPE_BINARY; + params[i+4].buffer_length = 40; + params[i+4].buffer = v.bin; + params[i+4].length = lb; + params[i+4].is_null = no_null; + params[i+4].num = 10; + + params[i+5].buffer_type = TSDB_DATA_TYPE_BINARY; + params[i+5].buffer_length = 40; + params[i+5].buffer = v.bin; + params[i+5].length = lb; + params[i+5].is_null = no_null; + params[i+5].num = 10; + + } + + int64_t tts = 1591060628000; + for (int i = 0; i < 54000000; ++i) { + v.ts[i] = tts + i; + } + + unsigned long long starttime = getCurrentTime(); + + char *sql = "insert into ? (ts, v1,v2,f4,bin,bin2) values(?,?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); + } + + int id = 0; + for (int l = 0; l < 2; l++) { + for (int zz = 0; zz < 300; zz++) { + char buf[32]; + sprintf(buf, "m%d", zz); + code = taos_stmt_set_tbname(stmt, buf); + if (code != 0){ + printf("failed to execute taos_stmt_set_tbname. code:0x%x\n", code); + } + + taos_stmt_bind_param_batch(stmt, params + id * 10); + taos_stmt_add_batch(stmt); + } + + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute insert statement.\n"); + exit(1); + } + + ++id; + } + + unsigned long long endtime = getCurrentTime(); + printf("insert total %d records, used %u seconds, avg:%u useconds\n", 3000*300*60, (endtime-starttime)/1000000UL, (endtime-starttime)/(3000*300*60)); + + free(v.ts); + free(lb); + free(params); + free(is_null); + free(no_null); + + return 0; +} + + + +//10 tables 10 records single column bind +int stmt_scol_func4(TAOS_STMT *stmt) { + struct { + int64_t *ts; + int8_t b[60]; + int8_t v1[60]; + int16_t v2[60]; + int32_t v4[60]; + int64_t v8[60]; + float f4[60]; + double f8[60]; + char bin[60][40]; + } v = {0}; + + v.ts = malloc(sizeof(int64_t) * 1000 * 60); + + int *lb = malloc(60 * sizeof(int)); + + TAOS_MULTI_BIND *params = calloc(1, sizeof(TAOS_MULTI_BIND) * 1000*10); + char* is_null = malloc(sizeof(char) * 60); + char* no_null = malloc(sizeof(char) * 60); + + for (int i = 0; i < 60; ++i) { + lb[i] = 40; + no_null[i] = 0; + is_null[i] = (i % 10 == 2) ? 1 : 0; + v.b[i] = (int8_t)(i % 2); + v.v1[i] = (int8_t)((i+1) % 2); + v.v2[i] = (int16_t)i; + v.v4[i] = (int32_t)(i+1); + v.v8[i] = (int64_t)(i+2); + v.f4[i] = (float)(i+3); + v.f8[i] = (double)(i+4); + memset(v.bin[i], '0'+i%10, 40); + } + + for (int i = 0; i < 10000; i+=10) { + params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[i+0].buffer_length = sizeof(int64_t); + params[i+0].buffer = &v.ts[10*i/10]; + params[i+0].length = NULL; + params[i+0].is_null = no_null; + params[i+0].num = 2; + + params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; + params[i+1].buffer_length = sizeof(int8_t); + params[i+1].buffer = v.b; + params[i+1].length = NULL; + params[i+1].is_null = no_null; + params[i+1].num = 2; + + params[i+2].buffer_type = TSDB_DATA_TYPE_INT; + params[i+2].buffer_length = sizeof(int32_t); + params[i+2].buffer = v.v4; + params[i+2].length = NULL; + params[i+2].is_null = no_null; + params[i+2].num = 2; + + params[i+3].buffer_type = TSDB_DATA_TYPE_BIGINT; + params[i+3].buffer_length = sizeof(int64_t); + params[i+3].buffer = v.v8; + params[i+3].length = NULL; + params[i+3].is_null = no_null; + params[i+3].num = 2; + + params[i+4].buffer_type = TSDB_DATA_TYPE_DOUBLE; + params[i+4].buffer_length = sizeof(double); + params[i+4].buffer = v.f8; + params[i+4].length = NULL; + params[i+4].is_null = no_null; + params[i+4].num = 2; + } + + int64_t tts = 1591060628000; + for (int i = 0; i < 60000; ++i) { + v.ts[i] = tts + i; + } + + unsigned long long starttime = getCurrentTime(); + + char *sql = "insert into ? (ts,b,v4,v8,f8) values(?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); + } + + int id = 0; + for (int l = 0; l < 10; l++) { + for (int zz = 0; zz < 10; zz++) { + char buf[32]; + sprintf(buf, "m%d", zz); + code = taos_stmt_set_tbname(stmt, buf); + if (code != 0){ + printf("failed to execute taos_stmt_set_tbname. code:0x%x\n", code); + } + + for (int col=0; col < 10; ++col) { + taos_stmt_bind_single_param_batch(stmt, params + id++, col); + } + + taos_stmt_add_batch(stmt); + } + + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute insert statement.\n"); + exit(1); + } + } + + unsigned long long endtime = getCurrentTime(); + printf("insert total %d records, used %u seconds, avg:%u useconds\n", 3000*300*60, (endtime-starttime)/1000000UL, (endtime-starttime)/(3000*300*60)); + + free(v.ts); + free(lb); + free(params); + free(is_null); + free(no_null); + + return 0; +} + + + int stmt_func1(TAOS_STMT *stmt) { struct { int64_t ts; @@ -2201,6 +2640,90 @@ void* runcase(void *par) { (void)idx; +#if 0 + prepare(taos, 1); + + stmt = taos_stmt_init(taos); + + printf("10t+10records+specifycol start\n"); + stmt_scol_func1(stmt); + printf("10t+10records+specifycol end\n"); + printf("check result start\n"); + check_result(taos, "m0", 1, 10); + check_result(taos, "m1", 1, 10); + check_result(taos, "m2", 1, 10); + check_result(taos, "m3", 1, 10); + check_result(taos, "m4", 1, 10); + check_result(taos, "m5", 1, 10); + check_result(taos, "m6", 1, 10); + check_result(taos, "m7", 1, 10); + check_result(taos, "m8", 1, 10); + check_result(taos, "m9", 1, 10); + printf("check result end\n"); + taos_stmt_close(stmt); +#endif + + +#if 0 + prepare(taos, 1); + + stmt = taos_stmt_init(taos); + + printf("1t+100records+specifycol start\n"); + stmt_scol_func2(stmt); + printf("1t+100records+specifycol end\n"); + printf("check result start\n"); + check_result(taos, "m0", 1, 100); + printf("check result end\n"); + taos_stmt_close(stmt); +#endif + + +#if 0 + prepare(taos, 1); + + stmt = taos_stmt_init(taos); + + printf("300t+10r+bm+specifycol start\n"); + stmt_scol_func3(stmt); + printf("300t+10r+bm+specifycol end\n"); + printf("check result start\n"); + check_result(taos, "m0", 1, 20); + check_result(taos, "m1", 1, 20); + check_result(taos, "m111", 1, 20); + check_result(taos, "m223", 1, 20); + check_result(taos, "m299", 1, 20); + printf("check result end\n"); + taos_stmt_close(stmt); + +#endif + +#if 1 + prepare(taos, 1); + + stmt = taos_stmt_init(taos); + + printf("10t+2r+bm+specifycol start\n"); + stmt_scol_func4(stmt); + printf("10t+2r+bm+specifycol end\n"); + printf("check result start\n"); + check_result(taos, "m0", 1, 20); + check_result(taos, "m1", 1, 20); + check_result(taos, "m2", 1, 20); + check_result(taos, "m3", 1, 20); + check_result(taos, "m4", 1, 20); + check_result(taos, "m5", 1, 20); + check_result(taos, "m6", 1, 20); + check_result(taos, "m7", 1, 20); + check_result(taos, "m8", 1, 20); + check_result(taos, "m9", 1, 20); + printf("check result end\n"); + taos_stmt_close(stmt); + + return NULL; +#endif + + #if 1 prepare(taos, 1);