提交 fa77a6d8 编写于 作者: C Cary Xu

feat: code optimization

上级 9e7b43c8
...@@ -2256,242 +2256,242 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) { ...@@ -2256,242 +2256,242 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
return 0; return 0;
} }
typedef struct { typedef struct {
int idx; int idx;
} SMCreateFullTextReq; } SMCreateFullTextReq;
int32_t tSerializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq);
int32_t tDeserializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq);
void tFreeSMCreateFullTextReq(SMCreateFullTextReq * pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SMDropFullTextReq;
int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
typedef struct {
char indexFName[TSDB_INDEX_FNAME_LEN];
} SUserIndexReq;
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
char tblFName[TSDB_TABLE_FNAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char indexType[TSDB_INDEX_TYPE_LEN];
char indexExts[TSDB_INDEX_EXTS_LEN];
} SUserIndexRsp;
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);
typedef struct {
int8_t mqMsgType;
int32_t code;
int32_t epoch;
int64_t consumerId;
} SMqRspHead;
typedef struct {
SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t withTbName;
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t waitTime;
int64_t currentOffset;
} SMqPollReq;
typedef struct {
int32_t vgId;
int64_t offset;
SEpSet epSet;
} SMqSubVgEp;
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { int32_t tSerializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq);
buf = taosDecodeFixedI32(buf, &pVgEp->vgId); int32_t tDeserializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq);
buf = taosDecodeFixedI64(buf, &pVgEp->offset); void tFreeSMCreateFullTextReq(SMCreateFullTextReq* pReq);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return buf;
}
typedef struct { typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t isSchemaAdaptive; int8_t igNotExists;
SArray* vgs; // SArray<SMqSubVgEp> } SMDropFullTextReq;
SSchemaWrapper schema;
} SMqSubTopicEp;
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
}
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
buf = taosDecodeStringTo(buf, pTopicEp->topic); int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
int32_t sz; typedef struct {
buf = taosDecodeFixedI32(buf, &sz); char indexFName[TSDB_INDEX_FNAME_LEN];
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); } SUserIndexReq;
if (pTopicEp->vgs == NULL) {
return NULL; int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
} int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp vgEp; typedef struct {
buf = tDecodeSMqSubVgEp(buf, &vgEp); char dbFName[TSDB_DB_FNAME_LEN];
taosArrayPush(pTopicEp->vgs, &vgEp); char tblFName[TSDB_TABLE_FNAME_LEN];
} char colName[TSDB_COL_NAME_LEN];
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); char indexType[TSDB_INDEX_TYPE_LEN];
return buf; char indexExts[TSDB_INDEX_EXTS_LEN];
} SUserIndexRsp;
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);
typedef struct {
int8_t mqMsgType;
int32_t code;
int32_t epoch;
int64_t consumerId;
} SMqRspHead;
typedef struct {
SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t withTbName;
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t waitTime;
int64_t currentOffset;
} SMqPollReq;
typedef struct {
int32_t vgId;
int64_t offset;
SEpSet epSet;
} SMqSubVgEp;
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return buf;
}
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int8_t isSchemaAdaptive;
SArray* vgs; // SArray<SMqSubVgEp>
SSchemaWrapper schema;
} SMqSubTopicEp;
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
} }
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
return tlen;
}
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp * pSubTopicEp) { static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema); buf = taosDecodeStringTo(buf, pTopicEp->topic);
taosArrayDestroy(pSubTopicEp->vgs); buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
if (pTopicEp->vgs == NULL) {
return NULL;
} }
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp vgEp;
buf = tDecodeSMqSubVgEp(buf, &vgEp);
taosArrayPush(pTopicEp->vgs, &vgEp);
}
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
return buf;
}
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema);
taosArrayDestroy(pSubTopicEp->vgs);
}
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
int64_t reqOffset; int64_t reqOffset;
int64_t rspOffset; int64_t rspOffset;
int32_t skipLogNum; int32_t skipLogNum;
int32_t blockNum; int32_t blockNum;
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
SArray* blockDataLen; // SArray<int32_t> SArray* blockDataLen; // SArray<int32_t>
SArray* blockData; // SArray<SRetrieveTableRsp*> SArray* blockData; // SArray<SRetrieveTableRsp*>
SArray* blockTbName; // SArray<char*> SArray* blockTbName; // SArray<char*>
SArray* blockSchema; // SArray<SSchemaWrapper> SArray* blockSchema; // SArray<SSchemaWrapper>
SArray* blockTags; // SArray<kvrow> SArray* blockTags; // SArray<kvrow>
SArray* blockTagSchema; // SArray<kvrow> SArray* blockTagSchema; // SArray<kvrow>
} SMqDataBlkRsp; } SMqDataBlkRsp;
static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->blockNum); tlen += taosEncodeFixedI32(buf, pRsp->blockNum);
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
tlen += taosEncodeFixedI8(buf, pRsp->withTbName); tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
tlen += taosEncodeFixedI8(buf, pRsp->withSchema); tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
tlen += taosEncodeFixedI8(buf, pRsp->withTag); tlen += taosEncodeFixedI8(buf, pRsp->withTag);
for (int32_t i = 0; i < pRsp->blockNum; i++) { for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
void* data = taosArrayGetP(pRsp->blockData, i); void* data = taosArrayGetP(pRsp->blockData, i);
tlen += taosEncodeFixedI32(buf, bLen); tlen += taosEncodeFixedI32(buf, bLen);
tlen += taosEncodeBinary(buf, data, bLen); tlen += taosEncodeBinary(buf, data, bLen);
if (pRsp->withSchema) { if (pRsp->withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
tlen += taosEncodeSSchemaWrapper(buf, pSW); tlen += taosEncodeSSchemaWrapper(buf, pSW);
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i); char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
tlen += taosEncodeString(buf, tbName); tlen += taosEncodeString(buf, tbName);
}
} }
} }
return tlen;
} }
return tlen;
}
static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->blockNum); buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag); buf = taosDecodeFixedI8(buf, &pRsp->withTag);
for (int32_t i = 0; i < pRsp->blockNum; i++) { for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0; int32_t bLen = 0;
void* data = NULL; void* data = NULL;
buf = taosDecodeFixedI32(buf, &bLen); buf = taosDecodeFixedI32(buf, &bLen);
buf = taosDecodeBinary(buf, &data, bLen); buf = taosDecodeBinary(buf, &data, bLen);
taosArrayPush(pRsp->blockDataLen, &bLen); taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data); taosArrayPush(pRsp->blockData, &data);
if (pRsp->withSchema) { if (pRsp->withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
buf = taosDecodeSSchemaWrapper(buf, pSW); buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
char* name = NULL; char* name = NULL;
buf = taosDecodeString(buf, &name); buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name); taosArrayPush(pRsp->blockTbName, &name);
}
} }
} }
return (void*)buf;
} }
return (void*)buf;
}
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp> SArray* topics; // SArray<SMqSubTopicEp>
} SMqAskEpRsp; } SMqAskEpRsp;
static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) {
int32_t tlen = 0;
// tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i);
tlen += tEncodeSMqSubTopicEp(buf, pVgEp);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) {
// buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t tlen = 0;
int32_t sz; // tlen += taosEncodeString(buf, pRsp->cgroup);
buf = taosDecodeFixedI32(buf, &sz); int32_t sz = taosArrayGetSize(pRsp->topics);
pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); tlen += taosEncodeFixedI32(buf, sz);
if (pRsp->topics == NULL) { for (int32_t i = 0; i < sz; i++) {
return NULL; SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i);
} tlen += tEncodeSMqSubTopicEp(buf, pVgEp);
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp;
buf = tDecodeSMqSubTopicEp(buf, &topicEp);
taosArrayPush(pRsp->topics, &topicEp);
}
return buf;
} }
return tlen;
}
static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp * pRsp) { static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); // buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
if (pRsp->topics == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp;
buf = tDecodeSMqSubTopicEp(buf, &topicEp);
taosArrayPush(pRsp->topics, &topicEp);
} }
return buf;
}
static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}
typedef struct { typedef struct {
void* data; void* data;
......
...@@ -3581,7 +3581,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { ...@@ -3581,7 +3581,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (pSma->tagsFilterLen > 0) { if (pSma->tagsFilterLen > 0) {
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1; if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
} }
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册