From 19b965816060c2626a85dceb02acb60024b317ba Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 26 Jul 2021 15:51:59 +0800 Subject: [PATCH] add traceable id to log --- src/client/src/tscParseLineProtocol.c | 243 +++++++++++++++----------- 1 file changed, 143 insertions(+), 100 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 7645a30eee..3ae3f5c371 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -34,7 +34,7 @@ typedef struct { char* value; //=================================== - size_t fieldSchemaIdx; + uint32_t fieldSchemaIdx; } TAOS_SML_KV; typedef struct { @@ -42,14 +42,14 @@ typedef struct { char* childTableName; TAOS_SML_KV* tags; - int tagNum; + int32_t tagNum; // first kv must be timestamp TAOS_SML_KV* fields; - int fieldNum; + int32_t fieldNum; //================================ - size_t schemaIdx; + uint32_t schemaIdx; } TAOS_SML_DATA_POINT; typedef enum { @@ -60,6 +60,10 @@ typedef enum { SML_TIME_STAMP_NANO_SECONDS } SMLTimeStampType; +typedef struct { + uint64_t id; + +} SSmlLinesInfo; //================================================================================================= int compareSmlColKv(const void* p1, const void* p2) { @@ -102,7 +106,7 @@ typedef struct { }; } SSchemaAction; -static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { +static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) { if (!IS_VAR_DATA_TYPE(kv->type)) { *bytes = tDataTypes[kv->type].bytes; } else { @@ -112,7 +116,7 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); if (!succ) { free(ucs); - tscError("convert nchar string to UCS4_LE failed:%s", kv->value); + tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value); return TSDB_CODE_TSC_INVALID_VALUE; } free(ucs); @@ -124,7 +128,7 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { return 0; } -static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) { +static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) { SSchema* pField = NULL; size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); size_t fieldIdx = -1; @@ -134,12 +138,12 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra pField = taosArrayGet(array, fieldIdx); if (pField->type != smlKv->type) { - tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type); + tscError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type); return TSDB_CODE_TSC_INVALID_VALUE; } int32_t bytes = 0; - code = getFieldBytesFromSmlKv(smlKv, &bytes); + code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id); if (code != 0) { return code; } @@ -153,7 +157,7 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra field.type = smlKv->type; int32_t bytes = 0; - code = getFieldBytesFromSmlKv(smlKv, &bytes); + code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id); if (code != 0) { return code; } @@ -169,7 +173,7 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra return 0; } -static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas) { +static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) { int32_t code = 0; SHashObj* sname2shema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); @@ -199,18 +203,18 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* tagKv = point->tags + j; - code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags); + code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info); if (code != 0) { - tscError("build data point schema failed. point no.: %d, tag key: %s", i, tagKv->key); + tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key); return code; } } for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* fieldKv = point->fields + j; - code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); + code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info); if (code != 0) { - tscError("build data point schema failed. point no.: %d, tag key: %s", i, fieldKv->key); + tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, fieldKv->key); return code; } } @@ -226,7 +230,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } taosHashCleanup(sname2shema); - tscDebug("build point schema succeed. num of super table: %zu", numStables); + tscDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables); for (int32_t i = 0; i < numStables; ++i) { SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i); tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName, @@ -237,7 +241,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[], - SSchemaAction* action, bool* actionNeeded) { + SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) { char fieldNameLowerCase[TSDB_COL_NAME_LEN] = {0}; strtolower(fieldNameLowerCase, pointColField->name); @@ -246,7 +250,7 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex); assert(strcasecmp(dbAttr->name, pointColField->name) == 0); if (pointColField->type != dbAttr->type) { - tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name, + tscError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name, pointColField->type, dbAttr->type); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -273,7 +277,10 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash action->alterSTable.field = pointColField; *actionNeeded = true; } - tscDebug("generate schema action. action needed: %d, action: %d", *actionNeeded, action->action); + if (*actionNeeded) { + tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldNameLowerCase, + action->action); + } return 0; } @@ -299,13 +306,13 @@ static int32_t buildColumnDescription(SSchema* field, } -static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { +static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) { int32_t code = 0; int32_t outBytes = 0; char *result = (char *)calloc(1, tsMaxSQLStringLen+1); int32_t capacity = tsMaxSQLStringLen + 1; - tscDebug("apply schema action: %d", action->action); + tscDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action); switch (action->action) { case SCHEMA_ACTION_ADD_COLUMN: { int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName); @@ -378,7 +385,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { free(result); if (code != 0) { - tscError("apply schema action failure. %s", tstrerror(code)); + tscError("SML:0x%"PRIx64 "apply schema action failure. %s", info->id, tstrerror(code)); } return code; } @@ -391,7 +398,7 @@ static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) { return 0; } -int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { +int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) { int32_t code = 0; STscObj *pObj = (STscObj *)taos; @@ -400,7 +407,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { return TSDB_CODE_TSC_DISCONNECTED; } - tscDebug("load table schema. super table name: %s", tableName); + tscDebug("SML:0x%"PRIx64" load table schema. super table name: %s", info->id, tableName); char tableNameLowerCase[TSDB_TABLE_NAME_LEN]; strtolower(tableNameLowerCase, tableName); @@ -410,7 +417,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { TAOS_RES* res = taos_query(taos, sql); code = taos_errno(res); if (code != 0) { - tscError("describe table failure. %s", taos_errstr(res)); + tscError("SML:0x%"PRIx64" describe table failure. %s", info->id, taos_errstr(res)); taos_free_result(res); return code; } @@ -474,13 +481,13 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { size_t tagIndex = taosArrayGetSize(schema->tags) - 1; taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex)); } - tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", - tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); + tscDebug("SML:0x%"PRIx64 "load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", + info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); free(tableMeta); tableMeta = NULL; return code; } -static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) { +static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* info) { int32_t code = 0; size_t numStable = taosArrayGetSize(stableSchemas); for (int i = 0; i < numStable; ++i) { @@ -488,7 +495,7 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) { SSmlSTableSchema dbSchema; memset(&dbSchema, 0, sizeof(SSmlSTableSchema)); - code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema, info); if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { SSchemaAction schemaAction = {0}; schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; @@ -496,10 +503,10 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) { memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); schemaAction.createSTable.tags = pointSchema->tags; schemaAction.createSTable.fields = pointSchema->fields; - applySchemaAction(taos, &schemaAction); - code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + applySchemaAction(taos, &schemaAction, info); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema, info); if (code != 0) { - tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName); + tscError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, pointSchema->sTableName); return code; } else { pointSchema->precision = dbSchema.precision; @@ -516,9 +523,10 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) { SSchema* pointTag = taosArrayGet(pointSchema->tags, j); SSchemaAction schemaAction = {0}; bool actionNeeded = false; - generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName, &schemaAction, &actionNeeded); + generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName, + &schemaAction, &actionNeeded, info); if (actionNeeded) { - code = applySchemaAction(taos, &schemaAction); + code = applySchemaAction(taos, &schemaAction, info); if (code != 0) { destroySmlSTableSchema(&dbSchema); return code; @@ -534,9 +542,10 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) { SSchema* pointCol = taosArrayGet(pointSchema->fields, j); SSchemaAction schemaAction = {0}; bool actionNeeded = false; - generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName, &schemaAction, &actionNeeded); + generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName, + &schemaAction, &actionNeeded, info); if (actionNeeded) { - code = applySchemaAction(taos, &schemaAction); + code = applySchemaAction(taos, &schemaAction, info); if (code != 0) { destroySmlSTableSchema(&dbSchema); return code; @@ -548,23 +557,28 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) { destroySmlSTableSchema(&dbSchema); } else { - tscError("load table meta error: %s", tstrerror(code)); + tscError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); return code; } } return 0; } -static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { - tscDebug("taos_sml_insert get child table name through md5"); +static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen, + SSmlLinesInfo* info) { + tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id); qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); SStringBuilder sb; memset(&sb, 0, sizeof(sb)); - taosStringBuilderAppendString(&sb, point->stableName); + char sTableName[TSDB_TABLE_NAME_LEN] = {0}; + strtolower(sTableName, point->stableName); + taosStringBuilderAppendString(&sb, sTableName); for (int j = 0; j < point->tagNum; ++j) { taosStringBuilderAppendChar(&sb, ','); TAOS_SML_KV* tagKv = point->tags + j; - taosStringBuilderAppendString(&sb, tagKv->key); + char tagName[TSDB_COL_NAME_LEN] = {0}; + strtolower(tagName, tagKv->key); + taosStringBuilderAppendString(&sb, tagName); taosStringBuilderAppendChar(&sb, '='); taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); } @@ -580,12 +594,12 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); taosStringBuilderDestroy(&sb); - tscDebug("child table name: %s", tableName); + tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName); return 0; } -static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind) { +static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind, SSmlLinesInfo* info) { char sql[512]; sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName); @@ -594,31 +608,32 @@ static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, cons code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } code = taos_stmt_bind_param(stmt, bind); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } code = taos_stmt_execute(stmt); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } code = taos_stmt_close(stmt); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } return code; } -static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) { +static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, + SArray* tagsSchema, SArray* tagsBind, SSmlLinesInfo* info) { size_t numTags = taosArrayGetSize(tagsSchema); char* sql = malloc(tsMaxSQLStringLen+1); int freeBytes = tsMaxSQLStringLen + 1; @@ -639,7 +654,7 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); sql[strlen(sql)] = '\0'; - tscDebug("create table : %s", sql); + tscDebug("SML:0x%"PRIx64" create table : %s", info->id, sql); TAOS_STMT* stmt = taos_stmt_init(taos); int32_t code; @@ -647,31 +662,31 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co free(sql); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind)); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } code = taos_stmt_execute(stmt); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } code = taos_stmt_close(stmt); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } return code; } -static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) { +static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind, SSmlLinesInfo* info) { size_t numCols = taosArrayGetSize(colsSchema); char* sql = malloc(tsMaxSQLStringLen+1); int32_t freeBytes = tsMaxSQLStringLen + 1 ; @@ -689,7 +704,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); sql[strlen(sql)] = '\0'; - tscDebug("insert rows %zu into child table %s. ", taosArrayGetSize(rowsBind), cTableName); + tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu", info->id, cTableName, taosArrayGetSize(rowsBind)); int32_t code = 0; int32_t try = 0; @@ -700,14 +715,14 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols free(sql); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } do { code = taos_stmt_set_tbname(stmt, cTableName); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } @@ -716,24 +731,24 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i); code = taos_stmt_bind_param(stmt, colsBinds); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } code = taos_stmt_add_batch(stmt); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); return code; } } code = taos_stmt_execute(stmt); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); } } while (code == TSDB_CODE_TDB_TABLE_RECONFIGURE && try++ < TSDB_MAX_REPLICA); if (code != 0) { - tscError("%s", taos_stmt_errstr(stmt)); + tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt)); taos_stmt_close(stmt); } else { taos_stmt_close(stmt); @@ -743,13 +758,13 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols } static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, - SHashObj* cname2points, SArray* stableSchemas) { + SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) { for (int32_t i = 0; i < numPoints; ++i) { TAOS_SML_DATA_POINT * point = points + i; if (!point->childTableName) { char childTableName[TSDB_TABLE_NAME_LEN]; int32_t tableNameLen = TSDB_TABLE_NAME_LEN; - getSmlMd5ChildTableName(point, childTableName, &tableNameLen); + getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info); point->childTableName = calloc(1, tableNameLen+1); strncpy(point->childTableName, childTableName, tableNameLen); point->childTableName[tableNameLen] = '\0'; @@ -790,7 +805,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu } static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName, - SSmlSTableSchema* sTableSchema, SArray* cTablePoints) { + SSmlSTableSchema* sTableSchema, SArray* cTablePoints, SSmlLinesInfo* info) { size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t rows = taosArrayGetSize(cTablePoints); @@ -846,7 +861,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam int32_t code = taos_errno(result); if (code != 0) { - tscError("get child table %s tags failed. error string %s", cTableName, taos_errstr(result)); + tscError("SML:0x%"PRIx64" get child table %s tags failed. error string %s", info->id, cTableName, taos_errstr(result)); goto cleanup; } @@ -863,8 +878,8 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i-1]]; if (tagKV->type != dbType) { - tscError("child table %s tag %s type mismatch. point type : %d, db type : %d", - cTableName, tagKV->key, tagKV->type, dbType); + tscError("SML:0x%"PRIx64" child table %s tag %s type mismatch. point type : %d, db type : %d", + info->id, cTableName, tagKV->key, tagKV->type, dbType); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -872,16 +887,16 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) { TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx); - code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind); + code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind, info); if (code != 0) { - tscError("change child table tag failed. table name %s, tag %s", cTableName, tagKV->key); + tscError("SML:0x%"PRIx64" change child table tag failed. table name %s, tag %s", info->id, cTableName, tagKV->key); goto cleanup; } } } - tscDebug("successfully applied point tags. child table: %s", cTableName); + tscDebug("SML:0x%"PRIx64" successfully applied point tags. child table: %s", info->id, cTableName); } else { - code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds); + code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, info); if (code != 0) { goto cleanup; } @@ -897,7 +912,8 @@ cleanup: return code; } -static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, SArray* cTablePoints) { +static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, + SArray* cTablePoints, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; size_t numCols = taosArrayGetSize(sTableSchema->fields); @@ -909,8 +925,8 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); if (colBinds == NULL) { - tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " - "num of rows: %zu, num of cols: %zu", rows, numCols); + tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " + "num of rows: %zu, num of cols: %zu", info->id, rows, numCols); return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -931,9 +947,9 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, taosArrayPush(rowsBind, &colBinds); } - code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind); + code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, info); if (code != 0) { - tscError("insert into child table %s failed. error %s", cTableName, tstrerror(code)); + tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code)); } for (int i = 0; i < rows; ++i) { @@ -948,30 +964,35 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, return code; } -static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) { +static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas); + arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info); SArray** pCTablePoints = taosHashIterate(cname2points, NULL); while (pCTablePoints) { SArray* cTablePoints = *pCTablePoints; + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints); + + tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName); + code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info); if (code != 0) { tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code)); goto cleanup; } - code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints); + + tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s", info->id, point->childTableName); + code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, info); if (code != 0) { tscError("Apply child table fields failed. child table %s, error %s", point->childTableName, tstrerror(code)); goto cleanup; } - tscDebug("successfully applied data points of child table %s", point->childTableName); + tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName); pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } @@ -987,27 +1008,30 @@ cleanup: return code; } -int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { - tscDebug("taos_sml_insert. number of points: %d", numPoint); +int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) { + tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint); int32_t code = TSDB_CODE_SUCCESS; + tscDebug("SML:0x%"PRIx64" build data point schemas", info->id); SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray - code = buildDataPointSchemas(points, numPoint, stableSchemas); + code = buildDataPointSchemas(points, numPoint, stableSchemas, info); if (code != 0) { - tscError("error building data point schemas : %s", tstrerror(code)); + tscError("SML:0x%"PRIx64" error building data point schemas : %s", info->id, tstrerror(code)); goto clean_up; } - code = modifyDBSchemas(taos, stableSchemas); + tscDebug("SML:0x%"PRIx64" modify db schemas", info->id); + code = modifyDBSchemas(taos, stableSchemas, info); if (code != 0) { - tscError("error change db schema : %s", tstrerror(code)); + tscError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code)); goto clean_up; } - code = applyDataPoints(taos, points, numPoint, stableSchemas); + tscDebug("SML:0x%"PRIx64" apply data points", info->id); + code = applyDataPoints(taos, points, numPoint, stableSchemas, info); if (code != 0) { - tscError("error apply data points : %s", tstrerror(code)); + tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code)); } clean_up: @@ -1902,6 +1926,18 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) { //========================================================================= +static uint64_t linesSmlHandleId = 0; + +uint64_t genLinesSmlId() { + uint64_t id; + + do { + id = atomic_add_fetch_64(&linesSmlHandleId, 1); + } while (id == 0); + + return id; +} + void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { for (int i=0; itagNum; ++i) { free((point->tags+i)->key); @@ -1917,16 +1953,16 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { free(point->childTableName); } -int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { +int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) { for (int32_t i = 0; i < numLines; ++i) { TAOS_SML_DATA_POINT point = {0}; int32_t code = tscParseLine(lines[i], &point); if (code != TSDB_CODE_SUCCESS) { - tscError("data point line parse failed. line %d : %s", i, lines[i]); + tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]); destroySmlDataPoint(&point); return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } else { - tscDebug("data point line parse success. line %d", i); + tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i); } taosArrayPush(points, &point); @@ -1937,15 +1973,19 @@ int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* faile int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { int32_t code = 0; + SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo)); + info->id = genLinesSmlId(); + if (numLines <= 0 || numLines > 65536) { - tscError("taos_insert_lines numLines should be between 1 and 65536. numLines: %d", numLines); + tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); code = TSDB_CODE_TSC_APP_ERROR; return code; } for (int i = 0; i < numLines; ++i) { if (lines[i] == NULL) { - tscError("taos_insert_lines line %d is NULL", i); + tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i); + free(info); code = TSDB_CODE_TSC_APP_ERROR; return code; } @@ -1953,12 +1993,13 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT)); if (lpPoints == NULL) { - tscError("taos_insert_lines failed to allocate memory"); + tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); + free(info); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - tscDebug("taos_insert_lines begin inserting %d lines, first line: %s", numLines, lines[0]); - code = tscParseLines(lines, numLines, lpPoints, NULL); + tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]); + code = tscParseLines(lines, numLines, lpPoints, NULL, info); size_t numPoints = taosArrayGetSize(lpPoints); if (code != 0) { @@ -1966,13 +2007,13 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { } TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints); - code = taos_sml_insert(taos, points, (int)numPoints); + code = taos_sml_insert(taos, points, (int)numPoints, info); if (code != 0) { - tscError("taos_sml_insert error: %s", tstrerror((code))); + tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code))); } cleanup: - tscDebug("taos_insert_lines finish inserting %d lines. code: %d", numLines, code); + tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code); points = TARRAY_GET_START(lpPoints); numPoints = taosArrayGetSize(lpPoints); for (int i=0; i