Source code for fedmsg.commands.collectd

# This file is part of fedmsg.
# Copyright (C) 2012 Red Hat, Inc.
#
# fedmsg is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# fedmsg is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with fedmsg; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors:  Ralph Bean <rbean@redhat.com>
#
import datetime
import socket
import time

import fedmsg
import fedmsg.meta

from fedmsg.commands import BaseCommand
from fedmsg.consumers import FedmsgConsumer
from moksha.hub.api import PollingProducer


class CollectdConsumer(FedmsgConsumer):
    config_key = "fedmsg.commands.collectd.enabled"
    validate_messages = False
    topic = '*'

    def __init__(self, hub):
        self.hub = hub

        super(CollectdConsumer, self).__init__(hub)
        self._dict = dict([
            (p.__name__.lower(), 0) for p in fedmsg.meta.processors
        ])
        self.host = socket.gethostname().split('.')[0]

    def consume(self, msg):
        processor = fedmsg.meta.msg2processor(msg, **self.hub.config)
        modname = processor.__name__.lower()
        self._dict[modname] += 1

    def dump(self):
        """ Called by CollectdProducer every `n` seconds. """

        # Print out the collectd feedback.
        # This is sent to stdout while other log messages are sent to stderr.
        for k, v in sorted(self._dict.items()):
            print(self.formatter(k, v))

        # Reset each entry to zero
        for k, v in sorted(self._dict.items()):
            self._dict[k] = 0

    def formatter(self, key, value):
        """ Format messages for collectd to consume. """
        template = "PUTVAL {host}/fedmsg/fedmsg_wallboard-{key} " +\
            "interval={interval} {timestamp}:{value}"
        timestamp = int(time.time())
        interval = self.hub.config['collectd_interval']
        return template.format(
            host=self.host,
            timestamp=timestamp,
            value=value,
            interval=interval,
            key=key,
        )


class CollectdProducer(PollingProducer):
    # "Frequency" is set later at runtime.
    def poll(self):
        self.hub.consumers[0].dump()


class CollectdCommand(BaseCommand):
    """ Print machine-readable information for collectd to monitor the bus. """
    name = "fedmsg-collectd"
    extra_args = [
        (['--collectd-interval'], {
            'dest': 'collectd_interval',
            'type': int,
            'help': 'Number of seconds to sleep between collectd updates.',
            'default': 2,
        }),
    ]

    def run(self):
        # Initialize the processors before CollectdConsumer is instantiated.
        fedmsg.meta.make_processors(**self.config)

        # Do just like in fedmsg.commands.hub and mangle fedmsg-config.py
        # to work with moksha's expected configuration.
        moksha_options = dict(
            mute=True,  # Disable some warnings.
            zmq_subscribe_endpoints=','.join(
                ','.join(bunch) for bunch in self.config['endpoints'].values()
            ),
        )
        self.config.update(moksha_options)
        self.config[CollectdConsumer.config_key] = True

        CollectdProducer.frequency = datetime.timedelta(
            seconds=self.config['collectd_interval']
        )

        from moksha.hub import main
        main(self.config, [CollectdConsumer], [CollectdProducer],
             framework=False)


[docs]def collectd(): command = CollectdCommand() command.execute()