# This file is part of fedmsg.
# Copyright (C) 2012 - 2014 Red Hat, Inc.
#
# fedmsg is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# fedmsg is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with fedmsg; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Ralph Bean <rbean@redhat.com>
#
import inspect
import json
import logging
import os
import psutil
import requests
import threading
import time
import warnings
import moksha.hub.api.consumer
import six
import fedmsg.crypto
import fedmsg.encoding
from fedmsg.replay import check_for_replay
class FedmsgConsumer(moksha.hub.api.consumer.Consumer):
    """
    Base class for fedmsg consumers.

    This class inherits from :class:`moksha.hub.api.consumer.Consumer` and you
    should familiarize yourself with this class as well.

    To create a consumer, you must inherit this class and do the following:

    * Declare the class on the ``moksha.consumers`` python entry-point::

        setup(
            entry_points={
                'moksha.consumer': (
                    'your_consumer = python.path:YourConsumerClass',
                ),
            },
        )

    * Implement the ``consume(self, message)`` method on the class.

    * Set the attributes documented below

    Attributes:
        validate_signatures (bool): If ``False``, message authenticity will not be
            checked. This is helpful if you're developing or building a special-case
            consumer. For example, the consumer used by :ref:`command-relay` sets
            ``validate_signatures = False`` so that it can transparently forward along
            everything and let the terminal endpoints decide whether or not to consume
            particular messages.
        topic (str or list): This attribute is required. Either a :class:`str`
            or :class:`list` of :class:`str` that are topics that this consumer
            is interested in receiving messages for. To receive all messages, use
            an empty string.
        config_key (str): The name of the configuration key used to enable or disable
            this consumer. If this key is not present in the fedmsg configuration or
            does not have a value of ``True``, :ref:`command-hub` will not run the
            consumer.
        replay_name (str): The name of the replay endpoint where the system should
            query for playback in case of missed messages. It must match a service
            key in :ref:`conf-replay-endpoints`. This attribute is optional.

    Args:
        hub (moksha.hub.hub.MokshaCentralHub): The Moksha Hub that is initializing this
            consumer.
    """
    validate_signatures = None
    config_key = None

    def __init__(self, hub):
        module = inspect.getmodule(self).__name__
        name = self.__class__.__name__
        self.log = logging.getLogger(__name__)

        if not self.config_key:
            raise ValueError("%s:%s must declare a 'config_key'" % (
                module, name))

        self.log.debug("%s is %r" % (
            self.config_key, hub.config.get(self.config_key)
        ))
        if not hub.config.get(self.config_key, False):
            # Disabled consumers bail out *before* registering with the hub.
            self.log.info('* disabled by config - %s:%s' % (module, name))
            return

        self.log.info(' enabled by config - %s:%s' % (module, name))

        # This call "completes" registration of this consumer with the hub.
        super(FedmsgConsumer, self).__init__(hub)

        # Now, re-get our logger to override the one moksha assigns us.
        self.log = logging.getLogger(__name__)

        if self.validate_signatures is None:
            self.validate_signatures = self.hub.config['validate_signatures']

        if hasattr(self, "replay_name"):
            self.name_to_seq_id = {}
            if self.replay_name in self.hub.config.get("replay_endpoints", {}):
                self.name_to_seq_id[self.replay_name] = -1

        # Check if we have a status file to see if we have a backlog or not.
        # Create its directory if it doesn't exist.
        self.status_directory = self.hub.config.get('status_directory')
        self.status_filename, self.status_lock = None, None
        if self.status_directory:
            # Extract proc name and handle differences between py2.6 and py2.7
            # (older psutil exposes ``name`` as an attribute, newer as a method).
            proc_name = current_proc().name
            if callable(proc_name):
                proc_name = proc_name()
            self.status_filename = os.path.join(
                self.status_directory, proc_name, type(self).__name__)
            # Use os.path.dirname instead of a manual '/' rsplit so path
            # handling stays portable across platforms.
            topmost_directory = os.path.dirname(self.status_filename)
            if not os.path.exists(topmost_directory):
                os.makedirs(topmost_directory)

        self.datagrepper_url = self.hub.config.get('datagrepper_url')
        self.skip_last_message = self.hub.config.get('skip_last_message')
        if self.status_filename and self.datagrepper_url:
            # First, try to read in the status from a previous run and fire off
            # a thread to set up our workload.
            self.log.info("Backlog handling setup. status: %r, url: %r" % (
                self.status_filename, self.datagrepper_url))
            self.status_lock = threading.Lock()
            try:
                with self.status_lock:
                    with open(self.status_filename, 'r') as f:
                        data = f.read()
                moksha.hub.reactor.reactor.callInThread(self._backlog, data)
            except IOError as e:
                # No status file yet -- nothing to replay; this is normal on
                # first run, so log at info level only.
                self.log.info(e)
        else:
            self.log.info("No backlog handling. status: %r, url: %r" % (
                self.status_filename, self.datagrepper_url))

    def _backlog(self, data):
        """Find all the datagrepper messages between 'then' and 'now'.

        Put those on our work queue.

        Should be called in a thread so as not to block the hub at startup.

        Args:
            data (str): The JSON-encoded status blob written by
                :meth:`save_status` on a previous run, containing the last
                message this consumer saw.
        """
        try:
            data = json.loads(data)
        except ValueError as e:
            self.log.info("Status contents are %r" % data)
            self.log.exception(e)
            self.log.info("Skipping backlog retrieval.")
            return

        last = data['message']['body']
        # The saved body may itself be a JSON-encoded string.  Use
        # six.string_types (not bare ``str``) so unicode bodies are also
        # decoded on python2, consistent with ``validate`` below.
        if isinstance(last, six.string_types):
            last = json.loads(last)

        then = last['timestamp']
        now = int(time.time())

        retrieved = 0
        for message in self.get_datagrepper_results(then, now):
            # Take the messages from datagrepper and remove any keys that were
            # artificially added to the message.  The presence of these would
            # otherwise cause message crypto validation to fail.
            message = fedmsg.crypto.utils.fix_datagrepper_message(message)

            if message['msg_id'] != last['msg_id']:
                retrieved = retrieved + 1
                if (self.skip_last_message and retrieved <= 1):
                    self.log.info(
                        "Skipping %r (as requested by skip_last_message)" % last['msg_id']
                    )
                else:
                    self.incoming.put(dict(body=message, topic=message['topic']))
            else:
                self.log.warning("Already seen %r; Skipping." % last['msg_id'])

        self.log.info("Retrieved %i messages from datagrepper." % retrieved)

    def get_datagrepper_results(self, then, now):
        """Yield datagrepper messages with timestamps between then and now.

        Only messages whose topic prefix-matches one of this consumer's
        topics are yielded.  Results are requested in ascending order,
        100 rows per page.

        Args:
            then (int or float): Start of the window (epoch seconds).
            now (int or float): End of the window (epoch seconds).
        """
        def _make_query(page=1):
            return requests.get(self.datagrepper_url, params=dict(
                rows_per_page=100, page=page, start=then, end=now, order='asc'
            )).json()

        interesting_topics = self.topic
        if not isinstance(interesting_topics, list):
            interesting_topics = [interesting_topics]

        def _interesting(message):
            # Prefix-match, stripping the final character of each configured
            # topic (presumably a trailing wildcard -- preserved from the
            # original logic; confirm against consumer topic conventions).
            for topic in interesting_topics:
                if message['topic'].startswith(topic[:-1]):
                    return True
            return False

        # Grab the first page of results to learn how many pages there are.
        data = _make_query()
        pages = data['pages']

        for page in range(1, pages + 1):
            self.log.info("Retrieving datagrepper page %i of %i" % (
                page, pages))
            # Re-use the response we already have for page 1 rather than
            # issuing the same HTTP query a second time.
            if page != 1:
                data = _make_query(page=page)
            for message in data['raw_messages']:
                if _interesting(message):
                    yield message

    def validate(self, message):
        """
        Validate the message before the consumer processes it.

        This needs to raise an exception, caught by moksha.

        Args:
            message (dict): The message as a dictionary. This must, at a minimum,
                contain the 'topic' key with a unicode string value and 'body' key
                with a dictionary value. However, the message might also be an object
                with a ``__json__`` method that returns a dict with a 'body' key that
                can be a unicode string that is JSON-encoded.

        Raises:
            RuntimeWarning: If the message is not valid.
            UnicodeDecodeError: If the message body is not unicode or UTF-8 and also
                happens to contain invalid UTF-8 binary.
        """
        if hasattr(message, '__json__'):
            message = message.__json__()

        if isinstance(message['body'], six.text_type):
            message['body'] = json.loads(message['body'])
        elif isinstance(message['body'], six.binary_type):
            # Try to decode the message body as UTF-8 since it's very likely
            # that that was the encoding used. This API should eventually only
            # accept unicode strings inside messages. If a UnicodeDecodeError
            # happens, let that bubble up.
            warnings.warn('Message body is not unicode', DeprecationWarning)
            message['body'] = json.loads(message['body'].decode('utf-8'))

        # Massage STOMP messages into a more compatible format.
        if not isinstance(message['body'], dict) or 'topic' not in message['body']:
            message['body'] = {
                'topic': message.get('topic'),
                'msg': message['body'],
            }

        # If we're not validating, then everything is valid.
        # If this is turned on globally, our child class can override it.
        if not self.validate_signatures:
            return

        # We assume these match inside fedmsg.crypto, so we should enforce it.
        if not message['topic'] == message['body']['topic']:
            raise RuntimeWarning("Topic envelope mismatch.")

        if not fedmsg.crypto.validate(message['body'], **self.hub.config):
            raise RuntimeWarning("Failed to authn message.")

    def _consume(self, message):
        """ Called when a message is consumed.

        This private method handles some administrative setup and teardown
        before calling the public interface `consume` typically implemented
        by a subclass.

        When `moksha.blocking_mode` is set to `False` in the config, this
        method always returns `None`. The argued message is stored in an
        internal queue where the consumer's worker threads should eventually
        pick it up.

        When `moksha.blocking_mode` is set to `True` in the config, this
        method should return True or False, indicating whether the message
        was handled or not. Specifically, in the event that the inner
        `consume` method raises an exception of any kind, this method
        should return `False` indicating that the message was not
        successfully handled.

        Args:
            message (dict): The message as a dictionary.

        Returns:
            bool: Should be interpreted as whether or not the message was
                handled by the consumer, or `None` if `moksha.blocking_mode` is
                set to False.
        """
        try:
            self.validate(message)
        except RuntimeWarning as e:
            # ``Logger.warning``: the ``warn`` alias is deprecated.
            self.log.warning("Received invalid message {0}".format(e))
            return

        # Pass along headers if present.  May be useful to filters or
        # fedmsg.meta routines.
        if isinstance(message, dict) and 'headers' in message and 'body' in message:
            message['body']['headers'] = message['headers']

        if hasattr(self, "replay_name"):
            for m in check_for_replay(
                    self.replay_name, self.name_to_seq_id,
                    message, self.hub.config):
                try:
                    self.validate(m)
                    return super(FedmsgConsumer, self)._consume(m)
                except RuntimeWarning as e:
                    self.log.warning("Received invalid message {0}".format(e))
        else:
            return super(FedmsgConsumer, self)._consume(message)

    def pre_consume(self, message):
        """Record, before processing, the message we are about to consume."""
        self.save_status(dict(
            message=message,
            status='pre',
        ))

    def post_consume(self, message):
        """Record that the message was fully consumed."""
        self.save_status(dict(
            message=message,
            status='post',
        ))

    def save_status(self, data):
        """Atomically write consumer status to the status file, if configured."""
        if self.status_filename and self.status_lock:
            with self.status_lock:
                with open(self.status_filename, 'w') as f:
                    f.write(fedmsg.encoding.dumps(data))
def current_proc():
    """Return the psutil process object for the current process.

    Constructing ``psutil.Process`` with our own pid directly avoids the
    O(n) scan over every process on the system that ``process_iter()``
    would require, and removes the "impossible" not-found branch.

    Returns:
        psutil.Process: The process object for this interpreter's pid.
    """
    return psutil.Process(os.getpid())