未验证 提交 9f1c7968 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11452 from taosdata/feature/tq

feat: support tmq msg parse
......@@ -19,8 +19,8 @@
#include <time.h>
#include "taos.h"
static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -166,11 +166,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
cnt++;
printf("get data\n");
msg_process(tmqmessage);
/*printf("get data\n");*/
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
/*} else {*/
/*break;*/
......@@ -198,9 +198,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
}
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
......@@ -226,10 +226,10 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t skipLogNum = 0;
clock_t startTime = clock();
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage);
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
} else {
......
......@@ -30,7 +30,7 @@ typedef void **TAOS_ROW;
#if 0
typedef void TAOS_STREAM;
#endif
typedef void TAOS_SUB;
typedef void TAOS_SUB;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
......@@ -138,13 +138,13 @@ typedef enum {
#define RET_MSG_LENGTH 1024
typedef struct setConfRet {
SET_CONF_RET_CODE retCode;
char retMsg[RET_MSG_LENGTH];
char retMsg[RET_MSG_LENGTH];
} setConfRet;
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
......@@ -152,34 +152,34 @@ DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
......@@ -188,14 +188,14 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
......@@ -237,9 +237,9 @@ typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
// typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
......@@ -259,7 +259,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
......@@ -268,8 +268,8 @@ DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** v
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t {
......@@ -285,21 +285,24 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
#if 0
// temporary used function for demo only
void tmqShowMsg(tmq_message_t *tmq_message);
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
#endif
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
#if 0
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
DLL_EXPORT int32_t tmq_get_vgroup_id(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
#endif
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
......@@ -308,7 +311,7 @@ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
/* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build
#if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
#endif
......
......@@ -199,11 +199,11 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
......@@ -211,8 +211,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks);
static FORCE_INLINE int32_t blockEstimateEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + (int32_t)ceil(blockDataGetSerialRowSize(pBlock) * pBlock->info.rows);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
}
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
......
......@@ -485,7 +485,7 @@ typedef struct {
char intervalUnit;
char slidingUnit;
char
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision;
int64_t interval;
int64_t sliding;
......@@ -2384,6 +2384,53 @@ typedef struct {
SArray* pBlockData; // SArray<SSDataBlock>
} SMqPollRsp;
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t dataLen;
SArray* blockPos; // beginning pos for each SRetrieveTableRsp
void* blockData; // serialized batched SRetrieveTableRsp
} SMqPollRspV2;
static FORCE_INLINE int32_t tEncodeSMqPollRspV2(void** buf, const SMqPollRspV2* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->dataLen);
if (pRsp->dataLen != 0) {
int32_t sz = taosArrayGetSize(pRsp->blockPos);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
int32_t blockPos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
tlen += taosEncodeFixedI32(buf, blockPos);
}
tlen += taosEncodeBinary(buf, pRsp->blockData, pRsp->dataLen);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->dataLen);
if (pRsp->dataLen != 0) {
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pRsp->blockPos = taosArrayInit(sz, sizeof(int32_t));
for (int32_t i = 0; i < sz; i++) {
int32_t blockPos;
buf = taosDecodeFixedI32(buf, &blockPos);
taosArrayPush(pRsp->blockPos, &blockPos);
}
buf = taosDecodeBinary(buf, &pRsp->blockData, pRsp->dataLen);
}
return (void*)buf;
}
typedef struct {
SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN];
......
......@@ -194,12 +194,12 @@ enum {
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
typedef struct SMqRspObj {
typedef struct {
int8_t resType;
char* topic;
void* vg;
SArray* res; // SArray<SReqResultInfo>
int32_t resIter;
int32_t vgId;
} SMqRspObj;
typedef struct SRequestObj {
......
......@@ -465,7 +465,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
return 0;
}
doFetchRow(pRequest, false, false);
doFetchRow(pRequest, false, true);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
......
......@@ -27,11 +27,22 @@
struct tmq_message_t {
SMqPollRsp msg;
char* topic;
void* vg;
SArray* res; // SArray<SReqResultInfo>
int32_t vgId;
int32_t resIter;
};
typedef struct {
int8_t tmqRspType;
int32_t epoch;
} SMqRspWrapper;
typedef struct {
int8_t tmqRspType;
int32_t epoch;
SMqCMGetSubEpRsp msg;
} SMqAskEpRspWrapper;
struct tmq_list_t {
SArray container;
};
......@@ -118,6 +129,14 @@ typedef struct {
TAOS_FIELD* fields;
} SMqClientTopic;
typedef struct {
int8_t tmqRspType;
int32_t epoch;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
SMqPollRspV2 msg;
} SMqPollRspWrapper;
typedef struct {
tmq_t* tmq;
tsem_t rspSem;
......@@ -133,10 +152,10 @@ typedef struct {
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
SMqClientTopic* pTopic;
int32_t epoch;
int32_t vgId;
tsem_t rspSem;
tmq_message_t** msg;
int32_t sync;
} SMqPollCbParam;
......@@ -244,7 +263,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg = NULL;
SMqRspWrapper* msg = NULL;
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
......@@ -777,7 +796,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
#if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0;
SMqPollRsp* pRsp = &tmq_message->msg;
......@@ -827,11 +846,13 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
}
}
}
#endif
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("recv poll\n");*/
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
......@@ -874,18 +895,22 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
#endif
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) {
/*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
if (pRspWrapper == NULL) {
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
goto CREATE_MSG_FAIL;
}
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg);
/*pRsp->iter.curBlock = 0;*/
/*pRsp->iter.curRow = 0;*/
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
/*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
#if 0
if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/
......@@ -894,11 +919,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
#endif
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId,
pRsp->msg.reqOffset, pRsp->msg.rspOffset);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
pRsp->vg = pParam->pVg;
taosWriteQitem(tmq->mqueue, pRsp);
taosWriteQitem(tmq->mqueue, pRspWrapper);
atomic_add_fetch_32(&tmq->readyRequest, 1);
/*tsem_post(&tmq->rspSem);*/
return 0;
......@@ -1015,16 +1039,19 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
tDeleteSMqCMGetSubEpRsp(&rsp);
} else {
SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));
if (pRsp == NULL) {
/*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto END;
}
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp);
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(tmq->mqueue, pRsp);
taosWriteQitem(tmq->mqueue, pWrapper);
/*tsem_post(&tmq->rspSem);*/
}
......@@ -1152,6 +1179,25 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
return pReq;
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
pRspObj->topic = strdup(pWrapper->topicHandle->topicName);
pRspObj->resIter = -1;
pRspObj->vgId = pWrapper->vgHandle->vgId;
SMqPollRspV2* pRsp = &pWrapper->msg;
int32_t blockNum = taosArrayGetSize(pRsp->blockPos);
pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo));
for (int32_t i = 0; i < blockNum; i++) {
int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos);
SReqResultInfo resInfo;
setQueryResultFromRsp(&resInfo, pRetrieve, true);
taosArrayPush(pRspObj->res, &resInfo);
}
return pRspObj;
}
#if 0
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL;
......@@ -1258,6 +1304,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
}
pParam->tmq = tmq;
pParam->pVg = pVg;
pParam->pTopic = pTopic;
pParam->vgId = pVg->vgId;
pParam->epoch = tmq->epoch;
pParam->sync = 0;
......@@ -1296,13 +1343,13 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
return 0;
}
// return
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) {
if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspHead->epoch > atomic_load_32(&tmq->epoch)) {
SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead;
tmqUpdateEp(tmq, rspHead->epoch, rspMsg);
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqCMGetSubEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/
*pReset = true;
} else {
......@@ -1314,41 +1361,43 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) {
return 0;
}
tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
while (1) {
SMqRspHead* rspHead = NULL;
taosGetQitem(tmq->qall, (void**)&rspHead);
if (rspHead == NULL) {
SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspHead);
if (rspHead == NULL) return NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) return NULL;
}
if (rspHead->mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
tmq_message_t* rspMsg = (tmq_message_t*)rspHead;
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
atomic_sub_fetch_32(&tmq->readyRequest, 1);
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/
SMqClientVg* pVg = rspMsg->vg;
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = rspMsg->msg.rspOffset;
pVg->currentOffset = pollRspWrapper->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (rspMsg->msg.numOfTopics == 0) {
taosFreeQitem(rspMsg);
rspHead = NULL;
if (pollRspWrapper->msg.dataLen == 0) {
taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
}
return rspMsg;
// build msg
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
return pRsp;
} else {
/*printf("epoch mismatch\n");*/
taosFreeQitem(rspMsg);
taosFreeQitem(pollRspWrapper);
}
} else {
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool reset = false;
tmqHandleNoPollRsp(tmq, rspHead, &reset);
taosFreeQitem(rspHead);
tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
taosFreeQitem(rspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer %ld reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, blockingTime);
......@@ -1382,17 +1431,17 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
}
#endif
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg;
int64_t startTime = taosGetTimestampMs();
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspMsg) {
return rspMsg;
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
while (1) {
......@@ -1402,9 +1451,9 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*tsem_wait(&tmq->rspSem);*/
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspMsg) {
return rspMsg;
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs();
......@@ -1546,6 +1595,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
}
#endif
#if 0
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
SMqPollRsp* pRsp = &tmq_message->msg;
......@@ -1553,6 +1603,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
/*taosMemoryFree(tmq_message);*/
taosFreeQitem(tmq_message);
}
#endif
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
......@@ -1563,4 +1614,27 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return "fail";
}
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
char* tmq_get_topic_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->topic;
} else {
return NULL;
}
}
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->vgId;
} else {
return -1;
}
}
void tmq_message_destroy(TAOS_RES* res) {
if (res == NULL) return;
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
}
}
......@@ -255,7 +255,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return 0;
}
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
......@@ -433,6 +433,205 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
}
#endif
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0;
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
fetchOffset = pReq->currentOffset + 1;
}
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
SMqPollRspV2 rspV2 = {0};
rspV2.dataLen = 0;
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId);
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
}
STqTopic* pTopic = NULL;
int32_t topicSz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < topicSz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
ASSERT(pConsumer->consumerId == consumerId);
if (strcmp(topic->topicName, pReq->topic) == 0) {
pTopic = topic;
break;
}
}
if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
pTq->pVnode->vgId);
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
pTq->pVnode->vgId);
rspV2.reqOffset = pReq->currentOffset;
rspV2.skipLogNum = 0;
while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
break;
}
SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset);
break;
}
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
break;
}
taosArrayPush(pRes, pDataBlock);
}
if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, pTq->pVnode->vgId, fetchOffset);
fetchOffset++;
rspV2.skipLogNum++;
taosArrayDestroy(pRes);
continue;
}
rspV2.rspOffset = fetchOffset;
int32_t blockSz = taosArrayGetSize(pRes);
int32_t tlen = 0;
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pBlock = taosArrayGet(pRes, i);
tlen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
}
void* data = taosMemoryMalloc(tlen);
if (data == NULL) {
pMsg->code = -1;
taosMemoryFree(pHead);
}
rspV2.blockData = data;
void* dataBlockBuf = data;
int32_t pos;
for (int32_t i = 0; i < blockSz; i++) {
pos = 0;
SSDataBlock* pBlock = taosArrayGet(pRes, i);
blockCompressEncode(pBlock, dataBlockBuf, &pos, pBlock->info.numOfCols, false);
taosArrayPush(rspV2.blockPos, &rspV2.dataLen);
rspV2.dataLen += pos;
dataBlockBuf = POINTER_SHIFT(dataBlockBuf, pos);
}
int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(msgLen);
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* msgBodyBuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&msgBodyBuf, &rspV2);
/*rsp.pBlockData = pRes;*/
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
} else {
taosMemoryFree(pHead);
fetchOffset++;
rspV2.skipLogNum++;
}
}
/*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
rspV2.rspOffset = fetchOffset - 1;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&abuf, &rspV2);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId,
pReq->epoch);
/*}*/
return 0;
}
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
......
......@@ -92,7 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockEstimateEncodeSize(pInput->pData);
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
......
......@@ -195,7 +195,7 @@ void parseArgument(int32_t argc, char *argv[]) {
}
static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
// calc dir size (not include itself 4096Byte)
int64_t getDirectorySize(char *dir)
......@@ -363,9 +363,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
}
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) {
msg_process(tmqmessage);
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
......@@ -392,12 +392,12 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
int32_t skipLogNum = 0;
int64_t startTime = taosGetTimestampUs();
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 3000);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000);
if (tmqmessage) {
batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage);
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqmessage);
/*msg_process(tmqmessage);*/
}
tmq_message_destroy(tmqmessage);
} else {
......
......@@ -13,51 +13,49 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#include <assert.h>
#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <stdlib.h>
#include <dirent.h>
#include "taos.h"
#include "taoserror.h"
#include "tlog.h"
#define GREEN "\033[1;32m"
#define NC "\033[0m"
#define GREEN "\033[1;32m"
#define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
typedef struct {
// input from argvs
char dbName[32];
char topicString[256];
char keyString[1024];
int32_t showMsgFlag;
int32_t consumeDelay; // unit s
int32_t consumeMsgCnt;
// save result after parse agrvs
int32_t numOfTopic;
char topics[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
// input from argvs
char dbName[32];
char topicString[256];
char keyString[1024];
int32_t showMsgFlag;
int32_t consumeDelay; // unit s
int32_t consumeMsgCnt;
// save result after parse agrvs
int32_t numOfTopic;
char topics[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
} SConfInfo;
static SConfInfo g_stConfInfo;
//char* g_pRowValue = NULL;
//TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
static void printHelp() {
char indent[10] = " ";
......@@ -80,13 +78,12 @@ static void printHelp() {
exit(EXIT_SUCCESS);
}
void parseArgument(int32_t argc, char *argv[]) {
void parseArgument(int32_t argc, char* argv[]) {
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.consumeDelay = 8000;
g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.consumeDelay = 8000;
g_stConfInfo.consumeMsgCnt = 0;
for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
printHelp();
......@@ -107,7 +104,7 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
} else {
printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1);
exit(-1);
}
}
......@@ -117,91 +114,87 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC);
pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
#endif
#endif
}
void splitStr(char **arr, char *str, const char *del) {
char *s = strtok(str, del);
while(s != NULL) {
void splitStr(char** arr, char* str, const char* del) {
char* s = strtok(str, del);
while (s != NULL) {
*arr++ = s;
s = strtok(NULL, del);
}
}
void ltrim(char *str)
{
if (str == NULL || *str == '\0')
{
return;
}
int len = 0;
char *p = str;
while (*p != '\0' && isspace(*p))
{
++p; ++len;
}
memmove(str, p, strlen(str) - len + 1);
//return str;
void ltrim(char* str) {
if (str == NULL || *str == '\0') {
return;
}
int len = 0;
char* p = str;
while (*p != '\0' && isspace(*p)) {
++p;
++len;
}
memmove(str, p, strlen(str) - len + 1);
// return str;
}
void parseInputString() {
//printf("topicString: %s\n", g_stConfInfo.topicString);
//printf("keyString: %s\n\n", g_stConfInfo.keyString);
// printf("topicString: %s\n", g_stConfInfo.topicString);
// printf("keyString: %s\n\n", g_stConfInfo.keyString);
char *token;
char* token;
const char delim[2] = ",";
const char ch = ':';
token = strtok(g_stConfInfo.topicString, delim);
while(token != NULL) {
//printf("%s\n", token );
strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token);
while (token != NULL) {
// printf("%s\n", token );
strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token);
ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
//printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic++;
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic++;
token = strtok(NULL, delim);
}
token = strtok(g_stConfInfo.keyString, delim);
while(token != NULL) {
//printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char *ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret-pstr);
strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret+1);
//printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey++;
while (token != NULL) {
// printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr);
strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey++;
}
token = strtok(NULL, delim);
}
}
static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
int queryDB(TAOS *taos, char *command) {
TAOS_RES *pRes = taos_query(taos, command);
int code = taos_errno(pRes);
//if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
if (code != 0) {
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0 ;
int queryDB(TAOS* taos, char* command) {
TAOS_RES* pRes = taos_query(taos, command);
int code = taos_errno(pRes);
// if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
if (code != 0) {
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0;
}
tmq_t* build_consumer() {
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -209,13 +202,13 @@ tmq_t* build_consumer() {
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1);
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
//tmq_conf_set(conf, "group.id", "tg2");
// tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) {
tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]);
}
......@@ -225,7 +218,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
//tmq_list_append(topic_list, "test_stb_topic_1");
// tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics[i]);
}
......@@ -239,21 +232,21 @@ void loop_consume(tmq_t* tmq) {
int32_t totalRows = 0;
int32_t skipLogNum = 0;
while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 8000);
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000);
if (tmqMsg) {
totalMsgs++;
totalMsgs++;
#if 0
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
skipLogNum += tmqGetSkipLogNum(tmqMsg);
if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg);
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqMsg);*/
}
tmq_message_destroy(tmqMsg);
} else {
break;
......@@ -263,13 +256,12 @@ void loop_consume(tmq_t* tmq) {
err = tmq_consumer_close(tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
exit(-1);
}
printf("{consume success: %d, %d}", totalMsgs, totalRows);
}
void parallel_consume(tmq_t* tmq) {
tmq_resp_err_t err;
......@@ -277,26 +269,26 @@ void parallel_consume(tmq_t* tmq) {
int32_t totalRows = 0;
int32_t skipLogNum = 0;
while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) {
totalMsgs++;
totalMsgs++;
#if 0
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
skipLogNum += tmqGetSkipLogNum(tmqMsg);
if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg);
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqMsg);*/
}
tmq_message_destroy(tmqMsg);
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
break;
}
}
} else {
break;
}
......@@ -305,22 +297,22 @@ void parallel_consume(tmq_t* tmq) {
err = tmq_consumer_close(tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
exit(-1);
}
printf("%d", totalMsgs); // output to sim for check result
printf("%d", totalMsgs); // output to sim for check result
}
int main(int32_t argc, char *argv[]) {
int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv);
parseInputString();
tmq_t* tmq = build_consumer();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
if ((NULL == tmq) || (NULL == topic_list)){
if ((NULL == tmq) || (NULL == topic_list)) {
return -1;
}
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册