提交 41e67fb2 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into feature/3.0_debug_wxy

...@@ -287,7 +287,7 @@ typedef enum ELogicConditionType { ...@@ -287,7 +287,7 @@ typedef enum ELogicConditionType {
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MIN_VNODES_PER_DB 1 #define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MAX_VNODES_PER_DB 4096 #define TSDB_MAX_VNODES_PER_DB 1024
#define TSDB_DEFAULT_VN_PER_DB 2 #define TSDB_DEFAULT_VN_PER_DB 2
#define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB #define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB
#define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB #define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB
......
...@@ -108,13 +108,11 @@ static const SSysDbTableSchema userFuncSchema[] = { ...@@ -108,13 +108,11 @@ static const SSysDbTableSchema userFuncSchema[] = {
}; };
static const SSysDbTableSchema userIdxSchema[] = { static const SSysDbTableSchema userIdxSchema[] = {
{.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "index_database", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "column_name", .bytes = SYSTABLE_SCH_COL_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_VARCHAR},
}; };
static const SSysDbTableSchema userStbsSchema[] = { static const SSysDbTableSchema userStbsSchema[] = {
...@@ -313,7 +311,7 @@ static const SSysDbTableSchema querySchema[] = { ...@@ -313,7 +311,7 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "query_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "query_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, {.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
...@@ -329,17 +327,17 @@ static const SSysDbTableSchema appSchema[] = { ...@@ -329,17 +327,17 @@ static const SSysDbTableSchema appSchema[] = {
{.name = "app_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "app_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "ip", .bytes = TSDB_IPv4ADDR_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "ip", .bytes = TSDB_IPv4ADDR_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "name", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "name", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "start_time", .bytes = 8 , .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "insert_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "insert_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "insert_row", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "insert_row", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "insert_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "insert_time", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "insert_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "insert_bytes", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "fetch_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "fetch_bytes", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "query_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "query_time", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "show_query", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "show_query", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "total_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "total_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "current_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "current_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
}; };
...@@ -353,8 +351,7 @@ static const SSysTableMeta perfsMeta[] = { ...@@ -353,8 +351,7 @@ static const SSysTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)}, {TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)},
{TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)}, {TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)}, {TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
{TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)} {TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}};
};
void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) { void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) {
*pInfosTableMeta = infosMeta; *pInfosTableMeta = infosMeta;
......
...@@ -549,18 +549,33 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { ...@@ -549,18 +549,33 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED; terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) { if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
#if 1
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
return terrno;
#else
pDb->cfg.buffer = pAlter->buffer; pDb->cfg.buffer = pAlter->buffer;
terrno = 0; terrno = 0;
#endif
} }
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) { if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
#if 1
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
return terrno;
#else
pDb->cfg.pages = pAlter->pages; pDb->cfg.pages = pAlter->pages;
terrno = 0; terrno = 0;
#endif
} }
if (pAlter->pageSize > 0 && pAlter->pageSize != pDb->cfg.pageSize) { if (pAlter->pageSize > 0 && pAlter->pageSize != pDb->cfg.pageSize) {
#if 1
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
return terrno;
#else
pDb->cfg.pageSize = pAlter->pageSize; pDb->cfg.pageSize = pAlter->pageSize;
terrno = 0; terrno = 0;
#endif
} }
if (pAlter->daysPerFile > 0 && pAlter->daysPerFile != pDb->cfg.daysPerFile) { if (pAlter->daysPerFile > 0 && pAlter->daysPerFile != pDb->cfg.daysPerFile) {
...@@ -594,8 +609,12 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { ...@@ -594,8 +609,12 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
} }
if (pAlter->strict >= 0 && pAlter->strict != pDb->cfg.strict) { if (pAlter->strict >= 0 && pAlter->strict != pDb->cfg.strict) {
#if 1
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
#else
pDb->cfg.strict = pAlter->strict; pDb->cfg.strict = pAlter->strict;
terrno = 0; terrno = 0;
#endif
} }
if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) { if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) {
...@@ -604,9 +623,13 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { ...@@ -604,9 +623,13 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
} }
if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) { if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) {
#if 1
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
#else
pDb->cfg.replications = pAlter->replications; pDb->cfg.replications = pAlter->replications;
pDb->vgVersion++; pDb->vgVersion++;
terrno = 0; terrno = 0;
#endif
} }
return terrno; return terrno;
......
...@@ -758,6 +758,11 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { ...@@ -758,6 +758,11 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
} }
} }
if (numOfVnodes > 0) {
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
goto _OVER;
}
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes); code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
......
...@@ -71,7 +71,7 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { ...@@ -71,7 +71,7 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) {
type = TSDB_MGMT_TABLE_FUNC; type = TSDB_MGMT_TABLE_FUNC;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) {
// type = TSDB_MGMT_TABLE_INDEX; type = TSDB_MGMT_TABLE_INDEX;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
type = TSDB_MGMT_TABLE_STB; type = TSDB_MGMT_TABLE_STB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
......
...@@ -1147,29 +1147,32 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc ...@@ -1147,29 +1147,32 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
SName smaName = {0}; SName smaName = {0};
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(n, (char *)tNameGetTableName(&smaName)); STR_TO_VARSTR(n2, (char *)mndGetDbStr(pDb->name));
cols++;
SName stbName = {0}; SName stbName = {0};
tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n, false); colDataAppend(pColInfo, numOfRows, (const char *)n1, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false); colDataAppend(pColInfo, numOfRows, (const char *)n2, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n1, false); colDataAppend(pColInfo, numOfRows, (const char *)n3, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false); colDataAppend(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
} }
......
...@@ -273,6 +273,9 @@ _OVER: ...@@ -273,6 +273,9 @@ _OVER:
} }
static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) { static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
#if 1
return TSDB_CODE_OPS_NOT_SUPPORT;
#else
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
SSnodeObj *pObj = NULL; SSnodeObj *pObj = NULL;
...@@ -315,6 +318,7 @@ _OVER: ...@@ -315,6 +318,7 @@ _OVER:
mndReleaseSnode(pMnode, pObj); mndReleaseSnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
return code; return code;
#endif
} }
static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) { static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
...@@ -386,9 +390,12 @@ _OVER: ...@@ -386,9 +390,12 @@ _OVER:
} }
static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) { static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; #if 1
int32_t code = -1; return TSDB_CODE_OPS_NOT_SUPPORT;
SSnodeObj *pObj = NULL; #else
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SSnodeObj *pObj = NULL;
SMDropSnodeReq dropReq = {0}; SMDropSnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
...@@ -422,6 +429,7 @@ _OVER: ...@@ -422,6 +429,7 @@ _OVER:
mndReleaseSnode(pMnode, pObj); mndReleaseSnode(pMnode, pObj);
return code; return code;
#endif
} }
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
......
...@@ -247,7 +247,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -247,7 +247,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0; pObj->status = 0;
// TODO
pObj->igExpired = pCreate->igExpired; pObj->igExpired = pCreate->igExpired;
pObj->trigger = pCreate->triggerType; pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay; pObj->triggerParam = pCreate->maxDelay;
......
...@@ -1219,6 +1219,9 @@ _OVER: ...@@ -1219,6 +1219,9 @@ _OVER:
} }
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
#if 1
return TSDB_CODE_OPS_NOT_SUPPORT;
#else
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SDnodeObj *pNew1 = NULL; SDnodeObj *pNew1 = NULL;
SDnodeObj *pNew2 = NULL; SDnodeObj *pNew2 = NULL;
...@@ -1412,6 +1415,7 @@ _OVER: ...@@ -1412,6 +1415,7 @@ _OVER:
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
return code; return code;
#endif
} }
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) { int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
...@@ -1711,6 +1715,9 @@ _OVER: ...@@ -1711,6 +1715,9 @@ _OVER:
} }
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
#if 1
return TSDB_CODE_OPS_NOT_SUPPORT;
#else
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
SArray *pArray = NULL; SArray *pArray = NULL;
...@@ -1759,6 +1766,7 @@ _OVER: ...@@ -1759,6 +1766,7 @@ _OVER:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return code; return code;
#endif
} }
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
\ No newline at end of file
...@@ -93,7 +93,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { ...@@ -93,7 +93,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, TSDB_CODE_OPS_NOT_SUPPORT);
} }
test.SendShowReq(TSDB_MGMT_TABLE_DB, "user_databases", ""); test.SendShowReq(TSDB_MGMT_TABLE_DB, "user_databases", "");
......
...@@ -62,12 +62,10 @@ struct STSmaStat { ...@@ -62,12 +62,10 @@ struct STSmaStat {
struct SRSmaStat { struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t refId; // shared by persistence/fetch tasks int64_t refId; // shared by fetch tasks
void *tmrHandle; // for persistence task void *tmrHandle; // shared by fetch tasks
tmr_h tmrId; // for persistence task int8_t triggerStat; // shared by fetch tasks
int32_t tmrSeconds; // for persistence task int8_t runningStat; // for persistence task
int8_t triggerStat; // for persistence task
int8_t runningStat; // for persistence task
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
}; };
...@@ -82,7 +80,6 @@ struct SSmaStat { ...@@ -82,7 +80,6 @@ struct SSmaStat {
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) #define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_TMR_ID(r) ((r)->tmrId)
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle) #define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat) #define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
...@@ -185,9 +182,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { ...@@ -185,9 +182,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SRSmaInfo *pInfo); void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma); int32_t tdProcessRSmaRestoreImpl(SSma *pSma);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
...@@ -244,8 +243,8 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); ...@@ -244,8 +243,8 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void tdCloseTFile(STFile *pTFile); void tdCloseTFile(STFile *pTFile);
void tdDestroyTFile(STFile *pTFile); void tdDestroyTFile(STFile *pTFile);
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName); void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName);
void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName); void tdGetVndDirName(int32_t vgId,const char *pdname, const char *dname, bool endWithSep, char *outputName);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -163,8 +163,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool ...@@ -163,8 +163,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
// sma // sma
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode);
int32_t smaCloseEnv(SSma* pSma); int32_t smaClose(SSma* pSma);
int32_t smaCloseEx(SSma* pSma); int32_t smaBegin(SSma* pSma);
int32_t smaPreCommit(SSma* pSma); int32_t smaPreCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma); int32_t smaCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma);
......
...@@ -43,13 +43,48 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } ...@@ -43,13 +43,48 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
*/ */
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); }
/**
* @brief set rsma trigger stat active
*
* @param pSma
* @return int32_t
*/
int32_t smaBegin(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
int8_t rsmaTriggerStat =
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d rsma trigger stat from paused to active", SMA_VID(pSma));
break;
}
case TASK_TRIGGER_STAT_INIT: {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
smaDebug("vgId:%d rsma trigger stat from init to active", SMA_VID(pSma));
break;
}
default: {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
smaWarn("vgId:%d rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat);
ASSERT(0);
break;
}
}
return TSDB_CODE_SUCCESS;
}
/** /**
* @brief pre-commit for rollup sma. * @brief pre-commit for rollup sma.
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) perform persist task for qTaskInfo * 2) perform persist task for qTaskInfo
* 3) wait all triggered fetch tasks finished * 3) wait all triggered fetch tasks finished
* 4) set trigger stat of rsma timer TASK_TRIGGER_STAT_ACTIVE.
* 5) finish
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
...@@ -63,10 +98,30 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { ...@@ -63,10 +98,30 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1
// step 1: set persistence task paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2 // step 2: perform persist task for qTaskInfo
tdRSmaPersistExecImpl(pRSmaStat);
// step 3: wait all triggered fetch tasks finished
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -103,48 +158,68 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { ...@@ -103,48 +158,68 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
TdDirPtr pDir = NULL; TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL; TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; const char *pattern = "v[0-9]+qtaskinfo\\.ver([0-9]+)?$";
const char *pattern = "^v[0-9]+qtaskinfo\\.ver([0-9]+)?$";
regex_t regex; regex_t regex;
int code = 0;
tdGetVndDirName(TD_VID(pVnode), VNODE_RSMA_DIR, dir); tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
// Resource allocation and init // Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED); if ((code = regcomp(&regex, pattern, REG_EXTENDED)) != 0) {
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
return TSDB_CODE_FAILED;
}
if ((pDir = taosOpenDir(dir)) == NULL) { if ((pDir = taosOpenDir(dir)) == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
smaWarn("rsma post-commit open dir %s failed since %s", dir, terrstr()); smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2]; regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir)) != NULL) { while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *entryName = taosGetDirEntryName(pDirEntry); char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) { if (!entryName) {
continue; continue;
} }
char *fileName = taosDirEntryBaseName(entryName);
int code = regexec(&regex, bname, 2, regMatch, 0); code = regexec(&regex, entryName, 2, regMatch, 0);
if (code == 0) { if (code == 0) {
// match // match
printf("match 0 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[0].rm_so)); int64_t version = -1;
printf("match 1 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[1].rm_so)); sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &version);
if ((version < committed) && (version > -1)) {
strncpy(dirEnd, entryName, TSDB_FILENAME_LEN - dirLen);
if (taosRemoveFile(dir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
dir, terrstr());
} else {
smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, dir);
}
}
} else if (code == REG_NOMATCH) { } else if (code == REG_NOMATCH) {
// not match // not match
smaInfo("rsma post-commit not match %s", fileName); smaTrace("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), entryName);
continue; continue;
} else { } else {
// has other error // has other error
terrno = TAOS_SYSTEM_ERROR(code); char errbuf[128];
smaWarn("rsma post-commit regexec failed since %s", terrstr()); regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), errbuf);
taosCloseDir(&pDir); taosCloseDir(&pDir);
regfree(&regex); regfree(&regex);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
taosCloseDir(&pDir); taosCloseDir(&pDir);
regfree(&regex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -132,6 +132,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -132,6 +132,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (smaType == TSDB_SMA_TYPE_ROLLUP) { if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
pRSmaStat->pSma = (SSma *)pSma; pRSmaStat->pSma = (SSma *)pSma;
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
// init smaMgmt // init smaMgmt
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
...@@ -192,22 +193,20 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) { ...@@ -192,22 +193,20 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) {
static void tdDestroyRSmaStat(void *pRSmaStat) { static void tdDestroyRSmaStat(void *pRSmaStat) {
if (pRSmaStat) { if (pRSmaStat) {
SRSmaStat *pStat = (SRSmaStat *)pRSmaStat; SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
smaDebug("vgId:%d %s:%d destroy rsma stat %p", SMA_VID(pStat->pSma), __func__, __LINE__, pRSmaStat); SSma *pSma = pStat->pSma;
// step 1: set persistence task cancelled smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
// step 1: set rsma trigger stat cancelled
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
// step 2: stop the persistence timer // step 2: wait the persistence thread to finish
taosTmrStopA(&RSMA_TMR_ID(pStat));
// step 3: wait the persistence thread to finish
int32_t nLoops = 0; int32_t nLoops = 0;
if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) { if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) {
while (1) { while (1) {
if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) { if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) {
smaDebug("rsma, persist task finished already"); smaDebug("vgId:%d, rsma persist task finished already", SMA_VID(pSma));
break; break;
} else { } else {
smaDebug("rsma, persist task not finished yet since rsma stat in %" PRIi8, smaDebug("vgId:%d, rsma persist task not finished yet since rsma stat in %" PRIi8, SMA_VID(pSma),
atomic_load_8(RSMA_TRIGGER_STAT(pStat))); atomic_load_8(RSMA_TRIGGER_STAT(pStat)));
} }
++nLoops; ++nLoops;
...@@ -218,13 +217,15 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { ...@@ -218,13 +217,15 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
} }
} }
// step 4: destroy the rsma info and associated fetch tasks // step 3: destroy the rsma info and associated fetch tasks
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
while (infoHash) { void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash; while (infoHash) {
tdFreeRSmaInfo(pSmaInfo); SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash); tdFreeRSmaInfo(pSmaInfo);
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
}
} }
taosHashCleanup(RSMA_INFO_HASH(pStat)); taosHashCleanup(RSMA_INFO_HASH(pStat));
...@@ -232,10 +233,10 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { ...@@ -232,10 +233,10 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
nLoops = 0; nLoops = 0;
while (1) { while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
smaDebug("rsma, all fetch task finished already"); smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
break; break;
} else { } else {
smaDebug("rsma, fetch tasks not all finished yet"); smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
} }
++nLoops; ++nLoops;
if (nLoops > 1000) { if (nLoops > 1000) {
...@@ -275,7 +276,7 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -275,7 +276,7 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) { if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) {
smaError("remove refId from smaRef failed, refId:0x%" PRIx64, RSMA_REF_ID(pRSmaStat)); smaError("remove refId from rsmaRef:0x%" PRIx64 " failed since %s", RSMA_REF_ID(pRSmaStat), terrstr());
} }
} else { } else {
ASSERT(0); ASSERT(0);
......
...@@ -135,17 +135,11 @@ _err: ...@@ -135,17 +135,11 @@ _err:
return -1; return -1;
} }
int32_t smaCloseEnv(SSma *pSma) { int32_t smaClose(SSma *pSma) {
if (pSma) { if (pSma) {
taosThreadMutexDestroy(&pSma->mutex);
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
}
return 0;
}
int32_t smaCloseEx(SSma *pSma) {
if (pSma) {
taosThreadMutexDestroy(&pSma->mutex);
if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma)); if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma));
if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma)); if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma));
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma)); if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
......
...@@ -15,9 +15,8 @@ ...@@ -15,9 +15,8 @@
#include "sma.h" #include "sma.h"
#define RSMA_QTASKINFO_PERSIST_MS 7200000 #define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.smaRef = -1, .smaRef = -1,
...@@ -43,9 +42,9 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF ...@@ -43,9 +42,9 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
struct SRSmaInfoItem { struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo; SRSmaInfo *pRsmaInfo;
...@@ -88,7 +87,7 @@ struct SRSmaQTaskInfoIter { ...@@ -88,7 +87,7 @@ struct SRSmaQTaskInfoIter {
}; };
static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) { static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) {
tdGetVndFileName(vgId, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
} }
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
...@@ -114,12 +113,14 @@ void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { ...@@ -114,12 +113,14 @@ void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i]; SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) { if (pItem->taskInfo) {
smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId, if (pItem->tmrId) {
i + 1); smaDebug("vgId:%d, table %" PRIi64 " stop fetch timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
taosTmrStopA(&pItem->tmrId); i + 1);
taosTmrStopA(&pItem->tmrId);
}
tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1); tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
pInfo->suid, i + 1); pInfo->suid, i + 1);
} }
} }
...@@ -358,13 +359,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -358,13 +359,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err; goto _err;
} }
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid); smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
// start the persist timer
if (TASK_TRIGGER_STAT_INIT ==
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) {
taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
...@@ -748,7 +743,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -748,7 +743,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
...@@ -758,7 +753,12 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { ...@@ -758,7 +753,12 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t arrSize = taosArrayGetSize(suidList); int64_t arrSize = taosArrayGetSize(suidList);
if (nTables) {
*nTables = arrSize;
}
if (arrSize == 0) { if (arrSize == 0) {
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode)); smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
...@@ -767,9 +767,9 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { ...@@ -767,9 +767,9 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0); metaReaderInit(&mr, SMA_META(pSma), 0);
for (int32_t i = 0; i < arrSize; ++i) { for (int64_t i = 0; i < arrSize; ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("vgId:%d, rsma restore, suid[%d] is %" PRIi64, TD_VID(pVnode), i, suid); smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
if (metaGetTableEntryByUid(&mr, suid) < 0) { if (metaGetTableEntryByUid(&mr, suid) < 0) {
smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr()); terrstr());
...@@ -803,7 +803,7 @@ _err: ...@@ -803,7 +803,7 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
STFile tFile = {0}; STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0}; char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
...@@ -814,6 +814,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { ...@@ -814,6 +814,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
} }
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
if (pVnode->state.committed > 0) {
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
} else {
smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -839,9 +846,14 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { ...@@ -839,9 +846,14 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
tdRSmaQTaskInfoIterDestroy(&fIter); tdRSmaQTaskInfoIterDestroy(&fIter);
tdCloseTFile(&tFile); tdCloseTFile(&tFile);
tdDestroyTFile(&tFile); tdDestroyTFile(&tFile);
// restored successfully from committed
*committed = pVnode->state.committed;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("rsma restore, qtaskinfo reload failed since %s", terrstr()); smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode),
pVnode->state.committed, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -849,35 +861,45 @@ _err: ...@@ -849,35 +861,45 @@ _err:
* @brief reload ts data from checkpoint * @brief reload ts data from checkpoint
* *
* @param pSma * @param pSma
* @param committed restore from committed version
* @return int32_t * @return int32_t
*/ */
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) {
// TODO // TODO
smaDebug("vgId:%d, rsma restore from %" PRIi64 ", ts data reload success", SMA_VID(pSma), committed);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("rsma restore, ts data reload failed since %s", terrstr()); smaError("vgId:%d, rsma restore from %" PRIi64 ", ts data reload failed since %s", SMA_VID(pSma), committed,
terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
// step 1: iterate all stables to restore the rsma env // step 1: iterate all stables to restore the rsma env
if (tdRSmaRestoreQTaskInfoInit(pSma) < 0) { int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
goto _err; goto _err;
} }
if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) { int64_t committed = -1;
if (tdRSmaRestoreQTaskInfoReload(pSma, &committed) < 0) {
goto _err; goto _err;
} }
// step 3: reload ts data from checkpoint // step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma) < 0) { if (tdRSmaRestoreTSDataReload(pSma, committed) < 0) {
goto _err; goto _err;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("failed to restore rsma task since %s", terrstr()); smaError("vgId:%d failed to restore rsma task since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1017,7 +1039,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { ...@@ -1017,7 +1039,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) { if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
smaError("restore rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); smaError("vgId:%d, restore rsma qtaskinfo file %s failed since %s", SMA_VID(pSma),
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1054,13 +1077,17 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { ...@@ -1054,13 +1077,17 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
SSma *pSma = pRSmaStat->pSma; SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
int32_t vid = SMA_VID(pSma); int32_t vid = SMA_VID(pSma);
int64_t toffset = 0; int64_t toffset = 0;
bool isFileCreated = false; bool isFileCreated = false;
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
return TSDB_CODE_SUCCESS;
}
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
if (!infoHash) { if (!infoHash) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1099,11 +1126,15 @@ static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { ...@@ -1099,11 +1126,15 @@ static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
char qTaskInfoFName[TSDB_FILENAME_LEN]; char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName); tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err; goto _err;
} }
if (tdCreateTFile(&tFile, true, -1) < 0) { if (tdCreateTFile(&tFile, true, -1) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err; goto _err;
} }
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
i + 1, TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true; isFileCreated = true;
} }
...@@ -1143,6 +1174,7 @@ static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { ...@@ -1143,6 +1174,7 @@ static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("vgId:%d, rsma persit failed since %s", vid, terrstr());
if (isFileCreated) { if (isFileCreated) {
tdRemoveTFile(&tFile); tdRemoveTFile(&tFile);
tdDestroyTFile(&tFile); tdDestroyTFile(&tFile);
...@@ -1238,8 +1270,8 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { ...@@ -1238,8 +1270,8 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
// start persist task // start persist task
tdRSmaPersistTask(pRSmaStat); tdRSmaPersistTask(pRSmaStat);
taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, // taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle,
&pRSmaStat->tmrId); // RSMA_TMR_ID(pRSmaStat));
} else { } else {
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
} }
......
...@@ -140,7 +140,7 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) ...@@ -140,7 +140,7 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
return -1; return -1;
} }
#if 1 #if 0
smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile), smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile),
toffset, nbyte, toffset + nbyte); toffset, nbyte, toffset + nbyte);
#endif #endif
...@@ -181,16 +181,43 @@ void tdCloseTFile(STFile *pTFile) { ...@@ -181,16 +181,43 @@ void tdCloseTFile(STFile *pTFile) {
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); } void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName) { void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
char *outputName) {
if (version < 0) { if (version < 0) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s", vgId, dname, vgId, fname); if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP,
vgId, fname);
}
} else { } else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s%" PRIi64, vgId, dname, vgId, fname, version); if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname, version);
}
} }
} }
void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName) { void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", vgId, dname); if (pdname) {
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
dname, TD_DIRSEP);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
dname);
}
} else {
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname);
}
}
} }
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
...@@ -215,35 +242,36 @@ int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { ...@@ -215,35 +242,36 @@ int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) { int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) {
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) { if (pTFile->pFile == NULL) {
if (errno == ENOENT) { if (errno == ENOENT) {
// Try to create directory recursively // Try to create directory recursively
if (taosMulMkDir(taosDirName(TD_TFILE_FULL_NAME(pTFile))) != 0) { char *s = strdup(TD_TFILE_FULL_NAME(pTFile));
if (taosMulMkDir(taosDirName(s)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s);
return -1;
}
taosMemoryFree(s);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} else {
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
} }
} }
}
if (!updateHeader) { if (!updateHeader) {
return 0; return 0;
} }
pTFile->info.fsize += TD_FILE_HEAD_SIZE; pTFile->info.fsize += TD_FILE_HEAD_SIZE;
pTFile->info.fver = 0; pTFile->info.fver = 0;
if (tdUpdateTFileHeader(pTFile) < 0) { if (tdUpdateTFileHeader(pTFile) < 0) {
tdCloseTFile(pTFile); tdCloseTFile(pTFile);
tdRemoveTFile(pTFile); tdRemoveTFile(pTFile);
return -1; return -1;
}
} }
return 0; return 0;
......
...@@ -406,193 +406,6 @@ OVER: ...@@ -406,193 +406,6 @@ OVER:
return code; return code;
} }
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout;
int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset;
int32_t code = 0;
// get offset to fetch message
if (pReq->currentOffset >= 0) {
fetchOffset = pReq->currentOffset + 1;
} else {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
if (pOffset != NULL) {
ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
TD_VID(pTq->pVnode), pOffset->val.version);
fetchOffset = pOffset->val.version + 1;
} else {
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetCommittedVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
}
}
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/
if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
pReq->subKey);
return -1;
}
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
return -1;
}
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
}
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (rsp.blockData == NULL || rsp.blockDataLen == NULL) {
return -1;
}
rsp.withTbName = pReq->withTbName;
if (rsp.withTbName) {
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
rsp.withSchema = false;
} else {
rsp.withSchema = true;
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
}
#if 1
if (pReq->useSnapshot) {
// TODO set ver into snapshot
int64_t lastVer = walGetCommittedVer(pTq->pWal);
if (rsp.reqOffset < lastVer) {
tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer);
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
if (rsp.blockNum != 0) {
rsp.withTbName = false;
rsp.rspOffset = lastVer;
tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset);
fetchOffset = lastVer;
goto SEND_RSP;
}
}
}
#endif
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
if (pHeadWithCkSum == NULL) {
return -1;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break;
}
if (tqFetchLog(pTq, pHandle, &fetchOffset, &pHeadWithCkSum) < 0) {
// TODO add push mgr
break;
}
SWalCont* pHead = &pHeadWithCkSum->head;
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId) < 0) {
/*ASSERT(0);*/
}
} else {
ASSERT(pHandle->fetchMeta);
ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->currentOffset;
metaRsp.rspOffset = fetchOffset;
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
goto OVER;
}
code = 0;
goto OVER;
}
// TODO batch optimization:
// TODO continue scan until meeting batch requirement
if (rsp.blockNum > 0 /* threshold */) {
break;
} else {
fetchOffset++;
}
}
taosMemoryFree(pHeadWithCkSum);
SEND_RSP:
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
if (rsp.withSchema) {
ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum);
}
rsp.rspOffset = fetchOffset;
if (tqSendDataRsp(pTq, pMsg, pReq, &rsp) < 0) {
code = -1;
}
OVER:
// TODO wrap in destroy func
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
if (rsp.withSchema) {
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (rsp.withTbName) {
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
}
return code;
}
#endif
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
......
...@@ -69,6 +69,9 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -69,6 +69,9 @@ int vnodeBegin(SVnode *pVnode) {
} }
} }
// begin sma
smaBegin(pVnode->pSma); // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged
return 0; return 0;
} }
...@@ -230,8 +233,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -230,8 +233,8 @@ int vnodeCommit(SVnode *pVnode) {
} }
// preCommit // preCommit
// TODO smaPreCommit(pVnode->pSma);
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0); ASSERT(0);
......
...@@ -152,12 +152,11 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -152,12 +152,11 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return pVnode; return pVnode;
_err: _err:
if (pVnode->pSma) smaCloseEnv(pVnode->pSma);
if (pVnode->pQuery) vnodeQueryClose(pVnode); if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq); if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaCloseEx(pVnode->pSma); if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta); if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
...@@ -167,14 +166,13 @@ _err: ...@@ -167,14 +166,13 @@ _err:
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
smaCloseEnv(pVnode->pSma);
vnodeCommit(pVnode); vnodeCommit(pVnode);
vnodeSyncClose(pVnode); vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
walClose(pVnode->pWal); walClose(pVnode->pWal);
tqClose(pVnode->pTq); tqClose(pVnode->pTq);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
smaCloseEx(pVnode->pSma); smaClose(pVnode->pSma);
metaClose(pVnode->pMeta); metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
// destroy handle // destroy handle
......
...@@ -1422,7 +1422,7 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t ...@@ -1422,7 +1422,7 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t
return; return;
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_setbuf, groupId:%"PRIu64, groupId); qDebug("page_setbuf, groupId:%" PRIu64, groupId);
#endif #endif
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId); doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
...@@ -1570,9 +1570,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI ...@@ -1570,9 +1570,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full // if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
// break; // break;
// } // }
} }
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
...@@ -2868,7 +2868,24 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { ...@@ -2868,7 +2868,24 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pInfo->cond.twindows[0].skey = oldSkey; pInfo->cond.twindows[0].skey = oldSkey;
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0; pInfo->curTWinIdx = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
bool found = false;
for (int32_t i = 0; i < tableSz; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) {
found = true;
pInfo->currentTable = i;
}
}
// TODO after processing drop,
ASSERT(found);
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pInfo->currentTable, tableSz);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
...@@ -4107,8 +4124,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4107,8 +4124,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} else { } else {
isNull[index++] = 0; isNull[index++] = 0;
char* data = nodesGetValueFromNode(pValue); char* data = nodesGetValueFromNode(pValue);
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){ if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
if(tTagIsJson(data)){ if (tTagIsJson(data)) {
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
nodesClearList(groupNew); nodesClearList(groupNew);
...@@ -4173,7 +4190,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4173,7 +4190,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
if(code){ if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
...@@ -4202,7 +4219,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4202,7 +4219,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}; };
if (pHandle) { if (pHandle) {
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
if(code){ if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
......
...@@ -1329,6 +1329,13 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { ...@@ -1329,6 +1329,13 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->scanCols); taosArrayDestroy(pInfo->scanCols);
} }
static int32_t getSysTableDbNameColId(const char* pTable) {
// if (0 == strcmp(TSDB_INS_TABLE_USER_INDEXES, pTable)) {
// return 1;
// }
return TSDB_INS_USER_STABLES_DBNAME_COLID;
}
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
ENodeType nType = nodeType(pNode); ENodeType nType = nodeType(pNode);
...@@ -1350,7 +1357,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { ...@@ -1350,7 +1357,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
} }
SColumnNode* node = (SColumnNode*)pNode; SColumnNode* node = (SColumnNode*)pNode;
if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) { if (getSysTableDbNameColId(node->tableName) == node->colId) {
*(int32_t*)pContext = 2; *(int32_t*)pContext = 2;
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
......
...@@ -778,7 +778,6 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { ...@@ -778,7 +778,6 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
*ignore = true; *ignore = true;
destroyCmsg(pMsg); destroyCmsg(pMsg);
return NULL; return NULL;
// assert(0);
} else { } else {
conn = exh->handle; conn = exh->handle;
transReleaseExHandle(refId); transReleaseExHandle(refId);
...@@ -812,7 +811,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -812,7 +811,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
// transPrintEpSet(&pCtx->epSet);
bool ignore = false; bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) { if (ignore == true) {
......
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
## ---- db ## ---- db
./test.sh -f tsim/db/alter_option.sim ./test.sh -f tsim/db/alter_option.sim
./test.sh -f tsim/db/alter_replica_13.sim # ./test.sh -f tsim/db/alter_replica_13.sim
./test.sh -f tsim/db/alter_replica_31.sim # ./test.sh -f tsim/db/alter_replica_31.sim
./test.sh -f tsim/db/basic1.sim ./test.sh -f tsim/db/basic1.sim
./test.sh -f tsim/db/basic2.sim ./test.sh -f tsim/db/basic2.sim
./test.sh -f tsim/db/basic3.sim ./test.sh -f tsim/db/basic3.sim
...@@ -24,26 +24,26 @@ ...@@ -24,26 +24,26 @@
./test.sh -f tsim/db/taosdlog.sim ./test.sh -f tsim/db/taosdlog.sim
# ---- dnode # ---- dnode
./test.sh -f tsim/dnode/balance_replica1.sim # ./test.sh -f tsim/dnode/balance_replica1.sim
./test.sh -f tsim/dnode/balance_replica3.sim # ./test.sh -f tsim/dnode/balance_replica3.sim
./test.sh -f tsim/dnode/balance1.sim # ./test.sh -f tsim/dnode/balance1.sim
./test.sh -f tsim/dnode/balance2.sim # ./test.sh -f tsim/dnode/balance2.sim
./test.sh -f tsim/dnode/balance3.sim # ./test.sh -f tsim/dnode/balance3.sim
./test.sh -f tsim/dnode/balancex.sim # ./test.sh -f tsim/dnode/balancex.sim
./test.sh -f tsim/dnode/create_dnode.sim ./test.sh -f tsim/dnode/create_dnode.sim
./test.sh -f tsim/dnode/drop_dnode_has_mnode.sim ./test.sh -f tsim/dnode/drop_dnode_has_mnode.sim
./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim # ./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim
./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim # ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim
./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim # ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim
./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim # ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim # ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
./test.sh -f tsim/dnode/offline_reason.sim ./test.sh -f tsim/dnode/offline_reason.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim # ./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim # ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim # ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim # ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim # ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
./test.sh -f tsim/dnode/vnode_clean.sim # ./test.sh -f tsim/dnode/vnode_clean.sim
# ---- insert # ---- insert
./test.sh -f tsim/insert/basic0.sim ./test.sh -f tsim/insert/basic0.sim
...@@ -71,7 +71,7 @@ ...@@ -71,7 +71,7 @@
./test.sh -f tsim/qnode/basic1.sim ./test.sh -f tsim/qnode/basic1.sim
# ---- snode # ---- snode
./test.sh -f tsim/snode/basic1.sim # ./test.sh -f tsim/snode/basic1.sim
# ---- bnode # ---- bnode
./test.sh -f tsim/bnode/basic1.sim ./test.sh -f tsim/bnode/basic1.sim
...@@ -104,7 +104,7 @@ ...@@ -104,7 +104,7 @@
# ./test.sh -f tsim/stream/triggerSession0.sim # ./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/partitionby.sim ./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/partitionby1.sim ./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/schedSnode.sim # ./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/windowClose.sim ./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim ./test.sh -f tsim/stream/ignoreExpiredData.sim
...@@ -169,13 +169,13 @@ ...@@ -169,13 +169,13 @@
./test.sh -f tsim/valgrind/checkError.sim -v ./test.sh -f tsim/valgrind/checkError.sim -v
# --- vnode # --- vnode
#./test.sh -f tsim/vnode/replica3_basic.sim # ./test.sh -f tsim/vnode/replica3_basic.sim
#./test.sh -f tsim/vnode/replica3_repeat.sim # ./test.sh -f tsim/vnode/replica3_repeat.sim
./test.sh -f tsim/vnode/replica3_vgroup.sim # ./test.sh -f tsim/vnode/replica3_vgroup.sim
#./test.sh -f tsim/vnode/replica3_many.sim # ./test.sh -f tsim/vnode/replica3_many.sim
#./test.sh -f tsim/vnode/replica3_import.sim # ./test.sh -f tsim/vnode/replica3_import.sim
./test.sh -f tsim/vnode/stable_balance_replica1.sim # ./test.sh -f tsim/vnode/stable_balance_replica1.sim
./test.sh -f tsim/vnode/stable_dnode2_stop.sim # ./test.sh -f tsim/vnode/stable_dnode2_stop.sim
./test.sh -f tsim/vnode/stable_dnode2.sim ./test.sh -f tsim/vnode/stable_dnode2.sim
./test.sh -f tsim/vnode/stable_dnode3.sim ./test.sh -f tsim/vnode/stable_dnode3.sim
./test.sh -f tsim/vnode/stable_replica3_dnode6.sim ./test.sh -f tsim/vnode/stable_replica3_dnode6.sim
......
...@@ -50,6 +50,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t ...@@ -50,6 +50,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t
print --> create sma print --> create sma
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m); sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
print --> show sma
sql show indexes from stb from d1;
if $rows != 1 then
return -1
endi
if $data[0][0] != sma_index_name1 then
return -1
endi
if $data[0][1] != d1 then
return -1
endi
if $data[0][2] != stb then
return -1
endi
print --> drop stb print --> drop stb
sql drop table stb; sql drop table stb;
...@@ -61,6 +76,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t ...@@ -61,6 +76,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t
print --> create sma print --> create sma
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m); sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
print --> show sma
sql show indexes from stb from d1;
if $rows != 1 then
return -1
endi
if $data[0][0] != sma_index_name1 then
return -1
endi
if $data[0][1] != d1 then
return -1
endi
if $data[0][2] != stb then
return -1
endi
print --> drop stb print --> drop stb
sql drop table stb; sql drop table stb;
......
...@@ -16,135 +16,150 @@ import string ...@@ -16,135 +16,150 @@ import string
from util.log import * from util.log import *
from util.cases import * from util.cases import *
from util.sql import * from util.sql import *
from util.sqlset import *
from util import constant
from util.common import *
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.setsql = TDSetSql()
self.ntbname = 'ntb'
self.stbname = 'stb'
self.binary_length = 20 # the length of binary for column_dict
self.nchar_length = 20 # the length of nchar for column_dict
self.column_dict = {
'ts' : 'timestamp',
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': f'binary({self.binary_length})',
'col13': f'nchar({self.nchar_length})'
}
self.tag_dict = {
'ts_tag' : 'timestamp',
't1': 'tinyint',
't2': 'smallint',
't3': 'int',
't4': 'bigint',
't5': 'tinyint unsigned',
't6': 'smallint unsigned',
't7': 'int unsigned',
't8': 'bigint unsigned',
't9': 'float',
't10': 'double',
't11': 'bool',
't12': f'binary({self.binary_length})',
't13': f'nchar({self.nchar_length})'
}
self.tag_list = [
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
]
self.tbnum = 1
self.values_list = [
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
]
self.column_add_dict = {
'col_time' : 'timestamp',
'col_tinyint' : 'tinyint',
'col_smallint' : 'smallint',
'col_int' : 'int',
'col_bigint' : 'bigint',
'col_untinyint' : 'tinyint unsigned',
'col_smallint' : 'smallint unsigned',
'col_int' : 'int unsigned',
'col_bigint' : 'bigint unsigned',
'col_bool' : 'bool',
'col_float' : 'float',
'col_double' : 'double',
'col_binary' : f'binary({constant.BINARY_LENGTH_MAX})',
'col_nchar' : f'nchar({constant.NCAHR_LENGTH_MAX})'
def get_long_name(self, length, mode="mixed"): }
"""
generate long name
mode could be numbers/letters/letters_mixed/mixed
"""
if mode == "numbers":
population = string.digits
elif mode == "letters":
population = string.ascii_letters.lower()
elif mode == "letters_mixed":
population = string.ascii_letters.upper() + string.ascii_letters.lower()
else:
population = string.ascii_letters.lower() + string.digits
return "".join(random.choices(population, k=length))
def alter_stable_column_check(self,dbname,stbname,tbname):
tdSql.execute(f'create database if not exists {dbname}')
tdSql.execute(f'use {dbname}')
tdSql.execute(
f'create stable {stbname} (ts timestamp, c1 tinyint, c2 smallint, c3 int, \
c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 bool,c12 binary(20),c13 nchar(20)) tags(t0 int) ')
tdSql.execute(f'create table {tbname} using {stbname} tags(1)')
tdSql.execute(f'insert into {tbname} values (now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")')
tdSql.execute(f'alter stable {stbname} add column c14 int')
tdSql.query(f'select c14 from {stbname}')
tdSql.checkRows(1)
tdSql.execute(f'alter stable {stbname} add column `c15` int')
tdSql.query(f'select c15 from {stbname}')
tdSql.checkRows(1)
tdSql.query(f'describe {stbname}')
tdSql.checkRows(17)
tdSql.execute(f'alter stable {stbname} drop column c14')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(16)
tdSql.execute(f'alter stable {stbname} drop column `c15`')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(15)
tdSql.execute(f'alter stable {stbname} modify column c12 binary(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(12,2,30)
tdSql.execute(f'alter stable {stbname} modify column `c12` binary(35)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(12,2,35)
tdSql.error(f'alter stable {stbname} modify column `c12` binary(34)')
tdSql.execute(f'alter stable {stbname} modify column c13 nchar(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(13,2,30)
tdSql.error(f'alter stable {stbname} modify column c13 nchar(29)')
tdSql.error(f'alter stable {stbname} rename column c1 c21')
tdSql.error(f'alter stable {stbname} modify column c1 int')
tdSql.error(f'alter stable {stbname} modify column c4 int')
tdSql.error(f'alter stable {stbname} modify column c8 int')
tdSql.error(f'alter stable {stbname} modify column c1 unsigned int')
tdSql.error(f'alter stable {stbname} modify column c9 double')
tdSql.error(f'alter stable {stbname} modify column c10 float')
tdSql.error(f'alter stable {stbname} modify column c11 int')
tdSql.error(f'alter stable {stbname} drop tag t0')
tdSql.execute(f'drop database {dbname}')
def alter_stable_tag_check(self,dbname,stbname,tbname):
tdSql.execute(f'create database if not exists {dbname}')
tdSql.execute(f'use {dbname}')
tdSql.execute(
f'create stable {stbname} (ts timestamp, c1 int) tags(ts_tag timestamp, t1 tinyint, t2 smallint, t3 int, \
t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 bool,t12 binary(20),t13 nchar(20)) ')
tdSql.execute(f'create table {tbname} using {stbname} tags(now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")')
tdSql.execute(f'insert into {tbname} values(now,1)')
tdSql.execute(f'alter stable {stbname} add tag t14 int')
tdSql.query(f'select t14 from {stbname}')
tdSql.checkRows(1)
tdSql.execute(f'alter stable {stbname} add tag `t15` int')
tdSql.query(f'select t14 from {stbname}')
tdSql.checkRows(1)
tdSql.query(f'describe {stbname}')
tdSql.checkRows(18)
tdSql.execute(f'alter stable {stbname} drop tag t14')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(17)
tdSql.execute(f'alter stable {stbname} drop tag `t15`')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(16)
tdSql.execute(f'alter stable {stbname} modify tag t12 binary(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(14,2,30)
tdSql.execute(f'alter stable {stbname} modify tag `t12` binary(35)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(14,2,35)
tdSql.error(f'alter stable {stbname} modify tag `t12` binary(34)')
tdSql.execute(f'alter stable {stbname} modify tag t13 nchar(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(15,2,30)
tdSql.error(f'alter stable {stbname} modify tag t13 nchar(29)')
tdSql.execute(f'alter table {stbname} rename tag t1 t21')
tdSql.query(f'describe {stbname}')
tdSql.checkData(3,0,'t21')
tdSql.execute(f'alter table {stbname} rename tag `t21` t1')
tdSql.query(f'describe {stbname}')
tdSql.checkData(3,0,'t1')
for i in ['bigint','unsigned int','float','double','binary(10)','nchar(10)']:
for j in [1,2,3]:
tdSql.error(f'alter stable {stbname} modify tag t{j} {i}')
for i in ['int','unsigned int','float','binary(10)','nchar(10)']:
tdSql.error(f'alter stable {stbname} modify tag t8 {i}')
tdSql.error(f'alter stable {stbname} modify tag t4 int')
tdSql.error(f'alter stable {stbname} drop column t0')
#!bug TD-16410
# tdSql.error(f'alter stable {tbname} set tag t1=100 ')
# tdSql.execute(f'create table ntb (ts timestamp,c0 int)')
tdSql.error(f'alter stable ntb add column c2 ')
tdSql.execute(f'drop database {dbname}')
def alter_stable_check(self):
tdSql.prepare()
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
for i in self.values_list:
tdSql.execute(f'insert into {self.ntbname} values({i})')
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
for j in self.values_list:
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
for key,values in self.column_add_dict.items():
tdSql.execute(f'alter stable {self.stbname} add column {key} {values}')
tdSql.query(f'describe {self.stbname}')
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)+1)
for i in range(self.tbnum):
tdSql.query(f'describe {self.stbname}_{i}')
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)+1)
tdSql.query(f'select {key} from {self.stbname}_{i}')
tdSql.checkRows(len(self.values_list))
for i in range(self.tbnum):
tdSql.error(f'alter stable {self.stbname}_{i} add column {key} {values}')
tdSql.error(f'alter stable {self.stbname}_{i} drop column {key}')
#! bug TD-16921
#tdSql.error(f'alter stable {self.ntbname} add column {key} {values}')
#tdSql.error(f'alter stable {self.ntbname} drop column {key}')
tdSql.execute(f'alter stable {self.stbname} drop column {key}')
tdSql.query(f'describe {self.stbname}')
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict))
for i in range(self.tbnum):
tdSql.query(f'describe {self.stbname}_{i}')
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict))
tdSql.error(f'select {key} from {self.stbname} ')
for key,values in self.column_dict.items():
if 'binary' in values.lower():
v = f'binary({self.binary_length+1})'
v_error = f'binary({self.binary_length-1})'
tdSql.error(f'alter stable {self.stbname} modify column {key} {v_error}')
tdSql.execute(f'alter stable {self.stbname} modify column {key} {v}')
tdSql.query(f'describe {self.stbname}')
result = tdCom.getOneRow(1,'VARCHAR')
tdSql.checkEqual(result[0][2],self.binary_length+1)
for i in range(self.tbnum):
tdSql.query(f'describe {self.stbname}_{i}')
result = tdCom.getOneRow(1,'VARCHAR')
tdSql.checkEqual(result[0][2],self.binary_length+1)
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
#! bug TD-16921
# tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
elif 'nchar' in values.lower():
v = f'nchar({self.binary_length+1})'
v_error = f'nchar({self.binary_length-1})'
tdSql.error(f'alter stable {self.stbname} modify column {key} {v_error}')
tdSql.execute(f'alter stable {self.stbname} modify column {key} {v}')
tdSql.query(f'describe {self.stbname}')
result = tdCom.getOneRow(1,'NCHAR')
tdSql.checkEqual(result[0][2],self.binary_length+1)
for i in range(self.tbnum):
tdSql.query(f'describe {self.stbname}_{i}')
result = tdCom.getOneRow(1,'NCHAR')
tdSql.checkEqual(result[0][2],self.binary_length+1)
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
#! bug TD-16921
#tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
else:
for v in self.column_dict.values():
tdSql.error(f'alter stable {self.stbname} modify column {key} {v}')
# tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
for i in range(self.tbnum):
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
def run(self): def run(self):
dbname = self.get_long_name(length=10, mode="letters") self.alter_stable_check()
stbname = self.get_long_name(length=5, mode="letters")
tbname = self.get_long_name(length=5, mode="letters")
self.alter_stable_column_check(dbname,stbname,tbname)
self.alter_stable_tag_check(dbname,stbname,tbname)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -126,6 +126,7 @@ class TDTestCase: ...@@ -126,6 +126,7 @@ class TDTestCase:
tdSql.execute(f'alter table {self.ntbname} rename column {key} {rename_str}') tdSql.execute(f'alter table {self.ntbname} rename column {key} {rename_str}')
tdSql.query(f'select {rename_str} from {self.ntbname}') tdSql.query(f'select {rename_str} from {self.ntbname}')
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.error(f'select {key} from {self.ntbname}')
def alter_check_tb(self): def alter_check_tb(self):
tag_tinyint = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX) tag_tinyint = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX)
...@@ -277,7 +278,11 @@ class TDTestCase: ...@@ -277,7 +278,11 @@ class TDTestCase:
else: else:
for v in self.column_dict.values(): for v in self.column_dict.values():
tdSql.error(f'alter table {self.stbname} modify column {key} {v}') tdSql.error(f'alter table {self.stbname} modify column {key} {v}')
for key,values in self.column_dict.items():
rename_str = f'{tdCom.getLongName(constant.COL_NAME_LENGTH_MAX,"letters")}'
tdSql.error(f'alter table {self.stbname} rename column {key} {rename_str}')
for i in range(self.tbnum):
tdSql.error(f'alter table {self.stbname}_{i} rename column {key} {rename_str}')
def run(self): def run(self):
self.alter_check_ntb() self.alter_check_ntb()
self.alter_check_tb() self.alter_check_tb()
......
...@@ -40,6 +40,7 @@ class TDTestCase: ...@@ -40,6 +40,7 @@ class TDTestCase:
self.time_unit = ['b','u','a','s','m','h','d','w'] self.time_unit = ['b','u','a','s','m','h','d','w']
self.symbol = ['+','-','*','/'] self.symbol = ['+','-','*','/']
self.error_values = [1.5,'abc','"abc"','!@','today()'] self.error_values = [1.5,'abc','"abc"','!@','today()']
self.db_percision = ['ms','us','ns']
def tbtype_check(self,tb_type): def tbtype_check(self,tb_type):
if tb_type == 'normal table' or tb_type == 'child table': if tb_type == 'normal table' or tb_type == 'child table':
tdSql.checkRows(len(self.values_list)) tdSql.checkRows(len(self.values_list))
...@@ -70,23 +71,29 @@ class TDTestCase: ...@@ -70,23 +71,29 @@ class TDTestCase:
tdSql.checkData(i,0,None) tdSql.checkData(i,0,None)
def now_check_ntb(self): def now_check_ntb(self):
tdSql.prepare() for time_unit in self.db_percision:
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict)) tdSql.execute(f'create database db precision "{time_unit}"')
for value in self.values_list: tdSql.execute('use db')
tdSql.execute( tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
f'insert into {self.ntbname} values({value})') for value in self.values_list:
self.data_check(self.ntbname,'normal table') tdSql.execute(
f'insert into {self.ntbname} values({value})')
self.data_check(self.ntbname,'normal table')
tdSql.execute('drop database db')
def now_check_stb(self): def now_check_stb(self):
tdSql.prepare() for time_unit in self.db_percision:
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) tdSql.execute(f'create database db precision "{time_unit}"')
for i in range(self.tbnum): tdSql.execute('use db')
tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})") tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
for value in self.values_list: for i in range(self.tbnum):
tdSql.execute(f'insert into {self.stbname}_{i} values({value})') tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})")
for i in range(self.tbnum): for value in self.values_list:
self.data_check(f'{self.stbname}_{i}','child table') tdSql.execute(f'insert into {self.stbname}_{i} values({value})')
self.data_check(self.stbname,'stable') for i in range(self.tbnum):
self.data_check(f'{self.stbname}_{i}','child table')
self.data_check(self.stbname,'stable')
tdSql.execute('drop database db')
def run(self): # sourcery skip: extract-duplicate-method def run(self): # sourcery skip: extract-duplicate-method
self.now_check_ntb() self.now_check_ntb()
......
...@@ -5,6 +5,7 @@ from util.log import * ...@@ -5,6 +5,7 @@ from util.log import *
from util.sql import * from util.sql import *
from util.cases import * from util.cases import *
import datetime import datetime
import pandas as pd
class TDTestCase: class TDTestCase:
...@@ -12,8 +13,8 @@ class TDTestCase: ...@@ -12,8 +13,8 @@ class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.today_date = datetime.datetime.strptime( self.today_date = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") self.today_ts = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d").timestamp()
self.time_unit = ['b','u','a','s','m','h','d','w'] self.time_unit = ['b','u','a','s','m','h','d','w']
self.error_param = ['1.5','abc','!@#','"abc"','today()'] self.error_param = ['1.5','abc','!@#','"abc"','today()']
self.arithmetic_operators = ['+','-','*','/'] self.arithmetic_operators = ['+','-','*','/']
...@@ -41,7 +42,7 @@ class TDTestCase: ...@@ -41,7 +42,7 @@ class TDTestCase:
f'today(),3,3.333,333.333333,now()', f'today(),3,3.333,333.333333,now()',
f'today()-1d,10,11.11,99.999999,now()', f'today()-1d,10,11.11,99.999999,now()',
f'today()+1d,1,1.55,100.555555,today()'] f'today()+1d,1,1.55,100.555555,today()']
self.db_percision = ['ms','us','ns']
def set_create_normaltable_sql(self, ntbname, column_dict): def set_create_normaltable_sql(self, ntbname, column_dict):
column_sql = '' column_sql = ''
for k, v in column_dict.items(): for k, v in column_dict.items():
...@@ -57,7 +58,8 @@ class TDTestCase: ...@@ -57,7 +58,8 @@ class TDTestCase:
tag_sql += f"{k} {v}," tag_sql += f"{k} {v},"
create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})' create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})'
return create_stb_sql return create_stb_sql
def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb'):
def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb',precision = 'ms'):
for k,v in column_dict.items(): for k,v in column_dict.items():
num_up = 0 num_up = 0
num_down = 0 num_down = 0
...@@ -65,12 +67,27 @@ class TDTestCase: ...@@ -65,12 +67,27 @@ class TDTestCase:
if v.lower() == 'timestamp': if v.lower() == 'timestamp':
tdSql.query(f'select {k} from {tbname}') tdSql.query(f'select {k} from {tbname}')
for i in tdSql.queryResult: for i in tdSql.queryResult:
if i[0] > self.today_date: if precision == 'ms':
num_up += 1 if int(i[0].timestamp())*1000 > int(self.today_ts)*1000:
elif i[0] == self.today_date: num_up += 1
num_same += 1 elif int(i[0].timestamp())*1000 == int(self.today_ts)*1000:
elif i[0] < self.today_date: num_same += 1
num_down += 1 elif int(i[0].timestamp())*1000 < int(self.today_ts)*1000:
num_down += 1
elif precision == 'us':
if int(i[0].timestamp())*1000000 > int(self.today_ts)*1000000:
num_up += 1
elif int(i[0].timestamp())*1000000 == int(self.today_ts)*1000000:
num_same += 1
elif int(i[0].timestamp())*1000000 < int(self.today_ts)*1000000:
num_down += 1
elif precision == 'ns':
if i[0] > int(self.today_ts)*1000000000:
num_up += 1
elif i[0] == int(self.today_ts)*1000000000:
num_same += 1
elif i[0] < int(self.today_ts)*1000000000:
num_down += 1
tdSql.query(f"select today() from {tbname}") tdSql.query(f"select today() from {tbname}")
tdSql.checkRows(len(values_list)*tb_num) tdSql.checkRows(len(values_list)*tb_num)
tdSql.checkData(0, 0, str(self.today_date)) tdSql.checkData(0, 0, str(self.today_date))
...@@ -130,32 +147,36 @@ class TDTestCase: ...@@ -130,32 +147,36 @@ class TDTestCase:
for i in range(num_same): for i in range(num_same):
tdSql.checkData(i, 0, str(self.today_date)) tdSql.checkData(i, 0, str(self.today_date))
def today_check_ntb(self): def today_check_ntb(self):
tdSql.prepare() for time_unit in self.db_percision:
tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict)) print(time_unit)
for i in self.values_list: tdSql.execute(f'create database db precision "{time_unit}"')
tdSql.execute( tdSql.execute('use db')
f'insert into {self.ntbname} values({i})') tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict))
self.data_check(self.column_dict,self.ntbname,self.values_list) for i in self.values_list:
tdSql.execute('drop database db') tdSql.execute(
f'insert into {self.ntbname} values({i})')
self.data_check(self.column_dict,self.ntbname,self.values_list,1,'tb',time_unit)
tdSql.execute('drop database db')
def today_check_stb_tb(self): def today_check_stb_tb(self):
tdSql.prepare() for time_unit in self.db_percision:
tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) print(time_unit)
for i in range(self.tbnum): tdSql.execute(f'create database db precision "{time_unit}"')
tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') tdSql.execute('use db')
for j in self.values_list: tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
tdSql.execute(f'insert into {self.stbname}_{i} values ({j})') for i in range(self.tbnum):
# check child table tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})')
for i in range(self.tbnum): for j in self.values_list:
self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list) tdSql.execute(f'insert into {self.stbname}_{i} values ({j})')
# check stable # check child table
self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb') for i in range(self.tbnum):
tdSql.execute('drop database db') self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list,1,'tb',time_unit)
# check stable
self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb',time_unit)
tdSql.execute('drop database db')
def run(self): # sourcery skip: extract-duplicate-method def run(self): # sourcery skip: extract-duplicate-method
tdLog.printNoPrefix("==========check today() for normal table ==========")
self.today_check_ntb() self.today_check_ntb()
tdLog.printNoPrefix("==========check today() for stable and child table==========")
self.today_check_stb_tb() self.today_check_stb_tb()
def stop(self): def stop(self):
......
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.starttaosd(1)
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 1
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
firstConsumeRows = resultList[0]
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 2
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = firstConsumeRows + resultList[0]
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -116,8 +116,8 @@ python3 ./test.py -f 2-query/irate.py ...@@ -116,8 +116,8 @@ python3 ./test.py -f 2-query/irate.py
python3 ./test.py -f 2-query/function_null.py python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 2-query/queryQnode.py python3 ./test.py -f 2-query/queryQnode.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py #python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
#python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
...@@ -157,3 +157,4 @@ python3 ./test.py -f 7-tmq/tmqShow.py ...@@ -157,3 +157,4 @@ python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqAlterSchema.py python3 ./test.py -f 7-tmq/tmqAlterSchema.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg.py
Subproject commit 5fdd694621fbb7bd2d6102ff4feaec92a7001038 Subproject commit 7105027650b51e701cfa1dac11b8fb42d447dd01
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册