diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 238753e5b771f4b3fcae28eba6a7af0a5371b9ba..a5c2c89b2450fc4bd98e51c7fe4165bbbeb94375 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1108,18 +1108,34 @@ typedef struct { char* sql; char* physicalPlan; char* logicalPlan; -} SMCreateTopicReq; +} SCMCreateStreamReq; -int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq); -int32_t tDeserializeSMCreateTopicReq(void* buf, int32_t bufLen, SMCreateTopicReq* pReq); -void tFreeSMCreateTopicReq(SMCreateTopicReq* pReq); +typedef struct { + int64_t streamId; +} SCMCreateStreamRsp; + +int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateStreamReq* pReq); +int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); +void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); + +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + int8_t igExists; + char* sql; + char* physicalPlan; + char* logicalPlan; +} SCMCreateTopicReq; + +int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); +int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq); +void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq); typedef struct { int64_t topicId; -} SMCreateTopicRsp; +} SCMCreateTopicRsp; -int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp); -int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp); +int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp); +int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp); typedef struct { int32_t topicNum; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 0f0c4729bc70cf4fbcfdc56d331794fe8e846158..6a27db89c6cefaf9bcae23f2715f0bee529e443b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -148,6 +148,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) + TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index bf48a8523c427a10801b0cccaf1046d08c9cc07f..d04e9f817e7b601a22212765ba6612bb6ec649f2 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -113,15 +113,16 @@ typedef enum { SDB_USER = 7, SDB_AUTH = 8, SDB_ACCT = 9, - SDB_OFFSET = 10, - SDB_SUBSCRIBE = 11, - SDB_CONSUMER = 12, - SDB_TOPIC = 13, - SDB_VGROUP = 14, - SDB_STB = 15, - SDB_DB = 16, - SDB_FUNC = 17, - SDB_MAX = 18 + SDB_STREAM = 10, + SDB_OFFSET = 11, + SDB_SUBSCRIBE = 12, + SDB_CONSUMER = 13, + SDB_TOPIC = 14, + SDB_VGROUP = 15, + SDB_STB = 16, + SDB_DB = 17, + SDB_FUNC = 18, + SDB_MAX = 19 } ESdbType; typedef struct SSdb SSdb; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6ad34eb0c061fc4564d1cdf4eb97f6bd38d881cb..56ad1eea3bd317a4344d1b7630d4b682a1aa5e5e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -267,9 +267,11 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8) #define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) #define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA) -#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0) - +// mnode-stream +#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0) +#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1) +#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) // dnode #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) diff --git a/include/util/tdef.h b/include/util/tdef.h index 664588f68ff4973e14676924a6ba5647f262a43c..ed88fa5535c24e6c66ea88c20e3e17e629091e66 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -211,6 +211,7 @@ typedef enum ELogicConditionType { #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN +#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20) #define TSDB_COL_NAME_LEN 65 diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 60103cc9c5a5f5c25ccc214beb379452feaca0c6..197f2e37c53c8f783a7bd76e3fff459016334e94 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -511,7 +511,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB); tNameFromString(&name, topicName, T_NAME_TABLE); - SMCreateTopicReq req = { + SCMCreateTopicReq req = { .igExists = 1, .physicalPlan = (char*)pStr, .sql = (char*)sql, @@ -519,13 +519,13 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i }; tNameExtractFullName(&name, req.name); - int tlen = tSerializeMCreateTopicReq(NULL, 0, &req); + int tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req); void* buf = malloc(tlen); if (buf == NULL) { goto _return; } - tSerializeMCreateTopicReq(buf, tlen, &req); + tSerializeSCMCreateTopicReq(buf, tlen, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 97b19c1c7963c01472b393141b89307850a89e68..135ff3420742f1324dac0908efc8f8f0ebd16eab 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1899,7 +1899,7 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR return 0; } -int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopicReq *pReq) { +int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) { int32_t sqlLen = 0; int32_t physicalPlanLen = 0; int32_t logicalPlanLen = 0; @@ -1927,7 +1927,7 @@ int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopic return tlen; } -int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq *pReq) { +int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicReq *pReq) { int32_t sqlLen = 0; int32_t physicalPlanLen = 0; int32_t logicalPlanLen = 0; @@ -1956,13 +1956,13 @@ int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq return 0; } -void tFreeSMCreateTopicReq(SMCreateTopicReq *pReq) { +void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) { tfree(pReq->sql); tfree(pReq->physicalPlan); tfree(pReq->logicalPlan); } -int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) { +int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); @@ -1975,7 +1975,7 @@ int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopi return tlen; } -int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp *pRsp) { +int32_t tDeserializeSCMCreateTopicRsp(void *buf, int32_t bufLen, SCMCreateTopicRsp *pRsp) { SCoder decoder = {0}; tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); @@ -2423,3 +2423,43 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { return buf; } + +int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeCStr(&decoder, (const char **)&pReq->sql) < 0) return -1; + if (tDecodeCStr(&decoder, (const char **)&pReq->physicalPlan) < 0) return -1; + if (tDecodeCStr(&decoder, (const char **)&pReq->logicalPlan) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { + tfree(pReq->sql); + tfree(pReq->physicalPlan); + tfree(pReq->logicalPlan); +} diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a36168b622dcc124a4c47105be5d0877330c0377..490a63333f927d9d72f677b949cd9d14c6241b98 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -84,6 +84,7 @@ typedef enum { TRN_TYPE_SUBSCRIBE = 1016, TRN_TYPE_REBALANCE = 1017, TRN_TYPE_COMMIT_OFFSET = 1018, + TRN_TYPE_CREATE_STREAM = 1019, TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_CREATE_DNODE = 2001, @@ -662,7 +663,23 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons } typedef struct { -} SStreamScheduler; + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + int64_t uid; + int64_t dbUid; + int32_t version; + SRWLatch lock; + int8_t status; + // int32_t sqlLen; + char* sql; + char* logicalPlan; + char* physicalPlan; +} SStreamObj; + +int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); +int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj); typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h new file mode 100644 index 0000000000000000000000000000000000000000..b1145b0c6bf598542661f2d2f556b5e2a9f4dc60 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_STREAM_H_ +#define _TD_MND_STREAM_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitStream(SMnode *pMnode); +void mndCleanupStream(SMnode *pMnode); + +SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); +void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); + +SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_STREAM_H_*/ diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c new file mode 100644 index 0000000000000000000000000000000000000000..6e8d9aa79f8f9dfd1cd0e1a2689f91905600a2bc --- /dev/null +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndDef.h" + +int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { + if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; + return pEncoder->pos; +} + +int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { + if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->db) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; + if (tDecodeCStr(pDecoder, (const char **)&pObj->sql) < 0) return -1; + if (tDecodeCStr(pDecoder, (const char **)&pObj->logicalPlan) < 0) return -1; + if (tDecodeCStr(pDecoder, (const char **)&pObj->physicalPlan) < 0) return -1; + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c new file mode 100644 index 0000000000000000000000000000000000000000..54ad9cd7e239b20a84eaa6f7c791b1942413598f --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -0,0 +1,444 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndStream.h" +#include "mndAuth.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tname.h" + +#define MND_STREAM_VER_NUMBER 1 +#define MND_STREAM_RESERVE_SIZE 64 + +static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); +static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); +static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); +static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq); +/*static int32_t mndProcessDropStreamReq(SMnodeMsg *pReq);*/ +/*static int32_t mndProcessDropStreamInRsp(SMnodeMsg *pRsp);*/ +static int32_t mndProcessStreamMetaReq(SMnodeMsg *pReq); +static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); + +int32_t mndInitStream(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_STREAM, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndStreamActionEncode, + .decodeFp = (SdbDecodeFp)mndStreamActionDecode, + .insertFp = (SdbInsertFp)mndStreamActionInsert, + .updateFp = (SdbUpdateFp)mndStreamActionUpdate, + .deleteFp = (SdbDeleteFp)mndStreamActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); + /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ + /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ + + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetStreamMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextStream); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupStream(SMnode *pMnode) {} + +SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + if (tEncodeSStreamObj(NULL, pStream) < 0) { + tCoderClear(&encoder); + goto STREAM_ENCODE_OVER; + } + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + + int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); + if (pRaw == NULL) goto STREAM_ENCODE_OVER; + + buf = malloc(tlen); + if (buf == NULL) goto STREAM_ENCODE_OVER; + + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); + if (tEncodeSStreamObj(NULL, pStream) < 0) { + tCoderClear(&encoder); + goto STREAM_ENCODE_OVER; + } + tCoderClear(&encoder); + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + +STREAM_ENCODE_OVER: + tfree(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("stream:%s, encode to raw:%p, row:%p", pStream->name, pRaw, pStream); + return pRaw; +} + +SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER; + + if (sver != MND_STREAM_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto STREAM_DECODE_OVER; + } + + int32_t size = sizeof(SStreamObj); + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto STREAM_DECODE_OVER; + + SStreamObj *pStream = sdbGetRowObj(pRow); + if (pStream == NULL) goto STREAM_DECODE_OVER; + + int32_t tlen; + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER); + buf = malloc(tlen + 1); + if (buf == NULL) goto STREAM_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); + + SCoder decoder; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, NULL, 0, TD_DECODER); + if (tDecodeSStreamObj(&decoder, pStream) < 0) { + goto STREAM_DECODE_OVER; + } + + terrno = TSDB_CODE_SUCCESS; + +STREAM_DECODE_OVER: + tfree(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("stream:%s, failed to decode from raw:%p since %s", pStream->name, pRaw, terrstr()); + tfree(pRow); + return NULL; + } + + mTrace("stream:%s, decode from raw:%p, row:%p", pStream->name, pRaw, pStream); + return pRow; +} + +static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) { + mTrace("stream:%s, perform insert action", pStream->name); + return 0; +} + +static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) { + mTrace("stream:%s, perform delete action", pStream->name); + return 0; +} + +static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) { + mTrace("stream:%s, perform update action", pOldStream->name); + atomic_exchange_32(&pOldStream->updateTime, pNewStream->updateTime); + atomic_exchange_32(&pOldStream->version, pNewStream->version); + + taosWLockLatch(&pOldStream->lock); + + // TODO handle update + + taosWUnLockLatch(&pOldStream->lock); + return 0; +} + +SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName); + if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + } + return pStream; +} + +void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pStream); +} + +static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { + SName name = {0}; + tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + + char db[TSDB_STREAM_FNAME_LEN] = {0}; + tNameGetFullDbName(&name, db); + + return mndAcquireDb(pMnode, db); +} + +static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { + if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { + terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; + return -1; + } + return 0; +} + +static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { + mDebug("stream:%s to create", pCreate->name); + SStreamObj streamObj = {0}; + tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); + tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN); + streamObj.createTime = taosGetTimestampMs(); + streamObj.updateTime = streamObj.createTime; + streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); + streamObj.dbUid = pDb->uid; + streamObj.version = 1; + streamObj.sql = pCreate->sql; + streamObj.physicalPlan = pCreate->physicalPlan; + streamObj.logicalPlan = pCreate->logicalPlan; + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); + if (pTrans == NULL) { + mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); + + SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SStreamObj *pStream = NULL; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SCMCreateStreamReq createStreamReq = {0}; + + if (tDeserializeSCMCreateStreamReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createStreamReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto CREATE_STREAM_OVER; + } + + mDebug("stream:%s, start to create, sql:%s", createStreamReq.name, createStreamReq.sql); + + if (mndCheckCreateStreamReq(&createStreamReq) != 0) { + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + goto CREATE_STREAM_OVER; + } + + pStream = mndAcquireStream(pMnode, createStreamReq.name); + if (pStream != NULL) { + if (createStreamReq.igExists) { + mDebug("stream:%s, already exist, ignore exist is set", createStreamReq.name); + code = 0; + goto CREATE_STREAM_OVER; + } else { + terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; + goto CREATE_STREAM_OVER; + } + } else if (terrno != TSDB_CODE_MND_STREAM_NOT_EXIST) { + goto CREATE_STREAM_OVER; + } + + pDb = mndAcquireDbByStream(pMnode, createStreamReq.name); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + goto CREATE_STREAM_OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto CREATE_STREAM_OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto CREATE_STREAM_OVER; + } + + code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +CREATE_STREAM_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + } + + mndReleaseStream(pMnode, pStream); + mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); + + tFreeSCMCreateStreamReq(&createStreamReq); + return code; +} + +static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { + SSdb *pSdb = pMnode->pSdb; + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfStreams = 0; + void *pIter = NULL; + while (1) { + SStreamObj *pStream = NULL; + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + if (pStream->dbUid == pDb->uid) { + numOfStreams++; + } + + sdbRelease(pSdb, pStream); + } + + *pNumOfStreams = numOfStreams; + mndReleaseDb(pMnode, pDb); + return 0; +} + +static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; + SSdb *pSdb = pMnode->pSdb; + + if (mndGetNumOfStreams(pMnode, pShow->db, &pShow->numOfRows) != 0) { + return -1; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchemas; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pMeta->numOfColumns = cols; + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_STREAM); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbName, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SStreamObj *pStream = NULL; + int32_t cols = 0; + char *pWrite; + char prefix[TSDB_DB_FNAME_LEN] = {0}; + + SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); + if (pDb == NULL) return 0; + + tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN); + strcat(prefix, TS_PATH_DELIMITER); + int32_t prefixLen = (int32_t)strlen(prefix); + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); + if (pShow->pIter == NULL) break; + + if (pStream->dbUid != pDb->uid) { + if (strncmp(pStream->name, prefix, prefixLen) != 0) { + mError("Inconsistent stream data, name:%s, db:%s, dbUid:%" PRIu64, pStream->name, pDb->name, pDb->uid); + } + + sdbRelease(pSdb, pStream); + continue; + } + + cols = 0; + + char streamName[TSDB_TABLE_NAME_LEN] = {0}; + tstrncpy(streamName, pStream->name + prefixLen, TSDB_TABLE_NAME_LEN); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, streamName); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pStream->createTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pStream->sql, pShow->bytes[cols]); + cols++; + + numOfRows++; + sdbRelease(pSdb, pStream); + } + + mndReleaseDb(pMnode, pDb); + pShow->numOfReads += numOfRows; + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + return numOfRows; +} + +static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 9822550ee536d093864b55522deb6e2bb7aa4996..7d7c5f9975f3498f34ffb54640dcbb59a0bfd891 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "mndTopic.h" #include "mndAuth.h" #include "mndDb.h" @@ -229,7 +228,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq return pDrop; } -static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) { +static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; return -1; @@ -237,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) { return 0; } -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -278,14 +277,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq } static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - int32_t code = -1; - SMqTopicObj *pTopic = NULL; - SDbObj *pDb = NULL; - SUserObj *pUser = NULL; - SMCreateTopicReq createTopicReq = {0}; - - if (tDeserializeSMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) { + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SMqTopicObj *pTopic = NULL; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SCMCreateTopicReq createTopicReq = {0}; + + if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto CREATE_TOPIC_OVER; } @@ -338,7 +337,7 @@ CREATE_TOPIC_OVER: mndReleaseDb(pMnode, pDb); mndReleaseUser(pMnode, pUser); - tFreeSMCreateTopicReq(&createTopicReq); + tFreeSCMCreateTopicReq(&createTopicReq); return code; } @@ -409,35 +408,6 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { return 0; } -#if 0 -static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { - SSdb *pSdb = pMnode->pSdb; - SDbObj *pDb = mndAcquireDb(pMnode, dbName); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - return -1; - } - - int32_t numOfTopics = 0; - void *pIter = NULL; - while (1) { - SMqTopicObj *pTopic = NULL; - pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); - if (pIter == NULL) break; - - if (pTopic->dbUid == pDb->uid) { - numOfTopics++; - } - - sdbRelease(pSdb, pTopic); - } - - *pNumOfTopics = numOfTopics; - mndReleaseDb(pMnode, pDb); - return 0; -} -#endif - static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); diff --git a/source/dnode/mnode/impl/test/topic/topic.cpp b/source/dnode/mnode/impl/test/topic/topic.cpp index 8a4e17d054b271ed17473cb0343830ffb34777be..f58d0a67717bddc9a47cde4c3e3245a6ba70a096 100644 --- a/source/dnode/mnode/impl/test/topic/topic.cpp +++ b/source/dnode/mnode/impl/test/topic/topic.cpp @@ -61,16 +61,16 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) { } void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) { - SMCreateTopicReq createReq = {0}; + SCMCreateTopicReq createReq = {0}; strcpy(createReq.name, topicName); createReq.igExists = 0; createReq.sql = (char*)sql; createReq.physicalPlan = (char*)"physicalPlan"; createReq.logicalPlan = (char*)"logicalPlan"; - int32_t contLen = tSerializeMCreateTopicReq(NULL, 0, &createReq); + int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq); void* pReq = rpcMallocCont(contLen); - tSerializeMCreateTopicReq(pReq, contLen, &createReq); + tSerializeSCMCreateTopicReq(pReq, contLen, &createReq); *pContLen = contLen; return pReq; @@ -100,9 +100,7 @@ TEST_F(MndTestTopic, 01_Create_Topic) { ASSERT_EQ(pRsp->code, 0); } - { - test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, ""); - } + { test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, ""); } { int32_t contLen = 0; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index aff82e4ae72f0f54bda843da9fc5b5b96091ef0b..5851e18478163f2a0370a00b1a71412467cffcc7 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -20,6 +20,7 @@ #include "tlog.h" #include "tmsg.h" +#include "tqueue.h" #include "trpc.h" #include "snode.h" @@ -28,12 +29,51 @@ extern "C" { #endif +enum { + STREAM_STATUS__READY = 1, + STREAM_STATUS__STOPPED, + STREAM_STATUS__CREATING, + STREAM_STATUS__STOPING, + STREAM_STATUS__RESUMING, + STREAM_STATUS__DELETING, +}; + +enum { + STREAM_RUNNER__RUNNING = 1, + STREAM_RUNNER__STOP, +}; + typedef struct SSnode { SSnodeOpt cfg; } SSnode; +typedef struct { + int64_t streamId; + int32_t IdxInLevel; + int32_t level; +} SStreamInfo; + +typedef struct { + SStreamInfo meta; + int8_t status; + void* executor; + STaosQueue* queue; + void* stateStore; + // storage handle +} SStreamRunner; + +typedef struct { + SHashObj* pHash; +} SStreamMeta; + +int32_t sndCreateStream(); +int32_t sndDropStream(); + +int32_t sndStopStream(); +int32_t sndResumeStream(); + #ifdef __cplusplus } #endif -#endif /*_TD_SNODE_INT_H_*/ \ No newline at end of file +#endif /*_TD_SNODE_INT_H_*/ diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 5bfea9ab5e2773bf3e4b6c080eba1a9e889f65a1..3908c97175b2c56ca3aada9b150a7af5f9ece2fa 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -124,8 +124,13 @@ class WalRetentionEnv : public ::testing::Test { void SetUp() override { SWalCfg cfg; - cfg.rollPeriod = -1, cfg.segSize = -1, cfg.retentionPeriod = -1, cfg.retentionSize = 0, cfg.rollPeriod = 0, - cfg.vgId = 0, cfg.level = TAOS_WAL_FSYNC; + cfg.rollPeriod = -1; + cfg.segSize = -1; + cfg.retentionPeriod = -1; + cfg.retentionSize = 0; + cfg.rollPeriod = 0; + cfg.vgId = 0; + cfg.level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, &cfg); ASSERT(pWal != NULL); } diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index 46c95556d45f4f03817619da437875de268e1dc4..25314842fb4d417b29dca127a1da70d55b87bd4f 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -174,8 +174,8 @@ TEST(td_encode_test, encode_decode_variant_len_integer) { } TEST(td_encode_test, encode_decode_cstr) { - uint8_t * buf = new uint8_t[1024 * 1024]; - char * cstr = new char[1024 * 1024]; + uint8_t *buf = new uint8_t[1024 * 1024]; + char *cstr = new char[1024 * 1024]; const char *dcstr; SCoder encoder; SCoder decoder; @@ -208,7 +208,7 @@ TEST(td_encode_test, encode_decode_cstr) { typedef struct { int32_t A_a; int64_t A_b; - char * A_c; + char *A_c; } SStructA_v1; static int32_t tSStructA_v1_encode(SCoder *pCoder, const SStructA_v1 *pSAV1) { @@ -240,7 +240,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) { typedef struct { int32_t A_a; int64_t A_b; - char * A_c; + char *A_c; // -------------------BELOW FEILDS ARE ADDED IN A NEW VERSION-------------- int16_t A_d; int16_t A_e; @@ -437,4 +437,4 @@ TEST(td_encode_test, compound_struct_encode_test) { tCoderClear(&decoder); } #endif -#pragma GCC diagnostic pop \ No newline at end of file +#pragma GCC diagnostic pop