未验证 提交 c3f22b2c 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10629 from taosdata/feature/tq

add tq push
...@@ -135,7 +135,7 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) ...@@ -135,7 +135,7 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock)
return (void*)buf; return (void*)buf;
} }
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
int32_t sz = 0; int32_t sz = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId); tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
...@@ -156,7 +156,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp ...@@ -156,7 +156,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
return tlen; return tlen;
} }
static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
int32_t sz; int32_t sz;
buf = taosDecodeFixedI64(buf, &pRsp->consumerId); buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
...@@ -194,7 +194,7 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { ...@@ -194,7 +194,7 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
// tfree(pBlock); // tfree(pBlock);
} }
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) { static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
if (pRsp->schemas) { if (pRsp->schemas) {
if (pRsp->schemas->nCols) { if (pRsp->schemas->nCols) {
tfree(pRsp->schemas->pSchema); tfree(pRsp->schemas->pSchema);
......
...@@ -795,7 +795,7 @@ typedef struct SVgroupInfo { ...@@ -795,7 +795,7 @@ typedef struct SVgroupInfo {
int32_t vgId; int32_t vgId;
uint32_t hashBegin; uint32_t hashBegin;
uint32_t hashEnd; uint32_t hashEnd;
SEpSet epset; SEpSet epSet;
} SVgroupInfo; } SVgroupInfo;
typedef struct { typedef struct {
...@@ -1876,8 +1876,8 @@ typedef struct { ...@@ -1876,8 +1876,8 @@ typedef struct {
} SVCreateTSmaReq; } SVCreateTSmaReq;
typedef struct { typedef struct {
int8_t type; // 0 status report, 1 update data int8_t type; // 0 status report, 1 update data
char indexName[TSDB_INDEX_NAME_LEN + 1]; // char indexName[TSDB_INDEX_NAME_LEN + 1]; //
STimeWindow windows; STimeWindow windows;
} STSmaMsg; } STSmaMsg;
...@@ -2073,7 +2073,7 @@ typedef struct { ...@@ -2073,7 +2073,7 @@ typedef struct {
int32_t skipLogNum; int32_t skipLogNum;
int32_t numOfTopics; int32_t numOfTopics;
SArray* pBlockData; // SArray<SSDataBlock> SArray* pBlockData; // SArray<SSDataBlock>
} SMqConsumeRsp; } SMqPollRsp;
// one req for one vg+topic // one req for one vg+topic
typedef struct { typedef struct {
...@@ -2086,7 +2086,7 @@ typedef struct { ...@@ -2086,7 +2086,7 @@ typedef struct {
int64_t currentOffset; int64_t currentOffset;
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq; } SMqPollReq;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
...@@ -2108,7 +2108,7 @@ typedef struct { ...@@ -2108,7 +2108,7 @@ typedef struct {
struct tmq_message_t { struct tmq_message_t {
SMqRspHead head; SMqRspHead head;
union { union {
SMqConsumeRsp consumeRsp; SMqPollRsp consumeRsp;
SMqCMGetSubEpRsp getEpRsp; SMqCMGetSubEpRsp getEpRsp;
}; };
void* extra; void* extra;
......
...@@ -44,10 +44,10 @@ enum { ...@@ -44,10 +44,10 @@ enum {
}; };
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
int16_t numOfColumns; // the number of columns int16_t numOfColumns; // the number of columns
int32_t rowSize; // row size of the schema int32_t rowSize; // row size of the schema
} STableComInfo; } STableComInfo;
/* /*
...@@ -56,49 +56,45 @@ typedef struct STableComInfo { ...@@ -56,49 +56,45 @@ typedef struct STableComInfo {
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/ */
typedef struct SCTableMeta { typedef struct SCTableMeta {
int32_t vgId:24; int32_t vgId : 24;
int8_t tableType; int8_t tableType;
uint64_t uid; uint64_t uid;
uint64_t suid; uint64_t suid;
} SCTableMeta; } SCTableMeta;
/* /*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta. * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a
* SCTableMeta.
*/ */
typedef struct STableMeta { typedef struct STableMeta {
//BEGIN: KEEP THIS PART SAME WITH SCTableMeta // BEGIN: KEEP THIS PART SAME WITH SCTableMeta
int32_t vgId:24; int32_t vgId : 24;
int8_t tableType; int8_t tableType;
uint64_t uid; uint64_t uid;
uint64_t suid; uint64_t suid;
//END: KEEP THIS PART SAME WITH SCTableMeta // END: KEEP THIS PART SAME WITH SCTableMeta
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta
int16_t sversion; // info
int16_t tversion; int16_t sversion;
STableComInfo tableInfo; int16_t tversion;
SSchema schema[]; STableComInfo tableInfo;
SSchema schema[];
} STableMeta; } STableMeta;
typedef struct SDBVgInfo { typedef struct SDBVgInfo {
int32_t vgVersion; int32_t vgVersion;
int8_t hashMethod; int8_t hashMethod;
SHashObj *vgHash; //key:vgId, value:SVgroupInfo SHashObj* vgHash; // key:vgId, value:SVgroupInfo
} SDBVgInfo; } SDBVgInfo;
typedef struct SUseDbOutput { typedef struct SUseDbOutput {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
uint64_t dbId; uint64_t dbId;
SDBVgInfo *dbVgroup; SDBVgInfo* dbVgroup;
} SUseDbOutput; } SUseDbOutput;
enum { enum { META_TYPE_NULL_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, META_TYPE_BOTH_TABLE };
META_TYPE_NULL_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
META_TYPE_BOTH_TABLE
};
typedef struct STableMetaOutput { typedef struct STableMetaOutput {
int32_t metaType; int32_t metaType;
...@@ -107,30 +103,30 @@ typedef struct STableMetaOutput { ...@@ -107,30 +103,30 @@ typedef struct STableMetaOutput {
char ctbName[TSDB_TABLE_NAME_LEN]; char ctbName[TSDB_TABLE_NAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
SCTableMeta ctbMeta; SCTableMeta ctbMeta;
STableMeta *tbMeta; STableMeta* tbMeta;
} STableMetaOutput; } STableMetaOutput;
typedef struct SDataBuf { typedef struct SDataBuf {
void *pData; void* pData;
uint32_t len; uint32_t len;
void *handle; void* handle;
} SDataBuf; } SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param); typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo { typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function __async_send_cb_fn_t fp; // async callback function
void *param; void* param;
uint64_t requestId; uint64_t requestId;
uint64_t requestObjRefId; uint64_t requestObjRefId;
int32_t msgType; int32_t msgType;
SDataBuf msgInfo; SDataBuf msgInfo;
} SMsgSendInfo; } SMsgSendInfo;
typedef struct SQueryNodeAddr { typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId int32_t nodeId; // vgId or qnodeId
SEpSet epset; SEpSet epSet;
} SQueryNodeAddr; } SQueryNodeAddr;
int32_t initTaskQueue(); int32_t initTaskQueue();
...@@ -154,32 +150,67 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); ...@@ -154,32 +150,67 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
* @param pInfo * @param pInfo
* @return * @return
*/ */
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp); int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp);
void initQueryModuleMsgHandle(); void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema(); const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE #define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); }} while(0) #define qFatal(...) \
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); }} while(0) do { \
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); }} while(0) if (qDebugFlag & DEBUG_FATAL) { \
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); }} while(0) taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); }} while(0) } \
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); }} while(0) } while (0)
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); }} while(0) #define qError(...) \
do { \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qWarn(...) \
do { \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qInfo(...) \
do { \
if (qDebugFlag & DEBUG_INFO) { \
taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -517,7 +517,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -517,7 +517,7 @@ void* doFetchRow(SRequestObj* pRequest) {
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
epSet = pVgroupInfo->epset; epSet = pVgroupInfo->epSet;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
pRequest->type = TDMT_VND_SHOW_TABLES; pRequest->type = TDMT_VND_SHOW_TABLES;
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
...@@ -534,7 +534,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -534,7 +534,7 @@ void* doFetchRow(SRequestObj* pRequest) {
pRequest->body.requestMsg.pData = pShowReq; pRequest->body.requestMsg.pData = pShowReq;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
epSet = pVgroupInfo->epset; epSet = pVgroupInfo->epSet;
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
......
...@@ -13,8 +13,6 @@ ...@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "parser.h" #include "parser.h"
...@@ -606,17 +604,17 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -606,17 +604,17 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0; if (tmq_message == NULL) return 0;
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; SMqPollRsp* pRsp = &tmq_message->consumeRsp;
return pRsp->skipLogNum; return pRsp->skipLogNum;
} }
void tmqShowMsg(tmq_message_t* tmq_message) { void tmqShowMsg(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return; if (tmq_message == NULL) return;
static bool noPrintSchema; static bool noPrintSchema;
char pBuf[128]; char pBuf[128];
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; SMqPollRsp* pRsp = &tmq_message->consumeRsp;
int32_t colNum = pRsp->schemas->nCols; int32_t colNum = pRsp->schemas->nCols;
if (!noPrintSchema) { if (!noPrintSchema) {
printf("|"); printf("|");
for (int32_t i = 0; i < colNum; i++) { for (int32_t i = 0; i < colNum; i++) {
...@@ -703,7 +701,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -703,7 +701,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto WRITE_QUEUE_FAIL; goto WRITE_QUEUE_FAIL;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->consumeRsp.numOfTopics == 0) { if (pRsp->consumeRsp.numOfTopics == 0) {
/*printf("no data\n");*/ /*printf("no data\n");*/
...@@ -874,7 +872,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { ...@@ -874,7 +872,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return TMQ_RESP_ERR__FAIL; return TMQ_RESP_ERR__FAIL;
} }
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset; int64_t reqOffset;
if (pVg->currentOffset >= 0) { if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset; reqOffset = pVg->currentOffset;
...@@ -886,7 +884,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien ...@@ -886,7 +884,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien
reqOffset = tmq->resetOffsetCfg; reqOffset = tmq->resetOffsetCfg;
} }
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); SMqPollReq* pReq = malloc(sizeof(SMqPollReq));
if (pReq == NULL) { if (pReq == NULL) {
return NULL; return NULL;
} }
...@@ -900,7 +898,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien ...@@ -900,7 +898,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien
pReq->currentOffset = reqOffset; pReq->currentOffset = reqOffset;
pReq->head.vgId = htonl(pVg->vgId); pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); pReq->head.contLen = htonl(sizeof(SMqPollReq));
return pReq; return pReq;
} }
...@@ -914,7 +912,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -914,7 +912,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
/*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/ /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
/*continue;*/ /*continue;*/
/*}*/ /*}*/
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) { if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem // TODO: out of mem
...@@ -941,7 +939,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -941,7 +939,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
sendInfo->msgInfo = (SDataBuf){ sendInfo->msgInfo = (SDataBuf){
.pData = pReq, .pData = pReq,
.len = sizeof(SMqConsumeReq), .len = sizeof(SMqPollReq),
.handle = NULL, .handle = NULL,
}; };
sendInfo->requestId = generateRequestId(); sendInfo->requestId = generateRequestId();
...@@ -982,7 +980,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -982,7 +980,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
continue; continue;
} }
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) { if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -1011,7 +1009,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1011,7 +1009,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
sendInfo->msgInfo = (SDataBuf){ sendInfo->msgInfo = (SDataBuf){
.pData = pReq, .pData = pReq,
.len = sizeof(SMqConsumeReq), .len = sizeof(SMqPollReq),
.handle = NULL, .handle = NULL,
}; };
sendInfo->requestId = generateRequestId(); sendInfo->requestId = generateRequestId();
...@@ -1271,7 +1269,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v ...@@ -1271,7 +1269,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
void tmq_message_destroy(tmq_message_t* tmq_message) { void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return; if (tmq_message == NULL) return;
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; SMqPollRsp* pRsp = &tmq_message->consumeRsp;
tDeleteSMqConsumeRsp(pRsp); tDeleteSMqConsumeRsp(pRsp);
/*free(tmq_message);*/ /*free(tmq_message);*/
taosFreeQitem(tmq_message); taosFreeQitem(tmq_message);
......
...@@ -1481,7 +1481,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { ...@@ -1481,7 +1481,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pVgInfo->epSet) < 0) return -1;
} }
return 0; return 0;
...@@ -1541,7 +1541,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { ...@@ -1541,7 +1541,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
if (tDecodeI32(pDecoder, &vgInfo.vgId) < 0) return -1; if (tDecodeI32(pDecoder, &vgInfo.vgId) < 0) return -1;
if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1; if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1;
if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1; if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1; if (tDecodeSEpSet(pDecoder, &vgInfo.epSet) < 0) return -1;
taosArrayPush(pRsp->pVgroupInfos, &vgInfo); taosArrayPush(pRsp->pVgroupInfos, &vgInfo);
} }
......
...@@ -900,10 +900,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { ...@@ -900,10 +900,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
vgInfo.vgId = pVgroup->vgId; vgInfo.vgId = pVgroup->vgId;
vgInfo.hashBegin = pVgroup->hashBegin; vgInfo.hashBegin = pVgroup->hashBegin;
vgInfo.hashEnd = pVgroup->hashEnd; vgInfo.hashEnd = pVgroup->hashEnd;
vgInfo.epset.numOfEps = pVgroup->replica; vgInfo.epSet.numOfEps = pVgroup->replica;
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
SEp *pEp = &vgInfo.epset.eps[gid]; SEp *pEp = &vgInfo.epSet.eps[gid];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode != NULL) { if (pDnode != NULL) {
memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
...@@ -911,7 +911,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { ...@@ -911,7 +911,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
} }
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
if (pVgid->role == TAOS_SYNC_STATE_LEADER) { if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
vgInfo.epset.inUse = gid; vgInfo.epSet.inUse = gid;
} }
} }
vindex++; vindex++;
......
...@@ -33,23 +33,29 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -33,23 +33,29 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL; SVgObj* pVgroup = NULL;
SQueryDag* pDag = qStringToDag(pTopic->physicalPlan); SQueryDag* pDag = qStringToDag(pTopic->physicalPlan);
SArray* pAray = NULL; if (pDag == NULL) {
SArray* unassignedVg = pSub->unassignedVg; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pSub->vgNum == 0); ASSERT(pSub->vgNum == 0);
int32_t levelNum = taosArrayGetSize(pDag->pSubplans); int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
if (levelNum != 1) { if (levelNum != 1) {
qDestroyQueryDag(pDag);
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
return -1; return -1;
} }
SArray* inner = taosArrayGet(pDag->pSubplans, 0); SArray* plans = taosArrayGetP(pDag->pSubplans, 0);
int32_t opNum = taosArrayGetSize(inner); int32_t opNum = taosArrayGetSize(plans);
if (opNum != 1) { if (opNum != 1) {
qDestroyQueryDag(pDag);
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
return -1; return -1;
} }
SSubplan* plan = taosArrayGetP(inner, 0); SSubplan* plan = taosArrayGetP(plans, 0);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
...@@ -62,17 +68,24 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -62,17 +68,24 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
pSub->vgNum++; pSub->vgNum++;
plan->execNode.nodeId = pVgroup->vgId; plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup); plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
SMqConsumerEp consumerEp = {0}; SMqConsumerEp consumerEp = {0};
consumerEp.status = 0; consumerEp.status = 0;
consumerEp.consumerId = -1; consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epset; consumerEp.epSet = plan->execNode.epSet;
consumerEp.vgId = plan->execNode.nodeId; consumerEp.vgId = plan->execNode.nodeId;
int32_t msgLen; int32_t msgLen;
int32_t code = qSubPlanToString(plan, &consumerEp.qmsg, &msgLen); if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
taosArrayPush(unassignedVg, &consumerEp); sdbRelease(pSdb, pVgroup);
qDestroyQueryDag(pDag);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
taosArrayPush(pSub->unassignedVg, &consumerEp);
} }
qDestroyQueryDag(pDag);
return 0; return 0;
} }
...@@ -93,7 +93,6 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj ...@@ -93,7 +93,6 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
strcpy(pSub->key, key); strcpy(pSub->key, key);
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
tDeleteSMqSubscribeObj(pSub); tDeleteSMqSubscribeObj(pSub);
free(pSub); free(pSub);
return NULL; return NULL;
...@@ -295,7 +294,11 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -295,7 +294,11 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
for (int32_t k = 0; k < vgsz; k++) { for (int32_t k = 0; k < vgsz; k++) {
char offsetKey[TSDB_PARTITION_KEY_LEN]; char offsetKey[TSDB_PARTITION_KEY_LEN];
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1}; SMqSubVgEp vgEp = {
.epSet = pConsumerEp->epSet,
.vgId = pConsumerEp->vgId,
.offset = -1,
};
mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId); mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey); SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
if (pOffsetObj != NULL) { if (pOffsetObj != NULL) {
...@@ -345,7 +348,7 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { ...@@ -345,7 +348,7 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
if (pRebSub == NULL) { if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key); pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) { if (pRebSub == NULL) {
// TODO terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe)); taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe));
...@@ -412,7 +415,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -412,7 +415,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
} }
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq rebalance will be triggered"); mInfo("mq rebalance will be triggered");
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)}; SRpcMsg rpcMsg = {
.msgType = TDMT_MND_MQ_DO_REBALANCE,
.pCont = pRebMsg,
.contLen = sizeof(SMqDoRebalanceMsg),
};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
} else { } else {
taosHashCleanup(pRebMsg->rebSubHash); taosHashCleanup(pRebMsg->rebSubHash);
......
...@@ -96,7 +96,11 @@ static void mndCalMqRebalance(void *param, void *tmrId) { ...@@ -96,7 +96,11 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {
.msgType = TDMT_MND_MQ_TIMER,
.pCont = pReq,
.contLen = contLen,
};
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
} }
...@@ -631,4 +635,4 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -631,4 +635,4 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
} }
return 0; return 0;
} }
\ No newline at end of file
...@@ -292,9 +292,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -292,9 +292,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_GT(pInfo->vgId, 0); EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, 0); EXPECT_EQ(pInfo->hashBegin, 0);
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1);
EXPECT_EQ(pInfo->epset.inUse, 0); EXPECT_EQ(pInfo->epSet.inUse, 0);
EXPECT_EQ(pInfo->epset.numOfEps, 1); EXPECT_EQ(pInfo->epSet.numOfEps, 1);
SEp* pAddr = &pInfo->epset.eps[0]; SEp* pAddr = &pInfo->epSet.eps[0];
EXPECT_EQ(pAddr->port, 9030); EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost"); EXPECT_STREQ(pAddr->fqdn, "localhost");
} }
...@@ -307,9 +307,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -307,9 +307,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_GT(pInfo->vgId, 0); EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2); EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2);
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX);
EXPECT_EQ(pInfo->epset.inUse, 0); EXPECT_EQ(pInfo->epSet.inUse, 0);
EXPECT_EQ(pInfo->epset.numOfEps, 1); EXPECT_EQ(pInfo->epSet.numOfEps, 1);
SEp* pAddr = &pInfo->epset.eps[0]; SEp* pAddr = &pInfo->epSet.eps[0];
EXPECT_EQ(pAddr->port, 9030); EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost"); EXPECT_STREQ(pAddr->fqdn, "localhost");
} }
......
...@@ -13,16 +13,14 @@ ...@@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_TQ_H_ #ifndef _TQ_H_
#define _TD_TQ_H_ #define _TQ_H_
#include "tcommon.h"
#include "executor.h" #include "executor.h"
#include "tmallocator.h"
#include "meta.h" #include "meta.h"
#include "scheduler.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlist.h" #include "tcommon.h"
#include "tmallocator.h"
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -54,7 +52,7 @@ void tqClose(STQ*); ...@@ -54,7 +52,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
...@@ -62,4 +60,4 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg); ...@@ -62,4 +60,4 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg);
} }
#endif #endif
#endif /*_TD_TQ_H_*/ #endif /*_TQ_H_*/
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "meta.h" #include "meta.h"
#include "tlog.h" #include "tlog.h"
#include "tq.h" #include "tq.h"
#include "trpc.h" #include "tqPush.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -31,30 +31,35 @@ extern "C" { ...@@ -31,30 +31,35 @@ extern "C" {
taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \ } \
} }
#define tqError(...) \ #define tqError(...) \
{ \ { \
if (tqDebugFlag & DEBUG_ERROR) { \ if (tqDebugFlag & DEBUG_ERROR) { \
taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \ } \
} }
#define tqWarn(...) \ #define tqWarn(...) \
{ \ { \
if (tqDebugFlag & DEBUG_WARN) { \ if (tqDebugFlag & DEBUG_WARN) { \
taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \ } \
} }
#define tqInfo(...) \ #define tqInfo(...) \
{ \ { \
if (tqDebugFlag & DEBUG_INFO) { \ if (tqDebugFlag & DEBUG_INFO) { \
taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \ taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \
} \ } \
} }
#define tqDebug(...) \ #define tqDebug(...) \
{ \ { \
if (tqDebugFlag & DEBUG_DEBUG) { \ if (tqDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \ taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \
} \ } \
} }
#define tqTrace(...) \ #define tqTrace(...) \
{ \ { \
if (tqDebugFlag & DEBUG_TRACE) { \ if (tqDebugFlag & DEBUG_TRACE) { \
...@@ -138,9 +143,7 @@ typedef struct { ...@@ -138,9 +143,7 @@ typedef struct {
// topics that are not connectted // topics that are not connectted
STqMetaList* unconnectTopic; STqMetaList* unconnectTopic;
// TODO:temporaral use, to be replaced by unified tfile
TdFilePtr pFile; TdFilePtr pFile;
// TODO:temporaral use, to be replaced by unified tfile
TdFilePtr pIdxFile; TdFilePtr pIdxFile;
char* dirPath; char* dirPath;
...@@ -157,6 +160,7 @@ struct STQ { ...@@ -157,6 +160,7 @@ struct STQ {
STqCfg* tqConfig; STqCfg* tqConfig;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
STqPushMgr* tqPushMgr;
SWal* pWal; SWal* pWal;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
}; };
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TQ_PUSH_H_
#define _TQ_PUSH_H_
#include "thash.h"
#include "trpc.h"
#include "ttimer.h"
#ifdef __cplusplus
extern "C" {
#endif
enum {
TQ_PUSHER_TYPE__CLIENT = 1,
TQ_PUSHER_TYPE__STREAM,
};
typedef struct {
int8_t type;
int8_t reserved[3];
int32_t ttl;
int64_t consumerId;
SRpcMsg* pMsg;
// SMqPollRsp* rsp;
} STqClientPusher;
typedef struct {
int8_t type;
int8_t nodeType;
int8_t reserved[6];
int64_t streamId;
SEpSet epSet;
} STqStreamPusher;
typedef struct {
int8_t type; // mq or stream
} STqPusher;
typedef struct {
SHashObj* pHash; // <id, STqPush*>
} STqPushMgr;
typedef struct {
int8_t inited;
tmr_h timer;
} STqPushMgmt;
static STqPushMgmt tqPushMgmt;
int32_t tqPushMgrInit();
void tqPushMgrCleanUp();
STqPushMgr* tqPushMgrOpen();
void tqPushMgrClose(STqPushMgr* pushMgr);
STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl);
STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet);
#ifdef __cplusplus
}
#endif
#endif /*_TQ_PUSH_H_*/
...@@ -12,28 +12,16 @@ ...@@ -12,28 +12,16 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "tcompare.h" #include "tcompare.h"
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
int tqInit() { int32_t tqInit() { return tqPushMgrInit(); }
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
if (old == 1) return 0;
tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); void tqCleanUp() { tqPushMgrCleanUp(); }
return 0;
}
void tqCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
if (old == 0) return;
taosTmrStop(tqMgmt.timer);
taosTmrCleanUp(tqMgmt.timer);
}
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
...@@ -42,7 +30,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl ...@@ -42,7 +30,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->pVnodeMeta = pMeta; pTq->pVnodeMeta = pVnodeMeta;
#if 0 #if 0
pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac); pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
...@@ -60,6 +48,13 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl ...@@ -60,6 +48,13 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
return NULL; return NULL;
} }
pTq->tqPushMgr = tqPushMgrOpen();
if (pTq->tqPushMgr == NULL) {
// free store
free(pTq);
return NULL;
}
return pTq; return pTq;
} }
...@@ -72,6 +67,8 @@ void tqClose(STQ* pTq) { ...@@ -72,6 +67,8 @@ void tqClose(STQ* pTq) {
} }
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
// iterate hash
// process all msg
// if waiting // if waiting
// memcpy and send msg to fetch thread // memcpy and send msg to fetch thread
// TODO: add reference // TODO: add reference
...@@ -199,7 +196,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -199,7 +196,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
for (int j = 0; j < TQ_BUFFER_SIZE; j++) { for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
pTopic->buffer.output[j].status = 0; pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; SReadHandle handle = {
.reader = pReadHandle,
.meta = pTq->pVnodeMeta,
};
pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
} }
...@@ -208,11 +208,11 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -208,11 +208,11 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return 0; return 0;
} }
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t fetchOffset; int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime; int64_t blockingTime = pReq->blockingTime;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0; fetchOffset = 0;
...@@ -222,7 +222,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -222,7 +222,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffset = pReq->currentOffset + 1; fetchOffset = pReq->currentOffset + 1;
} }
SMqConsumeRsp rsp = { SMqPollRsp rsp = {
.consumerId = consumerId, .consumerId = consumerId,
.numOfTopics = 0, .numOfTopics = 0,
.pBlockData = NULL, .pBlockData = NULL,
...@@ -236,6 +236,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -236,6 +236,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
rpcSendResponse(pMsg); rpcSendResponse(pMsg);
return 0; return 0;
} }
int sz = taosArrayGetSize(pConsumer->topics); int sz = taosArrayGetSize(pConsumer->topics);
ASSERT(sz == 1); ASSERT(sz == 1);
STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
...@@ -247,13 +248,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -247,13 +248,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SWalHead* pHead; SWalHead* pHead;
while (1) { while (1) {
int8_t pos = fetchOffset % TQ_BUFFER_SIZE; /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
// TODO: no more log, set timer to wait blocking time // TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and // if data inserted during waiting, launch query and
// response to user // response to user
break; break;
} }
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
pHead = pTopic->pReadhandle->pHead; pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) { if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
...@@ -280,7 +282,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -280,7 +282,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.numOfTopics = 1; rsp.numOfTopics = 1;
rsp.pBlockData = pRes; rsp.pBlockData = pRes;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
pMsg->code = -1; pMsg->code = -1;
...@@ -290,7 +292,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -290,7 +292,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
((SMqRspHead*)buf)->epoch = pReq->epoch; ((SMqRspHead*)buf)->epoch = pReq->epoch;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqConsumeRsp(&abuf, &rsp); tEncodeSMqPollRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
...@@ -304,7 +306,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -304,7 +306,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); /*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
pMsg->code = -1; pMsg->code = -1;
...@@ -314,12 +319,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -314,12 +319,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
((SMqRspHead*)buf)->epoch = pReq->epoch; ((SMqRspHead*)buf)->epoch = pReq->epoch;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqConsumeRsp(&abuf, &rsp); tEncodeSMqPollRsp(&abuf, &rsp);
rsp.pBlockData = NULL; rsp.pBlockData = NULL;
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
rpcSendResponse(pMsg); rpcSendResponse(pMsg);
/*}*/
return 0; return 0;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tqPush.h"
int32_t tqPushMgrInit() {
//
int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 0, 1);
if (old == 1) return 0;
tqPushMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
return 0;
}
void tqPushMgrCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 1, 0);
if (old == 0) return;
taosTmrStop(tqPushMgmt.timer);
taosTmrCleanUp(tqPushMgmt.timer);
}
STqPushMgr* tqPushMgrOpen() {
STqPushMgr* mgr = malloc(sizeof(STqPushMgr));
if (mgr == NULL) {
return NULL;
}
mgr->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
return mgr;
}
void tqPushMgrClose(STqPushMgr* pushMgr) {
taosHashCleanup(pushMgr->pHash);
free(pushMgr);
}
STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl) {
STqClientPusher* clientPusher = malloc(sizeof(STqClientPusher));
if (clientPusher == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
clientPusher->type = TQ_PUSHER_TYPE__CLIENT;
clientPusher->pMsg = pMsg;
clientPusher->consumerId = consumerId;
clientPusher->ttl = ttl;
if (taosHashPut(pushMgr->pHash, &consumerId, sizeof(int64_t), &clientPusher, sizeof(void*)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(clientPusher);
// TODO send rsp back
return NULL;
}
return clientPusher;
}
STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet) {
STqStreamPusher* streamPusher = malloc(sizeof(STqStreamPusher));
if (streamPusher == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
streamPusher->type = TQ_PUSHER_TYPE__STREAM;
streamPusher->nodeType = 0;
streamPusher->streamId = streamId;
memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));
if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(streamPusher);
return NULL;
}
return streamPusher;
}
...@@ -29,7 +29,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -29,7 +29,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta}; SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY:{ case TDMT_VND_QUERY: {
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
} }
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
...@@ -63,7 +63,7 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -63,7 +63,7 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessConsumeReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
...@@ -71,8 +71,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -71,8 +71,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
} }
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
STbCfg * pTbCfg = NULL; STbCfg *pTbCfg = NULL;
STbCfg * pStbCfg = NULL; STbCfg *pStbCfg = NULL;
tb_uid_t uid; tb_uid_t uid;
int32_t nCols; int32_t nCols;
int32_t nTagCols; int32_t nTagCols;
...@@ -204,9 +204,9 @@ static void freeItemHelper(void *pItem) { ...@@ -204,9 +204,9 @@ static void freeItemHelper(void *pItem) {
*/ */
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta); SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta);
SArray * pArray = taosArrayInit(10, POINTER_BYTES); SArray *pArray = taosArrayInit(10, POINTER_BYTES);
char * name = NULL; char *name = NULL;
int32_t totalLen = 0; int32_t totalLen = 0;
int32_t numOfTables = 0; int32_t numOfTables = 0;
while ((name = metaTbCursorNext(pCur)) != NULL) { while ((name = metaTbCursorNext(pCur)) != NULL) {
......
...@@ -845,7 +845,7 @@ int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg ...@@ -845,7 +845,7 @@ int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg
}; };
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, &vgroupInfo->epset, &rpcMsg, &rpcRsp); rpcSendRecv(pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
......
...@@ -228,10 +228,10 @@ void ctgTestBuildDBVgroup(SDBVgInfo **pdbVgroup) { ...@@ -228,10 +228,10 @@ void ctgTestBuildDBVgroup(SDBVgInfo **pdbVgroup) {
vgInfo.vgId = i + 1; vgInfo.vgId = i + 1;
vgInfo.hashBegin = i * hashUnit; vgInfo.hashBegin = i * hashUnit;
vgInfo.hashEnd = hashUnit * (i + 1) - 1; vgInfo.hashEnd = hashUnit * (i + 1) - 1;
vgInfo.epset.numOfEps = i % TSDB_MAX_REPLICA + 1; vgInfo.epSet.numOfEps = i % TSDB_MAX_REPLICA + 1;
vgInfo.epset.inUse = i % vgInfo.epset.numOfEps; vgInfo.epSet.inUse = i % vgInfo.epSet.numOfEps;
for (int32_t n = 0; n < vgInfo.epset.numOfEps; ++n) { for (int32_t n = 0; n < vgInfo.epSet.numOfEps; ++n) {
SEp *addr = &vgInfo.epset.eps[n]; SEp *addr = &vgInfo.epSet.eps[n];
strcpy(addr->fqdn, "a0"); strcpy(addr->fqdn, "a0");
addr->port = n + 22; addr->port = n + 22;
} }
...@@ -301,10 +301,10 @@ void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg * ...@@ -301,10 +301,10 @@ void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *
vg.hashEnd = htonl(UINT32_MAX); vg.hashEnd = htonl(UINT32_MAX);
} }
vg.epset.numOfEps = i % TSDB_MAX_REPLICA + 1; vg.epSet.numOfEps = i % TSDB_MAX_REPLICA + 1;
vg.epset.inUse = i % vg.epset.numOfEps; vg.epSet.inUse = i % vg.epSet.numOfEps;
for (int32_t n = 0; n < vg.epset.numOfEps; ++n) { for (int32_t n = 0; n < vg.epSet.numOfEps; ++n) {
SEp *addr = &vg.epset.eps[n]; SEp *addr = &vg.epSet.eps[n];
strcpy(addr->fqdn, "a0"); strcpy(addr->fqdn, "a0");
addr->port = n + 22; addr->port = n + 22;
} }
...@@ -877,7 +877,7 @@ TEST(tableMeta, normalTable) { ...@@ -877,7 +877,7 @@ TEST(tableMeta, normalTable) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) { while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) {
usleep(50000); usleep(50000);
...@@ -1384,7 +1384,7 @@ TEST(refreshGetMeta, normal2normal) { ...@@ -1384,7 +1384,7 @@ TEST(refreshGetMeta, normal2normal) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (true) { while (true) {
uint64_t n = 0; uint64_t n = 0;
...@@ -1463,7 +1463,7 @@ TEST(refreshGetMeta, normal2notexist) { ...@@ -1463,7 +1463,7 @@ TEST(refreshGetMeta, normal2notexist) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (true) { while (true) {
uint64_t n = 0; uint64_t n = 0;
...@@ -1537,7 +1537,7 @@ TEST(refreshGetMeta, normal2child) { ...@@ -1537,7 +1537,7 @@ TEST(refreshGetMeta, normal2child) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (true) { while (true) {
uint64_t n = 0; uint64_t n = 0;
...@@ -1621,7 +1621,7 @@ TEST(refreshGetMeta, stable2child) { ...@@ -1621,7 +1621,7 @@ TEST(refreshGetMeta, stable2child) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (true) { while (true) {
uint64_t n = 0; uint64_t n = 0;
...@@ -1706,7 +1706,7 @@ TEST(refreshGetMeta, stable2stable) { ...@@ -1706,7 +1706,7 @@ TEST(refreshGetMeta, stable2stable) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (true) { while (true) {
uint64_t n = 0; uint64_t n = 0;
...@@ -1794,7 +1794,7 @@ TEST(refreshGetMeta, child2stable) { ...@@ -1794,7 +1794,7 @@ TEST(refreshGetMeta, child2stable) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (true) { while (true) {
uint64_t n = 0; uint64_t n = 0;
...@@ -1879,7 +1879,7 @@ TEST(tableDistVgroup, normalTable) { ...@@ -1879,7 +1879,7 @@ TEST(tableDistVgroup, normalTable) {
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(vgInfo->vgId, 8); ASSERT_EQ(vgInfo->vgId, 8);
ASSERT_EQ(vgInfo->epset.numOfEps, 3); ASSERT_EQ(vgInfo->epSet.numOfEps, 3);
catalogDestroy(); catalogDestroy();
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
...@@ -1921,7 +1921,7 @@ TEST(tableDistVgroup, childTableCase) { ...@@ -1921,7 +1921,7 @@ TEST(tableDistVgroup, childTableCase) {
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(vgInfo->vgId, 9); ASSERT_EQ(vgInfo->vgId, 9);
ASSERT_EQ(vgInfo->epset.numOfEps, 4); ASSERT_EQ(vgInfo->epSet.numOfEps, 4);
catalogDestroy(); catalogDestroy();
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
...@@ -1964,13 +1964,13 @@ TEST(tableDistVgroup, superTableCase) { ...@@ -1964,13 +1964,13 @@ TEST(tableDistVgroup, superTableCase) {
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(vgInfo->vgId, 1); ASSERT_EQ(vgInfo->vgId, 1);
ASSERT_EQ(vgInfo->epset.numOfEps, 1); ASSERT_EQ(vgInfo->epSet.numOfEps, 1);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1);
ASSERT_EQ(vgInfo->vgId, 2); ASSERT_EQ(vgInfo->vgId, 2);
ASSERT_EQ(vgInfo->epset.numOfEps, 2); ASSERT_EQ(vgInfo->epSet.numOfEps, 2);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2);
ASSERT_EQ(vgInfo->vgId, 3); ASSERT_EQ(vgInfo->vgId, 3);
ASSERT_EQ(vgInfo->epset.numOfEps, 3); ASSERT_EQ(vgInfo->epSet.numOfEps, 3);
catalogDestroy(); catalogDestroy();
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
...@@ -2025,14 +2025,14 @@ TEST(dbVgroup, getSetDbVgroupCase) { ...@@ -2025,14 +2025,14 @@ TEST(dbVgroup, getSetDbVgroupCase) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3); ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(pvgInfo->vgId, 8); ASSERT_EQ(pvgInfo->vgId, 8);
ASSERT_EQ(pvgInfo->epset.numOfEps, 3); ASSERT_EQ(pvgInfo->epSet.numOfEps, 3);
taosArrayDestroy(vgList); taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup); ctgTestBuildDBVgroup(&dbVgroup);
...@@ -2053,14 +2053,14 @@ TEST(dbVgroup, getSetDbVgroupCase) { ...@@ -2053,14 +2053,14 @@ TEST(dbVgroup, getSetDbVgroupCase) {
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 7); ASSERT_EQ(vgInfo.vgId, 7);
ASSERT_EQ(vgInfo.epset.numOfEps, 2); ASSERT_EQ(vgInfo.epSet.numOfEps, 2);
code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(pvgInfo->vgId, 8); ASSERT_EQ(pvgInfo->vgId, 8);
ASSERT_EQ(pvgInfo->epset.numOfEps, 3); ASSERT_EQ(pvgInfo->epSet.numOfEps, 3);
taosArrayDestroy(vgList); taosArrayDestroy(vgList);
catalogDestroy(); catalogDestroy();
......
...@@ -4977,7 +4977,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf ...@@ -4977,7 +4977,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
SSourceDataInfo *pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); SSourceDataInfo *pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId); pMsg->sId = htobe64(pSource->schedId);
...@@ -5000,7 +5000,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf ...@@ -5000,7 +5000,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
pMsgSendInfo->fp = loadRemoteDataCallback; pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0; int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo); int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -58,7 +58,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out ...@@ -58,7 +58,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
SVgroupInfo* info = taosArrayGet(array, 0); SVgroupInfo* info = taosArrayGet(array, 0);
pShowReq->head.vgId = htonl(info->vgId); pShowReq->head.vgId = htonl(info->vgId);
*pEpSet = info->epset; *pEpSet = info->epSet;
*outputLen = sizeof(SVShowTablesReq); *outputLen = sizeof(SVShowTablesReq);
*output = pShowReq; *output = pShowReq;
...@@ -902,4 +902,4 @@ SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseCon ...@@ -902,4 +902,4 @@ SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseCon
} }
return pModifSqlStmt; return pModifSqlStmt;
} }
\ No newline at end of file
...@@ -43,11 +43,11 @@ public: ...@@ -43,11 +43,11 @@ public:
SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0, }; SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0, };
vgroup.epset.eps[0] = (SEp){"dnode_1", 6030}; vgroup.epSet.eps[0] = (SEp){"dnode_1", 6030};
vgroup.epset.eps[1] = (SEp){"dnode_2", 6030}; vgroup.epSet.eps[1] = (SEp){"dnode_2", 6030};
vgroup.epset.eps[2] = (SEp){"dnode_3", 6030}; vgroup.epSet.eps[2] = (SEp){"dnode_3", 6030};
vgroup.epset.inUse = 0; vgroup.epSet.inUse = 0;
vgroup.epset.numOfEps = 3; vgroup.epSet.numOfEps = 3;
meta_->vgs.emplace_back(vgroup); meta_->vgs.emplace_back(vgroup);
return *this; return *this;
...@@ -122,7 +122,7 @@ public: ...@@ -122,7 +122,7 @@ public:
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
// todo // todo
vgInfo->vgId = 1; vgInfo->vgId = 1;
addEpIntoEpSet(&vgInfo->epset, "node1", 6030); addEpIntoEpSet(&vgInfo->epSet, "node1", 6030);
return 0; return 0;
} }
...@@ -143,10 +143,10 @@ public: ...@@ -143,10 +143,10 @@ public:
meta_[db][tbname]->schema->uid = id_++; meta_[db][tbname]->schema->uid = id_++;
SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,}; SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,};
addEpIntoEpSet(&vgroup.epset, "dnode_1", 6030); addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030);
addEpIntoEpSet(&vgroup.epset, "dnode_2", 6030); addEpIntoEpSet(&vgroup.epSet, "dnode_2", 6030);
addEpIntoEpSet(&vgroup.epset, "dnode_3", 6030); addEpIntoEpSet(&vgroup.epSet, "dnode_3", 6030);
vgroup.epset.inUse = 0; vgroup.epSet.inUse = 0;
meta_[db][tbname]->vgs.emplace_back(vgroup); meta_[db][tbname]->vgs.emplace_back(vgroup);
// super table // super table
...@@ -313,4 +313,4 @@ int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableM ...@@ -313,4 +313,4 @@ int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableM
int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
return impl_->catalogGetTableHashVgroup(pTableName, vgInfo); return impl_->catalogGetTableHashVgroup(pTableName, vgInfo);
} }
\ No newline at end of file
...@@ -254,7 +254,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { ...@@ -254,7 +254,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) { static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
pNodeAddr->nodeId = vg->vgId; pNodeAddr->nodeId = vg->vgId;
pNodeAddr->epset = vg->epset; pNodeAddr->epSet = vg->epSet;
} }
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) { static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
...@@ -363,7 +363,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan ...@@ -363,7 +363,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
subplan->execNode.epset = blocks->vg.epset; subplan->execNode.epSet = blocks->vg.epSet;
subplan->pDataSink = createDataInserter(pCxt, blocks, NULL); subplan->pDataSink = createDataInserter(pCxt, blocks, NULL);
subplan->pNode = NULL; subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY; subplan->type = QUERY_TYPE_MODIFY;
......
...@@ -13,12 +13,12 @@ ...@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "plannerInt.h"
#include "parser.h"
#include "cJSON.h" #include "cJSON.h"
#include "parser.h"
#include "plannerInt.h"
typedef bool (*FToJson)(const void* obj, cJSON* json); typedef bool (*FToJson)(const void* obj, cJSON* json);
typedef bool (*FFromJson)(const cJSON* json, void* obj); typedef bool (*FFromJson)(const cJSON* json, void* obj);
static char* getString(const cJSON* json, const char* name) { static char* getString(const cJSON* json, const char* name) {
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name)); char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
...@@ -30,7 +30,7 @@ static void copyString(const cJSON* json, const char* name, char* dst) { ...@@ -30,7 +30,7 @@ static void copyString(const cJSON* json, const char* name, char* dst) {
} }
static uint64_t getBigintFromString(const cJSON* json, const char* name) { static uint64_t getBigintFromString(const cJSON* json, const char* name) {
char* val = getString(json, name); char* val = getString(json, name);
uint64_t intVal = strtoul(val, NULL, 10); uint64_t intVal = strtoul(val, NULL, 10);
tfree(val); tfree(val);
...@@ -39,7 +39,7 @@ static uint64_t getBigintFromString(const cJSON* json, const char* name) { ...@@ -39,7 +39,7 @@ static uint64_t getBigintFromString(const cJSON* json, const char* name) {
static int64_t getNumber(const cJSON* json, const char* name) { static int64_t getNumber(const cJSON* json, const char* name) {
double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
return (int64_t) d; return (int64_t)d;
} }
static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) { static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) {
...@@ -72,7 +72,8 @@ static bool fromObject(const cJSON* json, const char* name, FFromJson func, void ...@@ -72,7 +72,8 @@ static bool fromObject(const cJSON* json, const char* name, FFromJson func, void
return func(jObj, obj); return func(jObj, obj);
} }
static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson func, void** obj, int32_t size, bool required) { static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson func, void** obj, int32_t size,
bool required) {
cJSON* jObj = cJSON_GetObjectItem(json, name); cJSON* jObj = cJSON_GetObjectItem(json, name);
if (NULL == jObj) { if (NULL == jObj) {
return !required; return !required;
...@@ -85,7 +86,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f ...@@ -85,7 +86,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
} }
static const char* jkPnodeType = "Type"; static const char* jkPnodeType = "Type";
static int32_t getPnodeTypeSize(cJSON* json) { static int32_t getPnodeTypeSize(cJSON* json) {
switch (getNumber(json, jkPnodeType)) { switch (getNumber(json, jkPnodeType)) {
case OP_StreamScan: case OP_StreamScan:
case OP_TableScan: case OP_TableScan:
...@@ -119,7 +120,7 @@ static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void* ...@@ -119,7 +120,7 @@ static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void*
static bool fromPnodeArray(const cJSON* json, const char* name, FFromJson func, SArray** array) { static bool fromPnodeArray(const cJSON* json, const char* name, FFromJson func, SArray** array) {
const cJSON* jArray = cJSON_GetObjectItem(json, name); const cJSON* jArray = cJSON_GetObjectItem(json, name);
int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray)); int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray));
if (size > 0) { if (size > 0) {
*array = taosArrayInit(size, POINTER_BYTES); *array = taosArrayInit(size, POINTER_BYTES);
if (NULL == *array) { if (NULL == *array) {
...@@ -128,7 +129,7 @@ static bool fromPnodeArray(const cJSON* json, const char* name, FFromJson func, ...@@ -128,7 +129,7 @@ static bool fromPnodeArray(const cJSON* json, const char* name, FFromJson func,
} }
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
cJSON* jItem = cJSON_GetArrayItem(jArray, i); cJSON* jItem = cJSON_GetArrayItem(jArray, i);
void* item = calloc(1, getPnodeTypeSize(jItem)); void* item = calloc(1, getPnodeTypeSize(jItem));
if (NULL == item || !func(jItem, item)) { if (NULL == item || !func(jItem, item)) {
return false; return false;
} }
...@@ -161,9 +162,10 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* ...@@ -161,9 +162,10 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray*
return addTarray(json, name, func, array, true); return addTarray(json, name, func, array, true);
} }
static bool fromTarray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize, bool isPoint) { static bool fromTarray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize,
bool isPoint) {
const cJSON* jArray = cJSON_GetObjectItem(json, name); const cJSON* jArray = cJSON_GetObjectItem(json, name);
int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray)); int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray));
if (size > 0) { if (size > 0) {
*array = taosArrayInit(size, isPoint ? POINTER_BYTES : itemSize); *array = taosArrayInit(size, isPoint ? POINTER_BYTES : itemSize);
if (NULL == *array) { if (NULL == *array) {
...@@ -188,7 +190,8 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra ...@@ -188,7 +190,8 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra
return fromTarray(json, name, func, array, itemSize, true); return fromTarray(json, name, func, array, itemSize, true);
} }
static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize, int32_t size) { static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize,
int32_t size) {
if (size > 0) { if (size > 0) {
cJSON* jArray = cJSON_AddArrayToObject(json, name); cJSON* jArray = cJSON_AddArrayToObject(json, name);
if (NULL == jArray) { if (NULL == jArray) {
...@@ -218,7 +221,8 @@ static bool fromItem(const cJSON* jArray, FFromJson func, void* array, int32_t i ...@@ -218,7 +221,8 @@ static bool fromItem(const cJSON* jArray, FFromJson func, void* array, int32_t i
return true; return true;
} }
static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize, int32_t* size) { static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize,
int32_t* size) {
const cJSON* jArray = getArray(json, name, size); const cJSON* jArray = getArray(json, name, size);
if (*size > 0) { if (*size > 0) {
*array = calloc(1, itemSize * (*size)); *array = calloc(1, itemSize * (*size));
...@@ -229,7 +233,8 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson ...@@ -229,7 +233,8 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson
return fromItem(jArray, func, *array, itemSize, *size); return fromItem(jArray, func, *array, itemSize, *size);
} }
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize, int32_t* size) { static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize,
int32_t* size) {
const cJSON* jArray = getArray(json, name, size); const cJSON* jArray = getArray(json, name, size);
return fromItem(jArray, func, array, itemSize, *size); return fromItem(jArray, func, array, itemSize, *size);
} }
...@@ -240,7 +245,7 @@ static const char* jkSchemaBytes = "Bytes"; ...@@ -240,7 +245,7 @@ static const char* jkSchemaBytes = "Bytes";
// The 'name' field do not need to be serialized. // The 'name' field do not need to be serialized.
static bool schemaToJson(const void* obj, cJSON* jSchema) { static bool schemaToJson(const void* obj, cJSON* jSchema) {
const SSlotSchema* schema = (const SSlotSchema*)obj; const SSlotSchema* schema = (const SSlotSchema*)obj;
bool res = cJSON_AddNumberToObject(jSchema, jkSchemaType, schema->type); bool res = cJSON_AddNumberToObject(jSchema, jkSchemaType, schema->type);
if (res) { if (res) {
res = cJSON_AddNumberToObject(jSchema, jkSchemaColId, schema->colId); res = cJSON_AddNumberToObject(jSchema, jkSchemaColId, schema->colId);
} }
...@@ -264,7 +269,8 @@ static const char* jkDataBlockSchemaPrecision = "Precision"; ...@@ -264,7 +269,8 @@ static const char* jkDataBlockSchemaPrecision = "Precision";
static bool dataBlockSchemaToJson(const void* obj, cJSON* json) { static bool dataBlockSchemaToJson(const void* obj, cJSON* json) {
const SDataBlockSchema* schema = (const SDataBlockSchema*)obj; const SDataBlockSchema* schema = (const SDataBlockSchema*)obj;
bool res = addRawArray(json, jkDataBlockSchemaSlotSchema, schemaToJson, schema->pSchema, sizeof(SSlotSchema), schema->numOfCols); bool res = addRawArray(json, jkDataBlockSchemaSlotSchema, schemaToJson, schema->pSchema, sizeof(SSlotSchema),
schema->numOfCols);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkDataBlockSchemaResultRowSize, schema->resultRowSize); res = cJSON_AddNumberToObject(json, jkDataBlockSchemaResultRowSize, schema->resultRowSize);
} }
...@@ -279,7 +285,8 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) { ...@@ -279,7 +285,8 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) {
schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize); schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize);
schema->precision = getNumber(json, jkDataBlockSchemaPrecision); schema->precision = getNumber(json, jkDataBlockSchemaPrecision);
return fromRawArrayWithAlloc(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**)&(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols); return fromRawArrayWithAlloc(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**)&(schema->pSchema),
sizeof(SSlotSchema), &schema->numOfCols);
} }
static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr"; static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr";
...@@ -290,7 +297,7 @@ static const char* jkColumnFilterInfoUpperBnd = "UpperBnd"; ...@@ -290,7 +297,7 @@ static const char* jkColumnFilterInfoUpperBnd = "UpperBnd";
static bool columnFilterInfoToJson(const void* obj, cJSON* jFilter) { static bool columnFilterInfoToJson(const void* obj, cJSON* jFilter) {
const SColumnFilterInfo* filter = (const SColumnFilterInfo*)obj; const SColumnFilterInfo* filter = (const SColumnFilterInfo*)obj;
bool res = cJSON_AddNumberToObject(jFilter, jkColumnFilterInfoLowerRelOptr, filter->lowerRelOptr); bool res = cJSON_AddNumberToObject(jFilter, jkColumnFilterInfoLowerRelOptr, filter->lowerRelOptr);
if (res) { if (res) {
res = cJSON_AddNumberToObject(jFilter, jkColumnFilterInfoUpperRelOptr, filter->upperRelOptr); res = cJSON_AddNumberToObject(jFilter, jkColumnFilterInfoUpperRelOptr, filter->upperRelOptr);
} }
...@@ -323,7 +330,7 @@ static const char* jkColumnInfoFilterList = "FilterList"; ...@@ -323,7 +330,7 @@ static const char* jkColumnInfoFilterList = "FilterList";
static bool columnInfoToJson(const void* obj, cJSON* jCol) { static bool columnInfoToJson(const void* obj, cJSON* jCol) {
const SColumnInfo* col = (const SColumnInfo*)obj; const SColumnInfo* col = (const SColumnInfo*)obj;
bool res = cJSON_AddNumberToObject(jCol, jkColumnInfoColId, col->colId); bool res = cJSON_AddNumberToObject(jCol, jkColumnInfoColId, col->colId);
if (res) { if (res) {
res = cJSON_AddNumberToObject(jCol, jkColumnInfoType, col->type); res = cJSON_AddNumberToObject(jCol, jkColumnInfoType, col->type);
} }
...@@ -331,8 +338,9 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) { ...@@ -331,8 +338,9 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) {
res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes); res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes);
} }
if (res) { // TODO: temporarily disable it if (res) { // TODO: temporarily disable it
// res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters); // res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo,
// sizeof(SColumnFilterInfo), col->flist.numOfFilters);
} }
return res; return res;
...@@ -341,10 +349,11 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) { ...@@ -341,10 +349,11 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) {
static bool columnInfoFromJson(const cJSON* json, void* obj) { static bool columnInfoFromJson(const cJSON* json, void* obj) {
SColumnInfo* col = (SColumnInfo*)obj; SColumnInfo* col = (SColumnInfo*)obj;
col->colId = getNumber(json, jkColumnInfoColId); col->colId = getNumber(json, jkColumnInfoColId);
col->type = getNumber(json, jkColumnInfoType); col->type = getNumber(json, jkColumnInfoType);
col->bytes = getNumber(json, jkColumnInfoBytes); col->bytes = getNumber(json, jkColumnInfoBytes);
int32_t size = 0; int32_t size = 0;
bool res = fromRawArrayWithAlloc(json, jkColumnInfoFilterList, columnFilterInfoFromJson, (void**)&col->flist.filterInfo, sizeof(SColumnFilterInfo), &size); bool res = fromRawArrayWithAlloc(json, jkColumnInfoFilterList, columnFilterInfoFromJson,
(void**)&col->flist.filterInfo, sizeof(SColumnFilterInfo), &size);
col->flist.numOfFilters = size; col->flist.numOfFilters = size;
return res; return res;
} }
...@@ -355,7 +364,7 @@ static const char* jkColumnInfo = "Info"; ...@@ -355,7 +364,7 @@ static const char* jkColumnInfo = "Info";
static bool columnToJson(const void* obj, cJSON* jCol) { static bool columnToJson(const void* obj, cJSON* jCol) {
const SColumn* col = (const SColumn*)obj; const SColumn* col = (const SColumn*)obj;
bool res = cJSON_AddNumberToObject(jCol, jkColumnTableId, col->uid); bool res = cJSON_AddNumberToObject(jCol, jkColumnTableId, col->uid);
if (res) { if (res) {
res = cJSON_AddNumberToObject(jCol, jkColumnFlag, col->flag); res = cJSON_AddNumberToObject(jCol, jkColumnFlag, col->flag);
} }
...@@ -381,7 +390,7 @@ static const char* jkExprNodeRight = "Right"; ...@@ -381,7 +390,7 @@ static const char* jkExprNodeRight = "Right";
static bool operatorToJson(const void* obj, cJSON* jOper) { static bool operatorToJson(const void* obj, cJSON* jOper) {
const tExprNode* exprInfo = (const tExprNode*)obj; const tExprNode* exprInfo = (const tExprNode*)obj;
bool res = cJSON_AddNumberToObject(jOper, jkExprNodeOper, exprInfo->_node.optr); bool res = cJSON_AddNumberToObject(jOper, jkExprNodeOper, exprInfo->_node.optr);
if (res) { if (res) {
res = addObject(jOper, jkExprNodeLeft, exprNodeToJson, exprInfo->_node.pLeft); res = addObject(jOper, jkExprNodeLeft, exprNodeToJson, exprInfo->_node.pLeft);
} }
...@@ -406,9 +415,10 @@ static const char* jkFunctionChild = "Child"; ...@@ -406,9 +415,10 @@ static const char* jkFunctionChild = "Child";
static bool functionToJson(const void* obj, cJSON* jFunc) { static bool functionToJson(const void* obj, cJSON* jFunc) {
const tExprNode* exprInfo = (const tExprNode*)obj; const tExprNode* exprInfo = (const tExprNode*)obj;
bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName); bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName);
if (res && NULL != exprInfo->_function.pChild) { if (res && NULL != exprInfo->_function.pChild) {
res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, exprInfo->_function.pChild, sizeof(tExprNode*), exprInfo->_function.num); res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, exprInfo->_function.pChild, sizeof(tExprNode*),
exprInfo->_function.num);
} }
return res; return res;
} }
...@@ -420,7 +430,8 @@ static bool functionFromJson(const cJSON* json, void* obj) { ...@@ -420,7 +430,8 @@ static bool functionFromJson(const cJSON* json, void* obj) {
if (NULL == exprInfo->_function.pChild) { if (NULL == exprInfo->_function.pChild) {
return false; return false;
} }
return fromRawArrayWithAlloc(json, jkFunctionChild, exprNodeFromJson, (void**)exprInfo->_function.pChild, sizeof(tExprNode*), &exprInfo->_function.num); return fromRawArrayWithAlloc(json, jkFunctionChild, exprNodeFromJson, (void**)exprInfo->_function.pChild,
sizeof(tExprNode*), &exprInfo->_function.num);
} }
static const char* jkVariantType = "Type"; static const char* jkVariantType = "Type";
...@@ -430,12 +441,12 @@ static const char* jkVariantValue = "Value"; ...@@ -430,12 +441,12 @@ static const char* jkVariantValue = "Value";
static bool variantToJson(const void* obj, cJSON* jVar) { static bool variantToJson(const void* obj, cJSON* jVar) {
const SVariant* var = (const SVariant*)obj; const SVariant* var = (const SVariant*)obj;
bool res = cJSON_AddNumberToObject(jVar, jkVariantType, var->nType); bool res = cJSON_AddNumberToObject(jVar, jkVariantType, var->nType);
if (res) { if (res) {
res = cJSON_AddNumberToObject(jVar, jkVariantLen, var->nLen); res = cJSON_AddNumberToObject(jVar, jkVariantLen, var->nLen);
} }
if (res) { if (res) {
if (0/* in */) { if (0 /* in */) {
res = addArray(jVar, jkVariantvalues, variantToJson, var->arr); res = addArray(jVar, jkVariantvalues, variantToJson, var->arr);
} else if (IS_NUMERIC_TYPE(var->nType)) { } else if (IS_NUMERIC_TYPE(var->nType)) {
res = cJSON_AddNumberToObject(jVar, jkVariantValue, var->d); res = cJSON_AddNumberToObject(jVar, jkVariantValue, var->d);
...@@ -450,7 +461,7 @@ static bool variantFromJson(const cJSON* json, void* obj) { ...@@ -450,7 +461,7 @@ static bool variantFromJson(const cJSON* json, void* obj) {
SVariant* var = (SVariant*)obj; SVariant* var = (SVariant*)obj;
var->nType = getNumber(json, jkVariantType); var->nType = getNumber(json, jkVariantType);
var->nLen = getNumber(json, jkVariantLen); var->nLen = getNumber(json, jkVariantLen);
if (0/* in */) { if (0 /* in */) {
return fromArray(json, jkVariantvalues, variantFromJson, &var->arr, sizeof(SVariant)); return fromArray(json, jkVariantvalues, variantFromJson, &var->arr, sizeof(SVariant));
} else if (IS_NUMERIC_TYPE(var->nType)) { } else if (IS_NUMERIC_TYPE(var->nType)) {
var->d = getNumber(json, jkVariantValue); var->d = getNumber(json, jkVariantValue);
...@@ -468,7 +479,7 @@ static const char* jkExprNodeValue = "Value"; ...@@ -468,7 +479,7 @@ static const char* jkExprNodeValue = "Value";
static bool exprNodeToJson(const void* obj, cJSON* jExprInfo) { static bool exprNodeToJson(const void* obj, cJSON* jExprInfo) {
const tExprNode* exprInfo = *(const tExprNode**)obj; const tExprNode* exprInfo = *(const tExprNode**)obj;
bool res = cJSON_AddNumberToObject(jExprInfo, jkExprNodeType, exprInfo->nodeType); bool res = cJSON_AddNumberToObject(jExprInfo, jkExprNodeType, exprInfo->nodeType);
if (res) { if (res) {
switch (exprInfo->nodeType) { switch (exprInfo->nodeType) {
case TEXPR_BINARYEXPR_NODE: case TEXPR_BINARYEXPR_NODE:
...@@ -502,7 +513,8 @@ static bool exprNodeFromJson(const cJSON* json, void* obj) { ...@@ -502,7 +513,8 @@ static bool exprNodeFromJson(const cJSON* json, void* obj) {
case TEXPR_FUNCTION_NODE: case TEXPR_FUNCTION_NODE:
return fromObject(json, jkExprNodeFunction, functionFromJson, exprInfo, false); return fromObject(json, jkExprNodeFunction, functionFromJson, exprInfo, false);
case TEXPR_COL_NODE: case TEXPR_COL_NODE:
return fromObjectWithAlloc(json, jkExprNodeColumn, schemaFromJson, (void**)&exprInfo->pSchema, sizeof(SSchema), false); return fromObjectWithAlloc(json, jkExprNodeColumn, schemaFromJson, (void**)&exprInfo->pSchema, sizeof(SSchema),
false);
case TEXPR_VALUE_NODE: case TEXPR_VALUE_NODE:
return fromObject(json, jkExprNodeValue, variantFromJson, exprInfo->pVal, false); return fromObject(json, jkExprNodeValue, variantFromJson, exprInfo->pVal, false);
default: default:
...@@ -518,7 +530,7 @@ static const char* jkSqlExprParams = "Params"; ...@@ -518,7 +530,7 @@ static const char* jkSqlExprParams = "Params";
// token does not need to be serialized. // token does not need to be serialized.
static bool sqlExprToJson(const void* obj, cJSON* jExpr) { static bool sqlExprToJson(const void* obj, cJSON* jExpr) {
const SSqlExpr* expr = (const SSqlExpr*)obj; const SSqlExpr* expr = (const SSqlExpr*)obj;
bool res = addObject(jExpr, jkSqlExprSchema, schemaToJson, &expr->resSchema); bool res = addObject(jExpr, jkSqlExprSchema, schemaToJson, &expr->resSchema);
if (res) { if (res) {
res = addRawArray(jExpr, jkSqlExprColumns, columnToJson, expr->pColumns, sizeof(SColumn), expr->numOfCols); res = addRawArray(jExpr, jkSqlExprColumns, columnToJson, expr->pColumns, sizeof(SColumn), expr->numOfCols);
} }
...@@ -533,9 +545,10 @@ static bool sqlExprToJson(const void* obj, cJSON* jExpr) { ...@@ -533,9 +545,10 @@ static bool sqlExprToJson(const void* obj, cJSON* jExpr) {
static bool sqlExprFromJson(const cJSON* json, void* obj) { static bool sqlExprFromJson(const cJSON* json, void* obj) {
SSqlExpr* expr = (SSqlExpr*)obj; SSqlExpr* expr = (SSqlExpr*)obj;
bool res = fromObject(json, jkSqlExprSchema, schemaFromJson, &expr->resSchema, false); bool res = fromObject(json, jkSqlExprSchema, schemaFromJson, &expr->resSchema, false);
if (res) { if (res) {
res = fromRawArrayWithAlloc(json, jkSqlExprColumns, columnFromJson, (void**)&expr->pColumns, sizeof(SColumn), &expr->numOfCols); res = fromRawArrayWithAlloc(json, jkSqlExprColumns, columnFromJson, (void**)&expr->pColumns, sizeof(SColumn),
&expr->numOfCols);
} }
if (res) { if (res) {
expr->interBytes = getNumber(json, jkSqlExprInterBytes); expr->interBytes = getNumber(json, jkSqlExprInterBytes);
...@@ -553,7 +566,7 @@ static const char* jkExprInfoExpr = "Expr"; ...@@ -553,7 +566,7 @@ static const char* jkExprInfoExpr = "Expr";
static bool exprInfoToJson(const void* obj, cJSON* jExprInfo) { static bool exprInfoToJson(const void* obj, cJSON* jExprInfo) {
const SExprInfo* exprInfo = (const SExprInfo*)obj; const SExprInfo* exprInfo = (const SExprInfo*)obj;
bool res = addObject(jExprInfo, jkExprInfoBase, sqlExprToJson, &exprInfo->base); bool res = addObject(jExprInfo, jkExprInfoBase, sqlExprToJson, &exprInfo->base);
if (res) { if (res) {
res = addObject(jExprInfo, jkExprInfoExpr, exprNodeToJson, &exprInfo->pExpr); res = addObject(jExprInfo, jkExprInfoExpr, exprNodeToJson, &exprInfo->pExpr);
} }
...@@ -562,9 +575,10 @@ static bool exprInfoToJson(const void* obj, cJSON* jExprInfo) { ...@@ -562,9 +575,10 @@ static bool exprInfoToJson(const void* obj, cJSON* jExprInfo) {
static bool exprInfoFromJson(const cJSON* json, void* obj) { static bool exprInfoFromJson(const cJSON* json, void* obj) {
SExprInfo* exprInfo = (SExprInfo*)obj; SExprInfo* exprInfo = (SExprInfo*)obj;
bool res = fromObject(json, jkExprInfoBase, sqlExprFromJson, &exprInfo->base, true); bool res = fromObject(json, jkExprInfoBase, sqlExprFromJson, &exprInfo->base, true);
if (res) { if (res) {
res = fromObjectWithAlloc(json, jkExprInfoExpr, exprNodeFromJson, (void**)&exprInfo->pExpr, sizeof(tExprNode), true); res =
fromObjectWithAlloc(json, jkExprInfoExpr, exprNodeFromJson, (void**)&exprInfo->pExpr, sizeof(tExprNode), true);
} }
return res; return res;
} }
...@@ -576,12 +590,12 @@ static bool timeWindowToJson(const void* obj, cJSON* json) { ...@@ -576,12 +590,12 @@ static bool timeWindowToJson(const void* obj, cJSON* json) {
const STimeWindow* win = (const STimeWindow*)obj; const STimeWindow* win = (const STimeWindow*)obj;
char tmp[40] = {0}; char tmp[40] = {0};
snprintf(tmp, tListLen(tmp),"%"PRId64, win->skey); snprintf(tmp, tListLen(tmp), "%" PRId64, win->skey);
bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp); bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp);
if (res) { if (res) {
memset(tmp, 0, tListLen(tmp)); memset(tmp, 0, tListLen(tmp));
snprintf(tmp, tListLen(tmp),"%"PRId64, win->ekey); snprintf(tmp, tListLen(tmp), "%" PRId64, win->ekey);
res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp); res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp);
} }
return res; return res;
...@@ -604,7 +618,7 @@ static bool scanNodeToJson(const void* obj, cJSON* json) { ...@@ -604,7 +618,7 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
const SScanPhyNode* pNode = (const SScanPhyNode*)obj; const SScanPhyNode* pNode = (const SScanPhyNode*)obj;
char uid[40] = {0}; char uid[40] = {0};
snprintf(uid, tListLen(uid), "%"PRIu64, pNode->uid); snprintf(uid, tListLen(uid), "%" PRIu64, pNode->uid);
bool res = cJSON_AddStringToObject(json, jkScanNodeTableId, uid); bool res = cJSON_AddStringToObject(json, jkScanNodeTableId, uid);
if (res) { if (res) {
...@@ -629,11 +643,11 @@ static bool scanNodeToJson(const void* obj, cJSON* json) { ...@@ -629,11 +643,11 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
static bool scanNodeFromJson(const cJSON* json, void* obj) { static bool scanNodeFromJson(const cJSON* json, void* obj) {
SScanPhyNode* pNode = (SScanPhyNode*)obj; SScanPhyNode* pNode = (SScanPhyNode*)obj;
pNode->uid = getBigintFromString(json, jkScanNodeTableId); pNode->uid = getBigintFromString(json, jkScanNodeTableId);
pNode->tableType = getNumber(json, jkScanNodeTableType); pNode->tableType = getNumber(json, jkScanNodeTableType);
pNode->count = getNumber(json, jkScanNodeTableCount); pNode->count = getNumber(json, jkScanNodeTableCount);
pNode->order = getNumber(json, jkScanNodeTableOrder); pNode->order = getNumber(json, jkScanNodeTableOrder);
pNode->reverse = getNumber(json, jkScanNodeTableRevCount); pNode->reverse = getNumber(json, jkScanNodeTableRevCount);
return true; return true;
} }
...@@ -644,7 +658,7 @@ static const char* jkColIndexName = "Name"; ...@@ -644,7 +658,7 @@ static const char* jkColIndexName = "Name";
static bool colIndexToJson(const void* obj, cJSON* json) { static bool colIndexToJson(const void* obj, cJSON* json) {
const SColIndex* col = (const SColIndex*)obj; const SColIndex* col = (const SColIndex*)obj;
bool res = cJSON_AddNumberToObject(json, jkColIndexColId, col->colId); bool res = cJSON_AddNumberToObject(json, jkColIndexColId, col->colId);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkColIndexColIndex, col->colIndex); res = cJSON_AddNumberToObject(json, jkColIndexColIndex, col->colIndex);
} }
...@@ -673,7 +687,7 @@ static const char* jkAggNodeGroupByList = "GroupByList"; ...@@ -673,7 +687,7 @@ static const char* jkAggNodeGroupByList = "GroupByList";
static bool aggNodeToJson(const void* obj, cJSON* json) { static bool aggNodeToJson(const void* obj, cJSON* json) {
const SAggPhyNode* agg = (const SAggPhyNode*)obj; const SAggPhyNode* agg = (const SAggPhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkAggNodeAggAlgo, agg->aggAlgo); bool res = cJSON_AddNumberToObject(json, jkAggNodeAggAlgo, agg->aggAlgo);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkAggNodeAggSplit, agg->aggSplit); res = cJSON_AddNumberToObject(json, jkAggNodeAggSplit, agg->aggSplit);
} }
...@@ -703,7 +717,7 @@ static const char* jkTableScanNodeTagsConditions = "TagsConditions"; ...@@ -703,7 +717,7 @@ static const char* jkTableScanNodeTagsConditions = "TagsConditions";
static bool tableScanNodeToJson(const void* obj, cJSON* json) { static bool tableScanNodeToJson(const void* obj, cJSON* json) {
const STableScanPhyNode* scan = (const STableScanPhyNode*)obj; const STableScanPhyNode* scan = (const STableScanPhyNode*)obj;
bool res = scanNodeToJson(obj, json); bool res = scanNodeToJson(obj, json);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkTableScanNodeFlag, scan->scanFlag); res = cJSON_AddNumberToObject(json, jkTableScanNodeFlag, scan->scanFlag);
} }
...@@ -718,7 +732,7 @@ static bool tableScanNodeToJson(const void* obj, cJSON* json) { ...@@ -718,7 +732,7 @@ static bool tableScanNodeToJson(const void* obj, cJSON* json) {
static bool tableScanNodeFromJson(const cJSON* json, void* obj) { static bool tableScanNodeFromJson(const cJSON* json, void* obj) {
STableScanPhyNode* scan = (STableScanPhyNode*)obj; STableScanPhyNode* scan = (STableScanPhyNode*)obj;
bool res = scanNodeFromJson(json, obj); bool res = scanNodeFromJson(json, obj);
if (res) { if (res) {
scan->scanFlag = getNumber(json, jkTableScanNodeFlag); scan->scanFlag = getNumber(json, jkTableScanNodeFlag);
} }
...@@ -736,7 +750,7 @@ static const char* jkEpAddrPort = "Port"; ...@@ -736,7 +750,7 @@ static const char* jkEpAddrPort = "Port";
static bool epAddrToJson(const void* obj, cJSON* json) { static bool epAddrToJson(const void* obj, cJSON* json) {
const SEp* ep = (const SEp*)obj; const SEp* ep = (const SEp*)obj;
bool res = cJSON_AddStringToObject(json, jkEpAddrFqdn, ep->fqdn); bool res = cJSON_AddStringToObject(json, jkEpAddrFqdn, ep->fqdn);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkEpAddrPort, ep->port); res = cJSON_AddNumberToObject(json, jkEpAddrPort, ep->port);
} }
...@@ -750,46 +764,46 @@ static bool epAddrFromJson(const cJSON* json, void* obj) { ...@@ -750,46 +764,46 @@ static bool epAddrFromJson(const cJSON* json, void* obj) {
return true; return true;
} }
static const char* jkNodeAddrId = "NodeId"; static const char* jkNodeAddrId = "NodeId";
static const char* jkNodeAddrInUse = "InUse"; static const char* jkNodeAddrInUse = "InUse";
static const char* jkNodeAddrEpAddrs = "Ep"; static const char* jkNodeAddrEpAddrs = "Ep";
static const char* jkNodeAddr = "NodeAddr"; static const char* jkNodeAddr = "NodeAddr";
static const char* jkNodeTaskId = "TaskId"; static const char* jkNodeTaskId = "TaskId";
static const char* jkNodeTaskSchedId = "SchedId"; static const char* jkNodeTaskSchedId = "SchedId";
static bool queryNodeAddrToJson(const void* obj, cJSON* json) { static bool queryNodeAddrToJson(const void* obj, cJSON* json) {
const SQueryNodeAddr* pAddr = (const SQueryNodeAddr*) obj; const SQueryNodeAddr* pAddr = (const SQueryNodeAddr*)obj;
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId); bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->epset.inUse); res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->epSet.inUse);
} }
if (res) { if (res) {
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epset.eps, sizeof(SEp), pAddr->epset.numOfEps); res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epSet.eps, sizeof(SEp), pAddr->epSet.numOfEps);
} }
return res; return res;
} }
static bool queryNodeAddrFromJson(const cJSON* json, void* obj) { static bool queryNodeAddrFromJson(const cJSON* json, void* obj) {
SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj; SQueryNodeAddr* pAddr = (SQueryNodeAddr*)obj;
pAddr->nodeId = getNumber(json, jkNodeAddrId); pAddr->nodeId = getNumber(json, jkNodeAddrId);
pAddr->epset.inUse = getNumber(json, jkNodeAddrInUse); pAddr->epSet.inUse = getNumber(json, jkNodeAddrInUse);
int32_t numOfEps = 0; int32_t numOfEps = 0;
bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epset.eps, sizeof(SEp), &numOfEps); bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epSet.eps, sizeof(SEp), &numOfEps);
pAddr->epset.numOfEps = numOfEps; pAddr->epSet.numOfEps = numOfEps;
return res; return res;
} }
static bool nodeAddrToJson(const void* obj, cJSON* json) { static bool nodeAddrToJson(const void* obj, cJSON* json) {
const SDownstreamSource* pSource = (const SDownstreamSource*) obj; const SDownstreamSource* pSource = (const SDownstreamSource*)obj;
bool res = cJSON_AddNumberToObject(json, jkNodeTaskId, pSource->taskId); bool res = cJSON_AddNumberToObject(json, jkNodeTaskId, pSource->taskId);
if (res) { if (res) {
char t[30] = {0}; char t[30] = {0};
snprintf(t, tListLen(t), "%"PRIu64, pSource->schedId); snprintf(t, tListLen(t), "%" PRIu64, pSource->schedId);
res = cJSON_AddStringToObject(json, jkNodeTaskSchedId, t); res = cJSON_AddStringToObject(json, jkNodeTaskSchedId, t);
} }
...@@ -813,9 +827,10 @@ static const char* jkExchangeNodeSrcEndPoints = "SrcAddrs"; ...@@ -813,9 +827,10 @@ static const char* jkExchangeNodeSrcEndPoints = "SrcAddrs";
static bool exchangeNodeToJson(const void* obj, cJSON* json) { static bool exchangeNodeToJson(const void* obj, cJSON* json) {
const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj; const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId); bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId);
if (res) { if (res) {
res = addRawArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints->pData, sizeof(SDownstreamSource), taosArrayGetSize(exchange->pSrcEndPoints)); res = addRawArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints->pData,
sizeof(SDownstreamSource), taosArrayGetSize(exchange->pSrcEndPoints));
} }
return res; return res;
} }
...@@ -823,7 +838,8 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { ...@@ -823,7 +838,8 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
static bool exchangeNodeFromJson(const cJSON* json, void* obj) { static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
SExchangePhyNode* exchange = (SExchangePhyNode*)obj; SExchangePhyNode* exchange = (SExchangePhyNode*)obj;
exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId); exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId);
return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SDownstreamSource)); return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints,
sizeof(SDownstreamSource));
} }
static bool specificPhyNodeToJson(const void* obj, cJSON* json) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
...@@ -855,7 +871,7 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { ...@@ -855,7 +871,7 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
case OP_AllTimeWindow: case OP_AllTimeWindow:
case OP_AllMultiTableTimeInterval: case OP_AllMultiTableTimeInterval:
case OP_Order: case OP_Order:
break; // todo break; // todo
case OP_Exchange: case OP_Exchange:
return exchangeNodeToJson(obj, json); return exchangeNodeToJson(obj, json);
default: default:
...@@ -893,7 +909,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { ...@@ -893,7 +909,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
case OP_AllTimeWindow: case OP_AllTimeWindow:
case OP_AllMultiTableTimeInterval: case OP_AllMultiTableTimeInterval:
case OP_Order: case OP_Order:
break; // todo break; // todo
case OP_Exchange: case OP_Exchange:
return exchangeNodeFromJson(json, obj); return exchangeNodeFromJson(json, obj);
default: default:
...@@ -910,7 +926,7 @@ static const char* jkPnodeChildren = "Children"; ...@@ -910,7 +926,7 @@ static const char* jkPnodeChildren = "Children";
// The 'pParent' field do not need to be serialized. // The 'pParent' field do not need to be serialized.
static bool phyNodeToJson(const void* obj, cJSON* jNode) { static bool phyNodeToJson(const void* obj, cJSON* jNode) {
const SPhyNode* phyNode = (const SPhyNode*)obj; const SPhyNode* phyNode = (const SPhyNode*)obj;
bool res = cJSON_AddNumberToObject(jNode, jkPnodeType, phyNode->info.type); bool res = cJSON_AddNumberToObject(jNode, jkPnodeType, phyNode->info.type);
if (res) { if (res) {
res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name); res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name);
} }
...@@ -933,7 +949,7 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) { ...@@ -933,7 +949,7 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
} }
static bool phyNodeFromJson(const cJSON* json, void* obj) { static bool phyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode* node = (SPhyNode*) obj; SPhyNode* node = (SPhyNode*)obj;
node->info.type = getNumber(json, jkPnodeType); node->info.type = getNumber(json, jkPnodeType);
node->info.name = opTypeToOpName(node->info.type); node->info.name = opTypeToOpName(node->info.type);
...@@ -959,7 +975,7 @@ static const char* jkInserterDataSize = "DataSize"; ...@@ -959,7 +975,7 @@ static const char* jkInserterDataSize = "DataSize";
static bool inserterToJson(const void* obj, cJSON* json) { static bool inserterToJson(const void* obj, cJSON* json) {
const SDataInserter* inserter = (const SDataInserter*)obj; const SDataInserter* inserter = (const SDataInserter*)obj;
bool res = cJSON_AddNumberToObject(json, jkInserterNumOfTables, inserter->numOfTables); bool res = cJSON_AddNumberToObject(json, jkInserterNumOfTables, inserter->numOfTables);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkInserterDataSize, inserter->size); res = cJSON_AddNumberToObject(json, jkInserterDataSize, inserter->size);
} }
...@@ -1005,7 +1021,7 @@ static const char* jkDataSinkSchema = "Schema"; ...@@ -1005,7 +1021,7 @@ static const char* jkDataSinkSchema = "Schema";
static bool dataSinkToJson(const void* obj, cJSON* json) { static bool dataSinkToJson(const void* obj, cJSON* json) {
const SDataSink* dsink = (const SDataSink*)obj; const SDataSink* dsink = (const SDataSink*)obj;
bool res = cJSON_AddStringToObject(json, jkDataSinkName, dsink->info.name); bool res = cJSON_AddStringToObject(json, jkDataSinkName, dsink->info.name);
if (res) { if (res) {
res = addObject(json, dsink->info.name, specificDataSinkToJson, dsink); res = addObject(json, dsink->info.name, specificDataSinkToJson, dsink);
} }
...@@ -1034,7 +1050,7 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) { ...@@ -1034,7 +1050,7 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) {
const SSubplanId* id = (const SSubplanId*)obj; const SSubplanId* id = (const SSubplanId*)obj;
char ids[40] = {0}; char ids[40] = {0};
snprintf(ids, tListLen(ids), "%"PRIu64, id->queryId); snprintf(ids, tListLen(ids), "%" PRIu64, id->queryId);
bool res = cJSON_AddStringToObject(jId, jkIdQueryId, ids); bool res = cJSON_AddStringToObject(jId, jkIdQueryId, ids);
if (res) { if (res) {
...@@ -1049,9 +1065,9 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) { ...@@ -1049,9 +1065,9 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) {
static bool subplanIdFromJson(const cJSON* json, void* obj) { static bool subplanIdFromJson(const cJSON* json, void* obj) {
SSubplanId* id = (SSubplanId*)obj; SSubplanId* id = (SSubplanId*)obj;
id->queryId = getBigintFromString(json, jkIdQueryId); id->queryId = getBigintFromString(json, jkIdQueryId);
id->templateId = getNumber(json, jkIdTemplateId); id->templateId = getNumber(json, jkIdTemplateId);
id->subplanId = getNumber(json, jkIdSubplanId); id->subplanId = getNumber(json, jkIdSubplanId);
return true; return true;
} }
...@@ -1094,9 +1110,10 @@ static SSubplan* subplanFromJson(const cJSON* json) { ...@@ -1094,9 +1110,10 @@ static SSubplan* subplanFromJson(const cJSON* json) {
} }
if (res) { if (res) {
res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false); res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink),
false);
} }
if (!res) { if (!res) {
qDestroySubplan(subplan); qDestroySubplan(subplan);
return NULL; return NULL;
...@@ -1137,15 +1154,15 @@ int32_t stringToSubplan(const char* str, SSubplan** subplan) { ...@@ -1137,15 +1154,15 @@ int32_t stringToSubplan(const char* str, SSubplan** subplan) {
cJSON* qDagToJson(const SQueryDag* pDag) { cJSON* qDagToJson(const SQueryDag* pDag) {
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
if(pRoot == NULL) { if (pRoot == NULL) {
return NULL; return NULL;
} }
cJSON_AddNumberToObject(pRoot, "Number", pDag->numOfSubplans); cJSON_AddNumberToObject(pRoot, "Number", pDag->numOfSubplans);
cJSON_AddNumberToObject(pRoot, "QueryId", pDag->queryId); cJSON_AddNumberToObject(pRoot, "QueryId", pDag->queryId);
cJSON *pLevels = cJSON_CreateArray(); cJSON* pLevels = cJSON_CreateArray();
if(pLevels == NULL) { if (pLevels == NULL) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return NULL; return NULL;
} }
...@@ -1153,19 +1170,19 @@ cJSON* qDagToJson(const SQueryDag* pDag) { ...@@ -1153,19 +1170,19 @@ cJSON* qDagToJson(const SQueryDag* pDag) {
cJSON_AddItemToObject(pRoot, "Subplans", pLevels); cJSON_AddItemToObject(pRoot, "Subplans", pLevels);
size_t level = taosArrayGetSize(pDag->pSubplans); size_t level = taosArrayGetSize(pDag->pSubplans);
for(size_t i = 0; i < level; i++) { for (size_t i = 0; i < level; i++) {
const SArray* pSubplans = (const SArray*)taosArrayGetP(pDag->pSubplans, i); const SArray* pSubplans = (const SArray*)taosArrayGetP(pDag->pSubplans, i);
size_t num = taosArrayGetSize(pSubplans); size_t num = taosArrayGetSize(pSubplans);
cJSON* plansOneLevel = cJSON_CreateArray(); cJSON* plansOneLevel = cJSON_CreateArray();
if(plansOneLevel == NULL) { if (plansOneLevel == NULL) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return NULL; return NULL;
} }
cJSON_AddItemToArray(pLevels, plansOneLevel); cJSON_AddItemToArray(pLevels, plansOneLevel);
for(size_t j = 0; j < num; j++) { for (size_t j = 0; j < num; j++) {
cJSON* pSubplan = subplanToJson((const SSubplan*)taosArrayGetP(pSubplans, j)); cJSON* pSubplan = subplanToJson((const SSubplan*)taosArrayGetP(pSubplans, j));
if(pSubplan == NULL) { if (pSubplan == NULL) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return NULL; return NULL;
} }
...@@ -1183,22 +1200,22 @@ char* qDagToString(const SQueryDag* pDag) { ...@@ -1183,22 +1200,22 @@ char* qDagToString(const SQueryDag* pDag) {
SQueryDag* qJsonToDag(const cJSON* pRoot) { SQueryDag* qJsonToDag(const cJSON* pRoot) {
SQueryDag* pDag = malloc(sizeof(SQueryDag)); SQueryDag* pDag = malloc(sizeof(SQueryDag));
if(pDag == NULL) { if (pDag == NULL) {
return NULL; return NULL;
} }
pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "Number")); pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "Number"));
pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "QueryId")); pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "QueryId"));
pDag->pSubplans = taosArrayInit(0, sizeof(SArray)); pDag->pSubplans = taosArrayInit(0, sizeof(void*));
if (pDag->pSubplans == NULL) { if (pDag->pSubplans == NULL) {
free(pDag); free(pDag);
return NULL; return NULL;
} }
cJSON* pLevels = cJSON_GetObjectItem(pRoot, "Subplans"); cJSON* pLevels = cJSON_GetObjectItem(pRoot, "Subplans");
int level = cJSON_GetArraySize(pLevels); int level = cJSON_GetArraySize(pLevels);
for(int i = 0; i < level; i++) { for (int i = 0; i < level; i++) {
SArray* plansOneLevel = taosArrayInit(0, sizeof(void*)); SArray* plansOneLevel = taosArrayInit(0, sizeof(void*));
if(plansOneLevel == NULL) { if (plansOneLevel == NULL) {
for(int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
taosArrayDestroy(taosArrayGetP(pDag->pSubplans, j)); taosArrayDestroy(taosArrayGetP(pDag->pSubplans, j));
} }
taosArrayDestroy(pDag->pSubplans); taosArrayDestroy(pDag->pSubplans);
...@@ -1206,13 +1223,13 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) { ...@@ -1206,13 +1223,13 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) {
return NULL; return NULL;
} }
cJSON* pItem = cJSON_GetArrayItem(pLevels, i); cJSON* pItem = cJSON_GetArrayItem(pLevels, i);
int sz = cJSON_GetArraySize(pItem); int sz = cJSON_GetArraySize(pItem);
for(int j = 0; j < sz; j++) { for (int j = 0; j < sz; j++) {
cJSON* pSubplanJson = cJSON_GetArrayItem(pItem, j); cJSON* pSubplanJson = cJSON_GetArrayItem(pItem, j);
SSubplan* pSubplan = subplanFromJson(pSubplanJson); SSubplan* pSubplan = subplanFromJson(pSubplanJson);
taosArrayPush(plansOneLevel, &pSubplan); taosArrayPush(plansOneLevel, &pSubplan);
} }
taosArrayPush(pDag->pSubplans, plansOneLevel); taosArrayPush(pDag->pSubplans, &plansOneLevel);
} }
return pDag; return pDag;
} }
......
...@@ -423,13 +423,13 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { ...@@ -423,13 +423,13 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
if (pTask->plan->execNode.epset.numOfEps > 0) { if (pTask->plan->execNode.epSet.numOfEps > 0) {
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno); SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epset.numOfEps); SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1061,7 +1061,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1061,7 +1061,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
isCandidateAddr = true; isCandidateAddr = true;
} }
SEpSet epSet = addr->epset; SEpSet epSet = addr->epSet;
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE: case TDMT_VND_CREATE_TABLE:
......
...@@ -104,8 +104,8 @@ void schtBuildQueryDag(SQueryDag *dag) { ...@@ -104,8 +104,8 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan->type = QUERY_TYPE_SCAN; scanPlan->type = QUERY_TYPE_SCAN;
scanPlan->execNode.nodeId = 1; scanPlan->execNode.nodeId = 1;
scanPlan->execNode.epset.inUse = 0; scanPlan->execNode.epSet.inUse = 0;
addEpIntoEpSet(&scanPlan->execNode.epset, "ep0", 6030); addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);
scanPlan->pChildren = NULL; scanPlan->pChildren = NULL;
scanPlan->level = 1; scanPlan->level = 1;
...@@ -118,7 +118,7 @@ void schtBuildQueryDag(SQueryDag *dag) { ...@@ -118,7 +118,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan->id.subplanId = 0x5555555555; mergePlan->id.subplanId = 0x5555555555;
mergePlan->type = QUERY_TYPE_MERGE; mergePlan->type = QUERY_TYPE_MERGE;
mergePlan->level = 0; mergePlan->level = 0;
mergePlan->execNode.epset.numOfEps = 0; mergePlan->execNode.epSet.numOfEps = 0;
mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES); mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan->pParents = NULL; mergePlan->pParents = NULL;
...@@ -157,8 +157,8 @@ void schtBuildInsertDag(SQueryDag *dag) { ...@@ -157,8 +157,8 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[0].level = 0; insertPlan[0].level = 0;
insertPlan[0].execNode.nodeId = 1; insertPlan[0].execNode.nodeId = 1;
insertPlan[0].execNode.epset.inUse = 0; insertPlan[0].execNode.epSet.inUse = 0;
addEpIntoEpSet(&insertPlan[0].execNode.epset, "ep0", 6030); addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030);
insertPlan[0].pChildren = NULL; insertPlan[0].pChildren = NULL;
insertPlan[0].pParents = NULL; insertPlan[0].pParents = NULL;
...@@ -173,8 +173,8 @@ void schtBuildInsertDag(SQueryDag *dag) { ...@@ -173,8 +173,8 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[1].level = 0; insertPlan[1].level = 0;
insertPlan[1].execNode.nodeId = 1; insertPlan[1].execNode.nodeId = 1;
insertPlan[1].execNode.epset.inUse = 0; insertPlan[1].execNode.epSet.inUse = 0;
addEpIntoEpSet(&insertPlan[1].execNode.epset, "ep0", 6030); addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030);
insertPlan[1].pChildren = NULL; insertPlan[1].pChildren = NULL;
insertPlan[1].pParents = NULL; insertPlan[1].pParents = NULL;
......
...@@ -352,7 +352,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -352,7 +352,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t cnt = 0; int32_t cnt = 0;
/*clock_t startTime = clock();*/ /*clock_t startTime = clock();*/
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) { if (tmqmessage) {
cnt++; cnt++;
msg_process(tmqmessage); msg_process(tmqmessage);
...@@ -383,7 +383,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -383,7 +383,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); msg_process(tmqmessage);
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
...@@ -411,7 +411,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog ...@@ -411,7 +411,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
int64_t startTime = taosGetTimestampUs(); int64_t startTime = taosGetTimestampUs();
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) { if (tmqmessage) {
batchCnt++; batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage); skipLogNum += tmqGetSkipLogNum(tmqmessage);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册