未验证 提交 9ab8fb6f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2737 from taosdata/feature/vnode

Feature/vnode
......@@ -50,8 +50,8 @@ typedef struct SDnodeObj {
int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t status; // set in balance function
int8_t isMgmt;
int8_t reserve1[14];
int8_t updateEnd[1];
int8_t reserve1[11];
int8_t updateEnd[4];
int32_t refCount;
uint32_t moduleStatus;
uint32_t lastReboot; // time stamp for last reboot
......@@ -68,8 +68,8 @@ typedef struct SMnodeObj {
int32_t mnodeId;
int8_t reserved0[4];
int64_t createdTime;
int8_t reserved1[7];
int8_t updateEnd[1];
int8_t reserved1[4];
int8_t updateEnd[4];
int32_t refCount;
int8_t role;
int8_t reserved2[3];
......@@ -90,8 +90,7 @@ typedef struct SSuperTableObj {
int32_t tversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved1[3];
int8_t updateEnd[1];
int8_t updateEnd[4];
int32_t refCount;
int32_t numOfTables;
SSchema * schema;
......@@ -111,8 +110,7 @@ typedef struct {
int32_t sid;
int32_t vgId;
int32_t sqlLen;
int8_t updateEnd[1];
int8_t reserved1[1];
int8_t updateEnd[4];
int32_t refCount;
char* sql; //used by normal table
SSchema* schema; //used by normal table
......@@ -138,8 +136,8 @@ typedef struct SVgObj {
int8_t status;
int8_t reserved0[4];
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int8_t reserved1[7];
int8_t updateEnd[1];
int8_t reserved1[4];
int8_t updateEnd[4];
int32_t refCount;
int32_t numOfTables;
int64_t totalStorage;
......@@ -176,8 +174,8 @@ typedef struct SDbObj {
int32_t cfgVersion;
SDbCfg cfg;
int8_t status;
int8_t reserved1[14];
int8_t updateEnd[1];
int8_t reserved1[11];
int8_t updateEnd[4];
int32_t refCount;
int32_t numOfVgroups;
int32_t numOfTables;
......@@ -196,8 +194,8 @@ typedef struct SUserObj {
int64_t createdTime;
int8_t superAuth;
int8_t writeAuth;
int8_t reserved[13];
int8_t updateEnd[1];
int8_t reserved[10];
int8_t updateEnd[4];
int32_t refCount;
struct SAcctObj * pAcct;
} SUserObj;
......@@ -228,11 +226,11 @@ typedef struct SAcctObj {
int64_t createdTime;
int32_t acctId;
int8_t status;
int8_t reserved0[10];
int8_t updateEnd[1];
SAcctInfo acctInfo;
int8_t reserved0[7];
int8_t updateEnd[4];
int32_t refCount;
int8_t reserved1[4];
SAcctInfo acctInfo;
pthread_mutex_t mutex;
} SAcctObj;
......
......@@ -67,8 +67,11 @@ static int32_t mnodeDbActionInsert(SSdbOper *pOper) {
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
pthread_mutex_init(&pDb->mutex, NULL);
pthread_mutex_lock(&pDb->mutex);
pDb->vgListSize = VG_LIST_SIZE;
pDb->vgList = calloc(pDb->vgListSize, sizeof(SVgObj *));
pthread_mutex_unlock(&pDb->mutex);
pDb->numOfVgroups = 0;
pDb->numOfTables = 0;
pDb->numOfSuperTables = 0;
......@@ -395,8 +398,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
mnodeDestroyDb(pDb);
mLInfo("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code));
mnodeDestroyDb(pDb);
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
......@@ -605,7 +608,9 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
static char *mnodeGetDbStr(char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER);
return ++pos;
if (pos != NULL) ++pos;
return pos;
}
static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
......@@ -622,10 +627,13 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* name = mnodeGetDbStr(pDb->name);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
if (name != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
} else {
STR_TO_VARSTR(pWrite, "NULL");
}
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
......@@ -406,7 +406,7 @@ void sdbDecRef(void *handle, void *pObj) {
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("def ref of table:%s record:%p:%s:%d", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
int8_t *updateEnd = pObj + pTable->refCountPos - 1;
int32_t *updateEnd = pObj + pTable->refCountPos - 4;
if (refCount <= 0 && *updateEnd) {
sdbTrace("table:%s, record:%p:%s:%d is destroyed", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
SSdbOper oper = {.pObj = pObj};
......@@ -453,7 +453,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
keySize = strlen((char *)key);
}
taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(void **));
taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(int64_t));
sdbIncRef(pTable, pOper->pObj);
atomic_add_fetch_32(&pTable->numOfRows, 1);
......@@ -472,6 +472,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
}
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
int32_t *updateEnd = pOper->pObj + pTable->refCountPos - 4;
bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0;
if (!set) {
sdbError("table:%s, failed to delete record:%s from hash, for it already removed", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj));
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
}
(*pTable->deleteFp)(pOper);
void * key = sdbGetObjKey(pTable, pOper->pObj);
......@@ -486,8 +494,6 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
sdbDebug("table:%s, delete record:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg);
int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1;
*updateEnd = 1;
sdbDecRef(pTable, pOper->pObj);
return TSDB_CODE_SUCCESS;
......@@ -654,8 +660,9 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
SSdbTable *pTable = pTableInput;
if (pTable == NULL) return false;
int8_t *updateEnd = pRow + pTable->refCountPos - 1;
return (*updateEnd == 1);
int32_t *updateEnd = pRow + pTable->refCountPos - 4;
return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1;
// return (*updateEnd == 1);
}
int32_t sdbDeleteRow(SSdbOper *pOper) {
......
......@@ -236,7 +236,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
}
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
SRpcConnInfo connInfo;
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
int32_t connId = htonl(pHBMsg->connId);
......@@ -284,7 +284,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
SCMConnectRsp *pConnectRsp = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SRpcConnInfo connInfo;
SRpcConnInfo connInfo = {0};
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) != 0) {
mError("thandle:%p is already released while process connect msg", pMsg->rpcMsg.handle);
code = TSDB_CODE_MND_INVALID_CONNECTION;
......
......@@ -72,7 +72,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn);
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg);
......@@ -759,7 +759,7 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable;
mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pCTable->vgId, pCTable->sid, pCTable->uid);
return mnodeProcessDropChildTableMsg(pMsg, true);
return mnodeProcessDropChildTableMsg(pMsg);
}
}
......@@ -882,7 +882,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, table:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
mError("app:%p:%p, stable:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
} else {
mLInfo("app:%p:%p, stable:%s, is dropped from sdb", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
}
......@@ -1765,18 +1765,13 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
}
}
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) {
mError("app:%p:%p, table:%s, failed to drop ctable, vgroup not exist", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId);
return TSDB_CODE_MND_APP_ERROR;
}
mLInfo("app:%p:%p, ctable:%s, is dropped from sdb", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg));
if (pDrop == NULL) {
mError("app:%p:%p, table:%s, failed to drop ctable, no enough memory", pMsg->rpcMsg.ahandle, pMsg,
mError("app:%p:%p, ctable:%s, failed to drop ctable, no enough memory", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId);
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
......@@ -1789,7 +1784,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
mInfo("app:%p:%p, table:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
mInfo("app:%p:%p, ctable:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid);
SRpcMsg rpcMsg = {
......@@ -1807,6 +1802,40 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mnodeDropChildTableCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
mError("app:%p:%p, ctable:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
return code;
}
return mnodeSendDropChildTableMsg(pMsg, true);
}
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) {
mError("app:%p:%p, table:%s, failed to drop ctable, vgroup not exist", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId);
return TSDB_CODE_MND_APP_ERROR;
}
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.cb = mnodeDropChildTableCb
};
int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) {
SSchema *schema = (SSchema *) pTable->schema;
for (int32_t col = 0; col < pTable->numOfColumns; col++) {
......@@ -2220,19 +2249,6 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
return;
}
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable
};
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, table:%s, update ctables sdb error", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_MND_SDB_ERROR);
return;
}
if (mnodeMsg->pVgroup->numOfTables <= 0) {
mInfo("app:%p:%p, vgId:%d, all tables is dropped, drop vgroup", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
mnodeMsg->pVgroup->vgId);
......@@ -2259,7 +2275,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) {
mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64,
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid);
mnodeProcessDropChildTableMsg(mnodeMsg, false);
mnodeSendDropChildTableMsg(mnodeMsg, false);
rpcMsg->code = TSDB_CODE_SUCCESS;
}
......
......@@ -358,7 +358,7 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi
}
SUserObj *mnodeGetUserFromConn(void *pConn) {
SRpcConnInfo connInfo;
SRpcConnInfo connInfo = {0};
if (rpcGetConnInfo(pConn, &connInfo) == 0) {
return mnodeGetUser(connInfo.user);
} else {
......
......@@ -434,15 +434,22 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
}
if (pDb->numOfVgroups < maxVgroupsPerDb) {
mDebug("app:%p:%p, db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg->rpcMsg.ahandle, pMsg,
pDb->name, pDb->numOfVgroups, maxVgroupsPerDb);
mDebug("app:%p:%p, db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg->rpcMsg.ahandle,
pMsg, pDb->name, pDb->numOfVgroups, maxVgroupsPerDb);
pthread_mutex_unlock(&pDb->mutex);
int32_t code = mnodeCreateVgroup(pMsg);
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return code;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return code;
} else {
pthread_mutex_lock(&pDb->mutex);
}
}
SVgObj *pVgroup = pDb->vgList[0];
if (pVgroup == NULL) return TSDB_CODE_MND_NO_ENOUGH_DNODES;
if (pVgroup == NULL) {
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
}
int32_t code = mnodeAllocVgroupIdPool(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
......@@ -483,7 +490,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
} else {
pVgroup->status = TAOS_VG_STATUS_READY;
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbUpdateRow(&desc);
(void)sdbUpdateRow(&desc);
}
mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
......
......@@ -121,6 +121,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
for (int k = 0; k < numOfRows; ++k) {
TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) {
cmd->numOfRows--;
continue;
}
int32_t* length = taos_fetch_lengths(result);
// for group by
......
......@@ -108,7 +108,7 @@ HttpContext *httpCreateContext(int32_t fd) {
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(void *), &pContext, sizeof(void *), 3);
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(int64_t), &pContext, sizeof(int64_t), 3);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
......
......@@ -94,6 +94,10 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
for (int k = 0; k < numOfRows; ++k) {
TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) {
cmd->numOfRows--;
continue;
}
int32_t* length = taos_fetch_lengths(result);
// data row array begin
......
......@@ -64,7 +64,7 @@ int32_t mqttInitSystem() {
}
char* _begin_hostname = strstr(url, recntStatus.hostname);
if (strstr(_begin_hostname, ":") != NULL) {
if (_begin_hostname != NULL && strstr(_begin_hostname, ":") != NULL) {
recntStatus.port = strbetween(_begin_hostname, ":", "/");
} else {
recntStatus.port = strbetween("'1883'", "'", "'");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册