未验证 提交 5e8af194 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2811 from taosdata/feature/platform

Feature/platform
......@@ -142,9 +142,14 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
if (vnodes != pVgroup->numOfVnodes) {
mDebug("vgId:%d, db:%s need vnodes:%d, but alloc:%d, free them", pVgroup->vgId, pVgroup->dbName,
pVgroup->numOfVnodes, vnodes);
balanceReleaseDnodeList();
balanceReleaseDnodeList();
balanceUnLock();
return -1;
if (mnodeGetOnlineDnodesNum() == 0) {
return TSDB_CODE_MND_NOT_READY;
} else {
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
}
}
/*
......@@ -179,7 +184,7 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
balanceReleaseDnodeList();
balanceUnLock();
return 0;
return TSDB_CODE_SUCCESS;
}
static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
......
......@@ -42,6 +42,7 @@ typedef struct SMnodeMsg {
int8_t expected;
int8_t retry;
int32_t code;
void * pObj;
struct SAcctObj * pAcct;
struct SDnodeObj *pDnode;
struct SUserObj * pUser;
......
此差异已折叠。
......@@ -251,6 +251,7 @@ typedef struct {
int32_t rowSize;
int32_t numOfRows;
void * pIter;
void ** ppShow;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS];
int32_t numOfReads;
......
......@@ -41,7 +41,7 @@ void mgmtMonitorDnodeModule();
int32_t mnodeGetDnodesNum();
int32_t mnodeGetOnlinDnodesCpuCoreNum();
int32_t mnodeGetOnlinDnodesNum();
int32_t mnodeGetOnlineDnodesNum();
void * mnodeGetNextDnode(void *pIter, SDnodeObj **pDnode);
void mnodeIncDnodeRef(SDnodeObj *pDnode);
void mnodeDecDnodeRef(SDnodeObj *pDnode);
......
......@@ -47,15 +47,16 @@ typedef enum {
SDB_OPER_LOCAL
} ESdbOper;
typedef struct {
typedef struct SSdbOper {
ESdbOper type;
void * table;
void * pObj;
void * rowData;
int32_t rowSize;
int32_t retCode; // for callback in sdb queue
int32_t processedCount; // for sync fwd callback
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
int32_t (*reqFp)(struct SMnodeMsg *pMsg);
int32_t (*writeCb)(struct SMnodeMsg *pMsg, int32_t code);
void * table;
void * pObj;
void * rowData;
struct SMnodeMsg *pMsg;
} SSdbOper;
......@@ -86,6 +87,7 @@ void sdbUpdateMnodeRoles();
int32_t sdbInsertRow(SSdbOper *pOper);
int32_t sdbDeleteRow(SSdbOper *pOper);
int32_t sdbUpdateRow(SSdbOper *pOper);
int32_t sdbInsertRowImp(SSdbOper *pOper);
void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pIter, void **ppRow);
......
......@@ -85,7 +85,7 @@ static int32_t mnodeAcctActionRestored() {
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create root acct");
int32_t code = mnodeCreateRootAcct();
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to create root account, reason:%s", tstrerror(code));
return code;
}
......
......@@ -71,7 +71,7 @@ static int32_t mnodeClusterActionRestored() {
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create cluster");
int32_t code = mnodeCreateCluster();
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to create cluster, reason:%s", tstrerror(code));
return code;
}
......@@ -159,16 +159,15 @@ int32_t mnodeGetClusterId() {
void mnodeUpdateClusterId() {
SClusterObj *pCluster = NULL;
mnodeGetNextCluster(NULL, &pCluster);
void *pIter = mnodeGetNextCluster(NULL, &pCluster);
if (pCluster != NULL) {
tsClusterId = pCluster->clusterId;
mnodeDecClusterRef(pCluster);
mInfo("cluster id is %d", tsClusterId);
} else {
//assert(false);
}
}
mnodeDecClusterRef(pCluster);
sdbFreeIter(pIter);
}
static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
......
......@@ -331,8 +331,10 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
SDbObj *pDb = pMsg->pDb;
if (pDb != NULL) {
if (code == TSDB_CODE_SUCCESS) {
mLInfo("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
} else {
mError("db:%s, failed to create by %s, reason:%s", pDb->name, mnodeGetUserFromMsg(pMsg), tstrerror(code));
}
return code;
......@@ -394,17 +396,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
.pObj = pDb,
.rowSize = sizeof(SDbObj),
.pMsg = pMsg,
.cb = mnodeCreateDbCb
.writeCb = mnodeCreateDbCb
};
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
mLInfo("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code));
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code));
mnodeDestroyDb(pDb);
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
......@@ -772,8 +773,8 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code));
}
return code;
......@@ -971,12 +972,12 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
.table = tsDbSdb,
.pObj = pDb,
.pMsg = pMsg,
.cb = mnodeAlterDbCb
.writeCb = mnodeAlterDbCb
};
code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code));
}
}
......@@ -1019,16 +1020,16 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
mInfo("db:%s, drop db from sdb", pDb->name);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.pMsg = pMsg,
.cb = mnodeDropDbCb
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.pMsg = pMsg,
.writeCb = mnodeDropDbCb
};
int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code));
}
return code;
......@@ -1055,7 +1056,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
}
int32_t code = mnodeSetDbDropping(pMsg->pDb);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
return code;
}
......
......@@ -206,7 +206,7 @@ int32_t mnodeGetOnlinDnodesCpuCoreNum() {
return cpuCores;
}
int32_t mnodeGetOnlinDnodesNum() {
int32_t mnodeGetOnlineDnodesNum() {
SDnodeObj *pDnode = NULL;
void * pIter = NULL;
int32_t onlineDnodes = 0;
......@@ -261,7 +261,8 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) {
.pObj = pDnode
};
if (sdbUpdateRow(&oper) != 0) {
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnodeId:%d, failed update", pDnode->dnodeId);
}
}
......@@ -501,13 +502,12 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
int dnodeId = pDnode->dnodeId;
tfree(pDnode);
mError("failed to create dnode:%d, result:%s", dnodeId, tstrerror(code));
mError("failed to create dnode:%d, reason:%s", dnodeId, tstrerror(code));
} else {
mInfo("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
mLInfo("dnode:%d is created", pDnode->dnodeId);
}
return code;
......@@ -522,9 +522,10 @@ int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
};
int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
mLInfo("dnode:%d, is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
} else {
mLInfo("dnode:%d, is dropped from cluster", pDnode->dnodeId);
}
return code;
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "trpc.h"
#include "tsync.h"
#include "tbalance.h"
......@@ -31,8 +32,6 @@
#include "mnodeShow.h"
#include "mnodeUser.h"
#include "tglobal.h"
static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0;
static SRpcEpSet tsMnodeEpSetForShell;
......@@ -279,9 +278,8 @@ int32_t mnodeAddMnode(int32_t dnodeId) {
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
tfree(pMnode);
code = TSDB_CODE_MND_SDB_ERROR;
}
mnodeUpdateMnodeEpSet();
......@@ -313,9 +311,6 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
};
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_SDB_ERROR;
}
sdbDecRef(tsMnodeSdb, pMnode);
......
......@@ -120,7 +120,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
}
if (/* pConn->ip != ip || */ pConn->port != port /* || strcmp(pConn->user, user) != 0 */) {
mError("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user,
mDebug("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user,
taosIpStr(ip), port, pConn->user, taosIpStr(pConn->ip), pConn->port);
taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
return NULL;
......
......@@ -101,6 +101,11 @@ static int32_t sdbInitWriteWorker();
static void sdbCleanupWriteWorker();
static int32_t sdbAllocWriteQueue();
static void sdbFreeWritequeue();
static int32_t sdbUpdateRowImp(SSdbOper *pOper);
static int32_t sdbDeleteRowImp(SSdbOper *pOper);
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper);
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper);
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper);
int32_t sdbGetId(void *handle) {
return ((SSdbTable *)handle)->autoIndex;
......@@ -260,8 +265,20 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
tstrerror(code));
}
if (pOper->cb != NULL) {
pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode);
// failed to forward, need revert insert
if (pOper->retCode != TSDB_CODE_SUCCESS) {
SWalHead *pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
int32_t action = pHead->msgType % 10;
sdbError("table:%s record:%p:%s ver:%" PRIu64 ", action:%d failed to foward reason:%s",
((SSdbTable *)pOper->table)->tableName, pOper->pObj, sdbGetKeyStr(pOper->table, pHead->cont),
pHead->version, action, tstrerror(pOper->retCode));
if (action == SDB_ACTION_INSERT) {
sdbDeleteHash(pOper->table, pOper);
}
}
if (pOper->writeCb != NULL) {
pOper->retCode = (*pOper->writeCb)(pMsg, pOper->retCode);
}
dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
......@@ -269,6 +286,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
if (ahandle == NULL) {
sdbDecRef(pOper->table, pOper->pObj);
}
taosFreeQitem(pOper);
}
......@@ -396,8 +414,8 @@ void sdbIncRef(void *handle, void *pObj) {
SSdbTable *pTable = handle;
int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos);
atomic_add_fetch_32(pRefCount, 1);
sdbTrace("add ref to table:%s record:%p:%s:%d", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
int32_t refCount = atomic_add_fetch_32(pRefCount, 1);
sdbTrace("add ref to table:%s record:%p:%s:%d", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), refCount);
}
void sdbDecRef(void *handle, void *pObj) {
......@@ -406,11 +424,11 @@ void sdbDecRef(void *handle, void *pObj) {
SSdbTable *pTable = handle;
int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("def ref of table:%s record:%p:%s:%d", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
sdbTrace("def ref of table:%s record:%p:%s:%d", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), refCount);
int32_t *updateEnd = pObj + pTable->refCountPos - 4;
if (refCount <= 0 && *updateEnd) {
sdbTrace("table:%s, record:%p:%s:%d is destroyed", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
sdbTrace("table:%s, record:%p:%s:%d is destroyed", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), refCount);
SSdbOper oper = {.pObj = pObj};
(*pTable->destroyFp)(&oper);
}
......@@ -609,7 +627,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
if (sdbGetRowFromObj(pTable, pOper->pObj)) {
sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj));
sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj));
sdbDecRef(pTable, pOper->pObj);
return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE;
}
......@@ -634,9 +653,20 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
if (pOper->reqFp) {
return (*pOper->reqFp)(pOper->pMsg);
} else {
return sdbInsertRowImp(pOper);
}
}
int32_t sdbInsertRowImp(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0;
pHead->len = pOper->rowSize;
......@@ -655,7 +685,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
sdbIncRef(pNewOper->table, pNewOper->pObj);
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
......@@ -664,7 +695,6 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
int32_t *updateEnd = pRow + pTable->refCountPos - 4;
return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1;
// return (*updateEnd == 1);
}
int32_t sdbDeleteRow(SSdbOper *pOper) {
......@@ -692,13 +722,24 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
if (pOper->reqFp) {
return (*pOper->reqFp)(pOper->pMsg);
} else {
return sdbDeleteRowImp(pOper);
}
}
int32_t sdbDeleteRowImp(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
pOper->rowData = pHead->cont;
(*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize;
......@@ -711,7 +752,8 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
}
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
int32_t sdbUpdateRow(SSdbOper *pOper) {
......@@ -735,6 +777,17 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
if (pOper->reqFp) {
return (*pOper->reqFp)(pOper->pMsg);
} else {
return sdbUpdateRowImp(pOper);
}
}
int32_t sdbUpdateRowImp(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size);
......@@ -755,7 +808,8 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
sdbIncRef(pNewOper->table, pNewOper->pObj);
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
......
......@@ -49,8 +49,8 @@ static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
static void mnodeFreeShowObj(void *data);
static bool mnodeAccquireShowObj(SShowObj *pShow);
static bool mnodeCheckShowFinished(SShowObj *pShow);
static void *mnodePutShowObj(SShowObj *pShow, int32_t size);
static void mnodeReleaseShowObj(void *pShow, bool forceRemove);
static void *mnodePutShowObj(SShowObj *pShow);
static void mnodeReleaseShowObj(SShowObj *pShow, bool forceRemove);
extern void *tsMnodeTmr;
static void *tsMnodeShowCache = NULL;
......@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show");
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, false, mnodeFreeShowObj, "show");
return 0;
}
......@@ -121,13 +121,13 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
}
int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen);
SShowObj *pShow = (SShowObj *) calloc(1, showObjSize);
SShowObj *pShow = calloc(1, showObjSize);
pShow->type = pShowMsg->type;
pShow->payloadLen = htons(pShowMsg->payloadLen);
tstrncpy(pShow->db, pShowMsg->db, TSDB_DB_NAME_LEN);
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
pShow = mnodePutShowObj(pShow, showObjSize);
pShow = mnodePutShowObj(pShow);
if (pShow == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
......@@ -270,7 +270,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
}
}
pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum());
pHBRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeEpSetForShell(&pHBRsp->epSet);
......@@ -377,37 +377,41 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
}
static bool mnodeAccquireShowObj(SShowObj *pShow) {
SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, &pShow->index, sizeof(int32_t));
if (pSaved == pShow) {
mDebug("%p, show is accquired from cache", pShow);
SShowObj **ppShow = taosCacheAcquireByKey(tsMnodeShowCache, &pShow, sizeof(int64_t));
if (ppShow) {
mDebug("%p, show is accquired from cache, data:%p, index:%d", pShow, ppShow, pShow->index);
return true;
} else {
return false;
}
return false;
}
static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
static void* mnodePutShowObj(SShowObj *pShow) {
if (tsMnodeShowCache != NULL) {
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, &pShow->index, sizeof(int32_t), pShow, size, 6);
mDebug("%p, show is put into cache, index:%d", newQhandle, pShow->index);
free(pShow);
return newQhandle;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &pShow, sizeof(int64_t), &pShow, sizeof(int64_t), 6);
pShow->ppShow = (void**)ppShow;
mDebug("%p, show is put into cache, data:%p index:%d", pShow, ppShow, pShow->index);
return pShow;
}
return NULL;
}
static void mnodeFreeShowObj(void *data) {
SShowObj *pShow = data;
SShowObj *pShow = *(SShowObj **)data;
sdbFreeIter(pShow->pIter);
mDebug("%p, show is destroyed", pShow);
mDebug("%p, show is destroyed, data:%p index:%d", pShow, data, pShow->index);
tfree(pShow);
}
static void mnodeReleaseShowObj(void *pShow, bool forceRemove) {
mDebug("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false");
taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove);
static void mnodeReleaseShowObj(SShowObj *pShow, bool forceRemove) {
SShowObj **ppShow = (SShowObj **)pShow->ppShow;
mDebug("%p, show is released, force:%s data:%p index:%d", pShow, forceRemove ? "true" : "false", ppShow,
pShow->index);
taosCacheRelease(tsMnodeShowCache, (void **)(&ppShow), forceRemove);
}
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
......
......@@ -128,9 +128,11 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
if (pTable->info.type == TSDB_CHILD_TABLE) {
// add ref
pTable->superTable = mnodeGetSuperTableByUid(pTable->suid);
mnodeAddTableIntoStable(pTable->superTable, pTable);
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
if (pTable->superTable != NULL) {
mnodeAddTableIntoStable(pTable->superTable, pTable);
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
}
} else {
grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
......@@ -865,18 +867,17 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
.pObj = pStable,
.rowSize = sizeof(SSuperTableObj) + schemaSize,
.pMsg = pMsg,
.cb = mnodeCreateSuperTableCb
.writeCb = mnodeCreateSuperTableCb
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySuperTable(pStable);
pMsg->pTable = NULL;
mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -924,13 +925,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
.table = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.cb = mnodeDropSuperTableCb
.writeCb = mnodeDropSuperTableCb
};
int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
tstrerror(code));
}
return code;
}
......@@ -995,15 +998,10 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
.table = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.cb = mnodeAddSuperTableTagCb
.writeCb = mnodeAddSuperTableTagCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -1034,15 +1032,10 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
.table = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.cb = mnodeDropSuperTableTagCb
.writeCb = mnodeDropSuperTableTagCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -1083,15 +1076,10 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
.table = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.cb = mnodeModifySuperTableTagNameCb
.writeCb = mnodeModifySuperTableTagNameCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) {
......@@ -1162,15 +1150,10 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
.table = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.cb = mnodeAddSuperTableColumnCb
.writeCb = mnodeAddSuperTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -1212,15 +1195,10 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
.table = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.cb = mnodeDropSuperTableColumnCb
.writeCb = mnodeDropSuperTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -1261,15 +1239,10 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
.table = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.cb = mnodeChangeSuperTableColumnCb
.writeCb = mnodeChangeSuperTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
// show super tables
......@@ -1645,20 +1618,12 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
return pCreate;
}
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
assert(pTable);
if (code == TSDB_CODE_SUCCESS) {
mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle,
pMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, tstrerror(code));
} else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
return code;
}
mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid);
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
......@@ -1679,6 +1644,34 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
assert(pTable);
if (code == TSDB_CODE_SUCCESS) {
if (pCreate->getMeta) {
mDebug("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pMsg->rpcMsg.handle);
pMsg->retry = 0;
dnodeReprocessMnodeWriteMsg(pMsg);
} else {
mDebug("app:%p:%p, table:%s, created in dnode, thandle:%p", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pMsg->rpcMsg.handle);
dnodeSendRpcMnodeWriteRsp(pMsg, TSDB_CODE_SUCCESS);
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
return code;
}
}
static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
SVgObj *pVgroup = pMsg->pVgroup;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
......@@ -1752,23 +1745,23 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pMsg->pTable = (STableObj *)pTable;
mnodeIncTableRef(pMsg->pTable);
SSdbOper desc = {0};
desc.type = SDB_OPER_GLOBAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
desc.pMsg = pMsg;
desc.cb = mnodeDoCreateChildTableCb;
SSdbOper desc = {
.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.table = tsChildTableSdb,
.pMsg = pMsg,
.reqFp = mnodeDoCreateChildTableFp
};
int32_t code = sdbInsertRow(&desc);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroyChildTable(pTable);
pMsg->pTable = NULL;
mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
mError("app:%p:%p, table:%s, failed to create, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
tstrerror(code));
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
......@@ -1813,7 +1806,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
return terrno;
} else {
mDebug("app:%p:%p, table:%s, send create msg to vnode again", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return mnodeDoCreateChildTableCb(pMsg, TSDB_CODE_SUCCESS);
return mnodeDoCreateChildTableFp(pMsg);
}
}
......@@ -1878,13 +1871,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
.table = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.cb = mnodeDropChildTableCb
.writeCb = mnodeDropChildTableCb
};
int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
}
return code;
}
......@@ -1980,15 +1975,10 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
.table = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.cb = mnodeAlterNormalTableColumnCb
.writeCb = mnodeAlterNormalTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
......@@ -2018,15 +2008,10 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
.table = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.cb = mnodeAlterNormalTableColumnCb
.writeCb = mnodeAlterNormalTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) {
......@@ -2060,15 +2045,10 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
.table = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.cb = mnodeAlterNormalTableColumnCb
.writeCb = mnodeAlterNormalTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
return sdbUpdateRow(&oper);
}
static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) {
......@@ -2374,19 +2354,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
}
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont;
if (pCreate->getMeta) {
mDebug("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p result:%s",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->rpcMsg.handle,
tstrerror(rpcMsg->code));
mnodeMsg->retry = 0;
dnodeReprocessMnodeWriteMsg(mnodeMsg);
} else {
mDebug("app:%p:%p, table:%s, created in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
SSdbOper desc = {
.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.table = tsChildTableSdb,
.pMsg = mnodeMsg,
.writeCb = mnodeDoCreateChildTableCb
};
int32_t code = sdbInsertRowImp(&desc);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeMsg->pTable = NULL;
mnodeDestroyChildTable(pTable);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, code);
}
} else {
if (mnodeMsg->retry++ < 10) {
......
......@@ -182,9 +182,10 @@ static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
};
int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else {
mLInfo("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
......@@ -236,11 +237,11 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
};
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
tfree(pUser);
} else {
mLInfo("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
......@@ -255,9 +256,10 @@ static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
};
int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else {
mLInfo("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
......
......@@ -256,7 +256,8 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) {
.pObj = pVgroup
};
if (sdbUpdateRow(&oper) != TSDB_CODE_SUCCESS) {
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("vgId:%d, failed to update vgroup", pVgroup->vgId);
}
mnodeSendAlterVgroupMsg(pVgroup);
......@@ -433,11 +434,12 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
maxVgroupsPerDb = MAX(maxVgroupsPerDb, 2);
}
int32_t code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
if (pDb->numOfVgroups < maxVgroupsPerDb) {
mDebug("app:%p:%p, db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg->rpcMsg.ahandle,
pMsg, pDb->name, pDb->numOfVgroups, maxVgroupsPerDb);
pthread_mutex_unlock(&pDb->mutex);
int32_t code = mnodeCreateVgroup(pMsg);
code = mnodeCreateVgroup(pMsg);
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return code;
} else {
......@@ -448,10 +450,10 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
SVgObj *pVgroup = pDb->vgList[0];
if (pVgroup == NULL) {
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
return code;
}
int32_t code = mnodeAllocVgroupIdPool(pVgroup);
code = mnodeAllocVgroupIdPool(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&pDb->mutex);
return code;
......@@ -476,23 +478,11 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup);
}
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeCreateVgroupFp(SMnodeMsg *pMsg) {
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbDeleteRow(&desc);
return code;
} else {
pVgroup->status = TAOS_VG_STATUS_READY;
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
(void)sdbUpdateRow(&desc);
}
mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
......@@ -508,6 +498,29 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbDeleteRow(&desc);
return code;
} else {
mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes);
pVgroup->status = TAOS_VG_STATUS_READY;
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
(void)sdbUpdateRow(&desc);
dnodeReprocessMnodeWriteMsg(pMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SDbObj *pDb = pMsg->pDb;
......@@ -517,30 +530,34 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
pVgroup->numOfVnodes = pDb->cfg.replications;
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->accessState = TSDB_VN_ALL_ACCCESS;
if (balanceAllocVnodes(pVgroup) != 0) {
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
int32_t code = balanceAllocVnodes(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup, reason:%s", pDb->name, pVgroup->numOfVnodes,
tstrerror(code));
free(pVgroup);
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
return code;
}
if (pMsg->pVgroup != NULL) {
mnodeDecVgroupRef(pMsg->pVgroup);
}
pMsg->pVgroup = pVgroup;
mnodeIncVgroupRef(pVgroup);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj),
.pMsg = pMsg,
.cb = mnodeCreateVgroupCb
.pMsg = pMsg,
.reqFp = mnodeCreateVgroupFp
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
pMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup);
} else {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
......@@ -801,7 +818,7 @@ static SMDCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup) {
SRpcEpSet epSet = {
.numOfEps = pVgroup->numOfVnodes,
.inUse = 0,
.inUse = pVgroup->inUse,
};
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
strcpy(epSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn);
......@@ -891,18 +908,28 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->received != mnodeMsg->expected) return;
if (mnodeMsg->received == mnodeMsg->successed) {
dnodeReprocessMnodeWriteMsg(mnodeMsg);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj),
.pMsg = mnodeMsg,
.writeCb = mnodeCreateVgroupCb
};
int32_t code = sdbInsertRowImp(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, code);
}
} else {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
};
int32_t code = sdbDeleteRow(&oper);
if (code != 0) {
code = TSDB_CODE_MND_SDB_ERROR;
}
sdbDeleteRow(&oper);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, mnodeMsg->code);
}
}
......
......@@ -338,14 +338,15 @@ void taosRemoveDir(char *rootDir);
int taosMkDir(const char *pathname, mode_t mode);
void taosMvDir(char* destDir, char *srcDir);
#ifdef TAOS_RANDOM_FILE_FAIL
void taosSetRandomFileFailFactor(int factor);
void taosSetRandomFileFailOutput(const char *path);
ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line);
#undef taosTRead
#undef taosTWrite
#undef taosLSeek
#define taosTRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosTWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__)
......@@ -356,6 +357,10 @@ void taosMvDir(char* destDir, char *srcDir);
ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count);
ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count);
#undef taosSend
#undef taosSendto
#undef taosReadSocket
#undef taosWriteSocket
#define taosSend(sockfd, buf, len, flags) taosSendRandomFail(sockfd, buf, len, flags)
#define taosSendto(sockfd, buf, len, flags, dest_addr, addrlen) taosSendToRandomFail(sockfd, buf, len, flags, dest_addr, addrlen)
#define taosReadSocket(fd, buf, len) taosReadSocketRandomFail(fd, buf, len)
......
......@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) {
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc");
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 3, true, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
......@@ -108,7 +108,7 @@ HttpContext *httpCreateContext(int32_t fd) {
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(int64_t), &pContext, sizeof(int64_t), 3);
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(int64_t), &pContext, sizeof(int64_t), 5);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
......@@ -133,13 +133,22 @@ HttpContext *httpGetContext(void *ptr) {
}
void httpReleaseContext(HttpContext *pContext) {
// Ensure that the context is valid before release
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &pContext, sizeof(HttpContext *));
if (ppContext == NULL) {
httpError("context:%p, is already released", pContext);
return;
}
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
assert(ppContext == pContext->ppContext);
HttpContext **ppContext = pContext->ppContext;
httpDebug("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount);
if (tsHttpServer.contextCache != NULL) {
// and release context twice
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
httpDebug("context:%p, won't be destroyed for cache is already released", pContext);
......
......@@ -12,6 +12,10 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// no test file errors here
#undef TAOS_RANDOM_FILE_FAIL
#include "tsdbMain.h"
#include "os.h"
#include "talgo.h"
......@@ -22,8 +26,6 @@
#include "ttime.h"
#include "tulog.h"
#include <pthread.h>
#include <sys/stat.h>
#define TSDB_CFG_FILE_NAME "config"
#define TSDB_DATA_DIR_NAME "data"
......
......@@ -14,6 +14,9 @@
*/
#define _DEFAULT_SOURCE
// no test file errors here
#undef TAOS_RANDOM_FILE_FAIL
#include "os.h"
#include "tulog.h"
#include "tlog.h"
......
......@@ -13,6 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// no test file errors here
#undef TAOS_RANDOM_FILE_FAIL
#include "tnote.h"
taosNoteInfo m_HttpNote;
......
......@@ -66,7 +66,7 @@ endi
system_content curl 127.0.0.1:6020/grafana/login/xx/xx/
print 3-> $system_content
if $system_content != @{"status":"error","code":849,"desc":"mnode invalid user"}@ then
if $system_content != @{"status":"error","code":849,"desc":"Invalid user"}@ then
return -1
endi
......@@ -78,7 +78,7 @@ endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:6020/grafana/login/1/root/1/
print 5-> $system_content
if $system_content != @{"status":"error","code":849,"desc":"mnode invalid user"}@ then
if $system_content != @{"status":"error","code":849,"desc":"Invalid user"}@ then
return -1
endi
......
......@@ -77,7 +77,7 @@ endi
system_content curl 127.0.0.1:6020/rest/login/u2/aabcd_1234
print curl 127.0.0.1:6020/rest/login/u2/abcd_1234 -----> $system_content
if $system_content != @{"status":"error","code":3,"desc":"auth failure"}@ then
if $system_content != @{"status":"error","code":3,"desc":"Authentication failure"}@ then
return -1
endi
......
......@@ -39,19 +39,19 @@ endi
system_content curl 127.0.0.1:6020/rest/login/root/123
print 5-> $system_content
if $system_content != @{"status":"error","code":3,"desc":"auth failure"}@ then
if $system_content != @{"status":"error","code":3,"desc":"Authentication failure"}@ then
return -1
endi
system_content curl 127.0.0.1:6020/rest/login/root/123/1/1/3
print 6-> $system_content
if $system_content != @{"status":"error","code":3,"desc":"auth failure"}@ then
if $system_content != @{"status":"error","code":3,"desc":"Authentication failure"}@ then
return -1
endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:6020/rest/login/root/1
print 7-> $system_content
if $system_content != @{"status":"error","code":3,"desc":"auth failure"}@ then
if $system_content != @{"status":"error","code":3,"desc":"Authentication failure"}@ then
return -1
endi
......@@ -93,7 +93,7 @@ endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'create database d1' 127.0.0.1:6020/rest/sql
print 13-> $system_content
if $system_content != @{"status":"error","code":897,"desc":"mnode database aleady exist"}@ then
if $system_content != @{"status":"error","code":897,"desc":"Database already exists"}@ then
return -1
endi
......@@ -126,7 +126,7 @@ endi
#18
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' show tables;' 127.0.0.1:6020/rest/sql
print 18-> $system_content
if $system_content != @{"status":"error","code":896,"desc":"mnode db not selected"}@ then
if $system_content != @{"status":"error","code":896,"desc":"Database not specified or available"}@ then
return -1
endi
......@@ -147,7 +147,7 @@ print =============== step3 - db
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' select * from d1.t1;' 127.0.0.1:6020/rest/sql
print 21-> $system_content
if $system_content != @{"status":"error","code":866,"desc":"mnode invalid table name"}@ then
if $system_content != @{"status":"error","code":866,"desc":"Table does not exist"}@ then
return -1
endi
......
......@@ -57,13 +57,13 @@ endi
system_content curl 127.0.0.1:6020/admin/login/root/123
print 5-> $system_content
if $system_content != @{"status":"error","code":3,"desc":"auth failure"}@ then
if $system_content != @{"status":"error","code":3,"desc":"Authentication failure"}@ then
return -1
endi
system_content curl 127.0.0.1:6020/admin/login/root/123/1/1/3
print 6-> $system_content
if $system_content != @{"status":"error","code":3,"desc":"auth failure"}@ then
if $system_content != @{"status":"error","code":3,"desc":"Authentication failure"}@ then
return -1
endi
......
......@@ -74,7 +74,7 @@ endi
system_content curl -u root:taosdata -d '[{"metric": "ab1234567890123456789012345678ab1234567890123456789012345678","timestamp": 1346846400,"value": 18,"tags": {"host": "web01","group1": "1","dc": "lga"}}]' 127.0.0.1:6020/opentsdb/db/put
print $system_content
if $system_content != @{"errors":[{"datapoint":{"metric":"ab1234567890123456789012345678ab1234567890123456789012345678","stable":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb","table":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb_lga_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","host":"web01"},"status":"error","code":1547,"desc":"tsdb timestamp is out of range"}}],"failed":1,"success":0,"affected_rows":0}@ then
if $system_content != @{"errors":[{"datapoint":{"metric":"ab1234567890123456789012345678ab1234567890123456789012345678","stable":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb","table":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb_lga_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","host":"web01"},"status":"error","code":1547,"desc":"Timestamp data out of range"}}],"failed":1,"success":0,"affected_rows":0}@ then
return -1
endi
......
......@@ -31,7 +31,6 @@ char dbName[32] = "db";
char stableName[64] = "st";
int32_t numOfThreads = 30;
int32_t numOfTables = 100000;
int32_t maxTables = 5000;
int32_t replica = 1;
int32_t numOfColumns = 2;
......@@ -98,7 +97,7 @@ void createDbAndSTable() {
exit(1);
}
sprintf(qstr, "create database if not exists %s maxtables %d replica %d", dbName, maxTables, replica);
sprintf(qstr, "create database if not exists %s replica %d", dbName, replica);
TAOS_RES *pSql = taos_query(con, qstr);
int32_t code = taos_errno(pSql);
if (code != 0) {
......@@ -195,8 +194,6 @@ void printHelp() {
printf("%s%s%s%d\n", indent, indent, "replica, default is ", replica);
printf("%s%s\n", indent, "-columns");
printf("%s%s%s%d\n", indent, indent, "numOfColumns, default is ", numOfColumns);
printf("%s%s\n", indent, "-tables");
printf("%s%s%s%d\n", indent, indent, "Database parameters tables, default is ", maxTables);
exit(EXIT_SUCCESS);
}
......@@ -218,8 +215,6 @@ void shellParseArgument(int argc, char *argv[]) {
numOfTables = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
replica = atoi(argv[++i]);
} else if (strcmp(argv[i], "-tables") == 0) {
maxTables = atoi(argv[++i]);
} else if (strcmp(argv[i], "-columns") == 0) {
numOfColumns = atoi(argv[++i]);
} else {
......@@ -233,7 +228,6 @@ void shellParseArgument(int argc, char *argv[]) {
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
pPrint("%s numOfColumns:%d %s", GREEN, numOfColumns, NC);
pPrint("%s replica:%d %s", GREEN, replica, NC);
pPrint("%s dbPara maxTables:%d %s", GREEN, maxTables, NC);
pPrint("%s start create table performace test %s", GREEN, NC);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册