提交 f893b656 编写于 作者: X Xiaoyu Wang

reorganize physical plan code

上级 09f36860
...@@ -287,6 +287,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -287,6 +287,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->dbFName);
tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU32(buf, pReq->ttl); tlen += taosEncodeFixedU32(buf, pReq->ttl);
tlen += taosEncodeFixedU32(buf, pReq->keep); tlen += taosEncodeFixedU32(buf, pReq->keep);
...@@ -360,6 +361,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -360,6 +361,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver)); buf = taosDecodeFixedI64(buf, &(pReq->ver));
buf = taosDecodeString(buf, &(pReq->dbFName));
buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeFixedU32(buf, &(pReq->ttl)); buf = taosDecodeFixedU32(buf, &(pReq->ttl));
buf = taosDecodeFixedU32(buf, &(pReq->keep)); buf = taosDecodeFixedU32(buf, &(pReq->keep));
...@@ -478,7 +480,7 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { ...@@ -478,7 +480,7 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
buf = taosDecodeFixedU32(buf, &nsize); buf = taosDecodeFixedU32(buf, &nsize);
pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq)); pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq));
for (size_t i = 0; i < nsize; i++) { for (size_t i = 0; i < nsize; i++) {
SVCreateTbReq req; SVCreateTbReq req = {0};
buf = tDeserializeSVCreateTbReq(buf, &req); buf = tDeserializeSVCreateTbReq(buf, &req);
taosArrayPush(pReq->pArray, &req); taosArrayPush(pReq->pArray, &req);
} }
......
...@@ -271,9 +271,12 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { ...@@ -271,9 +271,12 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0}; SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFName);
SVCreateTbReq req = {0}; SVCreateTbReq req = {0};
req.ver = 0; req.ver = 0;
req.dbFName = dbFName;
req.name = (char *)tNameGetTableName(&name); req.name = (char *)tNameGetTableName(&name);
req.ttl = 0; req.ttl = 0;
req.keep = 0; req.keep = 0;
......
...@@ -1870,14 +1870,21 @@ static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema ...@@ -1870,14 +1870,21 @@ static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema
} }
static void destroyCreateTbReq(SVCreateTbReq* pReq) { static void destroyCreateTbReq(SVCreateTbReq* pReq) {
tfree(pReq->dbFName);
tfree(pReq->name); tfree(pReq->name);
tfree(pReq->ntbCfg.pSchema); tfree(pReq->ntbCfg.pSchema);
} }
static int32_t buildNormalTableBatchReq( static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, const char* pTableName,
const char* pDbName, const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) { const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId };
strcpy(name.dbname, pDbName);
tNameGetFullDbName(&name, dbFName);
SVCreateTbReq req = {0}; SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE; req.type = TD_NORMAL_TABLE;
req.dbFName = strdup(dbFName);
req.name = strdup(pTableName); req.name = strdup(pTableName);
req.ntbCfg.nCols = LIST_LENGTH(pColumns); req.ntbCfg.nCols = LIST_LENGTH(pColumns);
req.ntbCfg.pSchema = calloc(req.ntbCfg.nCols, sizeof(SSchema)); req.ntbCfg.pSchema = calloc(req.ntbCfg.nCols, sizeof(SSchema));
...@@ -1904,7 +1911,7 @@ static int32_t buildNormalTableBatchReq( ...@@ -1904,7 +1911,7 @@ static int32_t buildNormalTableBatchReq(
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t serializeVgroupTablesBatch(int32_t acctId, SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req)); int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
void* buf = malloc(tlen); void* buf = malloc(tlen);
if (NULL == buf) { if (NULL == buf) {
...@@ -1932,6 +1939,7 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) { ...@@ -1932,6 +1939,7 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
size_t size = taosArrayGetSize(pTbBatch->req.pArray); size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i); SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
tfree(pTableReq->dbFName);
tfree(pTableReq->name); tfree(pTableReq->name);
if (pTableReq->type == TSDB_NORMAL_TABLE) { if (pTableReq->type == TSDB_NORMAL_TABLE) {
...@@ -1973,9 +1981,9 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt* ...@@ -1973,9 +1981,9 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
} }
SVgroupTablesBatch tbatch = {0}; SVgroupTablesBatch tbatch = {0};
int32_t code = buildNormalTableBatchReq(pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch); int32_t code = buildNormalTableBatchReq(acctId, pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = serializeVgroupTablesBatch(acctId, &tbatch, *pBufArray); code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
} }
destroyCreateTbReqBatch(&tbatch); destroyCreateTbReqBatch(&tbatch);
...@@ -2004,9 +2012,16 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -2004,9 +2012,16 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
return code; return code;
} }
static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) { static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap,
const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId };
strcpy(name.dbname, pDbName);
tNameGetFullDbName(&name, dbFName);
struct SVCreateTbReq req = {0}; struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE; req.type = TD_CHILD_TABLE;
req.dbFName = strdup(dbFName);
req.name = strdup(pTableName); req.name = strdup(pTableName);
req.ctbCfg.suid = suid; req.ctbCfg.suid = suid;
req.ctbCfg.pTag = row; req.ctbCfg.pTag = row;
...@@ -2159,7 +2174,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla ...@@ -2159,7 +2174,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info); code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info); addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info);
} }
tfree(pSuperTableMeta); tfree(pSuperTableMeta);
...@@ -2181,7 +2196,7 @@ static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHash ...@@ -2181,7 +2196,7 @@ static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHash
break; break;
} }
serializeVgroupTablesBatch(acctId, pTbBatch, pBufArray); serializeVgroupTablesBatch(pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch); destroyCreateTbReqBatch(pTbBatch);
} while (true); } while (true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册