提交 208f4480 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/tkv

...@@ -100,7 +100,7 @@ typedef enum _mgmt_table { ...@@ -100,7 +100,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_CLUSTER, TSDB_MGMT_TABLE_CLUSTER,
TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_STREAMTABLES,
TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_TP,
TSDB_MGMT_TABLE_FUNCTION, TSDB_MGMT_TABLE_FUNC,
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
} EShowType; } EShowType;
...@@ -526,20 +526,21 @@ typedef struct { ...@@ -526,20 +526,21 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int8_t igExists;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
int8_t align;
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
int64_t sigature; int64_t signature;
int32_t commentSize; int32_t commentSize;
int32_t codeSize; int32_t codeSize;
char pCont[]; char pCont[];
} SCreateFuncReq; } SCreateFuncReq;
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int8_t igNotExists;
} SDropFuncReq; } SDropFuncReq;
typedef struct { typedef struct {
...@@ -549,13 +550,13 @@ typedef struct { ...@@ -549,13 +550,13 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int8_t align;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
int8_t align;
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
int64_t sigature; int64_t signature;
int32_t commentSize; int32_t commentSize;
int32_t codeSize; int32_t codeSize;
char pCont[]; char pCont[];
......
...@@ -119,9 +119,9 @@ enum { ...@@ -119,9 +119,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "mnode-alter-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "mnode-alter-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "mnode-sync-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "mnode-sync-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "mnode-compact-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "mnode-compact-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNCTION, "mnode-create-function", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "mnode-create-func", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNCTION, "mnode-retrieve-function", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNC, "mnode-retrieve-func", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNCTION, "mnode-drop-function", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNC, "mnode-drop-func", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL)
......
...@@ -74,6 +74,7 @@ void columnListCopy(SArray* dst, const SArray* src, uint64_t uid); ...@@ -74,6 +74,7 @@ void columnListCopy(SArray* dst, const SArray* src, uint64_t uid);
void columnListDestroy(SArray* pColumnList); void columnListDestroy(SArray* pColumnList);
void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel); void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel);
void dropOneLevelExprInfo(SArray* pExprInfo);
typedef struct SSourceParam { typedef struct SSourceParam {
SArray *pExprNodeList; //Array<struct tExprNode*> SArray *pExprNodeList; //Array<struct tExprNode*>
......
...@@ -239,6 +239,7 @@ int32_t* taosGetErrno(); ...@@ -239,6 +239,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_FUNC_COMMENT TAOS_DEF_ERROR_CODE(0, 0x03C4) #define TSDB_CODE_MND_INVALID_FUNC_COMMENT TAOS_DEF_ERROR_CODE(0, 0x03C4)
#define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x03C5) #define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x03C5)
#define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x03C6) #define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x03C6)
#define TSDB_CODE_MND_INVALID_FUNC_RETRIEVE TAOS_DEF_ERROR_CODE(0, 0x03C7)
// mnode-trans // mnode-trans
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
......
...@@ -173,6 +173,7 @@ do { \ ...@@ -173,6 +173,7 @@ do { \
#define TSDB_FUNC_BUF_SIZE 512 #define TSDB_FUNC_BUF_SIZE 512
#define TSDB_FUNC_TYPE_SCALAR 1 #define TSDB_FUNC_TYPE_SCALAR 1
#define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
......
...@@ -218,6 +218,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, ...@@ -218,6 +218,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag,
if (pQueryNode->type == TSDB_SQL_SELECT) { if (pQueryNode->type == TSDB_SQL_SELECT) {
setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols); setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
tfree(pSchema);
pRequest->type = TDMT_VND_QUERY; pRequest->type = TDMT_VND_QUERY;
} else { } else {
tfree(pSchema); tfree(pSchema);
......
...@@ -90,9 +90,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -90,9 +90,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNCTION)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNCTION)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNCTION)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg;
......
...@@ -96,6 +96,15 @@ class Testbase { ...@@ -96,6 +96,15 @@ class Testbase {
#define CheckBinary(val, len) \ #define CheckBinary(val, len) \
{ EXPECT_STREQ(test.GetShowBinary(len), val); } { EXPECT_STREQ(test.GetShowBinary(len), val); }
#define CheckBinaryByte(b, len) \
{ \
char* bytes = (char*)calloc(1, len); \
for (int32_t i = 0; i < len - 1; ++i) { \
bytes[i] = b; \
} \
EXPECT_STREQ(test.GetShowBinary(len), bytes); \
}
#define CheckInt8(val) \ #define CheckInt8(val) \
{ EXPECT_EQ(test.GetShowInt8(), val); } { EXPECT_EQ(test.GetShowInt8(), val); }
......
...@@ -314,7 +314,7 @@ typedef struct { ...@@ -314,7 +314,7 @@ typedef struct {
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
int64_t sigature; int64_t signature;
int32_t commentSize; int32_t commentSize;
int32_t codeSize; int32_t codeSize;
char* pComment; char* pComment;
......
...@@ -25,14 +25,14 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc); ...@@ -25,14 +25,14 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw); static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc); static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew);
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate); static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate);
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc); static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc);
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg); static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq);
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq);
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg); static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq);
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveFuncs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter); static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
int32_t mndInitFunc(SMnode *pMnode) { int32_t mndInitFunc(SMnode *pMnode) {
...@@ -44,13 +44,13 @@ int32_t mndInitFunc(SMnode *pMnode) { ...@@ -44,13 +44,13 @@ int32_t mndInitFunc(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndFuncActionUpdate, .updateFp = (SdbUpdateFp)mndFuncActionUpdate,
.deleteFp = (SdbDeleteFp)mndFuncActionDelete}; .deleteFp = (SdbDeleteFp)mndFuncActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNCTION, mndProcessCreateFuncMsg); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNCTION, mndProcessDropFuncMsg); mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNCTION, mndProcessRetrieveFuncMsg); mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndGetFuncMeta); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndGetFuncMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndRetrieveFuncs); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndCancelGetNextFunc); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -73,7 +73,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) { ...@@ -73,7 +73,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, FUNC_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, FUNC_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, FUNC_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, FUNC_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pFunc->sigature, FUNC_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pFunc->signature, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, FUNC_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, FUNC_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, FUNC_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_ENCODE_OVER)
...@@ -104,13 +104,11 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { ...@@ -104,13 +104,11 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
goto FUNC_DECODE_OVER; goto FUNC_DECODE_OVER;
} }
int32_t size = sizeof(SFuncObj) + TSDB_FUNC_COMMENT_LEN + TSDB_FUNC_CODE_LEN; SSdbRow *pRow = sdbAllocRow(sizeof(SFuncObj));
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto FUNC_DECODE_OVER; if (pRow == NULL) goto FUNC_DECODE_OVER;
SFuncObj *pFunc = sdbGetRowObj(pRow); SFuncObj *pFunc = sdbGetRowObj(pRow);
if (pFunc == NULL) goto FUNC_DECODE_OVER; if (pFunc == NULL) goto FUNC_DECODE_OVER;
char *tmp = (char *)pFunc + sizeof(SFuncObj);
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER)
...@@ -121,12 +119,18 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { ...@@ -121,12 +119,18 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, FUNC_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, FUNC_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pFunc->sigature, FUNC_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pFunc->pData, pFunc->commentSize + pFunc->codeSize, FUNC_DECODE_OVER)
pFunc->pComment = pFunc->pData; pFunc->pComment = calloc(1, pFunc->commentSize);
pFunc->pCode = (pFunc->pData + pFunc->commentSize); pFunc->pCode = calloc(1, pFunc->codeSize);
if (pFunc->pComment == NULL || pFunc->pCode == NULL) {
goto FUNC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_DECODE_OVER)
terrno = 0; terrno = 0;
...@@ -148,136 +152,136 @@ static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) { ...@@ -148,136 +152,136 @@ static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) {
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc); mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc);
tfree(pFunc->pCode);
tfree(pFunc->pComment);
return 0; return 0;
} }
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) { static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
mTrace("func:%s, perform update action, old row:%p new row:%p", pOldFunc->name, pOldFunc, pNewFunc); mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
return 0; return 0;
} }
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate) { static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
SFuncObj *pFunc = calloc(1, sizeof(SFuncObj) + pCreate->commentSize + pCreate->codeSize); SSdb *pSdb = pMnode->pSdb;
pFunc->createdTime = taosGetTimestampMs(); SFuncObj *pFunc = sdbAcquire(pSdb, SDB_FUNC, funcName);
pFunc->funcType = pCreate->funcType; if (pFunc == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
pFunc->scriptType = pCreate->scriptType; terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
pFunc->outputType = pCreate->outputType;
pFunc->outputLen = pCreate->outputLen;
pFunc->bufSize = pCreate->bufSize;
pFunc->sigature = pCreate->sigature;
pFunc->commentSize = pCreate->commentSize;
pFunc->codeSize = pCreate->codeSize;
pFunc->pComment = pFunc->pData;
memcpy(pFunc->pComment, pCreate->pCont, pCreate->commentSize);
pFunc->pCode = pFunc->pData + pCreate->commentSize;
memcpy(pFunc->pCode, pCreate->pCont + pCreate->commentSize, pFunc->codeSize);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
if (pTrans == NULL) {
free(pFunc);
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
} }
return pFunc;
}
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name); static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pFunc);
}
SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc); static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) {
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { int32_t code = -1;
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); STrans *pTrans = NULL;
free(pFunc);
mndTransDrop(pTrans); SFuncObj func = {0};
return -1; memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
func.createdTime = taosGetTimestampMs();
func.funcType = pCreate->funcType;
func.scriptType = pCreate->scriptType;
func.outputType = pCreate->outputType;
func.outputLen = pCreate->outputLen;
func.bufSize = pCreate->bufSize;
func.signature = pCreate->signature;
func.commentSize = pCreate->commentSize;
func.codeSize = pCreate->codeSize;
func.pComment = malloc(func.commentSize);
func.pCode = malloc(func.codeSize);
if (func.pCode == NULL || func.pCode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto CREATE_FUNC_OVER;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc); memcpy(func.pComment, pCreate->pCont, pCreate->commentSize);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { memcpy(func.pCode, pCreate->pCont + pCreate->commentSize, func.codeSize);
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
free(pFunc);
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pTrans == NULL) goto CREATE_FUNC_OVER;
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
free(pFunc);
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
return -1; if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto CREATE_FUNC_OVER;
} if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto CREATE_FUNC_OVER;
free(pFunc); SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto CREATE_FUNC_OVER;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto CREATE_FUNC_OVER;
SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto CREATE_FUNC_OVER;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto CREATE_FUNC_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_FUNC_OVER;
code = 0;
CREATE_FUNC_OVER:
free(func.pCode);
free(func.pComment);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) { static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); int32_t code = -1;
if (pTrans == NULL) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
mError("func:%s, failed to drop since %s", pFunc->name, terrstr()); if (pTrans == NULL) goto DROP_FUNC_OVER;
return -1;
}
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name); mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc); SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc); SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc); SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); code = 0;
return -1;
}
DROP_FUNC_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SCreateFuncReq *pCreate = pMsg->rpcMsg.pCont; SCreateFuncReq *pCreate = pReq->rpcMsg.pCont;
pCreate->outputLen = htonl(pCreate->outputLen); pCreate->outputLen = htonl(pCreate->outputLen);
pCreate->bufSize = htonl(pCreate->bufSize); pCreate->bufSize = htonl(pCreate->bufSize);
pCreate->sigature = htobe64(pCreate->sigature); pCreate->signature = htobe64(pCreate->signature);
pCreate->commentSize = htonl(pCreate->commentSize); pCreate->commentSize = htonl(pCreate->commentSize);
pCreate->codeSize = htonl(pCreate->codeSize); pCreate->codeSize = htonl(pCreate->codeSize);
mDebug("func:%s, start to create", pCreate->name); mDebug("func:%s, start to create", pCreate->name);
SFuncObj *pFunc = sdbAcquire(pMnode->pSdb, SDB_FUNC, pCreate->name); SFuncObj *pFunc = mndAcquireFunc(pMnode, pCreate->name);
if (pFunc != NULL) { if (pFunc != NULL) {
sdbRelease(pMnode->pSdb, pFunc); mndReleaseFunc(pMnode, pFunc);
terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST; if (pCreate->igExists) {
mError("func:%s, failed to create since %s", pCreate->name, terrstr()); mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
return 0;
} else {
terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
} else if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
} }
...@@ -305,14 +309,13 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) { ...@@ -305,14 +309,13 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
return -1; return -1;
} }
if (pCreate->bufSize < 0 || pCreate->bufSize > TSDB_FUNC_BUF_SIZE) { if (pCreate->bufSize <= 0 || pCreate->bufSize > TSDB_FUNC_BUF_SIZE) {
terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE; terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
mError("func:%s, failed to create since %s", pCreate->name, terrstr()); mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
} }
int32_t code = mndCreateFunc(pMnode, pMsg, pCreate); int32_t code = mndCreateFunc(pMnode, pReq, pCreate);
if (code != 0) { if (code != 0) {
mError("func:%s, failed to create since %s", pCreate->name, terrstr()); mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
...@@ -321,9 +324,9 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) { ...@@ -321,9 +324,9 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SDropFuncReq *pDrop = pMsg->rpcMsg.pCont; SDropFuncReq *pDrop = pReq->rpcMsg.pCont;
mDebug("func:%s, start to drop", pDrop->name); mDebug("func:%s, start to drop", pDrop->name);
...@@ -333,14 +336,20 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) { ...@@ -333,14 +336,20 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
return -1; return -1;
} }
SFuncObj *pFunc = sdbAcquire(pMnode->pSdb, SDB_FUNC, pDrop->name); SFuncObj *pFunc = mndAcquireFunc(pMnode, pDrop->name);
if (pFunc == NULL) { if (pFunc == NULL) {
terrno = TSDB_CODE_MND_FUNC_NOT_EXIST; if (pDrop->igNotExists) {
mError("func:%s, failed to drop since %s", pDrop->name, terrstr()); mDebug("func:%s, not exist, ignore not exist is set", pDrop->name);
return -1; return 0;
} else {
terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
}
} }
int32_t code = mndDropFunc(pMnode, pMsg, pFunc); int32_t code = mndDropFunc(pMnode, pReq, pFunc);
mndReleaseFunc(pMnode, pFunc);
if (code != 0) { if (code != 0) {
mError("func:%s, failed to drop since %s", pDrop->name, terrstr()); mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
...@@ -350,15 +359,26 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) { ...@@ -350,15 +359,26 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) { static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; int32_t code = -1;
SMnode *pMnode = pReq->pMnode;
SRetrieveFuncReq *pRetrieve = pMsg->rpcMsg.pCont; SRetrieveFuncReq *pRetrieve = pReq->rpcMsg.pCont;
pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs); pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs);
if (pRetrieve->numOfFuncs <= 0 || pRetrieve->numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
terrno = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
return -1;
}
int32_t size = sizeof(SRetrieveFuncRsp) + (sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN) * pRetrieve->numOfFuncs + 16384; int32_t fsize = sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN + TSDB_FUNC_COMMENT_LEN;
int32_t size = sizeof(SRetrieveFuncRsp) + fsize * pRetrieve->numOfFuncs;
SRetrieveFuncRsp *pRetrieveRsp = rpcMallocCont(size); SRetrieveFuncRsp *pRetrieveRsp = rpcMallocCont(size);
if (pRetrieveRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FUNC_RETRIEVE_OVER;
}
pRetrieveRsp->numOfFuncs = htonl(pRetrieve->numOfFuncs); pRetrieveRsp->numOfFuncs = htonl(pRetrieve->numOfFuncs);
char *pOutput = pRetrieveRsp->pFuncInfos; char *pOutput = pRetrieveRsp->pFuncInfos;
...@@ -366,37 +386,42 @@ static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) { ...@@ -366,37 +386,42 @@ static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) {
char funcName[TSDB_FUNC_NAME_LEN] = {0}; char funcName[TSDB_FUNC_NAME_LEN] = {0};
memcpy(funcName, pRetrieve->pFuncNames + i * TSDB_FUNC_NAME_LEN, TSDB_FUNC_NAME_LEN); memcpy(funcName, pRetrieve->pFuncNames + i * TSDB_FUNC_NAME_LEN, TSDB_FUNC_NAME_LEN);
SFuncObj *pFunc = sdbAcquire(pMnode->pSdb, SDB_FUNC, funcName); SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
if (pFunc == NULL) { if (pFunc == NULL) {
terrno = TSDB_CODE_MND_INVALID_FUNC; terrno = TSDB_CODE_MND_INVALID_FUNC;
mError("func:%s, failed to retrieve since %s", funcName, terrstr()); mError("func:%s, failed to retrieve since %s", funcName, terrstr());
return -1; goto FUNC_RETRIEVE_OVER;
} }
SFuncInfo *pFuncInfo = (SFuncInfo *)pOutput; SFuncInfo *pFuncInfo = (SFuncInfo *)pOutput;
memcpy(pFuncInfo->name, pFunc->name, TSDB_FUNC_NAME_LEN);
strncpy(pFuncInfo->name, pFunc->name, TSDB_FUNC_NAME_LEN);
pFuncInfo->funcType = pFunc->funcType; pFuncInfo->funcType = pFunc->funcType;
pFuncInfo->scriptType = pFunc->scriptType; pFuncInfo->scriptType = pFunc->scriptType;
pFuncInfo->outputType = pFunc->outputType; pFuncInfo->outputType = pFunc->outputType;
pFuncInfo->outputLen = htonl(pFunc->outputLen); pFuncInfo->outputLen = htonl(pFunc->outputLen);
pFuncInfo->bufSize = htonl(pFunc->bufSize); pFuncInfo->bufSize = htonl(pFunc->bufSize);
pFuncInfo->sigature = htobe64(pFunc->sigature); pFuncInfo->signature = htobe64(pFunc->signature);
pFuncInfo->commentSize = htonl(pFunc->commentSize); pFuncInfo->commentSize = htonl(pFunc->commentSize);
pFuncInfo->codeSize = htonl(pFunc->codeSize); pFuncInfo->codeSize = htonl(pFunc->codeSize);
memcpy(pFuncInfo->pCont, pFunc->pCode, pFunc->commentSize + pFunc->codeSize); memcpy(pFuncInfo->pCont, pFunc->pComment, pFunc->commentSize);
memcpy(pFuncInfo->pCont + pFunc->commentSize, pFunc->pCode, pFunc->codeSize);
pOutput += sizeof(SFuncInfo) + pFunc->commentSize + pFunc->codeSize; pOutput += (sizeof(SFuncInfo) + pFunc->commentSize + pFunc->codeSize);
mndReleaseFunc(pMnode, pFunc);
} }
pMsg->pCont = pRetrieveRsp; pReq->pCont = pRetrieveRsp;
pMsg->contLen = (int32_t)(pOutput - (char *)pRetrieveRsp); pReq->contLen = (int32_t)(pOutput - (char *)pRetrieveRsp);
return 0; code = 0;
FUNC_RETRIEVE_OVER:
if (code != 0) rpcFreeCont(pRetrieveRsp);
return code;
} }
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0; int32_t cols = 0;
...@@ -454,7 +479,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *p ...@@ -454,7 +479,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC); pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, "show funcs"); strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0; return 0;
} }
...@@ -477,8 +502,8 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le ...@@ -477,8 +502,8 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le
return tDataTypes[type].name; return tDataTypes[type].name;
} }
static int32_t mndRetrieveFuncs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SFuncObj *pFunc = NULL; SFuncObj *pFunc = NULL;
......
...@@ -296,7 +296,7 @@ char *mndShowStr(int32_t showType) { ...@@ -296,7 +296,7 @@ char *mndShowStr(int32_t showType) {
return "show streamtables"; return "show streamtables";
case TSDB_MGMT_TABLE_TP: case TSDB_MGMT_TABLE_TP:
return "show topics"; return "show topics";
case TSDB_MGMT_TABLE_FUNCTION: case TSDB_MGMT_TABLE_FUNC:
return "show functions"; return "show functions";
default: default:
return "undefined"; return "undefined";
......
...@@ -123,8 +123,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -123,8 +123,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
goto STB_DECODE_OVER; goto STB_DECODE_OVER;
} }
int32_t size = sizeof(SStbObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); SSdbRow *pRow = sdbAllocRow(sizeof(SStbObj));
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto STB_DECODE_OVER; if (pRow == NULL) goto STB_DECODE_OVER;
SStbObj *pStb = sdbGetRowObj(pRow); SStbObj *pStb = sdbGetRowObj(pRow);
...@@ -143,6 +142,9 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -143,6 +142,9 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pStb->pSchema = calloc(totalCols, sizeof(SSchema)); pStb->pSchema = calloc(totalCols, sizeof(SSchema));
if (pStb->pSchema == NULL) {
goto STB_DECODE_OVER;
}
for (int32_t i = 0; i < totalCols; ++i) { for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i]; SSchema *pSchema = &pStb->pSchema[i];
...@@ -448,7 +450,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr ...@@ -448,7 +450,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
stbObj.pSchema[i].colId = i + 1; stbObj.pSchema[i].colId = i + 1;
} }
int32_t code = 0; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_STB_OVER; if (pTrans == NULL) goto CREATE_STB_OVER;
...@@ -481,7 +483,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { ...@@ -481,7 +483,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name);
if (pStb != NULL) { if (pStb != NULL) {
sdbRelease(pMnode->pSdb, pStb); mndReleaseStb(pMnode, pStb);
if (pCreate->igExists) { if (pCreate->igExists) {
mDebug("stb:%s, already exist, ignore exist is set", pCreate->name); mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
return 0; return 0;
...@@ -492,6 +494,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { ...@@ -492,6 +494,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
} }
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
} }
// topic should have different name with stb // topic should have different name with stb
...@@ -640,7 +643,7 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) { ...@@ -640,7 +643,7 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) {
DROP_STB_OVER: DROP_STB_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
...@@ -665,7 +668,6 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { ...@@ -665,7 +668,6 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
if (code != 0) { if (code != 0) {
terrno = code;
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr()); mError("stb:%s, failed to drop since %s", pDrop->name, terrstr());
return -1; return -1;
} }
......
...@@ -390,9 +390,11 @@ static void mndTransDropActions(SArray *pArray) { ...@@ -390,9 +390,11 @@ static void mndTransDropActions(SArray *pArray) {
} }
void mndTransDrop(STrans *pTrans) { void mndTransDrop(STrans *pTrans) {
mndTransDropData(pTrans); if (pTrans != NULL) {
mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); mndTransDropData(pTrans);
tfree(pTrans); mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
tfree(pTrans);
}
} }
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
......
...@@ -12,3 +12,4 @@ add_subdirectory(dnode) ...@@ -12,3 +12,4 @@ add_subdirectory(dnode)
add_subdirectory(mnode) add_subdirectory(mnode)
add_subdirectory(db) add_subdirectory(db)
add_subdirectory(stb) add_subdirectory(stb)
add_subdirectory(func)
aux_source_directory(. FUNC_SRC)
add_executable(mnode_test_func ${FUNC_SRC})
target_link_libraries(
mnode_test_func
PUBLIC sut
)
add_test(
NAME mnode_test_func
COMMAND mnode_test_func
)
/**
* @file func.cpp
* @author slguan (slguan@taosdata.com)
* @brief MNODE module func tests
* @version 1.0
* @date 2022-01-24
*
* @copyright Copyright (c) 2022
*
*/
#include "sut.h"
class MndTestFunc : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_func", 9038); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
};
Testbase MndTestFunc::test;
TEST_F(MndTestFunc, 01_Show_Func) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_FUNC, "");
CHECK_META("show functions", 7);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE, "name");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, PATH_MAX + VARSTR_HEADER_SIZE, "comment");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "aggregate");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE, "outputtype");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(5, TSDB_DATA_TYPE_INT, 4, "code_len");
CHECK_SCHEMA(6, TSDB_DATA_TYPE_INT, 4, "bufsize");
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0);
}
TEST_F(MndTestFunc, 02_Create_Func) {
{
int32_t contLen = sizeof(SCreateFuncReq);
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_NAME);
}
{
int32_t contLen = sizeof(SCreateFuncReq);
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_COMMENT);
}
{
int32_t contLen = sizeof(SCreateFuncReq);
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
pReq->commentSize = htonl(TSDB_FUNC_COMMENT_LEN + 1);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_COMMENT);
}
{
int32_t contLen = sizeof(SCreateFuncReq);
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
pReq->commentSize = htonl(TSDB_FUNC_COMMENT_LEN);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_CODE);
}
{
int32_t contLen = sizeof(SCreateFuncReq);
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
pReq->commentSize = htonl(TSDB_FUNC_COMMENT_LEN);
pReq->codeSize = htonl(TSDB_FUNC_CODE_LEN - 1);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_CODE);
}
{
int32_t contLen = sizeof(SCreateFuncReq) + 24;
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
pReq->commentSize = htonl(TSDB_FUNC_COMMENT_LEN);
pReq->codeSize = htonl(TSDB_FUNC_CODE_LEN);
pReq->pCont[0] = 0;
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_CODE);
}
{
int32_t contLen = sizeof(SCreateFuncReq) + 24;
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
pReq->commentSize = htonl(TSDB_FUNC_COMMENT_LEN);
pReq->codeSize = htonl(TSDB_FUNC_CODE_LEN);
pReq->pCont[0] = 'a';
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_BUFSIZE);
}
{
int32_t contLen = sizeof(SCreateFuncReq) + 24;
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
pReq->commentSize = htonl(TSDB_FUNC_COMMENT_LEN);
pReq->codeSize = htonl(TSDB_FUNC_CODE_LEN);
pReq->pCont[0] = 'a';
pReq->bufSize = htonl(TSDB_FUNC_BUF_SIZE + 1);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_BUFSIZE);
}
for (int32_t i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SCreateFuncReq);
int32_t commentSize = TSDB_FUNC_COMMENT_LEN;
int32_t codeSize = TSDB_FUNC_CODE_LEN;
contLen = (contLen + codeSize + commentSize);
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
pReq->igExists = 0;
if (i == 2) pReq->igExists = 1;
pReq->funcType = 1;
pReq->scriptType = 2;
pReq->outputType = TSDB_DATA_TYPE_SMALLINT;
pReq->outputLen = htonl(12);
pReq->bufSize = htonl(4);
pReq->signature = htobe64(5);
pReq->commentSize = htonl(commentSize);
pReq->codeSize = htonl(codeSize);
for (int32_t i = 0; i < commentSize - 1; ++i) {
pReq->pCont[i] = 'm';
}
for (int32_t i = commentSize; i < commentSize + codeSize - 1; ++i) {
pReq->pCont[i] = 'd';
}
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0 || i == 2) {
ASSERT_EQ(pRsp->code, 0);
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_ALREADY_EXIST);
}
}
test.SendShowMetaReq(TSDB_MGMT_TABLE_FUNC, "");
CHECK_META("show functions", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("f1", TSDB_FUNC_NAME_LEN);
CheckBinaryByte('m', TSDB_FUNC_COMMENT_LEN);
CheckInt32(0);
CheckBinary("SMALLINT", TSDB_TYPE_STR_MAX_LEN);
CheckTimestamp();
CheckInt32(TSDB_FUNC_CODE_LEN);
CheckInt32(4);
}
TEST_F(MndTestFunc, 03_Retrieve_Func) {
{
int32_t contLen = sizeof(SRetrieveFuncReq);
int32_t numOfFuncs = 1;
contLen = (contLen + numOfFuncs * TSDB_FUNC_NAME_LEN);
SRetrieveFuncReq* pReq = (SRetrieveFuncReq*)rpcMallocCont(contLen);
pReq->numOfFuncs = htonl(1);
strcpy(pReq->pFuncNames, "f1");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
SRetrieveFuncRsp* pRetrieveRsp = (SRetrieveFuncRsp*)pRsp->pCont;
pRetrieveRsp->numOfFuncs = htonl(pRetrieveRsp->numOfFuncs);
SFuncInfo* pFuncInfo = (SFuncInfo*)(pRetrieveRsp->pFuncInfos);
pFuncInfo->outputLen = htonl(pFuncInfo->outputLen);
pFuncInfo->bufSize = htonl(pFuncInfo->bufSize);
pFuncInfo->signature = htobe64(pFuncInfo->signature);
pFuncInfo->commentSize = htonl(pFuncInfo->commentSize);
pFuncInfo->codeSize = htonl(pFuncInfo->codeSize);
EXPECT_STREQ(pFuncInfo->name, "f1");
EXPECT_EQ(pFuncInfo->funcType, 1);
EXPECT_EQ(pFuncInfo->scriptType, 2);
EXPECT_EQ(pFuncInfo->outputType, TSDB_DATA_TYPE_SMALLINT);
EXPECT_EQ(pFuncInfo->outputLen, 12);
EXPECT_EQ(pFuncInfo->bufSize, 4);
EXPECT_EQ(pFuncInfo->signature, 5);
EXPECT_EQ(pFuncInfo->commentSize, TSDB_FUNC_COMMENT_LEN);
EXPECT_EQ(pFuncInfo->codeSize, TSDB_FUNC_CODE_LEN);
char* pComment = pFuncInfo->pCont;
char* pCode = pFuncInfo->pCont + pFuncInfo->commentSize;
char comments[TSDB_FUNC_COMMENT_LEN] = {0};
for (int32_t i = 0; i < TSDB_FUNC_COMMENT_LEN - 1; ++i) {
comments[i] = 'm';
}
char codes[TSDB_FUNC_CODE_LEN] = {0};
for (int32_t i = 0; i < TSDB_FUNC_CODE_LEN - 1; ++i) {
codes[i] = 'd';
}
EXPECT_STREQ(pComment, comments);
EXPECT_STREQ(pCode, codes);
}
{
int32_t contLen = sizeof(SRetrieveFuncReq);
int32_t numOfFuncs = 0;
contLen = (contLen + numOfFuncs * TSDB_FUNC_NAME_LEN);
SRetrieveFuncReq* pReq = (SRetrieveFuncReq*)rpcMallocCont(contLen);
pReq->numOfFuncs = htonl(numOfFuncs);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_RETRIEVE);
}
{
int32_t contLen = sizeof(SRetrieveFuncReq);
int32_t numOfFuncs = TSDB_FUNC_MAX_RETRIEVE + 1;
contLen = (contLen + numOfFuncs * TSDB_FUNC_NAME_LEN);
SRetrieveFuncReq* pReq = (SRetrieveFuncReq*)rpcMallocCont(contLen);
pReq->numOfFuncs = htonl(numOfFuncs);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_RETRIEVE);
}
{
int32_t contLen = sizeof(SRetrieveFuncReq);
int32_t numOfFuncs = 1;
contLen = (contLen + numOfFuncs * TSDB_FUNC_NAME_LEN);
SRetrieveFuncReq* pReq = (SRetrieveFuncReq*)rpcMallocCont(contLen);
pReq->numOfFuncs = htonl(numOfFuncs);
strcpy(pReq->pFuncNames, "f2");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC);
}
{
int32_t contLen = sizeof(SCreateFuncReq);
int32_t commentSize = 1024;
int32_t codeSize = 9527;
contLen = (contLen + codeSize + commentSize);
SCreateFuncReq* pReq = (SCreateFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f2");
pReq->igExists = 1;
pReq->funcType = 2;
pReq->scriptType = 3;
pReq->outputType = TSDB_DATA_TYPE_BINARY;
pReq->outputLen = htonl(24);
pReq->bufSize = htonl(6);
pReq->signature = htobe64(18);
pReq->commentSize = htonl(commentSize);
pReq->codeSize = htonl(codeSize);
for (int32_t i = 0; i < commentSize - 1; ++i) {
pReq->pCont[i] = 'p';
}
for (int32_t i = commentSize; i < commentSize + codeSize - 1; ++i) {
pReq->pCont[i] = 'q';
}
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
test.SendShowMetaReq(TSDB_MGMT_TABLE_FUNC, "");
CHECK_META("show functions", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 2);
}
{
int32_t contLen = sizeof(SRetrieveFuncReq);
int32_t numOfFuncs = 1;
contLen = (contLen + numOfFuncs * TSDB_FUNC_NAME_LEN);
SRetrieveFuncReq* pReq = (SRetrieveFuncReq*)rpcMallocCont(contLen);
pReq->numOfFuncs = htonl(1);
strcpy(pReq->pFuncNames, "f2");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
SRetrieveFuncRsp* pRetrieveRsp = (SRetrieveFuncRsp*)pRsp->pCont;
pRetrieveRsp->numOfFuncs = htonl(pRetrieveRsp->numOfFuncs);
SFuncInfo* pFuncInfo = (SFuncInfo*)(pRetrieveRsp->pFuncInfos);
pFuncInfo->outputLen = htonl(pFuncInfo->outputLen);
pFuncInfo->bufSize = htonl(pFuncInfo->bufSize);
pFuncInfo->signature = htobe64(pFuncInfo->signature);
pFuncInfo->commentSize = htonl(pFuncInfo->commentSize);
pFuncInfo->codeSize = htonl(pFuncInfo->codeSize);
EXPECT_STREQ(pFuncInfo->name, "f2");
EXPECT_EQ(pFuncInfo->funcType, 2);
EXPECT_EQ(pFuncInfo->scriptType, 3);
EXPECT_EQ(pFuncInfo->outputType, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pFuncInfo->outputLen, 24);
EXPECT_EQ(pFuncInfo->bufSize, 6);
EXPECT_EQ(pFuncInfo->signature, 18);
EXPECT_EQ(pFuncInfo->commentSize, 1024);
EXPECT_EQ(pFuncInfo->codeSize, 9527);
char* pComment = pFuncInfo->pCont;
char* pCode = pFuncInfo->pCont + pFuncInfo->commentSize;
char* comments = (char*)calloc(1, 1024);
for (int32_t i = 0; i < 1024 - 1; ++i) {
comments[i] = 'p';
}
char* codes = (char*)calloc(1, 9527);
for (int32_t i = 0; i < 9527 - 1; ++i) {
codes[i] = 'q';
}
EXPECT_STREQ(pComment, comments);
EXPECT_STREQ(pCode, codes);
free(comments);
free(codes);
}
{
int32_t contLen = sizeof(SRetrieveFuncReq);
int32_t numOfFuncs = 2;
contLen = (contLen + numOfFuncs * TSDB_FUNC_NAME_LEN);
SRetrieveFuncReq* pReq = (SRetrieveFuncReq*)rpcMallocCont(contLen);
pReq->numOfFuncs = htonl(numOfFuncs);
strcpy(pReq->pFuncNames, "f2");
strcpy((char*)pReq->pFuncNames + TSDB_FUNC_NAME_LEN, "f1");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
SRetrieveFuncRsp* pRetrieveRsp = (SRetrieveFuncRsp*)pRsp->pCont;
pRetrieveRsp->numOfFuncs = htonl(pRetrieveRsp->numOfFuncs);
{
SFuncInfo* pFuncInfo = (SFuncInfo*)(pRetrieveRsp->pFuncInfos);
pFuncInfo->outputLen = htonl(pFuncInfo->outputLen);
pFuncInfo->bufSize = htonl(pFuncInfo->bufSize);
pFuncInfo->signature = htobe64(pFuncInfo->signature);
pFuncInfo->commentSize = htonl(pFuncInfo->commentSize);
pFuncInfo->codeSize = htonl(pFuncInfo->codeSize);
EXPECT_STREQ(pFuncInfo->name, "f2");
EXPECT_EQ(pFuncInfo->funcType, 2);
EXPECT_EQ(pFuncInfo->scriptType, 3);
EXPECT_EQ(pFuncInfo->outputType, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pFuncInfo->outputLen, 24);
EXPECT_EQ(pFuncInfo->bufSize, 6);
EXPECT_EQ(pFuncInfo->signature, 18);
EXPECT_EQ(pFuncInfo->commentSize, 1024);
EXPECT_EQ(pFuncInfo->codeSize, 9527);
char* pComment = pFuncInfo->pCont;
char* pCode = pFuncInfo->pCont + pFuncInfo->commentSize;
char* comments = (char*)calloc(1, 1024);
for (int32_t i = 0; i < 1024 - 1; ++i) {
comments[i] = 'p';
}
char* codes = (char*)calloc(1, 9527);
for (int32_t i = 0; i < 9527 - 1; ++i) {
codes[i] = 'q';
}
EXPECT_STREQ(pComment, comments);
EXPECT_STREQ(pCode, codes);
free(comments);
free(codes);
}
{
SFuncInfo* pFuncInfo = (SFuncInfo*)(pRetrieveRsp->pFuncInfos + sizeof(SFuncInfo) + 1024 + 9527);
pFuncInfo->outputLen = htonl(pFuncInfo->outputLen);
pFuncInfo->bufSize = htonl(pFuncInfo->bufSize);
pFuncInfo->signature = htobe64(pFuncInfo->signature);
pFuncInfo->commentSize = htonl(pFuncInfo->commentSize);
pFuncInfo->codeSize = htonl(pFuncInfo->codeSize);
EXPECT_STREQ(pFuncInfo->name, "f1");
EXPECT_EQ(pFuncInfo->funcType, 1);
EXPECT_EQ(pFuncInfo->scriptType, 2);
EXPECT_EQ(pFuncInfo->outputType, TSDB_DATA_TYPE_SMALLINT);
EXPECT_EQ(pFuncInfo->outputLen, 12);
EXPECT_EQ(pFuncInfo->bufSize, 4);
EXPECT_EQ(pFuncInfo->signature, 5);
EXPECT_EQ(pFuncInfo->commentSize, TSDB_FUNC_COMMENT_LEN);
EXPECT_EQ(pFuncInfo->codeSize, TSDB_FUNC_CODE_LEN);
char* pComment = pFuncInfo->pCont;
char* pCode = pFuncInfo->pCont + pFuncInfo->commentSize;
char comments[TSDB_FUNC_COMMENT_LEN] = {0};
for (int32_t i = 0; i < TSDB_FUNC_COMMENT_LEN - 1; ++i) {
comments[i] = 'm';
}
char codes[TSDB_FUNC_CODE_LEN] = {0};
for (int32_t i = 0; i < TSDB_FUNC_CODE_LEN - 1; ++i) {
codes[i] = 'd';
}
EXPECT_STREQ(pComment, comments);
EXPECT_STREQ(pCode, codes);
}
}
{
int32_t contLen = sizeof(SRetrieveFuncReq);
int32_t numOfFuncs = 2;
contLen = (contLen + numOfFuncs * TSDB_FUNC_NAME_LEN);
SRetrieveFuncReq* pReq = (SRetrieveFuncReq*)rpcMallocCont(contLen);
pReq->numOfFuncs = htonl(numOfFuncs);
strcpy(pReq->pFuncNames, "f2");
strcpy((char*)pReq->pFuncNames + TSDB_FUNC_NAME_LEN, "f3");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC);
}
}
TEST_F(MndTestFunc, 04_Drop_Func) {
{
int32_t contLen = sizeof(SDropFuncReq);
SDropFuncReq* pReq = (SDropFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_NAME);
}
{
int32_t contLen = sizeof(SDropFuncReq);
SDropFuncReq* pReq = (SDropFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f3");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_NOT_EXIST);
}
{
int32_t contLen = sizeof(SDropFuncReq);
SDropFuncReq* pReq = (SDropFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f3");
pReq->igNotExists = 1;
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDropFuncReq);
SDropFuncReq* pReq = (SDropFuncReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "f1");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
test.SendShowMetaReq(TSDB_MGMT_TABLE_FUNC, "");
CHECK_META("show functions", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
// restart
test.Restart();
test.SendShowMetaReq(TSDB_MGMT_TABLE_FUNC, "");
CHECK_META("show functions", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("f2", TSDB_FUNC_NAME_LEN);
}
...@@ -171,6 +171,8 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs); ...@@ -171,6 +171,8 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols); void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols);
void destroyBoundColumnInfo(SParsedDataColInfo* pColList); void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
void destroyBlockArrayList(SArray* pDataBlockList); void destroyBlockArrayList(SArray* pDataBlockList);
void destroyBlockHashmap(SHashObj* pDataBlockHash);
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen); int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
......
...@@ -65,7 +65,7 @@ program ::= cmd. {} ...@@ -65,7 +65,7 @@ program ::= cmd. {}
//////////////////////////////////THE SHOW STATEMENT/////////////////////////////////////////// //////////////////////////////////THE SHOW STATEMENT///////////////////////////////////////////
cmd ::= SHOW DATABASES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DB, 0, 0);} cmd ::= SHOW DATABASES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DB, 0, 0);}
cmd ::= SHOW TOPICS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TP, 0, 0);} cmd ::= SHOW TOPICS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TP, 0, 0);}
cmd ::= SHOW FUNCTIONS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_FUNCTION, 0, 0);} cmd ::= SHOW FUNCTIONS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_FUNC, 0, 0);}
cmd ::= SHOW MNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_MNODE, 0, 0);} cmd ::= SHOW MNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_MNODE, 0, 0);}
cmd ::= SHOW DNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DNODE, 0, 0);} cmd ::= SHOW DNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DNODE, 0, 0);}
cmd ::= SHOW ACCOUNTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_ACCT, 0, 0);} cmd ::= SHOW ACCOUNTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_ACCT, 0, 0);}
......
...@@ -213,10 +213,11 @@ SQueryStmtInfo *createQueryInfo() { ...@@ -213,10 +213,11 @@ SQueryStmtInfo *createQueryInfo() {
pQueryInfo->slimit.limit = -1; pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0; pQueryInfo->slimit.offset = 0;
pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->exprList = calloc(10, POINTER_BYTES); pQueryInfo->exprList = calloc(10, POINTER_BYTES);
for(int32_t i = 0; i < 10; ++i) { for(int32_t i = 0; i < 10; ++i) {
pQueryInfo->exprList[i] = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList[i] = taosArrayInit(4, POINTER_BYTES);
} }
...@@ -232,7 +233,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) { ...@@ -232,7 +233,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) {
cleanupFieldInfo(&pQueryInfo->fieldsInfo); cleanupFieldInfo(&pQueryInfo->fieldsInfo);
dropAllExprInfo(pQueryInfo->exprList, 10); dropAllExprInfo(pQueryInfo->exprList, 10);
pQueryInfo->exprList = NULL;
tfree(pQueryInfo->exprList);
columnListDestroy(pQueryInfo->colList); columnListDestroy(pQueryInfo->colList);
pQueryInfo->colList = NULL; pQueryInfo->colList = NULL;
...@@ -258,10 +260,10 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) { ...@@ -258,10 +260,10 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) {
size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream); size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream);
for (int32_t i = 0; i < numOfUpstream; ++i) { for (int32_t i = 0; i < numOfUpstream; ++i) {
SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pDownstream, i); SQueryStmtInfo* pDownstream = taosArrayGetP(pQueryInfo->pDownstream, i);
destroyQueryInfoImpl(pUpQueryInfo); destroyQueryInfoImpl(pDownstream);
clearAllTableMetaInfo(pUpQueryInfo, false, 0); clearAllTableMetaInfo(pDownstream, false, 0);
tfree(pUpQueryInfo); tfree(pDownstream);
} }
destroyQueryInfoImpl(pQueryInfo); destroyQueryInfoImpl(pQueryInfo);
...@@ -1395,6 +1397,13 @@ int32_t validateFillNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf ...@@ -1395,6 +1397,13 @@ int32_t validateFillNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf
static void pushDownAggFuncExprInfo(SQueryStmtInfo* pQueryInfo); static void pushDownAggFuncExprInfo(SQueryStmtInfo* pQueryInfo);
static void addColumnNodeFromLowerLevel(SQueryStmtInfo* pQueryInfo); static void addColumnNodeFromLowerLevel(SQueryStmtInfo* pQueryInfo);
static void freeItemHelper(void* pItem) {
void** p = pItem;
if (*p != NULL) {
tfree(*p);
}
}
int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) { int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0)); assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0));
...@@ -1590,7 +1599,10 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* ...@@ -1590,7 +1599,10 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]); SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]);
extractFunctionDesc(functionList, &pQueryInfo->info); extractFunctionDesc(functionList, &pQueryInfo->info);
if ((code = checkForInvalidExpr(pQueryInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { code = checkForInvalidExpr(pQueryInfo, pMsgBuf);
taosArrayDestroyEx(functionList, freeItemHelper);
if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
...@@ -2902,6 +2914,8 @@ int32_t doAddOneProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, S ...@@ -2902,6 +2914,8 @@ int32_t doAddOneProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, S
} }
pQueryInfo->info.projectionQuery = true; pQueryInfo->info.projectionQuery = true;
taosArrayDestroy(pColumnList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3983,5 +3997,9 @@ int32_t qParserValidateSqlNode(SParseContext *pCtx, SSqlInfo* pInfo, SQueryStmtI ...@@ -3983,5 +3997,9 @@ int32_t qParserValidateSqlNode(SParseContext *pCtx, SSqlInfo* pInfo, SQueryStmtI
validateSqlNode(p, pQueryInfo, &buf); validateSqlNode(p, pQueryInfo, &buf);
} }
taosArrayDestroy(data.pTableMeta);
taosArrayDestroy(req.pUdf);
taosArrayDestroy(req.pTableName);
return code; return code;
} }
...@@ -249,7 +249,7 @@ static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlo ...@@ -249,7 +249,7 @@ static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlo
} }
} }
void destroyDataBlock(STableDataBlocks* pDataBlock) { static void destroyDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
} }
...@@ -273,12 +273,29 @@ void destroyBlockArrayList(SArray* pDataBlockList) { ...@@ -273,12 +273,29 @@ void destroyBlockArrayList(SArray* pDataBlockList) {
size_t size = taosArrayGetSize(pDataBlockList); size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
destroyDataBlock(taosArrayGetP(pDataBlockList, i)); void* p = taosArrayGetP(pDataBlockList, i);
destroyDataBlock(p);
} }
taosArrayDestroy(pDataBlockList); taosArrayDestroy(pDataBlockList);
} }
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
if (pDataBlockHash == NULL) {
return;
}
void** p1 = taosHashIterate(pDataBlockHash, NULL);
while (p1) {
STableDataBlocks* pBlocks = *p1;
destroyDataBlock(pBlocks);
p1 = taosHashIterate(pDataBlockHash, p1);
}
taosHashCleanup(pDataBlockHash);
}
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
...@@ -490,7 +507,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t ...@@ -490,7 +507,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
} }
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
......
...@@ -525,10 +525,28 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { ...@@ -525,10 +525,28 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
tdDestroyKVRowBuilder(&pCxt->tagsBuilder); tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
} }
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
tfree(pDataBlock->pData);
if (!pDataBlock->cloned) {
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
tfree(pDataBlock);
}
static void destroyInsertParseContext(SInsertParseContext* pCxt) { static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyInsertParseContextForTable(pCxt); destroyInsertParseContextForTable(pCxt);
taosHashCleanup(pCxt->pVgroupsHashObj); taosHashCleanup(pCxt->pVgroupsHashObj);
taosHashCleanup(pCxt->pTableBlockHashObj);
destroyBlockHashmap(pCxt->pTableBlockHashObj);
destroyBlockArrayList(pCxt->pTableDataBlocks); destroyBlockArrayList(pCxt->pTableDataBlocks);
destroyBlockArrayList(pCxt->pVgDataBlocks); destroyBlockArrayList(pCxt->pVgDataBlocks);
} }
......
...@@ -248,10 +248,15 @@ void qDestroyQuery(SQueryNode* pQueryNode) { ...@@ -248,10 +248,15 @@ void qDestroyQuery(SQueryNode* pQueryNode) {
if (NULL == pQueryNode) { if (NULL == pQueryNode) {
return; return;
} }
if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) {
int32_t type = nodeType(pQueryNode);
if (type == TSDB_SQL_INSERT || type == TSDB_SQL_CREATE_TABLE) {
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode; SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
taosArrayDestroy(pModifInfo->pDataBlocks); taosArrayDestroy(pModifInfo->pDataBlocks);
}
tfree(pQueryNode); tfree(pQueryNode);
} else if (type == TSDB_SQL_SELECT) {
SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo*) pQueryNode;
destroyQueryInfo(pQueryStmtInfo);
}
} }
...@@ -732,18 +732,8 @@ void cleanupFieldInfo(SFieldInfo* pFieldInfo) { ...@@ -732,18 +732,8 @@ void cleanupFieldInfo(SFieldInfo* pFieldInfo) {
return; return;
} }
if (pFieldInfo->internalField != NULL) {
size_t num = taosArrayGetSize(pFieldInfo->internalField);
for (int32_t i = 0; i < num; ++i) {
// SInternalField* pfield = taosArrayGet(pFieldInfo->internalField, i);
// if (pfield->pExpr != NULL && pfield->pExpr->pExpr != NULL) {
// sqlExprDestroy(pfield->pExpr);
// }
}
}
taosArrayDestroy(pFieldInfo->internalField); taosArrayDestroy(pFieldInfo->internalField);
// tfree(pFieldInfo->final); tfree(pFieldInfo->final);
memset(pFieldInfo, 0, sizeof(SFieldInfo)); memset(pFieldInfo, 0, sizeof(SFieldInfo));
} }
......
...@@ -191,10 +191,12 @@ void destroyExprInfo(SExprInfo* pExprInfo) { ...@@ -191,10 +191,12 @@ void destroyExprInfo(SExprInfo* pExprInfo) {
for(int32_t i = 0; i < pExprInfo->base.numOfParams; ++i) { for(int32_t i = 0; i < pExprInfo->base.numOfParams; ++i) {
taosVariantDestroy(&pExprInfo->base.param[i]); taosVariantDestroy(&pExprInfo->base.param[i]);
} }
tfree(pExprInfo->base.pColumns);
tfree(pExprInfo); tfree(pExprInfo);
} }
static void dropOneLevelExprInfo(SArray* pExprInfo) { void dropOneLevelExprInfo(SArray* pExprInfo) {
size_t size = taosArrayGetSize(pExprInfo); size_t size = taosArrayGetSize(pExprInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
...@@ -239,6 +241,9 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { ...@@ -239,6 +241,9 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
#endif #endif
dst->pExpr = exprdup(src->pExpr); dst->pExpr = exprdup(src->pExpr);
dst->base.pColumns = calloc(src->base.numOfCols, sizeof(SColumn));
memcpy(dst->base.pColumns, src->base.pColumns, sizeof(SColumn) * src->base.numOfCols);
memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param)); memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param));
for (int32_t j = 0; j < src->base.numOfParams; ++j) { for (int32_t j = 0; j < src->base.numOfParams; ++j) {
taosVariantAssign(&dst->base.param[j], &src->base.param[j]); taosVariantAssign(&dst->base.param[j], &src->base.param[j]);
......
...@@ -2232,7 +2232,7 @@ static void yy_reduce( ...@@ -2232,7 +2232,7 @@ static void yy_reduce(
{ setShowOptions(pInfo, TSDB_MGMT_TABLE_TP, 0, 0);} { setShowOptions(pInfo, TSDB_MGMT_TABLE_TP, 0, 0);}
break; break;
case 3: /* cmd ::= SHOW FUNCTIONS */ case 3: /* cmd ::= SHOW FUNCTIONS */
{ setShowOptions(pInfo, TSDB_MGMT_TABLE_FUNCTION, 0, 0);} { setShowOptions(pInfo, TSDB_MGMT_TABLE_FUNC, 0, 0);}
break; break;
case 4: /* cmd ::= SHOW MNODES */ case 4: /* cmd ::= SHOW MNODES */
{ setShowOptions(pInfo, TSDB_MGMT_TABLE_MNODE, 0, 0);} { setShowOptions(pInfo, TSDB_MGMT_TABLE_MNODE, 0, 0);}
......
...@@ -265,7 +265,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* ...@@ -265,7 +265,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
} else { } else {
// here we can push down the projection to tablescan operator. // here we can push down the projection to tablescan operator.
pNode->numOfExpr = num; pNode->numOfExpr = num;
pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
taosArrayAddAll(pNode->pExpr, p); taosArrayAddAll(pNode->pExpr, p);
} }
} }
...@@ -357,7 +356,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { ...@@ -357,7 +356,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
SArray* exprList = taosArrayInit(4, POINTER_BYTES); SArray* exprList = taosArrayInit(4, POINTER_BYTES);
if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) { if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
// dropAllExprInfo(exprList);
exit(-1); exit(-1);
} }
...@@ -373,7 +371,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { ...@@ -373,7 +371,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
// 4. add the projection query node // 4. add the projection query node
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList); SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList);
columnListDestroy(tableColumnList); columnListDestroy(tableColumnList);
// dropAllExprInfo(exprList);
taosArrayPush(pDownstream, &pNode); taosArrayPush(pDownstream, &pNode);
} }
...@@ -398,7 +395,8 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { ...@@ -398,7 +395,8 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
} }
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
if (pQueryNode->info.type == QNODE_MODIFY) { int32_t type = nodeType(pQueryNode);
if (type == QNODE_MODIFY) {
SDataPayloadInfo* pInfo = pQueryNode->pExtInfo; SDataPayloadInfo* pInfo = pQueryNode->pExtInfo;
size_t size = taosArrayGetSize(pInfo->payload); size_t size = taosArrayGetSize(pInfo->payload);
...@@ -410,10 +408,17 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { ...@@ -410,10 +408,17 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
taosArrayDestroy(pInfo->payload); taosArrayDestroy(pInfo->payload);
} }
if (type == QNODE_STREAMSCAN || type == QNODE_TABLESCAN) {
SQueryTableInfo* pQueryTableInfo = pQueryNode->pExtInfo;
tfree(pQueryTableInfo->tableName);
}
printf("----------->Free:%p\n", pQueryNode->pExpr);
taosArrayDestroy(pQueryNode->pExpr);
tfree(pQueryNode->pExtInfo); tfree(pQueryNode->pExtInfo);
tfree(pQueryNode->pSchema); tfree(pQueryNode->pSchema);
tfree(pQueryNode->info.name); tfree(pQueryNode->info.name);
// dropAllExprInfo(pQueryNode->pExpr);
if (pQueryNode->pChildren != NULL) { if (pQueryNode->pChildren != NULL) {
int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren); int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren);
......
...@@ -155,6 +155,16 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si ...@@ -155,6 +155,16 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si
return node; return node;
} }
static void cleanupPhyNode(SPhyNode* pPhyNode) {
if (pPhyNode == NULL) {
return;
}
dropOneLevelExprInfo(pPhyNode->pTargets);
tfree(pPhyNode->targetSchema.pSchema);
tfree(pPhyNode);
}
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) { static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size); SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size);
...@@ -445,3 +455,29 @@ void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyN ...@@ -445,3 +455,29 @@ void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyN
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
setExchangSourceNode(templateId, pSource, subplan->pNode); setExchangSourceNode(templateId, pSource, subplan->pNode);
} }
static void destroyDataSinkNode(SDataSink* pSinkNode) {
if (pSinkNode == NULL) {
return;
}
if (nodeType(pSinkNode) == DSINK_Dispatch) {
SDataDispatcher* pDdSink = (SDataDispatcher*)pSinkNode;
tfree(pDdSink->sink.schema.pSchema);
}
tfree(pSinkNode);
}
void qDestroySubplan(SSubplan* pSubplan) {
if (pSubplan == NULL) {
return;
}
taosArrayDestroy(pSubplan->pChildren);
taosArrayDestroy(pSubplan->pParents);
destroyDataSinkNode(pSubplan->pDataSink);
cleanupPhyNode(pSubplan->pNode);
tfree(pSubplan);
}
...@@ -1121,8 +1121,10 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { ...@@ -1121,8 +1121,10 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
} }
*str = cJSON_Print(json); *str = cJSON_Print(json);
// printf("====Physical plan:====\n"); cJSON_Delete(json);
// printf("%s\n", *str);
printf("====Physical plan:====\n");
printf("%s\n", *str);
*len = strlen(*str) + 1; *len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -18,25 +18,6 @@ ...@@ -18,25 +18,6 @@
static void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, int32_t* numOfCols); static void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, int32_t* numOfCols);
static void destroyDataSinkNode(SDataSink* pSinkNode) {
if (pSinkNode == NULL) {
return;
}
tfree(pSinkNode);
}
void qDestroySubplan(SSubplan* pSubplan) {
if (pSubplan == NULL) {
return;
}
taosArrayDestroy(pSubplan->pChildren);
taosArrayDestroy(pSubplan->pParents);
destroyDataSinkNode(pSubplan->pDataSink);
// todo destroy pNode
tfree(pSubplan);
}
void qDestroyQueryDag(struct SQueryDag* pDag) { void qDestroyQueryDag(struct SQueryDag* pDag) {
if (pDag == NULL) { if (pDag == NULL) {
return; return;
...@@ -51,6 +32,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { ...@@ -51,6 +32,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
SSubplan* pSubplan = taosArrayGetP(pa, j); SSubplan* pSubplan = taosArrayGetP(pa, j);
qDestroySubplan(pSubplan); qDestroySubplan(pSubplan);
} }
taosArrayDestroy(pa); taosArrayDestroy(pa);
} }
......
...@@ -1512,9 +1512,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { ...@@ -1512,9 +1512,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
info = NULL; info = NULL;
_return: _return:
schedulerFreeTaskList(info); schedulerFreeTaskList(info);
SCH_RET(code); SCH_RET(code);
} }
......
...@@ -123,9 +123,9 @@ typedef struct { ...@@ -123,9 +123,9 @@ typedef struct {
} SRpcReqContext; } SRpcReqContext;
typedef struct { typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo SRpcInfo* pTransInst; // associated SRpcInfo
SEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app void* ahandle; // handle provided by app
// struct SRpcConn* pConn; // pConn allocated // struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app uint8_t* pCont; // content provided by app
......
...@@ -42,6 +42,8 @@ int tsRpcMaxRetry; ...@@ -42,6 +42,8 @@ int tsRpcMaxRetry;
int tsRpcHeadSize; int tsRpcHeadSize;
int tsRpcOverhead; int tsRpcOverhead;
SHashObj *tsFqdnHash;
#ifndef USE_UV #ifndef USE_UV
typedef struct { typedef struct {
...@@ -215,6 +217,8 @@ static void rpcInitImp(void) { ...@@ -215,6 +217,8 @@ static void rpcInitImp(void) {
tsRpcOverhead = sizeof(SRpcReqContext); tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree); tsRpcRefId = taosOpenRef(200, rpcFree);
tsFqdnHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
} }
int32_t rpcInit(void) { int32_t rpcInit(void) {
...@@ -224,6 +228,9 @@ int32_t rpcInit(void) { ...@@ -224,6 +228,9 @@ int32_t rpcInit(void) {
void rpcCleanup(void) { void rpcCleanup(void) {
taosCloseRef(tsRpcRefId); taosCloseRef(tsRpcRefId);
taosHashClear(tsFqdnHash);
taosHashCleanup(tsFqdnHash);
tsFqdnHash = NULL;
tsRpcRefId = -1; tsRpcRefId = -1;
} }
...@@ -571,7 +578,17 @@ static void rpcFreeMsg(void *msg) { ...@@ -571,7 +578,17 @@ static void rpcFreeMsg(void *msg) {
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
SRpcConn *pConn; SRpcConn *pConn;
uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn); uint32_t peerIp = 0;
uint32_t *pPeerIp = taosHashGet(tsFqdnHash, peerFqdn, strlen(peerFqdn) + 1);
if (pPeerIp != NULL) {
peerIp = *pPeerIp;
} else {
peerIp = taosGetIpv4FromFqdn(peerFqdn);
if (peerIp != 0xFFFFFFFF) {
taosHashPut(tsFqdnHash, peerFqdn, strlen(peerFqdn) + 1, &peerIp, sizeof(peerIp));
}
}
if (peerIp == 0xFFFFFFFF) { if (peerIp == 0xFFFFFFFF) {
tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
terrno = TSDB_CODE_RPC_FQDN_ERROR; terrno = TSDB_CODE_RPC_FQDN_ERROR;
......
...@@ -30,6 +30,7 @@ typedef struct SCliConn { ...@@ -30,6 +30,7 @@ typedef struct SCliConn {
char spi; char spi;
char secured; char secured;
uint64_t expireTime; uint64_t expireTime;
int8_t notifyCount; // timers already notify to client
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
...@@ -72,8 +73,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co ...@@ -72,8 +73,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle); static void clientTimeoutCb(uv_timer_t* handle);
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// check whether already read complete packet from server // check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf); static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read // alloc buf for read
...@@ -88,10 +87,15 @@ static void clientAsyncCb(uv_async_t* handle); ...@@ -88,10 +87,15 @@ static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle); static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void clientMsgDestroy(SCliMsg* pMsg); // process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
// handle req from app // handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientMsgDestroy(SCliMsg* pMsg);
static void destroyTransConnCtx(STransConnCtx* ctx);
// thread obj // thread obj
static SCliThrdObj* createThrdObj(); static SCliThrdObj* createThrdObj();
static void destroyThrdObj(SCliThrdObj* pThrd); static void destroyThrdObj(SCliThrdObj* pThrd);
...@@ -100,22 +104,38 @@ static void* clientThread(void* arg); ...@@ -100,22 +104,38 @@ static void* clientThread(void* arg);
static void clientHandleResp(SCliConn* conn) { static void clientHandleResp(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = pCtx->pRpc; SRpcInfo* pRpc = pCtx->pTransInst;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = conn->readBuf.buf; rpcMsg.pCont = conn->readBuf.buf;
rpcMsg.contLen = conn->readBuf.len; rpcMsg.contLen = conn->readBuf.len;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(NULL, &rpcMsg, NULL);
conn->notifyCount += 1;
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
free(pCtx->ip); destroyTransConnCtx(pCtx);
free(pCtx); }
// impl static void clientHandleExcept(SCliConn* pConn) {
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst;
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
pConn->notifyCount += 1;
destroyTransConnCtx(pCtx);
clientConnDestroy(pConn, true);
} }
static void clientTimeoutCb(uv_timer_t* handle) { static void clientTimeoutCb(uv_timer_t* handle) {
...@@ -191,6 +211,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { ...@@ -191,6 +211,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->notifyCount = 0;
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
...@@ -246,19 +267,21 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -246,19 +267,21 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (clientReadComplete(pBuf)) { if (clientReadComplete(pBuf)) {
tDebug("alread read complete"); tDebug("conn %p read complete", conn);
clientHandleResp(conn); clientHandleResp(conn);
} else { } else {
tDebug("read half packet, continue to read"); tDebug("conn %p read partial packet, continue to read", conn);
} }
return; return;
} }
assert(nread <= 0); assert(nread <= 0);
if (nread == 0) { if (nread == 0) {
tError("conn %p closed", conn);
return; return;
} }
if (nread != UV_EOF) { if (nread < 0) {
tDebug("read error %s", uv_err_name(nread)); tError("conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn);
} }
// tDebug("Read error %s\n", uv_err_name(nread)); // tDebug("Read error %s\n", uv_err_name(nread));
// uv_close((uv_handle_t*)handle, clientDestroy); // uv_close((uv_handle_t*)handle, clientDestroy);
...@@ -282,19 +305,20 @@ static void clientDestroy(uv_handle_t* handle) { ...@@ -282,19 +305,20 @@ static void clientDestroy(uv_handle_t* handle) {
static void clientWriteCb(uv_write_t* req, int status) { static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status == 0) { if (status == 0) {
tDebug("data already was written on stream"); tDebug("conn %p data already was written out", pConn);
} else { } else {
tError("failed to write: %s", uv_err_name(status)); tError("conn %p failed to write: %s", pConn, uv_err_name(status));
clientConnDestroy(pConn, true); clientHandleExcept(pConn);
return; return;
} }
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
if (pConn->stream == NULL) { // if (pConn->stream == NULL) {
pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); // pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); // uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
pConn->stream->data = pConn; // pConn->stream->data = pConn;
} //}
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
// impl later // impl later
} }
...@@ -310,30 +334,19 @@ static void clientWrite(SCliConn* pConn) { ...@@ -310,30 +334,19 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("data write out, msgType : %d, len: %d", pHead->msgType, msgLen); tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen);
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
} }
static void clientConnCb(uv_connect_t* req, int status) { static void clientConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pRpc;
if (status != 0) { if (status != 0) {
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError("failed to connect server, errmsg: %s", uv_strerror(status)); tError("conn %p failed to connect server: %s", pConn, uv_strerror(status));
// call user fp later clientHandleExcept(pConn);
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
clientConnDestroy(pConn, true);
// uv_close((uv_handle_t*)req->handle, clientDestroy);
return; return;
} }
tDebug("conn %p create", pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientWrite(pConn); clientWrite(pConn);
...@@ -349,6 +362,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -349,6 +362,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) { if (conn != NULL) {
// impl later // impl later
tDebug("conn %p get from conn pool", conn);
conn->data = pMsg; conn->data = pMsg;
conn->writeReq->data = conn; conn->writeReq->data = conn;
...@@ -462,6 +476,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { ...@@ -462,6 +476,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
free(pThrd->loop); free(pThrd->loop);
free(pThrd); free(pThrd);
} }
static void destroyTransConnCtx(STransConnCtx* ctx) {
if (ctx != NULL) {
free(ctx->ip);
}
free(ctx);
}
// //
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
// impl later // impl later
...@@ -472,7 +493,6 @@ void taosCloseClient(void* arg) { ...@@ -472,7 +493,6 @@ void taosCloseClient(void* arg) {
free(cli->pThreadObj); free(cli->pThreadObj);
free(cli); free(cli);
} }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later // impl later
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]); char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
...@@ -487,7 +507,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* ...@@ -487,7 +507,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pRpc = (SRpcInfo*)shandle; pCtx->pTransInst = (SRpcInfo*)shandle;
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip); pCtx->ip = strdup(ip);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifdef USE_UV #ifdef USE_UV
#include "transComm.h" #include "transComm.h"
typedef struct SConn { typedef struct SConn {
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
uv_write_t* pWriter; uv_write_t* pWriter;
...@@ -26,7 +27,6 @@ typedef struct SConn { ...@@ -26,7 +27,6 @@ typedef struct SConn {
int ref; int ref;
int persist; // persist connection or not int persist; // persist connection or not
SConnBuffer connBuf; // read buf, SConnBuffer connBuf; // read buf,
int count;
int inType; int inType;
void* pTransInst; // rpc init void* pTransInst; // rpc init
void* ahandle; // void* ahandle; //
...@@ -226,7 +226,7 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { ...@@ -226,7 +226,7 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug("%p timeout since no activity", conn); tDebug("%p timeout since no activity", conn);
} }
static void uvProcessData(SConn* pConn) { static void uvHandleReq(SConn* pConn) {
SRecvInfo info; SRecvInfo info;
SRecvInfo* p = &info; SRecvInfo* p = &info;
SConnBuffer* pBuf = &pConn->connBuf; SConnBuffer* pBuf = &pConn->connBuf;
...@@ -271,6 +271,7 @@ static void uvProcessData(SConn* pConn) { ...@@ -271,6 +271,7 @@ static void uvProcessData(SConn* pConn) {
rpcMsg.ahandle = NULL; rpcMsg.ahandle = NULL;
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
pConn->ref++;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
...@@ -283,20 +284,23 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -283,20 +284,23 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->connBuf; SConnBuffer* pBuf = &conn->connBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tDebug("on read %p, total read: %d, current read: %d", cli, pBuf->len, (int)nread); tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
if (readComplete(pBuf)) { if (readComplete(pBuf)) {
tDebug("alread read complete packet"); tDebug("conn %p alread read complete packet", conn);
uvProcessData(conn); uvHandleReq(conn);
} else { } else {
tDebug("read half packet, continue to read"); tDebug("conn %p read partial packet, continue to read", conn);
} }
return; return;
} }
if (nread == 0) { if (nread == 0) {
tDebug("conn %p except read", conn);
// destroyConn(conn, true);
return; return;
} }
if (nread != UV_EOF) { if (nread != UV_EOF) {
tDebug("read error %s", uv_err_name(nread)); tDebug("conn %p read error: %s", conn, uv_err_name(nread));
destroyConn(conn, true);
} }
} }
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
...@@ -306,7 +310,8 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b ...@@ -306,7 +310,8 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnTimeoutCb(uv_timer_t* handle) {
// opt // opt
tDebug("time out"); SConn* pConn = handle->data;
tDebug("conn %p time out", pConn);
} }
void uvOnWriteCb(uv_write_t* req, int status) { void uvOnWriteCb(uv_write_t* req, int status) {
...@@ -317,9 +322,9 @@ void uvOnWriteCb(uv_write_t* req, int status) { ...@@ -317,9 +322,9 @@ void uvOnWriteCb(uv_write_t* req, int status) {
memset(buf->buf, 0, buf->cap); memset(buf->buf, 0, buf->cap);
buf->left = -1; buf->left = -1;
if (status == 0) { if (status == 0) {
tDebug("data already was written on stream"); tDebug("conn %p data already was written on stream", conn);
} else { } else {
tDebug("failed to write data, %s", uv_err_name(status)); tDebug("conn %p failed to write data, %s", conn, uv_err_name(status));
destroyConn(conn, true); destroyConn(conn, true);
} }
// opt // opt
...@@ -334,7 +339,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { ...@@ -334,7 +339,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
// impl later; // impl later;
tDebug("prepare to send back"); tDebug("conn %p prepare to send resp", conn);
SRpcMsg* pMsg = &conn->sendMsg; SRpcMsg* pMsg = &conn->sendMsg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0); pMsg->pCont = (void*)rpcMallocCont(0);
...@@ -427,6 +432,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -427,6 +432,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert(pending == UV_TCP); assert(pending == UV_TCP);
SConn* pConn = createConn(); SConn* pConn = createConn();
pConn->pTransInst = pThrd->pTransInst; pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/ /* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t)); pConn->pTimer = malloc(sizeof(uv_timer_t));
...@@ -448,7 +454,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -448,7 +454,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("new connection created: %d", fd); tDebug("conn %p created, fd: %d", pConn, fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else { } else {
tDebug("failed to create new connection"); tDebug("failed to create new connection");
...@@ -515,19 +521,19 @@ void* workerThread(void* arg) { ...@@ -515,19 +521,19 @@ void* workerThread(void* arg) {
static SConn* createConn() { static SConn* createConn() {
SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
++pConn->ref;
return pConn; return pConn;
} }
static void connCloseCb(uv_handle_t* handle) {
// impl later
//
}
static void destroyConn(SConn* conn, bool clear) { static void destroyConn(SConn* conn, bool clear) {
if (conn == NULL) { if (conn == NULL) {
return; return;
} }
if (--conn->ref == 0) {
return;
}
if (clear) { if (clear) {
uv_handle_t handle = *((uv_handle_t*)conn->pTcp); uv_close((uv_handle_t*)conn->pTcp, NULL);
uv_close(&handle, NULL);
} }
uv_timer_stop(conn->pTimer); uv_timer_stop(conn->pTimer);
free(conn->pTimer); free(conn->pTimer);
...@@ -646,6 +652,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) { ...@@ -646,6 +652,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
pthread_mutex_lock(&pThrd->connMtx); pthread_mutex_lock(&pThrd->connMtx);
QUEUE_PUSH(&pThrd->conn, &pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pthread_mutex_unlock(&pThrd->connMtx); pthread_mutex_unlock(&pThrd->connMtx);
tDebug("conn %p start to send resp", pConn);
uv_async_send(pConn->pWorkerAsync); uv_async_send(pConn->pWorkerAsync);
} }
......
...@@ -63,7 +63,7 @@ static void *sendRequest(void *param) { ...@@ -63,7 +63,7 @@ static void *sendRequest(void *param) {
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem); // tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
tDebug("recv response"); tDebug("recv response succefully");
// usleep(100000000); // usleep(100000000);
} }
......
...@@ -248,6 +248,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name") ...@@ -248,6 +248,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_CODE, "Invalid func code") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_CODE, "Invalid func code")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg")
// mnode-trans // mnode-trans
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册