未验证 提交 ce0eb894 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16503 from taosdata/feature/stream

feat(tmq): support taosx
......@@ -254,6 +254,7 @@ enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
TMQ_RES_TAOSX = 3,
};
typedef struct tmq_raw_data {
......
......@@ -73,6 +73,7 @@ enum {
TMQ_MSG_TYPE__POLL_RSP,
TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__TAOSX_RSP,
TMQ_MSG_TYPE__END_RSP,
};
......@@ -129,7 +130,6 @@ typedef struct SDataBlockInfo {
uint32_t capacity;
// TODO: optimize and remove following
int64_t version; // used for stream, and need serialization
int64_t ts; // used for stream, and need serialization
int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize
......
......@@ -276,7 +276,6 @@ struct SSchema {
char name[TSDB_COL_NAME_LEN];
};
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
......@@ -295,17 +294,15 @@ typedef struct {
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
int32_t code;
int8_t hashMeta;
int64_t uid;
char* tblFName;
int32_t numOfRows;
int32_t affectedRows;
int64_t sver;
STableMetaRsp* pMeta;
int32_t code;
int8_t hashMeta;
int64_t uid;
char* tblFName;
int32_t numOfRows;
int32_t affectedRows;
int64_t sver;
STableMetaRsp* pMeta;
} SSubmitBlkRsp;
typedef struct {
......@@ -320,7 +317,7 @@ typedef struct {
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
void tFreeSSubmitBlkRsp(void* param);
void tFreeSSubmitBlkRsp(void* param);
void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1)
......@@ -2049,8 +2046,8 @@ typedef struct {
STableMetaRsp* pMeta;
} SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
void tFreeSVCreateTbRsp(void* param);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
......@@ -2961,6 +2958,25 @@ typedef struct {
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
typedef struct {
SMqRspHead head;
STqOffsetVal reqOffset;
STqOffsetVal rspOffset;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
SArray* blockDataLen;
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
} STaosxRsp;
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp);
typedef struct {
SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN];
......
......@@ -264,12 +264,14 @@ static FORCE_INLINE int32_t tEncodeDouble(SEncoder* pCoder, double val) {
static FORCE_INLINE int32_t tEncodeBinary(SEncoder* pCoder, const uint8_t* val, uint32_t len) {
if (tEncodeU32v(pCoder, len) < 0) return -1;
if (pCoder->data) {
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, len)) return -1;
memcpy(TD_CODER_CURRENT(pCoder), val, len);
}
if (len) {
if (pCoder->data) {
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, len)) return -1;
memcpy(TD_CODER_CURRENT(pCoder), val, len);
}
TD_CODER_MOVE_POS(pCoder, len);
TD_CODER_MOVE_POS(pCoder, len);
}
return 0;
}
......@@ -414,14 +416,18 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) {
static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uint64_t* len) {
uint64_t length = 0;
if (tDecodeU64v(pCoder, &length) < 0) return -1;
if (len) *len = length;
if (length) {
if (len) *len = length;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
*val = taosMemoryMalloc(length);
if (*val == NULL) return -1;
memcpy(*val, TD_CODER_CURRENT(pCoder), length);
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
*val = taosMemoryMalloc(length);
if (*val == NULL) return -1;
memcpy(*val, TD_CODER_CURRENT(pCoder), length);
TD_CODER_MOVE_POS(pCoder, length);
TD_CODER_MOVE_POS(pCoder, length);
} else {
*val = NULL;
}
return 0;
}
......
......@@ -52,15 +52,17 @@ enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
RES_TYPE__TMQ_META,
RES_TYPE__TAOSX,
};
#define SHOW_VARIABLES_RESULT_COLS 2
#define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX)
typedef struct SAppInstInfo SAppInstInfo;
......@@ -199,8 +201,8 @@ typedef struct {
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
SMqDataRsp rsp;
SReqResultInfo resInfo;
SMqDataRsp rsp;
} SMqRspObj;
typedef struct {
......@@ -211,6 +213,17 @@ typedef struct {
SMqMetaRsp metaRsp;
} SMqMetaRspObj;
typedef struct {
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
SReqResultInfo resInfo;
STaosxRsp rsp;
} SMqTaosxRspObj;
typedef struct SRequestObj {
int8_t resType; // query or tmq
uint64_t requestId;
......@@ -370,7 +383,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData*
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
bool qnodeRequired(SRequestObj* pRequest);
......
......@@ -184,6 +184,19 @@ void taos_free_result(TAOS_RES *res) {
SRequestObj *pRequest = (SRequestObj *)res;
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
destroyRequest(pRequest);
} else if (TD_RES_TMQ_TAOSX(res)) {
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
// taosx
taosArrayDestroy(pRsp->rsp.createTableLen);
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
......
......@@ -164,6 +164,7 @@ typedef struct {
union {
SMqDataRsp dataRsp;
SMqMetaRsp metaRsp;
STaosxRsp taosxRsp;
};
} SMqPollRspWrapper;
......@@ -1130,21 +1131,29 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
} else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
rspType);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
} else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
tDecoderClear(&decoder);
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
} else {
ASSERT(0);
}
taosMemoryFree(pMsg->pData);
tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
rspType);
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
......@@ -1443,6 +1452,24 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
pRspObj->resType = RES_TYPE__TAOSX;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqTaosxRspObj));
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
if (!pWrapper->dataRsp.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
}
return pRspObj;
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*tscDebug("call poll");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
......@@ -1595,6 +1622,30 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) {
taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
}
// build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else {
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool reset = false;
......@@ -1707,9 +1758,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
return TMQ_RES_DATA;
return TMQ_RES_TAOSX;
}
return TMQ_RES_TABLE_META;
} else if (TD_RES_TMQ_TAOSX(res)) {
return TMQ_RES_TAOSX;
} else {
return TMQ_RES_INVALID;
}
......
......@@ -3330,7 +3330,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
return 0;
}
void tFreeSTableMetaRsp(void *pRsp) { taosMemoryFreeClear(((STableMetaRsp*)pRsp)->pSchemas); }
void tFreeSTableMetaRsp(void *pRsp) { taosMemoryFreeClear(((STableMetaRsp *)pRsp)->pSchemas); }
void tFreeSTableIndexRsp(void *info) {
if (NULL == info) {
......@@ -5119,17 +5119,17 @@ int tDecodeSVCreateTbRsp(SDecoder *pCoder, SVCreateTbRsp *pRsp) {
} else {
pRsp->pMeta = NULL;
}
tEndDecode(pCoder);
return 0;
}
void tFreeSVCreateTbRsp(void* param) {
void tFreeSVCreateTbRsp(void *param) {
if (NULL == param) {
return;
}
SVCreateTbRsp* pRsp = (SVCreateTbRsp*)param;
SVCreateTbRsp *pRsp = (SVCreateTbRsp *)param;
if (pRsp->pMeta) {
taosMemoryFree(pRsp->pMeta->pSchemas);
taosMemoryFree(pRsp->pMeta);
......@@ -5345,7 +5345,7 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
int32_t meta = 0;
if (tDecodeI32(pDecoder, &meta) < 0) return -1;
if (meta) {
......@@ -5393,12 +5393,12 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
return 0;
}
void tFreeSSubmitBlkRsp(void* param) {
void tFreeSSubmitBlkRsp(void *param) {
if (NULL == param) {
return;
}
SSubmitBlkRsp* pRsp = (SSubmitBlkRsp*)param;
SSubmitBlkRsp *pRsp = (SSubmitBlkRsp *)param;
taosMemoryFree(pRsp->tblFName);
if (pRsp->pMeta) {
......@@ -5407,7 +5407,6 @@ void tFreeSSubmitBlkRsp(void* param) {
}
}
void tFreeSSubmitRsp(SSubmitRsp *pRsp) {
if (NULL == pRsp) return;
......@@ -5619,7 +5618,6 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
}
}
int32_t tEncodeSMCreateStbRsp(SEncoder *pEncoder, const SMCreateStbRsp *pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->pMeta->pSchemas ? 1 : 0) < 0) return -1;
......@@ -5671,8 +5669,6 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) {
}
}
int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) {
if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1;
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) {
......@@ -5690,7 +5686,7 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal)
int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
if (tDecodeI8(pDecoder, &pOffsetVal->type) < 0) return -1;
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) {
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) {
if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1;
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
......@@ -5712,7 +5708,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
snprintf(buf, maxLen, "offset(reset to latest)");
} else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts);
} else {
ASSERT(0);
......@@ -5813,17 +5809,17 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
return 0;
}
int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp) {
int32_t tEncodeSMqMetaRsp(SEncoder *pEncoder, const SMqMetaRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if(tEncodeI16(pEncoder, pRsp->resMsgType)) return -1;
if(tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1;
if (tEncodeI16(pEncoder, pRsp->resMsgType)) return -1;
if (tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1;
return 0;
}
int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp) {
int32_t tDecodeSMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI16(pDecoder, &pRsp->resMsgType) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t*)&pRsp->metaRspLen) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t *)&pRsp->metaRspLen) < 0) return -1;
return 0;
}
......@@ -5893,6 +5889,92 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0;
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->withSchema) < 0) return -1;
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i);
void *data = taosArrayGetP(pRsp->blockData, i);
if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1;
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
if (tEncodeSSchemaWrapper(pEncoder, pSW) < 0) return -1;
}
if (pRsp->withTbName) {
char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i);
if (tEncodeCStr(pEncoder, tbName) < 0) return -1;
}
}
}
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void *createTableReq = taosArrayGetP(pRsp->createTableReq, i);
int32_t createTableLen = *(int32_t *)taosArrayGet(pRsp->createTableLen, i);
if (tEncodeBinary(pEncoder, createTableReq, createTableLen) < 0) return -1;
}
}
return 0;
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
if (tDecodeI8(pDecoder, &pRsp->withTbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->withSchema) < 0) return -1;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data;
uint64_t bLen;
if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1;
taosArrayPush(pRsp->blockData, &data);
int32_t len = bLen;
taosArrayPush(pRsp->blockDataLen, &len);
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1;
taosArrayPush(pRsp->blockSchema, &pSW);
}
if (pRsp->withTbName) {
char *tbName;
if (tDecodeCStrAlloc(pDecoder, &tbName) < 0) return -1;
taosArrayPush(pRsp->blockTbName, &tbName);
}
}
}
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *));
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void *pCreate = NULL;
uint64_t len;
if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1;
int32_t l = (int32_t)len;
taosArrayPush(pRsp->createTableLen, &l);
taosArrayPush(pRsp->createTableReq, &pCreate);
}
}
return 0;
}
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
......
......@@ -287,6 +287,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (consumerVgNum > minVgCnt) {
if (imbCnt < imbConsumerNum) {
if (consumerVgNum == minVgCnt + 1) {
imbCnt++;
continue;
} else {
// pop until equal minVg + 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册