未验证 提交 d94f2c32 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #9506 from taosdata/feature/vnode

Merge branch '3.0' into feature/vnode
...@@ -37,6 +37,13 @@ typedef struct SMetaCfg { ...@@ -37,6 +37,13 @@ typedef struct SMetaCfg {
uint64_t lruSize; uint64_t lruSize;
} SMetaCfg; } SMetaCfg;
typedef struct {
int32_t nCols;
SSchema *pSchema;
} SSchemaWrapper;
typedef struct SMTbCursor SMTbCursor;
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;
// SMeta operations // SMeta operations
...@@ -48,7 +55,13 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); ...@@ -48,7 +55,13 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta); int metaCommit(SMeta *pMeta);
// For Query // For Query
int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg); STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
SMTbCursor * metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
char *metaTbCursorNext(SMTbCursor *pTbCur);
// Options // Options
void metaOptionsInit(SMetaCfg *pMetaCfg); void metaOptionsInit(SMetaCfg *pMetaCfg);
......
...@@ -31,10 +31,10 @@ ...@@ -31,10 +31,10 @@
#include "vnodeCommit.h" #include "vnodeCommit.h"
#include "vnodeFS.h" #include "vnodeFS.h"
#include "vnodeMemAllocator.h" #include "vnodeMemAllocator.h"
#include "vnodeQuery.h"
#include "vnodeRequest.h" #include "vnodeRequest.h"
#include "vnodeStateMgr.h" #include "vnodeStateMgr.h"
#include "vnodeSync.h" #include "vnodeSync.h"
#include "vnodeQuery.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -62,6 +62,7 @@ typedef struct SVnodeMgr { ...@@ -62,6 +62,7 @@ typedef struct SVnodeMgr {
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
struct SVnode { struct SVnode {
int32_t vgId;
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
SVState state; SVState state;
......
...@@ -47,23 +47,70 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -47,23 +47,70 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STableInfoMsg *pReq = (STableInfoMsg *)(pMsg->pCont); STableInfoMsg * pReq = (STableInfoMsg *)(pMsg->pCont);
STableMetaMsg *pRspMsg; STbCfg * pTbCfg = NULL;
int ret; STbCfg * pStbCfg = NULL;
tb_uid_t uid;
int32_t nCols;
int32_t nTagCols;
SSchemaWrapper *pSW;
STableMetaMsg * pTbMetaMsg;
SSchema * pTagSchema;
if (metaGetTableInfo(pVnode->pMeta, pReq->tableFname, &pRspMsg) < 0) { pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
if (pTbCfg == NULL) {
return -1; return -1;
} }
*pRsp = malloc(sizeof(SRpcMsg)); if (pTbCfg->type == META_CHILD_TABLE) {
if (TD_IS_NULL(*pRsp)) { pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid);
terrno = TSDB_CODE_OUT_OF_MEMORY; if (pStbCfg == NULL) {
free(pMsg); return -1;
}
pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true);
} else {
pSW = metaGetTableSchema(pVnode->pMeta, uid, 0, true);
}
nCols = pSW->nCols;
if (pTbCfg->type == META_SUPER_TABLE) {
nTagCols = pTbCfg->stbCfg.nTagCols;
pTagSchema = pTbCfg->stbCfg.pTagSchema;
} else if (pTbCfg->type == META_SUPER_TABLE) {
nTagCols = pStbCfg->stbCfg.nTagCols;
pTagSchema = pStbCfg->stbCfg.pTagSchema;
} else {
nTagCols = 0;
pTagSchema = NULL;
}
pTbMetaMsg = (STableMetaMsg *)calloc(1, sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols));
if (pTbMetaMsg == NULL) {
return -1; return -1;
} }
// TODO strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
(*pRsp)->pCont = pRspMsg; if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
}
pTbMetaMsg->numOfTags = htonl(nTagCols);
pTbMetaMsg->numOfColumns = htonl(nCols);
pTbMetaMsg->tableType = pTbCfg->type;
pTbMetaMsg->tuid = htobe64(uid);
pTbMetaMsg->vgId = htonl(pVnode->vgId);
memcpy(pTbMetaMsg->pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
if (nTagCols) {
memcpy(POINTER_SHIFT(pTbMetaMsg->pSchema, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols);
}
for (int i = 0; i < nCols + nTagCols; i++) {
SSchema *pSch = pTbMetaMsg->pSchema + i;
pSch->colId = htonl(pSch->colId);
pSch->bytes = htonl(pSch->bytes);
}
return 0; return 0;
} }
\ No newline at end of file
...@@ -431,79 +431,154 @@ static void metaClearTbCfg(STbCfg *pTbCfg) { ...@@ -431,79 +431,154 @@ static void metaClearTbCfg(STbCfg *pTbCfg) {
} }
/* ------------------------ FOR QUERY ------------------------ */ /* ------------------------ FOR QUERY ------------------------ */
int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg) { STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
DBT key = {0}; STbCfg * pTbCfg = NULL;
DBT value = {0}; SMetaDB *pDB = pMeta->pDB;
SMetaDB * pMetaDB = pMeta->pDB; DBT key = {0};
int ret; DBT value = {0};
STbCfg tbCfg; int ret;
SSchemaKey schemaKey;
DBT key1 = {0}; // Set key/value
DBT value1 = {0}; key.data = &uid;
uint32_t ncols; key.size = sizeof(uid);
void * pBuf;
int tlen; // Query
STableMetaMsg *pMsg; ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0);
SSchema * pSchema; if (ret != 0) {
return NULL;
}
// Decode
pTbCfg = (STbCfg *)malloc(sizeof(*pTbCfg));
if (pTbCfg == NULL) {
return NULL;
}
metaDecodeTbInfo(value.data, pTbCfg);
return pTbCfg;
}
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
STbCfg * pTbCfg = NULL;
SMetaDB *pDB = pMeta->pDB;
DBT key = {0};
DBT pkey = {0};
DBT pvalue = {0};
int ret;
// Set key/value
key.data = tbname; key.data = tbname;
key.size = strlen(tbname) + 1; key.size = strlen(tbname);
ret = pMetaDB->pNameIdx->get(pMetaDB->pNameIdx, NULL, &key, &value, 0); // Query
ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0);
if (ret != 0) { if (ret != 0) {
// TODO return NULL;
return -1;
} }
metaDecodeTbInfo(value.data, &tbCfg); // Decode
*uid = *(tb_uid_t *)(pkey.data);
switch (tbCfg.type) { pTbCfg = (STbCfg *)malloc(sizeof(*pTbCfg));
case META_SUPER_TABLE: if (pTbCfg == NULL) {
schemaKey.uid = tbCfg.stbCfg.suid; return NULL;
schemaKey.sver = 0; }
key1.data = &schemaKey;
key1.size = sizeof(schemaKey);
ret = pMetaDB->pSchemaDB->get(pMetaDB->pSchemaDB, &key1, &value1, NULL, 0);
if (ret != 0) {
// TODO
return -1;
}
pBuf = value1.data;
pBuf = taosDecodeFixedU32(pBuf, &ncols);
tlen = sizeof(STableMetaMsg) + (tbCfg.stbCfg.nTagCols + ncols) * sizeof(SSchema);
pMsg = calloc(1, tlen);
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pMsg->tbFname, tbCfg.name);
pMsg->numOfTags = tbCfg.stbCfg.nTagCols;
pMsg->numOfColumns = ncols;
pMsg->tableType = tbCfg.type;
pMsg->sversion = 0;
pMsg->tversion = 0;
pMsg->suid = tbCfg.stbCfg.suid;
pMsg->tuid = tbCfg.stbCfg.suid;
memcpy(pMsg->pSchema, tbCfg.stbCfg.pSchema, sizeof(SSchema) * tbCfg.stbCfg.nCols);
memcpy(POINTER_SHIFT(pMsg->pSchema, sizeof(SSchema) * tbCfg.stbCfg.nCols), tbCfg.stbCfg.pTagSchema,
sizeof(SSchema) * tbCfg.stbCfg.nTagCols);
break;
case META_CHILD_TABLE:
ASSERT(0);
break;
case META_NORMAL_TABLE:
ASSERT(0);
break;
default:
ASSERT(0);
break;
}
*ppMsg = pMsg;
return 0; metaDecodeTbInfo(pvalue.data, pTbCfg);
return pTbCfg;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
uint32_t nCols;
SSchemaWrapper *pSW = NULL;
SMetaDB * pDB = pMeta->pDB;
int ret;
void * pBuf;
SSchema * pSchema;
SSchemaKey schemaKey = {uid, sver};
DBT key = {0};
DBT value = {0};
// Set key/value properties
key.data = &schemaKey;
key.size = sizeof(schemaKey);
// Query
ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
if (ret != 0) {
return NULL;
}
// Decode the schema
pBuf = value.data;
taosDecodeFixedI32(&pBuf, &nCols);
if (isinline) {
pSW = (SSchemaWrapper *)malloc(sizeof(*pSW) + sizeof(SSchema) * nCols);
if (pSW == NULL) {
return NULL;
}
pSW->pSchema = POINTER_SHIFT(pSW, sizeof(*pSW));
} else {
pSW = (SSchemaWrapper *)malloc(sizeof(*pSW));
if (pSW == NULL) {
return NULL;
}
pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * nCols);
if (pSW->pSchema == NULL) {
free(pSW);
return NULL;
}
}
for (int i = 0; i < nCols; i++) {
pSchema = pSW->pSchema + i;
taosDecodeFixedI8(&pBuf, &(pSchema->type));
taosDecodeFixedI32(&pBuf, &(pSchema->colId));
taosDecodeFixedI32(&pBuf, &(pSchema->bytes));
taosDecodeStringTo(&pBuf, pSchema->name);
}
return pSW;
}
struct SMTbCursor {
DBC *pCur;
};
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
SMTbCursor *pTbCur = NULL;
SMetaDB * pDB = pMeta->pDB;
pTbCur = (SMTbCursor *)calloc(1, sizeof(*pTbCur));
if (pTbCur == NULL) {
return NULL;
}
pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0);
return pTbCur;
}
void metaCloseTbCursor(SMTbCursor *pTbCur) {
if (pTbCur) {
if (pTbCur->pCur) {
pTbCur->pCur->close(pTbCur->pCur);
}
free(pTbCur);
}
}
char *metaTbCursorNext(SMTbCursor *pTbCur) {
DBT key = {0};
DBT value = {0};
STbCfg tbCfg;
if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) {
metaDecodeTbInfo(&(value.data), &tbCfg);
return tbCfg.name;
} else {
return NULL;
}
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册