source: trunk/Monitoring/CNClient/send_AMQP_msg.c @ 866

Last change on this file since 866 was 866, checked in by jripsl, 11 years ago

Add demo scenarios.

File size: 12.8 KB
Line 
1#include <stdlib.h>
2#include <stdio.h>
3#include <string.h>
4#include <stdint.h>
5#include <sys/time.h>
6#include <unistd.h>
7
8
9#include <ctype.h>
10
11#include <amqp.h>
12#include <amqp_framing.h>
13
14#include "utils.h"
15
16
17#define SUMMARY_EVERY_US 1000000
18
19
20
21uint64_t now_microseconds(void)
22{
23  struct timeval tv;
24  gettimeofday(&tv, NULL);
25  return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
26}
27
28void microsleep(int usec)
29{
30  usleep(usec);
31}
32
33void die_on_error(int x, char const *context) {
34  if (x < 0) {
35    char *errstr = amqp_error_string(-x);
36    fprintf(stderr, "%s: %s\n", context, errstr);
37    free(errstr);
38    exit(1);
39  }
40}
41
42void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
43  switch (x.reply_type) {
44    case AMQP_RESPONSE_NORMAL:
45      return;
46
47    case AMQP_RESPONSE_NONE:
48      fprintf(stderr, "%s: missing RPC reply type!\n", context);
49      break;
50
51    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
52      fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
53      break;
54
55    case AMQP_RESPONSE_SERVER_EXCEPTION:
56      switch (x.reply.id) {
57        case AMQP_CONNECTION_CLOSE_METHOD: {
58          amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
59          fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
60                  context,
61                  m->reply_code,
62                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
63          break;
64        }
65        case AMQP_CHANNEL_CLOSE_METHOD: {
66          amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
67          fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
68                  context,
69                  m->reply_code,
70                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
71          break;
72        }
73        default:
74          fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
75          break;
76      }
77      break;
78  }
79
80  exit(1);
81}
82
83static void send_batch(amqp_connection_state_t conn, char const *queue_name, int rate_limit, int message_count)
84{
85  uint64_t start_time = now_microseconds();
86  int i;
87  int sent = 0;
88  int previous_sent = 0;
89  uint64_t previous_report_time = start_time;
90  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
91
92  char message[256];
93  amqp_bytes_t message_bytes;
94
95  for (i = 0; i < (int)sizeof(message); i++) {
96    message[i] = i & 0xff;
97  }
98
99  message_bytes.len = sizeof(message);
100  message_bytes.bytes = message;
101
102  for (i = 0; i < message_count; i++) {
103    uint64_t now = now_microseconds();
104
105    { 
106      amqp_basic_properties_t props;
107      props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
108      props.content_type = amqp_cstring_bytes("text/plain");
109      props.delivery_mode = 2; /* persistent delivery mode */
110
111      // works ! (queue get populated even if no consumer)
112      die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
113
114      // don't works ! (queue get populated only if consumer up)
115      //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
116    }
117
118          // note that "amq.direct" is a special exchange
119          // (The empty exchange name is an alias for amq.direct)
120    //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing");
121
122
123    sent++;
124    if (now > next_summary_time) {
125      int countOverInterval = sent - previous_sent;
126      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
127      printf("%d ms: Sent %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
128
129      previous_sent = sent;
130      previous_report_time = now;
131      next_summary_time += SUMMARY_EVERY_US;
132    }
133
134    while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
135      microsleep(2000);
136      now = now_microseconds();
137    }
138  }
139
140  {
141    uint64_t stop_time = now_microseconds();
142    int total_delta = stop_time - start_time;
143
144    printf("PRODUCER - Message count: %d\n", message_count);
145    printf("Total time, milliseconds: %d\n", total_delta / 1000);
146    printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
147  }
148}
149
150//int main(int argc, char const * const *argv) {
151int main_light(int argc, char const * const *argv) {
152  char const *hostname;
153  int port;
154  int rate_limit;
155  int message_count;
156  int sockfd;
157  amqp_connection_state_t conn;
158
159  //amqp_bytes_t reply_to_queue;
160
161
162  hostname = argv[1];
163  port = atoi(argv[2]);
164  rate_limit = atoi(argv[3]);
165  message_count = atoi(argv[4]);
166
167  conn = amqp_new_connection();
168
169  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
170
171  amqp_set_sockfd(conn, sockfd);
172
173  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
174
175  amqp_channel_open(conn, 1);
176
177  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
178
179  //JRA
180  /*
181  {
182    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("test queue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
183    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
184    reply_to_queue = amqp_bytes_malloc_dup(r->queue);
185    if (reply_to_queue.bytes == NULL) {
186      fprintf(stderr, "Out of memory while copying queue name");
187      return 1;
188    }
189  }
190  */
191
192  send_batch(conn, "test queue", rate_limit, message_count); // note that "test queue" here is used as the routing key
193
194  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
195  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
196  die_on_error(amqp_destroy_connection(conn), "Ending connection");
197
198  return 0;
199}
200
201int read_file(char *f,char **source) {
202        FILE *fp = fopen(f, "r");
203        if (fp != NULL) {
204                /* Go to the end of the file. */
205                if (fseek(fp, 0L, SEEK_END) == 0) {
206                        /* Get the size of the file. */
207                        long bufsize = ftell(fp);
208                        if (bufsize == -1) { /* Error */ }
209
210                        /* Allocate our buffer to that size. */
211                        *source = malloc(sizeof(char) * (bufsize + 1));
212
213                        /* Go back to the start of the file. */
214                        if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ }
215
216                        /* Read the entire file into memory. */
217                        size_t newLen = fread(*source, sizeof(char), bufsize, fp);
218                        if (newLen == 0) {
219                                fputs("Error reading file\n", stderr);
220        return 1;
221                        } else {
222                                //source[++newLen] = '\0'; /* Just to be safe. */
223                        }
224                }
225                fclose(fp);
226        } else {
227                fputs("File not found\n", stderr);
228                return 1;
229        }
230
231  return 0;
232}
233
234int main(int argc, char * const *argv) {
235  char const *hostname;
236  int port;
237  //char const *exchange;
238  //char const *routingkey;
239  char *body_tmp;
240  char const *body;
241  char *body_final = NULL;
242  char *buf = NULL;
243  int c;
244  char *filepath = NULL;
245  int file_flag=0;
246
247
248 
249  int sockfd;
250  amqp_connection_state_t conn;
251
252  //parsing args
253  opterr = 0;
254  while ((c = getopt (argc, argv, "b:ef:h:p:")) != -1)
255    switch (c)
256      {
257      case 'b':
258        body_tmp = optarg;
259        break;
260      case 'e':
261
262        fprintf(stderr, "Usage: amqp_producer host port\n");
263
264
265        // --- example without config card ---
266
267        // eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg => '{"jobid":"toto","code":"0000"}'
268        fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg=='\n");
269
270
271        // --- example with config card ---
272
273        // eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg => '{"jobid":"toto","code":"0000","file":"<base64 encoded file here>"}'
274        fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg==' -f /home/jripsl/snapshot/Monitoring/sample/config.card.base64\n");
275
276        // eyJzaW11aWQiOiJmb29iYXIiLCJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg== => '{"simuid":"foobar","jobid":"toto","code":"0000","file":"<base64 encoded file here>"}'
277        fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJzaW11aWQiOiJmb29iYXIiLCJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg==' -f /home/jripsl/snapshot/Monitoring/sample/config.card.base64\n");
278
279
280        exit(0);
281      case 'f':
282        filepath = optarg;
283        file_flag = 1;
284        break;
285      case 'h':
286        hostname = optarg;
287        break;
288      case 'p':
289        port = atoi(optarg);
290        break;
291      case '?':
292        fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt);
293        exit(EXIT_FAILURE);
294      default:
295        fprintf (stderr, "ERR002: incorrect argument\n");
296        exit(EXIT_FAILURE);
297      }
298
299  //retrieve non-option argument
300  /*
301  int index;
302  for (index = optind; index < argc; index++)
303    printf ("Non-option argument %s\n", argv[index]);
304  */
305
306  // add checks here
307  // (for example, body_tmp is mandatory)
308
309  if(file_flag==1) {
310    // retrieve file contents
311
312    if( access( filepath, F_OK ) != -1 ) {
313        // file exists
314       
315        ;
316    } else {
317      // file doesn't exist
318
319      fprintf(stderr, "File not found (%s)\n",filepath);
320
321      exit(EXIT_FAILURE);
322    }
323
324    int res = read_file(filepath,&buf);
325    if (res != 0) {
326      exit(EXIT_FAILURE);
327    }
328
329    body_final=malloc(5 + strlen(body_tmp) + 6 + strlen(buf) + 1);
330    //strcpy(body_final,"\0");
331    strcat(body_final,"body:");
332    strcat(body_final,body_tmp);
333    strcat(body_final,",file:");
334    strcat(body_final,buf);
335
336    body=body_final;
337
338    //debug
339    //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath);
340
341  } else {
342    // retrieve msg body from argument
343
344    body_final=malloc(5 + strlen(body_tmp) + 1);
345
346    strcat(body_final,"body:");
347    strcat(body_final,body_tmp);
348
349    body=body_final;
350  }
351
352  //exit(0);
353
354  conn = amqp_new_connection();
355
356  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
357  amqp_set_sockfd(conn, sockfd);
358  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
359  amqp_channel_open(conn, 1);
360  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
361
362  amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);
363  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
364
365  amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
366  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
367
368  amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type
369  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding");
370
371  { 
372    amqp_basic_properties_t props;
373    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
374    props.content_type = amqp_cstring_bytes("text/plain");
375    props.delivery_mode = 2; /* persistent delivery mode */
376    die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing");
377  }
378
379  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
380  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
381  die_on_error(amqp_destroy_connection(conn), "Ending connection");
382
383  if(file_flag==1)
384    free(buf);
385
386  free(body_final);
387
388  //this is to prevent overwhelming rabbitMQ server (and to prevent triggering rabbitMQ defensive behaviours (e.g. connection blocking))
389  //usleep( 200000 ); // O.2 second
390
391
392  return 0;
393}
394
395// -- not used -- //
396
397static void dump_row(long count, int numinrow, int *chs) {
398  int i;
399
400  printf("%08lX:", count - numinrow);
401
402  if (numinrow > 0) {
403    for (i = 0; i < numinrow; i++) {
404      if (i == 8)
405    printf(" :");
406      printf(" %02X", chs[i]);
407    }
408    for (i = numinrow; i < 16; i++) {
409      if (i == 8)
410    printf(" :");
411      printf("   ");
412    }
413    printf("  ");
414    for (i = 0; i < numinrow; i++) {
415      if (isprint(chs[i]))
416    printf("%c", chs[i]);
417      else
418    printf(".");
419    }
420  }
421  printf("\n");
422}
423
424static int rows_eq(int *a, int *b) {
425  int i;
426
427  for (i=0; i<16; i++)
428    if (a[i] != b[i])
429      return 0;
430
431  return 1;
432}
433
434void amqp_dump(void const *buffer, size_t len) {
435  unsigned char *buf = (unsigned char *) buffer;
436  long count = 0;
437  int numinrow = 0;
438  int chs[16];
439  int oldchs[16] = {0};
440  int showed_dots = 0;
441  size_t i;
442
443  for (i = 0; i < len; i++) {
444    int ch = buf[i];
445
446    if (numinrow == 16) {
447      int i;
448
449      if (rows_eq(oldchs, chs)) {
450    if (!showed_dots) {
451      showed_dots = 1;
452      printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
453    } 
454      } else {
455    showed_dots = 0;
456    dump_row(count, numinrow, chs);
457      }
458
459      for (i=0; i<16; i++)
460    oldchs[i] = chs[i];
461
462      numinrow = 0;
463    }
464
465    count++;
466    chs[numinrow++] = ch;
467  }
468
469  dump_row(count, numinrow, chs);
470
471  if (numinrow != 0)
472    printf("%08lX:\n", count);
473}
474
475/* vi: set et ts=2 sw=2: */
Note: See TracBrowser for help on using the repository browser.