--- sidebar_label: C/C++ title: C/C++ Connector --- C/C++ 开发人员可以使用 TDengine 的客户端驱动,即 C/C++连接器 (以下都用 TDengine 客户端驱动表示),开发自己的应用来连接 TDengine 集群完成数据存储、查询以及其他功能。TDengine 客户端驱动的 API 类似于 MySQL 的 C API。应用程序使用时,需要包含 TDengine 头文件 _taos.h_,里面列出了提供的 API 的函数原型;应用程序还要链接到所在平台上对应的动态库。 ```c #include ``` TDengine 服务端或客户端安装后,`taos.h` 位于: - Linux:`/usr/local/taos/include` - Windows:`C:\TDengine\include` - macOS:`/usr/local/include` TDengine 客户端驱动的动态库位于: - Linux: `/usr/local/taos/driver/libtaos.so` - Windows: `C:\TDengine\taos.dll` - macOS: `/usr/local/lib/libtaos.dylib` ## 支持的平台 请参考[支持的平台列表](../#支持的平台) ## 支持的版本 TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一对应的强对应关系,建议使用与 TDengine 服务端完全相同的客户端驱动。虽然低版本的客户端驱动在前三段版本号一致(即仅第四段版本号不同)的情况下也能够与高版本的服务端相兼容,但这并非推荐用法。强烈不建议使用高版本的客户端驱动访问低版本的服务端。 ## 安装步骤 TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) ## 建立连接 使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。 下面为建立连接的示例代码,其中省略了查询和写入部分,展示了如何建立连接、关闭连接以及清除资源。 ```c TAOS *taos = taos_connect("localhost:6030", "root", "taosdata", NULL, 0); if (taos == NULL) { printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); exit(1); } /* put your code here for read and write */ taos_close(taos); taos_cleanup(); ``` 在上面的示例代码中, `taos_connect()` 建立到客户端程序所在主机的 6030 端口的连接,`taos_close()`关闭当前连接,`taos_cleanup()`清除客户端驱动所申请和使用的资源。 :::note - 如未特别说明,当 API 的返回值是整数时,_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。 - 所有的错误码以及对应的原因描述在 `taoserror.h` 文件中。 ::: ## 示例程序 本节展示了使用客户端驱动访问 TDengine 集群的常见访问方式的示例代码。 ### 同步查询示例
同步查询 ```c {{#include examples/c/demo.c}} ``` 格式化输出不同类型字段函数 taos_print_row ```c int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int32_t len = 0; for (int i = 0; i < num_fields; ++i) { if (i > 0) { str[len++] = ' '; } if (row[i] == NULL) { len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR); continue; } switch (fields[i].type) { case TSDB_DATA_TYPE_TINYINT: len += sprintf(str + len, "%d", *((int8_t *)row[i])); break; case TSDB_DATA_TYPE_UTINYINT: len += sprintf(str + len, "%u", *((uint8_t *)row[i])); break; case TSDB_DATA_TYPE_SMALLINT: len += sprintf(str + len, "%d", *((int16_t *)row[i])); break; case TSDB_DATA_TYPE_USMALLINT: len += sprintf(str + len, "%u", *((uint16_t *)row[i])); break; case TSDB_DATA_TYPE_INT: len += sprintf(str + len, "%d", *((int32_t *)row[i])); break; case TSDB_DATA_TYPE_UINT: len += sprintf(str + len, "%u", *((uint32_t *)row[i])); break; case TSDB_DATA_TYPE_BIGINT: len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); break; case TSDB_DATA_TYPE_UBIGINT: len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i])); break; case TSDB_DATA_TYPE_FLOAT: { float fv = 0; fv = GET_FLOAT_VAL(row[i]); len += sprintf(str + len, "%f", fv); } break; case TSDB_DATA_TYPE_DOUBLE: { double dv = 0; dv = GET_DOUBLE_VAL(row[i]); len += sprintf(str + len, "%lf", dv); } break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: { int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE); if (fields[i].type == TSDB_DATA_TYPE_BINARY) { assert(charLen <= fields[i].bytes && charLen >= 0); } else { assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0); } memcpy(str + len, row[i], charLen); len += charLen; } break; case TSDB_DATA_TYPE_TIMESTAMP: len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); break; case TSDB_DATA_TYPE_BOOL: len += sprintf(str + len, "%d", *((int8_t *)row[i])); default: break; } } str[len] = 0; return len; } ```
### 异步查询示例
异步查询 ```c {{#include examples/c/asyncdemo.c}} ```
### 参数绑定示例
参数绑定 ```c {{#include examples/c/prepare.c}} ```
### 无模式写入示例
无模式写入 ```c {{#include examples/c/schemaless.c}} ```
### 订阅和消费示例
订阅和消费 ```c {{#include examples/c/tmq.c}} ```
:::info 更多示例代码及下载请见 [GitHub](https://github.com/taosdata/TDengine/tree/develop/examples/c)。 也可以在安装目录下的 `examples/c` 路径下找到。 该目录下有 makefile,在 Linux/macOS 环境下,直接执行 make 就可以编译得到执行文件。 **提示:**在 ARM 环境下编译时,请将 makefile 中的 `-msse4.2` 去掉,这个选项只有在 x64/x86 硬件平台上才能支持。 ::: ## API 参考 以下分别介绍 TDengine 客户端驱动的基础 API、同步 API、异步 API、订阅 API 和无模式写入 API。 ### 基础 API 基础 API 用于完成创建数据库连接等工作,为其它 API 的执行提供运行时环境。 - `void taos_init()` 初始化运行环境。如果没有主动调用该 API,那么调用 `taos_connect()` 时驱动将自动调用该 API,故程序一般无需手动调用。 - `void taos_cleanup()` 清理运行环境,应用退出前应调用。 - `int taos_options(TSDB_OPTION option, const void * arg, ...)` 设置客户端选项,目前支持区域设置(`TSDB_OPTION_LOCALE`)、字符集设置(`TSDB_OPTION_CHARSET`)、时区设置(`TSDB_OPTION_TIMEZONE`)、配置文件路径设置(`TSDB_OPTION_CONFIGDIR`)。区域设置、字符集、时区默认为操作系统当前设置。 - `char *taos_get_client_info()` 获取客户端版本信息。 - `TAOS *taos_connect(const char *host, const char *user, const char *pass, const char *db, int port)` 创建数据库连接,初始化连接上下文。其中需要用户提供的参数包含: - host:TDengine 集群中任一节点的 FQDN - user:用户名 - pass:密码 - db: 数据库名字,如果用户没有提供,也可以正常连接,用户可以通过该连接创建新的数据库,如果用户提供了数据库名字,则说明该数据库用户已经创建好,缺省使用该数据库 - port:taosd 程序监听的端口 返回值为空表示失败。应用程序需要保存返回的参数,以便后续使用。 :::info 同一进程可以根据不同的 host/port 连接多个 TDengine 集群 ::: - `char *taos_get_server_info(TAOS *taos)` 获取服务端版本信息。 - `int taos_select_db(TAOS *taos, const char *db)` 将当前的缺省数据库设置为 `db`。 - `int taos_get_current_db(TAOS *taos, char *database, int len, int *required)` - database,len为用户在外面申请的空间,内部会把当前db赋值到database里。 - 只要是没有正常把db名赋值到database中(包括截断),返回错误,返回值为-1,然后用户可以通过 taos_errstr(NULL) 来获取错误提示。 - 如果,database == NULL 或者 len<=0 返回错误,required里保存存储db需要的空间(包含最后的'\0') - 如果,len 小于 存储db需要的空间(包含最后的'\0'),返回错误,database里赋值截断的数据,以'\0'结尾。 - 如果,len 大于等于 存储db需要的空间(包含最后的'\0'),返回正常0,database里赋值以'\0‘结尾的db名。 - `void taos_close(TAOS *taos)` 关闭连接,其中`taos`是 `taos_connect()` 返回的句柄。 ### 同步查询 API 本小节介绍 API 均属于同步接口。应用调用后,会阻塞等待响应,直到获得返回结果或错误信息。 - `TAOS_RES* taos_query(TAOS *taos, const char *sql)` 执行 SQL 语句,可以是 DQL、DML 或 DDL 语句。 其中的 `taos` 参数是通过 `taos_connect()` 获得的句柄。不能通过返回值是否是 `NULL` 来判断执行结果是否失败,而是需要用 `taos_errno()` 函数解析结果集中的错误代码来进行判断。 - `int taos_result_precision(TAOS_RES *res)` 返回结果集时间戳字段的精度,`0` 代表毫秒,`1` 代表微秒,`2` 代表纳秒。 - `TAOS_ROW taos_fetch_row(TAOS_RES *res)` 按行获取查询结果集中的数据。 - `int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)` 批量获取查询结果集中的数据,返回值为获取到的数据的行数。 - `int taos_num_fields(TAOS_RES *res)` 和 `int taos_field_count(TAOS_RES *res)` 这两个 API 等价,用于获取查询结果集中的列数。 - `int* taos_fetch_lengths(TAOS_RES *res)` 获取结果集中每个字段的长度。返回值是一个数组,其长度为结果集的列数。 - `int taos_affected_rows(TAOS_RES *res)` 获取被所执行的 SQL 语句影响的行数。 - `TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)` 获取查询结果集每列数据的属性(列的名称、列的数据类型、列的长度),与 `taos_num_fields()` 配合使用,可用来解析 `taos_fetch_row()` 返回的一个元组(一行)的数据。 `TAOS_FIELD` 的结构如下: ```c typedef struct taosField { char name[65]; // column name uint8_t type; // data type int16_t bytes; // length, in bytes } TAOS_FIELD; ``` - `void taos_stop_query(TAOS_RES *res)` 停止当前查询的执行。 - `void taos_free_result(TAOS_RES *res)` 释放查询结果集以及相关的资源。查询完成后,务必调用该 API 释放资源,否则可能导致应用内存泄露。但也需注意,释放资源后,如果再调用 `taos_consume()` 等获取查询结果的函数,将导致应用崩溃。 - `char *taos_errstr(TAOS_RES *res)` 获取最近一次 API 调用失败的原因,返回值为字符串标识的错误提示信息。 - `int taos_errno(TAOS_RES *res)` 获取最近一次 API 调用失败的原因,返回值为错误代码。 :::note 2.0 及以上版本 TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池。而不推荐在应用中将该连接 (TAOS\*) 结构体传递到不同的线程共享使用。基于 TAOS 结构体发出的查询、写入等操作具有多线程安全性,但 “USE statement” 等状态量有可能在线程之间相互干扰。此外,C 语言的连接器可以按照需求动态建立面向数据库的新连接(该过程对用户不可见),同时建议只有在程序最后退出的时候才调用 `taos_close()` 关闭连接。 另一个需要注意的是,在上述同步 API 执行过程中,不能调用类似 pthread_cancel 之类的 API 来强制结束线程,因为涉及一些模块的同步操作,如果强制结束线程有可能造成包括但不限于死锁等异常状况。 ::: ### 异步查询 API TDengine 还提供性能更高的异步 API 处理数据插入、查询操作。在软硬件环境相同的情况下,异步 API 处理数据插入的速度比同步 API 快 2 ~ 4 倍。异步 API 采用非阻塞式的调用方式,在系统真正完成某个具体数据库操作前,立即返回。调用的线程可以去处理其他工作,从而可以提升整个应用的性能。异步 API 在网络延迟严重的情况下,优势尤为突出。 异步 API 都需要应用提供相应的回调函数,回调函数参数设置如下:前两个参数都是一致的,第三个参数依不同的 API 而定。第一个参数 param 是应用调用异步 API 时提供给系统的,用于回调时,应用能够找回具体操作的上下文,依具体实现而定。第二个参数是 SQL 操作的结果集,如果为空,比如 insert 操作,表示没有记录返回,如果不为空,比如 select 操作,表示有记录返回。 异步 API 对于使用者的要求相对较高,用户可根据具体应用场景选择性使用。下面是两个重要的异步 API: - `void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);` 异步执行 SQL 语句。 - taos:调用 `taos_connect()` 返回的数据库连接 - sql:需要执行的 SQL 语句 - fp:用户定义的回调函数,其第三个参数 `code` 用于指示操作是否成功,`0` 表示成功,负数表示失败(调用 `taos_errstr()` 可获取失败原因)。应用在定义回调函数的时候,主要处理第二个参数 `TAOS_RES *`,该参数是查询返回的结果集 - param:应用提供一个用于回调的参数 - `void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);` 批量获取异步查询的结果集,只能与 `taos_query_a()` 配合使用。其中: - res:`taos_query_a()` 回调时返回的结果集 - fp:回调函数。其参数 `param` 是用户可定义的传递给回调函数的参数结构体;`numOfRows` 是获取到的数据的行数(不是整个查询结果集的函数)。 在回调函数中,应用可以通过调用 `taos_fetch_row()` 前向迭代获取批量记录中每一行记录。读完一块内的所有记录后,应用需要在回调函数中继续调用 `taos_fetch_rows_a()` 获取下一批记录进行处理,直到返回的记录数 `numOfRows` 为零(结果返回完成)或记录数为负值(查询出错)。 TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多线程同时打开多张表,并可以同时对每张打开的表进行查询或者插入操作。需要指出的是,**客户端应用必须确保对同一张表的操作完全串行化**,即对同一个表的插入或查询操作未完成时(未返回时),不能够执行第二个插入或查询操作。 ### 参数绑定 API 除了直接调用 `taos_query()` 进行查询,TDengine 也提供了支持参数绑定的 Prepare API,风格与 MySQL 类似,目前也仅支持用问号 `?` 来代表待绑定的参数。 从 2.1.1.0 和 2.1.2.0 版本开始,TDengine 大幅改进了参数绑定接口对数据写入(INSERT)场景的支持。这样在通过参数绑定接口写入数据时,就避免了 SQL 语法解析的资源消耗,从而在绝大多数情况下显著提升写入性能。此时的典型操作步骤如下: 1. 调用 `taos_stmt_init()` 创建参数绑定对象; 2. 调用 `taos_stmt_prepare()` 解析 INSERT 语句; 3. 如果 INSERT 语句中预留了表名但没有预留 TAGS,那么调用 `taos_stmt_set_tbname()` 来设置表名; 4. 如果 INSERT 语句中既预留了表名又预留了 TAGS(例如 INSERT 语句采取的是自动建表的方式),那么调用 `taos_stmt_set_tbname_tags()` 来设置表名和 TAGS 的值; 5. 调用 `taos_stmt_bind_param_batch()` 以多行的方式设置 VALUES 的值,或者调用 `taos_stmt_bind_param()` 以单行的方式设置 VALUES 的值; 6. 调用 `taos_stmt_add_batch()` 把当前绑定的参数加入批处理; 7. 可以重复第 3 ~ 6 步,为批处理加入更多的数据行; 8. 调用 `taos_stmt_execute()` 执行已经准备好的批处理指令; 9. 执行完毕,调用 `taos_stmt_close()` 释放所有资源。 说明:如果 `taos_stmt_execute()` 执行成功,假如不需要改变 SQL 语句的话,那么是可以复用 `taos_stmt_prepare()` 的解析结果,直接进行第 3 ~ 6 步绑定新数据的。但如果执行出错,那么并不建议继续在当前的环境上下文下继续工作,而是建议释放资源,然后从 `taos_stmt_init()` 步骤重新开始。 接口相关的具体函数如下(也可以参考 [prepare.c](https://github.com/taosdata/TDengine/blob/develop/examples/c/prepare.c) 文件中使用对应函数的方式): - `TAOS_STMT* taos_stmt_init(TAOS *taos)` 创建一个 TAOS_STMT 对象用于后续调用。 - `int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)` 解析一条 SQL 语句,将解析结果和参数信息绑定到 stmt 上,如果参数 length 大于 0,将使用此参数作为 SQL 语句的长度,如等于 0,将自动判断 SQL 语句的长度。 - `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)` 不如 `taos_stmt_bind_param_batch()` 效率高,但可以支持非 INSERT 类型的 SQL 语句。 进行参数绑定,bind 指向一个数组(代表所要绑定的一行数据),需保证此数组中的元素数量和顺序与 SQL 语句中的参数完全一致。TAOS_BIND 的使用方法与 MySQL 中的 MYSQL_BIND 类似,具体定义如下: ```c typedef struct TAOS_BIND { int buffer_type; void * buffer; uintptr_t buffer_length; // not in use uintptr_t * length; int * is_null; int is_unsigned; // not in use int * error; // not in use } TAOS_BIND; ``` - `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)` (2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 当 SQL 语句中的表名使用了 `?` 占位时,可以使用此函数绑定一个具体的表名。 - `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags)` (2.1.2.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 当 SQL 语句中的表名和 TAGS 都使用了 `?` 占位时,可以使用此函数绑定具体的表名和具体的 TAGS 取值。最典型的使用场景是使用了自动建表功能的 INSERT 语句(目前版本不支持指定具体的 TAGS 列)。TAGS 参数中的列数量需要与 SQL 语句中要求的 TAGS 数量完全一致。 - `int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind)` (2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 以多列的方式传递待绑定的数据,需要保证这里传递的数据列的顺序、列的数量与 SQL 语句中的 VALUES 参数完全一致。TAOS_MULTI_BIND 的具体定义如下: ```c typedef struct TAOS_MULTI_BIND { int buffer_type; void * buffer; uintptr_t buffer_length; uintptr_t * length; char * is_null; int num; // the number of columns } TAOS_MULTI_BIND; ``` - `int taos_stmt_add_batch(TAOS_STMT *stmt)` 将当前绑定的参数加入批处理中,调用此函数后,可以再次调用 `taos_stmt_bind_param()` 或 `taos_stmt_bind_param_batch()` 绑定新的参数。需要注意,此函数仅支持 INSERT/IMPORT 语句,如果是 SELECT 等其他 SQL 语句,将返回错误。 - `int taos_stmt_execute(TAOS_STMT *stmt)` 执行准备好的语句。目前,一条语句只能执行一次。 - `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)` 获取语句的结果集。结果集的使用方式与非参数化调用时一致,使用完成后,应对此结果集调用 `taos_free_result()` 以释放资源。 - `int taos_stmt_close(TAOS_STMT *stmt)` 执行完毕,释放所有资源。 - `char * taos_stmt_errstr(TAOS_STMT *stmt)` (2.1.3.0 版本新增) 用于在其他 STMT API 返回错误(返回错误码或空指针)时获取错误信息。 ### 无模式(schemaless)写入 API 除了使用 SQL 方式或者使用参数绑定 API 写入数据外,还可以使用 Schemaless 的方式完成写入。Schemaless 可以免于预先创建超级表/数据子表的数据结构,而是可以直接写入数据,TDengine 系统会根据写入的数据内容自动创建和维护所需要的表结构。Schemaless 的使用方式详见 [Schemaless 写入](/reference/schemaless/) 章节,这里介绍与之配套使用的 C/C++ API。 - `TAOS_RES* taos_schemaless_insert(TAOS* taos, const char* lines[], int numLines, int protocol, int precision)` **功能说明** - 该接口将行协议的文本数据写入到 TDengine 中。 **参数说明** - taos: 数据库连接,通过 `taos_connect()` 函数建立的数据库连接。 - lines:文本数据。满足解析格式要求的无模式文本字符串。 - numLines:文本数据的行数,不能为 0 。 - protocol: 行协议类型,用于标识文本数据格式。 - precision:文本数据中的时间戳精度字符串。 **返回值** - TAOS_RES 结构体,应用可以通过使用 `taos_errstr()` 获得错误信息,也可以使用 `taos_errno()` 获得错误码。 在某些情况下,返回的 TAOS_RES 为 `NULL`,此时仍然可以调用 `taos_errno()` 来安全地获得错误码信息。 返回的 TAOS_RES 需要调用方来负责释放,否则会出现内存泄漏。 **说明** 协议类型是枚举类型,包含以下三种格式: - TSDB_SML_LINE_PROTOCOL:InfluxDB 行协议(Line Protocol) - TSDB_SML_TELNET_PROTOCOL: OpenTSDB Telnet 文本行协议 - TSDB_SML_JSON_PROTOCOL: OpenTSDB Json 协议格式 时间戳分辨率的定义,定义在 `taos.h` 文件中,具体内容如下: - TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0, - TSDB_SML_TIMESTAMP_HOURS, - TSDB_SML_TIMESTAMP_MINUTES, - TSDB_SML_TIMESTAMP_SECONDS, - TSDB_SML_TIMESTAMP_MILLI_SECONDS, - TSDB_SML_TIMESTAMP_MICRO_SECONDS, - TSDB_SML_TIMESTAMP_NANO_SECONDS 需要注意的是,时间戳分辨率参数只在协议类型为 `SML_LINE_PROTOCOL` 的时候生效。 对于 OpenTSDB 的文本协议,时间戳的解析遵循其官方解析规则 — 按照时间戳包含的字符的数量来确认时间精度。 **schemaless 其他相关的接口** - `TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid)` - `TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision)` - `TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid)` - `TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl)` - `TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl, int64_t reqid)` - `TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl)` - `TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl, int64_t reqid)` **说明** - 上面这7个接口是扩展接口,主要用于在schemaless写入时传递ttl、reqid参数,可以根据需要使用。 - 带_raw的接口通过传递的参数lines指针和长度len来表示数据,为了解决原始接口数据包含'\0'而被截断的问题。totalRows指针返回解析出来的数据行数。 - 带_ttl的接口可以传递ttl参数来控制建表的ttl到期时间。 - 带_reqid的接口可以通过传递reqid参数来追踪整个的调用链。 ### 数据订阅 API - `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)` - `void tmq_free_assignment(tmq_topic_assignment* pAssignment)` tmq_topic_assignment结构体定义如下: ```c typedef struct tmq_topic_assignment { int32_t vgId; int64_t currentOffset; int64_t begin; int64_t end; } tmq_topic_assignment; ``` **功能说明** - tmq_get_topic_assignment 接口返回当前consumer分配的vgroup的信息,每个vgroup的信息包括vgId,wal的最大最小offset,以及当前消费到的offset。 **参数说明** - numOfAssignment :分配给该consumer有效的vgroup个数。 - assignment :分配的信息,数据大小为numOfAssignment,需要通过 tmq_free_assignment 接口释放。 **返回值** - 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。 - `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)` **功能说明** - 获取当前 consumer 在某个 topic 和 vgroup上的 commit 位置。 **返回值** - 当前commit的位置,-2147467247表示没有消费进度,其他小于0的值表示失败,错误码就是返回值 - `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)` - `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)` - `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)` - `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)` **功能说明** commit接口分为两种类型,每种类型有同步和异步接口: - 第一种类型:根据消息提交,提交消息里的进度,如果消息传NULL,提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async - 第二种类型:根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async **参数说明** - msg:消费到的消息结构,如果msg传NULL,提交当前consumer所有消费的vgroup的当前进度 **返回值** - 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)` **功能说明** - 获取当前消费位置,为消费到的数据位置的下一个位置 **返回值** - 消费位置,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)` **功能说明** - 设置 consumer 在某个topic的某个vgroup的 offset位置,开始消费 **返回值** - 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)` **功能说明** 获取 poll 消费到的数据的起始offset **参数说明** - msg:消费到的消息结构 **返回值** - 消费到的offset,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)` **功能说明** 获取消费者订阅的 topic 列表 **参数说明** - topics: 获取的 topic 列表存储在这个结构中,接口内分配内存,需调用tmq_list_destroy释放 **返回值** - 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息