提交 ca3b779a 编写于 作者: wmmhello's avatar wmmhello

refactor:modify schemaless function to speed

上级 d418abd0
...@@ -116,6 +116,8 @@ typedef struct { ...@@ -116,6 +116,8 @@ typedef struct {
int32_t affectedRows; int32_t affectedRows;
SSmlMsgBuf msgBuf; SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key
SArray *colsContainer; // for cols parse, if is dataFormat == false
} SSmlHandle; } SSmlHandle;
//================================================================================================= //=================================================================================================
...@@ -177,8 +179,9 @@ static void smlBuildChildTableName(SSmlTableInfo *tags) { ...@@ -177,8 +179,9 @@ static void smlBuildChildTableName(SSmlTableInfo *tags) {
tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
tMD5Final(&context); tMD5Final(&context);
uint64_t digest1 = *(uint64_t*)(context.digest); uint64_t digest1 = *(uint64_t*)(context.digest);
uint64_t digest2 = *(uint64_t*)(context.digest + 8); //uint64_t digest2 = *(uint64_t*)(context.digest + 8);
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); //snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1);
taosStringBuilderDestroy(&sb); taosStringBuilderDestroy(&sb);
tags->uid = digest1; tags->uid = digest1;
} }
...@@ -353,7 +356,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -353,7 +356,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
char* pos = result + n; int freeBytes = capacity - n; char* pos = result + n; int freeBytes = capacity - n;
SArray *cols = action->createSTable.tags; SArray *cols = action->createSTable.fields;
for(int i = 0; i < taosArrayGetSize(cols); i++){ for(int i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i); SSmlKv *kv = taosArrayGetP(cols, i);
...@@ -1026,7 +1029,7 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu ...@@ -1026,7 +1029,7 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SSmlMsgBuf *msg){ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){ if(isTag && len == 0){
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = TAG; kv->key = TAG;
...@@ -1054,6 +1057,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1054,6 +1057,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if(taosHashGet(dumplicateKey, key, keyLen)){
smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_SML_INVALID_DATA;
}else{
taosHashPut(dumplicateKey, key, keyLen, key, CHAR_BYTES);
}
// parse value // parse value
i++; i++;
const char *value = data + i; const char *value = data + i;
...@@ -1474,10 +1484,15 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1474,10 +1484,15 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return ret; return ret;
} }
SArray *cols = taosArrayInit(16, POINTER_BYTES); SArray *cols = NULL;
if (cols == NULL) { if(info->dataFormat){ // if dataFormat, cols need new memory to save data
uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id); cols = taosArrayInit(16, POINTER_BYTES);
return TSDB_CODE_TSC_OUT_OF_MEMORY; if (cols == NULL) {
uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}else{ // if dataFormat is false, cols do not need to save data, there is another new memory to save data
cols = info->colsContainer;
} }
ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols); ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
...@@ -1485,7 +1500,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1485,7 +1500,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
uError("SML:0x%"PRIx64" smlParseTS failed", info->id); uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
return ret; return ret;
} }
ret = smlParseCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf); ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
return ret; return ret;
...@@ -1518,7 +1533,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1518,7 +1533,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return ret; return ret;
} }
ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, &info->msgBuf); ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
return ret; return ret;
...@@ -1549,6 +1564,11 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1549,6 +1564,11 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES); taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
} }
if(!info->dataFormat){
taosArrayClear(info->colsContainer);
}
taosHashClear(info->dumplicateKey);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1577,6 +1597,7 @@ static void smlDestroyInfo(SSmlHandle* info){ ...@@ -1577,6 +1597,7 @@ static void smlDestroyInfo(SSmlHandle* info){
// destroy info->pVgHash // destroy info->pVgHash
taosHashCleanup(info->pVgHash); taosHashCleanup(info->pVgHash);
taosHashCleanup(info->dumplicateKey);
taosMemoryFreeClear(info); taosMemoryFreeClear(info);
} }
...@@ -1623,8 +1644,17 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol ...@@ -1623,8 +1644,17 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!dataFormat){
info->colsContainer = taosArrayInit(32, POINTER_BYTES);
if(NULL == info->colsContainer){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
}
if(NULL == info->exec || NULL == info->childTables if(NULL == info->exec || NULL == info->childTables
|| NULL == info->superTables || NULL == info->pVgHash){ || NULL == info->superTables || NULL == info->pVgHash
|| NULL == info->dumplicateKey){
uError("SML:0x%"PRIx64" create info failed", info->id); uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup; goto cleanup;
} }
......
...@@ -468,12 +468,18 @@ TEST(testCase, smlParseLine_Test) { ...@@ -468,12 +468,18 @@ TEST(testCase, smlParseLine_Test) {
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, NULL); ASSERT_NE(info, NULL);
const char *sql[3] = { const char *sql[9] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619400000000000",
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000" "readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000"
}; };
smlInsertLines(info, sql, 3); smlInsertLines(info, sql, 9);
// for (int i = 0; i < 3; i++) { // for (int i = 0; i < 3; i++) {
// smlParseLine(info, sql[i]); // smlParseLine(info, sql[i]);
// } // }
......
...@@ -1737,6 +1737,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols ...@@ -1737,6 +1737,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols
int32_t colLen = pColSchema->bytes; int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) { if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = kv->length; colLen = kv->length;
} else if(pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
} }
MemRowAppend(&pBuf, &(kv->value), colLen, &param); MemRowAppend(&pBuf, &(kv->value), colLen, &param);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册