提交 99d6fda6 编写于 作者: D dapan1121

feature/qnode

上级 9f4744c6
......@@ -2293,6 +2293,24 @@ static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
return buf;
}
typedef struct {
char indexFName[TSDB_INDEX_FNAME_LEN];
} SUserIndexReq;
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
char tblFName[TSDB_TABLE_FNAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char indexType[TSDB_INDEX_TYPE_LEN];
char indexExts[TSDB_INDEX_EXTS_LEN];
} SUserIndexRsp;
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);
typedef struct {
int8_t mqMsgType;
int32_t code;
......
......@@ -156,6 +156,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
// Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG)
......
......@@ -217,6 +217,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0385)
#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0386)
#define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x0387)
#define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0388)
// mnode-vgroup
#define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0390)
......
......@@ -109,6 +109,9 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_INS_TABLE_USER_USERS "user_users"
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2
#define TSDB_TICK_PER_SECOND(precision) \
......@@ -213,6 +216,9 @@ typedef enum ELogicConditionType {
#define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
#define TSDB_INDEX_TYPE_LEN 10
#define TSDB_INDEX_EXTS_LEN 256
#define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
......
......@@ -2033,6 +2033,64 @@ int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
return 0;
}
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->indexFName) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->indexFName) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tblFName) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->colName) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->indexType) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->indexExts) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tblFName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->colName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->indexType) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->indexExts) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SCoder encoder = {0};
......
......@@ -147,6 +147,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE);
// Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
......
......@@ -58,6 +58,7 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetIndexReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
......@@ -1661,3 +1662,48 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) {
SUserIndexReq indexReq = {0};
SMnode *pMnode = pReq->pNode;
int32_t code = -1;
SUserIndexRsp rsp = {0};
bool exist = false;
if (tDeserializeSUserIndexReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &indexReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
code = mndProcessGetSmaReq(pMnode, &indexReq, &rsp, &exist);
if (code) {
goto _OVER;
}
if (!exist) {
//TODO GET INDEX FROM FULLTEXT
} else {
int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
pReq->pRsp = pRsp;
pReq->rspLen = contLen;
code = 0;
}
_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to get index %s since %s", indexReq.indexFName, terrstr());
}
return code;
}
......@@ -18,6 +18,7 @@
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
//!!!! Note: only APPEND columns in below tables, NO insert !!!!
static const SInfosTableSchema dnodesSchema[] = {
......@@ -79,11 +80,11 @@ static const SInfosTableSchema userFuncSchema[] = {
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
};
static const SInfosTableSchema userIdxSchema[] = {
{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_database", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "column_name", .bytes = SYSTABLE_SCH_COL_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY},
};
......
......@@ -689,6 +689,40 @@ _OVER:
return code;
}
static int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
int32_t code = -1;
SSmaObj *pSma = NULL;
pSma = mndAcquireSma(pMnode, indexReq.indexFName);
if (pSma == NULL) {
//TODO TRY TO GET INDEX FROM FULLTEXT
*exist = false;
return 0;
}
memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA);
SNodeList *pList = NULL;
int32_t extOffset = 0;
code = nodesStringToList(pSma->expr, &pList);
if (0 == code) {
SNode *node = NULL;
FOREACH(node, pList) {
SFunctionNode *pFunc = (SFunctionNode *)node;
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", (extOffset ? ",":""), pFunc->functionName);
}
*exist = true;
}
mndReleaseSma(pMnode, pSma);
return code;
}
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
......
......@@ -224,6 +224,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist")
// mnode-vgroup
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, "Vgroup already in dnode")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册