提交 b5a9cbf7 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/vnode_compact

...@@ -99,6 +99,7 @@ typedef struct { ...@@ -99,6 +99,7 @@ typedef struct {
char *tags; char *tags;
char *cols; char *cols;
char *timestamp; char *timestamp;
char *measureTag;
int32_t measureLen; int32_t measureLen;
int32_t measureTagsLen; int32_t measureTagsLen;
...@@ -114,7 +115,7 @@ typedef struct { ...@@ -114,7 +115,7 @@ typedef struct {
int32_t sTableNameLen; int32_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN]; char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid; uint64_t uid;
void *key; // for openTsdb // void *key; // for openTsdb
SArray *tags; SArray *tags;
...@@ -166,8 +167,8 @@ typedef struct { ...@@ -166,8 +167,8 @@ typedef struct {
int32_t ttl; int32_t ttl;
int32_t uid; // used for automatic create child table int32_t uid; // used for automatic create child table
NodeList *childTables; SHashObj *childTables;
NodeList *superTables; SHashObj *superTables;
SHashObj *pVgHash; SHashObj *pVgHash;
STscObj *taos; STscObj *taos;
......
...@@ -759,13 +759,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { ...@@ -759,13 +759,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
conn.requestObjRefId = info->pRequest->self; conn.requestObjRefId = info->pRequest->self;
conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
NodeList *tmp = info->superTables; SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
while (tmp) { while (tmp) {
SSmlSTableMeta *sTableData = (SSmlSTableMeta *)tmp->data.value; SSmlSTableMeta *sTableData = *tmp;
bool needCheckMeta = false; // for multi thread bool needCheckMeta = false; // for multi thread
size_t superTableLen = (size_t)tmp->data.keyLen; size_t superTableLen = 0;
const void *superTable = tmp->data.key; void *superTable = taosHashGetKey(tmp, &superTableLen);
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN); memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
memcpy(pName.tname, superTable, superTableLen); memcpy(pName.tname, superTable, superTableLen);
...@@ -914,7 +914,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { ...@@ -914,7 +914,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
sTableData->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
tmp = tmp->next; tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
} }
return 0; return 0;
...@@ -1017,11 +1017,11 @@ void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) { ...@@ -1017,11 +1017,11 @@ void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
} }
if (info->parseJsonByLib) { // if (info->parseJsonByLib) {
SSmlLineInfo *key = (SSmlLineInfo *)(tag->key); // SSmlLineInfo *key = (SSmlLineInfo *)(tag->key);
if (key != NULL) taosMemoryFree(key->tags); // if (key != NULL) taosMemoryFree(key->tags);
} // }
taosMemoryFree(tag->key); // taosMemoryFree(tag->key);
taosArrayDestroy(tag->cols); taosArrayDestroy(tag->cols);
taosArrayDestroy(tag->tags); taosArrayDestroy(tag->tags);
taosMemoryFree(tag); taosMemoryFree(tag);
...@@ -1042,29 +1042,23 @@ void smlDestroyInfo(SSmlHandle *info) { ...@@ -1042,29 +1042,23 @@ void smlDestroyInfo(SSmlHandle *info) {
qDestroyQuery(info->pQuery); qDestroyQuery(info->pQuery);
// destroy info->childTables // destroy info->childTables
NodeList *tmp = info->childTables; SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (tmp) { while (oneTable) {
if (tmp->data.used) { smlDestroyTableInfo(info, *oneTable);
smlDestroyTableInfo(info, (SSmlTableInfo *)tmp->data.value); oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
}
NodeList *t = tmp->next;
taosMemoryFree(tmp);
tmp = t;
} }
// destroy info->superTables // destroy info->superTables
tmp = info->superTables; SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
while (tmp) { while (oneSTable) {
if (tmp->data.used) { smlDestroySTableMeta(*oneSTable);
smlDestroySTableMeta((SSmlSTableMeta *)tmp->data.value); oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
}
NodeList *t = tmp->next;
taosMemoryFree(tmp);
tmp = t;
} }
// destroy info->pVgHash // destroy info->pVgHash
taosHashCleanup(info->pVgHash); taosHashCleanup(info->pVgHash);
taosHashCleanup(info->childTables);
taosHashCleanup(info->superTables);
for (int i = 0; i < taosArrayGetSize(info->tagJsonArray); i++) { for (int i = 0; i < taosArrayGetSize(info->tagJsonArray); i++) {
cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i); cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i);
...@@ -1088,6 +1082,8 @@ void smlDestroyInfo(SSmlHandle *info) { ...@@ -1088,6 +1082,8 @@ void smlDestroyInfo(SSmlHandle *info) {
if (info->parseJsonByLib) { if (info->parseJsonByLib) {
taosMemoryFree(info->lines[i].tags); taosMemoryFree(info->lines[i].tags);
} }
if(info->lines[i].measureTagsLen != 0)
taosMemoryFree(info->lines[i].measureTag);
} }
taosMemoryFree(info->lines); taosMemoryFree(info->lines);
} }
...@@ -1112,6 +1108,9 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { ...@@ -1112,6 +1108,9 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
} }
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
info->id = smlGenId(); info->id = smlGenId();
info->pQuery = smlInitHandle(); info->pQuery = smlInitHandle();
info->dataFormat = true; info->dataFormat = true;
...@@ -1122,7 +1121,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { ...@@ -1122,7 +1121,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->maxTagKVs = taosArrayInit(8, sizeof(SSmlKv)); info->maxTagKVs = taosArrayInit(8, sizeof(SSmlKv));
info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv)); info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
if (NULL == info->pVgHash) { if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) {
uError("create SSmlHandle failed"); uError("create SSmlHandle failed");
goto cleanup; goto cleanup;
} }
...@@ -1156,11 +1155,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -1156,11 +1155,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlLineInfo *elements = info->lines + i; SSmlLineInfo *elements = info->lines + i;
SSmlTableInfo *tinfo = NULL; SSmlTableInfo *tinfo = NULL;
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL); tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet); tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
} else { } else {
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet); tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
} }
if (tinfo == NULL) { if (tinfo == NULL) {
...@@ -1184,12 +1183,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -1184,12 +1183,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
return ret; return ret;
} }
SSmlSTableMeta *tableMeta = SSmlSTableMeta **tableMeta =
(SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (tableMeta) { // update meta if (tableMeta) { // update meta
ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
} }
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
...@@ -1205,7 +1204,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -1205,7 +1204,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
smlInsertMeta(meta->colHash, meta->cols, elements->colArray); smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta, NULL); taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
} }
} }
...@@ -1215,9 +1214,9 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -1215,9 +1214,9 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
static int32_t smlInsertData(SSmlHandle *info) { static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
NodeList *tmp = info->childTables; SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (tmp) { while (oneTable) {
SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value; SSmlTableInfo *tableData = *oneTable;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
...@@ -1237,25 +1236,25 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -1237,25 +1236,25 @@ static int32_t smlInsertData(SSmlHandle *info) {
} }
taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
SSmlSTableMeta *pMeta = SSmlSTableMeta **pMeta =
(SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL); (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
if (unlikely(NULL == pMeta || NULL == pMeta->tableMeta)) { if (unlikely(NULL == pMeta || NULL == (*pMeta)->tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == pMeta. table name: %s", info->id, tableData->childTableName); uError("SML:0x%" PRIx64 " NULL == pMeta. table name: %s", info->id, tableData->childTableName);
return TSDB_CODE_SML_INTERNAL_ERROR; return TSDB_CODE_SML_INTERNAL_ERROR;
} }
// use tablemeta of stable to save vgid and uid of child table // use tablemeta of stable to save vgid and uid of child table
pMeta->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->vgId = vg.vgId;
pMeta->tableMeta->uid = tableData->uid; // one table merge data block together according uid (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols, pMeta->tableMeta, code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, (*pMeta)->tableMeta,
tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->ttl, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->ttl,
info->msgBuf.buf, info->msgBuf.len); info->msgBuf.buf, info->msgBuf.len);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBindData failed", info->id); uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
return code; return code;
} }
tmp = tmp->next; oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
} }
code = smlBuildOutput(info->pQuery, info->pVgHash); code = smlBuildOutput(info->pQuery, info->pVgHash);
...@@ -1288,25 +1287,22 @@ static void smlPrintStatisticInfo(SSmlHandle *info) { ...@@ -1288,25 +1287,22 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
int32_t smlClearForRerun(SSmlHandle *info) { int32_t smlClearForRerun(SSmlHandle *info) {
info->reRun = false; info->reRun = false;
// clear info->childTables // clear info->childTables
NodeList *pList = info->childTables; SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (pList) { while (oneTable) {
if (pList->data.used) { smlDestroyTableInfo(info, *oneTable);
smlDestroyTableInfo(info, (SSmlTableInfo *)pList->data.value); oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
pList->data.used = false;
}
pList = pList->next;
} }
// clear info->superTables // clear info->superTables
pList = info->superTables; SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
while (pList) { while (oneSTable) {
if (pList->data.used) { smlDestroySTableMeta(*oneSTable);
smlDestroySTableMeta((SSmlSTableMeta *)pList->data.value); oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
pList->data.used = false;
}
pList = pList->next;
} }
taosHashClear(info->childTables);
taosHashClear(info->superTables);
if (!info->dataFormat) { if (!info->dataFormat) {
if (unlikely(info->lines != NULL)) { if (unlikely(info->lines != NULL)) {
uError("SML:0x%" PRIx64 " info->lines != NULL", info->id); uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
...@@ -1375,6 +1371,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char ...@@ -1375,6 +1371,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
if (info->dataFormat) { if (info->dataFormat) {
SSmlLineInfo element = {0}; SSmlLineInfo element = {0};
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element); code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
if(element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else { } else {
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i); code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
} }
...@@ -1418,15 +1415,15 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL ...@@ -1418,15 +1415,15 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
} }
info->cost.lineNum = info->lineNum; info->cost.lineNum = info->lineNum;
info->cost.numOfSTables = nodeListSize(info->superTables); info->cost.numOfSTables = taosHashGetSize(info->superTables);
info->cost.numOfCTables = nodeListSize(info->childTables); info->cost.numOfCTables = taosHashGetSize(info->childTables);
info->cost.schemaTime = taosGetTimestampUs(); info->cost.schemaTime = taosGetTimestampUs();
do { do {
code = smlModifyDBSchemas(info); code = smlModifyDBSchemas(info);
if (code == 0) break; if (code == 0) break;
} while (retryNum++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
if (code != 0) { if (code != 0) {
uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code)); uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
...@@ -1504,7 +1501,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, ...@@ -1504,7 +1501,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
request->code = code; request->code = code;
info->cost.endTime = taosGetTimestampUs(); info->cost.endTime = taosGetTimestampUs();
info->cost.code = code; info->cost.code = code;
// smlPrintStatisticInfo(info); smlPrintStatisticInfo(info);
end: end:
smlDestroyInfo(info); smlDestroyInfo(info);
......
...@@ -694,9 +694,9 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -694,9 +694,9 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
SArray *superKV = NULL; SArray *superKV = NULL;
if(info->dataFormat){ if(info->dataFormat){
if(unlikely(!isSameMeasure)){ if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
if(unlikely(sMeta == NULL)){ if(unlikely(tmp == NULL)){
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if(pTableMeta == NULL){ if(pTableMeta == NULL){
info->dataFormat = false; info->dataFormat = false;
...@@ -705,10 +705,11 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -705,10 +705,11 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
} }
sMeta = smlBuildSTableMeta(info->dataFormat); sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL); taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta;
} }
info->currSTableMeta = sMeta->tableMeta; info->currSTableMeta = (*tmp)->tableMeta;
superKV = sMeta->tags; superKV = (*tmp)->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){ if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false; isSuperKVInit = false;
...@@ -761,13 +762,13 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -761,13 +762,13 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt);
if(unlikely(kv.length > maxKV->length)){ if(unlikely(kv.length > maxKV->length)){
maxKV->length = kv.length; maxKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if(unlikely(NULL == tableMeta)){ if(unlikely(NULL == tableMeta)){
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR; return TSDB_CODE_SML_INTERNAL_ERROR;
} }
SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt); SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length; oldKV->length = kv.length;
info->needModifySchema = true; info->needModifySchema = true;
} }
...@@ -808,8 +809,14 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -808,8 +809,14 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
cnt++; cnt++;
} }
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet); elements->measureTag = (char*)taosMemoryMalloc(elements->measureLen + elements->tagsLen);
if (unlikely(tinfo == NULL)) { memcpy(elements->measureTag, elements->measure, elements->measureLen);
memcpy(elements->measureTag + elements->measureLen, elements->tags, elements->tagsLen);
elements->measureTagsLen = elements->measureLen + elements->tagsLen;
SSmlTableInfo **tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
SSmlTableInfo *tinfo = NULL;
if (unlikely(tmp == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen); tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) { if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -828,17 +835,18 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -828,17 +835,18 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
} }
} }
SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo)); // SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements; // *key = *elements;
if(info->parseJsonByLib){ // if(info->parseJsonByLib){
key->tags = taosMemoryMalloc(elements->tagsLen + 1); // key->tags = taosMemoryMalloc(elements->tagsLen + 1);
memcpy(key->tags, elements->tags, elements->tagsLen); // memcpy(key->tags, elements->tags, elements->tagsLen);
key->tags[elements->tagsLen] = 0; // key->tags[elements->tagsLen] = 0;
} // }
tinfo->key = key; // tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet); taosHashPut(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen, &tinfo, POINTER_BYTES);
tmp = &tinfo;
} }
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; if (info->dataFormat) info->currTableDataCtx = (*tmp)->tableDataCtx;
return ret; return ret;
} }
...@@ -1011,6 +1019,8 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo ...@@ -1011,6 +1019,8 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
elements->tags = NULL; elements->tags = NULL;
return ret; return ret;
} }
}else{
elements->measureTag = info->preLine.measureTag;
} }
if(needFree){ if(needFree){
...@@ -1189,6 +1199,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo * ...@@ -1189,6 +1199,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret; return ret;
} }
}else{
elements->measureTag = info->preLine.measureTag;
} }
if(unlikely(info->reRun)){ if(unlikely(info->reRun)){
...@@ -1259,7 +1271,7 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) { ...@@ -1259,7 +1271,7 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
if(info->dataFormat) { if(info->dataFormat) {
SSmlLineInfo element = {0}; SSmlLineInfo element = {0};
ret = smlParseJSONString(info, &dataPointStart, &element); ret = smlParseJSONString(info, &dataPointStart, &element);
if(element.measure == NULL) break; if(element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
}else{ }else{
if(cnt >= payloadNum){ if(cnt >= payloadNum){
payloadNum = payloadNum << 1; payloadNum = payloadNum << 1;
......
...@@ -148,9 +148,9 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -148,9 +148,9 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
SArray *superKV = NULL; SArray *superKV = NULL;
if(info->dataFormat){ if(info->dataFormat){
if(unlikely(!isSameMeasure)){ if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL); SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta *sMeta = NULL;
if(unlikely(sMeta == NULL)){ if(unlikely(tmp == NULL)){
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
if(pTableMeta == NULL){ if(pTableMeta == NULL){
info->dataFormat = false; info->dataFormat = false;
...@@ -159,10 +159,11 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -159,10 +159,11 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
} }
sMeta = smlBuildSTableMeta(info->dataFormat); sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta, NULL); taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta;
} }
info->currSTableMeta = sMeta->tableMeta; info->currSTableMeta = (*tmp)->tableMeta;
superKV = sMeta->tags; superKV = (*tmp)->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){ if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false; isSuperKVInit = false;
...@@ -258,13 +259,13 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -258,13 +259,13 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt);
if(unlikely(kv.length > maxKV->length)){ if(unlikely(kv.length > maxKV->length)){
maxKV->length = kv.length; maxKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL); SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
if(unlikely(NULL == tableMeta)){ if(unlikely(NULL == tableMeta)){
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR; return TSDB_CODE_SML_INTERNAL_ERROR;
} }
SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt); SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length; oldKV->length = kv.length;
info->needModifySchema = true; info->needModifySchema = true;
} }
...@@ -310,7 +311,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -310,7 +311,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
(*sql)++; (*sql)++;
} }
void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL); void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if ((oneTable != NULL)) { if ((oneTable != NULL)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -332,7 +333,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -332,7 +333,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
} }
} }
nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo, NULL); taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -345,18 +346,18 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -345,18 +346,18 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
SArray *superKV = NULL; SArray *superKV = NULL;
if(info->dataFormat){ if(info->dataFormat){
if(unlikely(!isSameCTable)){ if(unlikely(!isSameCTable)){
SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) { if (unlikely(oneTable == NULL)) {
smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure); smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
info->currTableDataCtx = oneTable->tableDataCtx; info->currTableDataCtx = (*oneTable)->tableDataCtx;
} }
if(unlikely(!isSameMeasure)){ if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL); SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta *sMeta = NULL;
if(unlikely(sMeta == NULL)){ if(unlikely(tmp == NULL)){
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
if(pTableMeta == NULL){ if(pTableMeta == NULL){
info->dataFormat = false; info->dataFormat = false;
...@@ -365,10 +366,11 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -365,10 +366,11 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
} }
sMeta = smlBuildSTableMeta(info->dataFormat); sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta, NULL); taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta;
} }
info->currSTableMeta = sMeta->tableMeta; info->currSTableMeta = (*tmp)->tableMeta;
superKV = sMeta->cols; superKV = (*tmp)->cols;
if(unlikely(taosArrayGetSize(superKV) == 0)){ if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false; isSuperKVInit = false;
} }
...@@ -487,13 +489,13 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, ...@@ -487,13 +489,13 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
if(unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > maxKV->length)){ if(unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > maxKV->length)){
maxKV->length = kv.length; maxKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL); SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
if(unlikely(NULL == tableMeta)){ if(unlikely(NULL == tableMeta)){
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR; return TSDB_CODE_SML_INTERNAL_ERROR;
} }
SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->cols, cnt); SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->cols, cnt);
oldKV->length = kv.length; oldKV->length = kv.length;
info->needModifySchema = true; info->needModifySchema = true;
} }
......
...@@ -71,6 +71,7 @@ static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t ...@@ -71,6 +71,7 @@ static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) { static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
if(is_same_child_table_telnet(elements, &info->preLine) == 0){ if(is_same_child_table_telnet(elements, &info->preLine) == 0){
elements->measureTag = info->preLine.measureTag;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -83,9 +84,9 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -83,9 +84,9 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
SArray *superKV = NULL; SArray *superKV = NULL;
if(info->dataFormat){ if(info->dataFormat){
if(!isSameMeasure){ if(!isSameMeasure){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
if(unlikely(sMeta == NULL)){ if(unlikely(tmp == NULL)){
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if(pTableMeta == NULL){ if(pTableMeta == NULL){
info->dataFormat = false; info->dataFormat = false;
...@@ -94,10 +95,11 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -94,10 +95,11 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
} }
sMeta = smlBuildSTableMeta(info->dataFormat); sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL); taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta;
} }
info->currSTableMeta = sMeta->tableMeta; info->currSTableMeta = (*tmp)->tableMeta;
superKV = sMeta->tags; superKV = (*tmp)->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){ if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false; isSuperKVInit = false;
...@@ -183,13 +185,13 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -183,13 +185,13 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt);
if(unlikely(kv.length > maxKV->length)){ if(unlikely(kv.length > maxKV->length)){
maxKV->length = kv.length; maxKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if(unlikely(NULL == tableMeta)){ if(unlikely(NULL == tableMeta)){
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR; return TSDB_CODE_SML_INTERNAL_ERROR;
} }
SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt); SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length; oldKV->length = kv.length;
info->needModifySchema = true; info->needModifySchema = true;
} }
...@@ -229,8 +231,15 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -229,8 +231,15 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
cnt++; cnt++;
} }
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
if (unlikely(tinfo == NULL)) { elements->measureTag = (char*)taosMemoryMalloc(elements->measureLen + elements->tagsLen);
memcpy(elements->measureTag, elements->measure, elements->measureLen);
memcpy(elements->measureTag + elements->measureLen, elements->tags, elements->tagsLen);
elements->measureTagsLen = elements->measureLen + elements->tagsLen;
SSmlTableInfo **tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
SSmlTableInfo *tinfo = NULL;
if (unlikely(tmp == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen); tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (!tinfo) { if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -249,12 +258,13 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -249,12 +258,13 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
} }
} }
SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo)); // SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements; // *key = *elements;
tinfo->key = key; // tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet); taosHashPut(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen, &tinfo, POINTER_BYTES);
tmp = &tinfo;
} }
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; if (info->dataFormat) info->currTableDataCtx = (*tmp)->tableDataCtx;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册