source: trunk/Monitoring/CNClient/send_AMQP_msg.c @ 962

Last change on this file since 962 was 962, checked in by jripsl, 10 years ago
  • add sample config card file.
  • add stress test option.
File size: 13.0 KB
Line 
1#include <stdlib.h>
2#include <stdio.h>
3#include <string.h>
4#include <stdint.h>
5#include <sys/time.h>
6#include <unistd.h>
7
8
9#include <ctype.h>
10
11#include <amqp.h>
12#include <amqp_framing.h>
13
14#include "utils.h"
15
16
17#define SUMMARY_EVERY_US 1000000
18
19
20
21uint64_t now_microseconds(void)
22{
23  struct timeval tv;
24  gettimeofday(&tv, NULL);
25  return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
26}
27
28void microsleep(int usec)
29{
30  usleep(usec);
31}
32
33void die_on_error(int x, char const *context) {
34  if (x < 0) {
35    char *errstr = amqp_error_string(-x);
36    fprintf(stderr, "%s: %s\n", context, errstr);
37    free(errstr);
38    exit(1);
39  }
40}
41
42void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
43  switch (x.reply_type) {
44    case AMQP_RESPONSE_NORMAL:
45      return;
46
47    case AMQP_RESPONSE_NONE:
48      fprintf(stderr, "%s: missing RPC reply type!\n", context);
49      break;
50
51    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
52      fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
53      break;
54
55    case AMQP_RESPONSE_SERVER_EXCEPTION:
56      switch (x.reply.id) {
57        case AMQP_CONNECTION_CLOSE_METHOD: {
58          amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
59          fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
60                  context,
61                  m->reply_code,
62                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
63          break;
64        }
65        case AMQP_CHANNEL_CLOSE_METHOD: {
66          amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
67          fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
68                  context,
69                  m->reply_code,
70                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
71          break;
72        }
73        default:
74          fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
75          break;
76      }
77      break;
78  }
79
80  exit(1);
81}
82
83static void send_batch(amqp_connection_state_t conn, char const *queue_name, int rate_limit, int message_count)
84{
85  uint64_t start_time = now_microseconds();
86  int i;
87  int sent = 0;
88  int previous_sent = 0;
89  uint64_t previous_report_time = start_time;
90  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
91
92  char message[256];
93  amqp_bytes_t message_bytes;
94
95  for (i = 0; i < (int)sizeof(message); i++) {
96    message[i] = i & 0xff;
97  }
98
99  message_bytes.len = sizeof(message);
100  message_bytes.bytes = message;
101
102  for (i = 0; i < message_count; i++) {
103    uint64_t now = now_microseconds();
104
105    { 
106      amqp_basic_properties_t props;
107      props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
108      props.content_type = amqp_cstring_bytes("text/plain");
109      props.delivery_mode = 2; /* persistent delivery mode */
110
111      // works ! (queue get populated even if no consumer)
112      die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
113
114      // don't works ! (queue get populated only if consumer up)
115      //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
116    }
117
118          // note that "amq.direct" is a special exchange
119          // (The empty exchange name is an alias for amq.direct)
120    //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing");
121
122
123    sent++;
124    if (now > next_summary_time) {
125      int countOverInterval = sent - previous_sent;
126      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
127      printf("%d ms: Sent %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
128
129      previous_sent = sent;
130      previous_report_time = now;
131      next_summary_time += SUMMARY_EVERY_US;
132    }
133
134    while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
135      microsleep(2000);
136      now = now_microseconds();
137    }
138  }
139
140  {
141    uint64_t stop_time = now_microseconds();
142    int total_delta = stop_time - start_time;
143
144    printf("PRODUCER - Message count: %d\n", message_count);
145    printf("Total time, milliseconds: %d\n", total_delta / 1000);
146    printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
147  }
148}
149
150//int main(int argc, char const * const *argv) {
151int main_light(int argc, char const * const *argv) {
152  char const *hostname;
153  int port;
154  int rate_limit;
155  int message_count;
156  int sockfd;
157  amqp_connection_state_t conn;
158
159  //amqp_bytes_t reply_to_queue;
160
161
162  hostname = argv[1];
163  port = atoi(argv[2]);
164  rate_limit = atoi(argv[3]);
165  message_count = atoi(argv[4]);
166
167  conn = amqp_new_connection();
168
169  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
170
171  amqp_set_sockfd(conn, sockfd);
172
173  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
174
175  amqp_channel_open(conn, 1);
176
177  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
178
179  //JRA
180  /*
181  {
182    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("test queue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
183    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
184    reply_to_queue = amqp_bytes_malloc_dup(r->queue);
185    if (reply_to_queue.bytes == NULL) {
186      fprintf(stderr, "Out of memory while copying queue name");
187      return 1;
188    }
189  }
190  */
191
192  send_batch(conn, "test queue", rate_limit, message_count); // note that "test queue" here is used as the routing key
193
194  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
195  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
196  die_on_error(amqp_destroy_connection(conn), "Ending connection");
197
198  return 0;
199}
200
201int read_file(char *f,char **source) {
202        FILE *fp = fopen(f, "r");
203        if (fp != NULL) {
204                /* Go to the end of the file. */
205                if (fseek(fp, 0L, SEEK_END) == 0) {
206                        /* Get the size of the file. */
207                        long bufsize = ftell(fp);
208                        if (bufsize == -1) { /* Error */ }
209
210                        /* Allocate our buffer to that size. */
211                        *source = malloc(sizeof(char) * (bufsize + 1));
212
213                        /* Go back to the start of the file. */
214                        if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ }
215
216                        /* Read the entire file into memory. */
217                        size_t newLen = fread(*source, sizeof(char), bufsize, fp);
218                        if (newLen == 0) {
219                                fputs("Error reading file\n", stderr);
220        return 1;
221                        } else {
222                                //source[++newLen] = '\0'; /* Just to be safe. */
223                        }
224                }
225                fclose(fp);
226        } else {
227                fputs("File not found\n", stderr);
228                return 1;
229        }
230
231  return 0;
232}
233
234int main(int argc, char * const *argv) {
235  char const *hostname;
236  int msg_count=1;
237  int port;
238  //char const *exchange;
239  //char const *routingkey;
240  char *body_tmp;
241  char const *body;
242  char *body_final = NULL;
243  char *buf = NULL;
244  int c;
245  int i;
246  char *filepath = NULL;
247  int file_flag=0;
248
249
250 
251  int sockfd;
252  amqp_connection_state_t conn;
253
254  //parsing args
255  opterr = 0;
256  while ((c = getopt (argc, argv, "b:ef:h:n:p:u")) != -1)
257    switch (c)
258      {
259      case 'b':
260        body_tmp = optarg;
261        break;
262      case 'e':
263
264        // --- example without config card ---
265
266        // eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg => '{"jobid":"toto","code":"0000"}'
267        fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg=='\n");
268
269
270        // --- example with config card ---
271
272        // eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg => '{"jobid":"toto","code":"0000","file":"<base64 encoded file here>"}'
273        fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg==' -f /home/jripsl/snapshot/Monitoring/sample/config.card.base64\n");
274
275        // eyJzaW11aWQiOiJmb29iYXIiLCJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg== => '{"simuid":"foobar","jobid":"toto","code":"0000","file":"<base64 encoded file here>"}'
276        fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJzaW11aWQiOiJmb29iYXIiLCJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg==' -f /home/jripsl/snapshot/Monitoring/sample/config.card.base64\n");
277
278
279        exit(0);
280      case 'f':
281        filepath = optarg;
282        file_flag = 1;
283        break;
284      case 'h':
285        hostname = optarg;
286        break;
287      case 'n':
288        msg_count = atoi(optarg);
289        break;
290      case 'p':
291        port = atoi(optarg);
292        break;
293      case 'u':
294        fprintf(stderr, "Usage: amqp_producer host port -b <body>\n");
295        exit(EXIT_SUCCESS);
296      case '?':
297        fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt);
298        exit(EXIT_FAILURE);
299      default:
300        fprintf (stderr, "ERR002: incorrect argument\n");
301        exit(EXIT_FAILURE);
302      }
303
304  //retrieve non-option argument
305  /*
306  int index;
307  for (index = optind; index < argc; index++)
308    printf ("Non-option argument %s\n", argv[index]);
309  */
310
311  // add checks here
312  // (for example, body_tmp is mandatory)
313
314  if(file_flag==1) {
315    // retrieve file contents
316
317    if( access( filepath, F_OK ) != -1 ) {
318        // file exists
319       
320        ;
321    } else {
322      // file doesn't exist
323
324      fprintf(stderr, "File not found (%s)\n",filepath);
325
326      exit(EXIT_FAILURE);
327    }
328
329    int res = read_file(filepath,&buf);
330    if (res != 0) {
331      exit(EXIT_FAILURE);
332    }
333
334    body_final=malloc(5 + strlen(body_tmp) + 6 + strlen(buf) + 1);
335    //strcpy(body_final,"\0");
336    strcat(body_final,"body:");
337    strcat(body_final,body_tmp);
338    strcat(body_final,",file:");
339    strcat(body_final,buf);
340
341    body=body_final;
342
343    //debug
344    //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath);
345
346  } else {
347    // retrieve msg body from argument
348
349    body_final=malloc(5 + strlen(body_tmp) + 1);
350
351    strcat(body_final,"body:");
352    strcat(body_final,body_tmp);
353
354    body=body_final;
355  }
356
357  //exit(0);
358
359  conn = amqp_new_connection();
360
361  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
362  amqp_set_sockfd(conn, sockfd);
363  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
364  amqp_channel_open(conn, 1);
365  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
366
367  amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);
368  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
369
370  amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
371  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
372
373  amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type
374  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding");
375
376  { 
377    amqp_basic_properties_t props;
378    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
379    props.content_type = amqp_cstring_bytes("text/plain");
380    props.delivery_mode = 2; /* persistent delivery mode */
381
382
383    for (i = 0; i < msg_count; i++)
384      die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing");
385  }
386
387  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
388  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
389  die_on_error(amqp_destroy_connection(conn), "Ending connection");
390
391  if(file_flag==1)
392    free(buf);
393
394  free(body_final);
395
396  //this is to prevent overwhelming rabbitMQ server (and to prevent triggering rabbitMQ defensive behaviours (e.g. connection blocking))
397  //usleep( 200000 ); // O.2 second
398
399
400  return 0;
401}
402
403// -- not used -- //
404
405static void dump_row(long count, int numinrow, int *chs) {
406  int i;
407
408  printf("%08lX:", count - numinrow);
409
410  if (numinrow > 0) {
411    for (i = 0; i < numinrow; i++) {
412      if (i == 8)
413    printf(" :");
414      printf(" %02X", chs[i]);
415    }
416    for (i = numinrow; i < 16; i++) {
417      if (i == 8)
418    printf(" :");
419      printf("   ");
420    }
421    printf("  ");
422    for (i = 0; i < numinrow; i++) {
423      if (isprint(chs[i]))
424    printf("%c", chs[i]);
425      else
426    printf(".");
427    }
428  }
429  printf("\n");
430}
431
432static int rows_eq(int *a, int *b) {
433  int i;
434
435  for (i=0; i<16; i++)
436    if (a[i] != b[i])
437      return 0;
438
439  return 1;
440}
441
442void amqp_dump(void const *buffer, size_t len) {
443  unsigned char *buf = (unsigned char *) buffer;
444  long count = 0;
445  int numinrow = 0;
446  int chs[16];
447  int oldchs[16] = {0};
448  int showed_dots = 0;
449  size_t i;
450
451  for (i = 0; i < len; i++) {
452    int ch = buf[i];
453
454    if (numinrow == 16) {
455      int i;
456
457      if (rows_eq(oldchs, chs)) {
458    if (!showed_dots) {
459      showed_dots = 1;
460      printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
461    } 
462      } else {
463    showed_dots = 0;
464    dump_row(count, numinrow, chs);
465      }
466
467      for (i=0; i<16; i++)
468    oldchs[i] = chs[i];
469
470      numinrow = 0;
471    }
472
473    count++;
474    chs[numinrow++] = ch;
475  }
476
477  dump_row(count, numinrow, chs);
478
479  if (numinrow != 0)
480    printf("%08lX:\n", count);
481}
482
483/* vi: set et ts=2 sw=2: */
Note: See TracBrowser for help on using the repository browser.