未验证 提交 8132dfd3 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10106 from taosdata/feature/tq

fix uninitialized
......@@ -2,6 +2,8 @@ build/
compile_commands.json
.cache
.ycm_extra_conf.py
.tasks
.vimspector.json
.vscode/
.idea/
cmake-build-debug/
......
......@@ -16,6 +16,7 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include "taos.h"
static int running = 1;
......@@ -65,7 +66,7 @@ int32_t init_env() {
taos_free_result(pRes);
char* sql = "select * from st1";
const char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/
......@@ -112,14 +113,20 @@ void basic_consume_loop(tmq_t *tmq,
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
clock_t startTime = clock();
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmq) {
msg_process(tmqmessage);
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
......@@ -163,6 +170,6 @@ int main() {
code = init_env();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop(tmq, topic_list);
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -213,7 +213,7 @@ typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *);
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
......
......@@ -38,22 +38,22 @@ extern "C" {
#define TSWAP(a, b, c) \
do { \
typeof(a) __tmp = (a); \
__typeof(a) __tmp = (a); \
(a) = (b); \
(b) = __tmp; \
} while (0)
#define TMAX(a, b) \
({ \
typeof(a) __a = (a); \
typeof(b) __b = (b); \
__typeof(a) __a = (a); \
__typeof(b) __b = (b); \
(__a > __b) ? __a : __b; \
})
#define TMIN(a, b) \
({ \
typeof(a) __a = (a); \
typeof(b) __b = (b); \
__typeof(a) __a = (a); \
__typeof(b) __b = (b); \
(__a < __b) ? __a : __b; \
})
#endif
......
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
......@@ -146,7 +148,7 @@ tmq_list_t* tmq_list_new() {
return ptr;
}
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
if (ptr->cnt >= ptr->tot - 1) return -1;
ptr->elems[ptr->cnt] = strdup(src);
ptr->cnt++;
......@@ -366,7 +368,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
.igExists = 1,
.physicalPlan = (char*)pStr,
.sql = (char*)sql,
.logicalPlan = "no logic plan",
.logicalPlan = (char*)"no logic plan",
};
int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
......@@ -512,7 +514,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
return -1;
}
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
/*printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset);*/
/*printf("rsp %ld %ld %d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->numOfTopics == 0) {
/*printf("no data\n");*/
free(pRsp);
......
......@@ -630,14 +630,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) {
}
}
#define TSWAP(a, b, c) \
do { \
typeof(a) __tmp = (a); \
(a) = (b); \
(b) = __tmp; \
} while (0)
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf) {
switch (type) {
case TSDB_DATA_TYPE_INT:
......
......@@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
......@@ -54,13 +55,14 @@ void mndCleanupConsumer(SMnode *pMnode) {}
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) goto CM_ENCODE_OVER;
void *buf = malloc(tlen);
buf = malloc(tlen);
if (buf == NULL) goto CM_ENCODE_OVER;
void *abuf = buf;
......@@ -88,6 +90,7 @@ CM_ENCODE_OVER:
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
......@@ -106,7 +109,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0;
int32_t len;
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
void *buf = malloc(len);
buf = malloc(len);
if (buf == NULL) goto CM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
......
......@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndSubscribe.h"
#include "mndConsumer.h"
......@@ -372,13 +373,14 @@ void mndCleanupSubscribe(SMnode *pMnode) {}
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
if (pRaw == NULL) goto SUB_ENCODE_OVER;
void *buf = malloc(tlen);
buf = malloc(tlen);
if (buf == NULL) goto SUB_ENCODE_OVER;
void *abuf = buf;
......@@ -406,6 +408,7 @@ SUB_ENCODE_OVER:
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
......@@ -425,7 +428,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0;
int32_t tlen;
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
void *buf = malloc(tlen + 1);
buf = malloc(tlen + 1);
if (buf == NULL) goto SUB_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
......
......@@ -16,9 +16,9 @@
#ifndef _TD_TQ_INT_H_
#define _TD_TQ_INT_H_
#include "tq.h"
#include "meta.h"
#include "tlog.h"
#include "tq.h"
#include "trpc.h"
#ifdef __cplusplus
extern "C" {
......@@ -26,29 +26,48 @@ extern "C" {
extern int32_t tqDebugFlag;
#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }}
#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }}
#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }}
#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }}
#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
// create persistent storage for meta info such as consuming offset
// return value > 0: cgId
// return value <= 0: error code
// int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle);
// create ring buffer in memory and load consuming offset
// int tqOpenTCGroup(STQ*, const char* topic, int cgId);
// destroy ring buffer and persist consuming offset
// int tqCloseTCGroup(STQ*, const char* topic, int cgId);
// delete persistent storage for meta info
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
//int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
//const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
#define tqFatal(...) \
{ \
if (tqDebugFlag & DEBUG_FATAL) { \
taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); \
} \
}
#define tqError(...) \
{ \
if (tqDebugFlag & DEBUG_ERROR) { \
taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); \
} \
}
#define tqWarn(...) \
{ \
if (tqDebugFlag & DEBUG_WARN) { \
taosPrintLog("TQ WARN ", 255, __VA_ARGS__); \
} \
}
#define tqInfo(...) \
{ \
if (tqDebugFlag & DEBUG_INFO) { \
taosPrintLog("TQ ", 255, __VA_ARGS__); \
} \
}
#define tqDebug(...) \
{ \
if (tqDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
} \
}
#define tqTrace(...) \
{ \
if (tqDebugFlag & DEBUG_TRACE) { \
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
} \
}
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
#ifdef __cplusplus
}
#endif
......
......@@ -12,19 +12,12 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tcompare.h"
#include "tqInt.h"
#include "tqMetaStore.h"
// static
// read next version data
//
// send to fetch queue
//
// handle management message
//
int tqInit() {
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
if (old == 1) return 0;
......
......@@ -170,7 +170,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
}
if (pRead->pHead->head.version != ver) {
/*wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);*/
wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
......@@ -178,7 +178,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
code = walValidBodyCksum(pRead->pHead);
if (code != 0) {
/*wError("unexpected wal log version: checksum not passed");*/
wError("unexpected wal log version: checksum not passed");
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
......
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "osString.h"
......
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tbuffer.h"
#include "exception.h"
#include "os.h"
......
......@@ -19,7 +19,6 @@
#include "tnote.h"
#include "tutil.h"
#include "ulog.h"
//#include "zlib.h"
#define MAX_LOGLINE_SIZE (1000)
#define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tpagedfile.h"
#include "thash.h"
#include "stddef.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册