提交 b6087cb8 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_CONSUMER_H_
#define _TD_CONSUMER_H_
#include "tlist.h"
#include "tarray.h"
#include "hash.h"
#ifdef __cplusplus
extern "C" {
#endif
//consumer handle
struct tmq_consumer_t;
typedef struct tmq_consumer_t tmq_consumer_t;
//consumer config
struct tmq_consumer_config_t;
typedef struct tmq_consumer_config_t tmq_consumer_config_t;
//response err
struct tmq_resp_err_t;
typedef struct tmq_resp_err_t tmq_resp_err_t;
struct tmq_message_t;
typedef struct tmq_message_t tmq_message_t;
struct tmq_col_batch_t;
typedef struct tmq_col_batch_t tmq_col_batch_t;
//get content of message
tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id);
tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*);
//consumer config
int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap);
//consumer initialization
//resouces are supposed to be free by users by calling tmq_consumer_destroy
tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap);
//subscribe
tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*);
tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*);
//consume
//resouces are supposed to be free by users by calling tmq_message_destroy
tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time);
//destroy message and free memory
void tmq_message_destroy(tmq_message_t*);
//close consumer
int32_t tmq_consumer_close(tmq_consumer_t*);
//destroy consumer
void tmq_consumer_destroy(tmq_message_t*);
#ifdef __cplusplus
}
#endif
#endif /*_TD_CONSUMER_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_STREAM_H_
#define _TD_STREAM_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_STREAM_H_*/
\ No newline at end of file
...@@ -30,6 +30,11 @@ enum { ...@@ -30,6 +30,11 @@ enum {
STREAM_TASK_STATUS__STOP, STREAM_TASK_STATUS__STOP,
}; };
enum {
STREAM_CREATED_BY__USER = 1,
STREAM_CREATED_BY__SMA,
};
#if 0 #if 0
// pipe -> fetch/pipe queue // pipe -> fetch/pipe queue
// merge -> merge queue // merge -> merge queue
......
...@@ -731,10 +731,10 @@ typedef struct { ...@@ -731,10 +731,10 @@ typedef struct {
int32_t vgNum; int32_t vgNum;
SRWLatch lock; SRWLatch lock;
int8_t status; int8_t status;
int8_t sourceType;
int8_t sinkType;
// int32_t sqlLen; // int32_t sqlLen;
int32_t sinkVgId; // 0 for automatic int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
......
...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); ...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); ...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId); int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,8 +26,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -26,8 +26,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; /*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
// TODO encode tasks // TODO encode tasks
if (pObj->tasks) { if (pObj->tasks) {
...@@ -69,8 +72,11 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -69,8 +72,11 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL; pObj->tasks = NULL;
int32_t sz; int32_t sz;
......
...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { ...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup; return pVgroup;
} }
int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL; void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0);
...@@ -151,9 +151,9 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, ...@@ -151,9 +151,9 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream,
pTask->execType = TASK_EXEC__NONE; pTask->execType = TASK_EXEC__NONE;
// sink // sink
if (smaId != -1) { if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
pTask->sinkType = TASK_SINK__SMA; pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->sinkType = TASK_SINK__TABLE; pTask->sinkType = TASK_SINK__TABLE;
} }
...@@ -166,7 +166,45 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, ...@@ -166,7 +166,45 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream,
return 0; return 0;
} }
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(pStream->fixedSinkVgId != 0);
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosArrayPush(tasks, &pTask);
pTask->nodeId = pStream->fixedSinkVgId;
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
if (pVgroup == NULL) {
return -1;
}
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// source
pTask->sourceType = TASK_SOURCE__MERGE;
// exec
pTask->execType = TASK_EXEC__NONE;
// sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
}
//
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
...@@ -185,7 +223,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ...@@ -185,7 +223,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink // add extra sink
hasExtraSink = true; hasExtraSink = true;
mndAddSinkToStream(pMnode, pTrans, pStream, smaId); if (pStream->fixedSinkVgId == 0) {
mndAddShuffledSinkToStream(pMnode, pTrans, pStream);
} else {
mndAddFixedSinkToStream(pMnode, pTrans, pStream);
}
} }
for (int32_t level = 0; level < totLevel; level++) { for (int32_t level = 0; level < totLevel; level++) {
...@@ -221,12 +263,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ...@@ -221,12 +263,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
pTask->sinkType = TASK_SINK__SHOW; pTask->sinkType = TASK_SINK__SHOW;
pTask->showSink.reserved = 0; pTask->showSink.reserved = 0;
if (!hasExtraSink) { if (!hasExtraSink) {
if (smaId != -1) { #if 1
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
pTask->sinkType = TASK_SINK__SMA; pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->sinkType = TASK_SINK__TABLE; pTask->sinkType = TASK_SINK__TABLE;
} }
#endif
} }
} else { } else {
pTask->sinkType = TASK_SINK__NONE; pTask->sinkType = TASK_SINK__NONE;
...@@ -286,6 +330,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ...@@ -286,6 +330,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
if (hasExtraSink) { if (hasExtraSink) {
// add dispatcher // add dispatcher
if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
...@@ -316,6 +361,17 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ...@@ -316,6 +361,17 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
} }
} }
} }
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
} }
#endif #endif
......
...@@ -415,6 +415,10 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -415,6 +415,10 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__SMA;
// TODO
streamObj.fixedSinkVgId = 0;
streamObj.smaId = smaObj.uid;
/*streamObj.physicalPlan = "";*/ /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented"; streamObj.logicalPlan = "not implemented";
...@@ -428,7 +432,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -428,7 +432,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
......
...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { ...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return code; return code;
} }
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast ...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1; return -1;
} }
if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) { if (mndScheduleStream(pMnode, pTrans, pStream) < 0) {
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
return -1; return -1;
} }
...@@ -300,6 +300,10 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -300,6 +300,10 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__USER;
// TODO
streamObj.fixedSinkVgId = 0;
streamObj.smaId = 0;
/*streamObj.physicalPlan = "";*/ /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented"; streamObj.logicalPlan = "not implemented";
...@@ -310,7 +314,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -310,7 +314,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
} }
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) { if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册