diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index cd56153bc4cbf0a6bf5748fc5f6e6a7390159377..26d9cf0e49caa0dfaa50fa7ff29b74f0793e73a1 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1129,6 +1129,7 @@ int tsParseInsertSql(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SInsertStatementParam* pInsertParam = &pCmd->insertParam; + pInsertParam->objectId = pSql->self; char* str = pInsertParam->sql; int32_t totalNum = 0; diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c new file mode 100644 index 0000000000000000000000000000000000000000..37264e8eaa6b22444af893f12205d85dec8ad795 --- /dev/null +++ b/src/client/src/tscParseLineProtocol.c @@ -0,0 +1,1214 @@ +#include +#include +#include +#include + +#include "os.h" +#include "osString.h" +#include "ttype.h" +#include "tmd5.h" +#include "tstrbuild.h" +#include "tname.h" +#include "hash.h" +#include "tskiplist.h" + +#include "tscUtil.h" +#include "tsclient.h" +#include "tscLog.h" + +#include "taos.h" +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SHashObj* tagHash; + SHashObj* fieldHash; + SArray* tags; //SArray + SArray* fields; //SArray + uint8_t precision; +} SSmlSTableSchema; + +typedef struct { + char* key; + uint8_t type; + int16_t length; + char* value; + + //=================================== + SSchema* schema; +} TAOS_SML_KV; + +typedef struct { + char* stableName; + + char* childTableName; + TAOS_SML_KV* tags; + int tagNum; + + // first kv must be timestamp + TAOS_SML_KV* fields; + int fieldNum; + + //================================ + SSmlSTableSchema* schema; +} TAOS_SML_DATA_POINT; + +//================================================================================================= + +int compareSmlColKv(const void* p1, const void* p2) { + TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; + TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; + int kvLen1 = (int)strlen(kv1->key); + int kvLen2 = (int)strlen(kv2->key); + int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2)); + if (res != 0) { + return res; + } else { + return kvLen1-kvLen2; + } +} + +typedef enum { + SCHEMA_ACTION_CREATE_STABLE, + SCHEMA_ACTION_ADD_COLUMN, + SCHEMA_ACTION_ADD_TAG, + SCHEMA_ACTION_CHANGE_COLUMN_SIZE, + SCHEMA_ACTION_CHANGE_TAG_SIZE, +} ESchemaAction; + +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SArray* tags; //SArray + SArray* fields; //SArray +} SCreateSTableActionInfo; + +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SSchema* field; +} SAlterSTableActionInfo; + +typedef struct { + ESchemaAction action; + union { + SCreateSTableActionInfo createSTable; + SAlterSTableActionInfo alterSTable; + }; +} SSchemaAction; + +static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { + if (!IS_VAR_DATA_TYPE(kv->type)) { + *bytes = tDataTypes[kv->type].bytes; + } else { + if (kv->type == TSDB_DATA_TYPE_NCHAR) { + char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1); + int32_t bytesNeeded = 0; + bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); + if (!succ) { + free(ucs); + tscError("convert nchar string to UCS4_LE failed:%s", kv->value); + return TSDB_CODE_TSC_INVALID_VALUE; + } + free(ucs); + *bytes = bytesNeeded + VARSTR_HEADER_SIZE; + } else if (kv->type == TSDB_DATA_TYPE_BINARY) { + *bytes = kv->length + VARSTR_HEADER_SIZE; + } + } + return 0; +} + +static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) { + SSchema* pField = NULL; + SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); + int32_t code = 0; + if (ppField) { + pField = *ppField; + + if (pField->type != smlKv->type) { + tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + int32_t bytes = 0; + code = getFieldBytesFromSmlKv(smlKv, &bytes); + if (code != 0) { + return code; + } + pField->bytes = MAX(pField->bytes, bytes); + + } else { + SSchema field = {0}; + size_t tagKeyLen = strlen(smlKv->key); + strncpy(field.name, smlKv->key, tagKeyLen); + field.name[tagKeyLen] = '\0'; + field.type = smlKv->type; + + int32_t bytes = 0; + code = getFieldBytesFromSmlKv(smlKv, &bytes); + if (code != 0) { + return code; + } + field.bytes = bytes; + + pField = taosArrayPush(array, &field); + taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES); + } + + smlKv->schema = pField; + + return 0; +} + +static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas) { + int32_t code = 0; + SHashObj* sname2shema = taosHashInit(32, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + for (int i = 0; i < numPoint; ++i) { + TAOS_SML_DATA_POINT* point = &points[i]; + size_t stableNameLen = strlen(point->stableName); + SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen); + SSmlSTableSchema* pStableSchema = NULL; + if (ppStableSchema) { + pStableSchema= *ppStableSchema; + } else { + SSmlSTableSchema schema; + strncpy(schema.sTableName, point->stableName, stableNameLen); + schema.sTableName[stableNameLen] = '\0'; + schema.fields = taosArrayInit(64, sizeof(SSchema)); + schema.tags = taosArrayInit(8, sizeof(SSchema)); + schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + pStableSchema = taosArrayPush(stableSchemas, &schema); + taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); + } + + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* tagKv = point->tags + j; + code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags); + if (code != 0) { + tscError("build data point schema failed. point no.: %d, tag key: %s", i, tagKv->key); + return code; + } + } + + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* fieldKv = point->fields + j; + code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); + if (code != 0) { + tscError("build data point schema failed. point no.: %d, tag key: %s", i, fieldKv->key); + return code; + } + } + + point->schema = pStableSchema; + } + + size_t numStables = taosArrayGetSize(stableSchemas); + for (int32_t i = 0; i < numStables; ++i) { + SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i); + taosHashCleanup(schema->tagHash); + taosHashCleanup(schema->fieldHash); + } + taosHashCleanup(sname2shema); + + tscDebug("build point schema succeed. num of super table: %zu", numStables); + for (int32_t i = 0; i < numStables; ++i) { + SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i); + tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName, + taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields)); + } + + return 0; +} + +static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool isTag, char sTableName[], + SSchemaAction* action, bool* actionNeeded) { + SSchema** ppDbAttr = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name)); + if (ppDbAttr) { + SSchema* dbAttr = *ppDbAttr; + if (pointColField->type != dbAttr->type) { + tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name, + pointColField->type, dbAttr->type); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) { + if (isTag) { + action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE; + } else { + action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE; + } + memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); + memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); + action->alterSTable.field = pointColField; + *actionNeeded = true; + } + } else { + if (isTag) { + action->action = SCHEMA_ACTION_ADD_TAG; + } else { + action->action = SCHEMA_ACTION_ADD_COLUMN; + } + memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); + memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); + action->alterSTable.field = pointColField; + *actionNeeded = true; + } + tscDebug("generate schema action. action needed: %d, action: %d", *actionNeeded, action->action); + return 0; +} + +static int32_t buildColumnDescription(SSchema* field, + char* buf, int32_t bufSize, int32_t* outBytes) { + uint8_t type = field->type; + + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + int32_t bytes = field->bytes - VARSTR_HEADER_SIZE; + if (type == TSDB_DATA_TYPE_NCHAR) { + bytes = bytes/TSDB_NCHAR_SIZE; + } + int out = snprintf(buf, bufSize,"%s %s(%d)", + field->name,tDataTypes[field->type].name, bytes); + *outBytes = out; + } else { + int out = snprintf(buf, bufSize, "%s %s", + field->name, tDataTypes[type].name); + *outBytes = out; + } + + return 0; +} + + +static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { + int32_t code = 0; + int32_t capacity = TSDB_MAX_BINARY_LEN; + int32_t outBytes = 0; + char *result = (char *)calloc(1, capacity); + + tscDebug("apply schema action: %d", action->action); + switch (action->action) { + case SCHEMA_ACTION_ADD_COLUMN: { + int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + taos_free_result(res); + break; + } + case SCHEMA_ACTION_ADD_TAG: { + int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, + result+n, capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + taos_free_result(res); + break; + } + case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { + int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, result+n, + capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + taos_free_result(res); + break; + } + case SCHEMA_ACTION_CHANGE_TAG_SIZE: { + int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, result+n, + capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + taos_free_result(res); + break; + } + case SCHEMA_ACTION_CREATE_STABLE: { + int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); + char* pos = result + n; int freeBytes = capacity - n; + size_t numCols = taosArrayGetSize(action->createSTable.fields); + for (int32_t i = 0; i < numCols; ++i) { + SSchema* field = taosArrayGet(action->createSTable.fields, i); + buildColumnDescription(field, pos, freeBytes, &outBytes); + pos += outBytes; freeBytes -= outBytes; + *pos = ','; ++pos; --freeBytes; + } + --pos; ++freeBytes; + + outBytes = snprintf(pos, freeBytes, ") tags ("); + pos += outBytes; freeBytes -= outBytes; + + size_t numTags = taosArrayGetSize(action->createSTable.tags); + for (int32_t i = 0; i < numTags; ++i) { + SSchema* field = taosArrayGet(action->createSTable.tags, i); + buildColumnDescription(field, pos, freeBytes, &outBytes); + pos += outBytes; freeBytes -= outBytes; + *pos = ','; ++pos; --freeBytes; + } + pos--; ++freeBytes; + outBytes = snprintf(pos, freeBytes, ")"); + TAOS_RES* res = taos_query(taos, result); + code = taos_errno(res); + taos_free_result(res); + break; + } + + default: + break; + } + + free(result); + if (code != 0) { + tscError("apply schema action failure. %s", tstrerror(code)); + } + return code; +} + +static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) { + taosHashCleanup(schema->tagHash); + taosHashCleanup(schema->fieldHash); + taosArrayDestroy(schema->tags); + taosArrayDestroy(schema->fields); + return 0; +} + +int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { + int32_t code = 0; + + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL || pObj->signature != pObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return TSDB_CODE_TSC_DISCONNECTED; + } + + tscDebug("load table schema. super table name: %s", tableName); + + char sql[256]; + snprintf(sql, 256, "describe %s", tableName); + TAOS_RES* res = taos_query(taos, sql); + code = taos_errno(res); + if (code != 0) { + tscError("describe table failure. %s", taos_errstr(res)); + taos_free_result(res); + return code; + } + taos_free_result(res); + + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + pSql->pTscObj = taos; + pSql->signature = pSql; + pSql->fp = NULL; + + SStrToken tableToken = {.z=tableName, .n=(uint32_t)strlen(tableName), .type=TK_ID}; + tGetToken(tableName, &tableToken.type); + // Check if the table name available or not + if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; + sprintf(pSql->cmd.payload, "table name is invalid"); + return code; + } + + SName sname = {0}; + if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) { + return code; + } + char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; + memset(fullTableName, 0, tListLen(fullTableName)); + tNameExtractFullName(&sname, fullTableName); + if (code != TSDB_CODE_SUCCESS) { + tscFreeSqlObj(pSql); + return code; + } + tscFreeSqlObj(pSql); + + schema->tags = taosArrayInit(8, sizeof(SSchema)); + schema->fields = taosArrayInit(64, sizeof(SSchema)); + schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + uint32_t size = tscGetTableMetaMaxSize(); + STableMeta* tableMeta = calloc(1, size); + taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); + + tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); + schema->precision = tableMeta->tableInfo.precision; + for (int i=0; itableInfo.numOfColumns; ++i) { + SSchema field; + tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1); + field.type = tableMeta->schema[i].type; + field.bytes = tableMeta->schema[i].bytes; + SSchema* pField = taosArrayPush(schema->fields, &field); + taosHashPut(schema->fieldHash, field.name, strlen(field.name), &pField, POINTER_BYTES); + } + + for (int i=0; itableInfo.numOfTags; ++i) { + int j = i + tableMeta->tableInfo.numOfColumns; + SSchema field; + tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1); + field.type = tableMeta->schema[j].type; + field.bytes = tableMeta->schema[j].bytes; + SSchema* pField = taosArrayPush(schema->tags, &field); + taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES); + } + tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", + tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); + free(tableMeta); tableMeta = NULL; + return code; +} + +static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { + int32_t code = 0; + size_t numStable = taosArrayGetSize(stableSchemas); + for (int i = 0; i < numStable; ++i) { + SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i); + SSmlSTableSchema dbSchema; + memset(&dbSchema, 0, sizeof(SSmlSTableSchema)); + + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { + SSchemaAction schemaAction = {0}; + schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; + memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo)); + memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); + schemaAction.createSTable.tags = pointSchema->tags; + schemaAction.createSTable.fields = pointSchema->fields; + applySchemaAction(taos, &schemaAction); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + if (code != 0) { + tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName); + } else { + pointSchema->precision = dbSchema.precision; + destroySmlSTableSchema(&dbSchema); + } + } else if (code == TSDB_CODE_SUCCESS) { + size_t pointTagSize = taosArrayGetSize(pointSchema->tags); + size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); + + SHashObj* dbTagHash = dbSchema.tagHash; + SHashObj* dbFieldHash = dbSchema.fieldHash; + + for (int j = 0; j < pointTagSize; ++j) { + SSchema* pointTag = taosArrayGet(pointSchema->tags, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + applySchemaAction(taos, &schemaAction); + } + } + + SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0); + SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0); + memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN); + + for (int j = 1; j < pointFieldSize; ++j) { + SSchema* pointCol = taosArrayGet(pointSchema->fields, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + applySchemaAction(taos, &schemaAction); + } + } + + pointSchema->precision = dbSchema.precision; + + destroySmlSTableSchema(&dbSchema); + } else { + tscError("load table meta error: %s", tstrerror(code)); + return code; + } + } + return 0; +} + +static int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { + qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); + + SStringBuilder sb; memset(&sb, 0, sizeof(sb)); + taosStringBuilderAppendString(&sb, point->stableName); + for (int j = 0; j < point->tagNum; ++j) { + taosStringBuilderAppendChar(&sb, ','); + TAOS_SML_KV* tagKv = point->tags + j; + taosStringBuilderAppendString(&sb, tagKv->key); + taosStringBuilderAppendChar(&sb, '='); + taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); + } + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); + MD5_CTX context; + MD5Init(&context); + MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + MD5Final(&context); + *tableNameLen = snprintf(tableName, *tableNameLen, + "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], + context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], + context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + taosStringBuilderDestroy(&sb); + tscDebug("child table name: %s", tableName); + return 0; +} + +static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) { + size_t numTags = taosArrayGetSize(tagsSchema); + char sql[TSDB_MAX_BINARY_LEN] = {0}; + int freeBytes = TSDB_MAX_BINARY_LEN; + sprintf(sql, "create table if not exists %s using %s", cTableName, sTableName); + + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "("); + for (int i = 0; i < numTags; ++i) { + SSchema* tagSchema = taosArrayGet(tagsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name); + } + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); + + snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags ("); + + for (int i = 0; i < numTags; ++i) { + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + } + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); + + tscDebug("create table : %s", sql); + + TAOS_STMT* stmt = taos_stmt_init(taos); + int32_t code; + code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind)); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + taos_stmt_close(stmt); + return 0; +} + +static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) { + size_t numCols = taosArrayGetSize(colsSchema); + char sql[TSDB_MAX_BINARY_LEN]; + int32_t freeBytes = TSDB_MAX_BINARY_LEN; + sprintf(sql, "insert into ? ("); + + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ("); + + for (int i = 0; i < numCols; ++i) { + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); + + tscDebug("insert rows %zu into child table %s. ", taosArrayGetSize(rowsBind), cTableName); + + int32_t code = 0; + int32_t try = 0; + + TAOS_STMT* stmt = taos_stmt_init(taos); + + code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + do { + + code = taos_stmt_set_tbname(stmt, cTableName); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + size_t rows = taosArrayGetSize(rowsBind); + for (int32_t i = 0; i < rows; ++i) { + TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i); + code = taos_stmt_bind_param(stmt, colsBinds); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + code = taos_stmt_add_batch(stmt); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + } + } while (code == TSDB_CODE_TDB_TABLE_RECONFIGURE && try++ < TSDB_MAX_REPLICA); + + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + taos_stmt_close(stmt); + } else { + taos_stmt_close(stmt); + } + + return code; +} + +static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, SHashObj* cname2points) { + for (int32_t i = 0; i < numPoints; ++i) { + TAOS_SML_DATA_POINT * point = points + i; + if (!point->childTableName) { + char childTableName[TSDB_TABLE_NAME_LEN]; + int32_t tableNameLen = TSDB_TABLE_NAME_LEN; + getChildTableName(point, childTableName, &tableNameLen); + point->childTableName = calloc(1, tableNameLen+1); + strncpy(point->childTableName, childTableName, tableNameLen); + point->childTableName[tableNameLen] = '\0'; + } + + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { + int64_t ts = *(int64_t*)(kv->value); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + *(int64_t*)(kv->value) = ts; + } + } + + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { + int64_t ts = *(int64_t*)(kv->value); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + *(int64_t*)(kv->value) = ts; + } + } + + SArray* cTablePoints = NULL; + SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName)); + if (pCTablePoints) { + cTablePoints = *pCTablePoints; + } else { + cTablePoints = taosArrayInit(64, sizeof(point)); + taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES); + } + taosArrayPush(cTablePoints, &point); + } + + return 0; +} + +static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) { + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), + true, false); + arrangePointsByChildTableName(points, numPoints, cname2points); + + int isNullColBind = TSDB_TRUE; + SArray** pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* cTablePoints = *pCTablePoints; + + TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); + size_t numTags = taosArrayGetSize(point->schema->tags); + size_t numCols = taosArrayGetSize(point->schema->fields); + + SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); + taosArraySetSize(tagBinds, numTags); + for (int j = 0; j < numTags; ++j) { + TAOS_BIND* bind = taosArrayGet(tagBinds, j); + bind->is_null = &isNullColBind; + } + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + size_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); + TAOS_BIND* bind = taosArrayGet(tagBinds, idx); + bind->buffer_type = kv->type; + bind->length = malloc(sizeof(uintptr_t*)); + *bind->length = kv->length; + bind->buffer = kv->value; + bind->is_null = NULL; + } + + size_t rows = taosArrayGetSize(cTablePoints); + SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); + + for (int i = 0; i < rows; ++i) { + point = taosArrayGetP(cTablePoints, i); + + TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = colBinds + j; + bind->is_null = &isNullColBind; + } + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + size_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); + TAOS_BIND* bind = colBinds + idx; + bind->buffer_type = kv->type; + bind->length = malloc(sizeof(uintptr_t*)); + *bind->length = kv->length; + bind->buffer = kv->value; + bind->is_null = NULL; + } + taosArrayPush(rowsBind, &colBinds); + } + + creatChildTableIfNotExists(taos, point->childTableName, point->stableName, point->schema->tags, tagBinds); + for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { + TAOS_BIND* bind = taosArrayGet(tagBinds, i); + free(bind->length); + } + taosArrayDestroy(tagBinds); + + insertChildTableBatch(taos, point->childTableName, point->schema->fields, rowsBind); + for (int i = 0; i < rows; ++i) { + TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = colBinds + j; + free(bind->length); + } + free(colBinds); + } + taosArrayDestroy(rowsBind); + taosArrayDestroy(cTablePoints); + + pCTablePoints = taosHashIterate(cname2points, pCTablePoints); + } + + taosHashCleanup(cname2points); + return 0; +} + +int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { + tscDebug("taos_sml_insert. number of points: %d", numPoint); + + int32_t code = TSDB_CODE_SUCCESS; + + SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray + code = buildDataPointSchemas(points, numPoint, stableSchemas); + if (code != 0) { + tscError("error building data point schemas : %s", tstrerror(code)); + goto clean_up; + } + + code = reconcileDBSchemas(taos, stableSchemas); + if (code != 0) { + tscError("error change db schema : %s", tstrerror(code)); + goto clean_up; + } + + code = insertPoints(taos, points, numPoint); + if (code != 0) { + tscError("error insert points : %s", tstrerror(code)); + } + +clean_up: + for (int i = 0; i < taosArrayGetSize(stableSchemas); ++i) { + SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i); + taosArrayDestroy(schema->fields); + taosArrayDestroy(schema->tags); + } + taosArrayDestroy(stableSchemas); + return code; +} + +//========================================================================= + +typedef enum { + LP_ITEM_TAG, + LP_ITEM_FIELD +} LPItemKind; + +typedef struct { + SStrToken keyToken; + SStrToken valueToken; + + char key[TSDB_COL_NAME_LEN]; + int8_t type; + int16_t length; + + char* value; +}SLPItem; + +typedef struct { + SStrToken measToken; + SStrToken tsToken; + + char sTableName[TSDB_TABLE_NAME_LEN]; + SArray* tags; + SArray* fields; + int64_t ts; + +} SLPPoint; + +typedef enum { + LP_MEASUREMENT, + LP_TAG_KEY, + LP_TAG_VALUE, + LP_FIELD_KEY, + LP_FIELD_VALUE +} LPPart; + +int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == ',' || s.z[i] == ' ') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == '=') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t setPointMeasurement(SLPPoint* point, SStrToken token) { + point->measToken = token; + if (point->measToken.n < TSDB_TABLE_NAME_LEN) { + strncpy(point->sTableName, point->measToken.z, point->measToken.n); + point->sTableName[point->measToken.n] = '\0'; + } + return 0; +} + +int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) { + item->keyToken = key; + if (item->keyToken.n < TSDB_COL_NAME_LEN) { + strncpy(item->key, item->keyToken.z, item->keyToken.n); + item->key[item->keyToken.n] = '\0'; + } + return 0; +} + +int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) { + item->valueToken = value; + return 0; +} + +int32_t parseItemValue(SLPItem* item, LPItemKind kind) { + char* sv = item->valueToken.z; + char* last = item->valueToken.z + item->valueToken.n - 1; + + if (isdigit(sv[0]) || sv[0] == '-') { + if (*last == 'i') { + item->type = TSDB_DATA_TYPE_BIGINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(int64_t*)(item->value) = strtoll(sv, &endptr, 10); + } else if (*last == 'u') { + item->type = TSDB_DATA_TYPE_UBIGINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(uint64_t*)(item->value) = (uint64_t)strtoull(sv, &endptr, 10); + } else if (*last == 'b') { + item->type = TSDB_DATA_TYPE_TINYINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(int8_t*)(item->value) = (int8_t)strtoll(sv, &endptr, 10); + } else if (*last == 's') { + item->type = TSDB_DATA_TYPE_SMALLINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(int16_t*)(item->value) = (int16_t)strtoll(sv, &endptr, 10); + } else if (*last == 'w') { + item->type = TSDB_DATA_TYPE_INT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(int32_t*)(item->value) = (int32_t)strtoll(sv, &endptr, 10); + } else if (*last == 'f') { + item->type = TSDB_DATA_TYPE_FLOAT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(float*)(item->value) = (float)strtold(sv, &endptr); + } else { + item->type = TSDB_DATA_TYPE_DOUBLE; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(double*)(item->value) = strtold(sv, &endptr); + } + } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { + if (sv[0] == 'L') { + item->type = TSDB_DATA_TYPE_NCHAR; + uint32_t bytes = item->valueToken.n - 3; + item->length = bytes; + item->value = malloc(bytes); + memcpy(item->value, sv+2, bytes); + } else if (sv[0]=='"'){ + item->type = TSDB_DATA_TYPE_BINARY; + uint32_t bytes = item->valueToken.n - 2; + item->length = bytes; + item->value = malloc(bytes); + memcpy(item->value, sv+1, bytes); + } + } else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') { + item->type = TSDB_DATA_TYPE_BOOL; + item->length = tDataTypes[item->type].bytes; + item->value = malloc(tDataTypes[item->type].bytes); + *(uint8_t*)(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE; + } + return 0; +} + +int32_t compareLPItemKey(const void* p1, const void* p2) { + const SLPItem* t1 = p1; + const SLPItem* t2 = p2; + uint32_t min = (t1->keyToken.n < t2->keyToken.n) ? t1->keyToken.n : t2->keyToken.n; + int res = strncmp(t1->keyToken.z, t2->keyToken.z, min); + if (res != 0) { + return res; + } else { + return (int)(t1->keyToken.n) - (int)(t2->keyToken.n); + } +} + +int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) { + point->tsToken = tsToken; + return 0; +} + +int32_t parsePointTime(SLPPoint* point) { + if (point->tsToken.n <= 0) { + point->ts = taosGetTimestampNs(); + } else { + char* endptr = NULL; + point->ts = strtoll(point->tsToken.z, &endptr, 10); + char* last = point->tsToken.z + point->tsToken.n - 1; + if (*last == 's') { + point->ts *= (int64_t)1e9; + } else if (*last == 'a') { + point->ts *= (int64_t)1e6; + } else if (*last == 'u') { + point->ts *= (int64_t)1e3; + } else if (*last == 'b') { + point->ts *= 1; + } + } + return 0; +} + +int32_t tscParseLine(SStrToken line, SLPPoint* point) { + int32_t pos = 0; + + int32_t start = 0; + int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT); + if (err != 0) { + tscError("a"); + return err; + } + + SStrToken measurement = {.z = line.z+start, .n = pos-start}; + setPointMeasurement(point, measurement); + point->tags = taosArrayInit(64, sizeof(SLPItem)); + start = pos; + while (line.z[start] == ',') { + SLPItem item; + + start++; + err = scanToEqual(line, start, &pos); + if (err != 0) { + tscError("b"); + goto error; + } + + SStrToken tagKey = {.z = line.z + start, .n = pos-start}; + setItemKey(&item, tagKey, LP_TAG_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE); + if (err != 0) { + tscError("c"); + goto error; + } + + SStrToken tagValue = {.z = line.z + start, .n = pos-start}; + setItemValue(&item, tagValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_TAG); + taosArrayPush(point->tags, &item); + + start = pos; + } + + taosArraySort(point->tags, compareLPItemKey); + + point->fields = taosArrayInit(64, sizeof(SLPItem)); + + start++; + do { + SLPItem item; + + err = scanToEqual(line, start, &pos); + if (err != 0) { + goto error; + } + SStrToken fieldKey = {.z = line.z + start, .n = pos- start}; + setItemKey(&item, fieldKey, LP_FIELD_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE); + if (err != 0) { + goto error; + } + SStrToken fieldValue = {.z = line.z + start, .n = pos - start}; + setItemValue(&item, fieldValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_FIELD); + taosArrayPush(point->fields, &item); + + start = pos + 1; + } while (line.z[pos] == ','); + + taosArraySort(point->fields, compareLPItemKey); + + SStrToken tsToken = {.z = line.z+start, .n = line.n-start}; + setPointTimeStamp(point, tsToken); + parsePointTime(point); + + goto done; + + error: + // free array + return err; + done: + return 0; +} + + +int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { + for (int32_t i = 0; i < numLines; ++i) { + SStrToken tkLine = {.z = lines[i], .n = (uint32_t)strlen(lines[i])}; + SLPPoint point; + tscParseLine(tkLine, &point); + taosArrayPush(points, &point); + } + return 0; +} + +void destroyLPPoint(void* p) { + SLPPoint* lpPoint = p; + for (int i=0; ifields); ++i) { + SLPItem* item = taosArrayGet(lpPoint->fields, i); + free(item->value); + } + taosArrayDestroy(lpPoint->fields); + + for (int i=0; itags); ++i) { + SLPItem* item = taosArrayGet(lpPoint->tags, i); + free(item->value); + } + taosArrayDestroy(lpPoint->tags); +} + +void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { + for (int i=0; itagNum; ++i) { + free((point->tags+i)->key); + free((point->tags+i)->value); + } + free(point->tags); + for (int i=0; ifieldNum; ++i) { + free((point->fields+i)->key); + free((point->fields+i)->value); + } + free(point->fields); + free(point->stableName); + free(point->childTableName); +} + +int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { + SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint)); + tscParseLines(lines, numLines, lpPoints, NULL); + + size_t numPoints = taosArrayGetSize(lpPoints); + TAOS_SML_DATA_POINT* points = calloc(numPoints, sizeof(TAOS_SML_DATA_POINT)); + for (int i = 0; i < numPoints; ++i) { + SLPPoint* lpPoint = taosArrayGet(lpPoints, i); + TAOS_SML_DATA_POINT* point = points+i; + point->stableName = calloc(1, strlen(lpPoint->sTableName)+1); + strncpy(point->stableName, lpPoint->sTableName, strlen(lpPoint->sTableName)); + point->stableName[strlen(lpPoint->sTableName)] = '\0'; + + size_t lpTagSize = taosArrayGetSize(lpPoint->tags); + point->tags = calloc(lpTagSize, sizeof(TAOS_SML_KV)); + point->tagNum = (int)lpTagSize; + for (int j=0; jtags, j); + TAOS_SML_KV* tagKv = point->tags + j; + + size_t kenLen = strlen(lpTag->key); + tagKv->key = calloc(1, kenLen+1); + strncpy(tagKv->key, lpTag->key, kenLen); + tagKv->key[kenLen] = '\0'; + + tagKv->type = lpTag->type; + tagKv->length = lpTag->length; + tagKv->value = malloc(tagKv->length); + memcpy(tagKv->value, lpTag->value, tagKv->length); + } + + size_t lpFieldsSize = taosArrayGetSize(lpPoint->fields); + point->fields = calloc(lpFieldsSize + 1, sizeof(TAOS_SML_KV)); + point->fieldNum = (int)(lpFieldsSize + 1); + + TAOS_SML_KV* tsField = point->fields + 0; + char tsKey[256]; + snprintf(tsKey, 256, "_%s_ts", point->stableName); + size_t tsKeyLen = strlen(tsKey); + tsField->key = calloc(1, tsKeyLen+1); + strncpy(tsField->key, tsKey, tsKeyLen); + tsField->key[tsKeyLen] = '\0'; + tsField->type = TSDB_DATA_TYPE_TIMESTAMP; + tsField->length = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; + tsField->value = malloc(tsField->length); + memcpy(tsField->value, &(lpPoint->ts), tsField->length); + + for (int j=0; jfields, j); + TAOS_SML_KV* fieldKv = point->fields + j + 1; + + size_t kenLen = strlen(lpField->key); + fieldKv->key = calloc(1, kenLen+1); + strncpy(fieldKv->key, lpField->key, kenLen); + fieldKv->key[kenLen] = '\0'; + + fieldKv->type = lpField->type; + fieldKv->length = lpField->length; + fieldKv->value = malloc(fieldKv->length); + memcpy(fieldKv->value, lpField->value, fieldKv->length); + } + } + + taos_sml_insert(taos, points, (int)numPoints); + + for (int i=0; ipSql->rspSem); + code = pStmt->pSql->res.code; + insertBatchClean(pStmt); - return pStmt->pSql->res.code; + return code; } int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { @@ -1470,6 +1472,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pSql->fetchFp = waitForQueryRsp; pCmd->insertParam.insertType = TSDB_QUERY_TYPE_STMT_INSERT; + pCmd->insertParam.objectId = pSql->self; pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1); @@ -1646,7 +1649,11 @@ int taos_stmt_close(TAOS_STMT* stmt) { } else { if (pStmt->multiTbInsert) { taosHashCleanup(pStmt->mtb.pTableHash); - pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, false); + bool rmMeta = false; + if (pStmt->pSql && pStmt->pSql->res.code != 0) { + rmMeta = true; + } + pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL; taosArrayDestroy(pStmt->mtb.tags); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 173db05e10a53316566db4edb661c4466cfc5f2f..0f29f3c1781ab873a01b118fa805762184075a2e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3342,6 +3342,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pnCmd->insertParam.numOfTables = 0; pnCmd->insertParam.pTableNameList = NULL; pnCmd->insertParam.pTableBlockHashList = NULL; + pnCmd->insertParam.objectId = pNew->self; memset(&pnCmd->insertParam.tagData, 0, sizeof(STagData)); @@ -3608,6 +3609,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pNew->signature = pNew; pNew->sqlstr = strdup(pSql->sqlstr); // todo refactor pNew->fp = tscSubqueryCompleteCallback; + tsem_init(&pNew->rspSem, 0, 0); SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id ps->pParentSql = pSql; diff --git a/src/inc/taos.h b/src/inc/taos.h index 9f72945ef03f28fb54ab05f84be810a0f9d5a66a..a62f38792499994ebb54567c43ecddec829de368 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -169,6 +169,8 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); +DLL_EXPORT int taos_insert_lines(TAOS* taos, char* lines[], int numLines); + #ifdef __cplusplus } #endif diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 0f24df0f4767fe1cdace072425768473ffcaa88f..a377bbc7b47e1a58d4b3294b88386a9c4fb74e47 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -12,7 +12,7 @@ static void prepare_data(TAOS* taos) { result = taos_query(taos, "drop database if exists test;"); taos_free_result(result); usleep(100000); - result = taos_query(taos, "create database test;"); + result = taos_query(taos, "create database test precision 'us';"); taos_free_result(result); usleep(100000); taos_select_db(taos, "test"); @@ -949,13 +949,45 @@ void verify_stream(TAOS* taos) { taos_close_stream(strm); } +int32_t verify_schema_less(TAOS* taos) { + TAOS_RES *result; + result = taos_query(taos, "drop database if exists test;"); + taos_free_result(result); + usleep(100000); + result = taos_query(taos, "create database test precision 'us';"); + taos_free_result(result); + usleep(100000); + + taos_select_db(taos, "test"); + result = taos_query(taos, "create stable ste(ts timestamp, f int) tags(t1 bigint)"); + taos_free_result(result); + usleep(100000); + + char* lines[] = { + "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000", + "st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5 1626006833640000000", + "ste,t2=5,t3=L\"ste\" c1=true,c2=4,c3=\"iam\" 1626056811823316532", + "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000", + "ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532", + "ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32b,c6=64s,c7=32w,c8=88.88f 1626056812843316532", + "st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000", + "stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000", + "stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin_stf\",c2=false,c5=5,c6=7u 1626006933641a" + }; + +// int code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*)); + int code = taos_insert_lines(taos, &lines[0], 1); + code = taos_insert_lines(taos, &lines[1], 1); + + return code; +} + int main(int argc, char *argv[]) { const char* host = "127.0.0.1"; const char* user = "root"; const char* passwd = "taosdata"; taos_options(TSDB_OPTION_TIMEZONE, "GMT-8"); - TAOS* taos = taos_connect(host, user, passwd, "", 0); if (taos == NULL) { printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos)); @@ -967,6 +999,12 @@ int main(int argc, char *argv[]) { info = taos_get_client_info(taos); printf("client info: %s\n", info); + printf("************ verify shemaless *************\n"); + int code = verify_schema_less(taos); + if (code == 0) { + return code; + } + printf("************ verify query *************\n"); verify_query(taos); diff --git a/tests/script/general/parser/line_insert.sim b/tests/script/general/parser/line_insert.sim new file mode 100644 index 0000000000000000000000000000000000000000..f3067a3bbec8c7d566570704d6b84caaaa1f8e67 --- /dev/null +++ b/tests/script/general/parser/line_insert.sim @@ -0,0 +1,54 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 +system sh/exec.sh -n dnode1 -s start +sleep 2000 +sql connect + +print =============== step1 +$db = testlp +$mte = ste +$mt = st +sql drop database $db -x step1 +step1: +sql create database $db precision 'us' +sql use $db +sql create stable $mte (ts timestamp, f int) TAGS(t1 bigint) + +line_insert st,t1=3i,t2=4,t3="t3" c1=3i,c3=L"passit",c2=false,c4=4 1626006833639000000 +line_insert st,t1=4i,t3="t41",t2=5 c1=3i,c3=L"passiT",c2=true,c4=5 1626006833640000000 +line_insert stf,t1=4i,t2=5,t3="t4" c1=3i,c3=L"passitagain",c2=true,c4=5 1626006833642000000 +line_insert ste,t2=5,t3=L"ste" c1=true,c2=4,c3="iam" 1626056811823316532 + +sql select * from st +if $rows != 2 then + return -1 +endi + +if $data00 != @21-07-11 20:33:53.639000@ then + return -1 +endi + +if $data03 != @passit@ then + return -1 +endi + +sql select * from stf +if $rows != 1 then + return -1 +endi + +sql select * from ste +if $rows != 1 then + return -1 +endi + +#print =============== clear +sql drop database $db +sql show databases +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/tsim/inc/sim.h b/tests/tsim/inc/sim.h index 58314d2e5055f0716793342157f6c82d6d729b29..2e19dde3d9c52c20705d131f471a2e0e389589e4 100644 --- a/tests/tsim/inc/sim.h +++ b/tests/tsim/inc/sim.h @@ -87,6 +87,8 @@ enum { SIM_CMD_RESTFUL, SIM_CMD_TEST, SIM_CMD_RETURN, + SIM_CMD_LINE_INSERT, + SIM_CMD_LINE_INSERT_ERROR, SIM_CMD_END }; @@ -172,6 +174,8 @@ bool simExecuteSqlCmd(SScript *script, char *option); bool simExecuteSqlErrorCmd(SScript *script, char *rest); bool simExecuteSqlSlowCmd(SScript *script, char *option); bool simExecuteRestfulCmd(SScript *script, char *rest); +bool simExecuteLineInsertCmd(SScript *script, char *option); +bool simExecuteLineInsertErrorCmd(SScript *script, char *option); void simVisuallizeOption(SScript *script, char *src, char *dst); #endif \ No newline at end of file diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index 7d74946e939bb5f34f81ef6a6aee56a31c4a6cfe..a05f46ce0de54628f289c937e959ccc3337e00a9 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -1067,3 +1067,49 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) { return false; } + +bool simExecuteLineInsertCmd(SScript *script, char *rest) { + char buf[TSDB_MAX_BINARY_LEN]; + + simVisuallizeOption(script, rest, buf); + rest = buf; + + SCmdLine *line = &script->lines[script->linePos]; + + simInfo("script:%s, %s", script->fileName, rest); + simLogSql(buf, true); + char * lines[] = {rest}; + int32_t ret = taos_insert_lines(script->taos, lines, 1); + if (ret == TSDB_CODE_SUCCESS) { + simDebug("script:%s, taos:%p, %s executed. success.", script->fileName, script->taos, rest); + script->linePos++; + return true; + } else { + sprintf(script->error, "lineNum: %d. line: %s failed, ret:%d:%s", line->lineNum, rest, + ret & 0XFFFF, tstrerror(ret)); + return false; + } +} + +bool simExecuteLineInsertErrorCmd(SScript *script, char *rest) { + char buf[TSDB_MAX_BINARY_LEN]; + + simVisuallizeOption(script, rest, buf); + rest = buf; + + SCmdLine *line = &script->lines[script->linePos]; + + simInfo("script:%s, %s", script->fileName, rest); + simLogSql(buf, true); + char * lines[] = {rest}; + int32_t ret = taos_insert_lines(script->taos, lines, 1); + if (ret == TSDB_CODE_SUCCESS) { + sprintf(script->error, "script:%s, taos:%p, %s executed. expect failed, but success.", script->fileName, script->taos, rest); + script->linePos++; + return false; + } else { + simDebug("lineNum: %d. line: %s failed, ret:%d:%s. Expect failed, so success", line->lineNum, rest, + ret & 0XFFFF, tstrerror(ret)); + return true; + } +} diff --git a/tests/tsim/src/simParse.c b/tests/tsim/src/simParse.c index b909f5bd8fc10bea09afd65dc504ae35d6de3505..1acdcd2ac6eb0ecb66e2977dee7577393ed242ef 100644 --- a/tests/tsim/src/simParse.c +++ b/tests/tsim/src/simParse.c @@ -838,6 +838,38 @@ bool simParseRunBackCmd(char *rest, SCommand *pCmd, int32_t lineNum) { return true; } +bool simParseLineInsertCmd(char* rest, SCommand* pCmd, int32_t lineNum) { + int32_t expLen; + + rest++; + cmdLine[numOfLines].cmdno = SIM_CMD_LINE_INSERT; + cmdLine[numOfLines].lineNum = lineNum; + cmdLine[numOfLines].optionOffset = optionOffset; + expLen = (int32_t)strlen(rest); + memcpy(optionBuffer + optionOffset, rest, expLen); + optionOffset += expLen + 1; + *(optionBuffer + optionOffset - 1) = 0; + + numOfLines++; + return true; +} + +bool simParseLineInsertErrorCmd(char* rest, SCommand* pCmd, int32_t lineNum) { + int32_t expLen; + + rest++; + cmdLine[numOfLines].cmdno = SIM_CMD_LINE_INSERT; + cmdLine[numOfLines].lineNum = lineNum; + cmdLine[numOfLines].optionOffset = optionOffset; + expLen = (int32_t)strlen(rest); + memcpy(optionBuffer + optionOffset, rest, expLen); + optionOffset += expLen + 1; + *(optionBuffer + optionOffset - 1) = 0; + + numOfLines++; + return true; +} + void simInitsimCmdList() { int32_t cmdno; memset(simCmdList, 0, SIM_CMD_END * sizeof(SCommand)); @@ -1049,4 +1081,20 @@ void simInitsimCmdList() { simCmdList[cmdno].parseCmd = simParseReturnCmd; simCmdList[cmdno].executeCmd = simExecuteReturnCmd; simAddCmdIntoHash(&(simCmdList[cmdno])); + + cmdno = SIM_CMD_LINE_INSERT; + simCmdList[cmdno].cmdno = cmdno; + strcpy(simCmdList[cmdno].name, "line_insert"); + simCmdList[cmdno].nlen = (int16_t)strlen(simCmdList[cmdno].name); + simCmdList[cmdno].parseCmd = simParseLineInsertCmd; + simCmdList[cmdno].executeCmd = simExecuteLineInsertCmd; + simAddCmdIntoHash(&(simCmdList[cmdno])); + + cmdno = SIM_CMD_LINE_INSERT_ERROR; + simCmdList[cmdno].cmdno = cmdno; + strcpy(simCmdList[cmdno].name, "line_insert_error"); + simCmdList[cmdno].nlen = (int16_t)strlen(simCmdList[cmdno].name); + simCmdList[cmdno].parseCmd = simParseLineInsertErrorCmd; + simCmdList[cmdno].executeCmd = simExecuteLineInsertErrorCmd; + simAddCmdIntoHash(&(simCmdList[cmdno])); }