提交 3d396ca7 编写于 作者: S Shengliang Guan

serialize func msg

上级 b19987ac
......@@ -394,8 +394,8 @@ typedef struct {
SHashObj* writeDbs;
} SGetUserAuthRsp;
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pReq);
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pReq);
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
typedef struct {
int16_t colId; // column id
......@@ -616,22 +616,31 @@ typedef struct {
int64_t signature;
int32_t commentSize;
int32_t codeSize;
char pCont[];
char pComment[TSDB_FUNC_COMMENT_LEN];
char pCode[TSDB_FUNC_CODE_LEN];
} SCreateFuncReq;
int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
int32_t tDeserializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
int8_t igNotExists;
} SDropFuncReq;
int32_t tSerializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
typedef struct {
int32_t numOfFuncs;
char pFuncNames[];
SArray* pFuncNames;
} SRetrieveFuncReq;
int32_t tSerializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);
int32_t tDeserializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
int8_t align;
int8_t funcType;
int8_t scriptType;
int8_t outputType;
......@@ -640,14 +649,18 @@ typedef struct {
int64_t signature;
int32_t commentSize;
int32_t codeSize;
char pCont[];
char pComment[TSDB_FUNC_COMMENT_LEN];
char pCode[TSDB_FUNC_CODE_LEN];
} SFuncInfo;
typedef struct {
int32_t numOfFuncs;
char pFuncInfos[];
SArray* pFuncInfos;
} SRetrieveFuncRsp;
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
typedef struct {
int32_t statusInterval;
int64_t checkTime; // 1970-01-01 00:00:00.000
......
......@@ -753,29 +753,29 @@ int32_t tDeserializeSGetUserAuthReq(void *buf, int32_t bufLen, SGetUserAuthReq *
return 0;
}
int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pReq) {
int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
if (tEncodeI8(&encoder, pReq->superAuth) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->user) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->superAuth) < 0) return -1;
int32_t numOfReadDbs = taosHashGetSize(pReq->readDbs);
int32_t numOfWriteDbs = taosHashGetSize(pReq->writeDbs);
int32_t numOfReadDbs = taosHashGetSize(pRsp->readDbs);
int32_t numOfWriteDbs = taosHashGetSize(pRsp->writeDbs);
if (tEncodeI32(&encoder, numOfReadDbs) < 0) return -1;
if (tEncodeI32(&encoder, numOfWriteDbs) < 0) return -1;
char *db = taosHashIterate(pReq->readDbs, NULL);
char *db = taosHashIterate(pRsp->readDbs, NULL);
while (db != NULL) {
if (tEncodeCStr(&encoder, db) < 0) return -1;
db = taosHashIterate(pReq->readDbs, db);
db = taosHashIterate(pRsp->readDbs, db);
}
db = taosHashIterate(pReq->writeDbs, NULL);
db = taosHashIterate(pRsp->writeDbs, NULL);
while (db != NULL) {
if (tEncodeCStr(&encoder, db) < 0) return -1;
db = taosHashIterate(pReq->writeDbs, db);
db = taosHashIterate(pRsp->writeDbs, db);
}
tEndEncode(&encoder);
......@@ -785,10 +785,10 @@ int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pR
return tlen;
}
int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pReq) {
pReq->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pReq->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
if (pReq->readDbs == NULL || pReq->writeDbs == NULL) {
int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) {
pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
if (pRsp->readDbs == NULL || pRsp->writeDbs == NULL) {
return -1;
}
......@@ -796,8 +796,8 @@ int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->superAuth) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->user) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->superAuth) < 0) return -1;
int32_t numOfReadDbs = 0;
int32_t numOfWriteDbs = 0;
......@@ -808,14 +808,14 @@ int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(&decoder, db) < 0) return -1;
int32_t len = strlen(db) + 1;
taosHashPut(pReq->readDbs, db, len, db, len);
taosHashPut(pRsp->readDbs, db, len, db, len);
}
for (int32_t i = 0; i < numOfWriteDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(&decoder, db) < 0) return -1;
int32_t len = strlen(db) + 1;
taosHashPut(pReq->writeDbs, db, len, db, len);
taosHashPut(pRsp->writeDbs, db, len, db, len);
}
tEndDecode(&decoder);
......@@ -920,3 +920,180 @@ int32_t tDeserializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI8(&encoder, pReq->funcType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->scriptType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->outputType) < 0) return -1;
if (tEncodeI32(&encoder, pReq->outputLen) < 0) return -1;
if (tEncodeI32(&encoder, pReq->bufSize) < 0) return -1;
if (tEncodeI64(&encoder, pReq->signature) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commentSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->codeSize) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->pCode) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->funcType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->scriptType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->outputType) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->outputLen) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->bufSize) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commentSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->codeSize) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->pCode) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSDropFuncReq(void *buf, int32_t bufLen, SDropFuncReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDropFuncReq(void *buf, int32_t bufLen, SDropFuncReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1;
if (pReq->numOfFuncs != (int32_t)taosArrayGetSize(pReq->pFuncNames)) return -1;
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
char *fname = taosArrayGet(pReq->pFuncNames, i);
if (tEncodeCStr(&encoder, fname) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1;
pReq->pFuncNames = taosArrayInit(pReq->numOfFuncs, TSDB_FUNC_NAME_LEN);
if (pReq->pFuncNames == NULL) return -1;
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
char fname[TSDB_FUNC_NAME_LEN] = {0};
if (tDecodeCStrTo(&decoder, fname) < 0) return -1;
taosArrayPush(pReq->pFuncNames, fname);
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfFuncs) < 0) return -1;
if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncInfos)) return -1;
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
SFuncInfo *pInfo = taosArrayGet(pRsp->pFuncInfos, i);
if (tEncodeCStr(&encoder, pInfo->name) < 0) return -1;
if (tEncodeI8(&encoder, pInfo->funcType) < 0) return -1;
if (tEncodeI8(&encoder, pInfo->scriptType) < 0) return -1;
if (tEncodeI8(&encoder, pInfo->outputType) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->outputLen) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->bufSize) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->signature) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->commentSize) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->codeSize) < 0) return -1;
if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1;
if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfFuncs) < 0) return -1;
pRsp->pFuncInfos = taosArrayInit(pRsp->numOfFuncs, sizeof(SFuncInfo));
if (pRsp->pFuncInfos == NULL) return -1;
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
SFuncInfo fInfo = {0};
if (tDecodeCStrTo(&decoder, fInfo.name) < 0) return -1;
if (tDecodeI8(&decoder, &fInfo.funcType) < 0) return -1;
if (tDecodeI8(&decoder, &fInfo.scriptType) < 0) return -1;
if (tDecodeI8(&decoder, &fInfo.outputType) < 0) return -1;
if (tDecodeI32(&decoder, &fInfo.outputLen) < 0) return -1;
if (tDecodeI32(&decoder, &fInfo.bufSize) < 0) return -1;
if (tDecodeI64(&decoder, &fInfo.signature) < 0) return -1;
if (tDecodeI32(&decoder, &fInfo.commentSize) < 0) return -1;
if (tDecodeI32(&decoder, &fInfo.codeSize) < 0) return -1;
if (tDecodeCStrTo(&decoder, fInfo.pComment) < 0) return -1;
if (tDecodeCStrTo(&decoder, fInfo.pCode) < 0) return -1;
taosArrayPush(pRsp->pFuncInfos, &fInfo);
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
......@@ -29,9 +29,8 @@ int32_t mndCheckCreateUserAuth(SUserObj *pOperUser);
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter);
int32_t mndCheckDropUserAuth(SUserObj *pOperUser);
int32_t mndCheckCreateNodeAuth(SUserObj *pOperUser);
int32_t mndCheckDropNodeAuth(SUserObj *pOperUser);
int32_t mndCheckAlterNodeAuth(SUserObj *pOperUser);
int32_t mndCheckOperateNodeAuth(SUserObj *pOperUser);
int32_t mndCheckOperateFuncAuth(SUserObj *pOperUser);
#ifdef __cplusplus
}
......
......@@ -111,7 +111,7 @@ int32_t mndCheckDropUserAuth(SUserObj *pOperUser) {
return -1;
}
int32_t mndCheckCreateNodeAuth(SUserObj *pOperUser) {
int32_t mndCheckOperateNodeAuth(SUserObj *pOperUser) {
if (pOperUser->superUser) {
return 0;
}
......@@ -120,6 +120,11 @@ int32_t mndCheckCreateNodeAuth(SUserObj *pOperUser) {
return -1;
}
int32_t mndCheckDropNodeAuth(SUserObj *pOperUser) { return mndCheckCreateNodeAuth(pOperUser); }
int32_t mndCheckOperateFuncAuth(SUserObj *pOperUser) {
if (pOperUser->superUser) {
return 0;
}
int32_t mndCheckAlterNodeAuth(SUserObj *pOperUser) { return mndCheckCreateNodeAuth(pOperUser); }
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
......@@ -293,7 +293,7 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pReq) {
goto CREATE_BNODE_OVER;
}
if (mndCheckDropNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto CREATE_BNODE_OVER;
}
......@@ -400,7 +400,7 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pReq) {
goto DROP_BNODE_OVER;
}
if (mndCheckCreateNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto DROP_BNODE_OVER;
}
......
......@@ -496,7 +496,7 @@ static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq) {
goto CREATE_DNODE_OVER;
}
if (mndCheckDropNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto CREATE_DNODE_OVER;
}
......@@ -506,7 +506,6 @@ static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq) {
CREATE_DNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr());
return -1;
}
mndReleaseDnode(pMnode, pDnode);
......@@ -571,7 +570,7 @@ static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) {
goto DROP_DNODE_OVER;
}
if (mndCheckCreateNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto DROP_DNODE_OVER;
}
......@@ -581,7 +580,6 @@ static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) {
DROP_DNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
return -1;
}
mndReleaseDnode(pMnode, pDnode);
......
......@@ -15,9 +15,11 @@
#define _DEFAULT_SOURCE
#include "mndFunc.h"
#include "mndAuth.h"
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
#include "mndUser.h"
#define SDB_FUNC_VER 1
#define SDB_FUNC_RESERVE_SIZE 64
......@@ -201,8 +203,8 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pC
goto CREATE_FUNC_OVER;
}
memcpy(func.pComment, pCreate->pCont, pCreate->commentSize);
memcpy(func.pCode, pCreate->pCont + pCreate->commentSize, func.codeSize);
memcpy(func.pComment, pCreate->pComment, pCreate->commentSize);
memcpy(func.pCode, pCreate->pCode, func.codeSize);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_FUNC_OVER;
......@@ -261,164 +263,202 @@ DROP_FUNC_OVER:
}
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SCreateFuncReq *pCreate = pReq->rpcMsg.pCont;
pCreate->outputLen = htonl(pCreate->outputLen);
pCreate->bufSize = htonl(pCreate->bufSize);
pCreate->signature = htobe64(pCreate->signature);
pCreate->commentSize = htonl(pCreate->commentSize);
pCreate->codeSize = htonl(pCreate->codeSize);
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SUserObj *pUser = NULL;
SFuncObj *pFunc = NULL;
SCreateFuncReq createReq = {0};
if (tDeserializeSCreateFuncReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_FUNC_OVER;
}
mDebug("func:%s, start to create", pCreate->name);
mDebug("func:%s, start to create", createReq.name);
SFuncObj *pFunc = mndAcquireFunc(pMnode, pCreate->name);
pFunc = mndAcquireFunc(pMnode, createReq.name);
if (pFunc != NULL) {
mndReleaseFunc(pMnode, pFunc);
if (pCreate->igExists) {
mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
return 0;
if (createReq.igExists) {
mDebug("func:%s, already exist, ignore exist is set", createReq.name);
code = 0;
goto CREATE_FUNC_OVER;
} else {
terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
goto CREATE_FUNC_OVER;
}
} else if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
goto CREATE_FUNC_OVER;
}
if (pCreate->name[0] == 0) {
if (createReq.name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
goto CREATE_FUNC_OVER;
}
if (pCreate->commentSize <= 0 || pCreate->commentSize > TSDB_FUNC_COMMENT_LEN) {
if (createReq.commentSize <= 0 || createReq.commentSize > TSDB_FUNC_COMMENT_LEN) {
terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
goto CREATE_FUNC_OVER;
}
if (pCreate->codeSize <= 0 || pCreate->codeSize > TSDB_FUNC_CODE_LEN) {
if (createReq.codeSize <= 0 || createReq.codeSize > TSDB_FUNC_CODE_LEN) {
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
goto CREATE_FUNC_OVER;
}
if (pCreate->pCont[0] == 0) {
if (createReq.pCode[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
goto CREATE_FUNC_OVER;
}
if (pCreate->bufSize <= 0 || pCreate->bufSize > TSDB_FUNC_BUF_SIZE) {
if (createReq.bufSize <= 0 || createReq.bufSize > TSDB_FUNC_BUF_SIZE) {
terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
goto CREATE_FUNC_OVER;
}
int32_t code = mndCreateFunc(pMnode, pReq, pCreate);
if (code != 0) {
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_FUNC_OVER;
}
if (mndCheckOperateFuncAuth(pUser)) {
goto CREATE_FUNC_OVER;
}
code = mndCreateFunc(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
CREATE_FUNC_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("func:%s, failed to create since %s", createReq.name, terrstr());
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
mndReleaseFunc(pMnode, pFunc);
mndReleaseUser(pMnode, pUser);
return code;
}
static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SDropFuncReq *pDrop = pReq->rpcMsg.pCont;
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SUserObj *pUser = NULL;
SFuncObj *pFunc = NULL;
SDropFuncReq dropReq = {0};
if (tDeserializeSDropFuncReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_FUNC_OVER;
}
mDebug("func:%s, start to drop", pDrop->name);
mDebug("func:%s, start to drop", dropReq.name);
if (pDrop->name[0] == 0) {
if (dropReq.name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
goto DROP_FUNC_OVER;
}
SFuncObj *pFunc = mndAcquireFunc(pMnode, pDrop->name);
pFunc = mndAcquireFunc(pMnode, dropReq.name);
if (pFunc == NULL) {
if (pDrop->igNotExists) {
mDebug("func:%s, not exist, ignore not exist is set", pDrop->name);
return 0;
if (dropReq.igNotExists) {
mDebug("func:%s, not exist, ignore not exist is set", dropReq.name);
code = 0;
goto DROP_FUNC_OVER;
} else {
terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
goto DROP_FUNC_OVER;
}
}
int32_t code = mndDropFunc(pMnode, pReq, pFunc);
mndReleaseFunc(pMnode, pFunc);
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto DROP_FUNC_OVER;
}
if (mndCheckOperateFuncAuth(pUser)) {
goto DROP_FUNC_OVER;
}
code = mndDropFunc(pMnode, pReq, pFunc);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
if (code != 0) {
mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
DROP_FUNC_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("func:%s, failed to drop since %s", dropReq.name, terrstr());
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
mndReleaseFunc(pMnode, pFunc);
mndReleaseUser(pMnode, pUser);
return code;
}
static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq) {
int32_t code = -1;
SMnode *pMnode = pReq->pMnode;
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SRetrieveFuncReq retrieveReq = {0};
SRetrieveFuncRsp retrieveRsp = {0};
if (tDeserializeSRetrieveFuncReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto RETRIEVE_FUNC_OVER;
}
SRetrieveFuncReq *pRetrieve = pReq->rpcMsg.pCont;
pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs);
if (pRetrieve->numOfFuncs <= 0 || pRetrieve->numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
if (retrieveReq.numOfFuncs <= 0 || retrieveReq.numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
terrno = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
return -1;
goto RETRIEVE_FUNC_OVER;
}
int32_t fsize = sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN + TSDB_FUNC_COMMENT_LEN;
int32_t size = sizeof(SRetrieveFuncRsp) + fsize * pRetrieve->numOfFuncs;
SRetrieveFuncRsp *pRetrieveRsp = rpcMallocCont(size);
if (pRetrieveRsp == NULL) {
retrieveRsp.numOfFuncs = retrieveReq.numOfFuncs;
retrieveRsp.pFuncInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncInfo));
if (retrieveRsp.pFuncInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FUNC_RETRIEVE_OVER;
goto RETRIEVE_FUNC_OVER;
}
pRetrieveRsp->numOfFuncs = htonl(pRetrieve->numOfFuncs);
char *pOutput = pRetrieveRsp->pFuncInfos;
for (int32_t i = 0; i < pRetrieve->numOfFuncs; ++i) {
char funcName[TSDB_FUNC_NAME_LEN] = {0};
memcpy(funcName, pRetrieve->pFuncNames + i * TSDB_FUNC_NAME_LEN, TSDB_FUNC_NAME_LEN);
for (int32_t i = 0; i < retrieveReq.numOfFuncs; ++i) {
char *funcName = taosArrayGet(retrieveReq.pFuncNames, i);
SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
if (pFunc == NULL) {
terrno = TSDB_CODE_MND_INVALID_FUNC;
mError("func:%s, failed to retrieve since %s", funcName, terrstr());
goto FUNC_RETRIEVE_OVER;
goto RETRIEVE_FUNC_OVER;
}
SFuncInfo *pFuncInfo = (SFuncInfo *)pOutput;
memcpy(pFuncInfo->name, pFunc->name, TSDB_FUNC_NAME_LEN);
pFuncInfo->funcType = pFunc->funcType;
pFuncInfo->scriptType = pFunc->scriptType;
pFuncInfo->outputType = pFunc->outputType;
pFuncInfo->outputLen = htonl(pFunc->outputLen);
pFuncInfo->bufSize = htonl(pFunc->bufSize);
pFuncInfo->signature = htobe64(pFunc->signature);
pFuncInfo->commentSize = htonl(pFunc->commentSize);
pFuncInfo->codeSize = htonl(pFunc->codeSize);
memcpy(pFuncInfo->pCont, pFunc->pComment, pFunc->commentSize);
memcpy(pFuncInfo->pCont + pFunc->commentSize, pFunc->pCode, pFunc->codeSize);
pOutput += (sizeof(SFuncInfo) + pFunc->commentSize + pFunc->codeSize);
SFuncInfo funcInfo = {0};
memcpy(funcInfo.name, pFunc->name, TSDB_FUNC_NAME_LEN);
funcInfo.funcType = pFunc->funcType;
funcInfo.scriptType = pFunc->scriptType;
funcInfo.outputType = pFunc->outputType;
funcInfo.outputLen = pFunc->outputLen;
funcInfo.bufSize = pFunc->bufSize;
funcInfo.signature = pFunc->signature;
funcInfo.commentSize = pFunc->commentSize;
funcInfo.codeSize = pFunc->codeSize;
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
mndReleaseFunc(pMnode, pFunc);
}
pReq->pCont = pRetrieveRsp;
pReq->contLen = (int32_t)(pOutput - (char *)pRetrieveRsp);
int32_t contLen = tSerializeSRetrieveFuncRsp(NULL, 0, &retrieveRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER;
}
tSerializeSRetrieveFuncRsp(pRsp, contLen, &retrieveRsp);
pReq->pCont = pRsp;
pReq->contLen = contLen;
code = 0;
FUNC_RETRIEVE_OVER:
if (code != 0) rpcFreeCont(pRetrieveRsp);
RETRIEVE_FUNC_OVER:
taosArrayDestroy(retrieveReq.pFuncNames);
taosArrayDestroy(retrieveRsp.pFuncInfos);
return code;
}
......
......@@ -415,7 +415,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) {
goto CREATE_MNODE_OVER;
}
if (mndCheckDropNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto CREATE_MNODE_OVER;
}
......@@ -425,7 +425,6 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) {
CREATE_MNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
return -1;
}
mndReleaseMnode(pMnode, pObj);
......@@ -583,7 +582,7 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq) {
goto DROP_MNODE_OVER;
}
if (mndCheckCreateNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto DROP_MNODE_OVER;
}
......
......@@ -293,7 +293,7 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq) {
goto CREATE_QNODE_OVER;
}
if (mndCheckDropNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto CREATE_QNODE_OVER;
}
......@@ -303,7 +303,6 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq) {
CREATE_QNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
return -1;
}
mndReleaseQnode(pMnode, pObj);
......@@ -401,7 +400,7 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq) {
goto DROP_QNODE_OVER;
}
if (mndCheckCreateNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto DROP_QNODE_OVER;
}
......@@ -411,7 +410,6 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq) {
DROP_QNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
return -1;
}
mndReleaseQnode(pMnode, pObj);
......
......@@ -294,7 +294,7 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pReq) {
goto CREATE_SNODE_OVER;
}
if (mndCheckDropNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto CREATE_SNODE_OVER;
}
......@@ -403,7 +403,7 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pReq) {
goto DROP_SNODE_OVER;
}
if (mndCheckCreateNodeAuth(pUser)) {
if (mndCheckOperateNodeAuth(pUser)) {
goto DROP_SNODE_OVER;
}
......@@ -413,7 +413,6 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pReq) {
DROP_SNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
return -1;
}
mndReleaseSnode(pMnode, pObj);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册