未验证 提交 1ba4b1fb 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1579 from taosdata/feature/vpeer

Feature/vpeer
......@@ -149,7 +149,8 @@ typedef struct _vg_obj {
int32_t lbDnodeId;
int32_t lbTime;
int8_t status;
int8_t reserved[14];
int8_t inUse;
int8_t reserved[13];
int8_t updateEnd[1];
int32_t refCount;
struct _vg_obj *prev, *next;
......@@ -243,6 +244,8 @@ typedef struct {
int8_t received;
int8_t successed;
int8_t expected;
int8_t retry;
int8_t maxRetry;
int32_t contLen;
int32_t code;
void *ahandle;
......
......@@ -76,89 +76,89 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NODE_OFFLINE, 0, 28, "node offline")
TAOS_DEFINE_ERROR(TSDB_CODE_NETWORK_UNAVAIL, 0, 29, "network unavailable")
// db & user
TAOS_DEFINE_ERROR(TSDB_CODE_DB_NOT_SELECTED, 0, 30, "db not selected")
TAOS_DEFINE_ERROR(TSDB_CODE_DB_ALREADY_EXIST, 0, 31, "database aleady exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DB, 0, 32, "invalid database")
TAOS_DEFINE_ERROR(TSDB_CODE_MONITOR_DB_FORBIDDEN, 0, 33, "monitor db forbidden")
TAOS_DEFINE_ERROR(TSDB_CODE_USER_ALREADY_EXIST, 0, 34, "user already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_USER, 0, 35, "invalid user")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PASS, 0, 36, "invalid password")
TAOS_DEFINE_ERROR(TSDB_CODE_DB_NOT_SELECTED, 0, 100, "db not selected")
TAOS_DEFINE_ERROR(TSDB_CODE_DB_ALREADY_EXIST, 0, 101, "database aleady exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DB, 0, 102, "invalid database")
TAOS_DEFINE_ERROR(TSDB_CODE_MONITOR_DB_FORBIDDEN, 0, 103, "monitor db forbidden")
TAOS_DEFINE_ERROR(TSDB_CODE_USER_ALREADY_EXIST, 0, 104, "user already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_USER, 0, 105, "invalid user")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PASS, 0, 106, "invalid password")
// table
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 41, "table already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_ID, 0, 42, "invalid table id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 43, "invalid table typee")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE, 0, 44, "invalid table name")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 45, "no super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_TABLE, 0, 46, "not active table")
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 47, "table id mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 200, "table already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_ID, 0, 201, "invalid table id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 202, "invalid table typee")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE, 0, 203, "invalid table name")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 204, "no super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_TABLE, 0, 205, "not active table")
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 206, "table id mismatch")
// dnode & mnode
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DNODES, 0, 50, "no enough dnodes")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ALREADY_EXIST, 0, 51, "dnode already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 52, "dnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_MASTER, 0, 53, "no master")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_REMOVE_MASTER, 0, 54, "no remove master")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 55, "invalid query id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_STREAM_ID, 0, 56, "invalid stream id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONNECTION, 0, 57, "invalid connection")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_ERROR, 0, 58, "sdb error")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DNODES, 0, 300, "no enough dnodes")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ALREADY_EXIST, 0, 301, "dnode already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 302, "dnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_MASTER, 0, 303, "no master")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_REMOVE_MASTER, 0, 304, "no remove master")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 305, "invalid query id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_STREAM_ID, 0, 306, "invalid stream id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONNECTION, 0, 307, "invalid connection")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_ERROR, 0, 308, "sdb error")
// acct
TAOS_DEFINE_ERROR(TSDB_CODE_ACCT_ALREADY_EXIST, 0, 60, "accounts already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT, 0, 61, "invalid account")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT_PARAMETER, 0, 62, "invalid account parameter")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_ACCTS, 0, 63, "too many accounts")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_USERS, 0, 64, "too many users")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TABLES, 0, 65, "too many tables")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_DATABASES, 0, 66, "too many databases")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TIME_SERIES, 0, 67, "not enough time series")
TAOS_DEFINE_ERROR(TSDB_CODE_ACCT_ALREADY_EXIST, 0, 400, "accounts already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT, 0, 401, "invalid account")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT_PARAMETER, 0, 402, "invalid account parameter")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_ACCTS, 0, 403, "too many accounts")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_USERS, 0, 404, "too many users")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TABLES, 0, 405, "too many tables")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_DATABASES, 0, 406, "too many databases")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TIME_SERIES, 0, 407, "not enough time series")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_AUTH_FAILURE, 0, 70, "auth failure")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_RIGHTS, 0, 71, "no rights")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_WRITE_ACCESS, 0, 72, "no write access")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_READ_ACCESS, 0, 73, "no read access")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 74, "grant expired")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DNODE_LIMITED, 0, 75, "grant dnode limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_ACCT_LIMITED, 0, 76, "grant account limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_TIMESERIES_LIMITED, 0, 77, "grant timeseries limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_LIMITED, 0, 78, "grant db limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_USER_LIMITED, 0, 79, "grant user limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CONN_LIMITED, 0, 80, "grant conn limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_LIMITED, 0, 81, "grant stream limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SPEED_LIMITED, 0, 82, "grant speed limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STORAGE_LIMITED, 0, 83, "grant storage limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_QUERYTIME_LIMITED, 0, 84, "grant query time limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 85, "grant cpu limited")
TAOS_DEFINE_ERROR(TSDB_CODE_AUTH_FAILURE, 0, 400, "auth failure")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_RIGHTS, 0, 401, "no rights")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_WRITE_ACCESS, 0, 402, "no write access")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_READ_ACCESS, 0, 403, "no read access")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 404, "grant expired")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DNODE_LIMITED, 0, 405, "grant dnode limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_ACCT_LIMITED, 0, 406, "grant account limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_TIMESERIES_LIMITED, 0, 407, "grant timeseries limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_LIMITED, 0, 408, "grant db limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_USER_LIMITED, 0, 409, "grant user limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CONN_LIMITED, 0, 410, "grant conn limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_LIMITED, 0, 411, "grant stream limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SPEED_LIMITED, 0, 412, "grant speed limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STORAGE_LIMITED, 0, 413, "grant storage limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_QUERYTIME_LIMITED, 0, 414, "grant query time limited")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 415, "grant cpu limited")
// server
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VGROUP_ID, 0, 90, "invalid vgroup id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_ID, 0, 91, "invalid vnode id")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_VNODE, 0, 92, "not active vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VG_INIT_FAILED, 0, 93, "vg init failed")
TAOS_DEFINE_ERROR(TSDB_CODE_SERV_NO_DISKSPACE, 0, 94, "server no diskspace")
TAOS_DEFINE_ERROR(TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 95, "server out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 96, "no disk permissions")
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 97, "file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 98, "memory corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VGROUP_ID, 0, 500, "invalid vgroup id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_ID, 0, 501, "invalid vnode id")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_VNODE, 0, 502, "not active vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VG_INIT_FAILED, 0, 503, "vg init failed")
TAOS_DEFINE_ERROR(TSDB_CODE_SERV_NO_DISKSPACE, 0, 504, "server no diskspace")
TAOS_DEFINE_ERROR(TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 505, "server out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 506, "no disk permissions")
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 507, "file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 508, "memory corrupted")
// client
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 101, "invalid client version")
TAOS_DEFINE_ERROR(TSDB_CODE_CLI_OUT_OF_MEMORY, 0, 102, "client out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_CLI_NO_DISKSPACE, 0, 103, "client no disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIME_STAMP, 0, 104, "invalid timestamp")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SQL, 0, 105, "invalid sql")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 106, "query cache erased")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_MSG, 0, 107, "invalid query message") // failed to validate the sql expression msg by vnode
TAOS_DEFINE_ERROR(TSDB_CODE_SORTED_RES_TOO_MANY, 0, 108, "sorted res too many") // too many result for ordered super table projection query
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QHANDLE, 0, 109, "invalid handle")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 601, "invalid client version")
TAOS_DEFINE_ERROR(TSDB_CODE_CLI_OUT_OF_MEMORY, 0, 602, "client out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_CLI_NO_DISKSPACE, 0, 603, "client no disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIME_STAMP, 0, 604, "invalid timestamp")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SQL, 0, 605, "invalid sql")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 606, "query cache erased")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_MSG, 0, 607, "invalid query message") // failed to validate the sql expression msg by vnode
TAOS_DEFINE_ERROR(TSDB_CODE_SORTED_RES_TOO_MANY, 0, 608, "sorted res too many") // too many result for ordered super table projection query
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QHANDLE, 0, 609, "invalid handle")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 610, "query cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 611, "invalid ie")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 612, "invalid value")
// others
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 700, "invalid file format")
#ifdef TAOS_ERROR_C
......
......@@ -510,7 +510,6 @@ typedef struct SRetrieveTableRsp {
typedef struct {
int32_t vgId;
int32_t vnode;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
......
......@@ -31,6 +31,7 @@ void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
void mgmtAddToShellQueue(SQueuedMsg *queuedMsg);
void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg);
void mgmtSendSimpleResp(void *thandle, int32_t code);
#ifdef __cplusplus
......
......@@ -37,6 +37,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb);
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload);
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
......
......@@ -189,18 +189,21 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
int32_t openVnodes = htons(pStatus->openVnodes);
for (int32_t j = 0; j < openVnodes; ++j) {
pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId);
pDnode->vload[j].totalStorage = htobe64(pStatus->load[j].totalStorage);
pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage);
pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten);
SVnodeLoad *pVload = &pStatus->load[j];
pDnode->vload[j].vgId = htonl(pVload->vgId);
pDnode->vload[j].totalStorage = htobe64(pVload->totalStorage);
pDnode->vload[j].compStorage = htobe64(pVload->compStorage);
pDnode->vload[j].pointsWritten = htobe64(pVload->pointsWritten);
SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId);
if (pVgroup == NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
} else {
mgmtUpdateVgroupStatus(pVgroup, pDnode->dnodeId, pVload);
mgmtReleaseVgroup(pVgroup);
}
mgmtReleaseVgroup(pVgroup);
}
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
......
......@@ -806,6 +806,8 @@ void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
pDestMsg->msgType = pSrcMsg->msgType;
pDestMsg->pCont = pSrcMsg->pCont;
pDestMsg->contLen = pSrcMsg->contLen;
pDestMsg->retry = pSrcMsg->retry;
pDestMsg->maxRetry= pSrcMsg->maxRetry;
pDestMsg->pUser = pSrcMsg->pUser;
pDestMsg->usePublicIp = pSrcMsg->usePublicIp;
......
......@@ -128,6 +128,15 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
}
static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) {
mgmtAddToShellQueue(param);
}
void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
void *unUsed = NULL;
taosTmrReset(mgmtDoDealyedAddToShellQueue, 1000, queuedMsg, tsMgmtTmr, &unUsed);
}
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (rpcMsg == NULL || rpcMsg->pCont == NULL) {
return;
......
......@@ -540,7 +540,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
pMsg->pTable = mgmtGetTable(pCreate->tableId);
if (pMsg->pTable != NULL) {
if (pMsg->pTable != NULL && pMsg->retry == 0) {
if (pCreate->igExists) {
mTrace("table:%s, is already exist", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
......@@ -1300,7 +1300,11 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return;
}
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
if (pMsg->retry == 0) {
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
} else {
pMsg->pTable = mgmtGetTable(pCreate->tableId);
}
if (pMsg->pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
......@@ -1315,6 +1319,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pMsg->pTable;
newMsg->maxRetry = 5;
mgmtIncTableRef(pMsg->pTable);
SRpcMsg rpcMsg = {
.handle = newMsg,
......@@ -1737,30 +1742,40 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
queueMsg->received++;
SChildTableObj *pTable = queueMsg->ahandle;
mTrace("table:%s, create table rsp received, thandle:%p ahandle:%p result:%s", pTable->info.tableId, queueMsg->thandle,
rpcMsg->handle, tstrerror(rpcMsg->code));
mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle,
tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable
};
sdbDeleteRow(&oper);
if (queueMsg->retry++ < queueMsg->maxRetry) {
mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId,
queueMsg->retry, queueMsg->thandle, tstrerror(rpcMsg->code));
mgmtDealyedAddToShellQueue(queueMsg);
} else {
mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId,
queueMsg->thandle, tstrerror(rpcMsg->code));
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable
};
sdbDeleteRow(&oper);
mError("table:%s, failed to create in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code));
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mgmtFreeQueuedMsg(queueMsg);
}
} else {
mTrace("table:%s, created in dnode", pTable->info.tableId);
mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle,
tstrerror(rpcMsg->code));
if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) {
mTrace("table:%s, start to get meta", pTable->info.tableId);
mgmtAddToShellQueue(mgmtCloneQueuedMsg(queueMsg));
mgmtAddToShellQueue(queueMsg);
} else {
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mgmtFreeQueuedMsg(queueMsg);
}
}
mgmtFreeQueuedMsg(queueMsg);
}
// not implemented yet
......
......@@ -18,6 +18,7 @@
#include "taoserror.h"
#include "tlog.h"
#include "tbalance.h"
#include "tsync.h"
#include "tcluster.h"
#include "mnode.h"
#include "mgmtDb.h"
......@@ -209,6 +210,18 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) {
mgmtSendCreateVgroupMsg(pVgroup, NULL);
}
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload) {
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->dnodeId == dnodeId) {
pVgroup->inUse = i;
break;
}
}
}
}
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead;
}
......
......@@ -287,7 +287,6 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->vnode = htonl(pVnode->vgId);
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
}
......
......@@ -57,7 +57,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
if (pVnode->status != TAOS_VN_STATUS_READY)
return TSDB_CODE_NOT_ACTIVE_VNODE;
// if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
return TSDB_CODE_NO_MASTER;
// assign version
pVnode->version++;
......@@ -109,36 +110,27 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
int32_t code = 0;
dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid);
SSchema *pSchema = (SSchema *) pTable->data;
int totalCols = pTable->numOfColumns + pTable->numOfTags;
for (int i = 0; i < totalCols; i++) {
pSchema[i].colId = htons(pSchema[i].colId);
pSchema[i].bytes = htons(pSchema[i].bytes);
}
int32_t totalCols = numOfColumns + numOfTags;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
......@@ -146,9 +138,9 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < pTable->numOfTags; i++) {
for (int i = 0; i < numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
accumBytes += htons(pSchema[i + numOfColumns].bytes);
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
......@@ -182,51 +174,46 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
int32_t code = 0;
dTrace("pVnode:%p vgId:%d, table:%s, start to alter", pVnode, pVnode->vgId, pTable->tableId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid);
SSchema *pSchema = (SSchema *) pTable->data;
int totalCols = pTable->numOfColumns + pTable->numOfTags;
for (int i = 0; i < totalCols; i++) {
pSchema[i].colId = htons(pSchema[i].colId);
pSchema[i].bytes = htons(pSchema[i].bytes);
}
int32_t totalCols = numOfColumns + numOfTags;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < pTable->numOfTags; i++) {
for (int i = 0; i < numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
accumBytes += htons(pSchema[i + numOfColumns].bytes);
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
code = tsdbAlterTable(pVnode->tsdb, &tCfg);
void *pTsdb = vnodeGetTsdb(pVnode);
code = tsdbAlterTable(pTsdb, &tCfg);
tfree(pDestSchema);
dTrace("pVnode:%p vgId:%d, table:%s, alter table result:%d", pVnode, pVnode->vgId, pTable->tableId, code);
return code;
......@@ -237,7 +224,7 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
int32_t code = 0;
dTrace("pVnode:%p vgId:%d, stable:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId);
pTable->uid = htobe64(pTable->uid);
// int64_t uid = htobe64(pTable->uid);
// TODO: drop stable in vvnode
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
\ No newline at end of file
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册