From 1c57b5e74675b1408d65674d685bb1f2f2fe583f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 31 Dec 2021 08:30:40 +0000 Subject: [PATCH] batch create table --- include/common/tmsg.h | 4 +- source/common/src/tmsg.c | 29 ++++++++++++- source/dnode/vnode/impl/src/vnodeWrite.c | 17 ++++++-- source/libs/parser/src/dCDAstProcess.c | 55 ++++++++++++++++-------- 4 files changed, 82 insertions(+), 23 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 10aba94656..af250bff03 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1285,8 +1285,10 @@ typedef struct { int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq); int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq); -int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq); +int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); +int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq); +void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq); typedef struct SVCreateTbRsp { } SVCreateTbRsp; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b81143ee62..a18a472dba 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -98,7 +98,7 @@ int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) { return 0; } -int tSerializeSVCreateTbReq(void **buf, const SVCreateTbReq *pReq) { +int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; tlen += taosEncodeFixedU64(buf, pReq->ver); @@ -193,6 +193,33 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { return buf; } +int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) { + int tlen = 0; + + tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray)); + for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i); + tlen += tSerializeSVCreateTbReq(buf, pCreateTbReq); + } + + return tlen; +} + +void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { + uint32_t nsize = 0; + + buf = taosDecodeFixedU64(buf, &pReq->ver); + buf = taosDecodeFixedU32(buf, &nsize); + for (size_t i = 0; i < nsize; i++) { + SVCreateTbReq req; + buf = tDeserializeSVCreateTbReq(buf, &req); + taosArrayPush(pReq->pArray, &req); + } + + return buf; +} + /* ------------------------ STATIC METHODS ------------------------ */ static int tmsgStartEncode(SMsgEncoder *pME) { struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode)); diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 3b1442a02c..88a73ca174 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -27,7 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { } int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SRpcMsg * pMsg; + SRpcMsg *pMsg; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); @@ -50,8 +50,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { } int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - SVCreateTbReq vCreateTbReq; - void * ptr = vnodeMalloc(pVnode, pMsg->contLen); + SVCreateTbReq vCreateTbReq; + SVCreateTbBatchReq vCreateTbBatchReq; + void * ptr = vnodeMalloc(pVnode, pMsg->contLen); if (ptr == NULL) { // TODO: handle error } @@ -68,7 +69,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { switch (pMsg->msgType) { case TDMT_VND_CREATE_STB: - case TDMT_VND_CREATE_TABLE: tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -76,6 +76,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: maybe need to clear the requst struct break; + case TDMT_VND_CREATE_TABLE: + tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); + for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); + if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { + // TODO: handle error + } + } + case TDMT_VND_DROP_STB: case TDMT_VND_DROP_TABLE: // if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 8c11f6e528..ecea48f583 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -35,7 +35,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); SArray* array = NULL; - SName name = {0}; + SName name = {0}; tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db)); char dbFname[TSDB_DB_FNAME_LEN] = {0}; @@ -48,7 +48,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou pEpSet->numOfEps = info->numOfEps; pEpSet->inUse = info->inUse; - for(int32_t i = 0; i < pEpSet->numOfEps; ++i) { + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); pEpSet->port[i] = info->epAddr[i].port; } @@ -190,7 +190,7 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { val = htonl(pCreate->numOfVgroups); if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) { snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val, - TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); + TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); } return TSDB_CODE_SUCCESS; @@ -468,7 +468,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; - SToken* pItem = taosArrayGet(pValList, i); + SToken* pItem = taosArrayGet(pValList, i); code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { @@ -481,7 +481,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder); if (row == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_QRY_OUT_OF_MEMORY; } tdSortKVRowByColIdx(row); @@ -501,15 +501,15 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.pTag = row; -// pEpSet->inUse = info.inUse; -// pEpSet->numOfEps = info.numOfEps; -// for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { -// pEpSet->port[i] = info.epAddr[i].port; -// tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); -// } -// ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId); -// ((SMsgHead*)(*pOutput))->contLen = htonl(serLen); - SVgroupTablesBatch *pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); + // pEpSet->inUse = info.inUse; + // pEpSet->numOfEps = info.numOfEps; + // for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { + // pEpSet->port[i] = info.epAddr[i].port; + // tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); + // } + // ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId); + // ((SMsgHead*)(*pOutput))->contLen = htonl(serLen); + SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); if (pTableBatch == NULL) { SVgroupTablesBatch tBatch = {0}; tBatch.info = info; @@ -518,14 +518,35 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p taosArrayPush(tBatch.req.pArray, &req); taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch)); - } else { // add to the correct vgroup + } else { // add to the correct vgroup assert(info.vgId == pTableBatch->info.vgId); taosArrayPush(pTableBatch->req.pArray, &req); } } // TODO: serialize and - void *pBuf = NULL; + SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); + SVgroupTablesBatch** ppTbBatch = NULL; + do { + ppTbBatch = taosHashIterate(pVgroupHashmap, ppTbBatch); + if (ppTbBatch == NULL) break; + SVgroupTablesBatch* pTbBatch = *ppTbBatch; + + int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); + void* buf = malloc(tlen); + if (buf == NULL) { + // TODO: handle error + } + + ((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId); + ((SMsgHead*)buf)->contLen = htonl(tlen); + + void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req)); + + taosArrayPush(pBufArray, &buf); + + } while (true); return TSDB_CODE_SUCCESS; } @@ -634,7 +655,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_SHOW: { SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); - pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW; + pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW; break; } -- GitLab