提交 b1226e5d 编写于 作者: L Liu Jicong

feat(tmq): support err2str

上级 238b2c16
......@@ -103,10 +103,10 @@ typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void * buffer;
void *buffer;
uintptr_t buffer_length;
int32_t * length;
char * is_null;
int32_t *length;
char *is_null;
int num;
} TAOS_MULTI_BIND;
......@@ -130,7 +130,7 @@ DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT int taos_init(void);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
......@@ -147,17 +147,17 @@ DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
......@@ -179,11 +179,11 @@ DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
DLL_EXPORT int * taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
......@@ -204,17 +204,17 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
#endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
enum tmq_resp_err_t {
enum {
TMQ_RESP_ERR__FAIL = -1,
TMQ_RESP_ERR__SUCCESS = 0,
};
typedef enum tmq_resp_err_t tmq_resp_err_t;
typedef int32_t tmq_resp_err_t;
typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
......@@ -229,7 +229,7 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
DLL_EXPORT char ** tmq_list_to_c_array(const tmq_list_t *);
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
......@@ -240,7 +240,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES * tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
......@@ -260,7 +260,7 @@ enum tmq_conf_res_t {
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t * tmq_conf_new();
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
......
......@@ -323,7 +323,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
pParam->rspErr = code;
if (pParam->async) {
if (pParam->automatic && pParam->tmq->commitCb) {
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
......@@ -432,12 +432,13 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
code = pParam->rspErr;
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
} else {
code = 0;
}
// avoid double free if msg is sent
buf = NULL;
code = 0;
END:
if (buf) taosMemoryFree(buf);
/*if (pParam) taosMemoryFree(pParam);*/
......@@ -445,9 +446,9 @@ END:
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
} else {
userCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
}
}
......@@ -1474,16 +1475,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
if (rsp != TMQ_RESP_ERR__SUCCESS) {
return rsp;
}
tmq_list_t* lst = tmq_list_new();
rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
if (rsp != TMQ_RESP_ERR__SUCCESS) {
return rsp;
}
}
// TODO: free resources
......@@ -1493,8 +1494,11 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
const char* tmq_err2str(tmq_resp_err_t err) {
if (err == TMQ_RESP_ERR__SUCCESS) {
return "success";
} else if (err == TMQ_RESP_ERR__FAIL) {
return "fail";
} else {
return tstrerror(err);
}
return "fail";
}
const char* tmq_get_topic_name(TAOS_RES* res) {
......
......@@ -39,6 +39,9 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRs
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataBlkRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
if (pSW == NULL) {
return -1;
}
taosArrayPush(pRsp->blockSchema, &pSW);
return 0;
}
......
......@@ -514,14 +514,14 @@ void* consumeThreadFunc(void* param) {
err = tmq_unsubscribe(pInfo->tmq);
if (err) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1;
return NULL;
/*pInfo->consumeMsgCnt = -1;*/
/*return NULL;*/
}
err = tmq_consumer_close(pInfo->tmq);
if (err) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
/*exit(-1);*/
}
pInfo->tmq = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册