提交 5362a0b1 编写于 作者: S slguan

[TD-202]

上级 608f2394
...@@ -74,6 +74,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); ...@@ -74,6 +74,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg);
static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg);
static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg); static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg);
static void mgmtGetChildTableMeta(SQueuedMsg *pMsg); static void mgmtGetChildTableMeta(SQueuedMsg *pMsg);
static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg);
static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg);
static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
...@@ -612,9 +613,12 @@ static void mgmtExtractTableName(char* tableId, char* name) { ...@@ -612,9 +613,12 @@ static void mgmtExtractTableName(char* tableId, char* name) {
static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont; SCMCreateTableMsg *pCreate = pMsg->pCont;
pMsg->pTable = mgmtGetTable(pCreate->tableId); if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pCreate->tableId);
if (pMsg->pTable != NULL && pMsg->retry == 0) { if (pMsg->pTable != NULL && pMsg->retry == 0) {
if (pCreate->igExists) { if (pCreate->getMeta) {
mTrace("table:%s, continue to get meta", pCreate->tableId);
mgmtProcessTableMetaMsg(pMsg);
} else if (pCreate->igExists) {
mTrace("table:%s, is already exist", pCreate->tableId); mTrace("table:%s, is already exist", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
} else { } else {
...@@ -624,7 +628,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -624,7 +628,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
return; return;
} }
pMsg->pDb = mgmtGetDb(pCreate->db); if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pCreate->db);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to create, db not selected", pCreate->tableId); mError("table:%s, failed to create, db not selected", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
...@@ -681,15 +685,21 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { ...@@ -681,15 +685,21 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont; SCMTableInfoMsg *pInfo = pMsg->pCont;
mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle); mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle);
pMsg->pDb = mgmtGetDbByTableId(pInfo->tableId); if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(pInfo->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mError("table:%s, failed to get table meta, db not selected", pInfo->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
} }
if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pInfo->tableId);
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
mgmtGetChildTableMeta(pMsg); if (htons(pInfo->createFlag) != 1) {
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
} else {
mgmtAutoCreateChildTable(pMsg);
}
} else { } else {
if (pMsg->pTable->type != TSDB_SUPER_TABLE) { if (pMsg->pTable->type != TSDB_SUPER_TABLE) {
mgmtGetChildTableMeta(pMsg); mgmtGetChildTableMeta(pMsg);
...@@ -1654,17 +1664,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -1654,17 +1664,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void mgmtGetChildTableMeta(SQueuedMsg *pMsg) { static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
SCMTableInfoMsg *pInfo = pMsg->pCont; SCMTableInfoMsg *pInfo = pMsg->pCont;
if (pTable == NULL) {
if (htons(pInfo->createFlag) != 1) {
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
} else {
//TODO: on demand create table from super table if table does not exists
int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(STagData); int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(STagData);
SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
if (pCreateMsg == NULL) { if (pCreateMsg == NULL) {
...@@ -1672,24 +1673,25 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg) { ...@@ -1672,24 +1673,25 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
return; return;
} }
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
strncpy(pCreateMsg->tableId, pInfo->tableId, tListLen(pInfo->tableId));
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); strncpy(pCreateMsg->tableId, pInfo->tableId, tListLen(pInfo->tableId));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); strcpy(pCreateMsg->db, pMsg->pDb->name);
pMsg->pCont = NULL; pCreateMsg->igExists = 1;
pCreateMsg->getMeta = 1;
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
newMsg->ahandle = newMsg->pCont; SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
pMsg->pCont = newMsg->pCont;
newMsg->pCont = pCreateMsg; newMsg->pCont = pCreateMsg;
mTrace("table:%s, start to create in demand", pInfo->tableId);
mTrace("table:%s, start to create on demand", pInfo->tableId);
mgmtAddToShellQueue(newMsg); mgmtAddToShellQueue(newMsg);
return; }
}
}
static void mgmtGetChildTableMeta(SQueuedMsg *pMsg) {
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
if (pMeta == NULL) { if (pMeta == NULL) {
mError("table:%s, failed to get table meta, no enough memory", pTable->info.tableId); mError("table:%s, failed to get table meta, no enough memory", pMsg->pTable->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册