diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 8c3345f112fce156a1fc38351e425f7f572eae8b..fd9ac533fd3734e298e98f67a0c7bfaabdd67c20 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -353,7 +353,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pRes->numOfGroups = 0; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);; + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); int64_t revisedSTime = diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3a77aaffb3e7cdd0fe5030bf4cec25381c461a70..8a6d9d02a8db947e6c2602cb458a4f25606b1cc1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1474,7 +1474,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += len; } - pCmd->payloadLen = pMsg - (char*)pInfoMsg;; + pCmd->payloadLen = pMsg - (char*)pInfoMsg; pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META; tfree(tmpData); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index c4413f85414f67d8db3cb5422acc5e34e5eb87c3..7ce86dc1a41a8fab22fc47883a4fd03ced1d6678 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -499,7 +499,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); tscFreeSqlObj(pSql); - return NULL;; + return NULL; } strtolower(pSql->sqlstr, sqlstr); diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index eb9d8653e56fd8984c96615015cb593fec3e6fa3..e3a228919fcffc39b135c1685c7b3385e3253e39 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -118,6 +118,8 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); mnodeCreateMsg(pWrite, pMsg); + + dTrace("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } @@ -128,6 +130,7 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { SMnodeMsg *pWrite = pRaw; + if (pWrite == NULL) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { dnodeReprocessMnodeWriteMsg(pWrite); @@ -146,19 +149,21 @@ void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { } static void *dnodeProcessMnodeWriteQueue(void *param) { - SMnodeMsg *pWriteMsg; + SMnodeMsg *pWrite; int32_t type; void * unUsed; while (1) { - if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWriteMsg, &unUsed) == 0) { + if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) { dTrace("dnodeProcessMnodeWriteQueue: got no message from qset, exiting..."); break; } - dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]); - int32_t code = mnodeProcessWrite(pWriteMsg); - dnodeSendRpcMnodeWriteRsp(pWriteMsg, code); + dTrace("app:%p:%p, msg:%s will be processed in mwrite queue", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType]); + + int32_t code = mnodeProcessWrite(pWrite); + dnodeSendRpcMnodeWriteRsp(pWrite, code); } return NULL; @@ -168,9 +173,15 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) { SMnodeMsg *pWrite = pMsg; if (!mnodeIsRunning() || tsMWriteQueue == NULL) { + dTrace("app:%p:%p, msg:%s is redirected for mnode not running, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); + dnodeSendRedirectMsg(pMsg, true); dnodeFreeMnodeWriteMsg(pWrite); - } else { + } else { + dTrace("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); + taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 3f1cbffa53bd8f0eaa32e25e8cd5999d8152c588..ba9559864612a767138cca1082d27e320c32986f 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -113,8 +113,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "mnode inva TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "mnode invalid stream id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "mnode invalid connection") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "mnode object already there") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "mnode sdb error") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "[sdb] object already there") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "[sdb] app error") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE, 0, 0x0322, "[sdb] invalid table type") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_NOT_THERE, 0, 0x0323, "[sdb] object not there") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_META_ROW, 0, 0x0324, "[sdb] invalid meta row") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_KEY_TYPE, 0, 0x0325, "[sdb] invalid key type") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, 0, 0x0330, "mnode dnode already exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, 0, 0x0331, "mnode dnode not exist") diff --git a/src/mnode/inc/mnodeDnode.h b/src/mnode/inc/mnodeDnode.h index f95a163d838fc10ea9bb9acadef7bd9df7a83769..9e21f8f56ae5942382a162a9a2c9a4eac12f5908 100644 --- a/src/mnode/inc/mnodeDnode.h +++ b/src/mnode/inc/mnodeDnode.h @@ -47,7 +47,7 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode); void * mnodeGetDnode(int32_t dnodeId); void * mnodeGetDnodeByEp(char *ep); void mnodeUpdateDnode(SDnodeObj *pDnode); -int32_t mnodeDropDnode(SDnodeObj *pDnode); +int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg); extern int32_t tsAccessSquence; diff --git a/src/mnode/inc/mnodeInt.h b/src/mnode/inc/mnodeInt.h index 7405cef6f3ce9786864db8ff69a9e248c7dfc53b..708f11d1f78e8fb9006c6afb039a6fe366835d14 100644 --- a/src/mnode/inc/mnodeInt.h +++ b/src/mnode/inc/mnodeInt.h @@ -36,10 +36,10 @@ extern int32_t sdbDebugFlag; #define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) } #define mLPrint(...) { monitorSaveLog(0, __VA_ARGS__); mPrint(__VA_ARGS__) } -#define sdbError(...) { if (sdbDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR MND-SDB ", 255, __VA_ARGS__); }} -#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("WARN MND-SDB ", sdbDebugFlag, __VA_ARGS__); }} -#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("MND-SDB ", sdbDebugFlag, __VA_ARGS__);}} -#define sdbPrint(...) { taosPrintLog("MND-SDB ", 255, __VA_ARGS__); } +#define sdbError(...) { if (sdbDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR SDB ", 255, __VA_ARGS__); }} +#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("WARN SDB ", sdbDebugFlag, __VA_ARGS__); }} +#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__);}} +#define sdbPrint(...) { taosPrintLog("SDB ", 255, __VA_ARGS__); } #define sdbLError(...) { monitorSaveLog(2, __VA_ARGS__); sdbError(__VA_ARGS__) } #define sdbLWarn(...) { monitorSaveLog(1, __VA_ARGS__); sdbWarn(__VA_ARGS__) } diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 975206d52e50dbdcb17ad272ef792143e88f6c98..ca2fffe24c76b48dc647a08d08ffdce9e8873240 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -20,6 +20,8 @@ extern "C" { #endif +struct SMnodeMsg; + typedef enum { SDB_TABLE_DNODE = 0, SDB_TABLE_MNODE = 1, @@ -48,8 +50,11 @@ typedef struct { ESdbOper type; void * table; void * pObj; - int32_t rowSize; void * rowData; + int32_t rowSize; + int32_t retCode; // for callback in sdb queue + int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); + struct SMnodeMsg *pMsg; } SSdbOper; typedef struct { diff --git a/src/mnode/inc/mnodeUser.h b/src/mnode/inc/mnodeUser.h index 338b43b47de070d2ebe76fe23499b55f3d42d318..073460f9d3621c7ec1f1eb57fd7da15791753b9e 100644 --- a/src/mnode/inc/mnodeUser.h +++ b/src/mnode/inc/mnodeUser.h @@ -28,7 +28,8 @@ void * mnodeGetNextUser(void *pIter, SUserObj **pUser); void mnodeIncUserRef(SUserObj *pUser); void mnodeDecUserRef(SUserObj *pUser); SUserObj *mnodeGetUserFromConn(void *pConn); -int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass); +char * mnodeGetUserFromMsg(void *pMnodeMsg); +int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg); void mnodeDropAllUsers(SAcctObj *pAcct); #ifdef __cplusplus diff --git a/src/mnode/src/mnodeAcct.c b/src/mnode/src/mnodeAcct.c index 7ea5188b96a47981f5c42e9c28389f8cd0410409..4c795647698edf64e28e8f5542fc4143fa8aebd9 100644 --- a/src/mnode/src/mnodeAcct.c +++ b/src/mnode/src/mnodeAcct.c @@ -78,7 +78,9 @@ static int32_t mnodeAcctActionDecode(SSdbOper *pOper) { } static int32_t mnodeAcctActionRestored() { - if (dnodeIsFirstDeploy()) { + int32_t numOfRows = sdbGetNumOfRows(tsAcctSdb); + if (numOfRows <= 0 && dnodeIsFirstDeploy()) { + mPrint("dnode first deploy, create root acct"); int32_t code = mnodeCreateRootAcct(); if (code != TSDB_CODE_SUCCESS) { mError("failed to create root account, reason:%s", tstrerror(code)); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index aeb6f4beb9e7a51b92f4c878454f41e82fe18597..5cb0096e8d243ef34bb501c7fe3cd4206e84a72d 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -41,7 +41,7 @@ static void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; -static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); +static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg); static int32_t mnodeDropDb(SMnodeMsg *newMsg); static int32_t mnodeSetDbDropping(SDbObj *pDb); static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); @@ -311,7 +311,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->replications < 0) pCfg->replications = tsReplications; } -static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { +static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg) { int32_t code = acctCheck(pAcct, ACCT_GRANT_DB); if (code != 0) return code; @@ -364,12 +364,15 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { .table = tsDbSdb, .pObj = pDb, .rowSize = sizeof(SDbObj), + .pMsg = pMsg }; code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { tfree(pDb); - code = TSDB_CODE_MND_SDB_ERROR; + } else { + mLPrint("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; @@ -771,12 +774,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { } else if (!pMsg->pUser->writeAuth) { code = TSDB_CODE_MND_NO_RIGHTS; } else { - code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate); - if (code == TSDB_CODE_SUCCESS) { - mLPrint("db:%s, is created by %s", pCreate->db, pMsg->pUser->user); - } else { - mError("db:%s, failed to create, reason:%s", pCreate->db, tstrerror(code)); - } + code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate, pMsg); } return code; @@ -893,7 +891,31 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { return newCfg; } -static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { +static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { + if (code != TSDB_CODE_SUCCESS) return code; + SDbObj *pDb = pMsg->pDb; + + void *pIter = NULL; + while (1) { + SVgObj *pVgroup = NULL; + pIter = mnodeGetNextVgroup(pIter, &pVgroup); + if (pVgroup == NULL) break; + if (pVgroup->pDb == pDb) { + mnodeSendCreateVgroupMsg(pVgroup, NULL); + } + mnodeDecVgroupRef(pVgroup); + } + sdbFreeIter(pIter); + + mTrace("db:%s, all vgroups is altered", pDb->name); + mLPrint("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + + balanceNotify(); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) { SDbCfg newCfg = mnodeGetAlterDbOption(pDb, pAlter); if (terrno != TSDB_CODE_SUCCESS) { return terrno; @@ -904,38 +926,24 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { return code; } - int32_t oldReplica = pDb->cfg.replications; - if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { pDb->cfg = newCfg; pDb->cfgVersion++; SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsDbSdb, - .pObj = pDb + .pObj = pDb, + .pMsg = pMsg, + .cb = mnodeAlterDbCb }; - int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + code = sdbUpdateRow(&oper); + if (code == TSDB_CODE_SUCCESS) { + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } } - void *pIter = NULL; - while (1) { - SVgObj *pVgroup = NULL; - pIter = mnodeGetNextVgroup(pIter, &pVgroup); - if (pVgroup == NULL) break; - mnodeSendCreateVgroupMsg(pVgroup, NULL); - mnodeDecVgroupRef(pVgroup); - } - sdbFreeIter(pIter); - - if (oldReplica != pDb->cfg.replications) { - balanceNotify(); - } - - return TSDB_CODE_SUCCESS; + return code; } static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { @@ -948,14 +956,7 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_INVALID_DB; } - int32_t code = mnodeAlterDb(pMsg->pDb, pAlter); - if (code != TSDB_CODE_SUCCESS) { - mError("db:%s, failed to alter, invalid db option", pAlter->db); - return code; - } - - mTrace("db:%s, all vgroups is altered", pMsg->pDb->name); - return TSDB_CODE_SUCCESS; + return mnodeAlterDb(pMsg->pDb, pAlter, pMsg); } static int32_t mnodeDropDb(SMnodeMsg *pMsg) { @@ -963,13 +964,16 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { mPrint("db:%s, drop db from sdb", pDb->name); SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsDbSdb, - .pObj = pDb + .pObj = pDb, + .pMsg = pMsg }; + int32_t code = sdbDeleteRow(&oper); - if (code != 0) { - code = TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index fda3f48da5fdd7af0f1782d29fb07812539ca82c..35f4ea43d33c22b966eec0ec83f8cd751ed7f7af 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -44,7 +44,7 @@ static int32_t tsDnodeUpdateSize = 0; extern void * tsMnodeSdb; extern void * tsVgroupSdb; -static int32_t mnodeCreateDnode(char *ep); +static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg); @@ -117,7 +117,8 @@ static int32_t mnodeDnodeActionDecode(SSdbOper *pOper) { static int32_t mnodeDnodeActionRestored() { int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); if (numOfRows <= 0 && dnodeIsFirstDeploy()) { - mnodeCreateDnode(tsLocalEp); + mPrint("dnode first deploy, create dnode:%s", tsLocalEp); + mnodeCreateDnode(tsLocalEp, NULL); SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp); mnodeAddMnode(pDnode->dnodeId); mnodeDecDnodeRef(pDnode); @@ -391,7 +392,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeCreateDnode(char *ep) { +static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { int32_t grantCode = grantCheck(TSDB_GRANT_DNODE); if (grantCode != TSDB_CODE_SUCCESS) { return grantCode; @@ -415,7 +416,8 @@ static int32_t mnodeCreateDnode(char *ep) { .type = SDB_OPER_GLOBAL, .table = tsDnodeSdb, .pObj = pDnode, - .rowSize = sizeof(SDnodeObj) + .rowSize = sizeof(SDnodeObj), + .pMsg = pMsg }; int32_t code = sdbInsertRow(&oper); @@ -423,30 +425,32 @@ static int32_t mnodeCreateDnode(char *ep) { int dnodeId = pDnode->dnodeId; tfree(pDnode); mError("failed to create dnode:%d, result:%s", dnodeId, tstrerror(code)); - return TSDB_CODE_MND_SDB_ERROR; + } else { + mPrint("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - mPrint("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code)); return code; } -int32_t mnodeDropDnode(SDnodeObj *pDnode) { +int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDnodeSdb, - .pObj = pDnode + .pObj = pDnode, + .pMsg = pMsg }; - int32_t code = sdbDeleteRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_SDB_ERROR; + int32_t code = sdbDeleteRow(&oper); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%d, is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - mLPrint("dnode:%d, is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); return code; } -static int32_t mnodeDropDnodeByEp(char *ep) { +static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) { SDnodeObj *pDnode = mnodeGetDnodeByEp(ep); if (pDnode == NULL) { mError("dnode:%s, is not exist", ep); @@ -461,7 +465,7 @@ static int32_t mnodeDropDnodeByEp(char *ep) { mPrint("dnode:%d, start to drop it", pDnode->dnodeId); #ifndef _SYNC - return mnodeDropDnode(pDnode); + return mnodeDropDnode(pDnode, pMsg); #else return balanceDropDnode(pDnode); #endif @@ -473,17 +477,7 @@ static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) { if (strcmp(pMsg->pUser->user, "root") != 0) { return TSDB_CODE_MND_NO_RIGHTS; } else { - int32_t code = mnodeCreateDnode(pCreate->ep); - - if (code == TSDB_CODE_SUCCESS) { - SDnodeObj *pDnode = mnodeGetDnodeByEp(pCreate->ep); - mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user); - mnodeDecDnodeRef(pDnode); - } else { - mError("failed to create dnode:%s, reason:%s", pCreate->ep, tstrerror(code)); - } - - return code; + return mnodeCreateDnode(pCreate->ep, pMsg); } } @@ -493,15 +487,7 @@ static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg) { if (strcmp(pMsg->pUser->user, "root") != 0) { return TSDB_CODE_MND_NO_RIGHTS; } else { - int32_t code = mnodeDropDnodeByEp(pDrop->ep); - - if (code == TSDB_CODE_SUCCESS) { - mLPrint("dnode:%s is dropped by %s", pDrop->ep, pMsg->pUser->user); - } else { - mError("failed to drop dnode:%s, reason:%s", pDrop->ep, tstrerror(code)); - } - - return code; + return mnodeDropDnodeByEp(pDrop->ep, pMsg); } } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 659ac159a8fee323e2a1677c9032598ecadd2331..d8fc9aee5a680913ce52753214cac27b225033ab 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -24,6 +24,7 @@ #include "tsync.h" #include "tglobal.h" #include "dnode.h" +#include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" #include "mnodeMnode.h" @@ -31,6 +32,7 @@ #include "mnodeSdb.h" #define SDB_TABLE_LEN 12 +#define SDB_SYNC_HACK 16 typedef enum { SDB_ACTION_INSERT, @@ -83,8 +85,29 @@ typedef struct { void * row; } SSdbRow; +typedef struct { + pthread_t thread; + int32_t workerId; +} SSdbWriteWorker; + +typedef struct { + int32_t num; + SSdbWriteWorker *writeWorker; +} SSdbWriteWorkerPool; + static SSdbObject tsSdbObj = {0}; -static int sdbWrite(void *param, void *data, int type); +static taos_qset tsSdbWriteQset; +static taos_qall tsSdbWriteQall; +static taos_queue tsSdbWriteQueue; +static SSdbWriteWorkerPool tsSdbPool; + +static int sdbWrite(void *param, void *data, int type); +static int sdbWriteToQueue(void *param, void *data, int type); +static void * sdbWorkerFp(void *param); +static int32_t sdbInitWriteWorker(); +static void sdbCleanupWriteWorker(); +static int32_t sdbAllocWriteQueue(); +static void sdbFreeWritequeue(); int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; @@ -302,7 +325,7 @@ void sdbUpdateSync() { syncInfo.ahandle = NULL; syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; - syncInfo.writeToCache = sdbWrite; + syncInfo.writeToCache = sdbWriteToQueue; syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; tsSdbObj.cfg = syncCfg; @@ -319,10 +342,14 @@ int32_t sdbInit() { pthread_mutex_init(&tsSdbObj.mutex, NULL); sem_init(&tsSdbObj.sem, 0, 0); + if (sdbInitWriteWorker() != 0) { + return -1; + } + if (sdbInitWal() != 0) { return -1; } - + sdbRestoreTables(); if (mnodeGetMnodesNum() == 1) { @@ -340,6 +367,8 @@ void sdbCleanUp() { tsSdbObj.status = SDB_STATUS_CLOSING; + sdbCleanupWriteWorker(); + if (tsSdbObj.sync) { syncStop(tsSdbObj.sync); tsSdbObj.sync = NULL; @@ -475,7 +504,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { pTable->numOfRows--; pthread_mutex_unlock(&pTable->mutex); - sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 "version:%" PRIu64, pTable->tableName, + sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1; @@ -494,9 +523,10 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { } static int sdbWrite(void *param, void *data, int type) { + SSdbOper *pOper = param; SWalHead *pHead = data; - int32_t tableId = pHead->msgType / 10; - int32_t action = pHead->msgType % 10; + int32_t tableId = pHead->msgType / 10; + int32_t action = pHead->msgType % 10; SSdbTable *pTable = sdbGetTableFromId(tableId); assert(pTable != NULL); @@ -531,21 +561,22 @@ static int sdbWrite(void *param, void *data, int type) { pthread_mutex_unlock(&tsSdbObj.mutex); return code; } - walFsync(tsSdbObj.wal); - + code = sdbForwardToPeer(pHead); pthread_mutex_unlock(&tsSdbObj.mutex); // from app, oper is created - if (param != NULL) { - //sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code)); + if (pOper != NULL) { + sdbTrace("record from app is disposed, version:%" PRIu64 " result:%s", pHead->version, tstrerror(code)); return code; } // from wal or forward msg, oper not created, should add into hash if (tsSdbObj.sync != NULL) { - sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code)); + sdbTrace("record from wal forward is disposed, version:%" PRIu64 " confirm it", pHead->version); syncConfirmForward(tsSdbObj.sync, pHead->version, code); + } else { + sdbTrace("record from wal restore is disposed, version:%" PRIu64 , pHead->version); } if (action == SDB_ACTION_INSERT) { @@ -568,7 +599,7 @@ static int sdbWrite(void *param, void *data, int type) { int32_t sdbInsertRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; - if (pTable == NULL) return -1; + if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (sdbGetRowFromObj(pTable, pOper->pObj)) { sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj)); @@ -587,98 +618,146 @@ int32_t sdbInsertRow(SSdbOper *pOper) { pthread_mutex_unlock(&pTable->mutex); } - if (pOper->type == SDB_OPER_GLOBAL) { - int32_t size = sizeof(SWalHead) + pTable->maxRowSize; - SWalHead *pHead = taosAllocateQitem(size); - pHead->version = 0; - pHead->len = pOper->rowSize; - pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; + int32_t code = sdbInsertHash(pTable, pOper); + if (code != TSDB_CODE_SUCCESS) { + sdbError("table:%s, failed to insert into hash", pTable->tableName); + return code; + } + + // just insert data into memory + if (pOper->type != SDB_OPER_GLOBAL) { + return TSDB_CODE_SUCCESS; + } - pOper->rowData = pHead->cont; - (*pTable->encodeFp)(pOper); - pHead->len = pOper->rowSize; + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; + SSdbOper *pNewOper = taosAllocateQitem(size); + + SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + pHead->version = 0; + pHead->len = pOper->rowSize; + pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; - int32_t code = sdbWrite(pOper, pHead, pHead->msgType); - taosFreeQitem(pHead); - if (code < 0) return code; + pOper->rowData = pHead->cont; + (*pTable->encodeFp)(pOper); + pHead->len = pOper->rowSize; + + memcpy(pNewOper, pOper, sizeof(SSdbOper)); + + if (pNewOper->pMsg != NULL) { + sdbTrace("app:%p:%p, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg); } - return sdbInsertHash(pTable, pOper); + taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); + return TSDB_CODE_SUCCESS; } int32_t sdbDeleteRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; - if (pTable == NULL) return -1; + if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); if (pMeta == NULL) { sdbTrace("table:%s, record is not there, delete failed", pTable->tableName); - return -1; + return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - void * pMetaRow = pMeta->row; - assert(pMetaRow != NULL); - - if (pOper->type == SDB_OPER_GLOBAL) { - void * key = sdbGetObjKey(pTable, pOper->pObj); - int32_t keySize = 0; - switch (pTable->keyType) { - case SDB_KEY_STRING: - case SDB_KEY_VAR_STRING: - keySize = strlen((char *)key) + 1; - break; - case SDB_KEY_INT: - case SDB_KEY_AUTO: - keySize = sizeof(uint32_t); - break; - default: - return -1; - } + void *pMetaRow = pMeta->row; + if (pMetaRow == NULL) { + sdbError("table:%s, record meta is null", pTable->tableName); + return TSDB_CODE_MND_SDB_INVAID_META_ROW; + } + + int32_t code = sdbDeleteHash(pTable, pOper); + if (code != TSDB_CODE_SUCCESS) { + sdbError("table:%s, failed to delete from hash", pTable->tableName); + return code; + } - int32_t size = sizeof(SWalHead) + keySize; - SWalHead *pHead = taosAllocateQitem(size); - pHead->version = 0; - pHead->len = keySize; - pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; - memcpy(pHead->cont, key, keySize); + // just delete data from memory + if (pOper->type != SDB_OPER_GLOBAL) { + return TSDB_CODE_SUCCESS; + } - int32_t code = sdbWrite(pOper, pHead, pHead->msgType); - taosFreeQitem(pHead); - if (code < 0) return code; + void * key = sdbGetObjKey(pTable, pOper->pObj); + int32_t keySize = 0; + switch (pTable->keyType) { + case SDB_KEY_STRING: + case SDB_KEY_VAR_STRING: + keySize = strlen((char *)key) + 1; + break; + case SDB_KEY_INT: + case SDB_KEY_AUTO: + keySize = sizeof(uint32_t); + break; + default: + return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE; } - return sdbDeleteHash(pTable, pOper); + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize + SDB_SYNC_HACK; + SSdbOper *pNewOper = taosAllocateQitem(size); + + SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + pHead->version = 0; + pHead->len = keySize; + pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; + memcpy(pHead->cont, key, keySize); + + memcpy(pNewOper, pOper, sizeof(SSdbOper)); + + if (pNewOper->pMsg != NULL) { + sdbTrace("app:%p:%p, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg); + } + + taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); + return TSDB_CODE_SUCCESS; } int32_t sdbUpdateRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; - if (pTable == NULL) return -1; + if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); if (pMeta == NULL) { - sdbTrace("table:%s, record is not there, delete failed", pTable->tableName); - return -1; + sdbTrace("table:%s, record is not there, update failed", pTable->tableName); + return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - void * pMetaRow = pMeta->row; - assert(pMetaRow != NULL); + void *pMetaRow = pMeta->row; + if (pMetaRow == NULL) { + sdbError("table:%s, record meta is null", pTable->tableName); + return TSDB_CODE_MND_SDB_INVAID_META_ROW; + } - if (pOper->type == SDB_OPER_GLOBAL) { - int32_t size = sizeof(SWalHead) + pTable->maxRowSize; - SWalHead *pHead = taosAllocateQitem(size); - pHead->version = 0; - pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; + int32_t code = sdbUpdateHash(pTable, pOper); + if (code != TSDB_CODE_SUCCESS) { + sdbError("table:%s, failed to update hash", pTable->tableName); + return code; + } - pOper->rowData = pHead->cont; - (*pTable->encodeFp)(pOper); - pHead->len = pOper->rowSize; + // just update data in memory + if (pOper->type != SDB_OPER_GLOBAL) { + return TSDB_CODE_SUCCESS; + } - int32_t code = sdbWrite(pOper, pHead, pHead->msgType); - taosFreeQitem(pHead); - if (code < 0) return code; - } - - return sdbUpdateHash(pTable, pOper); + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; + SSdbOper *pNewOper = taosAllocateQitem(size); + + SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + pHead->version = 0; + pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; + + pOper->rowData = pHead->cont; + (*pTable->encodeFp)(pOper); + pHead->len = pOper->rowSize; + + memcpy(pNewOper, pOper, sizeof(SSdbOper)); + + if (pNewOper->pMsg != NULL) { + sdbTrace("app:%p:%p, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg); + } + + taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); + return TSDB_CODE_SUCCESS; } void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { @@ -775,3 +854,158 @@ void sdbCloseTable(void *handle) { free(pTable); } +int32_t sdbInitWriteWorker() { + tsSdbPool.num = 1; + tsSdbPool.writeWorker = (SSdbWriteWorker *)calloc(sizeof(SSdbWriteWorker), tsSdbPool.num); + + if (tsSdbPool.writeWorker == NULL) return -1; + for (int32_t i = 0; i < tsSdbPool.num; ++i) { + SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i; + pWorker->workerId = i; + } + + sdbAllocWriteQueue(); + + mPrint("sdb write is opened"); + return 0; +} + +void sdbCleanupWriteWorker() { + for (int32_t i = 0; i < tsSdbPool.num; ++i) { + SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsSdbWriteQset); + } + } + + for (int32_t i = 0; i < tsSdbPool.num; ++i) { + SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i; + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + } + + sdbFreeWritequeue(); + + mPrint("sdb write is closed"); +} + +int32_t sdbAllocWriteQueue() { + tsSdbWriteQueue = taosOpenQueue(); + if (tsSdbWriteQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; + + tsSdbWriteQset = taosOpenQset(); + if (tsSdbWriteQset == NULL) { + taosCloseQueue(tsSdbWriteQueue); + return TSDB_CODE_MND_OUT_OF_MEMORY; + } + taosAddIntoQset(tsSdbWriteQset, tsSdbWriteQueue, NULL); + + tsSdbWriteQall = taosAllocateQall(); + if (tsSdbWriteQall == NULL) { + taosCloseQset(tsSdbWriteQset); + taosCloseQueue(tsSdbWriteQueue); + return TSDB_CODE_MND_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < tsSdbPool.num; ++i) { + SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, sdbWorkerFp, pWorker) != 0) { + mError("failed to create thread to process sdb write queue, reason:%s", strerror(errno)); + taosFreeQall(tsSdbWriteQall); + taosCloseQset(tsSdbWriteQset); + taosCloseQueue(tsSdbWriteQueue); + return TSDB_CODE_MND_OUT_OF_MEMORY; + } + + pthread_attr_destroy(&thAttr); + mTrace("sdb write worker:%d is launched, total:%d", pWorker->workerId, tsSdbPool.num); + } + + mTrace("sdb write queue:%p is allocated", tsSdbWriteQueue); + return TSDB_CODE_SUCCESS; +} + +void sdbFreeWritequeue() { + taosCloseQset(tsSdbWriteQueue); + taosFreeQall(tsSdbWriteQall); + taosCloseQset(tsSdbWriteQset); + tsSdbWriteQall = NULL; + tsSdbWriteQset = NULL; + tsSdbWriteQueue = NULL; +} + +int sdbWriteToQueue(void *param, void *data, int type) { + SWalHead *pHead = data; + int size = sizeof(SWalHead) + pHead->len; + SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); + memcpy(pWal, pHead, size); + + taosWriteQitem(tsSdbWriteQueue, type, pWal); + return 0; +} + +static void *sdbWorkerFp(void *param) { + SWalHead *pHead; + SSdbOper *pOper; + int32_t type; + int32_t numOfMsgs; + void * item; + void * unUsed; + + while (1) { + numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed); + if (numOfMsgs == 0) { + sdbTrace("sdbWorkerFp: got no message from qset, exiting..."); + break; + } + + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(tsSdbWriteQall, &type, &item); + if (type == TAOS_QTYPE_RPC) { + pOper = (SSdbOper *)item; + pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + } else { + pHead = (SWalHead *)item; + pOper = NULL; + } + + if (pOper != NULL && pOper->pMsg != NULL) { + sdbTrace("app:%p:%p, will be processed in sdb queue", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg); + } + + int32_t code = sdbWrite(pOper, pHead, type); + if (pOper) pOper->retCode = code; + } + + walFsync(tsSdbObj.wal); + + // browse all items, and process them one by one + taosResetQitems(tsSdbWriteQall); + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(tsSdbWriteQall, &type, &item); + if (type == TAOS_QTYPE_RPC) { + pOper = (SSdbOper *)item; + if (pOper != NULL && pOper->cb != NULL) { + pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); + } + + if (pOper != NULL && pOper->pMsg != NULL) { + sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, + tstrerror(pOper->retCode)); + } + + dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); + } + taosFreeQitem(item); + } + } + + return NULL; +} diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index a2deeaa489faee88f1ad4906466fa6c9b651d383..a421b01d7a04f049472e9a8c8486fbd89eea67e0 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -185,7 +185,7 @@ static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) { void *oldTableId = pTable->info.tableId; void *oldSql = pTable->sql; void *oldSchema = pTable->schema; - memcpy(pTable, pNew, pOper->rowSize); + memcpy(pTable, pNew, sizeof(SChildTableObj)); pTable->sql = pNew->sql; pTable->schema = pNew->schema; free(pNew); @@ -439,7 +439,7 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { if (pTable != pNew) { void *oldTableId = pTable->info.tableId; void *oldSchema = pTable->schema; - memcpy(pTable, pNew, pOper->rowSize); + memcpy(pTable, pNew, sizeof(SSuperTableObj)); pTable->schema = pNew->schema; free(pNew->vgHash); free(pNew); @@ -669,29 +669,32 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreate->db); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { - mError("table:%s, failed to create, db not selected", pCreate->tableId); + mError("app:%p:%p, table:%s, failed to create, db not selected", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return TSDB_CODE_MND_DB_NOT_SELECTED; } if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId); if (pMsg->pTable != NULL && pMsg->retry == 0) { if (pCreate->getMeta) { - mTrace("table:%s, continue to get meta", pCreate->tableId); + mTrace("app:%p:%p, table:%s, continue to get meta", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return mnodeGetChildTableMeta(pMsg); } else if (pCreate->igExists) { - mTrace("table:%s, is already exist", pCreate->tableId); + mTrace("app:%p:%p, table:%s, is already exist", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return TSDB_CODE_SUCCESS; } else { - mError("table:%s, failed to create, table already exist", pCreate->tableId); + mError("app:%p:%p, table:%s, failed to create, table already exist", pMsg->rpcMsg.ahandle, pMsg, + pCreate->tableId); return TSDB_CODE_MND_TABLE_ALREADY_EXIST; } } if (pCreate->numOfTags != 0) { - mTrace("table:%s, create stable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); + mTrace("app:%p:%p, table:%s, create stable msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg, + pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateSuperTableMsg(pMsg); } else { - mTrace("table:%s, create ctable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); + mTrace("app:%p:%p, table:%s, create ctable msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg, + pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateChildTableMsg(pMsg); } } @@ -700,31 +703,32 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { SCMDropTableMsg *pDrop = pMsg->rpcMsg.pCont; if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pDrop->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { - mError("table:%s, failed to drop table, db not selected", pDrop->tableId); + mError("app:%p:%p, table:%s, failed to drop table, db not selected", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); return TSDB_CODE_MND_DB_NOT_SELECTED; } if (mnodeCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to drop table, in monitor database", pDrop->tableId); + mError("app:%p:%p, table:%s, failed to drop table, in monitor database", pMsg->rpcMsg.ahandle, pMsg, + pDrop->tableId); return TSDB_CODE_MND_MONITOR_DB_FORBIDDEN; } if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pDrop->tableId); if (pMsg->pTable == NULL) { if (pDrop->igNotExists) { - mTrace("table:%s, table is not exist, think drop success", pDrop->tableId); + mTrace("app:%p:%p, table:%s, table is not exist, think drop success", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); return TSDB_CODE_SUCCESS; } else { - mError("table:%s, failed to drop table, table not exist", pDrop->tableId); + mError("app:%p:%p, table:%s, failed to drop table, table not exist", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); return TSDB_CODE_MND_INVALID_TABLE_NAME; } } if (pMsg->pTable->type == TSDB_SUPER_TABLE) { - mPrint("table:%s, start to drop stable", pDrop->tableId); + mPrint("app:%p:%p, table:%s, start to drop stable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); return mnodeProcessDropSuperTableMsg(pMsg); } else { - mPrint("table:%s, start to drop ctable", pDrop->tableId); + mPrint("app:%p:%p, table:%s, start to drop ctable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); return mnodeProcessDropChildTableMsg(pMsg); } } @@ -732,21 +736,25 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; pInfo->createFlag = htons(pInfo->createFlag); - mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->rpcMsg.handle, pInfo->createFlag); + mTrace("app:%p:%p, table:%s, table meta msg is received from thandle:%p, createFlag:%d", pMsg->rpcMsg.ahandle, pMsg, + pInfo->tableId, pMsg->rpcMsg.handle, pInfo->createFlag); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pInfo->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { - mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); + mError("app:%p:%p, table:%s, failed to get table meta, db not selected", pMsg->rpcMsg.ahandle, pMsg, + pInfo->tableId); return TSDB_CODE_MND_DB_NOT_SELECTED; } if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pInfo->tableId); if (pMsg->pTable == NULL) { if (!pInfo->createFlag) { - mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); + mError("app:%p:%p, table:%s, failed to get table meta, table not exist", pMsg->rpcMsg.ahandle, pMsg, + pInfo->tableId); return TSDB_CODE_MND_INVALID_TABLE_NAME; } else { - mTrace("table:%s, failed to get table meta, start auto create table ", pInfo->tableId); + mTrace("app:%p:%p, table:%s, failed to get table meta, start auto create table ", pMsg->rpcMsg.ahandle, pMsg, + pInfo->tableId); return mnodeAutoCreateChildTable(pMsg); } } else { @@ -760,9 +768,9 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; - SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj)); + SSuperTableObj * pStable = calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) { - mError("table:%s, failed to create, no enough memory", pCreate->tableId); + mError("app:%p:%p, table:%s, failed to create, no enough memory", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return TSDB_CODE_MND_OUT_OF_MEMORY; } @@ -780,7 +788,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { pStable->schema = (SSchema *)calloc(1, schemaSize); if (pStable->schema == NULL) { free(pStable); - mError("table:%s, failed to create, no schema input", pCreate->tableId); + mError("app:%p:%p, table:%s, failed to create, no schema input", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return TSDB_CODE_MND_INVALID_TABLE_NAME; } memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -799,18 +807,21 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, - .rowSize = sizeof(SSuperTableObj) + schemaSize + .rowSize = sizeof(SSuperTableObj) + schemaSize, + .pMsg = pMsg }; int32_t code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { mnodeDestroySuperTable(pStable); - mError("table:%s, failed to create, sdb error", pCreate->tableId); - return TSDB_CODE_MND_SDB_ERROR; + mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); } else { - mLPrint("table:%s, is created, tags:%d fields:%d", pStable->info.tableId, pStable->numOfTags, pStable->numOfColumns); - return TSDB_CODE_SUCCESS; + mLPrint("app:%p:%p, table:%s, is created, tags:%d fields:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + pStable->numOfTags, pStable->numOfColumns); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } + + return code; } static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { @@ -827,10 +838,11 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { pDrop->vgId = htonl(pVgroup->vgId); pDrop->uid = htobe64(pStable->uid); mnodeExtractTableName(pStable->info.tableId, pDrop->tableId); - - mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, pVgroup->vgId); + + mPrint("app:%p:%p, stable:%s, send drop stable msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + pVgroup->vgId); SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup); - SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; + SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; dnodeSendMsgToDnode(&ipSet, &rpcMsg); mnodeDecVgroupRef(pVgroup); } @@ -842,11 +854,16 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable + .pObj = pStable, + .pMsg = pMsg }; int32_t code = sdbDeleteRow(&oper); - mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->info.tableId, tstrerror(code)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->info.tableId, tstrerror(code)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + } + return code; } @@ -861,20 +878,31 @@ static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char * return -1; } -static int32_t mnodeAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) { +static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + mLPrint("app:%p:%p, stable %s, add tag result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + tstrerror(code)); + + return code; +} + +static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t ntags) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) { - mError("stable:%s, add tag, too many tags", pStable->info.tableId); + mError("app:%p:%p, stable:%s, add tag, too many tags", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); return TSDB_CODE_MND_TOO_MANY_TAGS; } for (int32_t i = 0; i < ntags; i++) { if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) { - mError("stable:%s, add tag, column:%s already exist", pStable->info.tableId, schema[i].name); + mError("app:%p:%p, stable:%s, add tag, column:%s already exist", pMsg->rpcMsg.ahandle, pMsg, + pStable->info.tableId, schema[i].name); return TSDB_CODE_MND_TAG_ALREAY_EXIST; } if (mnodeFindSuperTableTagIndex(pStable, schema[i].name) > 0) { - mError("stable:%s, add tag, tag:%s already exist", pStable->info.tableId, schema[i].name); + mError("app:%p:%p, stable:%s, add tag, tag:%s already exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + schema[i].name); return TSDB_CODE_MND_FIELD_ALREAY_EXIST; } } @@ -892,25 +920,38 @@ static int32_t mnodeAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], pStable->numOfTags += ntags; pStable->tversion++; + mPrint("app:%p:%p, stable %s, start to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + schema[0].name); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable + .pObj = pStable, + .pMsg = pMsg, + .cb = mnodeAddSuperTableTagCb }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - mPrint("stable %s, succeed to add tag %s", pStable->info.tableId, schema[0].name); - return TSDB_CODE_SUCCESS; + return code; } -static int32_t mnodeDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { +static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + mLPrint("app:%p:%p, stable %s, drop tag result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + tstrerror(code)); + return code; +} + +static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; int32_t col = mnodeFindSuperTableTagIndex(pStable, tagName); if (col < 0) { - mError("stable:%s, drop tag, tag:%s not exist", pStable->info.tableId, tagName); + mError("app:%p:%p, stable:%s, drop tag, tag:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + tagName); return TSDB_CODE_MND_TAG_NOT_EXIST; } @@ -919,25 +960,37 @@ static int32_t mnodeDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { pStable->numOfTags--; pStable->tversion++; + mPrint("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable + .pObj = pStable, + .pMsg = pMsg, + .cb = mnodeDropSuperTableTagCb }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - mPrint("stable %s, succeed to drop tag %s", pStable->info.tableId, tagName); - return TSDB_CODE_SUCCESS; + + return code; } -static int32_t mnodeModifySuperTableTagName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { +static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + mLPrint("app:%p:%p, stable %s, modify tag result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + tstrerror(code)); + return code; +} + +static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, char *newTagName) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; int32_t col = mnodeFindSuperTableTagIndex(pStable, oldTagName); if (col < 0) { - mError("stable:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName); + mError("app:%p:%p, stable:%s, failed to modify table tag, oldName: %s, newName: %s", pMsg->rpcMsg.ahandle, pMsg, + pStable->info.tableId, oldTagName, newTagName); return TSDB_CODE_MND_TAG_NOT_EXIST; } @@ -950,24 +1003,28 @@ static int32_t mnodeModifySuperTableTagName(SSuperTableObj *pStable, char *oldTa if (mnodeFindSuperTableTagIndex(pStable, newTagName) >= 0) { return TSDB_CODE_MND_TAG_ALREAY_EXIST; } - + // update SSchema *schema = (SSchema *) (pStable->schema + pStable->numOfColumns + col); tstrncpy(schema->name, newTagName, sizeof(schema->name)); + mPrint("app:%p:%p, stable %s, start to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + oldTagName, newTagName); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable + .pObj = pStable, + .pMsg = pMsg, + .cb = mnodeModifySuperTableTagNameCb }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - mPrint("stable %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName); - return TSDB_CODE_SUCCESS; + + return code; } static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) { @@ -981,20 +1038,31 @@ static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *col return -1; } -static int32_t mnodeAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { +static int32_t mnodeAddSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + mLPrint("app:%p:%p, stable %s, add column result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + tstrerror(code)); + return code; +} + +static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) { + SDbObj *pDb = pMsg->pDb; + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; if (ncols <= 0) { - mError("stable:%s, add column, ncols:%d <= 0", pStable->info.tableId); + mError("app:%p:%p, stable:%s, add column, ncols:%d <= 0", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); return TSDB_CODE_MND_APP_ERROR; } for (int32_t i = 0; i < ncols; i++) { if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) { - mError("stable:%s, add column, column:%s already exist", pStable->info.tableId, schema[i].name); + mError("app:%p:%p, stable:%s, add column, column:%s already exist", pMsg->rpcMsg.ahandle, pMsg, + pStable->info.tableId, schema[i].name); return TSDB_CODE_MND_FIELD_ALREAY_EXIST; } if (mnodeFindSuperTableTagIndex(pStable, schema[i].name) > 0) { - mError("stable:%s, add column, tag:%s already exist", pStable->info.tableId, schema[i].name); + mError("app:%p:%p, stable:%s, add column, tag:%s already exist", pMsg->rpcMsg.ahandle, pMsg, + pStable->info.tableId, schema[i].name); return TSDB_CODE_MND_TAG_ALREAY_EXIST; } } @@ -1020,25 +1088,38 @@ static int32_t mnodeAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SS mnodeDecAcctRef(pAcct); } + mPrint("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable + .pObj = pStable, + .pMsg = pMsg, + .cb = mnodeAddSuperTableColumnCb }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - mPrint("stable %s, succeed to add column", pStable->info.tableId); - return TSDB_CODE_SUCCESS; + + return code; +} + +static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + mLPrint("app:%p:%p, stable %s, delete column result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + tstrerror(code)); + return code; } -static int32_t mnodeDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, char *colName) { +static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { + SDbObj *pDb = pMsg->pDb; + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName); if (col <= 0) { - mError("stable:%s, drop column, column:%s not exist", pStable->info.tableId, colName); + mError("app:%p:%p, stable:%s, drop column, column:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + colName); return TSDB_CODE_MND_FIELD_NOT_EXIST; } @@ -1057,19 +1138,22 @@ static int32_t mnodeDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, c mnodeDecAcctRef(pAcct); } + mPrint("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable + .pObj = pStable, + .pMsg = pMsg, + .cb = mnodeDropSuperTableColumnCb }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - mPrint("stable %s, succeed to delete column", pStable->info.tableId); - return TSDB_CODE_SUCCESS; + + return code; } // show super tables @@ -1258,8 +1342,9 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { pMeta->contLen = htons(pMeta->contLen); pMsg->rpcRsp.rsp = pMeta; - - mTrace("stable:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); + + mTrace("app:%p:%p, stable:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->uid); return TSDB_CODE_SUCCESS; } @@ -1290,12 +1375,13 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { char * stableName = (char *)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN)*i; SSuperTableObj *pTable = mnodeGetSuperTable(stableName); if (pTable == NULL) { - mError("stable:%s, not exist while get stable vgroup info", stableName); + mError("app:%p:%p, stable:%s, not exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg, stableName); mnodeDecTableRef(pTable); continue; } if (pTable->vgHash == NULL) { - mError("stable:%s, not vgroup exist while get stable vgroup info", stableName); + mError("app:%p:%p, stable:%s, not vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg, + stableName); mnodeDecTableRef(pTable); // even this super table has no corresponding table, still return @@ -1430,12 +1516,35 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO return pCreate; } -static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { +static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { + if (code != TSDB_CODE_SUCCESS) return code; + + SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; + SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable); + if (pMDCreate == NULL) { + return terrno; + } + + SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); + SRpcMsg rpcMsg = { + .handle = pMsg, + .pCont = pMDCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + }; + + dnodeSendMsgToDnode(&ipSet, &rpcMsg); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { + SVgObj *pVgroup = pMsg->pVgroup; + SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); if (pTable == NULL) { - mError("table:%s, failed to alloc memory", pCreate->tableId); - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; - return NULL; + mError("app:%p:%p, table:%s, failed to alloc memory", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); + return TSDB_CODE_MND_OUT_OF_MEMORY; } if (pCreate->numOfColumns == 0) { @@ -1453,10 +1562,10 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb STagData *pTagData = (STagData *) pCreate->schema; // it is a tag key SSuperTableObj *pSuperTable = mnodeGetSuperTable(pTagData->name); if (pSuperTable == NULL) { - mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name); + mError("app:%p:%p, table:%s, corresponding super table:%s does not exist", pMsg->rpcMsg.ahandle, pMsg, + pCreate->tableId, pTagData->name); mnodeDestroyChildTable(pTable); - terrno = TSDB_CODE_MND_INVALID_TABLE_NAME; - return NULL; + return TSDB_CODE_MND_INVALID_TABLE_NAME; } mnodeDecTableRef(pSuperTable); @@ -1475,8 +1584,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb pTable->schema = (SSchema *) calloc(1, schemaSize); if (pTable->schema == NULL) { free(pTable); - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_MND_OUT_OF_MEMORY; } memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -1492,42 +1600,51 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb pTable->sql = calloc(1, pTable->sqlLen); if (pTable->sql == NULL) { free(pTable); - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_MND_OUT_OF_MEMORY; } memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen); pTable->sql[pTable->sqlLen - 1] = 0; - mTrace("table:%s, stream sql len:%d sql:%s", pTable->info.tableId, pTable->sqlLen, pTable->sql); + mTrace("app:%p:%p, table:%s, stream sql len:%d sql:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + pTable->sqlLen, pTable->sql); } } - + + pMsg->pTable = (STableObj *)pTable; + mnodeIncTableRef(pMsg->pTable); + SSdbOper desc = {0}; desc.type = SDB_OPER_GLOBAL; desc.pObj = pTable; desc.table = tsChildTableSdb; + desc.pMsg = pMsg; + desc.cb = mnodeDoCreateChildTableCb; - if (sdbInsertRow(&desc) != TSDB_CODE_SUCCESS) { + int32_t code = sdbInsertRow(&desc); + if (code != TSDB_CODE_SUCCESS) { free(pTable); - mError("table:%s, update sdb error", pCreate->tableId); - terrno = TSDB_CODE_MND_SDB_ERROR; - return NULL; + mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, + tstrerror(code)); + pMsg->pTable = NULL; + return code; + } else { + mTrace("app:%p:%p, table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid); + return TSDB_CODE_SUCCESS; } - - mTrace("table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64 , pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid); - return pTable; } static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create, grant timeseries failed", pCreate->tableId); + mError("app:%p:%p, table:%s, failed to create, grant timeseries failed", pMsg->rpcMsg.ahandle, pMsg, + pCreate->tableId); return code; } SVgObj *pVgroup = mnodeGetAvailableVgroup(pMsg->pDb); if (pVgroup == NULL) { - mTrace("table:%s, start to create a new vgroup", pCreate->tableId); + mTrace("app:%p:%p, table:%s, start to create a new vgroup", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return mnodeCreateVgroup(pMsg, pMsg->pDb); } @@ -1535,55 +1652,53 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { if (pMsg->pTable == NULL) { int32_t sid = taosAllocateId(pVgroup->idPool); if (sid <= 0) { - mTrace("tables:%s, no enough sid in vgId:%d", pCreate->tableId, pVgroup->vgId); + mTrace("app:%p:%p, table:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, + pVgroup->vgId); return mnodeCreateVgroup(pMsg, pMsg->pDb); } - pMsg->pTable = (STableObj *)mnodeDoCreateChildTable(pCreate, pVgroup, sid); - if (pMsg->pTable == NULL) { - return terrno; + if (pMsg->pVgroup == NULL) { + pMsg->pVgroup = pVgroup; + mnodeIncVgroupRef(pVgroup); } - mnodeIncTableRef(pMsg->pTable); + mTrace("app:%p:%p, table:%s, create table in vgroup, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, + pVgroup->vgId, sid); + + code = mnodeDoCreateChildTable(pMsg, sid); + if (code != TSDB_CODE_SUCCESS) { + return code; + } else { + return TSDB_CODE_MND_ACTION_IN_PROGRESS; + } } } else { if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId); } if (pMsg->pTable == NULL) { + mError("app:%p:%p, table:%s, object not found, retry:%d reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, + tstrerror(terrno)); return terrno; + } else { + mTrace("app:%p:%p, table:%s, send create msg to vnode again", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); + return mnodeDoCreateChildTableCb(pMsg, TSDB_CODE_SUCCESS); } - - SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable); - if (pMDCreate == NULL) { - return terrno; - } - - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup); - SRpcMsg rpcMsg = { - .handle = pMsg, - .pCont = pMDCreate, - .contLen = htonl(pMDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE - }; - - dnodeSendMsgToDnode(&ipSet, &rpcMsg); - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; } static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId); if (pMsg->pVgroup == NULL) { - mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId); + mError("app:%p:%p, table:%s, failed to drop ctable, vgroup not exist", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId); return TSDB_CODE_MND_APP_ERROR; } SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); if (pDrop == NULL) { - mError("table:%s, failed to drop ctable, no enough memory", pTable->info.tableId); + mError("app:%p:%p, table:%s, failed to drop ctable, no enough memory", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId); return TSDB_CODE_MND_OUT_OF_MEMORY; } @@ -1595,7 +1710,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); - mPrint("table:%s, send drop ctable msg", pDrop->tableId); + mPrint("app:%p:%p, table:%s, send drop ctable msg", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); SRpcMsg rpcMsg = { .handle = pMsg, .pCont = pDrop, @@ -1609,10 +1724,6 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mnodeModifyChildTableTagValue(SChildTableObj *pTable, char *tagName, char *nContent) { - return TSDB_CODE_COM_OPS_NOT_SUPPORT; -} - static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) { SSchema *schema = (SSchema *) pTable->schema; for (int32_t col = 0; col < pTable->numOfColumns; col++) { @@ -1624,15 +1735,25 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col return -1; } -static int32_t mnodeAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSchema schema[], int32_t ncols) { +static int32_t mnodeAddNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + mLPrint("app:%p:%p, ctable %s, add column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + tstrerror(code)); + return code; +} + +static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) { + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SDbObj *pDb = pMsg->pDb; if (ncols <= 0) { - mError("table:%s, add column, ncols:%d <= 0", pTable->info.tableId); + mError("app:%p:%p, ctable:%s, add column, ncols:%d <= 0", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); return TSDB_CODE_MND_APP_ERROR; } for (int32_t i = 0; i < ncols; i++) { if (mnodeFindNormalTableColumnIndex(pTable, schema[i].name) > 0) { - mError("table:%s, add column, column:%s already exist", pTable->info.tableId, schema[i].name); + mError("app:%p:%p, ctable:%s, add column, column:%s already exist", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, schema[i].name); return TSDB_CODE_MND_FIELD_ALREAY_EXIST; } } @@ -1655,26 +1776,39 @@ static int32_t mnodeAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SS pAcct->acctInfo.numOfTimeSeries += ncols; mnodeDecAcctRef(pAcct); } - + + mPrint("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, - .pObj = pTable + .pObj = pTable, + .pMsg = pMsg, + .cb = mnodeAddNormalTableColumnCb }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - mPrint("table %s, succeed to add column", pTable->info.tableId); - return TSDB_CODE_SUCCESS; + + return code; +} + +static int32_t mnodeDropNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + mLPrint("app:%p:%p, ctable %s, drop column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + tstrerror(code)); + return code; } -static int32_t mnodeDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, char *colName) { +static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { + SDbObj *pDb = pMsg->pDb; + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; int32_t col = mnodeFindNormalTableColumnIndex(pTable, colName); if (col <= 0) { - mError("table:%s, drop column, column:%s not exist", pTable->info.tableId, colName); + mError("app:%p:%p, ctable:%s, drop column, column:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + colName); return TSDB_CODE_MND_FIELD_NOT_EXIST; } @@ -1687,20 +1821,23 @@ static int32_t mnodeDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, c pAcct->acctInfo.numOfTimeSeries--; mnodeDecAcctRef(pAcct); } - + + mPrint("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, - .pObj = pTable + .pObj = pTable, + .pMsg = pMsg, + .cb = mnodeDropNormalTableColumnCb }; int32_t code = sdbUpdateRow(&oper); if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - mPrint("table %s, succeed to drop column %s", pTable->info.tableId, colName); - return TSDB_CODE_SUCCESS; + + return code; } static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) { @@ -1739,10 +1876,11 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns); pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromNormalTable(pMeta->schema, pTable); } - + if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId); if (pMsg->pVgroup == NULL) { - mError("table:%s, failed to get table meta, vgroup not exist", pTable->info.tableId); + mError("app:%p:%p, table:%s, failed to get table meta, vgroup not exist", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId); return TSDB_CODE_MND_VGROUP_NOT_EXIST; } @@ -1756,7 +1894,8 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { } pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); - mTrace("table:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); + mTrace("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->uid); return TSDB_CODE_SUCCESS; } @@ -1768,7 +1907,8 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen); SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); if (pCreateMsg == NULL) { - mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); + mError("app:%p:%p, table:%s, failed to create table while get meta info, no enough memory", pMsg->rpcMsg.ahandle, + pMsg, pInfo->tableId); return TSDB_CODE_MND_OUT_OF_MEMORY; } @@ -1780,7 +1920,8 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { pCreateMsg->contLen = htonl(contLen); memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg)); - mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, ((STagData *)(pCreateMsg->schema))->name); + mTrace("app:%p:%p, table:%s, start to create on demand, stable:%s", pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId, + ((STagData *)(pCreateMsg->schema))->name); rpcFreeCont(pMsg->rpcMsg.pCont); pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; @@ -1791,9 +1932,11 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { } static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg) { - STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); + STableMetaMsg *pMeta = + rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); if (pMeta == NULL) { - mError("table:%s, failed to get table meta, no enough memory", pMsg->pTable->tableId); + mError("app:%p:%p, table:%s, failed to get table meta, no enough memory", pMsg->rpcMsg.ahandle, pMsg, + pMsg->pTable->tableId); return TSDB_CODE_MND_OUT_OF_MEMORY; } @@ -1908,11 +2051,13 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) { pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->vgId = htonl(pCfg->vgId); pCfg->sid = htonl(pCfg->sid); - mTrace("dnode:%d, vgId:%d sid:%d, receive table config msg", pCfg->dnodeId, pCfg->vgId, pCfg->sid); + mTrace("app:%p:%p, dnode:%d, vgId:%d sid:%d, receive table config msg", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId, + pCfg->vgId, pCfg->sid); SChildTableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid); if (pTable == NULL) { - mError("dnode:%d, vgId:%d sid:%d, table not found", pCfg->dnodeId, pCfg->vgId, pCfg->sid); + mError("app:%p:%p, dnode:%d, vgId:%d sid:%d, table not found", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId, + pCfg->vgId, pCfg->sid); return TSDB_CODE_MND_INVALID_TABLE_ID; } @@ -1936,44 +2081,49 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; assert(pTable); - mPrint("table:%s, drop table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); + mPrint("app:%p:%p, table:%s, drop table rsp received, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, + pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to drop in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); + mError("app:%p:%p, table:%s, failed to drop in dnode, reason:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, + pTable->info.tableId, tstrerror(rpcMsg->code)); dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); return; } if (mnodeMsg->pVgroup == NULL) mnodeMsg->pVgroup = mnodeGetVgroup(pTable->vgId); if (mnodeMsg->pVgroup == NULL) { - mError("table:%s, failed to get vgroup", pTable->info.tableId); + mError("app:%p:%p, table:%s, failed to get vgroup", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId); dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_MND_VGROUP_NOT_EXIST); return; } - + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable }; - + int32_t code = sdbDeleteRow(&oper); if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, update ctables sdb error", pTable->info.tableId); + mError("app:%p:%p, table:%s, update ctables sdb error", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId); dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_MND_SDB_ERROR); return; } if (mnodeMsg->pVgroup->numOfTables <= 0) { - mPrint("vgId:%d, all tables is dropped, drop vgroup", mnodeMsg->pVgroup->vgId); + mPrint("app:%p:%p, vgId:%d, all tables is dropped, drop vgroup", mnodeMsg->rpcMsg.ahandle, mnodeMsg, + mnodeMsg->pVgroup->vgId); mnodeDropVgroup(mnodeMsg->pVgroup, NULL); } dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); } -// handle create table response from dnode -// if failed, drop the table cached +/* + * handle create table response from dnode + * if failed, drop the table cached + */ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; @@ -1982,36 +2132,36 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; assert(pTable); - mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, - tstrerror(rpcMsg->code)); - if (rpcMsg->code != TSDB_CODE_SUCCESS) { - if (mnodeMsg->retry++ < 10) { - mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId, - mnodeMsg->retry, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); - dnodeDelayReprocessMnodeWriteMsg(mnodeMsg); - } else { - mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId, - mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); - - SSdbOper oper = { - .type = SDB_OPER_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable - }; - sdbDeleteRow(&oper); - - dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); - } - } else { - mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, - tstrerror(rpcMsg->code)); + if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont; if (pCreate->getMeta) { - mTrace("table:%s, continue to get meta", pTable->info.tableId); + mTrace("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p result:%s", + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->rpcMsg.handle, + tstrerror(rpcMsg->code)); + mnodeMsg->retry = 0; dnodeReprocessMnodeWriteMsg(mnodeMsg); } else { + mTrace("app:%p:%p, table:%s, created in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, + pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); + + dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + } + } else { + if (mnodeMsg->retry++ < 10) { + mTrace("app:%p:%p, table:%s, create table rsp received, need retry, times:%d result:%s thandle:%p", + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, tstrerror(rpcMsg->code), + mnodeMsg->rpcMsg.handle); + + dnodeDelayReprocessMnodeWriteMsg(mnodeMsg); + } else { + mError("app:%p:%p, table:%s, failed to create in dnode, result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg, + pTable->info.tableId, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); + + SSdbOper oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable}; + sdbDeleteRow(&oper); + dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); } } @@ -2202,22 +2352,24 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { SCMAlterTableMsg *pAlter = pMsg->rpcMsg.pCont; - mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->rpcMsg.handle); + mTrace("app:%p:%p, table:%s, alter table msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg, + pAlter->tableId, pMsg->rpcMsg.handle); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pAlter->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { - mError("table:%s, failed to alter table, db not selected", pAlter->tableId); + mError("app:%p:%p, table:%s, failed to alter table, db not selected", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId); return TSDB_CODE_MND_DB_NOT_SELECTED; } if (mnodeCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to alter table, its log db", pAlter->tableId); + mError("app:%p:%p, table:%s, failed to alter table, its log db", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId); return TSDB_CODE_MND_MONITOR_DB_FORBIDDEN; } if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pAlter->tableId); if (pMsg->pTable == NULL) { - mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId); + mError("app:%p:%p, table:%s, failed to alter table, table not exist", pMsg->rpcMsg.ahandle, pMsg, + pMsg->pTable->tableId); return TSDB_CODE_MND_INVALID_TABLE_NAME; } @@ -2226,7 +2378,8 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { pAlter->tagValLen = htonl(pAlter->tagValLen); if (pAlter->numOfCols > 2) { - mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); + mError("app:%p:%p, table:%s, error numOfCols:%d in alter table", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId, + pAlter->numOfCols); return TSDB_CODE_MND_APP_ERROR; } @@ -2236,35 +2389,32 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { int32_t code = TSDB_CODE_COM_OPS_NOT_SUPPORT; if (pMsg->pTable->type == TSDB_SUPER_TABLE) { - SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; - mTrace("table:%s, start to alter stable", pAlter->tableId); + mTrace("app:%p:%p, table:%s, start to alter stable", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId); if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { - code = mnodeAddSuperTableTag(pTable, pAlter->schema, 1); + code = mnodeAddSuperTableTag(pMsg, pAlter->schema, 1); } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { - code = mnodeDropSuperTableTag(pTable, pAlter->schema[0].name); + code = mnodeDropSuperTableTag(pMsg, pAlter->schema[0].name); } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { - code = mnodeModifySuperTableTagName(pTable, pAlter->schema[0].name, pAlter->schema[1].name); + code = mnodeModifySuperTableTagName(pMsg, pAlter->schema[0].name, pAlter->schema[1].name); } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { - code = mnodeAddSuperTableColumn(pMsg->pDb, pTable, pAlter->schema, 1); + code = mnodeAddSuperTableColumn(pMsg, pAlter->schema, 1); } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { - code = mnodeDropSuperTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name); + code = mnodeDropSuperTableColumn(pMsg, pAlter->schema[0].name); } else { } } else { - mTrace("table:%s, start to alter ctable", pAlter->tableId); - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + mTrace("app:%p:%p, table:%s, start to alter ctable", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId); if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { - char *tagVal = (char*)(pAlter->schema + pAlter->numOfCols); - code = mnodeModifyChildTableTagValue(pTable, pAlter->schema[0].name, tagVal); + return TSDB_CODE_COM_OPS_NOT_SUPPORT; } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { - code = mnodeAddNormalTableColumn(pMsg->pDb, pTable, pAlter->schema, 1); + code = mnodeAddNormalTableColumn(pMsg, pAlter->schema, 1); } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { - code = mnodeDropNormalTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name); + code = mnodeDropNormalTableColumn(pMsg, pAlter->schema[0].name); } else { } } - return code; + return code; } static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index 053ea3ef75471a6ceca058e511b219275b8974b4..eb79d6acb63d1539369b2c9231d302bfa64934c6 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -102,11 +102,13 @@ static int32_t mnodeUserActionDecode(SSdbOper *pOper) { } static int32_t mnodeUserActionRestored() { - if (dnodeIsFirstDeploy()) { + int32_t numOfRows = sdbGetNumOfRows(tsUserSdb); + if (numOfRows <= 0 && dnodeIsFirstDeploy()) { + mPrint("dnode first deploy, create root user"); SAcctObj *pAcct = mnodeGetAcct("root"); - mnodeCreateUser(pAcct, "root", "taosdata"); - mnodeCreateUser(pAcct, "monitor", tsInternalPass); - mnodeCreateUser(pAcct, "_root", tsInternalPass); + mnodeCreateUser(pAcct, "root", "taosdata", NULL); + mnodeCreateUser(pAcct, "monitor", tsInternalPass, NULL); + mnodeCreateUser(pAcct, "_root", tsInternalPass, NULL); mnodeDecAcctRef(pAcct); } @@ -170,22 +172,24 @@ void mnodeDecUserRef(SUserObj *pUser) { return sdbDecRef(tsUserSdb, pUser); } -static int32_t mnodeUpdateUser(SUserObj *pUser) { +static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsUserSdb, - .pObj = pUser + .pObj = pUser, + .pMsg = pMsg }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + mLPrint("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; } -int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass) { +int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { int32_t code = acctCheck(pAcct, ACCT_GRANT_USER); if (code != TSDB_CODE_SUCCESS) { return code; @@ -223,31 +227,36 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass) { } SSdbOper oper = { - .type = SDB_OPER_GLOBAL, - .table = tsUserSdb, - .pObj = pUser, - .rowSize = sizeof(SUserObj) + .type = SDB_OPER_GLOBAL, + .table = tsUserSdb, + .pObj = pUser, + .rowSize = sizeof(SUserObj), + .pMsg = pMsg }; code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { tfree(pUser); - code = TSDB_CODE_MND_SDB_ERROR; + } else { + mLPrint("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; } -static int32_t mnodeDropUser(SUserObj *pUser) { +static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) { SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsUserSdb, - .pObj = pUser + .pObj = pUser, + .pMsg = pMsg }; int32_t code = sdbDeleteRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_SDB_ERROR; + if (code == TSDB_CODE_SUCCESS) { + mLPrint("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; @@ -357,22 +366,25 @@ SUserObj *mnodeGetUserFromConn(void *pConn) { } } +char *mnodeGetUserFromMsg(void *pMsg) { + SMnodeMsg *pMnodeMsg = pMsg; + if (pMnodeMsg != NULL &&pMnodeMsg->pUser != NULL) { + return pMnodeMsg->pUser->user; + } else { + return "system"; + } +} + static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) { - int32_t code; SUserObj *pOperUser = pMsg->pUser; if (pOperUser->superAuth) { SCMCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; - code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass); - if (code == TSDB_CODE_SUCCESS) { - mLPrint("user:%s, is created by %s", pCreate->user, pOperUser->user); - } + return mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass, pMsg); } else { mError("user:%s, no rights to create user", pOperUser->user); - code = TSDB_CODE_MND_NO_RIGHTS; + return TSDB_CODE_MND_NO_RIGHTS; } - - return code; } static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) { @@ -409,8 +421,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) { if (hasRight) { memset(pUser->pass, 0, sizeof(pUser->pass)); taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass); - code = mnodeUpdateUser(pUser); - mLPrint("user:%s, password is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code)); + code = mnodeUpdateUser(pUser, pMsg); } else { mError("user:%s, no rights to alter user", pOperUser->user); code = TSDB_CODE_MND_NO_RIGHTS; @@ -450,8 +461,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) { pUser->writeAuth = 1; } - code = mnodeUpdateUser(pUser); - mLPrint("user:%s, privilege is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code)); + code = mnodeUpdateUser(pUser, pMsg); } else { mError("user:%s, no rights to alter user", pOperUser->user); code = TSDB_CODE_MND_NO_RIGHTS; @@ -497,10 +507,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) { } if (hasRight) { - code = mnodeDropUser(pUser); - if (code == TSDB_CODE_SUCCESS) { - mLPrint("user:%s, is dropped by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code)); - } + code = mnodeDropUser(pUser, pMsg); } else { code = TSDB_CODE_MND_NO_RIGHTS; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index bc6c2665c875d17c832cbe75008dc028a9321433..0475a48ce2c9da6c4c6ab15772a8e31880529f7f 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -149,7 +149,7 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) { } } - memcpy(pVgroup, pNew, pOper->rowSize); + memcpy(pVgroup, pNew, sizeof(SVgObj)); free(pNew); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { @@ -299,6 +299,27 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) { return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup); } +static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { + if (code != TSDB_CODE_SUCCESS) { + pMsg->pVgroup = NULL; + return code; + } + + SVgObj *pVgroup = pMsg->pVgroup; + SDbObj *pDb = pMsg->pDb; + + mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); + } + + mnodeIncVgroupRef(pVgroup); + pMsg->expected = pVgroup->numOfVnodes; + mnodeSendCreateVgroupMsg(pVgroup, pMsg); + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) { SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); strcpy(pVgroup->dbName, pDb->name); @@ -314,26 +335,22 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) { .type = SDB_OPER_GLOBAL, .table = tsVgroupSdb, .pObj = pVgroup, - .rowSize = sizeof(SVgObj) + .rowSize = sizeof(SVgObj), + .pMsg = pMsg, + .cb = mnodeCreateVgroupCb }; + pMsg->pVgroup = pVgroup; + int32_t code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { + pMsg->pVgroup = NULL; tfree(pVgroup); - return TSDB_CODE_MND_SDB_ERROR; - } - - mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); + } else { + if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - mnodeIncVgroupRef(pVgroup); - pMsg->pVgroup = pVgroup; - pMsg->expected = pVgroup->numOfVnodes; - mnodeSendCreateVgroupMsg(pVgroup, pMsg); - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return code; } void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { @@ -596,7 +613,6 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) { } void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { - mTrace("vgId:%d, send create vnode:%d msg, ahandle:%p db:%s", pVgroup->vgId, pVgroup->vgId, ahandle, pVgroup->dbName); SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .handle = ahandle, @@ -609,9 +625,12 @@ void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { } void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { - mTrace("vgId:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); + mTrace("vgId:%d, send create all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, + pVgroup->dbName); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + mTrace("vgId:%d, index:%d, send create vnode msg to dnode %s, ahandle:%p", pVgroup->vgId, + i, pVgroup->vnodeGid[i].pDnode->dnodeEp, ahandle); mnodeSendCreateVnodeMsg(pVgroup, &ipSet, ahandle); } } @@ -729,6 +748,7 @@ static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) { } mnodeDecVgroupRef(pVgroup); + mTrace("vgId:%d, send create vnode msg to dnode %s for vnode cfg msg", pVgroup->vgId, pDnode->dnodeEp); SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); mnodeSendCreateVnodeMsg(pVgroup, &ipSet, NULL); diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 29b2e6c82b3b028e87ac027ac0f6b5478d1a750d..cd3d722480981e6a10329020861d45c29cde3fd7 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -43,7 +43,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)) int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { if (pMsg->rpcMsg.pCont == NULL) { - mError("%p, msg:%s in mwrite queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); + mError("app:%p:%p, msg:%s content is null", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_MND_INVALID_MSG_LEN; } @@ -54,27 +54,31 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); + mTrace("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], + ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { - mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + mTrace("app:%p:%p, mnode index:%d ip:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, ipSet->fqdn[i], + htons(ipSet->port[i])); } return TSDB_CODE_RPC_REDIRECT; } if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) { - mError("%p, msg:%s in mwrite queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); + mError("app:%p:%p, msg:%s not processed", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_MND_MSG_NOT_PROCESSED; } int32_t code = mnodeInitMsg(pMsg); if (code != TSDB_CODE_SUCCESS) { - mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code)); + mError("app:%p:%p, msg:%s not processed, reason:%s", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], + tstrerror(code)); return code; } if (!pMsg->pUser->writeAuth) { - mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); + mError("app:%p:%p, msg:%s not processed, no write auth", pMsg->rpcMsg.ahandle, pMsg, + taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_MND_NO_RIGHTS; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3c8f8623a100c8bdf2f159a51e53f67a5c715f94..e3a0f159490907c9ea9ec15a396b2f0257a9a524 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3356,7 +3356,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *pTableQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - assert(pTableQueryInfo->lastKey >= TSKEY_INITIAL_VAL); + //assert(pTableQueryInfo->lastKey >= TSKEY_INITIAL_VAL); setTagVal(pRuntimeEnv, pTableId, pQInfo->tsdb); @@ -4252,7 +4252,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex, - numOfGroups); + numOfGroups, group); STsdbQueryCond cond = { .twindow = pQuery->window, @@ -4286,10 +4286,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(s, 0), pQInfo->tsdb); - taosArrayDestroy(s); if (isFirstLastRowQuery(pQuery)) { assert(taosArrayGetSize(s) == 1); } + taosArrayDestroy(s); // here we simply set the first table as current table pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info; @@ -5405,6 +5405,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { if ((lower == TSDB_RELATION_GREATER_EQUAL || lower == TSDB_RELATION_GREATER) && (upper == TSDB_RELATION_LESS_EQUAL || upper == TSDB_RELATION_LESS)) { + assert(rangeFilterArray != NULL); if (lower == TSDB_RELATION_GREATER_EQUAL) { if (upper == TSDB_RELATION_LESS_EQUAL) { pSingleColFilter->fp = rangeFilterArray[4]; @@ -5419,11 +5420,12 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { } } } else { // set callback filter function + assert(filterArray != NULL); if (lower != TSDB_RELATION_INVALID) { pSingleColFilter->fp = filterArray[lower]; if (upper != TSDB_RELATION_INVALID) { - qError("pQInfo:%p failed to get filter function, invalid filter condition", pQInfo, type); + qError("pQInfo:%p failed to get filter function, invalid filter condition: %d", pQInfo, type); return TSDB_CODE_QRY_INVALID_MSG; } } else { diff --git a/src/query/src/tvariant.c b/src/query/src/tvariant.c index 32d06225c513dfef071113278666d451bd9a0ae3..7ba4e7899e498b47283d9afc122c4ce3d74bcd67 100644 --- a/src/query/src/tvariant.c +++ b/src/query/src/tvariant.c @@ -887,7 +887,7 @@ int32_t tVariantTypeSetType(tVariant *pVariant, char type) { free(pVariant->pz); pVariant->dKey = v; } else if (pVariant->nType >= TSDB_DATA_TYPE_BOOL && pVariant->nType <= TSDB_DATA_TYPE_BIGINT) { - pVariant->dKey = pVariant->i64Key; + pVariant->dKey = (double)(pVariant->i64Key); } pVariant->nType = TSDB_DATA_TYPE_DOUBLE; diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 7035b30b66ec1d0b0d1a1e50cfb1d74f83ff904f..0ca73c1b40d9f7c12b95a434555a0127a0f574e1 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -117,7 +117,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); - break;; + break; } pThreadObj->pollFd = epoll_create(10); // size does not matter @@ -367,7 +367,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { recvInfo.ip = 0; recvInfo.port = 0; recvInfo.shandle = pThreadObj->shandle; - recvInfo.thandle = pFdObj->thandle;; + recvInfo.thandle = pFdObj->thandle; recvInfo.chandle = NULL; recvInfo.connType = RPC_CONN_TCP; (*(pThreadObj->processData))(&recvInfo); @@ -414,7 +414,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { pInfo->ip = pFdObj->ip; pInfo->port = pFdObj->port; pInfo->shandle = pThreadObj->shandle; - pInfo->thandle = pFdObj->thandle;; + pInfo->thandle = pFdObj->thandle; pInfo->chandle = pFdObj; pInfo->connType = RPC_CONN_TCP; diff --git a/src/util/src/tmem.c b/src/util/src/tmem.c index 38d930ac4e89456aa706c87880ceb52107bb8849..ec5f90990b4d84a92f898656f835c71a976676fc 100644 --- a/src/util/src/tmem.c +++ b/src/util/src/tmem.c @@ -29,7 +29,7 @@ static FILE* fpAllocLog = NULL; // memory allocator which fails randomly extern int32_t taosGetTimestampSec(); -static int32_t startTime = INT32_MAX;; +static int32_t startTime = INT32_MAX; static bool random_alloc_fail(size_t size, const char* file, uint32_t line) { if (taosGetTimestampSec() < startTime) { diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index fef0b071a632d08e9f9068539ec9ff138d48b80a..bada60b19bcc8f7f311aabca34a7225f95046db8 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -119,7 +119,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.precision = pVnodeCfg->cfg.precision; - tsdbCfg.compression = pVnodeCfg->cfg.compression;; + tsdbCfg.compression = pVnodeCfg->cfg.compression; char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 773125e06d7e4101cecd44be84089ab0edcf1f64..3e6d245af93039c326977d2073f3de50422463bd 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -359,3 +359,4 @@ cd ../../../debug; make ./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim ./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim ./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim + diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index a71401d6e4709feec4bf60c3b1bb6eceba349179..2480c20709be97840b691d2d90e99d2e74d5722a 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -20,10 +20,10 @@ system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4 -system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 -system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 -system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 -system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 20 +system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 20 +system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 20 +system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 20 system sh/cfg.sh -n dnode1 -c http -v 1 system sh/cfg.sh -n dnode2 -c http -v 1 diff --git a/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim b/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim index 40ba2f6e9aef3b6db900e0fe845f2067880ba18c..a0d8acb272254db1f724f4eeff3e25a4237ae741 100644 --- a/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim +++ b/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim @@ -155,22 +155,31 @@ if $dnode2Vtatus != master then goto wait_dnode3_vgroup_offline endi -#system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l | sed 's/^[ \t]*//g' -#system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l | sed 's/[ \t]*$//g' +#$expectCnt = 3 . : +#print expectCnt: [ $expectCnt ] +#system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l | tr '\n' ':' +#system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l | tr '\n' ':' #print --2-->dnode3 data files: [ $system_content ] -system_content ls ../../../sim/dnode2/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l -print ---->dnode2 data files: [ $system_content ], expect is 0 -system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l -print ---->dnode3 data files: [ $system_content ], expect is 3 -#if $system_content != 3 then -# return -1 -#endi +system_content ls ../../../sim/dnode2/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l | tr -d '\n' +print ---->dnode2 data files: $system_content expect: 0 +if $system_content != 0 then + return -1 +endi + +system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l | tr -d '\n' +print ---->dnode3 data files: $system_content expect: 3 +if $system_content != 3 then + return -1 +endi system echo "haha, nothing......" > ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/f1643.data + +print ============== step3-1: insert some news data for let version changed sql insert into $tb values ( now + 0a , $x ) ( now + 1a , $x ) ( now + 2a , $x ) -$totalRows = $totalRows + 3 +sql insert into $tb values ( now + 10a , $x ) ( now + 11a , $x ) ( now + 12a , $x ) +$totalRows = $totalRows + 6 sql select count(*) from $stb print data00 $data00 if $data00 != $totalRows then @@ -242,18 +251,18 @@ if $data00 != $totalRows then return -1 endi -system_content ls ../../../sim/dnode2/data/vnode/vnode2/tsdb/data/ -l |grep "^-"|wc -l -print ----> dnode2 data files: [ $system_content ], expect is 0 -#if $system_content != 0 then -# return -1 -#endi +system_content ls ../../../sim/dnode2/data/vnode/vnode2/tsdb/data/ -l |grep "^-"|wc -l | tr -d '\n' +print ----> dnode2 data files: $system_content expect: 0 +if $system_content != 0 then + return -1 +endi -system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l |grep "^-"|wc -l -print ----> dnode3 data files: [ $system_content ], expect is 0 -#if $system_content != 0 then -# print there should be no data file in dnode3 after sync -# return -1 -#endi +system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l |grep "^-"|wc -l | tr -d '\n' +print ----> dnode3 data files: $system_content expect: 0 +if $system_content != 0 then + print there should be no data file in dnode3 after sync + return -1 +endi print ============== step5: stop dnode2, and check if dnode3 sync ok system sh/exec.sh -n dnode2 -s stop -x SIGINT diff --git a/tests/script/unique/arbitrator/replica_changeWithArbitrator.sim b/tests/script/unique/arbitrator/replica_changeWithArbitrator.sim index a5816b589bb5de3060cb1cebf5f2042df1e00eca..1b2c8d1db6b3068f8bf637466c72edfb2593ff2c 100644 --- a/tests/script/unique/arbitrator/replica_changeWithArbitrator.sim +++ b/tests/script/unique/arbitrator/replica_changeWithArbitrator.sim @@ -25,9 +25,9 @@ system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 8 system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 8 system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 -#system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 -#system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 -#system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4 system sh/cfg.sh -n dnode1 -c alternativeRole -v 0 system sh/cfg.sh -n dnode2 -c alternativeRole -v 0 diff --git a/tests/script/unique/dnode/balance3.sim b/tests/script/unique/dnode/balance3.sim index a09fd7e4ea99a3ac0ed0fee159111c3b8e1d98b1..b2adb24dfa6c85ec09e37c6c415160963f8e1378 100644 --- a/tests/script/unique/dnode/balance3.sim +++ b/tests/script/unique/dnode/balance3.sim @@ -177,7 +177,6 @@ endi print ========== step5 sql create dnode $hostname6 -system sh/deploy.sh -n dnode6 -i 6 system sh/exec.sh -n dnode6 -s start $x = 0 diff --git a/tests/test/c/insertPerRow.c b/tests/test/c/insertPerRow.c index 6b5a6780938f534ffc1bcab2618301350c6c09e4..906956b998abb98eb85ddf7a86130c1a7bb2b694 100644 --- a/tests/test/c/insertPerRow.c +++ b/tests/test/c/insertPerRow.c @@ -33,6 +33,7 @@ typedef struct { int threadIndex; char dbName[32]; char stableName[64]; + float createTableSpeed; pthread_t thread; } SInfo; @@ -49,8 +50,8 @@ int64_t numOfThreads = 1; int64_t numOfTablesPerThread = 200; char dbName[32] = "db"; char stableName[64] = "st"; -int32_t cache = 16384; -int32_t tables = 1000; +int32_t cache = 16; +int32_t tables = 5000; int main(int argc, char *argv[]) { shellParseArgument(argc, argv); @@ -63,9 +64,8 @@ int main(int argc, char *argv[]) { void createDbAndTable() { pPrint("start to create table"); + TAOS_RES * pSql; TAOS * con; - struct timeval systemTime; - int64_t st, et; char qstr[64000]; char fqdn[TSDB_FQDN_LEN]; @@ -77,22 +77,24 @@ void createDbAndTable() { exit(1); } - sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables); - if (taos_query(con, qstr)) { - pError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con)); + sprintf(qstr, "create database if not exists %s cache %d maxtables %d", dbName, cache, tables); + pSql = taos_query(con, qstr); + int32_t code = taos_errno(pSql); + if (code != 0) { + pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); exit(0); } + taos_free_result(pSql); sprintf(qstr, "use %s", dbName); - if (taos_query(con, qstr)) { + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); exit(0); } + taos_free_result(pSql); - gettimeofday(&systemTime, NULL); - st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - int64_t totalTables = numOfTablesPerThread * numOfThreads; - if (strcmp(stableName, "no") != 0) { int len = sprintf(qstr, "create table if not exists %s(ts timestamp", stableName); for (int64_t f = 0; f < pointsPerTable; ++f) { @@ -100,36 +102,14 @@ void createDbAndTable() { } sprintf(qstr + len, ") tags(t int)"); - if (taos_query(con, qstr)) { + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con)); exit(0); } - - for (int64_t t = 0; t < totalTables; ++t) { - sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t); - if (taos_query(con, qstr)) { - pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con)); - exit(0); - } - } - } else { - for (int64_t t = 0; t < totalTables; ++t) { - int len = sprintf(qstr, "create table if not exists %s%ld(ts timestamp", stableName, t); - for (int64_t f = 0; f < pointsPerTable; ++f) { - len += sprintf(qstr + len, ", f%ld double", f); - } - sprintf(qstr + len, ")"); - - if (taos_query(con, qstr)) { - pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con)); - exit(0); - } - } + taos_free_result(pSql); } - - gettimeofday(&systemTime, NULL); - et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - pPrint("%.1f seconds to create %ld tables", (et - st) / 1000.0 / 1000.0, totalTables); } void insertData() { @@ -144,7 +124,7 @@ void insertData() { pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - SInfo *pInfo = (SInfo *)malloc(sizeof(SInfo) * numOfThreads); + SInfo *pInfo = (SInfo *)calloc(numOfThreads, sizeof(SInfo)); // Start threads to write for (int i = 0; i < numOfThreads; ++i) { @@ -173,10 +153,15 @@ void insertData() { double speedOfRows = totalRows / seconds; double speedOfPoints = totalPoints / seconds; + float createTableSpeed = 0; + for (int i = 0; i < numOfThreads; ++i) { + createTableSpeed += pInfo[i].createTableSpeed; + } + pPrint( "%sall threads:%ld finished, use %.1lf seconds, tables:%.ld rows:%ld points:%ld, speed RowsPerSecond:%.1lf " - "PointsPerSecond:%.1lf%s", - GREEN, numOfThreads, seconds, totalTables, totalRows, totalPoints, speedOfRows, speedOfPoints, NC); + "PointsPerSecond:%.1lf CreateTableSpeed:%.1f t/s %s", + GREEN, numOfThreads, seconds, totalTables, totalRows, totalPoints, speedOfRows, speedOfPoints, createTableSpeed, NC); pPrint("threads exit"); @@ -191,6 +176,7 @@ void *syncTest(void *param) { int64_t st, et; char qstr[65000]; int maxBytes = 60000; + int code; pPrint("thread:%d, start to run", pInfo->threadIndex); @@ -210,6 +196,48 @@ void *syncTest(void *param) { gettimeofday(&systemTime, NULL); st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + if (strcmp(stableName, "no") != 0) { + for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t); + TAOS_RES *pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + } + } else { + for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + int len = sprintf(qstr, "create table if not exists %s%ld(ts timestamp", stableName, t); + for (int64_t f = 0; f < pointsPerTable; ++f) { + len += sprintf(qstr + len, ", f%ld double", f); + } + sprintf(qstr + len, ")"); + + TAOS_RES *pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + } + } + + gettimeofday(&systemTime, NULL); + et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + float seconds = (et - st) / 1000.0 / 1000.0; + int64_t tables = pInfo->tableEndIndex - pInfo->tableBeginIndex; + pInfo->createTableSpeed = (float)tables / seconds; + pPrint("thread:%d, %.1f seconds to create %ld tables, speed:%.1f", pInfo->threadIndex, seconds, tables, + pInfo->createTableSpeed); + + if (pInfo->rowsPerTable == 0) return NULL; + + gettimeofday(&systemTime, NULL); + st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + int64_t start = 1430000000000; int64_t interval = 1000; // 1000 ms @@ -227,10 +255,13 @@ void *syncTest(void *param) { } len += sprintf(sql + len, ")"); if (len > maxBytes) { - if (taos_query(con, qstr)) { + TAOS_RES *pSql = taos_query(con, qstr); + int32_t code = taos_errno(pSql); + if (code != 0) { pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName, table, row, taos_errstr(con)); } + taos_free_result(pSql); // "insert into" len = sprintf(sql, "%s", inserStr); @@ -239,7 +270,8 @@ void *syncTest(void *param) { } if (len != strlen(inserStr)) { - taos_query(con, qstr); + TAOS_RES *pSql = taos_query(con, qstr); + taos_free_result(pSql); } gettimeofday(&systemTime, NULL); diff --git a/tests/test/c/insertPerTable.c b/tests/test/c/insertPerTable.c index a439abec881d21a7d2a1ae51e701f0da59afed44..af92927a9e3003c847f09d235814b4594aa5aa97 100644 --- a/tests/test/c/insertPerTable.c +++ b/tests/test/c/insertPerTable.c @@ -49,8 +49,8 @@ int64_t numOfThreads = 1; int64_t numOfTablesPerThread = 1; char dbName[32] = "db"; char stableName[64] = "st"; -int32_t cache = 16384; -int32_t tables = 1000; +int32_t cache = 16; +int32_t tables = 5000; int main(int argc, char *argv[]) { shellParseArgument(argc, argv); @@ -63,6 +63,7 @@ int main(int argc, char *argv[]) { void createDbAndTable() { pPrint("start to create table"); + TAOS_RES * pSql; TAOS * con; struct timeval systemTime; int64_t st, et; @@ -79,17 +80,22 @@ void createDbAndTable() { exit(1); } - sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables); - if (taos_query(con, qstr)) { - pError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con)); + sprintf(qstr, "create database if not exists %s cache %d maxtables %d", dbName, cache, tables); + pSql = taos_query(con, qstr); + int32_t code = taos_errno(pSql); + if (code != 0) { + pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); exit(0); } sprintf(qstr, "use %s", dbName); - if (taos_query(con, qstr)) { + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); exit(0); } + taos_stop_query(pSql); gettimeofday(&systemTime, NULL); st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; @@ -102,17 +108,23 @@ void createDbAndTable() { } sprintf(qstr + len, ") tags(t int)"); - if (taos_query(con, qstr)) { + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con)); exit(0); } - + taos_stop_query(pSql); + for (int64_t t = 0; t < totalTables; ++t) { sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t); - if (taos_query(con, qstr)) { + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con)); exit(0); } + taos_stop_query(pSql); } } else { for (int64_t t = 0; t < totalTables; ++t) { @@ -122,16 +134,20 @@ void createDbAndTable() { } sprintf(qstr + len, ")"); - if (taos_query(con, qstr)) { + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con)); exit(0); } + taos_stop_query(pSql); } } gettimeofday(&systemTime, NULL); et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - pPrint("%.1f seconds to create %ld tables", (et - st) / 1000.0 / 1000.0, totalTables); + float seconds = (et - st) / 1000.0 / 1000.0; + pPrint("%.1f seconds to create %ld tables, speed:%.1f", seconds, totalTables, totalTables / seconds); } void insertData() { @@ -141,7 +157,12 @@ void insertData() { gettimeofday(&systemTime, NULL); st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - pPrint("%d threads are spawned to insert data", numOfThreads); + if (rowsPerTable <= 0) { + pPrint("not insert data for rowsPerTable is :%d", rowsPerTable); + exit(0); + } else { + pPrint("%d threads are spawned to insert data", numOfThreads); + } pthread_attr_t thattr; pthread_attr_init(&thattr); @@ -230,10 +251,13 @@ void *syncTest(void *param) { } len += sprintf(sql + len, ")"); if (len > maxBytes) { - if (taos_query(con, qstr)) { + TAOS_RES *pSql = taos_query(con, qstr); + int32_t code = taos_errno(pSql); + if (code != 0) { pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName, table, row, taos_errstr(con)); } + taos_stop_query(pSql); // "insert into" len = sprintf(sql, "%s", inserStr);