diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index d8966f7798cf101dbf46b34dd9382a503d9a2ab1..1ec1922c01dfba22fc04eb1310e6a6049269caf4 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -92,102 +92,21 @@ void close() throws SQLException; ```python class TaosConsumer(): - DEFAULT_CONFIG = { - 'group.id', - 'client.id', - 'enable.auto.commit', - 'auto.commit.interval.ms', - 'auto.offset.reset', - 'msg.with.table.name', - 'experimental.snapshot.enable', - 'enable.heartbeat.background', - 'experimental.snapshot.batch.size', - 'td.connect.ip', - 'td.connect.user', - 'td.connect.pass', - 'td.connect.port', - 'td.connect.db', - 'timeout' - } + def __init__(self, *topics, **configs) + + def __iter__(self) + + def __next__(self) - def __init__(self, *topics, **configs): - self._closed = True - self._conf = None - self._list = None - self._tmq = None - - keys = list(configs.keys()) - for k in keys: - configs.update({k.replace('_','.'): configs.pop(k)}) - - extra_configs = set(configs).difference(self.DEFAULT_CONFIG) - if extra_configs: - raise TmqError("Unrecognized configs: %s" % (extra_configs,)) - - self._conf = tmq_conf_new() - self._list = tmq_list_new() - - # set poll timeout - if 'timeout' in configs: - self._timeout = configs['timeout'] - del configs['timeout'] - else: - self._timeout = 0 - - # check if group id is set - - if 'group.id' not in configs: - raise TmqError("missing group.id in consumer config setting") - - for key, value in configs.items(): - tmq_conf_set(self._conf, key, value) - - self._tmq = tmq_consumer_new(self._conf) - - if not topics: - raise TmqError("Unset topic for Consumer") - - for topic in topics: - tmq_list_append(self._list, topic) - - tmq_subscribe(self._tmq, self._list) - - - def __iter__(self): - return self - - def __next__(self): - if not self._tmq: - raise StopIteration('TaosConsumer closed') - return next(self.sync_next()) - - def sync_next(self): - while 1: - res = tmq_consumer_poll(self._tmq, self._timeout) - if res: - break - yield TaosResult(res) + def sync_next(self) - def subscription(self): - if self._tmq is None: - return None - return tmq_subscription(self._tmq) - - def unsubscribe(self): - tmq_unsubscribe(self._tmq) - - def close(self): - if self._tmq: - tmq_consumer_close(self._tmq) - self._tmq = None + def subscription(self) + + def unsubscribe(self) + + def close(self) - def __del__(self): - if self._conf: - tmq_conf_destroy(self._conf) - if self._list: - tmq_list_destroy(self._list) - if self._tmq: - tmq_consumer_close(self._tmq) + def __del__(self) ``` @@ -354,6 +273,8 @@ public class MetersDeserializer extends ReferenceDeserializer { +Python 使用以下配置项创建一个 Consumer 实例。 + | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | | `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | @@ -368,6 +289,7 @@ public class MetersDeserializer extends ReferenceDeserializer { | `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | | `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | | `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | +| `timeout` | int | 消费者拉去的超时时间 | |