提交 d4bb3fd6 编写于 作者: S Shengliang Guan

[TD-570] add vgroup/table into sdb write queue

上级 fc82a732
...@@ -353,7 +353,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -353,7 +353,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->numOfGroups = 0; pRes->numOfGroups = 0;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);; STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t revisedSTime = int64_t revisedSTime =
......
...@@ -1473,7 +1473,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1473,7 +1473,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += len; pMsg += len;
} }
pCmd->payloadLen = pMsg - (char*)pInfoMsg;; pCmd->payloadLen = pMsg - (char*)pInfoMsg;
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META; pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
tfree(tmpData); tfree(tmpData);
......
...@@ -499,7 +499,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -499,7 +499,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL;; return NULL;
} }
strtolower(pSql->sqlstr, sqlstr); strtolower(pSql->sqlstr, sqlstr);
......
...@@ -53,7 +53,7 @@ typedef struct { ...@@ -53,7 +53,7 @@ typedef struct {
void * rowData; void * rowData;
int32_t rowSize; int32_t rowSize;
int32_t retCode; // for callback in sdb queue int32_t retCode; // for callback in sdb queue
void (*cb)(struct SMnodeMsg *pMsg, int32_t code); int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
struct SMnodeMsg *pMsg; struct SMnodeMsg *pMsg;
} SSdbOper; } SSdbOper;
......
...@@ -366,7 +366,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs ...@@ -366,7 +366,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
code = sdbInsertRow(&oper); code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tfree(pDb);; tfree(pDb);
} else { } else {
mLPrint("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); mLPrint("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
...@@ -888,8 +888,8 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -888,8 +888,8 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return newCfg; return newCfg;
} }
static void mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) return; if (code != TSDB_CODE_SUCCESS) return code;
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
void *pIter = NULL; void *pIter = NULL;
...@@ -908,6 +908,8 @@ static void mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -908,6 +908,8 @@ static void mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
mLPrint("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); mLPrint("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
balanceNotify(); balanceNotify();
return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) { static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
......
...@@ -962,9 +962,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -962,9 +962,7 @@ static void *sdbWorkerFp(void *param) {
} }
int32_t code = sdbWrite(pOper, pHead, type); int32_t code = sdbWrite(pOper, pHead, type);
if (pOper && code != TSDB_CODE_SUCCESS) { if (pOper) pOper->retCode = code;
pOper->retCode = code;
}
} }
walFsync(tsSdbObj.wal); walFsync(tsSdbObj.wal);
...@@ -976,7 +974,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -976,7 +974,7 @@ static void *sdbWorkerFp(void *param) {
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
if (pOper->cb) { if (pOper->cb) {
(*pOper->cb)(pOper->pMsg, pOper->retCode); pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
} }
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
} }
......
...@@ -797,18 +797,20 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -797,18 +797,20 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.rowSize = sizeof(SSuperTableObj) + schemaSize .rowSize = sizeof(SSuperTableObj) + schemaSize,
.pMsg = pMsg
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mnodeDestroySuperTable(pStable); mnodeDestroySuperTable(pStable);
mError("table:%s, failed to create, sdb error", pCreate->tableId); mError("table:%s, failed to create, sdb error", pCreate->tableId);
return TSDB_CODE_MND_SDB_ERROR;
} else { } else {
mLPrint("table:%s, is created, tags:%d fields:%d", pStable->info.tableId, pStable->numOfTags, pStable->numOfColumns); mLPrint("table:%s, is created, tags:%d fields:%d", pStable->info.tableId, pStable->numOfTags, pStable->numOfColumns);
return TSDB_CODE_SUCCESS; if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code;
} }
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
...@@ -840,11 +842,16 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -840,11 +842,16 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable .pObj = pStable,
.pMsg = pMsg
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->info.tableId, tstrerror(code)); if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->info.tableId, tstrerror(code));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code; return code;
} }
...@@ -859,7 +866,7 @@ static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char * ...@@ -859,7 +866,7 @@ static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char *
return -1; return -1;
} }
static int32_t mnodeAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) { static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSuperTableObj *pStable, SSchema schema[], int32_t ntags) {
if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) { if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) {
mError("stable:%s, add tag, too many tags", pStable->info.tableId); mError("stable:%s, add tag, too many tags", pStable->info.tableId);
return TSDB_CODE_MND_TOO_MANY_TAGS; return TSDB_CODE_MND_TOO_MANY_TAGS;
...@@ -893,19 +900,20 @@ static int32_t mnodeAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], ...@@ -893,19 +900,20 @@ static int32_t mnodeAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[],
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable .pObj = pStable,
.pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR; mPrint("stable %s, succeed to add tag %s", pStable->info.tableId, schema[0].name);
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
mPrint("stable %s, succeed to add tag %s", pStable->info.tableId, schema[0].name); return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, SSuperTableObj *pStable, char *tagName) {
int32_t col = mnodeFindSuperTableTagIndex(pStable, tagName); int32_t col = mnodeFindSuperTableTagIndex(pStable, tagName);
if (col < 0) { if (col < 0) {
mError("stable:%s, drop tag, tag:%s not exist", pStable->info.tableId, tagName); mError("stable:%s, drop tag, tag:%s not exist", pStable->info.tableId, tagName);
...@@ -920,19 +928,20 @@ static int32_t mnodeDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { ...@@ -920,19 +928,20 @@ static int32_t mnodeDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable .pObj = pStable,
.pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR; mPrint("stable %s, succeed to drop tag %s", pStable->info.tableId, tagName);
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
mPrint("stable %s, succeed to drop tag %s", pStable->info.tableId, tagName); return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeModifySuperTableTagName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, SSuperTableObj *pStable, char *oldTagName, char *newTagName) {
int32_t col = mnodeFindSuperTableTagIndex(pStable, oldTagName); int32_t col = mnodeFindSuperTableTagIndex(pStable, oldTagName);
if (col < 0) { if (col < 0) {
mError("stable:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName); mError("stable:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName);
...@@ -956,16 +965,17 @@ static int32_t mnodeModifySuperTableTagName(SSuperTableObj *pStable, char *oldTa ...@@ -956,16 +965,17 @@ static int32_t mnodeModifySuperTableTagName(SSuperTableObj *pStable, char *oldTa
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable .pObj = pStable,
.pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR; mPrint("stable %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName);
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
mPrint("stable %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName); return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) { static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) {
...@@ -979,7 +989,8 @@ static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *col ...@@ -979,7 +989,8 @@ static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *col
return -1; return -1;
} }
static int32_t mnodeAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
SDbObj *pDb = pMsg->pDb;
if (ncols <= 0) { if (ncols <= 0) {
mError("stable:%s, add column, ncols:%d <= 0", pStable->info.tableId); mError("stable:%s, add column, ncols:%d <= 0", pStable->info.tableId);
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
...@@ -1021,19 +1032,21 @@ static int32_t mnodeAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SS ...@@ -1021,19 +1032,21 @@ static int32_t mnodeAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SS
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable .pObj = pStable,
.pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR; mPrint("stable %s, succeed to add column", pStable->info.tableId);
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
mPrint("stable %s, succeed to add column", pStable->info.tableId); return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, char *colName) { static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, SSuperTableObj *pStable, char *colName) {
SDbObj *pDb = pMsg->pDb;
int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName); int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName);
if (col <= 0) { if (col <= 0) {
mError("stable:%s, drop column, column:%s not exist", pStable->info.tableId, colName); mError("stable:%s, drop column, column:%s not exist", pStable->info.tableId, colName);
...@@ -1058,16 +1071,17 @@ static int32_t mnodeDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, c ...@@ -1058,16 +1071,17 @@ static int32_t mnodeDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, c
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable .pObj = pStable,
.pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR; mPrint("stable %s, succeed to delete column", pStable->info.tableId);
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
mPrint("stable %s, succeed to delete column", pStable->info.tableId); return code;
return TSDB_CODE_SUCCESS;
} }
// show super tables // show super tables
...@@ -1428,7 +1442,30 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO ...@@ -1428,7 +1442,30 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
return pCreate; return pCreate;
} }
static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) return code;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable);
if (pMDCreate == NULL) {
return terrno;
}
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static SChildTableObj* mnodeDoCreateChildTable(SMnodeMsg *pMsg, SVgObj *pVgroup, int32_t tid) {
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj));
if (pTable == NULL) { if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId); mError("table:%s, failed to alloc memory", pCreate->tableId);
...@@ -1503,16 +1540,20 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb ...@@ -1503,16 +1540,20 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb
desc.type = SDB_OPER_GLOBAL; desc.type = SDB_OPER_GLOBAL;
desc.pObj = pTable; desc.pObj = pTable;
desc.table = tsChildTableSdb; desc.table = tsChildTableSdb;
desc.pMsg = pMsg;
desc.cb = mnodeDoCreateChildTableCb;
if (sdbInsertRow(&desc) != TSDB_CODE_SUCCESS) { int32_t code = sdbInsertRow(&desc);
if (code != TSDB_CODE_SUCCESS) {
free(pTable); free(pTable);
mError("table:%s, update sdb error", pCreate->tableId); mError("table:%s, update sdb error", pCreate->tableId);
terrno = TSDB_CODE_MND_SDB_ERROR; terrno = TSDB_CODE_MND_SDB_ERROR;
return NULL; return NULL;
} else {
mTrace("table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64, pTable->info.tableId, pVgroup->vgId, pTable->sid,
pTable->uid);
return pTable;
} }
mTrace("table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64 , pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid);
return pTable;
} }
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
...@@ -1537,7 +1578,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -1537,7 +1578,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
return mnodeCreateVgroup(pMsg, pMsg->pDb); return mnodeCreateVgroup(pMsg, pMsg->pDb);
} }
pMsg->pTable = (STableObj *)mnodeDoCreateChildTable(pCreate, pVgroup, sid); pMsg->pTable = (STableObj *)mnodeDoCreateChildTable(pMsg, pVgroup, sid);
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
return terrno; return terrno;
} }
...@@ -1550,25 +1591,9 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -1550,25 +1591,9 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
return terrno; return terrno;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable);
if (pMDCreate == NULL) {
return terrno;
}
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
...@@ -1622,7 +1647,8 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col ...@@ -1622,7 +1647,8 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col
return -1; return -1;
} }
static int32_t mnodeAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSchema schema[], int32_t ncols) { static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SChildTableObj *pTable, SSchema schema[], int32_t ncols) {
SDbObj *pDb = pMsg->pDb;
if (ncols <= 0) { if (ncols <= 0) {
mError("table:%s, add column, ncols:%d <= 0", pTable->info.tableId); mError("table:%s, add column, ncols:%d <= 0", pTable->info.tableId);
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
...@@ -1657,19 +1683,21 @@ static int32_t mnodeAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SS ...@@ -1657,19 +1683,21 @@ static int32_t mnodeAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SS
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable .pObj = pTable,
.pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR; mPrint("table %s, succeed to add column", pTable->info.tableId);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
mPrint("table %s, succeed to add column", pTable->info.tableId); return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, char *colName) { static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, SChildTableObj *pTable, char *colName) {
SDbObj *pDb = pMsg->pDb;
int32_t col = mnodeFindNormalTableColumnIndex(pTable, colName); int32_t col = mnodeFindNormalTableColumnIndex(pTable, colName);
if (col <= 0) { if (col <= 0) {
mError("table:%s, drop column, column:%s not exist", pTable->info.tableId, colName); mError("table:%s, drop column, column:%s not exist", pTable->info.tableId, colName);
...@@ -1689,16 +1717,17 @@ static int32_t mnodeDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, c ...@@ -1689,16 +1717,17 @@ static int32_t mnodeDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, c
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable .pObj = pTable,
.pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR; mPrint("table %s, succeed to drop column %s", pTable->info.tableId, colName);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
mPrint("table %s, succeed to drop column %s", pTable->info.tableId, colName); return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) { static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) {
...@@ -2237,32 +2266,32 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { ...@@ -2237,32 +2266,32 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
mTrace("table:%s, start to alter stable", pAlter->tableId); mTrace("table:%s, start to alter stable", pAlter->tableId);
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
code = mnodeAddSuperTableTag(pTable, pAlter->schema, 1); code = mnodeAddSuperTableTag(pMsg, pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
code = mnodeDropSuperTableTag(pTable, pAlter->schema[0].name); code = mnodeDropSuperTableTag(pMsg, pTable, pAlter->schema[0].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
code = mnodeModifySuperTableTagName(pTable, pAlter->schema[0].name, pAlter->schema[1].name); code = mnodeModifySuperTableTagName(pMsg, pTable, pAlter->schema[0].name, pAlter->schema[1].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
code = mnodeAddSuperTableColumn(pMsg->pDb, pTable, pAlter->schema, 1); code = mnodeAddSuperTableColumn(pMsg, pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mnodeDropSuperTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name); code = mnodeDropSuperTableColumn(pMsg, pTable, pAlter->schema[0].name);
} else { } else {
} }
} else { } else {
mTrace("table:%s, start to alter ctable", pAlter->tableId); mTrace("table:%s, start to alter ctable", pAlter->tableId);
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
char *tagVal = (char*)(pAlter->schema + pAlter->numOfCols); char *tagVal = (char *)(pAlter->schema + pAlter->numOfCols);
code = mnodeModifyChildTableTagValue(pTable, pAlter->schema[0].name, tagVal); code = mnodeModifyChildTableTagValue(pTable, pAlter->schema[0].name, tagVal);
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
code = mnodeAddNormalTableColumn(pMsg->pDb, pTable, pAlter->schema, 1); code = mnodeAddNormalTableColumn(pMsg, pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mnodeDropNormalTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name); code = mnodeDropNormalTableColumn(pMsg, pTable, pAlter->schema[0].name);
} else { } else {
} }
} }
return code; return code;
} }
static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......
...@@ -299,6 +299,27 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) { ...@@ -299,6 +299,27 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup); return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup);
} }
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
pMsg->pVgroup = NULL;
return code;
}
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
}
mnodeIncVgroupRef(pVgroup);
pMsg->expected = pVgroup->numOfVnodes;
mnodeSendCreateVgroupMsg(pVgroup, pMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) { int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
strcpy(pVgroup->dbName, pDb->name); strcpy(pVgroup->dbName, pDb->name);
...@@ -314,26 +335,22 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) { ...@@ -314,26 +335,22 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .table = tsVgroupSdb,
.pObj = pVgroup, .pObj = pVgroup,
.rowSize = sizeof(SVgObj) .rowSize = sizeof(SVgObj),
.pMsg = pMsg,
.cb = mnodeCreateVgroupCb
}; };
pMsg->pVgroup = pVgroup;
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pMsg->pVgroup = NULL;
tfree(pVgroup); tfree(pVgroup);
return TSDB_CODE_MND_SDB_ERROR; } else {
} if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
} }
mnodeIncVgroupRef(pVgroup); return code;
pMsg->pVgroup = pVgroup;
pMsg->expected = pVgroup->numOfVnodes;
mnodeSendCreateVgroupMsg(pVgroup, pMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
......
...@@ -117,7 +117,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -117,7 +117,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
code = pthread_mutex_init(&(pThreadObj->mutex), NULL); code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
if (code < 0) { if (code < 0) {
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
break;; break;
} }
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
...@@ -367,7 +367,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { ...@@ -367,7 +367,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
recvInfo.ip = 0; recvInfo.ip = 0;
recvInfo.port = 0; recvInfo.port = 0;
recvInfo.shandle = pThreadObj->shandle; recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;; recvInfo.thandle = pFdObj->thandle;
recvInfo.chandle = NULL; recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP; recvInfo.connType = RPC_CONN_TCP;
(*(pThreadObj->processData))(&recvInfo); (*(pThreadObj->processData))(&recvInfo);
...@@ -414,7 +414,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -414,7 +414,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
pInfo->ip = pFdObj->ip; pInfo->ip = pFdObj->ip;
pInfo->port = pFdObj->port; pInfo->port = pFdObj->port;
pInfo->shandle = pThreadObj->shandle; pInfo->shandle = pThreadObj->shandle;
pInfo->thandle = pFdObj->thandle;; pInfo->thandle = pFdObj->thandle;
pInfo->chandle = pFdObj; pInfo->chandle = pFdObj;
pInfo->connType = RPC_CONN_TCP; pInfo->connType = RPC_CONN_TCP;
......
...@@ -29,7 +29,7 @@ static FILE* fpAllocLog = NULL; ...@@ -29,7 +29,7 @@ static FILE* fpAllocLog = NULL;
// memory allocator which fails randomly // memory allocator which fails randomly
extern int32_t taosGetTimestampSec(); extern int32_t taosGetTimestampSec();
static int32_t startTime = INT32_MAX;; static int32_t startTime = INT32_MAX;
static bool random_alloc_fail(size_t size, const char* file, uint32_t line) { static bool random_alloc_fail(size_t size, const char* file, uint32_t line) {
if (taosGetTimestampSec() < startTime) { if (taosGetTimestampSec() < startTime) {
......
...@@ -119,7 +119,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -119,7 +119,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;; tsdbCfg.compression = pVnodeCfg->cfg.compression;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册