From aadbf21ddd3aeaa8f2487d4042af84801293c08f Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 16 Jul 2021 15:40:52 +0800 Subject: [PATCH] fix bug that save index instead of pointer when taosarray reallocation cause point invalidate --- src/client/src/tscParseLineProtocol.c | 100 ++++++++++++++++---------- 1 file changed, 63 insertions(+), 37 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index b6779fdc67..7c789b502e 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -34,7 +34,7 @@ typedef struct { char* value; //=================================== - SSchema* schema; + size_t fieldSchemaIdx; } TAOS_SML_KV; typedef struct { @@ -49,7 +49,7 @@ typedef struct { int fieldNum; //================================ - SSmlSTableSchema* schema; + size_t schemaIdx; } TAOS_SML_DATA_POINT; typedef enum { @@ -126,10 +126,12 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) { SSchema* pField = NULL; - SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); + size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); + size_t fieldIdx = -1; int32_t code = 0; - if (ppField) { - pField = *ppField; + if (pFieldIdx) { + fieldIdx = *pFieldIdx; + pField = taosArrayGet(array, fieldIdx); if (pField->type != smlKv->type) { tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type); @@ -158,10 +160,11 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra field.bytes = bytes; pField = taosArrayPush(array, &field); - taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES); + fieldIdx = taosArrayGetSize(array) - 1; + taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx)); } - smlKv->schema = pField; + smlKv->fieldSchemaIdx = fieldIdx; return 0; } @@ -174,10 +177,12 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, for (int i = 0; i < numPoint; ++i) { TAOS_SML_DATA_POINT* point = &points[i]; size_t stableNameLen = strlen(point->stableName); - SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen); + size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen); SSmlSTableSchema* pStableSchema = NULL; - if (ppStableSchema) { - pStableSchema= *ppStableSchema; + size_t stableIdx = -1; + if (pStableIdx) { + pStableSchema= taosArrayGet(stableSchemas, *pStableIdx); + stableIdx = *pStableIdx; } else { SSmlSTableSchema schema; strncpy(schema.sTableName, point->stableName, stableNameLen); @@ -188,7 +193,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); pStableSchema = taosArrayPush(stableSchemas, &schema); - taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); + stableIdx = taosArrayGetSize(stableSchemas) - 1; + taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t)); } for (int j = 0; j < point->tagNum; ++j) { @@ -209,7 +215,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } } - point->schema = pStableSchema; + point->schemaIdx = stableIdx; } size_t numStables = taosArrayGetSize(stableSchemas); @@ -601,8 +607,12 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co return code; } - taos_stmt_close(stmt); - return 0; + code = taos_stmt_close(stmt); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + return code; } static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) { @@ -674,7 +684,8 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols return code; } -static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, SHashObj* cname2points) { +static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, + SHashObj* cname2points, SArray* stableSchemas) { for (int32_t i = 0; i < numPoints; ++i) { TAOS_SML_DATA_POINT * point = points + i; if (!point->childTableName) { @@ -686,11 +697,13 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu point->childTableName[tableNameLen] = '\0'; } + SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* kv = point->tags + j; if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { int64_t ts = *(int64_t*)(kv->value); - ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); *(int64_t*)(kv->value) = ts; } } @@ -699,7 +712,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu TAOS_SML_KV* kv = point->fields + j; if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { int64_t ts = *(int64_t*)(kv->value); - ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); *(int64_t*)(kv->value) = ts; } } @@ -718,10 +731,12 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu return 0; } -static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) { +static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - arrangePointsByChildTableName(points, numPoints, cname2points); + arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas); int isNullColBind = TSDB_TRUE; SArray** pCTablePoints = taosHashIterate(cname2points, NULL); @@ -729,8 +744,9 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num SArray* cTablePoints = *pCTablePoints; TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); - size_t numTags = taosArrayGetSize(point->schema->tags); - size_t numCols = taosArrayGetSize(point->schema->fields); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t numCols = taosArrayGetSize(sTableSchema->fields); SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); taosArraySetSize(tagBinds, numTags); @@ -740,8 +756,7 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num } for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* kv = point->tags + j; - size_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); - TAOS_BIND* bind = taosArrayGet(tagBinds, idx); + TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -762,8 +777,7 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num } for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* kv = point->fields + j; - size_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); - TAOS_BIND* bind = colBinds + idx; + TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -773,14 +787,21 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num taosArrayPush(rowsBind, &colBinds); } - creatChildTableIfNotExists(taos, point->childTableName, point->stableName, point->schema->tags, tagBinds); + code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds); + if (code == 0) { + code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind); + if (code != 0) { + tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code)); + } + } else { + tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code)); + } + for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { TAOS_BIND* bind = taosArrayGet(tagBinds, i); free(bind->length); } taosArrayDestroy(tagBinds); - - insertChildTableBatch(taos, point->childTableName, point->schema->fields, rowsBind); for (int i = 0; i < rows; ++i) { TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); for (int j = 0; j < numCols; ++j) { @@ -791,12 +812,14 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num } taosArrayDestroy(rowsBind); taosArrayDestroy(cTablePoints); - + if (code != 0) { + break; + } pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } taosHashCleanup(cname2points); - return 0; + return code; } int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { @@ -817,7 +840,7 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { goto clean_up; } - code = insertPoints(taos, points, numPoint); + code = insertPoints(taos, points, numPoint, stableSchemas); if (code != 0) { tscError("error insert points : %s", tstrerror(code)); } @@ -1619,13 +1642,17 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, if (isField) { if ((*num_kvs + 2) > capacity) { capacity *= 3; capacity /= 2; + more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV)); + } else { + more_kvs = *pKVs; } - more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV)); } else { if ((*num_kvs + 1) > capacity) { capacity *= 3; capacity /= 2; + more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV)); + } else { + more_kvs = *pKVs; } - more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV)); } if (!more_kvs) { @@ -1642,7 +1669,6 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, goto done; error: - free(*pKVs); return ret; done: *index = cur; @@ -1738,7 +1764,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) { moveTimeStampToFirstKv(&smlData, timestamp); tscDebug("Parse timestamp finished"); - return true; + return TSDB_CODE_SUCCESS; } //========================================================================= @@ -1746,8 +1772,8 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) { int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { for (int32_t i = 0; i < numLines; ++i) { TAOS_SML_DATA_POINT point = {0}; - bool succ = tscParseLine(lines[i], &point); - if (!succ) { + int32_t code = tscParseLine(lines[i], &point); + if (code != TSDB_CODE_SUCCESS) { tscError("data point line parse failed. line %d", i); return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } else { -- GitLab