未验证 提交 bf33e7ed 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7015 from taosdata/hotfix/td-5517

[TD-5517]<fix>: lowercase letters for tablename/colname and return error when apply schema failed and add traceable id in sml log
......@@ -34,7 +34,7 @@ typedef struct {
char* value;
//===================================
size_t fieldSchemaIdx;
uint32_t fieldSchemaIdx;
} TAOS_SML_KV;
typedef struct {
......@@ -42,14 +42,14 @@ typedef struct {
char* childTableName;
TAOS_SML_KV* tags;
int tagNum;
int32_t tagNum;
// first kv must be timestamp
TAOS_SML_KV* fields;
int fieldNum;
int32_t fieldNum;
//================================
size_t schemaIdx;
uint32_t schemaIdx;
} TAOS_SML_DATA_POINT;
typedef enum {
......@@ -60,6 +60,10 @@ typedef enum {
SML_TIME_STAMP_NANO_SECONDS
} SMLTimeStampType;
typedef struct {
uint64_t id;
} SSmlLinesInfo;
//=================================================================================================
int compareSmlColKv(const void* p1, const void* p2) {
......@@ -102,7 +106,7 @@ typedef struct {
};
} SSchemaAction;
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) {
if (!IS_VAR_DATA_TYPE(kv->type)) {
*bytes = tDataTypes[kv->type].bytes;
} else {
......@@ -112,7 +116,7 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
if (!succ) {
free(ucs);
tscError("convert nchar string to UCS4_LE failed:%s", kv->value);
tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
return TSDB_CODE_TSC_INVALID_VALUE;
}
free(ucs);
......@@ -124,7 +128,7 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
return 0;
}
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) {
SSchema* pField = NULL;
size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
size_t fieldIdx = -1;
......@@ -134,12 +138,12 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
pField = taosArrayGet(array, fieldIdx);
if (pField->type != smlKv->type) {
tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type);
tscError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type);
return TSDB_CODE_TSC_INVALID_VALUE;
}
int32_t bytes = 0;
code = getFieldBytesFromSmlKv(smlKv, &bytes);
code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
if (code != 0) {
return code;
}
......@@ -153,7 +157,7 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
field.type = smlKv->type;
int32_t bytes = 0;
code = getFieldBytesFromSmlKv(smlKv, &bytes);
code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
if (code != 0) {
return code;
}
......@@ -164,12 +168,12 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
}
smlKv->fieldSchemaIdx = fieldIdx;
smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;
return 0;
}
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas) {
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = 0;
SHashObj* sname2shema = taosHashInit(32,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
......@@ -199,23 +203,23 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* tagKv = point->tags + j;
code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags);
code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
if (code != 0) {
tscError("build data point schema failed. point no.: %d, tag key: %s", i, tagKv->key);
tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key);
return code;
}
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* fieldKv = point->fields + j;
code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields);
code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
if (code != 0) {
tscError("build data point schema failed. point no.: %d, tag key: %s", i, fieldKv->key);
tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, fieldKv->key);
return code;
}
}
point->schemaIdx = stableIdx;
point->schemaIdx = (uint32_t)stableIdx;
}
size_t numStables = taosArrayGetSize(stableSchemas);
......@@ -226,7 +230,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
taosHashCleanup(sname2shema);
tscDebug("build point schema succeed. num of super table: %zu", numStables);
tscDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables);
for (int32_t i = 0; i < numStables; ++i) {
SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
......@@ -237,13 +241,16 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
SSchemaAction* action, bool* actionNeeded) {
size_t* pDbIndex = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name));
SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
char fieldNameLowerCase[TSDB_COL_NAME_LEN] = {0};
strtolower(fieldNameLowerCase, pointColField->name);
size_t* pDbIndex = taosHashGet(dbAttrHash, fieldNameLowerCase, strlen(fieldNameLowerCase));
if (pDbIndex) {
SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
if (pointColField->type != dbAttr->type) {
tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name,
tscError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
pointColField->type, dbAttr->type);
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -270,7 +277,10 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash
action->alterSTable.field = pointColField;
*actionNeeded = true;
}
tscDebug("generate schema action. action needed: %d, action: %d", *actionNeeded, action->action);
if (*actionNeeded) {
tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldNameLowerCase,
action->action);
}
return 0;
}
......@@ -296,13 +306,13 @@ static int32_t buildColumnDescription(SSchema* field,
}
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
int32_t code = 0;
int32_t outBytes = 0;
char *result = (char *)calloc(1, tsMaxSQLStringLen+1);
int32_t capacity = tsMaxSQLStringLen + 1;
tscDebug("apply schema action: %d", action->action);
tscDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
switch (action->action) {
case SCHEMA_ACTION_ADD_COLUMN: {
int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
......@@ -375,7 +385,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
free(result);
if (code != 0) {
tscError("apply schema action failure. %s", tstrerror(code));
tscError("SML:0x%"PRIx64 "apply schema action failure. %s", info->id, tstrerror(code));
}
return code;
}
......@@ -388,7 +398,7 @@ static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
return 0;
}
int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
int32_t code = 0;
STscObj *pObj = (STscObj *)taos;
......@@ -397,14 +407,17 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
return TSDB_CODE_TSC_DISCONNECTED;
}
tscDebug("load table schema. super table name: %s", tableName);
tscDebug("SML:0x%"PRIx64" load table schema. super table name: %s", info->id, tableName);
char tableNameLowerCase[TSDB_TABLE_NAME_LEN];
strtolower(tableNameLowerCase, tableName);
char sql[256];
snprintf(sql, 256, "describe %s", tableName);
snprintf(sql, 256, "describe %s", tableNameLowerCase);
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("describe table failure. %s", taos_errstr(res));
tscError("SML:0x%"PRIx64" describe table failure. %s", info->id, taos_errstr(res));
taos_free_result(res);
return code;
}
......@@ -415,8 +428,8 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
pSql->signature = pSql;
pSql->fp = NULL;
SStrToken tableToken = {.z=tableName, .n=(uint32_t)strlen(tableName), .type=TK_ID};
tGetToken(tableName, &tableToken.type);
SStrToken tableToken = {.z=tableNameLowerCase, .n=(uint32_t)strlen(tableNameLowerCase), .type=TK_ID};
tGetToken(tableNameLowerCase, &tableToken.type);
// Check if the table name available or not
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
......@@ -468,13 +481,13 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
}
tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d",
tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
tscDebug("SML:0x%"PRIx64 "load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d",
info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
free(tableMeta); tableMeta = NULL;
return code;
}
static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = 0;
size_t numStable = taosArrayGetSize(stableSchemas);
for (int i = 0; i < numStable; ++i) {
......@@ -482,7 +495,7 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
SSmlSTableSchema dbSchema;
memset(&dbSchema, 0, sizeof(SSmlSTableSchema));
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema, info);
if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
......@@ -490,10 +503,10 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
schemaAction.createSTable.tags = pointSchema->tags;
schemaAction.createSTable.fields = pointSchema->fields;
applySchemaAction(taos, &schemaAction);
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
applySchemaAction(taos, &schemaAction, info);
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema, info);
if (code != 0) {
tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName);
tscError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, pointSchema->sTableName);
return code;
} else {
pointSchema->precision = dbSchema.precision;
......@@ -510,9 +523,14 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName,
&schemaAction, &actionNeeded, info);
if (actionNeeded) {
applySchemaAction(taos, &schemaAction);
code = applySchemaAction(taos, &schemaAction, info);
if (code != 0) {
destroySmlSTableSchema(&dbSchema);
return code;
}
}
}
......@@ -524,9 +542,14 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName, &schemaAction, &actionNeeded);
generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName,
&schemaAction, &actionNeeded, info);
if (actionNeeded) {
applySchemaAction(taos, &schemaAction);
code = applySchemaAction(taos, &schemaAction, info);
if (code != 0) {
destroySmlSTableSchema(&dbSchema);
return code;
}
}
}
......@@ -534,23 +557,28 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
destroySmlSTableSchema(&dbSchema);
} else {
tscError("load table meta error: %s", tstrerror(code));
tscError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
return code;
}
}
return 0;
}
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
tscDebug("taos_sml_insert get child table name through md5");
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
taosStringBuilderAppendString(&sb, point->stableName);
char sTableName[TSDB_TABLE_NAME_LEN] = {0};
strtolower(sTableName, point->stableName);
taosStringBuilderAppendString(&sb, sTableName);
for (int j = 0; j < point->tagNum; ++j) {
taosStringBuilderAppendChar(&sb, ',');
TAOS_SML_KV* tagKv = point->tags + j;
taosStringBuilderAppendString(&sb, tagKv->key);
char tagName[TSDB_COL_NAME_LEN] = {0};
strtolower(tagName, tagKv->key);
taosStringBuilderAppendString(&sb, tagName);
taosStringBuilderAppendChar(&sb, '=');
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
}
......@@ -566,12 +594,12 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
taosStringBuilderDestroy(&sb);
tscDebug("child table name: %s", tableName);
tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
return 0;
}
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind) {
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind, SSmlLinesInfo* info) {
char sql[512];
sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName);
......@@ -580,31 +608,32 @@ static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, cons
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_bind_param(stmt, bind);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_close(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
return code;
}
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName,
SArray* tagsSchema, SArray* tagsBind, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(tagsSchema);
char* sql = malloc(tsMaxSQLStringLen+1);
int freeBytes = tsMaxSQLStringLen + 1;
......@@ -625,7 +654,7 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
tscDebug("create table : %s", sql);
tscDebug("SML:0x%"PRIx64" create table : %s", info->id, sql);
TAOS_STMT* stmt = taos_stmt_init(taos);
int32_t code;
......@@ -633,31 +662,31 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co
free(sql);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind));
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_close(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
return code;
}
static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) {
static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind, SSmlLinesInfo* info) {
size_t numCols = taosArrayGetSize(colsSchema);
char* sql = malloc(tsMaxSQLStringLen+1);
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
......@@ -675,7 +704,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
tscDebug("insert rows %zu into child table %s. ", taosArrayGetSize(rowsBind), cTableName);
tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu", info->id, cTableName, taosArrayGetSize(rowsBind));
int32_t code = 0;
int32_t try = 0;
......@@ -686,14 +715,14 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
free(sql);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
do {
code = taos_stmt_set_tbname(stmt, cTableName);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
......@@ -702,24 +731,24 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i);
code = taos_stmt_bind_param(stmt, colsBinds);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_add_batch(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
return code;
}
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
}
} while (code == TSDB_CODE_TDB_TABLE_RECONFIGURE && try++ < TSDB_MAX_REPLICA);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
} else {
taos_stmt_close(stmt);
......@@ -729,13 +758,13 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
}
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
SHashObj* cname2points, SArray* stableSchemas) {
SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
if (!point->childTableName) {
char childTableName[TSDB_TABLE_NAME_LEN];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen);
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
......@@ -776,7 +805,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
}
static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
SSmlSTableSchema* sTableSchema, SArray* cTablePoints) {
SSmlSTableSchema* sTableSchema, SArray* cTablePoints, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t rows = taosArrayGetSize(cTablePoints);
......@@ -832,7 +861,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
int32_t code = taos_errno(result);
if (code != 0) {
tscError("get child table %s tags failed. error string %s", cTableName, taos_errstr(result));
tscError("SML:0x%"PRIx64" get child table %s tags failed. error string %s", info->id, cTableName, taos_errstr(result));
goto cleanup;
}
......@@ -849,8 +878,8 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i-1]];
if (tagKV->type != dbType) {
tscError("child table %s tag %s type mismatch. point type : %d, db type : %d",
cTableName, tagKV->key, tagKV->type, dbType);
tscError("SML:0x%"PRIx64" child table %s tag %s type mismatch. point type : %d, db type : %d",
info->id, cTableName, tagKV->key, tagKV->type, dbType);
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -858,16 +887,16 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) {
TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx);
code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind);
code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind, info);
if (code != 0) {
tscError("change child table tag failed. table name %s, tag %s", cTableName, tagKV->key);
tscError("SML:0x%"PRIx64" change child table tag failed. table name %s, tag %s", info->id, cTableName, tagKV->key);
goto cleanup;
}
}
}
tscDebug("successfully applied point tags. child table: %s", cTableName);
tscDebug("SML:0x%"PRIx64" successfully applied point tags. child table: %s", info->id, cTableName);
} else {
code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds);
code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, info);
if (code != 0) {
goto cleanup;
}
......@@ -883,7 +912,8 @@ cleanup:
return code;
}
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, SArray* cTablePoints) {
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName,
SArray* cTablePoints, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numCols = taosArrayGetSize(sTableSchema->fields);
......@@ -895,8 +925,8 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema,
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", rows, numCols);
tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -917,9 +947,9 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema,
taosArrayPush(rowsBind, &colBinds);
}
code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind);
code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, info);
if (code != 0) {
tscError("insert into child table %s failed. error %s", cTableName, tstrerror(code));
tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
}
for (int i = 0; i < rows; ++i) {
......@@ -934,30 +964,35 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema,
return code;
}
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints);
tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName);
code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info);
if (code != 0) {
tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code));
goto cleanup;
}
code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints);
tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s", info->id, point->childTableName);
code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, info);
if (code != 0) {
tscError("Apply child table fields failed. child table %s, error %s", point->childTableName, tstrerror(code));
goto cleanup;
}
tscDebug("successfully applied data points of child table %s", point->childTableName);
tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
......@@ -973,27 +1008,30 @@ cleanup:
return code;
}
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
tscDebug("taos_sml_insert. number of points: %d", numPoint);
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);
int32_t code = TSDB_CODE_SUCCESS;
tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
code = buildDataPointSchemas(points, numPoint, stableSchemas);
code = buildDataPointSchemas(points, numPoint, stableSchemas, info);
if (code != 0) {
tscError("error building data point schemas : %s", tstrerror(code));
tscError("SML:0x%"PRIx64" error building data point schemas : %s", info->id, tstrerror(code));
goto clean_up;
}
code = modifyDBSchemas(taos, stableSchemas);
tscDebug("SML:0x%"PRIx64" modify db schemas", info->id);
code = modifyDBSchemas(taos, stableSchemas, info);
if (code != 0) {
tscError("error change db schema : %s", tstrerror(code));
tscError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code));
goto clean_up;
}
code = applyDataPoints(taos, points, numPoint, stableSchemas);
tscDebug("SML:0x%"PRIx64" apply data points", info->id);
code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
if (code != 0) {
tscError("error apply data points : %s", tstrerror(code));
tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
}
clean_up:
......@@ -1904,6 +1942,18 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) {
//=========================================================================
static uint64_t linesSmlHandleId = 0;
uint64_t genLinesSmlId() {
uint64_t id;
do {
id = atomic_add_fetch_64(&linesSmlHandleId, 1);
} while (id == 0);
return id;
}
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
for (int i=0; i<point->tagNum; ++i) {
free((point->tags+i)->key);
......@@ -1919,16 +1969,16 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
free(point->childTableName);
}
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numLines; ++i) {
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseLine(lines[i], &point);
if (code != TSDB_CODE_SUCCESS) {
tscError("data point line parse failed. line %d : %s", i, lines[i]);
tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
destroySmlDataPoint(&point);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} else {
tscDebug("data point line parse success. line %d", i);
tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i);
}
taosArrayPush(points, &point);
......@@ -1939,15 +1989,19 @@ int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* faile
int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
int32_t code = 0;
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
info->id = genLinesSmlId();
if (numLines <= 0 || numLines > 65536) {
tscError("taos_insert_lines numLines should be between 1 and 65536. numLines: %d", numLines);
tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
}
for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) {
tscError("taos_insert_lines line %d is NULL", i);
tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
free(info);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
}
......@@ -1955,12 +2009,13 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
if (lpPoints == NULL) {
tscError("taos_insert_lines failed to allocate memory");
tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
free(info);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("taos_insert_lines begin inserting %d lines, first line: %s", numLines, lines[0]);
code = tscParseLines(lines, numLines, lpPoints, NULL);
tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
code = tscParseLines(lines, numLines, lpPoints, NULL, info);
size_t numPoints = taosArrayGetSize(lpPoints);
if (code != 0) {
......@@ -1968,13 +2023,13 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
}
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = taos_sml_insert(taos, points, (int)numPoints);
code = taos_sml_insert(taos, points, (int)numPoints, info);
if (code != 0) {
tscError("taos_sml_insert error: %s", tstrerror((code)));
tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
}
cleanup:
tscDebug("taos_insert_lines finish inserting %d lines. code: %d", numLines, code);
tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
points = TARRAY_GET_START(lpPoints);
numPoints = taosArrayGetSize(lpPoints);
for (int i=0; i<numPoints; ++i) {
......@@ -1982,6 +2037,8 @@ cleanup:
}
taosArrayDestroy(lpPoints);
free(info);
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册