diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 9401694897f8a66c05db6a07db3e90b01a5457e0..1bad1c72409baa9921a096e500c71a2e1a172ad4 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -32,6 +32,8 @@ typedef struct { uint8_t type; int16_t length; char* value; + + uint32_t fieldSchemaIdx; } TAOS_SML_KV; typedef struct { @@ -44,6 +46,8 @@ typedef struct { // first kv must be timestamp TAOS_SML_KV* fields; int32_t fieldNum; + + uint32_t schemaIdx; } TAOS_SML_DATA_POINT; typedef enum { @@ -56,7 +60,6 @@ typedef enum { typedef struct { uint64_t id; - SHashObj* smlDataToSchema; } SSmlLinesInfo; //================================================================================================= @@ -175,8 +178,7 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx)); } - uintptr_t valPointer = (uintptr_t)smlKv; - taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &fieldIdx, sizeof(fieldIdx)); + smlKv->fieldSchemaIdx = (uint32_t)fieldIdx; return 0; } @@ -270,8 +272,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } } - uintptr_t valPointer = (uintptr_t)point; - taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &stableIdx, sizeof(stableIdx)); + point->schemaIdx = (uint32_t)stableIdx; } size_t numStables = taosArrayGetSize(stableSchemas); @@ -598,19 +599,19 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; sprintf(pSql->cmd.payload, "table name is invalid"); - tscFreeRegisteredSqlObj(pSql); + taosReleaseRef(tscObjRef, pSql->self); return code; } SName sname = {0}; if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) { - tscFreeRegisteredSqlObj(pSql); + taosReleaseRef(tscObjRef, pSql->self); return code; } char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; memset(fullTableName, 0, tListLen(fullTableName)); tNameExtractFullName(&sname, fullTableName); - tscFreeRegisteredSqlObj(pSql); + taosReleaseRef(tscObjRef, pSql->self); size_t size = 0; taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); @@ -884,19 +885,20 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); sql[strlen(sql)] = '\0'; - tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu", info->id, cTableName, taosArrayGetSize(rowsBind)); - size_t rows = taosArrayGetSize(rowsBind); size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5; size_t batchSize = MIN(maxBatchSize, rows); + tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu", + info->id, cTableName, rows, batchSize); SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES); int32_t code = TSDB_CODE_SUCCESS; - for (int i=0; i=i) { + if (j > i) { + tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1); code = doInsertChildTableWithStmt(taos, sql, cTableName, batchBind, info); if (code != 0) { taosArrayDestroy(batchBind); @@ -916,10 +918,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) { for (int32_t i = 0; i < numPoints; ++i) { TAOS_SML_DATA_POINT * point = points + i; - uintptr_t valPointer = (uintptr_t)point; - size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pSchemaIndex != NULL); - SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, *pSchemaIndex); + SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx); for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* kv = point->tags + j; @@ -963,10 +962,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); for (int j = 0; j < pDataPoint->tagNum; ++j) { TAOS_SML_KV* kv = pDataPoint->tags + j; - uintptr_t valPointer = (uintptr_t)kv; - size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pFieldSchemaIdx != NULL); - tagKVs[*pFieldSchemaIdx] = kv; + tagKVs[kv->fieldSchemaIdx] = kv; } } @@ -980,10 +976,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam for (int j = 0; j < numTags; ++j) { if (tagKVs[j] == NULL) continue; TAOS_SML_KV* kv = tagKVs[j]; - uintptr_t valPointer = (uintptr_t)kv; - size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pFieldSchemaIdx != NULL); - TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx); + TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -1026,10 +1019,7 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, } for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* kv = point->fields + j; - uintptr_t valPointer = (uintptr_t)kv; - size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pFieldSchemaIdx != NULL); - TAOS_BIND* bind = colBinds + *pFieldSchemaIdx; + TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -1067,10 +1057,7 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t SArray* cTablePoints = *pCTablePoints; TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); - uintptr_t valPointer = (uintptr_t)point; - size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pSchemaIndex != NULL); - SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, *pSchemaIndex); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName); code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info); @@ -1113,7 +1100,6 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint); int32_t code = TSDB_CODE_SUCCESS; - info->smlDataToSchema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, false); tscDebug("SML:0x%"PRIx64" build data point schemas", info->id); SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray @@ -1143,11 +1129,10 @@ clean_up: taosArrayDestroy(schema->tags); } taosArrayDestroy(stableSchemas); - taosHashCleanup(info->smlDataToSchema); return code; } -int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { +int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo)); info->id = genLinesSmlId(); int code = tscSmlInsert(taos, points, numPoint, info); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 6ade48cb196b2e50fd9ba8f811f0a7e9a8930484..6fb5b3c8d1116674937bb5930b3082ced5cd4485 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1567,8 +1567,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pRes->qId = 0; pRes->numOfRows = 1; - registerSqlObj(pSql); - strtolower(pSql->sqlstr, sql); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); diff --git a/tests/examples/c/schemaless.c b/tests/examples/c/schemaless.c index 830e59880198b0ef2c21cd78448fd6cb80f98b2f..43e9e1697810968dbeef195343924555caf8a986 100644 --- a/tests/examples/c/schemaless.c +++ b/tests/examples/c/schemaless.c @@ -8,8 +8,9 @@ #include #include +int numThreads = 8; int numSuperTables = 8; -int numChildTables = 4; +int numChildTables = 4; // per thread, per super table int numRowsPerChildTable = 2048; void shuffle(char** lines, size_t n) { @@ -24,12 +25,39 @@ void shuffle(char** lines, size_t n) { } } +void printThreadId(pthread_t id, char* buf) +{ + size_t i; + for (i = sizeof(i); i; --i) + sprintf(buf + strlen(buf), "%02x", *(((unsigned char*) &id) + i - 1)); +} + static int64_t getTimeInUs() { struct timeval systemTime; gettimeofday(&systemTime, NULL); return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec; } +typedef struct { + TAOS* taos; + char** lines; + int numLines; + int64_t costTime; +} SThreadInsertArgs; + +static void* insertLines(void* args) { + SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args; + char tidBuf[32] = {0}; + printThreadId(pthread_self(), tidBuf); + printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); + int64_t begin = getTimeInUs(); + int32_t code = taos_insert_lines(insertArgs->taos, insertArgs->lines, insertArgs->numLines); + int64_t end = getTimeInUs(); + insertArgs->costTime = end-begin; + printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf); + return NULL; +} + int main(int argc, char* argv[]) { TAOS_RES* result; const char* host = "127.0.0.1"; @@ -60,25 +88,88 @@ int main(int argc, char* argv[]) { int64_t ts = ct * 1000; char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms"; - char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); - int l = 0; - for (int i = 0; i < numSuperTables; ++i) { - for (int j = 0; j < numChildTables; ++j) { - for (int k = 0; k < numRowsPerChildTable; ++k) { - char* line = calloc(512, 1); - snprintf(line, 512, lineFormat, i, j, ts + 10 * l); - lines[l] = line; - ++l; + { + char** linesStb = calloc(numSuperTables, sizeof(char*)); + for (int i = 0; i < numSuperTables; i++) { + char* lineStb = calloc(512, 1); + snprintf(lineStb, 512, lineFormat, i, + numThreads * numSuperTables * numChildTables, + ts + numThreads * numSuperTables * numChildTables * numRowsPerChildTable); + linesStb[i] = lineStb; + } + SThreadInsertArgs args = {0}; + args.taos = taos; + args.lines = linesStb; + args.numLines = numSuperTables; + insertLines(&args); + for (int i = 0; i < numSuperTables; ++i) { + free(linesStb[i]); + } + free(linesStb); + } + + printf("generate lines...\n"); + char*** linesThread = calloc(numThreads, sizeof(char**)); + for (int i = 0; i < numThreads; ++i) { + char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); + linesThread[i] = lines; + } + + for (int t = 0; t < numThreads; ++t) { + int l = 0; + char** lines = linesThread[t]; + for (int i = 0; i < numSuperTables; ++i) { + for (int j = 0; j < numChildTables; ++j) { + for (int k = 0; k < numRowsPerChildTable; ++k) { + int stIdx = i; + int ctIdx = t*numSuperTables*numChildTables + j; + char* line = calloc(512, 1); + snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l); + lines[l] = line; + ++l; + } } } } - //shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable); - printf("%s\n", "begin taos_insert_lines"); - int64_t begin = getTimeInUs(); - int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable); - int64_t end = getTimeInUs(); - printf("code: %d, %s. time used: %" PRId64 "\n", code, tstrerror(code), end - begin); + printf("shuffle lines...\n"); + for (int t = 0; t < numThreads; ++t) { + shuffle(linesThread[t], numSuperTables * numChildTables * numRowsPerChildTable); + } + + printf("begin multi-thread insertion...\n"); + int64_t begin = taosGetTimestampUs(); + pthread_t* tids = calloc(numThreads, sizeof(pthread_t)); + SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs)); + for (int i=0; i < numThreads; ++i) { + argsThread[i].lines = linesThread[i]; + argsThread[i].taos = taos; + argsThread[i].numLines = numSuperTables * numChildTables * numRowsPerChildTable; + pthread_create(tids+i, NULL, insertLines, argsThread+i); + } + + for (int i = 0; i < numThreads; ++i) { + pthread_join(tids[i], NULL); + } + int64_t end = taosGetTimestampUs(); + + int totalLines = numThreads*numSuperTables*numChildTables*numRowsPerChildTable; + printf("TOTAL LINES: %d\n", totalLines); + printf("THREADS: %d\n", numThreads); + int64_t sumTime = 0; + for (int i=0; i