提交 03d7201d 编写于 作者: L Liu Jicong

feat: support tmq msg parse

上级 cf33a822
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
static int running = 1; static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
int32_t init_env() { int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...@@ -166,11 +166,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -166,11 +166,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t cnt = 0; int32_t cnt = 0;
/*clock_t startTime = clock();*/ /*clock_t startTime = clock();*/
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) { if (tmqmessage) {
cnt++; cnt++;
printf("get data\n"); /*printf("get data\n");*/
msg_process(tmqmessage); /*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
/*} else {*/ /*} else {*/
/*break;*/ /*break;*/
...@@ -198,9 +198,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -198,9 +198,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); /*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
...@@ -226,10 +226,10 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -226,10 +226,10 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
clock_t startTime = clock(); clock_t startTime = clock();
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) { if (tmqmessage) {
batchCnt++; batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage); /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/ /*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
} else { } else {
......
...@@ -30,7 +30,7 @@ typedef void **TAOS_ROW; ...@@ -30,7 +30,7 @@ typedef void **TAOS_ROW;
#if 0 #if 0
typedef void TAOS_STREAM; typedef void TAOS_STREAM;
#endif #endif
typedef void TAOS_SUB; typedef void TAOS_SUB;
// Data type definition // Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes #define TSDB_DATA_TYPE_NULL 0 // 1 bytes
...@@ -138,13 +138,13 @@ typedef enum { ...@@ -138,13 +138,13 @@ typedef enum {
#define RET_MSG_LENGTH 1024 #define RET_MSG_LENGTH 1024
typedef struct setConfRet { typedef struct setConfRet {
SET_CONF_RET_CODE retCode; SET_CONF_RET_CODE retCode;
char retMsg[RET_MSG_LENGTH]; char retMsg[RET_MSG_LENGTH];
} setConfRet; } setConfRet;
DLL_EXPORT void taos_cleanup(void); DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config); DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
const char *db, int dbLen, uint16_t port); const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
...@@ -152,34 +152,34 @@ DLL_EXPORT void taos_close(TAOS *taos); ...@@ -152,34 +152,34 @@ DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type); const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res); DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res); DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
...@@ -188,14 +188,14 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res); ...@@ -188,14 +188,14 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData); DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos); DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos); DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info(); DLL_EXPORT const char *taos_get_client_info();
...@@ -237,9 +237,9 @@ typedef struct tmq_t tmq_t; ...@@ -237,9 +237,9 @@ typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t; // typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
...@@ -259,7 +259,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); ...@@ -259,7 +259,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0 #if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
...@@ -268,8 +268,8 @@ DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** v ...@@ -268,8 +268,8 @@ DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** v
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
#if 0 #if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t { enum tmq_conf_res_t {
...@@ -285,21 +285,24 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const ...@@ -285,21 +285,24 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
#if 0
// temporary used function for demo only // temporary used function for demo only
void tmqShowMsg(tmq_message_t *tmq_message); void tmqShowMsg(tmq_message_t *tmq_message);
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
#endif
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
#if 0
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
DLL_EXPORT int32_t tmq_get_vgroup_id(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic); DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic); DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); #endif
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0 #if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
...@@ -308,7 +311,7 @@ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char * ...@@ -308,7 +311,7 @@ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
/* ------------------------------ TMQ END -------------------------------- */ /* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build #if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
#endif #endif
......
...@@ -199,11 +199,11 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF ...@@ -199,11 +199,11 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock); void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
...@@ -211,8 +211,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); ...@@ -211,8 +211,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks); void blockDebugShowData(const SArray* dataBlocks);
static FORCE_INLINE int32_t blockEstimateEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + (int32_t)ceil(blockDataGetSerialRowSize(pBlock) * pBlock->info.rows); return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
} }
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
......
...@@ -484,7 +484,7 @@ typedef struct { ...@@ -484,7 +484,7 @@ typedef struct {
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char char
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision; int8_t precision;
int64_t interval; int64_t interval;
int64_t sliding; int64_t sliding;
...@@ -2379,6 +2379,53 @@ typedef struct { ...@@ -2379,6 +2379,53 @@ typedef struct {
SArray* pBlockData; // SArray<SSDataBlock> SArray* pBlockData; // SArray<SSDataBlock>
} SMqPollRsp; } SMqPollRsp;
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t dataLen;
SArray* blockPos; // beginning pos for each SRetrieveTableRsp
void* blockData; // serialized batched SRetrieveTableRsp
} SMqPollRspV2;
static FORCE_INLINE int32_t tEncodeSMqPollRspV2(void** buf, const SMqPollRspV2* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->dataLen);
if (pRsp->dataLen != 0) {
int32_t sz = taosArrayGetSize(pRsp->blockPos);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
int32_t blockPos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
tlen += taosEncodeFixedI32(buf, blockPos);
}
tlen += taosEncodeBinary(buf, pRsp->blockData, pRsp->dataLen);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->dataLen);
if (pRsp->dataLen != 0) {
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pRsp->blockPos = taosArrayInit(sz, sizeof(int32_t));
for (int32_t i = 0; i < sz; i++) {
int32_t blockPos;
buf = taosDecodeFixedI32(buf, &blockPos);
taosArrayPush(pRsp->blockPos, &blockPos);
}
buf = taosDecodeBinary(buf, &pRsp->blockData, pRsp->dataLen);
}
return (void*)buf;
}
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
......
...@@ -194,12 +194,12 @@ enum { ...@@ -194,12 +194,12 @@ enum {
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
typedef struct SMqRspObj { typedef struct {
int8_t resType; int8_t resType;
char* topic; char* topic;
void* vg;
SArray* res; // SArray<SReqResultInfo> SArray* res; // SArray<SReqResultInfo>
int32_t resIter; int32_t resIter;
int32_t vgId;
} SMqRspObj; } SMqRspObj;
typedef struct SRequestObj { typedef struct SRequestObj {
......
...@@ -465,7 +465,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { ...@@ -465,7 +465,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
return 0; return 0;
} }
doFetchRow(pRequest, false, false); doFetchRow(pRequest, false, true);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
......
...@@ -27,11 +27,22 @@ ...@@ -27,11 +27,22 @@
struct tmq_message_t { struct tmq_message_t {
SMqPollRsp msg; SMqPollRsp msg;
char* topic; char* topic;
void* vg;
SArray* res; // SArray<SReqResultInfo> SArray* res; // SArray<SReqResultInfo>
int32_t vgId;
int32_t resIter; int32_t resIter;
}; };
typedef struct {
int8_t tmqRspType;
int32_t epoch;
} SMqRspWrapper;
typedef struct {
int8_t tmqRspType;
int32_t epoch;
SMqCMGetSubEpRsp msg;
} SMqAskEpRspWrapper;
struct tmq_list_t { struct tmq_list_t {
SArray container; SArray container;
}; };
...@@ -118,6 +129,14 @@ typedef struct { ...@@ -118,6 +129,14 @@ typedef struct {
TAOS_FIELD* fields; TAOS_FIELD* fields;
} SMqClientTopic; } SMqClientTopic;
typedef struct {
int8_t tmqRspType;
int32_t epoch;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
SMqPollRspV2 msg;
} SMqPollRspWrapper;
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
tsem_t rspSem; tsem_t rspSem;
...@@ -133,10 +152,10 @@ typedef struct { ...@@ -133,10 +152,10 @@ typedef struct {
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
SMqClientVg* pVg; SMqClientVg* pVg;
SMqClientTopic* pTopic;
int32_t epoch; int32_t epoch;
int32_t vgId; int32_t vgId;
tsem_t rspSem; tsem_t rspSem;
tmq_message_t** msg;
int32_t sync; int32_t sync;
} SMqPollCbParam; } SMqPollCbParam;
...@@ -244,7 +263,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { ...@@ -244,7 +263,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
} }
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg = NULL; SMqRspWrapper* msg = NULL;
while (1) { while (1) {
taosGetQitem(tmq->qall, (void**)&msg); taosGetQitem(tmq->qall, (void**)&msg);
if (msg) if (msg)
...@@ -777,7 +796,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -777,7 +796,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf; return buf;
} }
#if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0; if (tmq_message == NULL) return 0;
SMqPollRsp* pRsp = &tmq_message->msg; SMqPollRsp* pRsp = &tmq_message->msg;
...@@ -827,11 +846,13 @@ void tmqShowMsg(tmq_message_t* tmq_message) { ...@@ -827,11 +846,13 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
} }
} }
} }
#endif
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("recv poll\n");*/ /*printf("recv poll\n");*/
SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg; SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
if (code != 0) { if (code != 0) {
tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code); tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
...@@ -874,18 +895,22 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -874,18 +895,22 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
#endif #endif
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/ /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); /*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/
if (pRsp == NULL) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
if (pRspWrapper == NULL) {
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg); pRspWrapper->vgHandle = pVg;
/*pRsp->iter.curBlock = 0;*/ pRspWrapper->topicHandle = pTopic;
/*pRsp->iter.curRow = 0;*/ /*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
// TODO: alloc mem // TODO: alloc mem
/*pRsp->*/ /*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
#if 0 #if 0
if (pRsp->msg.numOfTopics == 0) { if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/ /*printf("no data\n");*/
...@@ -894,11 +919,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -894,11 +919,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
#endif #endif
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRsp->msg.reqOffset, pRsp->msg.rspOffset); pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
pRsp->vg = pParam->pVg; taosWriteQitem(tmq->mqueue, pRspWrapper);
taosWriteQitem(tmq->mqueue, pRsp);
atomic_add_fetch_32(&tmq->readyRequest, 1); atomic_add_fetch_32(&tmq->readyRequest, 1);
/*tsem_post(&tmq->rspSem);*/ /*tsem_post(&tmq->rspSem);*/
return 0; return 0;
...@@ -1015,16 +1039,19 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -1015,16 +1039,19 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqCMGetSubEpRsp(&rsp);
} else { } else {
SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp)); /*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
if (pRsp == NULL) { SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1; code = -1;
goto END; goto END;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp); pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(tmq->mqueue, pRsp); taosWriteQitem(tmq->mqueue, pWrapper);
/*tsem_post(&tmq->rspSem);*/ /*tsem_post(&tmq->rspSem);*/
} }
...@@ -1152,6 +1179,25 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo ...@@ -1152,6 +1179,25 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
return pReq; return pReq;
} }
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
pRspObj->topic = strdup(pWrapper->topicHandle->topicName);
pRspObj->resIter = -1;
pRspObj->vgId = pWrapper->vgHandle->vgId;
SMqPollRspV2* pRsp = &pWrapper->msg;
int32_t blockNum = taosArrayGetSize(pRsp->blockPos);
pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo));
for (int32_t i = 0; i < blockNum; i++) {
int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos);
SReqResultInfo resInfo;
setQueryResultFromRsp(&resInfo, pRetrieve, true);
taosArrayPush(pRspObj->res, &resInfo);
}
return pRspObj;
}
#if 0 #if 0
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL; tmq_message_t* msg = NULL;
...@@ -1258,6 +1304,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1258,6 +1304,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
} }
pParam->tmq = tmq; pParam->tmq = tmq;
pParam->pVg = pVg; pParam->pVg = pVg;
pParam->pTopic = pTopic;
pParam->vgId = pVg->vgId; pParam->vgId = pVg->vgId;
pParam->epoch = tmq->epoch; pParam->epoch = tmq->epoch;
pParam->sync = 0; pParam->sync = 0;
...@@ -1296,13 +1343,13 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1296,13 +1343,13 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
return 0; return 0;
} }
// return int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspHead->epoch > atomic_load_32(&tmq->epoch)) { if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead; SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
tmqUpdateEp(tmq, rspHead->epoch, rspMsg); SMqCMGetSubEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/ /*tmqClearUnhandleMsg(tmq);*/
*pReset = true; *pReset = true;
} else { } else {
...@@ -1314,41 +1361,43 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { ...@@ -1314,41 +1361,43 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) {
return 0; return 0;
} }
tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
while (1) { while (1) {
SMqRspHead* rspHead = NULL; SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspHead); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspHead == NULL) { if (rspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspHead); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspHead == NULL) return NULL; if (rspWrapper == NULL) return NULL;
} }
if (rspHead->mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
tmq_message_t* rspMsg = (tmq_message_t*)rspHead; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
atomic_sub_fetch_32(&tmq->readyRequest, 1); atomic_sub_fetch_32(&tmq->readyRequest, 1);
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) { if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/ /*printf("epoch match\n");*/
SMqClientVg* pVg = rspMsg->vg; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = rspMsg->msg.rspOffset; pVg->currentOffset = pollRspWrapper->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (rspMsg->msg.numOfTopics == 0) { if (pollRspWrapper->msg.dataLen == 0) {
taosFreeQitem(rspMsg); taosFreeQitem(pollRspWrapper);
rspHead = NULL; rspWrapper = NULL;
continue; continue;
} }
return rspMsg; // build msg
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
return pRsp;
} else { } else {
/*printf("epoch mismatch\n");*/ /*printf("epoch mismatch\n");*/
taosFreeQitem(rspMsg); taosFreeQitem(pollRspWrapper);
} }
} else { } else {
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool reset = false; bool reset = false;
tmqHandleNoPollRsp(tmq, rspHead, &reset); tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
taosFreeQitem(rspHead); taosFreeQitem(rspWrapper);
if (pollIfReset && reset) { if (pollIfReset && reset) {
tscDebug("consumer %ld reset and repoll", tmq->consumerId); tscDebug("consumer %ld reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, blockingTime); tmqPollImpl(tmq, blockingTime);
...@@ -1382,17 +1431,17 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { ...@@ -1382,17 +1431,17 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
} }
#endif #endif
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg; SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue // TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status); int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspMsg) { if (rspObj) {
return rspMsg; return (TAOS_RES*)rspObj;
} }
while (1) { while (1) {
...@@ -1402,9 +1451,9 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1402,9 +1451,9 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*tsem_wait(&tmq->rspSem);*/ /*tsem_wait(&tmq->rspSem);*/
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspMsg) { if (rspObj) {
return rspMsg; return (TAOS_RES*)rspObj;
} }
if (blocking_time != 0) { if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs(); int64_t endTime = taosGetTimestampMs();
...@@ -1546,6 +1595,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v ...@@ -1546,6 +1595,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
} }
#endif #endif
#if 0
void tmq_message_destroy(tmq_message_t* tmq_message) { void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return; if (tmq_message == NULL) return;
SMqPollRsp* pRsp = &tmq_message->msg; SMqPollRsp* pRsp = &tmq_message->msg;
...@@ -1553,6 +1603,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { ...@@ -1553,6 +1603,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
/*taosMemoryFree(tmq_message);*/ /*taosMemoryFree(tmq_message);*/
taosFreeQitem(tmq_message); taosFreeQitem(tmq_message);
} }
#endif
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
...@@ -1563,4 +1614,27 @@ const char* tmq_err2str(tmq_resp_err_t err) { ...@@ -1563,4 +1614,27 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return "fail"; return "fail";
} }
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } char* tmq_get_topic_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->topic;
} else {
return NULL;
}
}
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->vgId;
} else {
return -1;
}
}
void tmq_message_destroy(TAOS_RES* res) {
if (res == NULL) return;
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
}
}
...@@ -255,7 +255,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -255,7 +255,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return 0; return 0;
} }
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
...@@ -433,6 +433,205 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -433,6 +433,205 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0; return 0;
} }
#endif
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0;
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
fetchOffset = pReq->currentOffset + 1;
}
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
SMqPollRspV2 rspV2 = {0};
rspV2.dataLen = 0;
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId);
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
}
STqTopic* pTopic = NULL;
int32_t topicSz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < topicSz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
ASSERT(pConsumer->consumerId == consumerId);
if (strcmp(topic->topicName, pReq->topic) == 0) {
pTopic = topic;
break;
}
}
if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
pTq->pVnode->vgId);
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
pTq->pVnode->vgId);
rspV2.reqOffset = pReq->currentOffset;
rspV2.skipLogNum = 0;
while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
break;
}
SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset);
break;
}
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
break;
}
taosArrayPush(pRes, pDataBlock);
}
if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, pTq->pVnode->vgId, fetchOffset);
fetchOffset++;
rspV2.skipLogNum++;
taosArrayDestroy(pRes);
continue;
}
rspV2.rspOffset = fetchOffset;
int32_t blockSz = taosArrayGetSize(pRes);
int32_t tlen = 0;
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pBlock = taosArrayGet(pRes, i);
tlen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
}
void* data = taosMemoryMalloc(tlen);
if (data == NULL) {
pMsg->code = -1;
taosMemoryFree(pHead);
}
rspV2.blockData = data;
void* dataBlockBuf = data;
int32_t pos;
for (int32_t i = 0; i < blockSz; i++) {
pos = 0;
SSDataBlock* pBlock = taosArrayGet(pRes, i);
blockCompressEncode(pBlock, dataBlockBuf, &pos, pBlock->info.numOfCols, false);
taosArrayPush(rspV2.blockPos, &rspV2.dataLen);
rspV2.dataLen += pos;
dataBlockBuf = POINTER_SHIFT(dataBlockBuf, pos);
}
int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(msgLen);
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* msgBodyBuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&msgBodyBuf, &rspV2);
/*rsp.pBlockData = pRes;*/
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
} else {
taosMemoryFree(pHead);
fetchOffset++;
rspV2.skipLogNum++;
}
}
/*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
rspV2.rspOffset = fetchOffset - 1;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&abuf, &rspV2);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId,
pReq->epoch);
/*}*/
return 0;
}
int32_t tqProcessRebReq(STQ* pTq, char* msg) { int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0}; SMqMVRebReq req = {0};
......
...@@ -92,7 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, ...@@ -92,7 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false; return false;
} }
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockEstimateEncodeSize(pInput->pData); pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize); pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) { if (pBuf->pData == NULL) {
......
...@@ -195,7 +195,7 @@ void parseArgument(int32_t argc, char *argv[]) { ...@@ -195,7 +195,7 @@ void parseArgument(int32_t argc, char *argv[]) {
} }
static int running = 1; static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
// calc dir size (not include itself 4096Byte) // calc dir size (not include itself 4096Byte)
int64_t getDirectorySize(char *dir) int64_t getDirectorySize(char *dir)
...@@ -363,9 +363,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -363,9 +363,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); /*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
...@@ -392,12 +392,12 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog ...@@ -392,12 +392,12 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
int64_t startTime = taosGetTimestampUs(); int64_t startTime = taosGetTimestampUs();
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 3000); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000);
if (tmqmessage) { if (tmqmessage) {
batchCnt++; batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage); /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqmessage); /*msg_process(tmqmessage);*/
} }
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
} else { } else {
......
...@@ -170,7 +170,7 @@ void parseInputString() { ...@@ -170,7 +170,7 @@ void parseInputString() {
static int running = 1; static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
int queryDB(TAOS *taos, char *command) { int queryDB(TAOS *taos, char *command) {
...@@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) { ...@@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) {
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
while (running) { while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 8000); TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++; totalMsgs++;
...@@ -237,9 +237,9 @@ void loop_consume(tmq_t* tmq) { ...@@ -237,9 +237,9 @@ void loop_consume(tmq_t* tmq) {
} }
#endif #endif
skipLogNum += tmqGetSkipLogNum(tmqMsg); /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg); /*msg_process(tmqMsg);*/
} }
tmq_message_destroy(tmqMsg); tmq_message_destroy(tmqMsg);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册