diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 10d2429b5e95d70644f5e78099535dd33b21b3fc..2738fe6f7afad80eec8c909253207b01c1ade21f 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -272,6 +272,14 @@ TAOS_RES* taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { } //================================================================================================= +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SHashObj* tagHash; + SHashObj* fieldHash; + SArray* tags; //SArray + SArray* fields; //SArray +} SSmlSTableSchema; + typedef struct { char* key; uint8_t type; @@ -279,7 +287,7 @@ typedef struct { char* value; //=================================== - SSchema* fieldSchema; + SSchema* schema; } TAOS_SML_KV; typedef struct { @@ -293,17 +301,10 @@ typedef struct { TAOS_SML_KV* fields; int fieldNum; + //================================ + SSmlSTableSchema* schema; } TAOS_SML_DATA_POINT; -typedef struct { - char sTableName[TSDB_TABLE_NAME_LEN]; - SHashObj* tagHash; - SHashObj* fieldHash; - SArray* tags; //SArray - SArray* fields; //SArray -} SSmlSTableSchema; - - int compareSmlColKv(const void* p1, const void* p2) { TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; @@ -426,7 +427,6 @@ typedef enum { SCHEMA_ACTION_ADD_TAG, SCHEMA_ACTION_CHANGE_COLUMN_SIZE, SCHEMA_ACTION_CHANGE_TAG_SIZE, - SCHEMA_ACTION_CREATE_CTABLE } ESchemaAction; typedef struct { @@ -440,19 +440,11 @@ typedef struct { SSchema* field; } SAlterSTableActionInfo; -typedef struct { - char sTableName[TSDB_TABLE_NAME_LEN]; - char cTableName[TSDB_TABLE_NAME_LEN]; - TAOS_SML_KV* tags; - int tagNum; -} SCreateCTableActionInfo; - typedef struct { ESchemaAction action; union { SCreateSTableActionInfo createSTable; SAlterSTableActionInfo alterSTable; - SCreateCTableActionInfo createCTable; }; } SSchemaAction; @@ -505,7 +497,7 @@ int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* a taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES); } - smlKv->fieldSchema = pField; + smlKv->schema = pField; return 0; } @@ -545,96 +537,6 @@ int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool return 0; } -int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { - int32_t code = TSDB_CODE_SUCCESS; - SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray - SHashObj* sname2shema = taosHashInit(32, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - - for (int i = 0; i < numPoint; ++i) { - TAOS_SML_DATA_POINT* point = &points[i]; - SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN); - SSmlSTableSchema* pStableSchema = NULL; - if (ppStableSchema) { - pStableSchema= *ppStableSchema; - } else { - SSmlSTableSchema schema; - size_t stableNameLen = strlen(point->stableName); - strncpy(schema.sTableName, point->stableName, stableNameLen); - schema.sTableName[stableNameLen] = '\0'; - schema.fields = taosArrayInit(64, sizeof(SSchema)); - schema.tags = taosArrayInit(8, sizeof(SSchema)); - schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - - pStableSchema = taosArrayPush(stableArray, &schema); - taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); - } - - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* tagKv = point->tags + j; - addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags); - } - - for (int j = 0; j < point->fieldNum; ++j) { - TAOS_SML_KV* fieldKv = point->fields + j; - addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); - } - } - - SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction)); - size_t numStable = taosArrayGetSize(stableArray); - for (int i = 0; i < numStable; ++i) { - SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); - SSmlSTableSchema dbSchema = {0}; - dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); - dbSchema.tags = taosArrayInit(8, sizeof(SSchema)); - dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); - if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { - SSchemaAction schemaAction = {0}; - schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; - memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo)); - memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); - schemaAction.createSTable.tags = pointSchema->tags; - schemaAction.createSTable.fields = pointSchema->fields; - taosArrayPush(schemaActions, &schemaAction); - }else if (code == TSDB_CODE_SUCCESS) { - size_t pointTagSize = taosArrayGetSize(pointSchema->tags); - size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); - - SHashObj* dbTagHash = dbSchema.tagHash; - SHashObj* dbFieldHash = dbSchema.fieldHash; - - for (int j = 0; j < pointTagSize; ++j) { - SSchema* pointTag = taosArrayGet(pointSchema->tags, j); - SSchemaAction schemaAction = {0}; - bool actionNeeded = false; - generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); - if (actionNeeded) { - taosArrayPush(schemaActions, &schemaAction); - } - } - - for (int j = 0; j < pointFieldSize; ++j) { - SSchema* pointCol = taosArrayGet(pointSchema->tags, j); - SSchemaAction schemaAction = {0}; - bool actionNeeded = false; - generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); - if (actionNeeded) { - taosArrayPush(schemaActions, &schemaAction); - } - } - } else { - return code; - } - } - - return code; -} - - int32_t buildColumnDescription(SSchema* field, char* buf, int32_t bufSize, int32_t* outBytes) { uint8_t type = field->type; @@ -721,46 +623,7 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { code = taos_errno(res); break; } - case SCHEMA_ACTION_CREATE_CTABLE: { -// SCreateCTableActionInfo* pInfo = &action->createCTable; -// SArray* bindParams = taosArrayInit(2 + 2 * pInfo->tagNum, sizeof(TAOS_BIND)); -// outBytes = sprintf(result, "create table ? using ?("); -// char* pos = result + outBytes; int32_t freeBytes = capacity-outBytes; -// uintptr_t lenSTableName = strlen(pInfo->sTableName); -// uintptr_t lenCTableName = strlen(pInfo->cTableName); -// TAOS_BIND tbCTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, -// .buffer = pInfo->cTableName, .length = &lenCTableName}; -// TAOS_BIND tbSTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, -// .buffer = pInfo->sTableName, .length = &lenSTableName}; -// taosArrayPush(bindParams, &tbCTableName); -// taosArrayPush(bindParams, &tbSTableName); -// for (int32_t i = 0; i < pInfo->tagNum; ++i) { -// outBytes = snprintf(pos, freeBytes, "?,"); -// -// TAOS_SML_KV* tagKv = pInfo->tags + i; -// TAOS_BIND tbTag = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, -// .buffer = tagKv->key, .length = }; -// pos += outBytes; freeBytes -= outBytes; -// } -// --pos; ++freeBytes; -// -// outBytes = snprintf(pos, freeBytes, ") tags ("); -// pos += outBytes; freeBytes -= outBytes; -// for (int32_t i = 0; i < pInfo->tagNum; ++i) { -// TAOS_SML_KV* tagKv = pInfo->tags + i; -// outBytes = snprintf(pos, freeBytes, "?,"); -// pos += outBytes; freeBytes -= outBytes; -// } -// pos--; ++freeBytes; -// outBytes = snprintf(pos, freeBytes, ")"); -// -// TAOS_STMT* stmt = taos_stmt_init(taos); -// taos_stmt_prepare(stmt, result, strlen(result)); -// -// -// taos_stmt_bind_param(stmt, (TAOS_BIND*)bindParams); - break; - } + default: break; } @@ -768,55 +631,221 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { return code; } -int32_t transformIntoPreparedStatement(SArray* points) { - size_t numPoints = taosArrayGetSize(points); +int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsSchema, char* result, int16_t freeBytes) { + size_t numTags = taosArrayGetSize(tagsSchema); + size_t numCols = taosArrayGetSize(colsSchema); + sprintf(result, "insert into ? using %s(", sTableName); + for (int i = 0; i < numTags; ++i) { + SSchema* tagSchema = taosArrayGet(tagsSchema, i); + snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name); + } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") tags ("); -// SHashObj* tag2bind = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); -// SHashObj* field2multiBind = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + for (int i = 0; i < numTags; ++i) { + snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); + } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") ("); - for (int32_t i = 0; i < numPoints; ++i) { - TAOS_SML_DATA_POINT * point = taosArrayGet(points, i); - char tableKey[256]; - snprintf(tableKey, 256, "%s.%s", point->stableName, point->childTableName); + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", colSchema->name); + } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") values ("); + for (int i = 0; i < numCols; ++i) { + snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")"); return 0; } -int32_t insertBatch(TAOS* taos, const char* sTableName, char* cTableName, SSchema* tagsSchema, int numTags, TAOS_BIND* tagBind, - SSchema* colsSchema, int numCols, TAOS_MULTI_BIND* colBind) { +int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* rowsBind) { TAOS_STMT* stmt = taos_stmt_init(taos); + taos_stmt_prepare(stmt, sql, strlen(sql)); + + taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); + size_t rows = taosArrayGetSize(rowsBind); + for (int32_t i = 0; i < rows; ++i) { + TAOS_BIND* colBind = taosArrayGetP(rowsBind, i); + taos_stmt_bind_param(stmt, colBind); + taos_stmt_add_batch(stmt); + } - char result[TSDB_MAX_BINARY_LEN] = {0}; - sprintf(result, "insert into ? using %s(", sTableName); - for (int i = 0; i < numTags; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", tagsSchema[i].name); + taos_stmt_execute(stmt); + TAOS_RES* res = taos_stmt_use_result(stmt); + return taos_errno(res); +} + +int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) { + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + for (int32_t i = 0; i < numPoints; ++i) { + TAOS_SML_DATA_POINT * point = points + i; + if (!point->childTableName) { + char childTableName[TSDB_TABLE_NAME_LEN]; + int32_t tableNameLen; + getChildTableName(point, childTableName, &tableNameLen); + point->childTableName = calloc(1, tableNameLen+1); + strncpy(point->childTableName, childTableName, tableNameLen); + point->childTableName[tableNameLen] = '\0'; + } + SArray* cTablePoints = NULL; + SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName)); + if (pCTablePoints) { + cTablePoints = *pCTablePoints; + } else { + cTablePoints = taosArrayInit(64, sizeof(point)); + taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES); + } + taosArrayPush(cTablePoints, point); } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") tags ("); - for (int i = 0; i < numTags; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,"); + SArray** pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* cTablePoints = *pCTablePoints; + TAOS_SML_DATA_POINT * point = taosArrayGet(cTablePoints, 0); + int32_t numTags = taosArrayGetSize(point->schema->tags); + int32_t numCols = taosArrayGetSize(point->schema->fields); + char* stableName = point->stableName; + char* ctableName = point->childTableName; + char sql[TSDB_MAX_BINARY_LEN]; + getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN); + + size_t rows = taosArrayGetSize(cTablePoints); + SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); + SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); + + for (int i = 0; i < rows; ++i) { + point = taosArrayGet(cTablePoints, i); + + taosArraySetSize(tagBinds, numTags); + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); + TAOS_BIND* bind = taosArrayGet(tagBinds, idx); + bind->buffer_type = kv->type; + bind->length = (uintptr_t*)&kv->length; + bind->buffer = kv->value; + } + + SArray* colBinds = taosArrayInit(numCols, sizeof(TAOS_BIND)); + taosArraySetSize(colBinds, numCols); + for (int j = 0; jfieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + int32_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); + TAOS_BIND* bind = taosArrayGet(colBinds, idx); + bind->buffer_type = kv->type; + bind->length = (uintptr_t*)&kv->length; + bind->buffer = kv->value; + } + taosArrayPush(rowsBind, &colBinds); + } + + insertBatch(taos, sql, ctableName, tagBinds, rowsBind); + + pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") ("); + return 0; +} - for (int i = 0; i < numCols; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", colsSchema[i].name); + +int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { + int32_t code = TSDB_CODE_SUCCESS; + SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray + SHashObj* sname2shema = taosHashInit(32, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + for (int i = 0; i < numPoint; ++i) { + TAOS_SML_DATA_POINT* point = &points[i]; + SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN); + SSmlSTableSchema* pStableSchema = NULL; + if (ppStableSchema) { + pStableSchema= *ppStableSchema; + } else { + SSmlSTableSchema schema; + size_t stableNameLen = strlen(point->stableName); + strncpy(schema.sTableName, point->stableName, stableNameLen); + schema.sTableName[stableNameLen] = '\0'; + schema.fields = taosArrayInit(64, sizeof(SSchema)); + schema.tags = taosArrayInit(8, sizeof(SSchema)); + schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + pStableSchema = taosArrayPush(stableArray, &schema); + taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); + } + + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* tagKv = point->tags + j; + addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags); + } + + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* fieldKv = point->fields + j; + addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); + } + + point->schema = pStableSchema; } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") values ("); - for (int i = 0; i < numCols; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,"); + SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction)); + size_t numStable = taosArrayGetSize(stableArray); + for (int i = 0; i < numStable; ++i) { + SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); + SSmlSTableSchema dbSchema = {0}; + dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); + dbSchema.tags = taosArrayInit(8, sizeof(SSchema)); + dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { + SSchemaAction schemaAction = {0}; + schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; + memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo)); + memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); + schemaAction.createSTable.tags = pointSchema->tags; + schemaAction.createSTable.fields = pointSchema->fields; + taosArrayPush(schemaActions, &schemaAction); + }else if (code == TSDB_CODE_SUCCESS) { + size_t pointTagSize = taosArrayGetSize(pointSchema->tags); + size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); + + SHashObj* dbTagHash = dbSchema.tagHash; + SHashObj* dbFieldHash = dbSchema.fieldHash; + + for (int j = 0; j < pointTagSize; ++j) { + SSchema* pointTag = taosArrayGet(pointSchema->tags, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + taosArrayPush(schemaActions, &schemaAction); + } + } + + for (int j = 0; j < pointFieldSize; ++j) { + SSchema* pointCol = taosArrayGet(pointSchema->tags, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + taosArrayPush(schemaActions, &schemaAction); + } + } + } else { + return code; + } } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ")"); - int32_t code = 0; - code = taos_stmt_prepare(stmt, result, strlen(result)); + for (int i = 0; i < taosArrayGetSize(schemaActions); ++i) { + SSchemaAction* action = taosArrayGet(schemaActions, i); + applySchemaAction(taos, action); + } - code = taos_stmt_set_tbname_tags(stmt, cTableName, tagBind); - code = taos_stmt_bind_param_batch(stmt, colBind); - code = taos_stmt_execute(stmt); + insertPoints(taos, points, numPoint); return code; } + + //todo: table/column length check //todo: type check //todo: taosmbs2ucs4 check