diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7337f1afb89dd8da35565e68765e7b7d84b99eaf..eda5ba7a03c0b9bab351f924b48bf0d7cf60e061 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -100,7 +100,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_STREAMS, TSDB_MGMT_TABLE_VARIABLES, TSDB_MGMT_TABLE_CONNS, - TSDB_MGMT_TABLE_SCORES, + TSDB_MGMT_TABLE_TRANS, TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_CLUSTER, diff --git a/include/util/tdef.h b/include/util/tdef.h index 747020a4f997f54836c5fe45a3d919c14d4db066..9b18c2dfb85eddfa0cf34994e8bbdcc95f3381bc 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -214,6 +214,10 @@ do { \ #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_SLOW_QUERY_SQL_LEN 512 +#define TSDB_TRANS_STAGE_LEN 12 +#define TSDB_TRANS_DESC_LEN 16 +#define TSDB_TRANS_ERROR_LEN 128 + #define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_DESC_LEN 128 diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 73c0446e3f4720f5d1b5c6b9fb067ce09544b5fe..abc802c588e907de663adf639f119009abcd3b78 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -104,6 +104,10 @@ typedef enum { TRN_STAGE_FINISHED = 8 } ETrnStage; +typedef enum { + TRN_TYPE_CREATE_DB = 0, +} ETrnType; + typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum { @@ -135,6 +139,12 @@ typedef struct { SArray* commitLogs; SArray* redoActions; SArray* undoActions; + int64_t createdTime; + int64_t lastExecTime; + int32_t transType; + uint64_t dbUid; + char dbname[TSDB_DB_NAME_LEN]; + char lastError[TSDB_TRANS_DESC_LEN]; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e772ff326c7dd5f745bb0c39d62e034da99e29bd..eb5f39e983e916b92043189191a7d96c8335c2c9 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -914,6 +914,8 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { sdbRelease(pSdb, pVgroup); } + + sdbCancelFetch(pSdb, pIter); } static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 3198ded37e0e0b8534dd79d6f302c749cfde3f90..afb338e97d7c53bbc0e99cd46db0386f26270ffb 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -295,8 +295,8 @@ char *mndShowStr(int32_t showType) { return "show configs"; case TSDB_MGMT_TABLE_CONNS: return "show connections"; - case TSDB_MGMT_TABLE_SCORES: - return "show scores"; + case TSDB_MGMT_TABLE_TRANS: + return "show trans"; case TSDB_MGMT_TABLE_GRANTS: return "show grants"; case TSDB_MGMT_TABLE_VNODES: diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 53ffd8698fd4bdf350cb9b437bfc726e19367f4f..c5b18bcde0649e94ef1a8ef815dda8e285591776 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { @@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } int32_t contLen = 0; void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); @@ -942,7 +948,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { @@ -1116,7 +1125,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } int32_t contLen = 0; void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 840326d3183bd889ef71368bd6c32dee9dc20e74..ff64a3cdd5972f05e4e99e9e3ab667d6a996a0ba 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -134,8 +134,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId != -1); - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); void *buf; int32_t tlen; @@ -143,6 +141,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC return -1; } + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; @@ -180,15 +181,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum } static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - void *buf; int32_t tlen; if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) { return -1; } + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; @@ -714,7 +715,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pTopic->dbUid) continue; + if (pVgroup->dbUid != pTopic->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } pSub->vgNum++; plan->execNode.nodeId = pVgroup->vgId; @@ -748,7 +752,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId == -1); int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SMqSetCVgReq req = { .vgId = vgId, @@ -776,6 +779,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 03ac7ec0751622f3b6b3e48c99e04398643105cf..99a5a3a47520811fa3186292f799cd785d64e6bb 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -15,10 +15,13 @@ #define _DEFAULT_SOURCE #include "mndTrans.h" +#include "mndAuth.h" +#include "mndShow.h" #include "mndSync.h" +#include "mndUser.h" -#define MND_TRANS_VER_NUMBER 1 -#define MND_TRANS_ARRAY_SIZE 8 +#define MND_TRANS_VER_NUMBER 1 +#define MND_TRANS_ARRAY_SIZE 8 #define MND_TRANS_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -53,6 +56,10 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans); static int32_t mndProcessTransReq(SMnodeMsg *pMsg); +static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); + int32_t mndInitTrans(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TRANS, .keyType = SDB_KEY_INT32, @@ -63,6 +70,10 @@ int32_t mndInitTrans(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTransActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq); + + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans); return sdbSetTable(pMnode->pSdb, table); } @@ -321,6 +332,15 @@ static const char *mndTransStr(ETrnStage stage) { } } +static const char *mndTransType(ETrnType type) { + switch (type) { + case TRN_TYPE_CREATE_DB: + return "create-db"; + default: + return "invalid"; + } +} + static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { // pTrans->stage = TRN_STAGE_PREPARE; mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); @@ -334,7 +354,7 @@ static void mndTransDropData(STrans *pTrans) { mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); if (pTrans->rpcRsp != NULL) { - rpcFreeCont(pTrans->rpcRsp); + free(pTrans->rpcRsp); pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; } @@ -988,3 +1008,122 @@ void mndTransPullup(SMnode *pMnode) { sdbWriteFile(pMnode->pSdb); } + +static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; + SSdb *pSdb = pMnode->pSdb; + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchemas; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "id"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "stage"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "db"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "type"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "last_exec_time"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "last_error"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pMeta->numOfColumns = cols; + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_TRANS); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbName, mndShowStr(pShow->type)); + return 0; +} + +static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + STrans *pTrans = NULL; + int32_t cols = 0; + char *pWrite; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans); + if (pShow->pIter == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pTrans->id; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pTrans->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, mndTransStr(pTrans->stage)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, pTrans->dbname); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, mndTransType(pTrans->transType)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pTrans->lastExecTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, pTrans->lastError); + cols++; + + numOfRows++; + sdbRelease(pSdb, pTrans); + } + + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 315268a9e37f60185947f67db9b48eec2d5eee93..204cd870f4b01683891f6b04155f7a2b402292b0 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "sdbInt.h" +static void sdbCheck(SSdb *pSdb, SSdbRow *pRow); + const char *sdbTableName(ESdbType type) { switch (type) { case SDB_TRANS: @@ -221,6 +223,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * pSdb->tableVer[pOldRow->type]++; sdbFreeRow(pSdb, pRow); + + sdbCheck(pSdb, pOldRow); // sdbRelease(pSdb, pOldRow->pObj); return 0; } @@ -305,6 +309,19 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { return pRet; } +static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { + SRWLatch *pLock = &pSdb->locks[pRow->type]; + taosRLockLatch(pLock); + + int32_t ref = atomic_load_32(&pRow->refCount); + sdbPrintOper(pSdb, pRow, "checkRow"); + if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { + sdbFreeRow(pSdb, pRow); + } + + taosRUnLockLatch(pLock); +} + void sdbRelease(SSdb *pSdb, void *pObj) { if (pObj == NULL) return; @@ -332,6 +349,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { SRWLatch *pLock = &pSdb->locks[type]; taosRLockLatch(pLock); +#if 0 if (pIter != NULL) { SSdbRow *pLastRow = *(SSdbRow **)pIter; int32_t ref = atomic_load_32(&pLastRow->refCount); @@ -339,6 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { sdbFreeRow(pSdb, pLastRow); } } +#endif SSdbRow **ppRow = taosHashIterate(hash, pIter); while (ppRow != NULL) { diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 1222138b5e4c7f09b9ed4c9d21ed4bdf70122f0a..06154ca19240d776a98eb6c9703e91f142433d8d 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -76,7 +76,7 @@ cmd ::= SHOW QUERIES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_QUERIES, 0, 0); cmd ::= SHOW CONNECTIONS.{ setShowOptions(pInfo, TSDB_MGMT_TABLE_CONNS, 0, 0);} cmd ::= SHOW STREAMS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_STREAMS, 0, 0); } cmd ::= SHOW VARIABLES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); } -cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_SCORES, 0, 0); } +cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TRANS, 0, 0); } cmd ::= SHOW GRANTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); } cmd ::= SHOW VNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VNODES, 0, 0); } diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index 664f2a3ff2c124afea9ecec79ecdae9d6339e415..f6585e41fffd79b62d7d332480f0bf5221d44137 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -2262,7 +2262,7 @@ static void yy_reduce( { setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); } break; case 13: /* cmd ::= SHOW SCORES */ -{ setShowOptions(pInfo, TSDB_MGMT_TABLE_SCORES, 0, 0); } +{ setShowOptions(pInfo, TSDB_MGMT_TABLE_TRANS, 0, 0); } break; case 14: /* cmd ::= SHOW GRANTS */ { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); }