diff --git a/include/client/taos.h b/include/client/taos.h index 25887b2879e3d534584f8d857ee3af670c7bcbd1..9aaf15e8b05876e95f46ac7355811f9986a953c7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -218,6 +218,9 @@ DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_ DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); + +/* --------------------------schemaless INTERFACE------------------------------- */ + DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid); @@ -225,6 +228,13 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len int precision); DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid); +DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl); +DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, + int precision, int32_t ttl, int64_t reqid); +DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, + int precision, int32_t ttl); +DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, + int protocol, int precision, int32_t ttl, int64_t reqid); /* --------------------------TMQ INTERFACE------------------------------- */ diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index b72d85243ba2c51f18023dcb97171c85bbd7b1c8..9be79a539f468d20ad28d2278651fbae169ff4a6 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -108,7 +108,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* void* smlInitHandle(SQuery* pQuery); void smlDestroyHandle(void* pHandle); int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, - char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen); + char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 83bbe392cf1db9efb344d0c206471ef52351f6ef..f4d8c80e3f1d09dedaf1b759f854b334d96212d5 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -162,7 +162,9 @@ typedef struct { SMLProtocolType protocol; int8_t precision; - bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol) + bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol) + bool isRawLine; + int32_t ttl; SHashObj *childTables; SHashObj *superTables; @@ -863,6 +865,7 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra } else { ASSERT(0); } + uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts); if (ts == -1) return TSDB_CODE_INVALID_TIMESTAMP; @@ -2063,7 +2066,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; - uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql", info->id); + uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s", info->id, (info->isRawLine ? "rawdata" : sql)); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { @@ -2323,7 +2326,7 @@ static int32_t smlInsertData(SSmlHandle *info) { code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, - info->msgBuf.buf, info->msgBuf.len); + info->ttl, info->msgBuf.buf, info->msgBuf.len); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlBindData failed", info->id); return code; @@ -2514,7 +2517,7 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { } TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, - int numLines, int protocol, int precision) { + int numLines, int protocol, int precision, int32_t ttl) { int batchs = 0; STscObj *pTscObj = request->pTscObj; @@ -2575,6 +2578,9 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char goto end; } + info->isRawLine = (rawLine == NULL); + info->ttl = ttl; + int32_t perBatch = LINE_BATCH; if (numLines > perBatch) { @@ -2637,13 +2643,14 @@ end: * @return TAOS_RES */ -TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { +TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, + int32_t ttl, int64_t reqid) { if (NULL == taos) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, 0); + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); if (!request) { uError("SML:taos_schemaless_insert error request is null"); return NULL; @@ -2656,40 +2663,29 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr return (TAOS_RES *)request; } - return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision); + return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision, ttl); } -TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, - int64_t reqid) { - if (NULL == taos) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; - } - - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); - if (!request) { - uError("SML:taos_schemaless_insert error request is null"); - return NULL; - } +TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { + return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0); +} - if (!lines) { - SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&msg, "lines is null", NULL); - return (TAOS_RES *)request; - } +TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) { + return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, 0); +} - return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision); +TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid) { + return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid); } -TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, - int precision) { +TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, + int precision, int32_t ttl, int64_t reqid) { if (NULL == taos) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, 0); + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); if (!request) { uError("SML:taos_schemaless_insert error request is null"); return NULL; @@ -2714,40 +2710,16 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t * tmp = lines + i + 1; } } - return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision); + return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision, ttl); } -TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, - int precision, int64_t reqid) { - if (NULL == taos) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; - } - - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); - if (!request) { - uError("SML:taos_schemaless_insert error request is null"); - return NULL; - } - - if (!lines || len <= 0) { - SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&msg, "lines is null", NULL); - return (TAOS_RES *)request; - } +TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) { + return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid); +} +TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl) { + return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, 0); +} - int numLines = 0; - *totalRows = 0; - char *tmp = lines; - for (int i = 0; i < len; i++) { - if (lines[i] == '\n' || i == len - 1) { - numLines++; - if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) { // ignore comment - (*totalRows)++; - } - tmp = lines + i + 1; - } - } - return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision); +TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision) { + return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0); } diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 09d55d369fc0d711b4a4301c74a0cb0d7002c347..5cc72f86923762454c8cae66ef9bbc7828b9ef05 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -135,7 +135,7 @@ int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf); int32_t insFindCol(struct SToken *pColname, int32_t start, int32_t end, SSchema *pSchema); void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname, - SArray *tagName, uint8_t tagNum); + SArray *tagName, uint8_t tagNum, int32_t ttl); int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param); int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start); int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index d18b11ad57e25a863c7a507d95da92ef05d19cc3..e76ca7751dd6829b4b7c78635559ea5eeb847f69 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -208,7 +208,7 @@ end: } int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, - char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen) { + char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; @@ -229,7 +229,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols } insBuildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, - pTableMeta->tableInfo.numOfTags); + pTableMeta->tableInfo.numOfTags, ttl); taosArrayDestroy(tagName); smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1); @@ -303,9 +303,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols if (kv) { int32_t colLen = kv->length; if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { - // uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); + uDebug("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); - // uError("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); + uDebug("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); } if (IS_VAR_DATA_TYPE(kv->type)) { diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 1a87f626c68edb302b56535a80c10a51a1c1a235..064e376894a7de608e50430a22cf89f3ce4bf167 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -562,7 +562,7 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) { insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, - pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags); + pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); } static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index a6ce71211a3f36dc5d2ab46b2740fa94e00343ed..e0e191b7c9b6dc7b36a974ebc7da8942bf2f68a3 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -137,7 +137,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch } SVCreateTbReq tbReq = {0}; - insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags); + insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); code = insBuildCreateTbMsg(pDataBlock, &tbReq); tdDestroySVCreateTbReq(&tbReq); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 730dc2eab93329c6c716a7ce594a7c799b3e52bd..0600accd6df8364d74eaccfbfcb272a273719c07 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -850,7 +850,7 @@ int32_t insFindCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchem } void insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, - SArray* tagName, uint8_t tagNum) { + SArray* tagName, uint8_t tagNum, int32_t ttl) { pTbReq->type = TD_CHILD_TABLE; pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; @@ -858,7 +858,7 @@ void insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, i if (sname) pTbReq->ctb.stbName = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.tagName = taosArrayDup(tagName, NULL); - pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL; + pTbReq->ttl = ttl; pTbReq->commentLen = -1; return; diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 40d5bb12d2ffa01c18857b9177a47a2afe0e17a1..47b7adbf189a97b4be881b4d48fc499c4b9b7dbd 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1084,22 +1084,55 @@ int sml_19221_Test() { return code; } -int sml_time_Test() { +int sml_ts2164_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'"); + taos_free_result(pRes); + + const char *sql[] = { + "meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27", + "meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27", + "meters,location=la,groupid=cb current=11.8,voltage=221,phase=0.27", + }; + + pRes = taos_query(taos, "use line_test"); + taos_free_result(pRes); + + pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + int code = taos_errno(pRes); + taos_free_result(pRes); + taos_close(taos); + + return code; +} + +int sml_ttl_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1"); taos_free_result(pRes); const char *sql[] = { - "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase='2022-02-02 10:22:22' 1626006833639000000", + "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833739000000", }; pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); + pRes = taos_schemaless_insert_ttl(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, 20); + + printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes)); + taos_free_result(pRes); + + pRes = taos_query(taos, "select `ttl` from information_schema.ins_tables where table_name='t_be97833a0e1f523fcdaeb6291d6fdf27'"); + printf("%s result2:%s\n", __FUNCTION__, taos_errstr(pRes)); + TAOS_ROW row = taos_fetch_row(pRes); + int32_t ttl = *(int32_t*)row[0]; + ASSERT(ttl == 20); - printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); taos_close(taos); @@ -1109,6 +1142,10 @@ int sml_time_Test() { int main(int argc, char *argv[]) { int ret = 0; + ret = sml_ttl_Test(); + ASSERT(!ret); + ret = sml_ts2164_Test(); + ASSERT(!ret); ret = smlProcess_influx_Test(); ASSERT(!ret); ret = smlProcess_telnet_Test();