diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index 66c94bf6755850f4b731e425cd0da90f75c218f3..ef99e70bd112494e4735b532396f925106cba5e2 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -154,7 +154,6 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquire(pCreate->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); diff --git a/src/mnode/inc/mnodeDb.h b/src/mnode/inc/mnodeDb.h index 0fa1a15e2d7531d941f6236517385d1e97922c6e..40c5afc2c84cd497095a508df42b9e89ad638d70 100644 --- a/src/mnode/inc/mnodeDb.h +++ b/src/mnode/inc/mnodeDb.h @@ -40,6 +40,7 @@ void mnodeIncDbRef(SDbObj *pDb); void mnodeDecDbRef(SDbObj *pDb); bool mnodeCheckIsMonitorDB(char *db, char *monitordb); void mnodeDropAllDbs(SAcctObj *pAcct); +int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg); int32_t mnodeCompactDbs(); diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index 73b0e6ae1bb2e05876deb2486db7c5d1cb25e65b..c796365843ae257324af3633e1340c9d044413ab 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -49,7 +49,7 @@ int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCh void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); -void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg); void mnodeSendSyncVgroupMsg(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 6ca43ce2da7354ea52ac38943d4d74ebac0a260a..334450f70b5125fcc57722c12cc1694b40eb166d 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -24,12 +24,14 @@ #include "tdataformat.h" #include "tp.h" #include "mnode.h" +#include "dnode.h" #include "mnodeDef.h" #include "mnodeInt.h" #include "mnodeAcct.h" #include "mnodeDb.h" #include "mnodeDnode.h" #include "mnodeMnode.h" +#include "mnodePeer.h" #include "mnodeProfile.h" #include "mnodeWrite.h" #include "mnodeSdb.h" @@ -1097,17 +1099,18 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { return newCfg; } -static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { - if (code != TSDB_CODE_SUCCESS) return code; +static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { SDbObj *pDb = pMsg->pDb; void *pIter = NULL; SVgObj *pVgroup = NULL; - while (1) { + pMsg->expected = 0; + while (1) { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; if (pVgroup->pDb == pDb) { - mnodeSendAlterVgroupMsg(pVgroup); + pMsg->expected += pVgroup->numOfVnodes; + mnodeSendAlterVgroupMsg(pVgroup,pMsg); } mnodeDecVgroupRef(pVgroup); } @@ -1115,9 +1118,32 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { mDebug("db:%s, all vgroups is altered", pDb->name); mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); - bnNotify(); + // in case there is no vnode for this db currently(no table in db,etc.) + if (pMsg->expected == 0) { + SSdbRow row = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb, + .pMsg = pMsg, + }; - return TSDB_CODE_SUCCESS; + return sdbUpdateRow(&row); + } + + //bnNotify(); + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg) { + SSdbRow desc = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb, + .pMsg = pMsg, + }; + + return sdbUpdateRow(&desc); } static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { @@ -1141,14 +1167,14 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { .pTable = tsDbSdb, .pObj = pDb, .pMsg = pMsg, - .fpRsp = mnodeAlterDbCb + .fpReq = mnodeAlterDbFp }; code = sdbUpdateRow(&row); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code)); } - } +} return code; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 2a781b9202494332c6fcaf18cf5bdd64e6831c6b..af816d7ae74b0ed16b43b64ffac997daefc9b20d 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -60,7 +60,6 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); -static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); @@ -237,7 +236,6 @@ int32_t mnodeInitVgroups() { mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); - mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); @@ -271,7 +269,7 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("vgId:%d, failed to update vgroup", pVgroup->vgId); } - mnodeSendAlterVgroupMsg(pVgroup); + mnodeSendAlterVgroupMsg(pVgroup,NULL); } /* @@ -350,7 +348,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl mError("dnode:%d, vgId:%d, vnode cfgVersion:%d:%d repica:%d not match with mnode cfgVersion:%d:%d replica:%d", pDnode->dnodeId, pVload->vgId, pVload->dbCfgVersion, pVload->vgCfgVersion, pVload->replica, pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes); - mnodeSendAlterVgroupMsg(pVgroup); + mnodeSendAlterVgroupMsg(pVgroup,NULL); } } @@ -946,10 +944,10 @@ SRpcEpSet mnodeGetEpSetFromIp(char *ep) { return epSet; } -static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { +static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, SMnodeMsg *pMsg) { SAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { - .ahandle = NULL, + .ahandle = pMsg, .pCont = pAlter, .contLen = pAlter ? sizeof(SAlterVnodeMsg) : 0, .code = 0, @@ -958,14 +956,18 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { dnodeSendMsgToDnode(epSet, &rpcMsg); } -void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg) { mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName); + if (pMsg) { + pMsg->pVgroup = pVgroup; + mnodeIncVgroupRef(pVgroup); + } for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, pVgroup->vnodeGid[i].pDnode->dnodeEp); - mnodeSendAlterVnodeMsg(pVgroup, &epSet); + mnodeSendAlterVnodeMsg(pVgroup, &epSet,pMsg); } } @@ -1026,11 +1028,27 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { } static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { - mDebug("alter vnode rsp received"); -} + mDebug("alter vnode rsp is received, handle:%p", rpcMsg->ahandle); + if (rpcMsg->ahandle == NULL) return; -static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) { - mDebug("sync vnode rsp received"); + SMnodeMsg *mnodeMsg = rpcMsg->ahandle; + mnodeMsg->received++; + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + mnodeMsg->code = rpcMsg->code; + mnodeMsg->successed++; + } + + SVgObj *pVgroup = mnodeMsg->pVgroup; + mDebug("vgId:%d, alter vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", + pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected, + mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); + + if (mnodeMsg->received != mnodeMsg->expected) return; + + int32_t code = mnodeInsertAlterDbRow(pVgroup->pDb, mnodeMsg); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + dnodeSendRpcMWriteRsp(mnodeMsg, code); + } } static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { diff --git a/tests/pytest/alter/alter_keep.py b/tests/pytest/alter/alter_keep.py index a734c660e8db3c26a10d760def1240a06bd05e8e..f09f5ab6d13e25856fc9aca2a060aabae327fc63 100644 --- a/tests/pytest/alter/alter_keep.py +++ b/tests/pytest/alter/alter_keep.py @@ -126,25 +126,64 @@ class TDTestCase: ##TODO: need to wait for TD-4445 to implement the following ## tests - # tdSql.prepare() - # tdSql.execute('create table tb (ts timestamp, speed int)') - # tdSql.execute('alter database db keep 10,10,10') - # tdSql.execute('insert into tb values (now, 10)') - # tdSql.execute('insert into tb values (now + 10m, 10)') - # tdSql.query('select * from tb') - # tdSql.checkRows(2) - # tdSql.execute('alter database db keep 40,40,40') + + + ## preset the keep + tdSql.prepare() + tdSql.execute('create table tb (ts timestamp, speed int)') + tdSql.execute('alter database db keep 10,10,10') + tdSql.execute('insert into tb values (now, 10)') + tdSql.execute('insert into tb values (now + 10m, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(2) + + + #after alter from small to large, check if the alter if functioning + #test if change through test.py is consistent with change from taos client + #test case for TD-4459 and TD-4445 + tdSql.execute('alter database db keep 40,40,40') + tdSql.query('show databases') + tdSql.checkData(0,7,'40,40,40') + tdSql.error('insert into tb values (now-60d, 10)') + tdSql.execute('insert into tb values (now-30d, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(3) + + rowNum = 3 + for i in range(30): + rowNum += 1 + tdSql.execute('alter database db keep 20,20,20') + tdSql.execute('alter database db keep 40,40,40') + tdSql.query('show databases') + tdSql.checkData(0,7,'40,40,40') + tdSql.error('insert into tb values (now-60d, 10)') + tdSql.execute('insert into tb values (now-30d, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(rowNum) + + tdSql.execute('alter database db keep 10,10,10') + tdSql.query('show databases') + tdSql.checkData(0,7,'10,10,10') + + # if uncomment these three lines, timestamp out of range error will appear + # tdSql.execute('alter database db keep 15,15,15') # tdSql.query('show databases') - # tdSql.checkData(0,7,'40,40,40') - # tdSql.error('insert into tb values (now-60d, 10)') - # tdSql.execute('insert into tb values (now-30d, 10)') - # tdSql.query('select * from tb') - # tdSql.checkRows(3) + # tdSql.checkData(0,7,'15,15,15') + + # the following line should generate an error, but the insert was a success + # the time now-15d is out of range of now -10d + tdSql.error('insert into tb values (now-15d, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(rowNum) + # tdSql.execute('alter database db keep 20,20,20') # tdSql.query('show databases') # tdSql.checkData(0,7,'20,20,20') + # tdSql.error('insert into tb values (now-30d, 10)') + # tdSql.query('show databases') + # tdSql.checkData(0,7,'20,20,20') # tdSql.query('select * from tb') - # tdSql.checkRows(2) + # tdSql.checkRows(rowNum) def stop(self): diff --git a/tests/pytest/stream/table_1.py b/tests/pytest/stream/table_1.py index a9fd57393112177fe0e290cadeaabd97d669daca..b205491fad181a51c991c16da65baa8370174e74 100644 --- a/tests/pytest/stream/table_1.py +++ b/tests/pytest/stream/table_1.py @@ -51,7 +51,7 @@ class TDTestCase: tdSql.execute( "insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j)) - time.sleep(0.1) + time.sleep(1) self.createFuncStream("count(*)", "c1", rowNum) self.createFuncStream("count(tbcol)", "c2", rowNum) diff --git a/tests/pytest/tag_lite/unsignedInt.py b/tests/pytest/tag_lite/unsignedInt.py index ce6c546a3f8f2ccb3b60fbf6846ca8258d689c2c..6e741d351abe5c5608f45e414bcabc6d40dd0980 100644 --- a/tests/pytest/tag_lite/unsignedInt.py +++ b/tests/pytest/tag_lite/unsignedInt.py @@ -4,7 +4,7 @@ import sys from util.log import * from util.cases import * from util.sql import * - +import time class TDTestCase: def init(self, conn, logSql): @@ -112,6 +112,7 @@ class TDTestCase: # TSIM: endw # TSIM: # TSIM: print =============== step2 + time.sleep(1) tdLog.info('=============== step2') # TSIM: sleep 100 # TSIM: sql select * from $tb