diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 04b96dc52a9a34e6ed91add4792d4bbe49a292b8..b657594db447a13b1db495fe63db7307565e2b07 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -98,8 +98,12 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { if (kv->type == TSDB_DATA_TYPE_NCHAR) { char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1); int32_t bytesNeeded = 0; - //todo check conversion succeed - taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); + bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); + if (!succ) { + free(ucs); + tscError("convert nchar string to UCS4_LE failed:%s", kv->value); + return TSDB_CODE_TSC_INVALID_VALUE; + } free(ucs); *bytes = bytesNeeded + VARSTR_HEADER_SIZE; } else if (kv->type == TSDB_DATA_TYPE_BINARY) { @@ -112,17 +116,20 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) { SSchema* pField = NULL; SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); + int32_t code = 0; if (ppField) { pField = *ppField; if (pField->type != smlKv->type) { - //TODO: - tscError("type mismatch"); - return -1; + tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type); + return TSDB_CODE_TSC_INVALID_VALUE; } int32_t bytes = 0; - getFieldBytesFromSmlKv(smlKv, &bytes); + code = getFieldBytesFromSmlKv(smlKv, &bytes); + if (code != 0) { + return code; + } pField->bytes = MAX(pField->bytes, bytes); } else { @@ -133,7 +140,10 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra field.type = smlKv->type; int32_t bytes = 0; - getFieldBytesFromSmlKv(smlKv, &bytes); + code = getFieldBytesFromSmlKv(smlKv, &bytes); + if (code != 0) { + return code; + } field.bytes = bytes; pField = taosArrayPush(array, &field); @@ -146,6 +156,7 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra } static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas) { + int32_t code = 0; SHashObj* sname2shema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); @@ -171,12 +182,20 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* tagKv = point->tags + j; - buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags); + code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags); + if (code != 0) { + tscError("build data point schema failed. point no.: %d, tag key: %s", i, tagKv->key); + return code; + } } for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* fieldKv = point->fields + j; - buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); + code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); + if (code != 0) { + tscError("build data point schema failed. point no.: %d, tag key: %s", i, fieldKv->key); + return code; + } } point->schema = pStableSchema; @@ -190,6 +209,13 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } taosHashCleanup(sname2shema); + tscDebug("build point schema succeed. num of super table: %zu", numStables); + for (int32_t i = 0; i < numStables; ++i) { + SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i); + tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName, + taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields)); + } + return 0; } @@ -199,8 +225,9 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash if (ppDbAttr) { SSchema* dbAttr = *ppDbAttr; if (pointColField->type != dbAttr->type) { - //todo error - return -5; + tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name, + pointColField->type, dbAttr->type); + return TSDB_CODE_TSC_INVALID_VALUE; } if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) { @@ -225,6 +252,7 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash action->alterSTable.field = pointColField; *actionNeeded = true; } + tscDebug("generate schema action. action needed: %d, action: %d", *actionNeeded, action->action); return 0; } @@ -256,12 +284,14 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { int32_t outBytes = 0; char *result = (char *)calloc(1, capacity); + tscDebug("apply schema action: %d", action->action); switch (action->action) { case SCHEMA_ACTION_ADD_COLUMN: { int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName); buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery code = taos_errno(res); + taos_free_result(res); break; } case SCHEMA_ACTION_ADD_TAG: { @@ -270,6 +300,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { result+n, capacity-n, &outBytes); TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery code = taos_errno(res); + taos_free_result(res); break; } case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { @@ -278,6 +309,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { capacity-n, &outBytes); TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery code = taos_errno(res); + taos_free_result(res); break; } case SCHEMA_ACTION_CHANGE_TAG_SIZE: { @@ -286,6 +318,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { capacity-n, &outBytes); TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery code = taos_errno(res); + taos_free_result(res); break; } case SCHEMA_ACTION_CREATE_STABLE: { @@ -314,13 +347,18 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { outBytes = snprintf(pos, freeBytes, ")"); TAOS_RES* res = taos_query(taos, result); code = taos_errno(res); + taos_free_result(res); break; } default: break; } + free(result); + if (code != 0) { + tscError("apply schema action failure. %s", tstrerror(code)); + } return code; } @@ -341,11 +379,14 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { return TSDB_CODE_TSC_DISCONNECTED; } + tscDebug("load table schema. super table name: %s", tableName); + char sql[256]; snprintf(sql, 256, "describe %s", tableName); TAOS_RES* res = taos_query(taos, sql); code = taos_errno(res); if (code != 0) { + tscError("describe table failure. %s", taos_errstr(res)); taos_free_result(res); return code; } @@ -369,16 +410,13 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) { return code; } - char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; memset(fullTableName, 0, tListLen(fullTableName)); tNameExtractFullName(&sname, fullTableName); - if (code != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pSql); return code; } - tscFreeSqlObj(pSql); schema->tags = taosArrayInit(8, sizeof(SSchema)); @@ -410,6 +448,8 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { SSchema* pField = taosArrayPush(schema->tags, &field); taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES); } + tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", + tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); free(tableMeta); tableMeta = NULL; return code; } @@ -422,7 +462,6 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { SSmlSTableSchema dbSchema = {0}; code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); - if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { SSchemaAction schemaAction = {0}; schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; @@ -432,10 +471,12 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { schemaAction.createSTable.fields = pointSchema->fields; applySchemaAction(taos, &schemaAction); code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); - - pointSchema->precision = dbSchema.precision; - - destroySmlSTableSchema(&dbSchema); + if (code != 0) { + tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName); + } else { + pointSchema->precision = dbSchema.precision; + destroySmlSTableSchema(&dbSchema); + } } else if (code == TSDB_CODE_SUCCESS) { size_t pointTagSize = taosArrayGetSize(pointSchema->tags); size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); @@ -471,6 +512,7 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { destroySmlSTableSchema(&dbSchema); } else { + tscError("load table meta error: %s", tstrerror(code)); return code; } } @@ -496,11 +538,12 @@ static int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, in MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); MD5Final(&context); *tableNameLen = snprintf(tableName, *tableNameLen, - "tbl_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); taosStringBuilderDestroy(&sb); + tscDebug("child table name: %s", tableName); return 0; } @@ -524,23 +567,25 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co } snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); + tscDebug("create table : %s", sql); + TAOS_STMT* stmt = taos_stmt_init(taos); int32_t code; code = taos_stmt_prepare(stmt, sql, strlen(sql)); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); return code; } code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind)); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); return code; } code = taos_stmt_execute(stmt); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); return code; } @@ -565,6 +610,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols } snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); + tscDebug("insert rows %zu into child table %s. ", taosArrayGetSize(rowsBind), cTableName); int32_t code = 0; int32_t try = 0; do { @@ -572,13 +618,13 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols code = taos_stmt_prepare(stmt, sql, strlen(sql)); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); return code; } code = taos_stmt_set_tbname(stmt, cTableName); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); return code; } @@ -587,19 +633,19 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i); code = taos_stmt_bind_param(stmt, colsBinds); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); return code; } code = taos_stmt_add_batch(stmt); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); return code; } } code = taos_stmt_execute(stmt); if (code != 0) { - printf("%s", taos_stmt_errstr(stmt)); + tscError("%s", taos_stmt_errstr(stmt)); taos_stmt_close(stmt); } else { taos_stmt_close(stmt); @@ -722,6 +768,7 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num TAOS_BIND* bind = colBinds + j; free(bind->length); } + free(colBinds); } taosArrayDestroy(rowsBind); taosArrayDestroy(cTablePoints); @@ -734,6 +781,8 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num } int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { + tscDebug("taos_sml_insert. number of points: %d", numPoint); + int32_t code = TSDB_CODE_SUCCESS; SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray @@ -846,6 +895,30 @@ int32_t parseItemValue(SLPItem* item, LPItemKind kind) { item->value = malloc(item->length); char* endptr = NULL; *(int64_t*)(item->value) = strtoll(sv, &endptr, 10); + } else if (*last == 'b') { + item->type = TSDB_DATA_TYPE_TINYINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(int8_t*)(item->value) = strtoll(sv, &endptr, 10); + } else if (*last == 's') { + item->type = TSDB_DATA_TYPE_SMALLINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(int16_t*)(item->value) = strtoll(sv, &endptr, 10); + } else if (*last == 'w') { + item->type = TSDB_DATA_TYPE_INT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(int32_t*)(item->value) = strtoll(sv, &endptr, 10); + } else if (*last == 'f') { + item->type = TSDB_DATA_TYPE_FLOAT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(float*)(item->value) = strtold(sv, &endptr); } else { item->type = TSDB_DATA_TYPE_DOUBLE; item->length = (int16_t)tDataTypes[item->type].bytes; @@ -1001,6 +1074,36 @@ int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* faile return 0; } +void destroyLPPoint(void* p) { + SLPPoint* lpPoint = p; + for (int i=0; ifields); ++i) { + SLPItem* item = taosArrayGet(lpPoint->fields, i); + free(item->value); + } + taosArrayDestroy(lpPoint->fields); + + for (int i=0; itags); ++i) { + SLPItem* item = taosArrayGet(lpPoint->tags, i); + free(item->value); + } + taosArrayDestroy(lpPoint->tags); +} + +void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { + for (int i=0; itagNum; ++i) { + free((point->tags+i)->key); + free((point->tags+i)->value); + } + free(point->tags); + for (int i=0; ifieldNum; ++i) { + free((point->fields+i)->key); + free((point->fields+i)->value); + } + free(point->fields); + free(point->stableName); + free(point->childTableName); +} + int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint)); tscParseLines(lines, numLines, lpPoints, NULL); @@ -1065,6 +1168,12 @@ int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { } taos_sml_insert(taos, points, numPoints); + + for (int i=0; i