From 94451c84e5b61ea846f7317428c3390ff0a74d30 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 12 Oct 2021 17:04:50 +0800 Subject: [PATCH] schemaless merge from master to develop --- src/client/inc/tscParseLine.h | 4 +- src/client/src/tscParseLineProtocol.c | 182 +------------------------- tests/examples/c/schemaless.c | 27 +--- 3 files changed, 11 insertions(+), 202 deletions(-) diff --git a/src/client/inc/tscParseLine.h b/src/client/inc/tscParseLine.h index 8c7aaad81b..5b3f8f05d2 100644 --- a/src/client/inc/tscParseLine.h +++ b/src/client/inc/tscParseLine.h @@ -25,6 +25,7 @@ typedef struct { uint8_t type; int16_t length; char* value; + uint32_t fieldSchemaIdx; } TAOS_SML_KV; typedef struct { @@ -37,6 +38,8 @@ typedef struct { // first kv must be timestamp TAOS_SML_KV* fields; int32_t fieldNum; + + uint32_t schemaIdx; } TAOS_SML_DATA_POINT; typedef enum { @@ -55,7 +58,6 @@ typedef enum { typedef struct { uint64_t id; - SHashObj* smlDataToSchema; } SSmlLinesInfo; int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 0c2bf14bfc..3290386858 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -28,50 +28,7 @@ typedef struct { uint8_t precision; } SSmlSTableSchema; -<<<<<<< HEAD //================================================================================================= -======= -typedef struct { - char* key; - uint8_t type; - int16_t length; - char* value; - - uint32_t fieldSchemaIdx; -} TAOS_SML_KV; - -typedef struct { - char* stableName; - - char* childTableName; - TAOS_SML_KV* tags; - int32_t tagNum; - - // first kv must be timestamp - TAOS_SML_KV* fields; - int32_t fieldNum; - - uint32_t schemaIdx; -} TAOS_SML_DATA_POINT; ->>>>>>> origin/master - -static uint64_t linesSmlHandleId = 0; - -uint64_t genLinesSmlId() { - uint64_t id; -<<<<<<< HEAD - - do { - id = atomic_add_fetch_64(&linesSmlHandleId, 1); - } while (id == 0); - - return id; -} -======= -} SSmlLinesInfo; - -//================================================================================================= ->>>>>>> origin/master static uint64_t linesSmlHandleId = 0; @@ -187,43 +144,8 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx)); } - uintptr_t valPointer = (uintptr_t)smlKv; - taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &fieldIdx, sizeof(fieldIdx)); - - return 0; -} - -static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen, - SSmlLinesInfo* info) { - tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id); - qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); + smlKv->fieldSchemaIdx = (uint32_t)fieldIdx; - SStringBuilder sb; memset(&sb, 0, sizeof(sb)); - char sTableName[TSDB_TABLE_NAME_LEN] = {0}; - strtolower(sTableName, point->stableName); - taosStringBuilderAppendString(&sb, sTableName); - for (int j = 0; j < point->tagNum; ++j) { - taosStringBuilderAppendChar(&sb, ','); - TAOS_SML_KV* tagKv = point->tags + j; - char tagName[TSDB_COL_NAME_LEN] = {0}; - strtolower(tagName, tagKv->key); - taosStringBuilderAppendString(&sb, tagName); - taosStringBuilderAppendChar(&sb, '='); - taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); - } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); - MD5_CTX context; - MD5Init(&context); - MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); - MD5Final(&context); - *tableNameLen = snprintf(tableName, *tableNameLen, - "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], - context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); - taosStringBuilderDestroy(&sb); - tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName); return 0; } @@ -316,8 +238,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } } - uintptr_t valPointer = (uintptr_t)point; - taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &stableIdx, sizeof(stableIdx)); + point->schemaIdx = (uint32_t)stableIdx; } size_t numStables = taosArrayGetSize(stableSchemas); @@ -432,10 +353,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); } taos_free_result(res2); -<<<<<<< HEAD -======= taosMsleep(500); ->>>>>>> origin/master } break; } @@ -460,10 +378,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); } taos_free_result(res2); -<<<<<<< HEAD -======= taosMsleep(500); ->>>>>>> origin/master } break; } @@ -485,10 +400,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); } taos_free_result(res2); -<<<<<<< HEAD -======= taosMsleep(500); ->>>>>>> origin/master } break; } @@ -510,10 +422,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); } taos_free_result(res2); -<<<<<<< HEAD -======= taosMsleep(500); ->>>>>>> origin/master } break; } @@ -555,10 +464,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); } taos_free_result(res2); -<<<<<<< HEAD -======= taosMsleep(500); ->>>>>>> origin/master } break; } @@ -655,48 +561,28 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl registerSqlObj(pSql); SStrToken tableToken = {.z = tableNameLowerCase, .n = (uint32_t)strlen(tableNameLowerCase), .type = TK_ID}; tGetToken(tableNameLowerCase, &tableToken.type); -<<<<<<< HEAD - bool dbIncluded = false; // Check if the table name available or not if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; - sprintf(pSql->cmd.payload, "table name is invalid"); - tscFreeRegisteredSqlObj(pSql); -======= - // Check if the table name available or not - if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; sprintf(pSql->cmd.payload, "table name is invalid"); taosReleaseRef(tscObjRef, pSql->self); ->>>>>>> origin/master return code; } SName sname = {0}; -<<<<<<< HEAD if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) { - tscFreeRegisteredSqlObj(pSql); -======= - if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) { taosReleaseRef(tscObjRef, pSql->self); ->>>>>>> origin/master return code; } + char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; memset(fullTableName, 0, tListLen(fullTableName)); tNameExtractFullName(&sname, fullTableName); -<<<<<<< HEAD - - size_t size = 0; - taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); - tscFreeRegisteredSqlObj(pSql); -======= taosReleaseRef(tscObjRef, pSql->self); size_t size = 0; - taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); ->>>>>>> origin/master + taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); } if (tableMeta != NULL) { @@ -914,17 +800,10 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam tryAgain = false; if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID -<<<<<<< HEAD - || code == TSDB_CODE_VND_INVALID_VGROUP_ID - || code == TSDB_CODE_TDB_TABLE_RECONFIGURE - || code == TSDB_CODE_APP_NOT_READY - || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { -======= || code == TSDB_CODE_VND_INVALID_VGROUP_ID || code == TSDB_CODE_TDB_TABLE_RECONFIGURE || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { ->>>>>>> origin/master tryAgain = true; } @@ -936,24 +815,11 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam } taos_free_result(res2); if (tryAgain) { -<<<<<<< HEAD - taosMsleep(50 * (2 << try)); -======= taosMsleep(100 * (2 << try)); ->>>>>>> origin/master } } if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (tryAgain) { -<<<<<<< HEAD - taosMsleep( 50 * (2 << try)); - } - } - } while (tryAgain); - - - taos_stmt_close(stmt); -======= taosMsleep( 100 * (2 << try)); } } @@ -1013,7 +879,6 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols } taosArrayDestroy(batchBind); tfree(sql); ->>>>>>> origin/master return code; } @@ -1021,14 +886,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) { for (int32_t i = 0; i < numPoints; ++i) { TAOS_SML_DATA_POINT * point = points + i; -<<<<<<< HEAD - uintptr_t valPointer = (uintptr_t)point; - size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pSchemaIndex != NULL); - SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, *pSchemaIndex); -======= SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx); ->>>>>>> origin/master for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* kv = point->tags + j; @@ -1072,14 +930,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); for (int j = 0; j < pDataPoint->tagNum; ++j) { TAOS_SML_KV* kv = pDataPoint->tags + j; -<<<<<<< HEAD - uintptr_t valPointer = (uintptr_t)kv; - size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pFieldSchemaIdx != NULL); - tagKVs[*pFieldSchemaIdx] = kv; -======= tagKVs[kv->fieldSchemaIdx] = kv; ->>>>>>> origin/master } } @@ -1093,10 +944,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam for (int j = 0; j < numTags; ++j) { if (tagKVs[j] == NULL) continue; TAOS_SML_KV* kv = tagKVs[j]; - uintptr_t valPointer = (uintptr_t)kv; - size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pFieldSchemaIdx != NULL); - TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx); + TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -1139,10 +987,7 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, } for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* kv = point->fields + j; - uintptr_t valPointer = (uintptr_t)kv; - size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pFieldSchemaIdx != NULL); - TAOS_BIND* bind = colBinds + *pFieldSchemaIdx; + TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -1180,10 +1025,7 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t SArray* cTablePoints = *pCTablePoints; TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); - uintptr_t valPointer = (uintptr_t)point; - size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); - assert(pSchemaIndex != NULL); - SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, *pSchemaIndex); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName); code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info); @@ -1226,7 +1068,6 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint); int32_t code = TSDB_CODE_SUCCESS; - info->smlDataToSchema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, false); tscDebug("SML:0x%"PRIx64" build data point schemas", info->id); SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray @@ -1256,15 +1097,6 @@ clean_up: taosArrayDestroy(schema->tags); } taosArrayDestroy(stableSchemas); - taosHashCleanup(info->smlDataToSchema); - return code; -} - -int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { - SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo)); - info->id = genLinesSmlId(); - int code = tscSmlInsert(taos, points, numPoint, info); - free(info); return code; } diff --git a/tests/examples/c/schemaless.c b/tests/examples/c/schemaless.c index a9e1741f51..6ba9de760f 100644 --- a/tests/examples/c/schemaless.c +++ b/tests/examples/c/schemaless.c @@ -43,7 +43,7 @@ static void* insertLines(void* args) { SThreadLinesBatch* batch = insertArgs->batches + i; printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); int64_t begin = getTimeInUs(); - int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines); + int32_t code = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, 0); int64_t end = getTimeInUs(); insertArgs->costTime += end - begin; printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf); @@ -160,30 +160,6 @@ int main(int argc, char* argv[]) { time_t ct = time(0); int64_t ts = ct * 1000; -<<<<<<< HEAD - char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms"; - - char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); - int l = 0; - for (int i = 0; i < numSuperTables; ++i) { - for (int j = 0; j < numChildTables; ++j) { - for (int k = 0; k < numRowsPerChildTable; ++k) { - char* line = calloc(512, 1); - snprintf(line, 512, lineFormat, i, j, ts + 10 * l); - lines[l] = line; - ++l; - } - } - } - //shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable); - - printf("%s\n", "begin taos_schemaless_insert"); - int64_t begin = getTimeInUs(); - int32_t code = taos_schemaless_insert(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable, 0); - int64_t end = getTimeInUs(); - printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin); - -======= char* lineTemplate = calloc(65536, sizeof(char)); getLineTemplate(lineTemplate, 65535, numFields); @@ -277,6 +253,5 @@ int main(int argc, char* argv[]) { free(lineTemplate); taos_close(taos); ->>>>>>> origin/master return 0; } -- GitLab