提交 2b21792c 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/alter_table

......@@ -144,6 +144,7 @@ int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(const SSyncInfo* pSyncInfo);
void syncStart(int64_t rid);
void syncStartStandBy(int64_t rid);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
ESyncState syncGetMyRole(int64_t rid);
......
......@@ -518,8 +518,9 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg =
(pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
fprintf(stderr,"failed to connect to server, reason: %s\n\n", errorMsg);
terrno = pRequest->code;
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
......
......@@ -2034,11 +2034,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
lastKeyAppend = key;
if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1 = TD_ROW_SVER(row1);
}
if (row2 && rv2 != TD_ROW_SVER(row2)) {
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2);
}
......
......@@ -584,6 +584,37 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId";
static const char* jkExchangeLogicPlanSrcPrecision = "Precision";
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcPrecision, pNode->precision);
}
return code;
}
static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
SExchangeLogicNode* pNode = (SExchangeLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkExchangeLogicPlanSrcPrecision, &pNode->precision);
}
return code;
}
static const char* jkFillLogicPlanMode = "Mode";
static const char* jkFillLogicPlanWStartTs = "WStartTs";
static const char* jkFillLogicPlanValues = "Values";
......@@ -2987,6 +3018,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return logicProjectNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
break;
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_FILL:
return logicFillNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_SORT:
......@@ -3083,6 +3116,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicScanNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return jsonToLogicExchangeNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_FILL:
return jsonToLogicFillNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SORT:
......
......@@ -14,6 +14,7 @@
*/
#include "catalog.h"
#include "cmdnodes.h"
#include "parInt.h"
typedef struct SAuthCxt {
......@@ -65,8 +66,8 @@ static int32_t authSetOperator(SAuthCxt* pCxt, SSetOperator* pSetOper) {
return code;
}
static int32_t authDropUser(SAuthCxt* pCxt, SDropUserReq* pStmt) {
if (!pCxt->pParseCxt->isSuperUser || 0 == strcmp(pStmt->user, TSDB_DEFAULT_USER)) {
static int32_t authDropUser(SAuthCxt* pCxt, SDropUserStmt* pStmt) {
if (!pCxt->pParseCxt->isSuperUser || 0 == strcmp(pStmt->useName, TSDB_DEFAULT_USER)) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
return TSDB_CODE_SUCCESS;
......@@ -92,7 +93,7 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_ALTER_USER_STMT:
break;
case QUERY_NODE_DROP_USER_STMT: {
return authDropUser(pCxt, (SDropUserReq*)pStmt);
return authDropUser(pCxt, (SDropUserStmt*)pStmt);
}
case QUERY_NODE_USE_DATABASE_STMT:
case QUERY_NODE_CREATE_DNODE_STMT:
......
......@@ -235,11 +235,17 @@ TEST_F(ParserSelectTest, semanticError) {
TEST_F(ParserSelectTest, setOperator) {
useDb("root", "test");
// run("SELECT * FROM t1 UNION ALL SELECT * FROM t1");
run("SELECT * FROM t1 UNION ALL SELECT * FROM t1");
// run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)");
run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
}
TEST_F(ParserSelectTest, informationSchema) {
useDb("root", "test");
run("SELECT * FROM information_schema.user_databases WHERE name = 'information_schema'");
}
} // namespace ParserTest
......@@ -392,7 +392,8 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
}
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) ||
TSDB_SYSTEM_TABLE == pScan->pMeta->tableType) {
return TSDB_CODE_SUCCESS;
}
......
......@@ -303,7 +303,7 @@ static SLogicNode* unMatchByNode(SLogicNode* pNode) {
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild);
SLogicNode* pSplitNode = unMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
......@@ -318,7 +318,7 @@ static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
}
pExchange->srcGroupId = pCxt->groupId;
// pExchange->precision = pScan->pMeta->tableInfo.precision;
pExchange->node.pTargets = nodesCloneList(pAgg->node.pTargets);
pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -47,4 +47,10 @@ TEST_F(PlanOtherTest, explain) {
run("explain analyze SELECT * FROM t1");
run("explain analyze verbose true ratio 0.01 SELECT * FROM t1");
}
\ No newline at end of file
}
TEST_F(PlanOtherTest, show) {
useDb("root", "test");
run("SHOW DATABASES");
}
......@@ -23,13 +23,13 @@ class PlanStateTest : public PlannerTestBase {};
TEST_F(PlanStateTest, basic) {
useDb("root", "test");
run("select count(*) from t1 state_window(c1)");
run("SELECT COUNT(*) FROM t1 STATE_WINDOW(c1)");
}
TEST_F(PlanStateTest, stateExpr) {
useDb("root", "test");
run("select count(*) from t1 state_window(c1 + 10)");
run("SELECT COUNT(*) FROM t1 STATE_WINDOW(c1 + 10)");
}
TEST_F(PlanStateTest, selectFunc) {
......
......@@ -25,11 +25,9 @@ TEST_F(PlanSubqeuryTest, basic) {
if (0 == g_skipSql) {
run("SELECT * FROM (SELECT * FROM t1)");
run("SELECT LAST(c1) FROM (SELECT * FROM t1)");
}
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
run("SELECT LAST(c1) FROM (SELECT * FROM t1)");
}
TEST_F(PlanSubqeuryTest, doubleGroupBy) {
......@@ -39,3 +37,11 @@ TEST_F(PlanSubqeuryTest, doubleGroupBy) {
"SELECT c1 + c3 a, c1 + COUNT(*) b FROM t1 WHERE c2 = 'abc' GROUP BY c1, c3) "
"WHERE a > 100 GROUP BY b");
}
TEST_F(PlanSubqeuryTest, withSetOperator) {
useDb("root", "test");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION SELECT c1 FROM t1)");
}
......@@ -27,8 +27,8 @@ TEST_F(PlanSysTableTest, show) {
run("show stables");
}
TEST_F(PlanSysTableTest, information) {
TEST_F(PlanSysTableTest, informationSchema) {
useDb("root", "information_schema");
run("show tables");
run("SELECT * FROM information_schema.user_databases WHERE name = 'information_schema'");
}
......@@ -812,13 +812,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
tstrerror(ctx->rspCode));
QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
QW_ERR_JRET(ctx->rspCode);
}
_return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
......@@ -836,7 +834,11 @@ _return:
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
}
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
if (code != TSDB_CODE_SUCCESS) {
QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
} else {
QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
}
QW_RET(code);
}
......
......@@ -35,6 +35,7 @@ typedef struct SSyncIndexMgr {
} SSyncIndexMgr;
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode);
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode);
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
......
......@@ -247,6 +247,7 @@ typedef struct SSyncNode {
// open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeStart(SSyncNode* pSyncNode);
void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);
// ping --------------
......@@ -271,7 +272,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
......
......@@ -62,7 +62,6 @@ bool syncUtilUserPreCommit(tmsg_t msgType);
bool syncUtilUserCommit(tmsg_t msgType);
bool syncUtilUserRollback(tmsg_t msgType);
#ifdef __cplusplus
}
#endif
......
......@@ -15,11 +15,11 @@
#include "syncAppendEntries.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "syncRaftCfg.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
......@@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
//if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
// if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
......@@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
......@@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
......@@ -324,7 +324,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
//if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......@@ -338,10 +338,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
syncNodeUpdateConfig(ths, &newSyncCfg);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths);
} else {
syncNodeBecomeFollower(ths);
}
}
rpcFreeCont(rpcMsg.pCont);
......
......@@ -16,10 +16,10 @@
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncRaftCfg.h"
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
......@@ -102,7 +102,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
//if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......@@ -114,12 +114,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode);
} else {
syncNodeBecomeFollower(pSyncNode);
}
}
rpcFreeCont(rpcMsg.pCont);
......
......@@ -15,6 +15,7 @@
#include "syncIO.h"
#include <tdatablock.h>
#include "os.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "tglobal.h"
......@@ -198,6 +199,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
{
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
snprintf(rpcInit.localFqdn, sizeof(rpcInit.localFqdn), "%s", "127.0.0.1");
rpcInit.localPort = io->myAddr.eps[0].port;
rpcInit.label = "SYNC-IO-SERVER";
rpcInit.numOfThreads = 1;
......
......@@ -31,6 +31,13 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
return pSyncIndexMgr;
}
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) {
pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
pSyncIndexMgr->pSyncNode = pSyncNode;
syncIndexMgrClear(pSyncIndexMgr);
}
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
if (pSyncIndexMgr != NULL) {
taosMemoryFree(pSyncIndexMgr);
......
......@@ -103,6 +103,16 @@ void syncStart(int64_t rid) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
void syncStartStandBy(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return;
}
syncNodeStartStandBy(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
void syncStop(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -116,7 +126,7 @@ void syncStop(int64_t rid) {
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
int32_t ret = 0;
char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
rpcMsg.noResp = 1;
......@@ -188,10 +198,9 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
(pEpSet->numOfEps)++;
sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -524,6 +533,17 @@ void syncNodeStart(SSyncNode* pSyncNode) {
assert(ret == 0);
}
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncNodeStopHeartbeatTimer(pSyncNode);
// reset elect timer, long enough
int32_t electMS = TIMER_MAX_MS;
int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
ASSERT(ret == 0);
}
void syncNodeClose(SSyncNode* pSyncNode) {
int32_t ret;
assert(pSyncNode != NULL);
......@@ -858,7 +878,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s;
}
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) {
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
pSyncNode->pRaftCfg->cfg = *newConfig;
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
ASSERT(ret == 0);
......@@ -885,6 +905,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) {
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
}
syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
}
SSyncNode* syncNodeAcquire(int64_t rid) {
......@@ -1245,7 +1270,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......@@ -1267,7 +1292,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......
......@@ -58,14 +58,15 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
syncMeta.term = pEntry->term;
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
ASSERT(0);
}
//assert(code == 0);
}
// assert(code == 0);
walFsync(pWal, true);
return code;
......@@ -77,16 +78,17 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
int32_t code = walReadWithHandle(pWalHandle, index);
int32_t code = walReadWithHandle(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
ASSERT(0);
}
//assert(walReadWithHandle(pWalHandle, index) == 0);
}
// assert(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
assert(pEntry != NULL);
......@@ -112,16 +114,17 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
//assert(walRollback(pWal, fromIndex) == 0);
// assert(walRollback(pWal, fromIndex) == 0);
int32_t code = walRollback(pWal, fromIndex);
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
ASSERT(0);
}
}
return 0; // to avoid compiler error
}
......@@ -145,16 +148,16 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
//assert(walCommit(pWal, index) == 0);
// assert(walCommit(pWal, index) == 0);
int32_t code = walCommit(pWal, index);
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
ASSERT(0);
}
}
return 0; // to avoid compiler error
}
......
......@@ -37,6 +37,7 @@ add_executable(syncRaftCfgTest "")
add_executable(syncRespMgrTest "")
add_executable(syncSnapshotTest "")
add_executable(syncApplyMsgTest "")
add_executable(syncConfigChangeTest "")
target_sources(syncTest
......@@ -195,6 +196,10 @@ target_sources(syncApplyMsgTest
PRIVATE
"syncApplyMsgTest.cpp"
)
target_sources(syncConfigChangeTest
PRIVATE
"syncConfigChangeTest.cpp"
)
target_include_directories(syncTest
......@@ -392,6 +397,11 @@ target_include_directories(syncApplyMsgTest
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncConfigChangeTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -550,6 +560,10 @@ target_link_libraries(syncApplyMsgTest
sync
gtest_main
)
target_link_libraries(syncConfigChangeTest
sync
gtest_main
)
enable_testing()
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "os.h"
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t gPorts[] = {7010, 7110, 7210, 7310, 7410};
const char* gDir = "./syncReplicateTest";
int32_t gVgId = 1234;
SyncIndex gSnapshotLastApplyIndex;
void init() {
int code = walInit();
assert(code == 0);
code = syncInit();
assert(code == 0);
sprintf(tsTempDir, "%s", ".");
}
void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex;
}
if (cbMeta.index > beginIndex) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} else {
sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index);
}
}
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex;
pSnapshot->lastApplyTerm = 100;
return 0;
}
SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb;
return pFsm;
}
SWal* createWal(char* path, int32_t vgId) {
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
SWal* pWal = walOpen(path, &walCfg);
assert(pWal != NULL);
return pWal;
}
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) {
SSyncInfo syncInfo;
syncInfo.vgId = vgId;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
if (isStandBy) {
pCfg->myIndex = 0;
pCfg->replicaNum = 1;
pCfg->nodeInfo[0].nodePort = gPorts[myIndex];
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
} else {
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = gPorts[i];
taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn);
// snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
}
}
int64_t rid = syncOpen(&syncInfo);
assert(rid > 0);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void configChange(int64_t rid, int32_t replicaNum, int32_t myIndex) {
SSyncCfg syncCfg;
syncCfg.myIndex = myIndex;
syncCfg.replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
syncCfg.nodeInfo[i].nodePort = gPorts[i];
taosGetFqdn(syncCfg.nodeInfo[i].nodeFqdn);
}
syncReconfig(rid, &syncCfg);
}
void usage(char* exe) {
printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange \n", exe);
}
SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 256;
pMsg->pCont = rpcMallocCont(pMsg->contLen);
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs());
return pMsg;
}
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
if (argc != 7) {
usage(argv[0]);
exit(-1);
}
int32_t replicaNum = atoi(argv[1]);
int32_t myIndex = atoi(argv[2]);
int32_t lastApplyIndex = atoi(argv[3]);
int32_t writeRecordNum = atoi(argv[4]);
bool isStandBy = atoi(argv[5]);
bool isConfigChange = atoi(argv[6]);
gSnapshotLastApplyIndex = lastApplyIndex;
if (!isStandBy) {
assert(replicaNum >= 1 && replicaNum <= 5);
assert(myIndex >= 0 && myIndex < replicaNum);
assert(lastApplyIndex >= -1);
assert(writeRecordNum >= 0);
}
init();
int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]);
assert(ret == 0);
char walPath[128];
snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex);
SWal* pWal = createWal(walPath, gVgId);
int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy);
assert(rid > 0);
if (isStandBy) {
syncStartStandBy(rid);
} else {
syncStart(rid);
}
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
if (isConfigChange) {
configChange(rid, 3, myIndex);
}
//---------------------------
int32_t alreadySend = 0;
while (1) {
char* s = syncNode2SimpleStr(pSyncNode);
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend);
} else {
assert(ret == 0);
sTrace("%s value%d write ok", s, alreadySend);
}
alreadySend++;
rpcFreeCont(pRpcMsg->pCont);
taosMemoryFree(pRpcMsg);
} else {
sTrace("%s", s);
}
taosMsleep(1000);
taosMemoryFree(s);
taosMsleep(1000);
}
syncNodeRelease(pSyncNode);
syncStop(rid);
walClose(pWal);
syncIOStop();
cleanup();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册