未验证 提交 9c3a3c10 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9946 from taosdata/feature/tq

refine query interface
......@@ -1525,9 +1525,23 @@ typedef struct SMqSetCVgReq {
char* sql;
char* logicalPlan;
char* physicalPlan;
SArray* tasks; // SArray<SSubQueryMsg>
SSubQueryMsg msg;
} SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen;
if (buf == NULL) return tlen;
memcpy(*buf, pMsg, tlen);
*buf = POINTER_SHIFT(*buf, tlen);
return tlen;
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen;
memcpy(pMsg, buf, tlen);
return POINTER_SHIFT(buf, tlen);
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->vgId);
......@@ -1537,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
......@@ -1548,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
pReq->tasks = NULL;
buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
......@@ -109,12 +109,71 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta;
} STableMetaOutput;
const SSchema* tGetTbnameColumnSchema();
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
} SQueryNodeAddr;
static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) {
pEpSet->inUse = pAddr->inUse;
pEpSet->numOfEps = pAddr->numOfEps;
for (int j = 0; j < TSDB_MAX_REPLICA; j++) {
pEpSet->port[j] = pAddr->epAddr[j].port;
memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
* Asynchronously send message to server, after the response received, the callback will be incured.
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
......@@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
* Asynchronously send message to server, after the response received, the callback will be incured.
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
#ifdef __cplusplus
......@@ -327,11 +327,13 @@ typedef struct SMqTopicConsumer {
typedef struct SMqConsumerEp {
int32_t vgId; // -1 for unassigned
SEpSet epset;
int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs;
int64_t lastVgHbTs;
int32_t vgId; // -1 for unassigned
SEpSet epset;
int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs;
int64_t lastVgHbTs;
int32_t execLen;
SSubQueryMsg qExec;
} SMqConsumerEp;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
......@@ -339,6 +341,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
return tlen;
......@@ -346,6 +349,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
return buf;
......@@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan);
memcpy(&req.msg, &pCEp->qExec, pCEp->execLen);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen);
if (reqStr == NULL) {
......@@ -143,7 +144,21 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
int32_t sz;
//convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray;
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
int32_t sz = taosArrayGetSize(pArray);
//convert dag to msg
for (int32_t i = 0; i < sz; i++) {
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
int32_t vgId = pTaskInfo->addr.nodeId;
SEpSet epSet;
tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr);
SVgObj *pVgroup = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
......@@ -156,6 +171,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
return 0;
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
......@@ -681,7 +681,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
......@@ -733,6 +732,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
// TODO: filter out unused column
return 0;
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
......@@ -762,7 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush(pArray, &colInfo);
return pArray;
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
* status) {*/
/*return 0;*/
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册