source: trunk/Monitoring/CNClient/send_AMQP_msg.c @ 857

Last change on this file since 857 was 857, checked in by jripsl, 11 years ago
  • start database communication implementation.
  • add usleep( 200 000 ) in CNClient to prevent overflow on the server side.
File size: 12.1 KB
Line 
1#include <stdlib.h>
2#include <stdio.h>
3#include <string.h>
4#include <stdint.h>
5#include <sys/time.h>
6#include <unistd.h>
7
8
9#include <ctype.h>
10
11#include <amqp.h>
12#include <amqp_framing.h>
13
14#include "utils.h"
15
16
17#define SUMMARY_EVERY_US 1000000
18
19
20
21uint64_t now_microseconds(void)
22{
23  struct timeval tv;
24  gettimeofday(&tv, NULL);
25  return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
26}
27
28void microsleep(int usec)
29{
30  usleep(usec);
31}
32
33void die_on_error(int x, char const *context) {
34  if (x < 0) {
35    char *errstr = amqp_error_string(-x);
36    fprintf(stderr, "%s: %s\n", context, errstr);
37    free(errstr);
38    exit(1);
39  }
40}
41
42void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
43  switch (x.reply_type) {
44    case AMQP_RESPONSE_NORMAL:
45      return;
46
47    case AMQP_RESPONSE_NONE:
48      fprintf(stderr, "%s: missing RPC reply type!\n", context);
49      break;
50
51    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
52      fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
53      break;
54
55    case AMQP_RESPONSE_SERVER_EXCEPTION:
56      switch (x.reply.id) {
57        case AMQP_CONNECTION_CLOSE_METHOD: {
58          amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
59          fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
60                  context,
61                  m->reply_code,
62                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
63          break;
64        }
65        case AMQP_CHANNEL_CLOSE_METHOD: {
66          amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
67          fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
68                  context,
69                  m->reply_code,
70                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
71          break;
72        }
73        default:
74          fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
75          break;
76      }
77      break;
78  }
79
80  exit(1);
81}
82
83static void send_batch(amqp_connection_state_t conn, char const *queue_name, int rate_limit, int message_count)
84{
85  uint64_t start_time = now_microseconds();
86  int i;
87  int sent = 0;
88  int previous_sent = 0;
89  uint64_t previous_report_time = start_time;
90  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
91
92  char message[256];
93  amqp_bytes_t message_bytes;
94
95  for (i = 0; i < (int)sizeof(message); i++) {
96    message[i] = i & 0xff;
97  }
98
99  message_bytes.len = sizeof(message);
100  message_bytes.bytes = message;
101
102  for (i = 0; i < message_count; i++) {
103    uint64_t now = now_microseconds();
104
105    { 
106      amqp_basic_properties_t props;
107      props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
108      props.content_type = amqp_cstring_bytes("text/plain");
109      props.delivery_mode = 2; /* persistent delivery mode */
110
111      // works ! (queue get populated even if no consumer)
112      die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
113
114      // don't works ! (queue get populated only if consumer up)
115      //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
116    }
117
118          // note that "amq.direct" is a special exchange
119          // (The empty exchange name is an alias for amq.direct)
120    //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing");
121
122
123    sent++;
124    if (now > next_summary_time) {
125      int countOverInterval = sent - previous_sent;
126      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
127      printf("%d ms: Sent %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
128
129      previous_sent = sent;
130      previous_report_time = now;
131      next_summary_time += SUMMARY_EVERY_US;
132    }
133
134    while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
135      microsleep(2000);
136      now = now_microseconds();
137    }
138  }
139
140  {
141    uint64_t stop_time = now_microseconds();
142    int total_delta = stop_time - start_time;
143
144    printf("PRODUCER - Message count: %d\n", message_count);
145    printf("Total time, milliseconds: %d\n", total_delta / 1000);
146    printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
147  }
148}
149
150//int main(int argc, char const * const *argv) {
151int main_light(int argc, char const * const *argv) {
152  char const *hostname;
153  int port;
154  int rate_limit;
155  int message_count;
156  int sockfd;
157  amqp_connection_state_t conn;
158
159  //amqp_bytes_t reply_to_queue;
160
161
162  hostname = argv[1];
163  port = atoi(argv[2]);
164  rate_limit = atoi(argv[3]);
165  message_count = atoi(argv[4]);
166
167  conn = amqp_new_connection();
168
169  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
170
171  amqp_set_sockfd(conn, sockfd);
172
173  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
174
175  amqp_channel_open(conn, 1);
176
177  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
178
179  //JRA
180  /*
181  {
182    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("test queue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
183    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
184    reply_to_queue = amqp_bytes_malloc_dup(r->queue);
185    if (reply_to_queue.bytes == NULL) {
186      fprintf(stderr, "Out of memory while copying queue name");
187      return 1;
188    }
189  }
190  */
191
192  send_batch(conn, "test queue", rate_limit, message_count); // note that "test queue" here is used as the routing key
193
194  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
195  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
196  die_on_error(amqp_destroy_connection(conn), "Ending connection");
197
198  return 0;
199}
200
201int read_file(char *f,char **source) {
202        FILE *fp = fopen(f, "r");
203        if (fp != NULL) {
204                /* Go to the end of the file. */
205                if (fseek(fp, 0L, SEEK_END) == 0) {
206                        /* Get the size of the file. */
207                        long bufsize = ftell(fp);
208                        if (bufsize == -1) { /* Error */ }
209
210                        /* Allocate our buffer to that size. */
211                        *source = malloc(sizeof(char) * (bufsize + 1));
212
213                        /* Go back to the start of the file. */
214                        if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ }
215
216                        /* Read the entire file into memory. */
217                        size_t newLen = fread(*source, sizeof(char), bufsize, fp);
218                        if (newLen == 0) {
219                                fputs("Error reading file\n", stderr);
220        return 1;
221                        } else {
222                                //source[++newLen] = '\0'; /* Just to be safe. */
223                        }
224                }
225                fclose(fp);
226        } else {
227                fputs("File not found\n", stderr);
228                return 1;
229        }
230
231  return 0;
232}
233
234int main(int argc, char * const *argv) {
235  char const *hostname;
236  int port;
237  //char const *exchange;
238  //char const *routingkey;
239  char *body_tmp;
240  char const *body;
241  char *body_final = NULL;
242  char *buf = NULL;
243  int c;
244  char *filepath = NULL;
245  int file_flag=0;
246
247
248 
249  int sockfd;
250  amqp_connection_state_t conn;
251
252  //parsing args
253  opterr = 0;
254  while ((c = getopt (argc, argv, "b:ef:h:p:")) != -1)
255    switch (c)
256      {
257      case 'b':
258        body_tmp = optarg;
259        break;
260      case 'e':
261        fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n");
262
263        fprintf(stderr, "Example 1: ./sendAMQPMsg -h localhost -p 5672 -b 'eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg=='\n");
264
265        // obsolete (from now all sent data must be B64 encoded)
266        fprintf(stderr, "Example 2: ./sendAMQPMsg -h localhost -p 5672 -f /home/foobar/config.card -b '{\"jobid\":\"toto\"}'\n");
267
268        fprintf(stderr, "Example 3: ./sendAMQPMsg -h localhost -p 5672 -b 'am9iaWQ6dG90byBjb2RlOjAwMDAK' -f ../sample/config.card.base64\n");
269        exit(0);
270      case 'f':
271        filepath = optarg;
272        file_flag = 1;
273        break;
274      case 'h':
275        hostname = optarg;
276        break;
277      case 'p':
278        port = atoi(optarg);
279        break;
280      case '?':
281        fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt);
282        exit(EXIT_FAILURE);
283      default:
284        fprintf (stderr, "ERR002: incorrect argument\n");
285        exit(EXIT_FAILURE);
286      }
287
288  //retrieve non-option argument
289  /*
290  int index;
291  for (index = optind; index < argc; index++)
292    printf ("Non-option argument %s\n", argv[index]);
293  */
294
295  // add checks here
296  // (for example, body_tmp is mandatory)
297
298  if(file_flag==1) {
299    // retrieve file contents
300
301    if( access( filepath, F_OK ) != -1 ) {
302        // file exists
303       
304        ;
305    } else {
306      // file doesn't exist
307
308      fprintf(stderr, "File not found (%s)\n",filepath);
309
310      exit(EXIT_FAILURE);
311    }
312
313    int res = read_file(filepath,&buf);
314    if (res != 0) {
315      exit(EXIT_FAILURE);
316    }
317
318    body_final=malloc(strlen(body_tmp) + 6 + strlen(buf) + 1);
319    //strcpy(body_final,"\0");
320    strcat(body_final,body_tmp);
321    strcat(body_final,",file=");
322    strcat(body_final,buf);
323
324    body=body_final;
325
326    //debug
327    //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath);
328
329  } else {
330    // retrieve msg body from argument
331
332    body=body_tmp;
333
334  }
335
336  //exit(0);
337
338  conn = amqp_new_connection();
339
340  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
341  amqp_set_sockfd(conn, sockfd);
342  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
343  amqp_channel_open(conn, 1);
344  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
345
346  amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);
347  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
348
349  amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
350  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
351
352  amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type
353  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding");
354
355  { 
356    amqp_basic_properties_t props;
357    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
358    props.content_type = amqp_cstring_bytes("text/plain");
359    props.delivery_mode = 2; /* persistent delivery mode */
360    die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing");
361  }
362
363  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
364  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
365  die_on_error(amqp_destroy_connection(conn), "Ending connection");
366
367  if(file_flag==1)
368    free(buf);
369
370
371  //this is to prevent overwhelming rabbitMQ server (and to prevent triggering rabbitMQ defensive behaviours (e.g. connection blocking))
372  usleep( 200 000 ); // O.2 second
373
374
375  return 0;
376}
377
378// -- not used -- //
379
380static void dump_row(long count, int numinrow, int *chs) {
381  int i;
382
383  printf("%08lX:", count - numinrow);
384
385  if (numinrow > 0) {
386    for (i = 0; i < numinrow; i++) {
387      if (i == 8)
388    printf(" :");
389      printf(" %02X", chs[i]);
390    }
391    for (i = numinrow; i < 16; i++) {
392      if (i == 8)
393    printf(" :");
394      printf("   ");
395    }
396    printf("  ");
397    for (i = 0; i < numinrow; i++) {
398      if (isprint(chs[i]))
399    printf("%c", chs[i]);
400      else
401    printf(".");
402    }
403  }
404  printf("\n");
405}
406
407static int rows_eq(int *a, int *b) {
408  int i;
409
410  for (i=0; i<16; i++)
411    if (a[i] != b[i])
412      return 0;
413
414  return 1;
415}
416
417void amqp_dump(void const *buffer, size_t len) {
418  unsigned char *buf = (unsigned char *) buffer;
419  long count = 0;
420  int numinrow = 0;
421  int chs[16];
422  int oldchs[16] = {0};
423  int showed_dots = 0;
424  size_t i;
425
426  for (i = 0; i < len; i++) {
427    int ch = buf[i];
428
429    if (numinrow == 16) {
430      int i;
431
432      if (rows_eq(oldchs, chs)) {
433    if (!showed_dots) {
434      showed_dots = 1;
435      printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
436    } 
437      } else {
438    showed_dots = 0;
439    dump_row(count, numinrow, chs);
440      }
441
442      for (i=0; i<16; i++)
443    oldchs[i] = chs[i];
444
445      numinrow = 0;
446    }
447
448    count++;
449    chs[numinrow++] = ch;
450  }
451
452  dump_row(count, numinrow, chs);
453
454  if (numinrow != 0)
455    printf("%08lX:\n", count);
456}
457
458/* vi: set et ts=2 sw=2: */
Note: See TracBrowser for help on using the repository browser.