source: trunk/Monitoring/test/recv_AMQP_msg.c @ 1509

Last change on this file since 1509 was 852, checked in by jripsl, 11 years ago
  • AMQP C-client implementation.
File size: 6.7 KB
Line 
1#include <stdlib.h>
2#include <stdio.h>
3#include <string.h>
4#include <stdint.h>
5#include <sys/time.h>
6#include <unistd.h>
7
8
9#include <ctype.h>
10
11#include <amqp.h>
12#include <amqp_framing.h>
13
14#include <assert.h>
15
16#include "utils.h"
17
18
19#define SUMMARY_EVERY_US 1000000
20
21
22
23
24uint64_t now_microseconds(void)
25{
26  struct timeval tv;
27  gettimeofday(&tv, NULL);
28  return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
29}
30
31void microsleep(int usec)
32{
33  usleep(usec);
34}
35
36void die_on_error(int x, char const *context) {
37  if (x < 0) {
38    char *errstr = amqp_error_string(-x);
39    fprintf(stderr, "%s: %s\n", context, errstr);
40    free(errstr);
41    exit(1);
42  }
43}
44
45void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
46  switch (x.reply_type) {
47    case AMQP_RESPONSE_NORMAL:
48      return;
49
50    case AMQP_RESPONSE_NONE:
51      fprintf(stderr, "%s: missing RPC reply type!\n", context);
52      break;
53
54    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
55      fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
56      break;
57
58    case AMQP_RESPONSE_SERVER_EXCEPTION:
59      switch (x.reply.id) {
60        case AMQP_CONNECTION_CLOSE_METHOD: {
61          amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
62          fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
63                  context,
64                  m->reply_code,
65                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
66          break;
67        }
68        case AMQP_CHANNEL_CLOSE_METHOD: {
69          amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
70          fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
71                  context,
72                  m->reply_code,
73                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
74          break;
75        }
76        default:
77          fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
78          break;
79      }
80      break;
81  }
82
83  exit(1);
84}
85
86static void dump_row(long count, int numinrow, int *chs) {
87  int i;
88
89  printf("%08lX:", count - numinrow);
90
91  if (numinrow > 0) {
92    for (i = 0; i < numinrow; i++) {
93      if (i == 8)
94        printf(" :");
95      printf(" %02X", chs[i]);
96    }
97    for (i = numinrow; i < 16; i++) {
98      if (i == 8)
99        printf(" :");
100      printf("   ");
101    }
102    printf("  ");
103    for (i = 0; i < numinrow; i++) {
104      if (isprint(chs[i]))
105        printf("%c", chs[i]);
106      else
107        printf(".");
108    }
109  }
110  printf("\n");
111}
112
113static int rows_eq(int *a, int *b) {
114  int i;
115
116  for (i=0; i<16; i++)
117    if (a[i] != b[i])
118      return 0;
119
120  return 1;
121}
122
123void amqp_dump(void const *buffer, size_t len) {
124  unsigned char *buf = (unsigned char *) buffer;
125  long count = 0;
126  int numinrow = 0;
127  int chs[16];
128  int oldchs[16] = {0};
129  int showed_dots = 0;
130  size_t i;
131
132  for (i = 0; i < len; i++) {
133    int ch = buf[i];
134
135    if (numinrow == 16) {
136      int i;
137
138      if (rows_eq(oldchs, chs)) {
139        if (!showed_dots) {
140          showed_dots = 1;
141          printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
142        }
143      } else {
144        showed_dots = 0;
145        dump_row(count, numinrow, chs);
146      }
147
148      for (i=0; i<16; i++)
149        oldchs[i] = chs[i];
150
151      numinrow = 0;
152    }
153
154    count++;
155    chs[numinrow++] = ch;
156  }
157
158  dump_row(count, numinrow, chs);
159
160  if (numinrow != 0)
161    printf("%08lX:\n", count);
162}
163
164static void run(amqp_connection_state_t conn)
165{
166  uint64_t start_time = now_microseconds();
167  int received = 0;
168  int previous_received = 0;
169  uint64_t previous_report_time = start_time;
170  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
171
172  amqp_frame_t frame;
173  int result;
174  size_t body_received;
175  size_t body_target;
176
177  uint64_t now;
178
179  while (1) {
180    now = now_microseconds();
181    if (now > next_summary_time) {
182      int countOverInterval = received - previous_received;
183      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
184      printf("%d ms: Received %d - %d since last report (%d Hz)\n",
185(int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
186
187      previous_received = received;
188      previous_report_time = now;
189      next_summary_time += SUMMARY_EVERY_US;
190    }
191
192    amqp_maybe_release_buffers(conn);
193    result = amqp_simple_wait_frame(conn, &frame);
194    if (result < 0)
195      return;
196
197    if (frame.frame_type != AMQP_FRAME_METHOD)
198      continue;
199
200    if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
201      continue;
202
203    result = amqp_simple_wait_frame(conn, &frame);
204    if (result < 0)
205      return;
206
207    if (frame.frame_type != AMQP_FRAME_HEADER) {
208      fprintf(stderr, "Expected header!");
209      abort();
210    }
211
212    body_target = frame.payload.properties.body_size;
213    body_received = 0;
214
215    while (body_received < body_target) {
216      result = amqp_simple_wait_frame(conn, &frame);
217      if (result < 0)
218return;
219
220      if (frame.frame_type != AMQP_FRAME_BODY) {
221fprintf(stderr, "Expected body!");
222abort();
223      }
224
225      body_received += frame.payload.body_fragment.len;
226      assert(body_received <= body_target);
227    }
228
229    received++;
230  }
231}
232
233int main(int argc, char const * const *argv) {
234  char const *hostname;
235  int port;
236  char const *exchange;
237  char const *bindingkey;
238
239  int sockfd;
240  amqp_connection_state_t conn;
241
242  amqp_bytes_t queuename;
243
244  if (argc < 3) {
245    fprintf(stderr, "Usage: amqp_consumer host port\n");
246    return 1;
247  }
248
249  hostname = argv[1];
250  port = atoi(argv[2]);
251  exchange = "amq.direct"; /* argv[3]; */
252  bindingkey = "test queue"; /* argv[4]; */
253
254  conn = amqp_new_connection();
255
256  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
257  amqp_set_sockfd(conn, sockfd);
258  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
259  amqp_channel_open(conn, 1);
260  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
261
262  /*
263  {
264    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
265    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
266    queuename = amqp_bytes_malloc_dup(r->queue);
267    if (queuename.bytes == NULL) {
268      fprintf(stderr, "Out of memory while copying queue name");
269      return 1;
270    }
271  }
272
273  amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table);
274
275  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
276
277  amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
278  die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
279  */
280
281  amqp_basic_consume(conn, 1, amqp_cstring_bytes("test queue"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
282  die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
283
284  run(conn);
285
286  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
287  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
288  die_on_error(amqp_destroy_connection(conn), "Ending connection");
289
290  return 0;
291}
Note: See TracBrowser for help on using the repository browser.