diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 2a16f1aad6525a25da652f643dc7b7f91debd432..43509d3533ebfa3143a7bb43c7e4953132e4474b 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -32,9 +32,6 @@ typedef struct { uint8_t type; int16_t length; char* value; - - //=================================== - uint32_t fieldSchemaIdx; } TAOS_SML_KV; typedef struct { @@ -47,9 +44,6 @@ typedef struct { // first kv must be timestamp TAOS_SML_KV* fields; int32_t fieldNum; - - //================================ - uint32_t schemaIdx; } TAOS_SML_DATA_POINT; typedef enum { @@ -62,10 +56,23 @@ typedef enum { typedef struct { uint64_t id; - + SHashObj* smlDataToSchema; } SSmlLinesInfo; + //================================================================================================= +static uint64_t linesSmlHandleId = 0; + +uint64_t genLinesSmlId() { + uint64_t id; + + do { + id = atomic_add_fetch_64(&linesSmlHandleId, 1); + } while (id == 0); + + return id; +} + int compareSmlColKv(const void* p1, const void* p2) { TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; @@ -168,11 +175,46 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx)); } - smlKv->fieldSchemaIdx = (uint32_t)fieldIdx; + uintptr_t valPointer = (uintptr_t)smlKv; + taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &fieldIdx, sizeof(fieldIdx)); return 0; } +static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen, + SSmlLinesInfo* info) { + tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id); + qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); + + SStringBuilder sb; memset(&sb, 0, sizeof(sb)); + char sTableName[TSDB_TABLE_NAME_LEN] = {0}; + strtolower(sTableName, point->stableName); + taosStringBuilderAppendString(&sb, sTableName); + for (int j = 0; j < point->tagNum; ++j) { + taosStringBuilderAppendChar(&sb, ','); + TAOS_SML_KV* tagKv = point->tags + j; + char tagName[TSDB_COL_NAME_LEN] = {0}; + strtolower(tagName, tagKv->key); + taosStringBuilderAppendString(&sb, tagName); + taosStringBuilderAppendChar(&sb, '='); + taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); + } + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); + MD5_CTX context; + MD5Init(&context); + MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + MD5Final(&context); + *tableNameLen = snprintf(tableName, *tableNameLen, + "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], + context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], + context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + taosStringBuilderDestroy(&sb); + tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName); + return 0; +} + static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) { int32_t code = 0; SHashObj* sname2shema = taosHashInit(32, @@ -203,6 +245,15 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* tagKv = point->tags + j; + if (!point->childTableName) { + char childTableName[TSDB_TABLE_NAME_LEN]; + int32_t tableNameLen = TSDB_TABLE_NAME_LEN; + getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info); + point->childTableName = calloc(1, tableNameLen+1); + strncpy(point->childTableName, childTableName, tableNameLen); + point->childTableName[tableNameLen] = '\0'; + } + code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info); if (code != 0) { tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key); @@ -219,7 +270,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } } - point->schemaIdx = (uint32_t)stableIdx; + uintptr_t valPointer = (uintptr_t)point; + taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &stableIdx, sizeof(stableIdx)); } size_t numStables = taosArrayGetSize(stableSchemas); @@ -567,41 +619,6 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* return 0; } -static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen, - SSmlLinesInfo* info) { - tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id); - qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); - - SStringBuilder sb; memset(&sb, 0, sizeof(sb)); - char sTableName[TSDB_TABLE_NAME_LEN] = {0}; - strtolower(sTableName, point->stableName); - taosStringBuilderAppendString(&sb, sTableName); - for (int j = 0; j < point->tagNum; ++j) { - taosStringBuilderAppendChar(&sb, ','); - TAOS_SML_KV* tagKv = point->tags + j; - char tagName[TSDB_COL_NAME_LEN] = {0}; - strtolower(tagName, tagKv->key); - taosStringBuilderAppendString(&sb, tagName); - taosStringBuilderAppendChar(&sb, '='); - taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); - } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); - MD5_CTX context; - MD5Init(&context); - MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); - MD5Final(&context); - *tableNameLen = snprintf(tableName, *tableNameLen, - "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], - context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); - taosStringBuilderDestroy(&sb); - tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName); - return 0; -} - - static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind, SSmlLinesInfo* info) { char sql[512]; sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName); @@ -611,25 +628,25 @@ static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, cons code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); if (code != 0) { - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code)); return code; } code = taos_stmt_bind_param(stmt, bind); if (code != 0) { - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code)); return code; } code = taos_stmt_execute(stmt); if (code != 0) { - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s", info->id, code, tstrerror(code)); return code; } code = taos_stmt_close(stmt); if (code != 0) { - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code)); return code; } return code; @@ -674,27 +691,27 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co if (code != 0) { tfree(stmt); - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_prepare returns %d:%s", info->id, code, tstrerror(code)); return code; } code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind)); if (code != 0) { tfree(stmt); - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_bind_param returns %d:%s", info->id, code, tstrerror(code)); return code; } code = taos_stmt_execute(stmt); if (code != 0) { tfree(stmt); - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_execute returns %d:%s", info->id, code, tstrerror(code)); return code; } code = taos_stmt_close(stmt); if (code != 0) { - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code)); return code; } return code; @@ -738,7 +755,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols if (code != 0) { tfree(stmt); - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code)); return code; } @@ -746,7 +763,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols code = taos_stmt_set_tbname(stmt, cTableName); if (code != 0) { tfree(stmt); - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, tstrerror(code)); return code; } @@ -756,25 +773,25 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols code = taos_stmt_bind_param(stmt, colsBinds); if (code != 0) { tfree(stmt); - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code)); return code; } code = taos_stmt_add_batch(stmt); if (code != 0) { tfree(stmt); - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, tstrerror(code)); return code; } } code = taos_stmt_execute(stmt); if (code != 0) { - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s", info->id, code, tstrerror(code)); } } while (code == TSDB_CODE_TDB_TABLE_RECONFIGURE && try++ < TSDB_MAX_REPLICA); if (code != 0) { - tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %d:%s", info->id, code, tstrerror(code)); taos_stmt_close(stmt); } else { taos_stmt_close(stmt); @@ -787,16 +804,10 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) { for (int32_t i = 0; i < numPoints; ++i) { TAOS_SML_DATA_POINT * point = points + i; - if (!point->childTableName) { - char childTableName[TSDB_TABLE_NAME_LEN]; - int32_t tableNameLen = TSDB_TABLE_NAME_LEN; - getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info); - point->childTableName = calloc(1, tableNameLen+1); - strncpy(point->childTableName, childTableName, tableNameLen); - point->childTableName[tableNameLen] = '\0'; - } - - SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + uintptr_t valPointer = (uintptr_t)point; + size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); + assert(pSchemaIndex != NULL); + SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, *pSchemaIndex); for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* kv = point->tags + j; @@ -840,7 +851,10 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); for (int j = 0; j < pDataPoint->tagNum; ++j) { TAOS_SML_KV* kv = pDataPoint->tags + j; - tagKVs[kv->fieldSchemaIdx] = kv; + uintptr_t valPointer = (uintptr_t)kv; + size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); + assert(pFieldSchemaIdx != NULL); + tagKVs[*pFieldSchemaIdx] = kv; } } @@ -863,7 +877,10 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam for (int j = 0; j < numTags; ++j) { if (tagKVs[j] == NULL) continue; TAOS_SML_KV* kv = tagKVs[j]; - TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); + uintptr_t valPointer = (uintptr_t)kv; + size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); + assert(pFieldSchemaIdx != NULL); + TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx); bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -912,7 +929,10 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam assert(tagKV->value); if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) { - TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx); + uintptr_t valPointer = (uintptr_t)tagKV; + size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); + assert(pFieldSchemaIdx != NULL); + TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx); code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind, info); if (code != 0) { tscError("SML:0x%"PRIx64" change child table tag failed. table name %s, tag %s", info->id, cTableName, tagKV->key); @@ -963,7 +983,10 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, } for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* kv = point->fields + j; - TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; + uintptr_t valPointer = (uintptr_t)kv; + size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); + assert(pFieldSchemaIdx != NULL); + TAOS_BIND* bind = colBinds + *pFieldSchemaIdx; bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -1000,9 +1023,11 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t while (pCTablePoints) { SArray* cTablePoints = *pCTablePoints; - TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); - SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + uintptr_t valPointer = (uintptr_t)point; + size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t)); + assert(pSchemaIndex != NULL); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, *pSchemaIndex); tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName); code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info); @@ -1034,10 +1059,11 @@ cleanup: return code; } -int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) { +int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) { tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint); int32_t code = TSDB_CODE_SUCCESS; + info->smlDataToSchema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, false); tscDebug("SML:0x%"PRIx64" build data point schemas", info->id); SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray @@ -1067,6 +1093,15 @@ clean_up: taosArrayDestroy(schema->tags); } taosArrayDestroy(stableSchemas); + taosHashCleanup(info->smlDataToSchema); + return code; +} + +int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { + SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo)); + info->id = genLinesSmlId(); + int code = tscSmlInsert(taos, points, numPoint, info); + free(info); return code; } @@ -2076,18 +2111,6 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf //========================================================================= -static uint64_t linesSmlHandleId = 0; - -uint64_t genLinesSmlId() { - uint64_t id; - - do { - id = atomic_add_fetch_64(&linesSmlHandleId, 1); - } while (id == 0); - - return id; -} - void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { for (int i=0; itagNum; ++i) { free((point->tags+i)->key); @@ -2157,7 +2180,7 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { } TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints); - code = taos_sml_insert(taos, points, (int)numPoints, info); + code = tscSmlInsert(taos, points, (int)numPoints, info); if (code != 0) { tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code))); }