source: trunk/Monitoring/test/librabbitmq-python/test2/consumer.py @ 963

Last change on this file since 963 was 854, checked in by jripsl, 11 years ago

add JSON deserialization, mail notification, logging.

  • Property svn:executable set to *
File size: 2.3 KB
Line 
1#!/bin/env python
2""" Sample Consumer Code """
3
4import amqplib.client_0_8 as amqp
5# This is the function that basic_consume will send messages to                               
def process_message(message):
    """Callback function used by channel.basic_consume.

    Prints the body of the delivered message to standard output.

    message -- the delivered message object; only its ``body`` attribute
               is read here.
    """
    # A single parenthesised argument prints identically under the Python 2
    # print statement and the Python 3 print function, so this line is valid
    # on both interpreters.
    print('Received: %s' % message.body)
9
# Rabbit server to connect to
host = '127.0.0.1'
port = 5672

# Exchange and queue information
exchange_name = 'test'
exchange_type = 'direct'
queue_name = 'messages'
routing_key = 'test.messages'

# Loop-control flag: the consume loop below keeps running while this is True
process_messages = True

# Connect to Rabbit (the host argument is the "address:port" pair)
connection = amqp.Connection(host='%s:%s' % (host, port),
                             userid='guest',
                             password='guest',
                             ssl=False,
                             virtual_host='/')

# Create a channel to talk to Rabbit on
channel = connection.channel()

# Create our exchange
channel.exchange_declare(exchange=exchange_name,
                         type=exchange_type,
                         durable=True,
                         auto_delete=False)

# Create our queue
channel.queue_declare(queue=queue_name,
                      durable=True,
                      exclusive=False,
                      auto_delete=True)

# Bind the queue to the exchange
channel.queue_bind(queue=queue_name,
                   exchange=exchange_name,
                   routing_key=routing_key)

# Let AMQP know to send us messages; each one is handed to process_message
consumer_tag = channel.basic_consume(queue=queue_name,
                                     no_ack=True,
                                     callback=process_message)

try:
    # Loop while process_messages is True
    while process_messages:

        # Wait for a message
        channel.wait()
finally:
    # Run the shutdown even when wait() raises (for example on Ctrl-C), so
    # the channel and the connection are not left open.
    try:
        # Close the channel
        channel.close()
    finally:
        # Close our connection, even if closing the channel failed
        connection.close()
66           
67# This might go somewhere like a signal handler
def cancel_processing():
    """Stop consuming messages from RabbitMQ.

    Clears the module-level ``process_messages`` flag so the main loop
    exits, then cancels the consumer identified by the module-level
    ``consumer_tag`` on the module-level ``channel``.
    """
    global channel, consumer_tag, process_messages

    # Do this so we exit our main loop.  The name must be the flag the loop
    # tests (process_messages); assigning to a different name would only
    # create a local variable and leave the loop running.
    process_messages = False

    # Tell the channel you don't want to consume anymore
    channel.basic_cancel(consumer_tag)
Note: See TracBrowser for help on using the repository browser.