source: trunk/Monitoring/test/test_twisted.py @ 1493

Last change on this file since 1493 was 851, checked in by jripsl, 11 years ago
  • add skeleton and documentation.
File size: 13.0 KB
Line 
1#!/usr/bin/env python
2
3import optparse, sys, uuid
4
5from twisted.internet import reactor
6from twisted.internet.defer import inlineCallbacks, returnValue
7from twisted.internet.protocol import ClientCreator
8from txamqp.client import TwistedDelegate
9from txamqp.content import Content
10from txamqp.protocol import AMQClient
11import txamqp.spec
12
13NUM_MSGS = 1
14DEFAULT_TOPICS='info'
15QUEUE_NAME = 'txamqp_example_queue'
16ROUTING_TOPIC = 'txamqp_example_topic'
17EXCHANGE_NAME = 'txamqp_example_exchange'
18SEVERITIES = ('info', 'warning', 'debug')
19FACILITIES = ('kern', 'mail')
20
21@inlineCallbacks
22def gotConnection(connection, username, password):
23    print ' Got connection, authenticating.'
24    # Older version of AMQClient have no authenticate
25    # yield connection.authenticate(username, password)
26    # This produces the same effect
27    yield connection.start({'LOGIN': username, 'PASSWORD': password})
28    channel = yield connection.channel(1)
29    yield channel.channel_open()
30    returnValue((connection, channel))
31
32def setup_queue_cleanup(result):
33    """Executed in consumers on Ctrl-C, but not in publishers."""
34    connection, channel = result
35    reactor.addSystemEventTrigger('before', 'shutdown',
36        consumer_cleanup, channel)
37    return result
38
39def consumer_cleanup(channel):
40    return channel.queue_delete(QUEUE_NAME)
41
42@inlineCallbacks
43def publisher_cleanup(result):
44#    import pdb; pdb.set_trace()
45    connection, channel = result
46#    yield channel.queue_delete(queue=QUEUE_NAME)
47    yield channel.channel_close()
48    chan0 = yield connection.channel(0)
49    yield chan0.connection_close()
50    reactor.stop()
51
52@inlineCallbacks
53def consume1(result, topics):
54    connection, channel = result
55    yield channel.queue_declare(queue=QUEUE_NAME)
56    msgnum = 0
57    print ' [*] Waiting for messages. To exit press CTRL+C'
58    yield channel.basic_consume(queue=QUEUE_NAME, no_ack=True, consumer_tag='qtag')
59    queue = yield connection.queue('qtag')
60    while True:
61        msg = yield queue.get()
62        msgnum += 1
63        print " [%04d] Received %r from channel #%d" % (msgnum, msg.content.body, channel.id)
64    returnValue(result)
65
66@inlineCallbacks
67def consume2(result, topics):
68    connection, channel = result
69    yield channel.queue_declare(queue=QUEUE_NAME, durable=True)
70    msgnum = 0
71    print ' [*] Waiting for messages. To exit press CTRL+C'
72    yield channel.basic_qos(prefetch_count=1)
73    yield channel.basic_consume(queue=QUEUE_NAME, consumer_tag='qtag')
74    queue = yield connection.queue('qtag')
75    while True:
76        msg = yield queue.get()
77        msgnum += 1
78        print " [%04d] Received %r from channel #%d" % (
79            msgnum, msg.content.body, channel.id)
80        channel.basic_ack(delivery_tag=msg.delivery_tag)
81    returnValue(result)
82
83@inlineCallbacks
84def consume3(result, topics):
85    connection, channel = result
86    yield channel.exchange_declare(exchange=EXCHANGE_NAME, type='fanout')
87    reply = yield channel.queue_declare(exclusive=True)
88    queue_name = reply.queue
89    msgnum = 0
90    channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name)
91    print ' [*] Waiting for messages. To exit press CTRL+C'
92    yield channel.basic_consume(
93        queue=queue_name, no_ack=True, consumer_tag='qtag')
94    queue = yield connection.queue('qtag')
95    while True:
96        msg = yield queue.get()
97        msgnum += 1
98        print " [%04d] Received %r from channel #%d" % (
99            msgnum, msg.content.body, channel.id)
100    returnValue(result)
101
102@inlineCallbacks
103def consume4(result, topics):
104    connection, channel = result
105    exchange_name = '%s_4' % EXCHANGE_NAME
106    yield channel.exchange_declare(exchange=exchange_name, type='direct')
107    reply = yield channel.queue_declare(exclusive=True)
108    queue_name = reply.queue
109    msgnum = 0
110    severities = topics.split(',')
111    for severity in severities:
112        channel.queue_bind(
113            exchange=exchange_name, queue=queue_name, routing_key=severity)
114    print ' [*] Waiting for messages. To exit press CTRL+C'
115    yield channel.basic_consume(
116        queue=queue_name, no_ack=True, consumer_tag='qtag')
117    queue = yield connection.queue('qtag')
118    while True:
119        msg = yield queue.get()
120        msgnum += 1
121        print " [%04d] Received %r from channel #%d with severity [%s]" % (
122            msgnum, msg.content.body, channel.id, msg.routing_key)
123    returnValue(result)
124
125@inlineCallbacks
126def consume5(result, topics):
127    connection, channel = result
128    exchange_name = '%s_5' % EXCHANGE_NAME
129    yield channel.exchange_declare(exchange=exchange_name, type='topic')
130    reply = yield channel.queue_declare(exclusive=True)
131    queue_name = reply.queue
132    msgnum = 0
133    routing_keys = topics.split(',')
134    for routing_key in routing_keys:
135        channel.queue_bind(
136            exchange=exchange_name, queue=queue_name, routing_key=routing_key)
137    print ' [*] Waiting for messages. To exit press CTRL+C'
138    yield channel.basic_consume(
139        queue=queue_name, no_ack=True, consumer_tag='qtag')
140    queue = yield connection.queue('qtag')
141    while True:
142        msg = yield queue.get()
143        msgnum += 1
144        print (' [%04d] Received %r from channel #%d '
145            'with facility and severity [%s]') % (
146            msgnum, msg.content.body, channel.id, msg.routing_key)
147    returnValue(result)
148
149@inlineCallbacks
150def consume6(result, topics):
151    connection, channel = result
152    yield channel.queue_declare(queue='rpc_queue')
153    msgnum = 0
154    print ' [*] Waiting for RPC requests. To exit press CTRL+C'
155    yield channel.basic_qos(prefetch_count=1)
156    yield channel.basic_consume(
157        queue='rpc_queue', no_ack=True, consumer_tag='qtag')
158    queue = yield connection.queue('qtag')
159
160    def fib(n):
161        if n == 0:
162            return 0
163        elif n == 1:
164            return 1
165        else:
166            return fib(n-1) + fib(n-2)
167
168    while True:
169        msg = yield queue.get()
170        input_ = msg.content.body
171        properties = msg.content.properties
172        msgnum += 1
173        print ' [%04d] Received fib(%r) from channel #%d' % (
174            msgnum, input_, channel.id)
175        output = fib(int(input_))
176        response = Content(str(output))
177        response['correlation id'] = properties['correlation id']
178        channel.basic_publish(exchange='', routing_key=properties['reply to'], content=response)
179    returnValue(result)
180
181@inlineCallbacks
182def publish1(result, message, count_):
183    connection, channel = result
184    yield channel.queue_declare(queue=QUEUE_NAME)
185    for i in range(count_):
186        msg = Content('%s [%04d]' % (message, i,))
187        yield channel.basic_publish(
188            exchange='', routing_key=QUEUE_NAME, content=msg)
189        print ' [x] Sent "%s [%04d]"' % (message, i,)
190    returnValue(result)
191
192@inlineCallbacks
193def publish2(result, message, count_):
194    connection, channel = result
195    yield channel.queue_declare(queue=QUEUE_NAME, durable=True)
196    for i in range(count_):
197        msg = Content('%s [%04d]' % (message, i,))
198        msg['delivery mode'] = 2
199        yield channel.basic_publish(
200            exchange='', routing_key=QUEUE_NAME, content=msg)
201        print ' [x] Sent "%s [%04d]"' % (message, i,)
202    returnValue(result)
203
204@inlineCallbacks
205def publish3(result, message, count_):
206    connection, channel = result
207    yield channel.exchange_declare(exchange=EXCHANGE_NAME, type='fanout')
208    for i in range(count_):
209        msg = Content('%s [%04d]' % (message, i,))
210        yield channel.basic_publish(
211            exchange=EXCHANGE_NAME, routing_key='', content=msg)
212        print ' [x] Sent "%s [%04d]"' % (message, i,)
213    returnValue(result)
214
215@inlineCallbacks
216def publish4(result, message, count_):
217    connection, channel = result
218    exchange_name = '%s_4' % EXCHANGE_NAME
219    yield channel.exchange_declare(exchange=exchange_name, type='direct')
220    for i in range(count_):
221        msg = Content('%s [%04d]' % (message, i,))
222        severity = SEVERITIES[i % len(SEVERITIES)]
223        yield channel.basic_publish(
224            exchange=exchange_name, routing_key=severity, content=msg)
225        print ' [x] Sent "%s [%04d]" with severity [%s]' % (
226            message, i, severity,)
227    returnValue(result)
228
229@inlineCallbacks
230def publish5(result, message, count_):
231    connection, channel = result
232    exchange_name = '%s_5' % EXCHANGE_NAME
233    yield channel.exchange_declare(exchange=exchange_name, type='topic')
234    for i in range(count_):
235        msg = Content('%s [%04d]' % (message, i))
236        severity = SEVERITIES[i % len(SEVERITIES)]
237        facility = FACILITIES[i % len(FACILITIES)]
238        routing_key = '%s.%s' % (facility, severity)
239        yield channel.basic_publish(
240            exchange=exchange_name, routing_key=routing_key, content=msg)
241        print ' [x] Sent "%s [%04d]" with facility and severity [%s]' % (
242            message, i, routing_key)
243    returnValue(result)
244
245@inlineCallbacks
246def publish6(result, message, count_):
247    connection, channel = result
248
249    @inlineCallbacks
250    def call(n):
251        corr_id = str(uuid.uuid4())
252        reply = yield channel.queue_declare(exclusive=True)
253        callback_queue = reply.queue
254        msg = Content(str(n))
255        msg['correlation id'] = corr_id
256        msg['reply to'] = callback_queue
257        yield channel.basic_publish(
258            exchange='', routing_key='rpc_queue', content=msg)
259        print ' [x] Sent "%s"' % n
260        yield channel.basic_consume(
261            queue=callback_queue, no_ack=True, consumer_tag='qtag')
262        queue = yield connection.queue('qtag')
263        while True:
264            response = yield queue.get()
265            if response.content.properties['correlation id'] == corr_id:
266                returnValue(response.content.body)
267
268    print ' [x] Requesting fib(%s)' % message
269    response = yield call(message)
270    print ' [.] Got %r' % response
271    returnValue(result)
272
273PUBLISH_EXAMPLES = {
274    'publish1': publish1,
275    'publish2': publish2,
276    'publish3': publish3,
277    'publish4': publish4,
278    'publish5': publish5,
279    'publish6': publish6,
280}
281
282CONSUME_EXAMPLES = {
283    'consume1': consume1,
284    'consume2': consume2,
285    'consume3': consume3,
286    'consume4': consume4,
287    'consume5': consume5,
288    'consume6': consume6,
289}
290
291ALL_EXAMPLES = {}
292ALL_EXAMPLES.update(PUBLISH_EXAMPLES)
293ALL_EXAMPLES.update(CONSUME_EXAMPLES)
294
295
296def check_cmd_line(options, args, abort):
297    """
298    Check the command line options and args, and exit with an error message
299    if something is not acceptable.
300    """
301    if args:
302        abort('No arguments needed')
303    if not 0 < options.port < 65536:
304        abort('Please specify a port number between 1 and 65535 included')
305    if options.example not in ALL_EXAMPLES.keys():
306        abort('Please specify one of the following, as an example name: %s' %
307            ALL_EXAMPLES.keys())
308    if options.example in PUBLISH_EXAMPLES and options.message is None:
309        abort('Please provide a message to publish')
310    if options.message is not None and (options.example not in PUBLISH_EXAMPLES):
311        abort('Setting the message is only meaningful for publish examples')
312    if options.count < 1:
313        abort('Please set a positive number of messages')
314    return (options.port, options.example, options.message, options.count,
315        options.topics)
316
317def parse_cmd_line(parser, all_args):
318    """
319    Parse command line arguments using the optparse library.
320    Return (options, args).
321    """
322    parser.set_usage(
323        'Usage: %s -p -e [producer options: -m -c | '
324            'consumer options: -t topic1,topic2,...]' % all_args[0])
325    all_args = all_args[1:]
326    parser.add_option('-p', '--port', dest='port', type='int',
327        help='RabbitMQ server port number')
328    parser.add_option('-e', '--example', dest='example',
329        help='example name: "produceX" or "consumeX", with X from 1 to 6')
330    parser.add_option('-m', '--message', dest='message',
331        help='message to send as a producer')
332    parser.add_option('-c', '--count', dest='count', type='int',
333        help='num. of messages to send as a producer: default: %d' % NUM_MSGS)
334    parser.add_option('-t', '--topics', dest='topics',
335        help='topics to subscribe this consumer to')
336    parser.set_defaults(count=NUM_MSGS, topics=DEFAULT_TOPICS)
337    return parser.parse_args(all_args)
338
339def main(all_args=None):
340    """The main"""
341    all_args = all_args or []
342    parser = optparse.OptionParser()
343    options, args = parse_cmd_line(parser, all_args)
344    (port, example, message, count_, topics) = check_cmd_line(
345        options, args, parser.error)
346
347    host = 'localhost'
348    vhost = '/'
349    username = 'guest'
350    password = 'guest'
351    #spec = txamqp.spec.load('src/specs/standard/amqp0-8.stripped.xml')
352    spec = txamqp.spec.load('garbage/amqp0-8.stripped.xml')
353
354    delegate = TwistedDelegate()
355
356    d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
357        spec=spec).connectTCP(host, port)
358
359    d.addCallback(gotConnection, username, password)
360    if example in PUBLISH_EXAMPLES:
361        d.addCallback(ALL_EXAMPLES[example], message, count_)
362    else:
363        d.addCallback(ALL_EXAMPLES[example], topics)
364    d.addCallback(publisher_cleanup)
365    d.addErrback(lambda f: sys.stderr.write(str(f)))
366    reactor.run()
367
368if __name__ == "__main__":
369    main(all_args=sys.argv)
Note: See TracBrowser for help on using the repository browser.