提交 f4ec009a 编写于 作者: S shenglian zhou

Merge branch '3.0' of github.com:taosdata/TDengine into feature/3_liaohj

...@@ -829,6 +829,14 @@ typedef struct { ...@@ -829,6 +829,14 @@ typedef struct {
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
typedef struct {
int32_t rowNum;
} SDnodeListReq;
int32_t tSerializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
int32_t tDeserializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
typedef struct SQueryNodeAddr { typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId int32_t nodeId; // vgId or qnodeId
SEpSet epSet; SEpSet epSet;
...@@ -847,6 +855,15 @@ int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); ...@@ -847,6 +855,15 @@ int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp); void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
typedef struct {
SArray* dnodeList; // SArray<SEpSet>
} SDnodeListRsp;
int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
typedef struct { typedef struct {
SArray* pArray; // Array of SUseDbRsp SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp; } SUseDbBatchRsp;
......
...@@ -81,6 +81,7 @@ enum { ...@@ -81,6 +81,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_SERVER_STATUS, "server-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SERVER_STATUS, "server-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
...@@ -101,6 +102,7 @@ enum { ...@@ -101,6 +102,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DNODE_LIST, "dnode-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL)
...@@ -152,7 +154,7 @@ enum { ...@@ -152,7 +154,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp) TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "mnd-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL)
......
...@@ -36,6 +36,10 @@ extern "C" { ...@@ -36,6 +36,10 @@ extern "C" {
#define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) #define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE)
#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_BINARY_LEN + VARSTR_HEADER_SIZE) #define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_BINARY_LEN + VARSTR_HEADER_SIZE)
#define SHOW_LOCAL_VARIABLES_RESULT_COLS 2
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define PRIVILEGE_TYPE_MASK(n) (1 << n) #define PRIVILEGE_TYPE_MASK(n) (1 << n)
......
...@@ -26,8 +26,9 @@ extern "C" { ...@@ -26,8 +26,9 @@ extern "C" {
extern bool gRaftDetailLog; extern bool gRaftDetailLog;
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
...@@ -199,7 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid); ...@@ -199,7 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid); SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
......
...@@ -43,7 +43,7 @@ void setElectTimerMS(int64_t rid, int32_t electTimerMS); ...@@ -43,7 +43,7 @@ void setElectTimerMS(int64_t rid, int32_t electTimerMS);
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS); void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS);
// for compatibility, the same as syncPropose // for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// utils // utils
const char* syncUtilState2String(ESyncState state); const char* syncUtilState2String(ESyncState state);
...@@ -468,7 +468,7 @@ typedef struct SyncLeaderTransfer { ...@@ -468,7 +468,7 @@ typedef struct SyncLeaderTransfer {
SRaftId destId; SRaftId destId;
*/ */
SNodeInfo newNodeInfo; SNodeInfo newNodeInfo;
SRaftId newLeaderId; SRaftId newLeaderId;
} SyncLeaderTransfer; } SyncLeaderTransfer;
SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId); SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId);
...@@ -489,17 +489,16 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg); ...@@ -489,17 +489,16 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg); void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg); void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg);
// --------------------------------------------- // ---------------------------------------------
typedef struct SyncReconfigFinish { typedef struct SyncReconfigFinish {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
uint32_t msgType; uint32_t msgType;
SSyncCfg oldCfg; SSyncCfg oldCfg;
SSyncCfg newCfg; SSyncCfg newCfg;
SyncIndex newCfgIndex; SyncIndex newCfgIndex;
SyncTerm newCfgTerm; SyncTerm newCfgTerm;
uint64_t newCfgSeqNum; uint64_t newCfgSeqNum;
} SyncReconfigFinish; } SyncReconfigFinish;
...@@ -521,8 +520,6 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg); ...@@ -521,8 +520,6 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
// on message ---------------------- // on message ----------------------
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
......
...@@ -104,6 +104,8 @@ int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal); ...@@ -104,6 +104,8 @@ int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal);
const char *cfgStypeStr(ECfgSrcType type); const char *cfgStypeStr(ECfgSrcType type);
const char *cfgDtypeStr(ECfgDataType type); const char *cfgDtypeStr(ECfgDataType type);
void cfgDumpItemValue(SConfigItem *pItem, char* buf, int32_t bufSize, int32_t* pLen);
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump); void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump);
int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl); int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl);
......
...@@ -443,8 +443,8 @@ enum { ...@@ -443,8 +443,8 @@ enum {
#define VNODE_HANDLE -3 #define VNODE_HANDLE -3
#define BNODE_HANDLE -4 #define BNODE_HANDLE -4
#define TSDB_CONFIG_OPTION_LEN 16 #define TSDB_CONFIG_OPTION_LEN 32
#define TSDB_CONIIG_VALUE_LEN 48 #define TSDB_CONFIG_VALUE_LEN 64
#define TSDB_CONFIG_NUMBER 8 #define TSDB_CONFIG_NUMBER 8
#define QUERY_ID_SIZE 20 #define QUERY_ID_SIZE 20
......
...@@ -231,7 +231,13 @@ static const SSysDbTableSchema transSchema[] = { ...@@ -231,7 +231,13 @@ static const SSysDbTableSchema transSchema[] = {
static const SSysDbTableSchema configSchema[] = { static const SSysDbTableSchema configSchema[] = {
{.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "value", .bytes = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "value", .bytes = TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysDbTableSchema variablesSchema[] = {
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "value", .bytes = TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
}; };
static const SSysTableMeta infosMeta[] = { static const SSysTableMeta infosMeta[] = {
...@@ -253,6 +259,7 @@ static const SSysTableMeta infosMeta[] = { ...@@ -253,6 +259,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)}, {TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
{TSDB_INS_TABLE_VGROUPS, vgroupsSchema, tListLen(vgroupsSchema)}, {TSDB_INS_TABLE_VGROUPS, vgroupsSchema, tListLen(vgroupsSchema)},
{TSDB_INS_TABLE_CONFIGS, configSchema, tListLen(configSchema)}, {TSDB_INS_TABLE_CONFIGS, configSchema, tListLen(configSchema)},
{TSDB_INS_TABLE_DNODE_VARIABLES, variablesSchema, tListLen(variablesSchema)},
}; };
static const SSysDbTableSchema connectionsSchema[] = { static const SSysDbTableSchema connectionsSchema[] = {
......
...@@ -2194,6 +2194,32 @@ int32_t tDeserializeSQnodeListReq(void *buf, int32_t bufLen, SQnodeListReq *pReq ...@@ -2194,6 +2194,32 @@ int32_t tDeserializeSQnodeListReq(void *buf, int32_t bufLen, SQnodeListReq *pReq
return 0; return 0;
} }
int32_t tSerializeSDnodeListReq(void *buf, int32_t bufLen, SDnodeListReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->rowNum) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDnodeListReq(void *buf, int32_t bufLen, SDnodeListReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->rowNum) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) { int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -2237,6 +2263,50 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp ...@@ -2237,6 +2263,50 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->qnodeList); } void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->qnodeList); }
int32_t tSerializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->dnodeList);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SEpSet *pEpSet = taosArrayGet(pRsp->dnodeList, i);
if (tEncodeSEpSet(&encoder, pEpSet) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (NULL == pRsp->dnodeList) {
pRsp->dnodeList = taosArrayInit(num, sizeof(SEpSet));
if (NULL == pRsp->dnodeList) return -1;
}
for (int32_t i = 0; i < num; ++i) {
SEpSet epSet = {0};
if (tDecodeSEpSet(&decoder, &epSet) < 0) return -1;
taosArrayPush(pRsp->dnodeList, &epSet);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) { int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
......
...@@ -45,6 +45,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); ...@@ -45,6 +45,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmWorker.c // dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -15,6 +15,10 @@ ...@@ -15,6 +15,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmInt.h" #include "dmInt.h"
#include "systable.h"
extern SConfig *tsCfg;
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
...@@ -175,6 +179,130 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -175,6 +179,130 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
SSDataBlock* dmBuildVariablesBlock(void) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0;
const SSysTableMeta* pMeta = NULL;
getInfosDbMeta(&pMeta, &size);
int32_t index = 0;
for (int32_t i = 0; i < size; ++i) {
if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
index = i;
break;
}
}
pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = {0};
colInfoData.info.colId = i + 1;
colInfoData.info.type = pMeta[index].schema[i].type;
colInfoData.info.bytes = pMeta[index].schema[i].bytes;
taosArrayPush(pBlock->pDataBlock, &colInfoData);
}
pBlock->info.numOfCols = pMeta[index].colNum;
pBlock->info.hasVarCol = true;
return pBlock;
}
int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem *pItem = taosArrayGet(tsCfg->array, i);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, (const char *)&dnodeId, false);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, value, false);
numOfRows++;
}
pBlock->info.rows = numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveTableReq retrieveReq = {0};
if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SSDataBlock* pBlock = dmBuildVariablesBlock();
dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pBlock->info.numOfCols +
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(pBlock->info.numOfCols);
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to retrieve data since %s", terrstr());
blockDataDestroy(pBlock);
return -1;
}
char *pStart = pRsp->data;
*(int32_t *)pStart = htonl(pBlock->info.numOfCols);
pStart += sizeof(int32_t); // number of columns
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
pSchema->bytes = htonl(pColInfo->info.bytes);
pSchema->colId = htons(pColInfo->info.colId);
pSchema->type = pColInfo->info.type;
pStart += sizeof(SSysTableSchema);
}
int32_t len = 0;
blockCompressEncode(pBlock, pStart, &len, pBlock->info.numOfCols, false);
pRsp->numOfRows = htonl(pBlock->info.rows);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
pRsp->completed = 1;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = size;
dDebug("dnode variables retrieve completed");
blockDataDestroy(pBlock);
return TSDB_CODE_SUCCESS;
}
SArray *dmGetMsgHandles() { SArray *dmGetMsgHandles() {
int32_t code = -1; int32_t code = -1;
SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle)); SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
...@@ -191,6 +319,7 @@ SArray *dmGetMsgHandles() { ...@@ -191,6 +319,7 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE // Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
...@@ -141,6 +141,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -141,6 +141,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_SERVER_STATUS: case TDMT_DND_SERVER_STATUS:
code = dmProcessServerRunStatus(pMgmt, pMsg); code = dmProcessServerRunStatus(pMgmt, pMsg);
break; break;
case TDMT_DND_SYSTABLE_RETRIEVE:
code = dmProcessRetrieve(pMgmt, pMsg);
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
break; break;
......
...@@ -161,6 +161,7 @@ SArray *mmGetMsgHandles() { ...@@ -161,6 +161,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -85,6 +85,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -85,6 +85,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmProcessNetTestReq(pDnode, pRpc); dmProcessNetTestReq(pDnode, pRpc);
return; return;
case TDMT_MND_SYSTABLE_RETRIEVE_RSP: case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_VND_FETCH_RSP: case TDMT_VND_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return; return;
......
...@@ -47,6 +47,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); ...@@ -47,6 +47,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew); static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq);
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq); static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
...@@ -76,6 +77,7 @@ int32_t mndInitDnode(SMnode *pMnode) { ...@@ -76,6 +77,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq); mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
...@@ -499,6 +501,60 @@ _OVER: ...@@ -499,6 +501,60 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SDnodeObj *pObj = NULL;
void *pIter = NULL;
SDnodeListRsp rsp = {0};
int32_t code = -1;
rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == rsp.dnodeList) {
mError("failed to alloc epSet while process dnode list req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
while (1) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
SEpSet epSet = {0};
epSet.numOfEps = 1;
tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->port;
(void)taosArrayPush(rsp.dnodeList, &epSet);
sdbRelease(pSdb, pObj);
}
int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSDnodeListRsp(pRsp, rspLen, &rsp);
pReq->info.rspLen = rspLen;
pReq->info.rsp = pRsp;
code = 0;
_OVER:
if (code != 0) {
mError("failed to get dnode list since %s", terrstr());
}
tFreeSDnodeListRsp(&rsp);
return code;
}
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
...@@ -700,28 +756,28 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -700,28 +756,28 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
char *cfgOpts[TSDB_CONFIG_NUMBER] = {0}; char *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0}; char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
char *pWrite = NULL; char *pWrite = NULL;
int32_t cols = 0; int32_t cols = 0;
cfgOpts[totalRows] = "statusInterval"; cfgOpts[totalRows] = "statusInterval";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%d", tsStatusInterval); snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
totalRows++; totalRows++;
cfgOpts[totalRows] = "timezone"; cfgOpts[totalRows] = "timezone";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%s", tsTimezoneStr); snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
totalRows++; totalRows++;
cfgOpts[totalRows] = "locale"; cfgOpts[totalRows] = "locale";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%s", tsLocale); snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
totalRows++; totalRows++;
cfgOpts[totalRows] = "charset"; cfgOpts[totalRows] = "charset";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%s", tsCharset); snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
totalRows++; totalRows++;
char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
char bufVal[TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
for (int32_t i = 0; i < totalRows; i++) { for (int32_t i = 0; i < totalRows; i++) {
cols = 0; cols = 0;
...@@ -730,7 +786,7 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -730,7 +786,7 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)buf, false); colDataAppend(pColInfo, numOfRows, (const char *)buf, false);
STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONIIG_VALUE_LEN); STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)bufVal, false); colDataAppend(pColInfo, numOfRows, (const char *)bufVal, false);
......
...@@ -555,7 +555,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -555,7 +555,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
......
...@@ -219,11 +219,9 @@ _err: ...@@ -219,11 +219,9 @@ _err:
} }
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
metaRLock(pMeta);
TBC * pCur; TBC * pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL); int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) { if (ret < 0) {
metaULock(pMeta);
return ret; return ret;
} }
...@@ -249,6 +247,7 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ ...@@ -249,6 +247,7 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
tdbTbcClose(pCur); tdbTbcClose(pCur);
tdbFree(pKey); tdbFree(pKey);
return 0; return 0;
} }
......
...@@ -375,6 +375,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { ...@@ -375,6 +375,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
metaWLock(pMeta); metaWLock(pMeta);
int ret = metaTtlSmaller(pMeta, ttl, tbUids); int ret = metaTtlSmaller(pMeta, ttl, tbUids);
if(ret != 0){ if(ret != 0){
metaULock(pMeta);
return ret; return ret;
} }
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
......
...@@ -65,6 +65,7 @@ enum { ...@@ -65,6 +65,7 @@ enum {
typedef enum { typedef enum {
CTG_TASK_GET_QNODE = 0, CTG_TASK_GET_QNODE = 0,
CTG_TASK_GET_DNODE,
CTG_TASK_GET_DB_VGROUP, CTG_TASK_GET_DB_VGROUP,
CTG_TASK_GET_DB_CFG, CTG_TASK_GET_DB_CFG,
CTG_TASK_GET_DB_INFO, CTG_TASK_GET_DB_INFO,
...@@ -216,6 +217,7 @@ typedef struct SCtgJob { ...@@ -216,6 +217,7 @@ typedef struct SCtgJob {
int32_t dbVgNum; int32_t dbVgNum;
int32_t udfNum; int32_t udfNum;
int32_t qnodeNum; int32_t qnodeNum;
int32_t dnodeNum;
int32_t dbCfgNum; int32_t dbCfgNum;
int32_t indexNum; int32_t indexNum;
int32_t userNum; int32_t userNum;
...@@ -565,6 +567,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg ...@@ -565,6 +567,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target); int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask); int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask); int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask); int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask); int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
......
...@@ -1099,8 +1099,19 @@ _return: ...@@ -1099,8 +1099,19 @@ _return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS); CTG_API_LEAVE(TSDB_CODE_SUCCESS);
} }
int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray** pDnodeList) { int32_t catalogGetDnodeList(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** pDnodeList) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == pConn || NULL == pDnodeList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_ERR_JRET(ctgGetDnodeListFromMnode(pCtg, pConn, pDnodeList, NULL));
_return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
} }
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) { int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) {
......
...@@ -168,6 +168,21 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -168,6 +168,21 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_DNODE;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = NULL;
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
char *name = (char*)param; char *name = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
...@@ -405,6 +420,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 ...@@ -405,6 +420,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
int32_t tbHashNum = (int32_t)taosArrayGetSize(pReq->pTableHash); int32_t tbHashNum = (int32_t)taosArrayGetSize(pReq->pTableHash);
int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf); int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0; int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg); int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex); int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser); int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
...@@ -412,7 +428,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 ...@@ -412,7 +428,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
if (*taskNum <= 0) { if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId); ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -435,6 +451,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 ...@@ -435,6 +451,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
pJob->tbMetaNum = tbMetaNum; pJob->tbMetaNum = tbMetaNum;
pJob->tbHashNum = tbHashNum; pJob->tbHashNum = tbHashNum;
pJob->qnodeNum = qnodeNum; pJob->qnodeNum = qnodeNum;
pJob->dnodeNum = dnodeNum;
pJob->dbVgNum = dbVgNum; pJob->dbVgNum = dbVgNum;
pJob->udfNum = udfNum; pJob->udfNum = udfNum;
pJob->dbCfgNum = dbCfgNum; pJob->dbCfgNum = dbCfgNum;
...@@ -509,6 +526,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 ...@@ -509,6 +526,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL)); CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL));
} }
if (dnodeNum) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DNODE, NULL, NULL));
}
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob); pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
if (pJob->refId < 0) { if (pJob->refId < 0) {
ctgError("add job to ref failed, error: %s", tstrerror(terrno)); ctgError("add job to ref failed, error: %s", tstrerror(terrno));
...@@ -631,6 +652,22 @@ int32_t ctgDumpQnodeRes(SCtgTask* pTask) { ...@@ -631,6 +652,22 @@ int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDnodeList) {
pJob->jobRes.pDnodeList = taosArrayInit(1, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pDnodeList) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pDnodeList, &res);
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbCfg) { if (NULL == pJob->jobRes.pDbCfg) {
...@@ -1036,6 +1073,19 @@ _return: ...@@ -1036,6 +1073,19 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDnodeRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out);
_return:
ctgHandleTaskEnd(pTask, code);
CTG_RET(code);
}
int32_t ctgHandleGetIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1311,6 +1361,15 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { ...@@ -1311,6 +1361,15 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
...@@ -1462,6 +1521,7 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) { ...@@ -1462,6 +1521,7 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) {
SCtgAsyncFps gCtgAsyncFps[] = { SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL},
{ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL},
{ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg},
{ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL},
{ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL},
......
...@@ -40,6 +40,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -40,6 +40,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out)); qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
break; break;
} }
case TDMT_MND_DNODE_LIST: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
CTG_ERR_RET(code);
}
qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
break;
}
case TDMT_MND_USE_DB: { case TDMT_MND_USE_DB: {
if (TSDB_CODE_SUCCESS != rspCode) { if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target); qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
...@@ -309,9 +324,6 @@ _return: ...@@ -309,9 +324,6 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) { int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
...@@ -349,6 +361,39 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -349,6 +361,39 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
CTG_ERR_RET(code);
}
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) { int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
char *msg = NULL; char *msg = NULL;
......
...@@ -23,6 +23,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) { ...@@ -23,6 +23,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
switch (type) { switch (type) {
case CTG_TASK_GET_QNODE: case CTG_TASK_GET_QNODE:
return "[get qnode list]"; return "[get qnode list]";
case CTG_TASK_GET_DNODE:
return "[get dnode list]";
case CTG_TASK_GET_DB_VGROUP: case CTG_TASK_GET_DB_VGROUP:
return "[get db vgroup]"; return "[get db vgroup]";
case CTG_TASK_GET_DB_CFG: case CTG_TASK_GET_DB_CFG:
...@@ -349,6 +351,11 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { ...@@ -349,6 +351,11 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
*pRes = NULL; *pRes = NULL;
break; break;
} }
case CTG_TASK_GET_DNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_META: { case CTG_TASK_GET_TB_META: {
taosMemoryFreeClear(*pRes); taosMemoryFreeClear(*pRes);
break; break;
...@@ -413,6 +420,11 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { ...@@ -413,6 +420,11 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
*pRes = NULL; *pRes = NULL;
break; break;
} }
case CTG_TASK_GET_DNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_META: { case CTG_TASK_GET_TB_META: {
taosMemoryFreeClear(*pRes); taosMemoryFreeClear(*pRes);
break; break;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
extern SConfig *tsCfg;
static int32_t getSchemaBytes(const SSchema* pSchema) { static int32_t getSchemaBytes(const SSchema* pSchema) {
switch (pSchema->type) { switch (pSchema->type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
...@@ -521,7 +522,85 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR ...@@ -521,7 +522,85 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR
static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return TSDB_CODE_FAILED; } static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return TSDB_CODE_FAILED; }
static int32_t execShowLocalVariables() { return TSDB_CODE_FAILED; } static SSDataBlock* buildLocalVariablesResultDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.numOfCols = SHOW_LOCAL_VARIABLES_RESULT_COLS;
pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
}
int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem *pItem = taosArrayGet(tsCfg->array, i);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, value, false);
numOfRows++;
}
pBlock->info.rows = numOfRows;
return TSDB_CODE_SUCCESS;
}
static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildLocalVariablesResultDataBlock();
int32_t code = setLocalVariablesResultIntoDataBlock(pBlock);
if (code) {
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pRsp)->useconds = 0;
(*pRsp)->completed = 1;
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_LOCAL_VARIABLES_RESULT_COLS);
int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_LOCAL_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
return TSDB_CODE_SUCCESS;
}
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) { int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) {
switch (nodeType(pStmt)) { switch (nodeType(pStmt)) {
...@@ -538,7 +617,7 @@ int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) { ...@@ -538,7 +617,7 @@ int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) {
case QUERY_NODE_ALTER_LOCAL_STMT: case QUERY_NODE_ALTER_LOCAL_STMT:
return execAlterLocal((SAlterLocalStmt*)pStmt); return execAlterLocal((SAlterLocalStmt*)pStmt);
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT: case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
return execShowLocalVariables(); return execShowLocalVariables(pRsp);
default: default:
break; break;
} }
......
...@@ -839,7 +839,6 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi ...@@ -839,7 +839,6 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId); STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
......
...@@ -1536,11 +1536,14 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1536,11 +1536,14 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->param = pOperator; pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1; pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = loadSysTableCallback; pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0; int64_t transporterId = 0;
int32_t code = int32_t code =
...@@ -1575,6 +1578,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1575,6 +1578,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
} else if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} }
} }
} }
......
...@@ -137,6 +137,9 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTa ...@@ -137,6 +137,9 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTa
code = reserveTableIndexInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, code = reserveTableIndexInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName,
pCxt->pMetaCache); pCxt->pMetaCache);
} }
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_DNODE_VARIABLES))) {
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
}
return code; return code;
} }
......
...@@ -4611,6 +4611,25 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema** ...@@ -4611,6 +4611,25 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema**
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t extractShowLocalVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = 2;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = TSDB_CONFIG_OPTION_LEN;
strcpy((*pSchema)[0].name, "name");
(*pSchema)[1].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[1].bytes = TSDB_CONFIG_VALUE_LEN;
strcpy((*pSchema)[1].name, "value");
return TSDB_CODE_SUCCESS;
}
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
if (NULL == pRoot) { if (NULL == pRoot) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -4629,6 +4648,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS ...@@ -4629,6 +4648,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
case QUERY_NODE_SHOW_CREATE_TABLE_STMT: case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT: case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
return extractShowCreateTableResultSchema(numOfCols, pSchema); return extractShowCreateTableResultSchema(numOfCols, pSchema);
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
return extractShowLocalVariablesResultSchema(numOfCols, pSchema);
default: default:
break; break;
} }
...@@ -5945,12 +5966,12 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5945,12 +5966,12 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT: case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
case QUERY_NODE_SHOW_CREATE_TABLE_STMT: case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT: case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
pQuery->execMode = QUERY_EXEC_MODE_LOCAL; pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
pQuery->haveResultSet = true; pQuery->haveResultSet = true;
break; break;
case QUERY_NODE_RESET_QUERY_CACHE_STMT: case QUERY_NODE_RESET_QUERY_CACHE_STMT:
case QUERY_NODE_ALTER_LOCAL_STMT: case QUERY_NODE_ALTER_LOCAL_STMT:
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
pQuery->execMode = QUERY_EXEC_MODE_LOCAL; pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
break; break;
default: default:
......
...@@ -928,7 +928,12 @@ int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache) { ...@@ -928,7 +928,12 @@ int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache) {
} }
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) { int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
*pDnodes = taosArrayDup(pMetaCache->pDnodes); SMetaRes* pRes = taosArrayGet(pMetaCache->pDnodes, 0);
if (pRes->code) {
return pRes->code;
}
*pDnodes = taosArrayDup((SArray*)pRes->pRes);
if (NULL == *pDnodes) { if (NULL == *pDnodes) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -166,10 +166,13 @@ class MockCatalogServiceImpl { ...@@ -166,10 +166,13 @@ class MockCatalogServiceImpl {
} }
int32_t catalogGetDnodeList(SArray** pDnodes) const { int32_t catalogGetDnodeList(SArray** pDnodes) const {
*pDnodes = taosArrayInit(dnode_.size(), sizeof(SEpSet)); SMetaRes res = {0};
res.pRes = taosArrayInit(dnode_.size(), sizeof(SEpSet));
for (const auto& dnode : dnode_) { for (const auto& dnode : dnode_) {
taosArrayPush(*pDnodes, &dnode.second); taosArrayPush((SArray*)res.pRes, &dnode.second);
} }
*pDnodes = taosArrayInit(1, sizeof(SMetaRes));
taosArrayPush(*pDnodes, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -126,6 +126,25 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t ...@@ -126,6 +126,25 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SDnodeListReq dnodeListReq = {0};
dnodeListReq.rowNum = -1;
int32_t bufLen = tSerializeSDnodeListReq(NULL, 0, &dnodeListReq);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSDnodeListReq(pBuf, bufLen, &dnodeListReq);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) { if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
...@@ -428,6 +447,27 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) { ...@@ -428,6 +447,27 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
return code; return code;
} }
int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) {
SDnodeListRsp out = {0};
int32_t code = 0;
if (NULL == output || NULL == msg || msgSize <= 0) {
code = TSDB_CODE_TSC_INVALID_INPUT;
return code;
}
if (tDeserializeSDnodeListRsp(msg, msgSize, &out) != 0) {
qError("invalid dnode list rsp msg, msgSize:%d", msgSize);
code = TSDB_CODE_INVALID_MSG;
return code;
}
*(SArray**)output = out.dnodeList;
return code;
}
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) { int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
SDbCfgRsp out = {0}; SDbCfgRsp out = {0};
...@@ -535,6 +575,7 @@ void initQueryModuleMsgHandle() { ...@@ -535,6 +575,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryBuildDnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
...@@ -547,6 +588,7 @@ void initQueryModuleMsgHandle() { ...@@ -547,6 +588,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryProcessDnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
......
...@@ -1288,11 +1288,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1288,11 +1288,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json string--0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json string--0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k2"; key = "k2";
bool eRes1[len+len1] = {false, false, true, true, false, false, false, true, false, true, false, true, true}; bool eRes1[len+len1] = {false, false, false, false, false, false, false, true, false, true, false, true, true};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes1[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes1[i], op[i], false);
} }
bool eRes_1[len0] = {true, true, false, false, false, false}; bool eRes_1[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++){ for(int i = 0; i < len0; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_1[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_1[i], op[i], true);
} }
...@@ -1324,11 +1324,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1324,11 +1324,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json bool--1 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json bool--1 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k4"; key = "k4";
bool eRes3[len+len1] = {false, false, true, true, false, true, false, true, true, false, false, false, false}; bool eRes3[len+len1] = {false, false, false, false, false, false, false, true, true, false, false, false, false};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes3[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes3[i], op[i], false);
} }
bool eRes_3[len0] = {false, true, false, false, false, true}; bool eRes_3[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++){ for(int i = 0; i < len0; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_3[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_3[i], op[i], true);
} }
...@@ -1397,11 +1397,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1397,11 +1397,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json bool-- 0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json bool-- 0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k8"; key = "k8";
bool eRes7[len+len1] = {false, false, true, true, false, false, false, true, false, false, false, false, false}; bool eRes7[len+len1] = {false, false, false, false, false, false, false, true, false, false, false, false, false};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes7[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes7[i], op[i], false);
} }
bool eRes_7[len0] = {true, true, false, false, false, false}; bool eRes_7[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++) { for(int i = 0; i < len0; i++) {
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_7[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_7[i], op[i], true);
} }
...@@ -1416,11 +1416,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1416,11 +1416,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json string-- 6.6hello {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json string-- 6.6hello {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k9"; key = "k9";
bool eRes8[len+len1] = {true, false, false, false, false, true, false, true, true, false, true, false, true}; bool eRes8[len+len1] = {false, false, false, false, false, false, false, true, true, false, true, false, true};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes8[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes8[i], op[i], false);
} }
bool eRes_8[len0] = {false, true, true, true, false, true}; bool eRes_8[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++) { for(int i = 0; i < len0; i++) {
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_8[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_8[i], op[i], true);
} }
......
...@@ -436,6 +436,11 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries ...@@ -436,6 +436,11 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
} }
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (myPreLogTerm == SYNC_TERM_INVALID) {
sError("vgId:%d sync get pre term error, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
sTrace( sTrace(
......
...@@ -345,7 +345,7 @@ bool syncCanLeaderTransfer(int64_t rid) { ...@@ -345,7 +345,7 @@ bool syncCanLeaderTransfer(int64_t rid) {
return matchOK; return matchOK;
} }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak); int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret; return ret;
} }
...@@ -584,7 +584,7 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { ...@@ -584,7 +584,7 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
} }
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
...@@ -1309,40 +1309,44 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1309,40 +1309,44 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
if (userStrLen < 256) { if (userStrLen < 256) {
char logBuf[128 + 256]; char logBuf[256 + 256];
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, " "replica-num:%d, "
"lconfig:%ld, changing:%d", "lconfig:%ld, changing:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing); pSyncNode->changing, pCfgStr);
} else { } else {
snprintf(logBuf, sizeof(logBuf), "%s", str); snprintf(logBuf, sizeof(logBuf), "%s", str);
} }
sDebug("%s", logBuf); sDebug("%s", logBuf);
} else { } else {
int len = 128 + userStrLen; int len = 256 + userStrLen;
char* s = (char*)taosMemoryMalloc(len); char* s = (char*)taosMemoryMalloc(len);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len, snprintf(s, len,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, " "replica-num:%d, "
"lconfig:%ld, changing:%d", "lconfig:%ld, changing:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing); pSyncNode->changing, pCfgStr);
} else { } else {
snprintf(s, len, "%s", str); snprintf(s, len, "%s", str);
} }
sDebug("%s", s); sDebug("%s", s);
taosMemoryFree(s); taosMemoryFree(s);
} }
taosMemoryFree(pCfgStr);
} }
void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
...@@ -1455,6 +1459,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1455,6 +1459,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
isAdd = false; isAdd = false;
} }
// log begin config change
do {
char eventLog[256];
char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
snprintf(eventLog, sizeof(eventLog), "begin do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(pOldCfgStr);
taosMemoryFree(pNewCfgStr);
} while (0);
if (IamInNew) { if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
} }
...@@ -1613,6 +1628,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1613,6 +1628,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
} }
_END: _END:
// log end config change
do {
char eventLog[256];
char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
snprintf(eventLog, sizeof(eventLog), "end do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(pOldCfgStr);
taosMemoryFree(pNewCfgStr);
} while (0);
return; return;
} }
...@@ -1888,6 +1914,16 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { ...@@ -1888,6 +1914,16 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
return syncStartIndex; return syncStartIndex;
} }
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex preIndex = index - 1;
if (preIndex < SYNC_INDEX_INVALID) {
preIndex = SYNC_INDEX_INVALID;
}
return preIndex;
}
/*
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN); ASSERT(index >= SYNC_INDEX_BEGIN);
...@@ -1900,7 +1936,42 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { ...@@ -1900,7 +1936,42 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex preIndex = index - 1; SyncIndex preIndex = index - 1;
return preIndex; return preIndex;
} }
*/
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
if (index < SYNC_INDEX_BEGIN) {
return SYNC_TERM_INVALID;
}
if (index == SYNC_INDEX_BEGIN) {
return 0;
}
SyncTerm preTerm = 0;
SyncIndex preIndex = index - 1;
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
if (code == 0) {
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
return preTerm;
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
}
}
}
return SYNC_TERM_INVALID;
}
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN); ASSERT(index >= SYNC_INDEX_BEGIN);
...@@ -1938,6 +2009,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { ...@@ -1938,6 +2009,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
#endif
#if 0 #if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
......
...@@ -144,7 +144,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -144,7 +144,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SyncIndex writeIndex = raftLogWriteIndex(pLogStore); SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
if (pEntry->index != writeIndex) { if (pEntry->index != writeIndex) {
sError("raftLogAppendEntry error, pEntry->index:%ld update to writeIndex:%ld", pEntry->index, writeIndex); sError("vgId:%d wal write index error, entry-index:%ld update to %ld", pData->pSyncNode->vgId, pEntry->index,
writeIndex);
pEntry->index = writeIndex; pEntry->index = writeIndex;
} }
...@@ -157,10 +158,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -157,10 +158,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t sysErr = errno;
const char* linuxErrMsg = strerror(errno); const char* sysErrStr = strerror(errno);
sError("raftLogAppendEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
linuxErrMsg); pEntry->index, err, err, errStr, sysErr, sysErrStr);
ASSERT(0); ASSERT(0);
} }
...@@ -237,12 +238,15 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, ...@@ -237,12 +238,15 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t sysErr = errno;
const char* linuxErrMsg = strerror(errno); const char* sysErrStr = strerror(errno);
sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index,
linuxErrMsg); err, err, errStr, sysErr, sysErrStr);
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
terrno = saveErr;
return code; return code;
} }
...@@ -257,8 +261,9 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, ...@@ -257,8 +261,9 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen); ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!! int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
terrno = saveErr;
return code; return code;
} }
...@@ -270,10 +275,11 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn ...@@ -270,10 +275,11 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t sysErr = errno;
const char* linuxErrMsg = strerror(errno); const char* sysErrStr = strerror(errno);
sError("raftLogTruncate error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, sError("vgId:%d wal truncate error, from-index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
linuxErrMsg); pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr);
ASSERT(0); ASSERT(0);
} }
return code; return code;
...@@ -360,10 +366,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -360,10 +366,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t sysErr = errno;
const char* linuxErrMsg = strerror(errno); const char* sysErrStr = strerror(errno);
sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
linuxErrMsg); pEntry->index, err, err, errStr, sysErr, sysErrStr);
ASSERT(0); ASSERT(0);
} }
...@@ -389,10 +396,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -389,10 +396,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t sysErr = errno;
const char* linuxErrMsg = strerror(errno); const char* sysErrStr = strerror(errno);
sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
linuxErrMsg); index, err, err, errStr, sysErr, sysErrStr);
ASSERT(0); ASSERT(0);
} }
// ASSERT(walReadWithHandle(pWalHandle, index) == 0); // ASSERT(walReadWithHandle(pWalHandle, index) == 0);
...@@ -409,8 +417,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -409,8 +417,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!! int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
terrno = saveErr;
return pEntry; return pEntry;
} else { } else {
...@@ -426,10 +436,11 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { ...@@ -426,10 +436,11 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t sysErr = errno;
const char* linuxErrMsg = strerror(errno); const char* sysErrStr = strerror(errno);
sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, sError("vgId:%d wal truncate error, from-index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
linuxErrMsg); pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr);
ASSERT(0); ASSERT(0);
} }
return 0; return 0;
...@@ -460,9 +471,11 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -460,9 +471,11 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t sysErr = errno;
const char* linuxErrMsg = strerror(errno); const char* sysErrStr = strerror(errno);
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); sError("vgId:%d wal update commit index error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr);
ASSERT(0); ASSERT(0);
} }
return 0; return 0;
......
...@@ -139,6 +139,15 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ...@@ -139,6 +139,15 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
// pre index, pre term // pre index, pre term
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d sync get pre term error, nextIndex:%ld, update next-index:%ld, match-index:%d, raftid:%ld",
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
return -1;
}
// batch optimized // batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
......
...@@ -113,8 +113,16 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void ...@@ -113,8 +113,16 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
} }
if (!getLastConfig) { if (!getLastConfig) {
syncNodeLog3("", pSender->pSyncNode); char logBuf[128];
ASSERT(0); snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %ld to -1",
pSender->snapshot.lastConfigIndex);
pSender->snapshot.lastConfigIndex = -1;
char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
syncNodeEventLog(pSender->pSyncNode, eventLog);
taosMemoryFree(eventLog);
memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
} }
} else { } else {
......
...@@ -857,20 +857,20 @@ static int tdbBtreeBalance(SBTC *pBtc) { ...@@ -857,20 +857,20 @@ static int tdbBtreeBalance(SBTC *pBtc) {
} }
// TDB_BTREE_BALANCE // TDB_BTREE_BALANCE
static int tdbFetchOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) { static int tdbFetchOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) {
int ret = 0; int ret = 0;
*pPgno = 0; *pPgno = 0;
SBtreeInitPageArg iArg; SBtreeInitPageArg iArg;
iArg.pBt = pBt; iArg.pBt = pBt;
iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL); iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
ret = tdbPagerFetchPage(pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn); ret = tdbPagerFetchPage(pBt->pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
// mark dirty // mark dirty
ret = tdbPagerWrite(pPager, *ppOfp); ret = tdbPagerWrite(pBt->pPager, *ppOfp);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -879,13 +879,13 @@ static int tdbFetchOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pT ...@@ -879,13 +879,13 @@ static int tdbFetchOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pT
return ret; return ret;
} }
static int tdbLoadOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) { static int tdbLoadOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) {
int ret = 0; int ret = 0;
SBtreeInitPageArg iArg; SBtreeInitPageArg iArg;
iArg.pBt = pBt; iArg.pBt = pBt;
iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL); iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
ret = tdbPagerFetchPage(pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn); ret = tdbPagerFetchPage(pBt->pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -922,7 +922,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const ...@@ -922,7 +922,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
SPgno pgno = 0; SPgno pgno = 0;
SPage *ofp, *nextOfp; SPage *ofp, *nextOfp;
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -962,7 +962,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const ...@@ -962,7 +962,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
// fetch next ofp if not last page // fetch next ofp if not last page
if (!lastPage) { if (!lastPage) {
// fetch a new ofp and make it dirty // fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
tdbFree(pBuf); tdbFree(pBuf);
return -1; return -1;
...@@ -1019,14 +1019,14 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const ...@@ -1019,14 +1019,14 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
nLeft -= lastKeyPageSpace; nLeft -= lastKeyPageSpace;
// fetch next ofp, a new ofp and make it dirty // fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
} }
} else { } else {
// fetch next ofp, a new ofp and make it dirty // fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -1057,7 +1057,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const ...@@ -1057,7 +1057,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
// fetch next ofp if not last page // fetch next ofp if not last page
if (!lastPage) { if (!lastPage) {
// fetch a new ofp and make it dirty // fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
tdbFree(pBuf); tdbFree(pBuf);
return -1; return -1;
...@@ -1198,7 +1198,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, ...@@ -1198,7 +1198,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
// unpack left val data from ovpages // unpack left val data from ovpages
while (pgno != 0) { while (pgno != 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -1235,7 +1235,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, ...@@ -1235,7 +1235,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
int lastKeyPageSpace = 0; int lastKeyPageSpace = 0;
// load left key & val to ovpages // load left key & val to ovpages
while (pgno != 0) { while (pgno != 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -1280,7 +1280,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, ...@@ -1280,7 +1280,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
} }
while (nLeft > 0) { while (nLeft > 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -1411,7 +1411,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * ...@@ -1411,7 +1411,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
int bytes; int bytes;
while (pgno != 0) { while (pgno != 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -2023,7 +2023,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { ...@@ -2023,7 +2023,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
// check if key <= current position // check if key <= current position
if (idx < nCells) { if (idx < nCells) {
pCell = tdbPageGetCell(pPage, idx); pCell = tdbPageGetCell(pPage, idx);
tdbBtreeDecodeCell(pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt); tdbBtreeDecodeCell(pPage, pCell, &cd);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen); c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c > 0) break; if (c > 0) break;
} }
......
...@@ -503,6 +503,38 @@ const char *cfgDtypeStr(ECfgDataType type) { ...@@ -503,6 +503,38 @@ const char *cfgDtypeStr(ECfgDataType type) {
} }
} }
void cfgDumpItemValue(SConfigItem *pItem, char* buf, int32_t bufSize, int32_t* pLen) {
int32_t len = 0;
switch (pItem->dtype) {
case CFG_DTYPE_BOOL:
len = snprintf(buf, bufSize, "%u", pItem->bval);
break;
case CFG_DTYPE_INT32:
len = snprintf(buf, bufSize, "%d", pItem->i32);
break;
case CFG_DTYPE_INT64:
len = snprintf(buf, bufSize, "%" PRId64, pItem->i64);
break;
case CFG_DTYPE_FLOAT:
len = snprintf(buf, bufSize, "%f", pItem->fval);
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
case CFG_DTYPE_NONE:
len = snprintf(buf, bufSize, "%s", pItem->str);
break;
}
if (len > bufSize) {
len = bufSize;
}
*pLen = len;
}
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
if (dump) { if (dump) {
printf(" global config"); printf(" global config");
...@@ -996,4 +1028,4 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl ...@@ -996,4 +1028,4 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
uInfo("fail get apollo url from cmd env file"); uInfo("fail get apollo url from cmd env file");
return -1; return -1;
} }
\ No newline at end of file
...@@ -99,7 +99,7 @@ if $rows != 1 then ...@@ -99,7 +99,7 @@ if $rows != 1 then
endi endi
#sql select * from information_schema.`streams` #sql select * from information_schema.`streams`
sql select * from information_schema.user_tables sql select * from information_schema.user_tables
if $rows != 30 then if $rows != 31 then
return -1 return -1
endi endi
#sql select * from information_schema.user_table_distributed #sql select * from information_schema.user_table_distributed
...@@ -197,7 +197,7 @@ if $rows != 1 then ...@@ -197,7 +197,7 @@ if $rows != 1 then
endi endi
#sql select * from performance_schema.`streams` #sql select * from performance_schema.`streams`
sql select * from information_schema.user_tables sql select * from information_schema.user_tables
if $rows != 30 then if $rows != 31 then
return -1 return -1
endi endi
#sql select * from information_schema.user_table_distributed #sql select * from information_schema.user_table_distributed
...@@ -227,5 +227,20 @@ endi ...@@ -227,5 +227,20 @@ endi
sql_error show create stable t0; sql_error show create stable t0;
sql show variables;
if $rows != 4 then
return -1
endi
sql show dnode 1 variables;
if $rows != 114 then
return -1
endi
sql show local variables;
if $rows != 50 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
...@@ -509,7 +509,7 @@ class TDTestCase: ...@@ -509,7 +509,7 @@ class TDTestCase:
tdSql.checkData(0, 0, 5.3) tdSql.checkData(0, 0, 5.3)
# tdSql.query("select twa(dataint) from jsons1 where jtag is not null") # tdSql.query("select twa(dataint) from jsons1 where jtag is not null")
# tdSql.checkData(0, 0, 36) # tdSql.checkData(0, 0, 36)
tdSql.query("select irate(dataint) from jsons1 where jtag is not null") # tdSql.error("select irate(dataint) from jsons1 where jtag is not null")
tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null") tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null")
tdSql.checkData(0, 0, 45) tdSql.checkData(0, 0, 45)
tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1") tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1")
......
from distutils.log import error
import taos
import sys
import time
import socket
import os
import threading
import subprocess
import platform
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def prepare_udf_so(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
print(projPath)
if platform.system().lower() == 'windows':
self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
if (not tdDnodes.dnodes[0].remoteIP == ""):
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\")
self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so')
else:
self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
return
def create_udf_function(self):
# create scalar functions
tdSql.execute("create function udf1 as '%s' outputtype int bufSize 8;"%self.libudf1)
functions = tdSql.getResult("show functions")
function_nums = len(functions)
if function_nums == 1:
tdLog.info("create one udf functions success ")
else:
tdLog.exit("create udf functions fail")
return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1', 'topic2']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
queryString = "select ts, c1,udf1(c1),sin(udf1(c2)), log(udf1(c2)) from %s.%s where udf1(c1) == 88 or sin(udf1(c1)) > 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 1
topicList = topicNameList[1]
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.prepare_udf_so()
self.create_udf_function()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -133,3 +133,4 @@ python3 ./test.py -f 7-tmq/tmqError.py ...@@ -133,3 +133,4 @@ python3 ./test.py -f 7-tmq/tmqError.py
python3 ./test.py -f 7-tmq/schema.py python3 ./test.py -f 7-tmq/schema.py
python3 ./test.py -f 7-tmq/stbFilter.py python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py -f 7-tmq/tmqCheckData.py python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqUdf.py
...@@ -5,7 +5,7 @@ python3 .\test.py -f 0-others\taosShellNetChk.py ...@@ -5,7 +5,7 @@ python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\telemetry.py
python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udfTest.py
python3 .\test.py -f 0-others\udf_create.py @REM python3 .\test.py -f 0-others\udf_create.py
@REM python3 .\test.py -f 0-others\udf_restart_taosd.py @REM python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py @REM python3 .\test.py -f 0-others\cachelast.py
......
...@@ -520,6 +520,16 @@ bool shellIsLimitQuery(const char *sql) { ...@@ -520,6 +520,16 @@ bool shellIsLimitQuery(const char *sql) {
return false; return false;
} }
bool shellIsShowQuery(const char *sql) {
//todo refactor
if (taosStrCaseStr(sql, "show ") != NULL) {
return true;
}
return false;
}
int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres); TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) { if (row == NULL) {
...@@ -682,7 +692,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { ...@@ -682,7 +692,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
uint64_t resShowMaxNum = UINT64_MAX; uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql) && !shellIsShowQuery(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册