# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# Copyright (C) 2021 Nicolargo <nicolas@nicolargo.com>
#
# Glances is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Glances is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""InfluxDB (from to InfluxDB 1.8+) interface class."""

import sys
from platform import node

from glances.logger import logger
from glances.exports.glances_export import GlancesExport

from influxdb_client import InfluxDBClient, WriteOptions


class Export(GlancesExport):
    """This class manages the InfluxDB export module.

    Stats are converted to the InfluxDB data model (measurement, tags,
    fields) by ``_normalize`` and written through the batching write API
    created in ``init``.
    """

    def __init__(self, config=None, args=None):
        """Init the InfluxDB export IF.

        Loads the ``influxdb2`` configuration section and creates the write
        client. The process exits with status 2 when the configuration can
        not be loaded.

        :param config: Glances configuration object, forwarded to the parent
        :param args: command line arguments, forwarded to the parent
        """
        super(Export, self).__init__(config=config, args=args)

        # Mandatory configuration keys (in addition to host and port)
        self.org = None
        self.bucket = None
        self.token = None

        # Optional configuration keys
        self.protocol = 'http'
        self.prefix = None
        self.tags = None
        self.hostname = None

        # Load the InfluxDB configuration file
        # (presumably load_conf fills the attributes above - confirm against
        # GlancesExport.load_conf)
        self.export_enable = self.load_conf('influxdb2',
                                            mandatories=['host', 'port',
                                                         'user', 'password',
                                                         'org', 'bucket', 'token'],
                                            options=['protocol',
                                                     'prefix',
                                                     'tags'])
        if not self.export_enable:
            sys.exit(2)

        # The hostname (short form, without the domain) is always added as a tag
        self.hostname = node().split('.')[0]

        # Init the InfluxDB client
        self.client = self.init()

    def init(self):
        """Init the connection to the InfluxDB server.

        :return: the write API object used by ``export``, or None when the
                 export is disabled
        """
        if not self.export_enable:
            return None

        url = '{}://{}:{}'.format(self.protocol, self.host, self.port)
        try:
            client = InfluxDBClient(url=url,
                                    enable_gzip=False,
                                    org=self.org,
                                    token=self.token)
        except Exception as e:
            logger.critical("Cannot connect to InfluxDB server '%s' (%s)" % (url, e))
            sys.exit(2)

        # Query the server health only once: each call is a request to the
        # server, and a single answer keeps version and message consistent.
        health = client.health()
        logger.info("Connected to InfluxDB server version {} ({})".format(health.version,
                                                                          health.message))

        # Create the write client
        write_client = client.write_api(write_options=WriteOptions(batch_size=500,
                                                                   flush_interval=10000,
                                                                   jitter_interval=2000,
                                                                   retry_interval=5000,
                                                                   max_retries=5,
                                                                   max_retry_delay=30000,
                                                                   exponential_base=2))
        return write_client

    @staticmethod
    def _split_by_item(data_dict, items):
        """Split data_dict into one field dict per item, item prefix removed.

        Every column is attached to the item with the longest matching
        '<item>.' prefix, so that 'eth0.100.rx' goes to the item 'eth0.100'
        and not to 'eth0'. Columns matching no item are ignored.

        :return: list of field dicts, in the same order as items
        """
        prefixes = ['{}.'.format(item) for item in items]
        longest_first = sorted(prefixes, key=len, reverse=True)
        grouped = {prefix: {} for prefix in prefixes}
        for column, value in data_dict.items():
            for prefix in longest_first:
                if column.startswith(prefix):
                    # Only strip the leading prefix, not later occurrences
                    grouped[prefix][column[len(prefix):]] = value
                    break
        return [grouped[prefix] for prefix in prefixes]

    @staticmethod
    def _to_field_value(value):
        """Return value as a float when possible, else as a string."""
        try:
            return float(value)
        except (TypeError, ValueError):
            try:
                return str(value)
            except (TypeError, ValueError):
                # Keep the value untouched if it can not be converted
                return value

    def _normalize(self, name, columns, points):
        """Normalize data for the InfluxDB's data model.

        :param name: measurement name
        :param columns: list of column names ('<item>.<field>' when the stat
                        is a list of items, '<field>' otherwise)
        :param points: list of values, in the same order as columns
        :return: list of dicts with the 'measurement', 'tags' and 'fields'
                 keys, one per item (or a single one if there is no item)
        """
        ret = []

        # Build initial dict by crossing columns and points
        data_dict = dict(zip(columns, points))

        # issue1871 - Check if a key exist. If a key exist, the value of
        # the key should be used as a tag to identify the measurement.
        # The item name is everything before the '.key' suffix: it may
        # itself contain dots (example: the 'eth0.100' VLAN interface).
        key_suffix = '.key'
        items = [c[:-len(key_suffix)] for c in columns if c.endswith(key_suffix)]

        if items:
            groups = self._split_by_item(data_dict, items)
        else:
            groups = [data_dict]

        for raw_fields in groups:
            # Transform to InfluxDB datamodel
            # https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/
            # Do not export empty (None) values; numerical values are
            # converted to float and the others to string
            fields = {k: self._to_field_value(v)
                      for k, v in raw_fields.items()
                      if v is not None}
            # Manage tags
            # (presumably parse_tags returns a new dict - confirm against
            # GlancesExport.parse_tags)
            tags = self.parse_tags(self.tags)
            key_name = fields.get('key')
            if key_name is not None and key_name in fields:
                # Create a tag from the key
                # Tag should be a string (see InfluxDB data model)
                # and can not be both a field and a tag: remove the field
                tags[key_name] = str(fields.pop(key_name))
            # Add the hostname as a tag
            tags['hostname'] = self.hostname
            # Add the measurement to the list
            ret.append({'measurement': name,
                        'tags': tags,
                        'fields': fields})
        return ret

    def export(self, name, columns, points):
        """Write the points to the InfluxDB server.

        Errors raised while writing are logged at debug level and not
        propagated.

        :param name: name of the stat, used as measurement name (prefixed
                     by the configured prefix, if any)
        :param columns: list of column names
        :param points: list of values, in the same order as columns
        """
        # Manage prefix
        if self.prefix is not None:
            name = self.prefix + '.' + name
        # Write input to the InfluxDB database
        if not points:
            logger.debug("Cannot export empty {} stats to InfluxDB".format(name))
            return
        try:
            # NOTE(review): 'time_precision' looks like the keyword of the
            # InfluxDB 1.x client; the 2.x write API documents
            # 'write_precision' - confirm that this argument has an effect.
            self.client.write(self.bucket,
                              self.org,
                              self._normalize(name, columns, points),
                              time_precision="s")
        except Exception as e:
            # Log level set to debug instead of error (see: issue #1561)
            logger.debug("Cannot export {} stats to InfluxDB ({})".format(name, e))
        else:
            logger.debug("Export {} stats to InfluxDB".format(name))