1 | #!/bin/env python |
---|
2 | """ Sample Consumer Code """ |
---|
3 | |
---|
4 | import amqplib.client_0_8 as amqp |
---|
# This is the function that basic_consume will send messages to
def process_message(message):
    """Callback used by channel.basic_consume: print the message body.

    message: the delivered message object; only its ``body`` attribute
        is read here.
    """
    # A parenthesised single argument prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('Received: %s' % message.body)
---|
9 | |
---|
# Address of the RabbitMQ broker this consumer connects to
host = '127.0.0.1'
port = 5672

# Names used when declaring and binding the exchange and queue
exchange_name = 'test'
exchange_type = 'direct'
queue_name = 'messages'
routing_key = 'test.messages'

# Flag tested by the consume loop; the loop keeps running while it is True
process_messages = True
---|
22 | |
---|
# Open the connection to Rabbit at "host:port" with the guest credentials
connection = amqp.Connection(host='%s:%s' % (host, port),
                             userid='guest',
                             password='guest',
                             ssl=False,
                             virtual_host='/')

# All further AMQP calls go through this channel
channel = connection.channel()

# Declare the exchange (durable=True, auto_delete=False)
channel.exchange_declare(exchange=exchange_name,
                         type=exchange_type,
                         durable=True,
                         auto_delete=False)

# Declare the queue (durable=True, exclusive=False, auto_delete=True)
channel.queue_declare(queue=queue_name,
                      durable=True,
                      exclusive=False,
                      auto_delete=True)

# Bind the queue to the exchange under routing_key
channel.queue_bind(queue=queue_name,
                   exchange=exchange_name,
                   routing_key=routing_key)

# Register process_message as the callback for this queue (no_ack=True);
# keep the returned consumer tag so the consumer can be cancelled later
consumer_tag = channel.basic_consume(queue=queue_name,
                                     no_ack=True,
                                     callback=process_message)
---|
54 | |
---|
# Loop while process_messages is True. The try/finally makes sure the
# channel and the connection are closed even when channel.wait() raises
# (for example on an interrupt), instead of leaving them open.
try:
    while process_messages:
        # Wait for a message; deliveries go to the callback that was
        # registered with basic_consume
        channel.wait()
finally:
    try:
        # Close the channel
        channel.close()
    finally:
        # Close our connection, even if closing the channel failed
        connection.close()
---|
66 | |
---|
# This might go somewhere like a signal handler
def cancel_processing():
    """Stop consuming messages from RabbitMQ.

    Clears the module-level ``process_messages`` flag that the consume
    loop tests, then cancels the consumer identified by the module-level
    ``consumer_tag`` on the module-level ``channel``.

    NOTE(review): in this file the definition sits after the blocking
    consume loop, so it only becomes callable once that loop has ended.
    Move it above the loop when wiring it to something like a signal
    handler.
    """
    # Only the flag is rebound here; channel and consumer_tag are just read
    global process_messages

    # Do this so we exit our main loop. This must assign process_messages,
    # the loop flag, and not process_message, the callback's name.
    process_messages = False

    # Tell the channel we don't want to consume anymore
    channel.basic_cancel(consumer_tag)
---|