未验证 提交 58ee66a8 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12013 from taosdata/feature/tq

fix: memory error
...@@ -78,11 +78,14 @@ int32_t create_stream() { ...@@ -78,11 +78,14 @@ int32_t create_stream() {
taos_free_result(pRes); taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1"; /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql); /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(
pConn,
"create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -86,9 +86,9 @@ typedef struct taosField { ...@@ -86,9 +86,9 @@ typedef struct taosField {
} TAOS_FIELD; } TAOS_FIELD;
#ifdef WINDOWS #ifdef WINDOWS
#define DLL_EXPORT __declspec(dllexport) #define DLL_EXPORT __declspec(dllexport)
#else #else
#define DLL_EXPORT #define DLL_EXPORT
#endif #endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
...@@ -271,10 +271,8 @@ DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); ...@@ -271,10 +271,8 @@ DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0 #if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
#endif
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
#endif
/* ------------------------------ TMQ END -------------------------------- */ /* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build #if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
......
...@@ -35,19 +35,6 @@ enum { ...@@ -35,19 +35,6 @@ enum {
STREAM_CREATED_BY__SMA, STREAM_CREATED_BY__SMA,
}; };
#if 0
// pipe -> fetch/pipe queue
// merge -> merge queue
// write -> write queue
enum {
TASK_DISPATCH_MSG__SND_PIPE = 1,
TASK_DISPATCH_MSG__SND_MERGE,
TASK_DISPATCH_MSG__VND_PIPE,
TASK_DISPATCH_MSG__VND_MERGE,
TASK_DISPATCH_MSG__VND_WRITE,
};
#endif
typedef struct { typedef struct {
int32_t nodeId; // 0 for snode int32_t nodeId; // 0 for snode
SEpSet epSet; SEpSet epSet;
...@@ -100,10 +87,6 @@ typedef struct { ...@@ -100,10 +87,6 @@ typedef struct {
int8_t reserved; int8_t reserved;
} STaskSinkFetch; } STaskSinkFetch;
typedef struct {
int8_t reserved;
} STaskSinkShow;
enum { enum {
TASK_SOURCE__SCAN = 1, TASK_SOURCE__SCAN = 1,
TASK_SOURCE__PIPE, TASK_SOURCE__PIPE,
...@@ -128,7 +111,6 @@ enum { ...@@ -128,7 +111,6 @@ enum {
TASK_SINK__TABLE, TASK_SINK__TABLE,
TASK_SINK__SMA, TASK_SINK__SMA,
TASK_SINK__FETCH, TASK_SINK__FETCH,
TASK_SINK__SHOW,
}; };
typedef struct { typedef struct {
...@@ -155,7 +137,6 @@ typedef struct { ...@@ -155,7 +137,6 @@ typedef struct {
STaskSinkTb tbSink; STaskSinkTb tbSink;
STaskSinkSma smaSink; STaskSinkSma smaSink;
STaskSinkFetch fetchSink; STaskSinkFetch fetchSink;
STaskSinkShow showSink;
}; };
// dispatch // dispatch
......
...@@ -667,7 +667,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -667,7 +667,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (code != 0) goto FAIL; if (code != 0) goto FAIL;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
tscDebug("not ready, retry"); tscDebug("consumer not ready, retry");
taosMsleep(500); taosMsleep(500);
} }
...@@ -693,6 +693,7 @@ void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { ...@@ -693,6 +693,7 @@ void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
conf->commitCb = cb; conf->commitCb = cb;
} }
#if 0
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
...@@ -777,6 +778,7 @@ _return: ...@@ -777,6 +778,7 @@ _return:
return pRequest; return pRequest;
} }
#endif
#if 0 #if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
......
...@@ -418,6 +418,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -418,6 +418,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
...@@ -464,6 +467,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -464,6 +467,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
......
...@@ -308,6 +308,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -308,6 +308,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.smaId = 0; streamObj.smaId = 0;
/*streamObj.physicalPlan = "";*/ /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented"; streamObj.logicalPlan = "not implemented";
streamObj.trigger = pCreate->triggerType;
streamObj.waterMark = pCreate->watermark;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
...@@ -431,7 +433,7 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -431,7 +433,7 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream); pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
SColumnInfoData *pColInfo; SColumnInfoData *pColInfo;
...@@ -471,8 +473,13 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -471,8 +473,13 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false);
numOfRows++;
sdbRelease(pSdb, pStream);
} }
return 0;
pShow->numOfRows += numOfRows;
return numOfRows;
} }
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
......
...@@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
// decode // decode
pBuf = pVal; pBuf = pVal;
pSW = taosMemoryMalloc(sizeof(pSW)); pSW = taosMemoryMalloc(sizeof(SSchemaWrapper));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER); tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER);
tDecodeSSchemaWrapper(&coder, pSW); tDecodeSSchemaWrapper(&coder, pSW);
......
...@@ -864,7 +864,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -864,7 +864,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
} }
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryMalloc(sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册