提交 4076520a 编写于 作者: S Shengliang Guan

feat: make show transactions and kill transaction work

上级 495bc3a3
...@@ -23,7 +23,6 @@ extern "C" { ...@@ -23,7 +23,6 @@ extern "C" {
#define TDENGINE_SYSTABLE_H #define TDENGINE_SYSTABLE_H
#define TSDB_INFORMATION_SCHEMA_DB "information_schema" #define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_INS_TABLE_DNODES "dnodes" #define TSDB_INS_TABLE_DNODES "dnodes"
#define TSDB_INS_TABLE_MNODES "mnodes" #define TSDB_INS_TABLE_MNODES "mnodes"
#define TSDB_INS_TABLE_MODULES "modules" #define TSDB_INS_TABLE_MODULES "modules"
...@@ -44,27 +43,27 @@ extern "C" { ...@@ -44,27 +43,27 @@ extern "C" {
#define TSDB_INS_TABLE_VNODES "vnodes" #define TSDB_INS_TABLE_VNODES "vnodes"
#define TSDB_INS_TABLE_CONFIGS "configs" #define TSDB_INS_TABLE_CONFIGS "configs"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "smas" #define TSDB_PERFS_TABLE_SMAS "smas"
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" #define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections" #define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_QUERIES "queries" #define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_TOPICS "topics" #define TSDB_PERFS_TABLE_TOPICS "topics"
#define TSDB_PERFS_TABLE_CONSUMERS "consumers" #define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" #define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
#define TSDB_PERFS_TABLE_OFFSETS "offsets" #define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_TRANS "trans" #define TSDB_PERFS_TABLE_TRANS "trans"
#define TSDB_PERFS_TABLE_STREAMS "streams" #define TSDB_PERFS_TABLE_STREAMS "streams"
typedef struct SSysDbTableSchema { typedef struct SSysDbTableSchema {
const char *name; const char* name;
const int32_t type; const int32_t type;
const int32_t bytes; const int32_t bytes;
} SSysDbTableSchema; } SSysDbTableSchema;
typedef struct SSysTableMeta { typedef struct SSysTableMeta {
const char *name; const char* name;
const SSysDbTableSchema *schema; const SSysDbTableSchema* schema;
const int32_t colNum; const int32_t colNum;
} SSysTableMeta; } SSysTableMeta;
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
#define _TD_COMMON_GLOBAL_H_ #define _TD_COMMON_GLOBAL_H_
#include "tarray.h" #include "tarray.h"
#include "tdef.h"
#include "tconfig.h" #include "tconfig.h"
#include "tdef.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -121,15 +121,16 @@ extern char tsCompressor[]; ...@@ -121,15 +121,16 @@ extern char tsCompressor[];
extern int32_t tsDiskCfgNum; extern int32_t tsDiskCfgNum;
extern SDiskCfg tsDiskCfg[]; extern SDiskCfg tsDiskCfg[];
// internal // internal
extern int32_t tsTransPullupMs; extern int32_t tsTransPullupInterval;
extern int32_t tsMaRebalanceMs; extern int32_t tsMqRebalanceInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
char *apolloUrl, SArray *pArgs, bool tsc); const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc);
void taosCleanupCfg(); void taosCleanupCfg();
void taosCfgDynamicOptions(const char *option, const char *value); void taosCfgDynamicOptions(const char *option, const char *value);
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary); void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
......
...@@ -211,6 +211,7 @@ static const SSysDbTableSchema transSchema[] = { ...@@ -211,6 +211,7 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
}; };
......
...@@ -170,8 +170,8 @@ uint32_t tsCurRange = 100; // range ...@@ -170,8 +170,8 @@ uint32_t tsCurRange = 100; // range
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
// internal // internal
int32_t tsTransPullupMs = 6000; int32_t tsTransPullupInterval = 6;
int32_t tsMaRebalanceMs = 2000; int32_t tsMqRebalanceInterval = 2;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
...@@ -438,6 +438,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -438,6 +438,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1; if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
return 0; return 0;
} }
...@@ -575,6 +578,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -575,6 +578,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
} }
......
...@@ -1405,15 +1405,18 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -1405,15 +1405,18 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0}; char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(type, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)type, false); colDataAppend(pColInfo, numOfRows, (const char *)type, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false);
char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(lastError, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)lastError, false); colDataAppend(pColInfo, numOfRows, (const char *)lastError, false);
......
...@@ -65,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) { ...@@ -65,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer); taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer);
} }
static void mndCalMqRebalance(void *param, void *tmrId) { static void mndCalMqRebalance(void *param, void *tmrId) {
...@@ -81,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { ...@@ -81,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer); taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer);
} }
static void mndPullupTelem(void *param, void *tmrId) { static void mndPullupTelem(void *param, void *tmrId) {
...@@ -103,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) { ...@@ -103,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1; return -1;
} }
if (taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer)) { if (taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if (taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer)) { if (taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
......
...@@ -58,7 +58,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -58,7 +58,7 @@ class MndTestTrans2 : public ::testing::Test {
strcpy(opt.replicas[0].fqdn, "localhost"); strcpy(opt.replicas[0].fqdn, "localhost");
opt.msgCb = msgCb; opt.msgCb = msgCb;
tsTransPullupMs = 1000; tsTransPullupInterval = 1;
const char *mnodepath = "/tmp/mnode_test_trans"; const char *mnodepath = "/tmp/mnode_test_trans";
taosRemoveDir(mnodepath); taosRemoveDir(mnodepath);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册