未验证 提交 9e3733a0 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22358 from taosdata/feat/TD-24559

feat:[TD-24559]support geomety type in schemaless
...@@ -16,7 +16,7 @@ target_include_directories( ...@@ -16,7 +16,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
taos taos
INTERFACE api INTERFACE api
PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom geometry
) )
if(TD_DARWIN_ARM64) if(TD_DARWIN_ARM64)
...@@ -57,7 +57,7 @@ target_include_directories( ...@@ -57,7 +57,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
taos_static taos_static
INTERFACE api INTERFACE api
PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom geometry
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
......
...@@ -33,6 +33,7 @@ extern "C" { ...@@ -33,6 +33,7 @@ extern "C" {
#include "ttime.h" #include "ttime.h"
#include "ttypes.h" #include "ttypes.h"
#include "cJSON.h" #include "cJSON.h"
#include "geosWrapper.h"
#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__) #if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__)
# define expect(expr,value) (__builtin_expect ((expr),(value)) ) # define expect(expr,value) (__builtin_expect ((expr),(value)) )
...@@ -192,7 +193,7 @@ typedef struct { ...@@ -192,7 +193,7 @@ typedef struct {
// //
SArray *preLineTagKV; SArray *preLineTagKV;
SArray *maxTagKVs; SArray *maxTagKVs;
SArray *masColKVs; SArray *maxColKVs;
SSmlLineInfo preLine; SSmlLineInfo preLine;
STableMeta *currSTableMeta; STableMeta *currSTableMeta;
......
...@@ -1073,6 +1073,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { ...@@ -1073,6 +1073,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
return 0; return 0;
end: end:
taosHashCancelIterate(info->superTables, tmp);
taosHashCleanup(hashTmp); taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
...@@ -1191,6 +1192,7 @@ void freeSSmlKv(void *data) { ...@@ -1191,6 +1192,7 @@ void freeSSmlKv(void *data) {
SSmlKv *kv = (SSmlKv *)data; SSmlKv *kv = (SSmlKv *)data;
if (kv->keyEscaped) taosMemoryFree((void *)(kv->key)); if (kv->keyEscaped) taosMemoryFree((void *)(kv->key));
if (kv->valueEscaped) taosMemoryFree((void *)(kv->value)); if (kv->valueEscaped) taosMemoryFree((void *)(kv->value));
if (kv->type == TSDB_DATA_TYPE_GEOMETRY) geosFreeBuffer((void *)(kv->value));
} }
void smlDestroyInfo(SSmlHandle *info) { void smlDestroyInfo(SSmlHandle *info) {
...@@ -1433,6 +1435,7 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -1433,6 +1435,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
code = smlCheckAuth(info, &conn, pName.tname, AUTH_TYPE_WRITE); code = smlCheckAuth(info, &conn, pName.tname, AUTH_TYPE_WRITE);
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
taosMemoryFree(measure); taosMemoryFree(measure);
taosHashCancelIterate(info->childTables, oneTable);
return code; return code;
} }
...@@ -1441,6 +1444,7 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -1441,6 +1444,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName); uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
taosMemoryFree(measure); taosMemoryFree(measure);
taosHashCancelIterate(info->childTables, oneTable);
return code; return code;
} }
taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
...@@ -1450,6 +1454,7 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -1450,6 +1454,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
if (unlikely(NULL == pMeta || NULL == (*pMeta)->tableMeta)) { if (unlikely(NULL == pMeta || NULL == (*pMeta)->tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == pMeta. table name: %s", info->id, tableData->childTableName); uError("SML:0x%" PRIx64 " NULL == pMeta. table name: %s", info->id, tableData->childTableName);
taosMemoryFree(measure); taosMemoryFree(measure);
taosHashCancelIterate(info->childTables, oneTable);
return TSDB_CODE_SML_INTERNAL_ERROR; return TSDB_CODE_SML_INTERNAL_ERROR;
} }
...@@ -1465,6 +1470,7 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -1465,6 +1470,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
taosMemoryFree(measure); taosMemoryFree(measure);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBindData failed", info->id); uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
taosHashCancelIterate(info->childTables, oneTable);
return code; return code;
} }
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable); oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
......
...@@ -102,6 +102,30 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { ...@@ -102,6 +102,30 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
if (pVal->value[0] == 'g' || pVal->value[0] == 'G') { // geometry
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= sizeof("POINT")+3) {
int32_t code = initCtxGeomFromText();
if (code != TSDB_CODE_SUCCESS) {
return code;
}
char* tmp = taosMemoryCalloc(pVal->length, 1);
memcpy(tmp, pVal->value + 2, pVal->length - 3);
code = doGeomFromText(tmp, (unsigned char **)&pVal->value, &pVal->length);
taosMemoryFree(tmp);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pVal->type = TSDB_DATA_TYPE_GEOMETRY;
if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
geosFreeBuffer((void*)(pVal->value));
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pVal->value[0] == 't' || pVal->value[0] == 'T') { if (pVal->value[0] == 't' || pVal->value[0] == 'T') {
if (pVal->length == 1 || if (pVal->length == 1 ||
(pVal->length == 4 && (pVal->value[1] == 'r' || pVal->value[1] == 'R') && (pVal->length == 4 && (pVal->value[1] == 'r' || pVal->value[1] == 'R') &&
...@@ -390,14 +414,14 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -390,14 +414,14 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type}; SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type};
if (tag->type == TSDB_DATA_TYPE_NCHAR) { if (tag->type == TSDB_DATA_TYPE_NCHAR) {
kv.length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; kv.length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
} else if (tag->type == TSDB_DATA_TYPE_BINARY) { } else if (tag->type == TSDB_DATA_TYPE_BINARY || tag->type == TSDB_DATA_TYPE_GEOMETRY) {
kv.length = tag->bytes - VARSTR_HEADER_SIZE; kv.length = tag->bytes - VARSTR_HEADER_SIZE;
} }
taosArrayPush((*tmp)->cols, &kv); taosArrayPush((*tmp)->cols, &kv);
} }
} }
info->currSTableMeta = (*tmp)->tableMeta; info->currSTableMeta = (*tmp)->tableMeta;
info->masColKVs = (*tmp)->cols; info->maxColKVs = (*tmp)->cols;
} }
} }
...@@ -512,13 +536,13 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -512,13 +536,13 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
freeSSmlKv(&kv); freeSSmlKv(&kv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (cnt >= taosArrayGetSize(info->masColKVs)) { if (cnt >= taosArrayGetSize(info->maxColKVs)) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
freeSSmlKv(&kv); freeSSmlKv(&kv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->masColKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxColKVs, cnt);
if (kv.type != maxKV->type) { if (kv.type != maxKV->type) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
...@@ -663,14 +687,15 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine ...@@ -663,14 +687,15 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
if (info->dataFormat) { if (info->dataFormat) {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts); uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
if (ret != TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
return ret; ret = smlBuildRow(info->currTableDataCtx);
} }
ret = smlBuildRow(info->currTableDataCtx);
if (ret != TSDB_CODE_SUCCESS) { clearColValArray(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret; return ret;
} }
clearColValArray(info->currTableDataCtx->pValues);
} else { } else {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts); uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts);
taosArraySet(elements->colArray, 0, &kv); taosArraySet(elements->colArray, 0, &kv);
......
...@@ -20,7 +20,7 @@ TARGET_LINK_LIBRARIES( ...@@ -20,7 +20,7 @@ TARGET_LINK_LIBRARIES(
ADD_EXECUTABLE(smlTest smlTest.cpp) ADD_EXECUTABLE(smlTest smlTest.cpp)
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
smlTest smlTest
PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom geometry
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
......
...@@ -489,7 +489,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -489,7 +489,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
if(pVgEp->vgId == d1->vgId){ if(pVgEp->vgId == d1->vgId){
jump = true; jump = true;
mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
break; break;
} }
} }
......
...@@ -905,6 +905,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, ...@@ -905,6 +905,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) { if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
taosHashCancelIterate(uidStore.uidHash, pIter);
goto _err; goto _err;
} }
} }
......
...@@ -1088,6 +1088,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -1088,6 +1088,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
if(ret != TDB_CODE_SUCCESS) { if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
taosArrayDestroy(list); taosArrayDestroy(list);
taosHashCancelIterate(pTq->pHandle, pIter);
return ret; return ret;
} }
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL); tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
......
...@@ -22,7 +22,7 @@ static void clearColValArray(SArray* pCols) { ...@@ -22,7 +22,7 @@ static void clearColValArray(SArray* pCols) {
int32_t num = taosArrayGetSize(pCols); int32_t num = taosArrayGetSize(pCols);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SColVal* pCol = taosArrayGet(pCols, i); SColVal* pCol = taosArrayGet(pCols, i);
if (TSDB_DATA_TYPE_NCHAR == pCol->type) { if (TSDB_DATA_TYPE_NCHAR == pCol->type || TSDB_DATA_TYPE_GEOMETRY == pCol->type) {
taosMemoryFreeClear(pCol->value.pData); taosMemoryFreeClear(pCol->value.pData);
} }
pCol->flag = CV_FLAG_NONE; pCol->flag = CV_FLAG_NONE;
...@@ -237,9 +237,13 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32 ...@@ -237,9 +237,13 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
} }
pVal->value.pData = pUcs4; pVal->value.pData = pUcs4;
pVal->value.nData = len; pVal->value.nData = len;
} else if (kv->type == TSDB_DATA_TYPE_BINARY || kv->type == TSDB_DATA_TYPE_GEOMETRY) { } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
pVal->value.nData = kv->length; pVal->value.nData = kv->length;
pVal->value.pData = (uint8_t*)kv->value; pVal->value.pData = (uint8_t*)kv->value;
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY) {
pVal->value.nData = kv->length;
pVal->value.pData = taosMemoryMalloc(kv->length);
memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
} else { } else {
memcpy(&pVal->value.val, &(kv->value), kv->length); memcpy(&pVal->value.val, &(kv->value), kv->length);
} }
...@@ -364,9 +368,13 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc ...@@ -364,9 +368,13 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
} }
pVal->value.pData = pUcs4; pVal->value.pData = pUcs4;
pVal->value.nData = len; pVal->value.nData = len;
} else if (kv->type == TSDB_DATA_TYPE_BINARY || kv->type == TSDB_DATA_TYPE_GEOMETRY) { } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
pVal->value.nData = kv->length; pVal->value.nData = kv->length;
pVal->value.pData = (uint8_t*)kv->value; pVal->value.pData = (uint8_t*)kv->value;
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY) {
pVal->value.nData = kv->length;
pVal->value.pData = taosMemoryMalloc(kv->length);
memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
} else { } else {
memcpy(&pVal->value.val, &(kv->value), kv->length); memcpy(&pVal->value.val, &(kv->value), kv->length);
} }
......
...@@ -333,7 +333,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* ...@@ -333,7 +333,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
static void destroyColVal(void* p) { static void destroyColVal(void* p) {
SColVal* pVal = p; SColVal* pVal = p;
if (TSDB_DATA_TYPE_NCHAR == pVal->type) { if (TSDB_DATA_TYPE_NCHAR == pVal->type || TSDB_DATA_TYPE_GEOMETRY == pVal->type) {
taosMemoryFree(pVal->value.pData); taosMemoryFree(pVal->value.pData);
} }
} }
......
...@@ -110,6 +110,11 @@ class TDTestCase: ...@@ -110,6 +110,11 @@ class TDTestCase:
tdSql.query(f"select * from ts3724.`stb2.`") tdSql.query(f"select * from ts3724.`stb2.`")
tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.query(f"select * from td24559.stb order by _ts")
# tdSql.checkRows(4)
# tdSql.checkData(0, 2, "POINT (4.343000 89.342000)")
# tdSql.checkData(3, 2, "GEOMETRYCOLLECTION (MULTIPOINT ((0.000000 0.000000), (1.000000 1.000000)), POINT (3.000000 4.000000), LINESTRING (2.000000 3.000000, 3.000000 4.000000))")
return return
def run(self): def run(self):
......
...@@ -1552,12 +1552,45 @@ int sml_ts3724_Test() { ...@@ -1552,12 +1552,45 @@ int sml_ts3724_Test() {
return code; return code;
} }
int sml_td24559_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "drop database if exists td24559");
taos_free_result(pRes);
pRes = taos_query(taos, "create database if not exists td24559");
taos_free_result(pRes);
const char *sql[] = {
"stb,t1=1 f1=283i32,f2=g\"Point(4.343 89.342)\" 1632299372000",
"stb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
"stb,t2=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299374000",
"stb,t1=1 f1=106i32,f2=G\"GEOMETRYCOLLECTION (MULTIPOINT((0 0), (1 1)), POINT(3 4), LINESTRING(2 3, 3 4))\" 1632299378000",
};
pRes = taos_query(taos, "use td24559");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
taos_close(taos);
return code;
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
if (argc == 2) { if (argc == 2) {
taos_options(TSDB_OPTION_CONFIGDIR, argv[1]); taos_options(TSDB_OPTION_CONFIGDIR, argv[1]);
} }
int ret = 0; int ret = 0;
ret = sml_td24559_Test();
ASSERT(!ret);
ret = sml_td24070_Test(); ret = sml_td24070_Test();
ASSERT(!ret); ASSERT(!ret);
ret = sml_td23881_Test(); ret = sml_td23881_Test();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册