diff --git a/include/dnode/vnode/meta/meta.h b/include/dnode/vnode/meta/meta.h index cc5eabf3bf4ebffdd92aaf434ca45640e40d34e0..b75e478add7bc18b34e737dbd77114b84c6ca3ab 100644 --- a/include/dnode/vnode/meta/meta.h +++ b/include/dnode/vnode/meta/meta.h @@ -37,6 +37,13 @@ typedef struct SMetaCfg { uint64_t lruSize; } SMetaCfg; +typedef struct { + int32_t nCols; + SSchema *pSchema; +} SSchemaWrapper; + +typedef struct SMTbCursor SMTbCursor; + typedef SVCreateTbReq STbCfg; // SMeta operations @@ -48,7 +55,13 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); // For Query -int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg); +STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); +STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); + +SMTbCursor * metaOpenTbCursor(SMeta *pMeta); +void metaCloseTbCursor(SMTbCursor *pTbCur); +char *metaTbCursorNext(SMTbCursor *pTbCur); // Options void metaOptionsInit(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 78214ce14da0099c832268fdc66be3fd86ed869d..fdb9837292abc0c686353bc491c8445ddfa0380e 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -31,10 +31,10 @@ #include "vnodeCommit.h" #include "vnodeFS.h" #include "vnodeMemAllocator.h" +#include "vnodeQuery.h" #include "vnodeRequest.h" #include "vnodeStateMgr.h" #include "vnodeSync.h" -#include "vnodeQuery.h" #ifdef __cplusplus extern "C" { @@ -62,6 +62,7 @@ typedef struct SVnodeMgr { extern SVnodeMgr vnodeMgr; struct SVnode { + int32_t vgId; char* path; SVnodeCfg config; SVState state; diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 6632676d8bb336513295f25280c495a831d97382..2dde9e03e8346f377eced92876cb60fa32e5cf9b 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -47,23 +47,70 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - STableInfoMsg *pReq = (STableInfoMsg *)(pMsg->pCont); - STableMetaMsg *pRspMsg; - int ret; + STableInfoMsg * pReq = (STableInfoMsg *)(pMsg->pCont); + STbCfg * pTbCfg = NULL; + STbCfg * pStbCfg = NULL; + tb_uid_t uid; + int32_t nCols; + int32_t nTagCols; + SSchemaWrapper *pSW; + STableMetaMsg * pTbMetaMsg; + SSchema * pTagSchema; - if (metaGetTableInfo(pVnode->pMeta, pReq->tableFname, &pRspMsg) < 0) { + pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid); + if (pTbCfg == NULL) { return -1; } - *pRsp = malloc(sizeof(SRpcMsg)); - if (TD_IS_NULL(*pRsp)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - free(pMsg); + if (pTbCfg->type == META_CHILD_TABLE) { + pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid); + if (pStbCfg == NULL) { + return -1; + } + + pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true); + } else { + pSW = metaGetTableSchema(pVnode->pMeta, uid, 0, true); + } + + nCols = pSW->nCols; + if (pTbCfg->type == META_SUPER_TABLE) { + nTagCols = pTbCfg->stbCfg.nTagCols; + pTagSchema = pTbCfg->stbCfg.pTagSchema; + } else if (pTbCfg->type == META_SUPER_TABLE) { + nTagCols = pStbCfg->stbCfg.nTagCols; + pTagSchema = pStbCfg->stbCfg.pTagSchema; + } else { + nTagCols = 0; + pTagSchema = NULL; + } + + pTbMetaMsg = (STableMetaMsg *)calloc(1, sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols)); + if (pTbMetaMsg == NULL) { return -1; } - // TODO - (*pRsp)->pCont = pRspMsg; + strcpy(pTbMetaMsg->tbFname, pTbCfg->name); + if (pTbCfg->type == META_CHILD_TABLE) { + strcpy(pTbMetaMsg->stbFname, pStbCfg->name); + pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid); + } + pTbMetaMsg->numOfTags = htonl(nTagCols); + pTbMetaMsg->numOfColumns = htonl(nCols); + pTbMetaMsg->tableType = pTbCfg->type; + pTbMetaMsg->tuid = htobe64(uid); + pTbMetaMsg->vgId = htonl(pVnode->vgId); + + memcpy(pTbMetaMsg->pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols); + if (nTagCols) { + memcpy(POINTER_SHIFT(pTbMetaMsg->pSchema, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols); + } + + for (int i = 0; i < nCols + nTagCols; i++) { + SSchema *pSch = pTbMetaMsg->pSchema + i; + pSch->colId = htonl(pSch->colId); + pSch->bytes = htonl(pSch->bytes); + } return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 4ef7dc01a46eb937f402fc8656c9cf8366427995..1f693346cca1530ac38999d3a0b8aa05c5ca2b73 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -431,79 +431,154 @@ static void metaClearTbCfg(STbCfg *pTbCfg) { } /* ------------------------ FOR QUERY ------------------------ */ -int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg) { - DBT key = {0}; - DBT value = {0}; - SMetaDB * pMetaDB = pMeta->pDB; - int ret; - STbCfg tbCfg; - SSchemaKey schemaKey; - DBT key1 = {0}; - DBT value1 = {0}; - uint32_t ncols; - void * pBuf; - int tlen; - STableMetaMsg *pMsg; - SSchema * pSchema; +STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { + STbCfg * pTbCfg = NULL; + SMetaDB *pDB = pMeta->pDB; + DBT key = {0}; + DBT value = {0}; + int ret; + + // Set key/value + key.data = &uid; + key.size = sizeof(uid); + + // Query + ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0); + if (ret != 0) { + return NULL; + } + + // Decode + pTbCfg = (STbCfg *)malloc(sizeof(*pTbCfg)); + if (pTbCfg == NULL) { + return NULL; + } + + metaDecodeTbInfo(value.data, pTbCfg); + return pTbCfg; +} + +STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { + STbCfg * pTbCfg = NULL; + SMetaDB *pDB = pMeta->pDB; + DBT key = {0}; + DBT pkey = {0}; + DBT pvalue = {0}; + int ret; + + // Set key/value key.data = tbname; - key.size = strlen(tbname) + 1; + key.size = strlen(tbname); - ret = pMetaDB->pNameIdx->get(pMetaDB->pNameIdx, NULL, &key, &value, 0); + // Query + ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0); if (ret != 0) { - // TODO - return -1; + return NULL; } - metaDecodeTbInfo(value.data, &tbCfg); - - switch (tbCfg.type) { - case META_SUPER_TABLE: - schemaKey.uid = tbCfg.stbCfg.suid; - schemaKey.sver = 0; - - key1.data = &schemaKey; - key1.size = sizeof(schemaKey); - - ret = pMetaDB->pSchemaDB->get(pMetaDB->pSchemaDB, &key1, &value1, NULL, 0); - if (ret != 0) { - // TODO - return -1; - } - pBuf = value1.data; - pBuf = taosDecodeFixedU32(pBuf, &ncols); - - tlen = sizeof(STableMetaMsg) + (tbCfg.stbCfg.nTagCols + ncols) * sizeof(SSchema); - pMsg = calloc(1, tlen); - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - strcpy(pMsg->tbFname, tbCfg.name); - pMsg->numOfTags = tbCfg.stbCfg.nTagCols; - pMsg->numOfColumns = ncols; - pMsg->tableType = tbCfg.type; - pMsg->sversion = 0; - pMsg->tversion = 0; - pMsg->suid = tbCfg.stbCfg.suid; - pMsg->tuid = tbCfg.stbCfg.suid; - memcpy(pMsg->pSchema, tbCfg.stbCfg.pSchema, sizeof(SSchema) * tbCfg.stbCfg.nCols); - memcpy(POINTER_SHIFT(pMsg->pSchema, sizeof(SSchema) * tbCfg.stbCfg.nCols), tbCfg.stbCfg.pTagSchema, - sizeof(SSchema) * tbCfg.stbCfg.nTagCols); - break; - case META_CHILD_TABLE: - ASSERT(0); - break; - case META_NORMAL_TABLE: - ASSERT(0); - break; - default: - ASSERT(0); - break; - } - - *ppMsg = pMsg; + // Decode + *uid = *(tb_uid_t *)(pkey.data); + pTbCfg = (STbCfg *)malloc(sizeof(*pTbCfg)); + if (pTbCfg == NULL) { + return NULL; + } - return 0; + metaDecodeTbInfo(pvalue.data, pTbCfg); + + return pTbCfg; +} + +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { + uint32_t nCols; + SSchemaWrapper *pSW = NULL; + SMetaDB * pDB = pMeta->pDB; + int ret; + void * pBuf; + SSchema * pSchema; + SSchemaKey schemaKey = {uid, sver}; + DBT key = {0}; + DBT value = {0}; + + // Set key/value properties + key.data = &schemaKey; + key.size = sizeof(schemaKey); + + // Query + ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0); + if (ret != 0) { + return NULL; + } + + // Decode the schema + pBuf = value.data; + taosDecodeFixedI32(&pBuf, &nCols); + if (isinline) { + pSW = (SSchemaWrapper *)malloc(sizeof(*pSW) + sizeof(SSchema) * nCols); + if (pSW == NULL) { + return NULL; + } + pSW->pSchema = POINTER_SHIFT(pSW, sizeof(*pSW)); + } else { + pSW = (SSchemaWrapper *)malloc(sizeof(*pSW)); + if (pSW == NULL) { + return NULL; + } + + pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * nCols); + if (pSW->pSchema == NULL) { + free(pSW); + return NULL; + } + } + + for (int i = 0; i < nCols; i++) { + pSchema = pSW->pSchema + i; + taosDecodeFixedI8(&pBuf, &(pSchema->type)); + taosDecodeFixedI32(&pBuf, &(pSchema->colId)); + taosDecodeFixedI32(&pBuf, &(pSchema->bytes)); + taosDecodeStringTo(&pBuf, pSchema->name); + } + + return pSW; +} + +struct SMTbCursor { + DBC *pCur; +}; + +SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { + SMTbCursor *pTbCur = NULL; + SMetaDB * pDB = pMeta->pDB; + + pTbCur = (SMTbCursor *)calloc(1, sizeof(*pTbCur)); + if (pTbCur == NULL) { + return NULL; + } + + pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0); + + return pTbCur; +} + +void metaCloseTbCursor(SMTbCursor *pTbCur) { + if (pTbCur) { + if (pTbCur->pCur) { + pTbCur->pCur->close(pTbCur->pCur); + } + free(pTbCur); + } +} + +char *metaTbCursorNext(SMTbCursor *pTbCur) { + DBT key = {0}; + DBT value = {0}; + STbCfg tbCfg; + + if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) { + metaDecodeTbInfo(&(value.data), &tbCfg); + return tbCfg.name; + } else { + return NULL; + } } \ No newline at end of file