未验证 提交 0ac78291 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13384 from taosdata/feature/TD-14761

feat:add schemaless param from db
......@@ -779,6 +779,7 @@ typedef struct {
int8_t cacheLastRow;
int32_t numOfRetensions;
SArray* pRetensions;
int8_t schemaless;
} SDbCfgRsp;
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
......
......@@ -49,6 +49,7 @@ typedef struct SParseContext {
const char* pUser;
bool isSuperUser;
bool async;
int8_t schemalessType;
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -675,6 +675,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
#define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001)
#define TSDB_CODE_SML_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x3002)
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
#ifdef __cplusplus
}
......
......@@ -151,6 +151,7 @@ typedef struct STscObj {
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
SHashObj* pRequests;
int8_t schemalessType;
} STscObj;
typedef struct SResultColumn {
......
......@@ -161,6 +161,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
taosThreadMutexInit(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 0;
tscDebug("connObj created, 0x%" PRIx64, pObj->id);
return pObj;
......
......@@ -176,6 +176,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
......
......@@ -960,10 +960,10 @@ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSm
return TSDB_CODE_SML_INVALID_DATA;
}
elements->colsLen = sql - elements->cols;
if(elements->colsLen == 0) {
smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
// if(elements->colsLen == 0) {
// smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
// return TSDB_CODE_SML_INVALID_DATA;
// }
// parse timestamp
JUMP_SPACE(sql)
......@@ -1124,7 +1124,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
}
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){
if(len == 0){
return TSDB_CODE_SUCCESS;
}
......@@ -2318,6 +2318,28 @@ cleanup:
return code;
}
static int32_t isSchemalessDb(SSmlHandle* info){
SName name;
tNameSetDbName(&name, info->taos->acctId, info->taos->db, strlen(info->taos->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
SDbCfgInfo pInfo = {0};
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
int32_t code = catalogGetDBCfg(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
info->pRequest->code = code;
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
return code;
}
if (!pInfo.schemaless){
info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname);
return TSDB_CODE_SML_INVALID_DB_CONF;
}
return TSDB_CODE_SUCCESS;
}
/**
* taos_schemaless_insert() parse and insert data points into database according to
* different protocol.
......@@ -2351,6 +2373,19 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return (TAOS_RES*)request;
}
info->taos->schemalessType = 1;
if(request->pDb == NULL){
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&info->msgBuf, "Database not specified", NULL);
goto end;
}
if(isSchemalessDb(info) != TSDB_CODE_SUCCESS){
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&info->msgBuf, "Cannot write data to a non schemaless database", NULL);
goto end;
}
if (!lines) {
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&info->msgBuf, "lines is null", NULL);
......@@ -2372,6 +2407,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
info->pRequest->code = smlProcess(info, lines, numLines);
end:
uDebug("result:%s", info->msgBuf.buf);
smlDestroyInfo(info);
return (TAOS_RES*)request;
}
......
......@@ -1258,4 +1258,26 @@ TEST(testCase, sml_TD15742_Test) {
destroyRequest(request);
smlDestroyInfo(info);
}
\ No newline at end of file
}
TEST(testCase, sml_params_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists param");
taos_free_result(pRes);
const char *sql[] = {
"test_ms,t0=t c0=f 1626006833641",
};
TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_EQ(taos_errno(res), TSDB_CODE_PAR_DB_NOT_SPECIFIED);
taos_free_result(pRes);
pRes = taos_query(taos, "use param");
taos_free_result(pRes);
res = taos_schemaless_insert(taos, (char**)sql, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_EQ(taos_errno(res), TSDB_CODE_SML_INVALID_DB_CONF);
taos_free_result(pRes);
}
......@@ -91,6 +91,8 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
// {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update
};
......
......@@ -2278,6 +2278,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI8(&encoder, pRsp->schemaless) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2326,6 +2327,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
return -1;
}
}
if (tDecodeI8(&decoder, &pRsp->schemaless) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -255,6 +255,7 @@ typedef struct {
int8_t hashMethod; // default is 1
int32_t numOfRetensions;
SArray* pRetensions;
int8_t schemaless;
} SDbCfg;
typedef struct {
......
......@@ -115,6 +115,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8(pRaw, dataPos, pRetension->freqUnit, _OVER)
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER)
}
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
......@@ -192,6 +193,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
}
}
}
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock);
......@@ -380,6 +382,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1;
if (pCfg->replications != 1 && pCfg->replications != 3) return -1;
if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1;
if (pCfg->schemaless < TSDB_DB_SCHEMALESS_OFF || pCfg->schemaless > TSDB_DB_SCHEMALESS_ON) return -1;
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
if (pCfg->hashMethod != 1) return -1;
if (pCfg->replications > mndGetDnodeSize(pMnode)) {
......@@ -411,6 +414,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->strict < 0) pCfg->strict = TSDB_DEFAULT_DB_STRICT;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
......@@ -521,6 +526,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.strict = pCreate->strict,
.cacheLastRow = pCreate->cacheLastRow,
.hashMethod = 1,
.schemaless = pCreate->schemaless,
};
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
......@@ -899,6 +905,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions;
cfgRsp.schemaless = pDb->cfg.schemaless;
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
void *pRsp = rpcMallocCont(contLen);
......@@ -1542,8 +1549,11 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)statusB, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
}
}
......
......@@ -32,6 +32,7 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName);
#ifdef __cplusplus
}
......
......@@ -1335,6 +1335,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
SName name;
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(isNotSchemalessDb(pCxt->pComCxt, name.dbname));
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
char dbFName[TSDB_DB_FNAME_LEN];
......@@ -1411,6 +1413,23 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return buildOutput(pCxt);
}
int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName){
SName name;
tNameSetDbName(&name, pContext->acctId, dbName, strlen(dbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
SDbCfgInfo pInfo = {0};
int32_t code = catalogGetDBCfg(pContext->pCatalog, pContext->pTransporter, &pContext->mgmtEpSet, dbFname, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname);
return code;
}
if (pInfo.schemaless){
parserError("can not insert into schemaless db:%s", dbFname);
return TSDB_CODE_SML_INVALID_DB_CONF;
}
return TSDB_CODE_SUCCESS;
}
// INSERT INTO
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
......
......@@ -2646,6 +2646,11 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
if (TSDB_CODE_SUCCESS == code) {
code = checkTableSchema(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
if(pCxt->pParseCxt->schemalessType == 0){
code = isNotSchemalessDb(pCxt->pParseCxt, pStmt->dbName);
}
}
return code;
}
......@@ -4424,6 +4429,7 @@ static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgrou
}
static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) {
SCreateMultiTableStmt* pStmt = (SCreateMultiTableStmt*)pQuery->pRoot;
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
......@@ -4434,6 +4440,10 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode;
FOREACH(pNode, pStmt->pSubTables) {
if(pCxt->pParseCxt->schemalessType == 0 &&
(code = isNotSchemalessDb(pCxt->pParseCxt, ((SCreateSubTableClause*)pNode)->dbName)) != TSDB_CODE_SUCCESS){
return code;
}
code = rewriteCreateSubTable(pCxt, (SCreateSubTableClause*)pNode, pVgroupHashmap);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pVgroupHashmap);
......@@ -4845,9 +4855,14 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
int32_t code = TSDB_CODE_SUCCESS;
if(pCxt->pParseCxt->schemalessType == 0 &&
(code = isNotSchemalessDb(pCxt->pParseCxt, pStmt->dbName)) != TSDB_CODE_SUCCESS){
return code;
}
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......
......@@ -470,6 +470,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output ty
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DATA, "Invalid data type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config")
#ifdef TAOS_ERROR_C
};
......
......@@ -33,10 +33,10 @@ class TDTestCase:
def createDb(self, name="test", db_update_tag=0):
if db_update_tag == 0:
tdSql.execute(f"drop database if exists {name}")
tdSql.execute(f"create database if not exists {name} precision 'ms'")
tdSql.execute(f"create database if not exists {name} precision 'ms' schemaless 1")
else:
tdSql.execute(f"drop database if exists {name}")
tdSql.execute(f"create database if not exists {name} precision 'ms' update 1")
tdSql.execute(f"create database if not exists {name} precision 'ms' update 1 schemaless 1")
tdSql.execute(f'use {name}')
def timeTrans(self, time_value, ts_type):
......
......@@ -36,10 +36,10 @@ class TDTestCase:
if db_update_tag == 0:
tdSql.execute(f"drop database if exists {name}")
tdSql.execute(f"create database if not exists {name} precision 'ms'")
tdSql.execute(f"create database if not exists {name} precision 'ms' schemaless 1")
else:
tdSql.execute(f"drop database if exists {name}")
tdSql.execute(f"create database if not exists {name} precision 'ms' update 1")
tdSql.execute(f"create database if not exists {name} precision 'ms' update 1 schemaless 1")
tdSql.execute(f'use {name}')
def timeTrans(self, time_value):
......@@ -535,7 +535,7 @@ class TDTestCase:
# check result
#! bug
tdSql.execute(f"drop database if exists test_ts")
tdSql.execute(f"create database if not exists test_ts precision 'ms'")
tdSql.execute(f"create database if not exists test_ts precision 'ms' schemaless 1")
tdSql.execute("use test_ts")
input_json = [{"metric": "test_ms", "timestamp": {"value": 1626006833640, "type": "ms"}, "value": True, "tags": {"t0": True}},
{"metric": "test_ms", "timestamp": {"value": 1626006833641, "type": "ms"}, "value": False, "tags": {"t0": True}}]
......@@ -545,7 +545,7 @@ class TDTestCase:
tdSql.checkEqual(str(res[1][0]), "2021-07-11 20:33:53.641000")
tdSql.execute(f"drop database if exists test_ts")
tdSql.execute(f"create database if not exists test_ts precision 'us'")
tdSql.execute(f"create database if not exists test_ts precision 'us' schemaless 1")
tdSql.execute("use test_ts")
input_json = [{"metric": "test_us", "timestamp": {"value": 1626006833639000, "type": "us"}, "value": True, "tags": {"t0": True}},
{"metric": "test_us", "timestamp": {"value": 1626006833639001, "type": "us"}, "value": False, "tags": {"t0": True}}]
......@@ -555,7 +555,7 @@ class TDTestCase:
tdSql.checkEqual(str(res[1][0]), "2021-07-11 20:33:53.639001")
tdSql.execute(f"drop database if exists test_ts")
tdSql.execute(f"create database if not exists test_ts precision 'ns'")
tdSql.execute(f"create database if not exists test_ts precision 'ns' schemaless 1")
tdSql.execute("use test_ts")
input_json = [{"metric": "test_ns", "timestamp": {"value": 1626006833639000000, "type": "ns"}, "value": True, "tags": {"t0": True}},
{"metric": "test_ns", "timestamp": {"value": 1626006833639000001, "type": "ns"}, "value": False, "tags": {"t0": True}}]
......
......@@ -36,10 +36,10 @@ class TDTestCase:
if db_update_tag == 0:
tdSql.execute(f"drop database if exists {name}")
tdSql.execute(f"create database if not exists {name} precision 'us'")
tdSql.execute(f"create database if not exists {name} precision 'us' schemaless 1")
else:
tdSql.execute(f"drop database if exists {name}")
tdSql.execute(f"create database if not exists {name} precision 'ns' update 1")
tdSql.execute(f"create database if not exists {name} precision 'ns' update 1 schemaless 1")
tdSql.execute(f'use {name}')
def timeTrans(self, time_value, ts_type):
......@@ -393,7 +393,7 @@ class TDTestCase:
self.resCmp(input_sql, stb_name, ts_type=TDSmlTimestampType.SECOND.value)
tdSql.execute(f"drop database if exists test_ts")
tdSql.execute(f"create database if not exists test_ts precision 'ms'")
tdSql.execute(f"create database if not exists test_ts precision 'ms' schemaless 1")
tdSql.execute("use test_ts")
input_sql = ['test_ms 1626006833640 t t0=t', 'test_ms 1626006833641 f t0=t']
self._conn.schemaless_insert(input_sql, TDSmlProtocolType.TELNET.value, None)
......
......@@ -86,6 +86,7 @@ void dumpDb(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "cacheLastRow", pObj->cfg.cacheLastRow);
tjsonAddIntegerToObject(item, "hashMethod", pObj->cfg.hashMethod);
tjsonAddIntegerToObject(item, "numOfRetensions", pObj->cfg.numOfRetensions);
tjsonAddIntegerToObject(item, "schemaless", pObj->cfg.schemaless);
sdbRelease(pSdb, pObj);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册