提交 ba3697c6 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/vnode_compact

......@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG f0c1753
GIT_TAG 5662a6d
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -30,6 +30,8 @@ database_option: {
| WAL_LEVEL {1 | 2}
| VGROUPS value
| SINGLE_STABLE {0 | 1}
| TABLE_PREFIX value
| TABLE_SUFFIX value
| WAL_RETENTION_PERIOD value
| WAL_ROLL_PERIOD value
| WAL_RETENTION_SIZE value
......@@ -67,6 +69,8 @@ database_option: {
- SINGLE_STABLE: specifies whether the database can contain more than one supertable.
- 0: The database can contain multiple supertables.
- 1: The database can contain only one supertable.
- TABLE_PREFIX:The prefix length in the table name that is ignored when distributing table to vnode based on table name.
- TABLE_SUFFIX:The suffix length in the table name that is ignored when distributing table to vnode based on table name.
- WAL_RETENTION_PERIOD: specifies the time after which WAL files are deleted. This parameter is used for data subscription. Enter a time in seconds. The default value of single copy is 0. A value of 0 indicates that each WAL file is deleted immediately after its contents are written to disk. -1: WAL files are never deleted. The default value of multiple copy is 4 days.
- WAL_RETENTION_SIZE: specifies the size at which WAL files are deleted. This parameter is used for data subscription. Enter a size in KB. The default value of single copy is 0. A value of 0 indicates that each WAL file is deleted immediately after its contents are written to disk. -1: WAL files are never deleted. The default value of multiple copy is -1.
- WAL_ROLL_PERIOD: specifies the time after which WAL files are rotated. After this period elapses, a new WAL file is created. The default value of single copy is 0. A value of 0 indicates that a new WAL file is created only after the previous WAL file was written to disk. The default values of multiple copy is 1 day.
......
......@@ -30,6 +30,8 @@ database_option: {
| WAL_LEVEL {1 | 2}
| VGROUPS value
| SINGLE_STABLE {0 | 1}
| TABLE_PREFIX value
| TABLE_SUFFIX value
| WAL_RETENTION_PERIOD value
| WAL_ROLL_PERIOD value
| WAL_RETENTION_SIZE value
......@@ -67,6 +69,8 @@ database_option: {
- SINGLE_STABLE:表示此数据库中是否只可以创建一个超级表,用于超级表列非常多的情况。
- 0:表示可以创建多张超级表。
- 1:表示只可以创建一张超级表。
- TABLE_PREFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。
- TABLE_SUFFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的后缀的长度。
- WAL_RETENTION_PERIOD:wal 文件的额外保留策略,用于数据订阅。wal 的保存时长,单位为 s。单副本默认为 0,即落盘后立即删除。-1 表示不删除。多副本默认为 4 天。
- WAL_RETENTION_SIZE:wal 文件的额外保留策略,用于数据订阅。wal 的保存的最大上限,单位为 KB。单副本默认为 0,即落盘后立即删除。多副本默认为-1,表示不删除。
- WAL_ROLL_PERIOD:wal 文件切换时长,单位为 s。当 wal 文件创建并写入后,经过该时间,会自动创建一个新的 wal 文件。单副本默认为 0,即仅在落盘时创建新文件。多副本默认为 1 天。
......
......@@ -213,6 +213,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
#ifdef __cplusplus
}
......
......@@ -19,7 +19,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
memset(pReader, 0, sizeof(*pReader));
pReader->flags = flags;
pReader->pMeta = pMeta;
if (!(flags & META_READER_NOLOCK)) {
if (pReader->pMeta && !(flags & META_READER_NOLOCK)) {
metaRLock(pMeta);
}
}
......@@ -239,7 +239,6 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
return 0;
}
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) {
int code = 0;
SMetaReader mr = {0};
......@@ -756,9 +755,7 @@ int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
}
int64_t metaGetNtbNum(SMeta *pMeta) {
return pMeta->pVnode->config.vndStats.numOfNTables;
}
int64_t metaGetNtbNum(SMeta *pMeta) { return pMeta->pVnode->config.vndStats.numOfNTables; }
typedef struct {
SMeta *pMeta;
......
......@@ -136,6 +136,7 @@ typedef struct {
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
int8_t recoverStep;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
......@@ -182,7 +183,7 @@ struct SExecTaskInfo {
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
SArray* pResultBlockList;// result block list
SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo;
};
......@@ -263,7 +264,7 @@ typedef struct SExchangeInfo {
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray* pResultBlockList;
SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy.
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
......@@ -468,6 +469,11 @@ typedef struct SStreamScanInfo {
SNodeList* pGroupTags;
SNode* pTagCond;
SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
} SStreamScanInfo;
typedef struct {
......
......@@ -937,6 +937,11 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
return 0;
}
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished;
}
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
......
......@@ -1785,11 +1785,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
pTaskInfo->streamInfo.recoverScanFinished = false;
}
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
if (pInfo->blockRecoverContiCnt > 100) {
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
pInfo->blockRecoverContiCnt = 0;
return NULL;
}
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) {
pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pBlock);
if (pInfo->pUpdateInfo) {
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
......@@ -1807,6 +1814,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.startVersion = -1;
pTSInfo->base.cond.endVersion = -1;
pTaskInfo->streamInfo.recoverScanFinished = true;
return NULL;
}
......
......@@ -21,6 +21,25 @@ namespace ParserTest {
class ParserInitialATest : public ParserDdlTest {};
/*
* ALTER ACCOUNT account_name alter_account_options
*
* alter_account_options:
* alter_account_option ...
*
* alter_account_option: {
* PASS value
* | PPS value
* | TSERIES value
* | STORAGE value
* | STREAMS value
* | QTIME value
* | DBS value
* | USERS value
* | CONNS value
* | STATE value
* }
*/
TEST_F(ParserInitialATest, alterAccount) {
useDb("root", "test");
......@@ -48,6 +67,7 @@ TEST_F(ParserInitialATest, alterDnode) {
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_DNODE_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_CONFIG_DNODE);
SMCfgDnodeReq req = {0};
ASSERT_EQ(tDeserializeSMCfgDnodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(req.dnodeId, expect.dnodeId);
......@@ -86,9 +106,8 @@ TEST_F(ParserInitialATest, alterDnode) {
* | KEEP {int_value | duration_value} -- rang [1, 365000], default 3650, unit day
* | PAGES int_value -- rang [64, INT32_MAX], default 256, unit page
* | REPLICA int_value -- todo: enum 1, 3, default 1, unit replica
* | STRICT {'off' | 'on'} -- todo: default 'off'
* | WAL_LEVEL int_value -- enum 1, 2, default 1
* | SST_TRIGGER int_value -- rang [1, 16], default 8
* | STT_TRIGGER int_value -- rang [1, 16], default 8
* }
*/
TEST_F(ParserInitialATest, alterDatabase) {
......@@ -130,10 +149,11 @@ TEST_F(ParserInitialATest, alterDatabase) {
auto setAlterDbStrict = [&](int8_t strict) { expect.strict = strict; };
auto setAlterDbCacheModel = [&](int8_t cacheModel) { expect.cacheLast = cacheModel; };
auto setAlterDbReplica = [&](int8_t replications) { expect.replications = replications; };
auto setAlterDbSstTrigger = [&](int8_t sstTrigger) { expect.sstTrigger = sstTrigger; };
auto setAlterDbSttTrigger = [&](int8_t sstTrigger) { expect.sstTrigger = sstTrigger; };
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_DATABASE_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_ALTER_DB);
SAlterDbReq req = {0};
ASSERT_EQ(tDeserializeSAlterDbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.db), std::string(expect.db));
......@@ -161,11 +181,12 @@ TEST_F(ParserInitialATest, alterDatabase) {
setAlterDbFsync(200);
setAlterDbWal(1);
setAlterDbCacheModel(TSDB_CACHE_MODEL_LAST_ROW);
setAlterDbSstTrigger(16);
setAlterDbSttTrigger(16);
setAlterDbBuffer(16);
setAlterDbPages(128);
setAlterDbReplica(3);
run("ALTER DATABASE test BUFFER 16 CACHEMODEL 'last_row' CACHESIZE 32 WAL_FSYNC_PERIOD 200 KEEP 10 PAGES 128 "
"WAL_LEVEL 1 STT_TRIGGER 16");
"REPLICA 3 WAL_LEVEL 1 STT_TRIGGER 16");
clearAlterDbReq();
initAlterDb("test");
......@@ -240,6 +261,22 @@ TEST_F(ParserInitialATest, alterDatabase) {
setAlterDbWal(2);
run("ALTER DATABASE test WAL_LEVEL 2");
clearAlterDbReq();
initAlterDb("test");
setAlterDbReplica(1);
run("ALTER DATABASE test REPLICA 1");
setAlterDbReplica(3);
run("ALTER DATABASE test REPLICA 3");
clearAlterDbReq();
initAlterDb("test");
setAlterDbSttTrigger(1);
run("ALTER DATABASE test STT_TRIGGER 1");
setAlterDbSttTrigger(4);
run("ALTER DATABASE test STT_TRIGGER 4");
setAlterDbSttTrigger(16);
run("ALTER DATABASE test STT_TRIGGER 16");
clearAlterDbReq();
}
TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
......@@ -260,6 +297,7 @@ TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
run("ALTER DATABASE test PAGES 63", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 3", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test REPLICA 2", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test STT_TRIGGER 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test STT_TRIGGER 17", TSDB_CODE_PAR_INVALID_DB_OPTION);
// Regardless of the specific sentence
......@@ -267,7 +305,7 @@ TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
}
/*
* ALTER LOCAL dnode_id 'config' ['value']
* ALTER LOCAL 'config' ['value']
*/
TEST_F(ParserInitialATest, alterLocal) {
useDb("root", "test");
......@@ -311,19 +349,19 @@ TEST_F(ParserInitialATest, alterLocal) {
* | ADD COLUMN col_name column_type
* | DROP COLUMN col_name
* | MODIFY COLUMN col_name column_type
* | RENAME COLUMN old_col_name new_col_name -- normal table
* | ADD TAG tag_name tag_type -- super table
* | DROP TAG tag_name -- super table
* | MODIFY TAG tag_name tag_type -- super table
* | RENAME TAG old_tag_name new_tag_name -- super table
* | SET TAG tag_name = new_tag_value -- child table
* | RENAME COLUMN old_col_name new_col_name -- only normal table
* | ADD TAG tag_name tag_type -- only super table
* | DROP TAG tag_name -- only super table
* | MODIFY TAG tag_name tag_type -- only super table
* | RENAME TAG old_tag_name new_tag_name -- only super table
* | SET TAG tag_name = new_tag_value -- only child table
* }
*
* alter_table_options:
* alter_table_option ...
*
* alter_table_option: {
* TTL int_value -- child/normal table
* TTL int_value -- only child/normal table
* | COMMENT 'string_value'
* }
*/
......@@ -379,6 +417,7 @@ TEST_F(ParserInitialATest, alterSTable) {
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_SUPER_TABLE_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_ALTER_STB);
SMAlterStbReq req = {0};
ASSERT_EQ(tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.name), std::string(expect.name));
......@@ -444,9 +483,35 @@ TEST_F(ParserInitialATest, alterSTableSemanticCheck) {
run("ALTER STABLE st1 TTL 10", TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
/*
* ALTER TABLE [db_name.]tb_name alter_table_clause
*
* alter_table_clause: {
* alter_table_options
* | ADD COLUMN col_name column_type
* | DROP COLUMN col_name
* | MODIFY COLUMN col_name column_type
* | RENAME COLUMN old_col_name new_col_name -- only normal table
* | ADD TAG tag_name tag_type -- only super table
* | DROP TAG tag_name -- only super table
* | MODIFY TAG tag_name tag_type -- only super table
* | RENAME TAG old_tag_name new_tag_name -- only super table
* | SET TAG tag_name = new_tag_value -- only child table
* }
*
* alter_table_options:
* alter_table_option ...
*
* alter_table_option: {
* TTL int_value -- only child/normal table
* | COMMENT 'string_value'
* }
*/
TEST_F(ParserInitialATest, alterTable) {
useDb("root", "test");
// normal/child table
{
SVAlterTbReq expect = {0};
auto clearAlterTbReq = [&]() {
......@@ -574,6 +639,99 @@ TEST_F(ParserInitialATest, alterTable) {
setAlterTableTag("st1s1", "tag1", (uint8_t*)&val, sizeof(val));
run("ALTER TABLE st1s1 SET TAG tag1=10");
clearAlterTbReq();
}
// super table
{
SMAlterStbReq expect = {0};
auto clearAlterStbReq = [&]() {
tFreeSMAltertbReq(&expect);
memset(&expect, 0, sizeof(SMAlterStbReq));
};
auto setAlterStbReq = [&](const char* pTbname, int8_t alterType, int32_t numOfFields = 0,
const char* pField1Name = nullptr, int8_t field1Type = 0, int32_t field1Bytes = 0,
const char* pField2Name = nullptr, const char* pComment = nullptr) {
int32_t len = snprintf(expect.name, sizeof(expect.name), "0.test.%s", pTbname);
expect.name[len] = '\0';
expect.alterType = alterType;
if (nullptr != pComment) {
expect.comment = strdup(pComment);
expect.commentLen = strlen(pComment);
}
expect.numOfFields = numOfFields;
if (NULL == expect.pFields) {
expect.pFields = taosArrayInit(2, sizeof(TAOS_FIELD));
TAOS_FIELD field = {0};
taosArrayPush(expect.pFields, &field);
taosArrayPush(expect.pFields, &field);
}
TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 0);
if (NULL != pField1Name) {
strcpy(pField->name, pField1Name);
pField->name[strlen(pField1Name)] = '\0';
} else {
memset(pField, 0, sizeof(TAOS_FIELD));
}
pField->type = field1Type;
pField->bytes = field1Bytes > 0 ? field1Bytes : (field1Type > 0 ? tDataTypes[field1Type].bytes : 0);
pField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 1);
if (NULL != pField2Name) {
strcpy(pField->name, pField2Name);
pField->name[strlen(pField2Name)] = '\0';
} else {
memset(pField, 0, sizeof(TAOS_FIELD));
}
pField->type = 0;
pField->bytes = 0;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_TABLE_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_ALTER_STB);
SMAlterStbReq req = {0};
ASSERT_EQ(tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.alterType, expect.alterType);
ASSERT_EQ(req.numOfFields, expect.numOfFields);
if (expect.numOfFields > 0) {
TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(req.pFields, 0);
TAOS_FIELD* pExpectField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 0);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
}
if (expect.numOfFields > 1) {
TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(req.pFields, 1);
TAOS_FIELD* pExpectField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 1);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
}
tFreeSMAltertbReq(&req);
});
setAlterStbReq("st1", TSDB_ALTER_TABLE_ADD_TAG, 1, "tag11", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE st1 ADD TAG tag11 BIGINT");
clearAlterStbReq();
setAlterStbReq("st1", TSDB_ALTER_TABLE_DROP_TAG, 1, "tag1");
run("ALTER TABLE st1 DROP TAG tag1");
clearAlterStbReq();
setAlterStbReq("st1", TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, 1, "tag2", TSDB_DATA_TYPE_VARCHAR,
30 + VARSTR_HEADER_SIZE);
run("ALTER TABLE st1 MODIFY TAG tag2 VARCHAR(30)");
clearAlterStbReq();
setAlterStbReq("st1", TSDB_ALTER_TABLE_UPDATE_TAG_NAME, 2, "tag1", 0, 0, "tag11");
run("ALTER TABLE st1 RENAME TAG tag1 tag11");
clearAlterStbReq();
}
}
TEST_F(ParserInitialATest, alterTableSemanticCheck) {
......@@ -588,7 +746,7 @@ TEST_F(ParserInitialATest, alterTableSemanticCheck) {
}
/*
* ALTER USER user_name PASS str_value
* ALTER USER user_name alter_user_clause
*
* alter_user_clause: {
* PASS str_value
......@@ -618,6 +776,7 @@ TEST_F(ParserInitialATest, alterUser) {
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_USER_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_ALTER_USER);
SAlterUserReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSAlterUserReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
......
......@@ -52,8 +52,8 @@ TEST_F(ParserExplainToSyncdbTest, grant) {
ASSERT_EQ(string(req.objname), string(expect.objname));
});
setAlterUserReq(TSDB_ALTER_USER_ADD_ALL_DB, "wxy", "0.test");
run("GRANT ALL ON test.* TO wxy");
setAlterUserReq(TSDB_ALTER_USER_ADD_ALL_DB, "wxy", "0.*");
run("GRANT ALL ON *.* TO wxy");
setAlterUserReq(TSDB_ALTER_USER_ADD_READ_DB, "wxy", "0.test");
run("GRANT READ ON test.* TO wxy");
......@@ -138,10 +138,38 @@ TEST_F(ParserExplainToSyncdbTest, redistributeVgroup) {
TEST_F(ParserExplainToSyncdbTest, revoke) {
useDb("root", "test");
run("REVOKE ALL ON test.* FROM wxy");
SAlterUserReq expect = {0};
auto setAlterUserReq = [&](int8_t alterType, const string& user, const string& obj) {
expect.alterType = alterType;
snprintf(expect.user, sizeof(expect.user), "%s", user.c_str());
snprintf(expect.objname, sizeof(expect.objname), "%s", obj.c_str());
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_REVOKE_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_ALTER_USER);
SAlterUserReq req = {0};
ASSERT_EQ(tDeserializeSAlterUserReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(req.alterType, expect.alterType);
ASSERT_EQ(string(req.user), string(expect.user));
ASSERT_EQ(string(req.objname), string(expect.objname));
});
setAlterUserReq(TSDB_ALTER_USER_REMOVE_ALL_DB, "wxy", "0.*");
run("REVOKE ALL ON *.* FROM wxy");
setAlterUserReq(TSDB_ALTER_USER_REMOVE_READ_DB, "wxy", "0.test");
run("REVOKE READ ON test.* FROM wxy");
setAlterUserReq(TSDB_ALTER_USER_REMOVE_WRITE_DB, "wxy", "0.test");
run("REVOKE WRITE ON test.* FROM wxy");
setAlterUserReq(TSDB_ALTER_USER_REMOVE_ALL_DB, "wxy", "0.test");
run("REVOKE READ, WRITE ON test.* FROM wxy");
setAlterUserReq(TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC, "wxy", "0.tp1");
run("REVOKE SUBSCRIBE ON tp1 FROM wxy");
}
// todo syncdb
......
......@@ -99,7 +99,7 @@ TEST_F(ParserInitialDTest, dropDnode) {
expect.force = force;
};
auto setDropDnodeReqByEndpoint = [&](const char* pFqdn, int32_t port, bool force = false) {
auto setDropDnodeReqByEndpoint = [&](const char* pFqdn, int32_t port = tsServerPort, bool force = false) {
strcpy(expect.fqdn, pFqdn);
expect.port = port;
expect.force = force;
......@@ -131,6 +131,14 @@ TEST_F(ParserInitialDTest, dropDnode) {
setDropDnodeReqByEndpoint("host2", 8030, true);
run("DROP DNODE 'host2:8030' FORCE");
clearDropDnodeReq();
setDropDnodeReqByEndpoint("host1");
run("DROP DNODE host1");
clearDropDnodeReq();
setDropDnodeReqByEndpoint("host2", tsServerPort, true);
run("DROP DNODE host2 FORCE");
clearDropDnodeReq();
}
// todo DROP function
......@@ -174,7 +182,21 @@ TEST_F(ParserInitialDTest, dropMnode) {
TEST_F(ParserInitialDTest, dropQnode) {
useDb("root", "test");
run("DROP qnode on dnode 1");
SMDropQnodeReq expect = {0};
auto setDropQnodeReq = [&](int32_t dnodeId) { expect.dnodeId = dnodeId; };
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_DROP_QNODE_STMT);
SMDropQnodeReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS ==
tDeserializeSCreateDropMQSNodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(req.dnodeId, expect.dnodeId);
});
setDropQnodeReq(1);
run("DROP QNODE ON DNODE 1");
}
TEST_F(ParserInitialDTest, dropSnode) {
......@@ -237,7 +259,20 @@ TEST_F(ParserInitialDTest, dropUser) {
login("root");
useDb("root", "test");
run("DROP user wxy");
SDropUserReq expect = {0};
auto setDropUserReq = [&](const char* pUser) { sprintf(expect.user, "%s", pUser); };
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_DROP_USER_STMT);
SDropUserReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSDropUserReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.user), std::string(expect.user));
});
setDropUserReq("wxy");
run("DROP USER wxy");
}
} // namespace ParserTest
......@@ -112,7 +112,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
ASSERT(0);
}
if (output == NULL) {
if (qStreamRecoverScanFinished(exec)) {
finished = true;
} else {
qSetStreamOpOpen(exec);
}
break;
}
......
......@@ -16,6 +16,7 @@
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#include <stdlib.h>
#ifdef WINDOWS
void swapStr(char* j, char* J, int width) {
......@@ -33,16 +34,5 @@ void swapStr(char* j, char* J, int width) {
// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually.
void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) {
#ifdef WINDOWS
int64_t i, j;
for (i = 0; i < sz - 1; i++) {
for (j = 0; j < sz - 1 - i; j++) {
if (compar((char*)arr + j * width, (char*)arr + (j + 1) * width) > 0.00) {
swapStr((char*)arr + j * width, (char*)arr + (j + 1) * width, width);
}
}
}
#else
qsort(arr, sz, width, compar);
#endif
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册