#include #include #include #include #include #include #include #include #include #include "utils.h" #define SUMMARY_EVERY_US 1000000 uint64_t now_microseconds(void) { struct timeval tv; gettimeofday(&tv, NULL); return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec; } void microsleep(int usec) { usleep(usec); } void die_on_error(int x, char const *context) { if (x < 0) { char *errstr = amqp_error_string(-x); fprintf(stderr, "%s: %s\n", context, errstr); free(errstr); exit(1); } } void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { switch (x.reply_type) { case AMQP_RESPONSE_NORMAL: return; case AMQP_RESPONSE_NONE: fprintf(stderr, "%s: missing RPC reply type!\n", context); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); break; case AMQP_RESPONSE_SERVER_EXCEPTION: switch (x.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; fprintf(stderr, "%s: server connection error %d, message: %.*s\n", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; fprintf(stderr, "%s: server channel error %d, message: %.*s\n", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } default: fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id); break; } break; } exit(1); } static void send_batch(amqp_connection_state_t conn, char const *queue_name, int rate_limit, int message_count) { uint64_t start_time = now_microseconds(); int i; int sent = 0; int previous_sent = 0; uint64_t previous_report_time = start_time; uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; char message[256]; amqp_bytes_t message_bytes; for (i = 0; i < (int)sizeof(message); i++) { message[i] = i & 0xff; } message_bytes.len = sizeof(message); message_bytes.bytes = message; for (i = 0; i < message_count; i++) { uint64_t now = now_microseconds(); { amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /* persistent delivery mode */ // works ! (queue get populated even if no consumer) die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing"); // don't works ! (queue get populated only if consumer up) //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing"); } // note that "amq.direct" is a special exchange // (The empty exchange name is an alias for amq.direct) //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); sent++; if (now > next_summary_time) { int countOverInterval = sent - previous_sent; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Sent %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); previous_sent = sent; previous_report_time = now; next_summary_time += SUMMARY_EVERY_US; } while (((i * 1000000.0) / (now - start_time)) > rate_limit) { microsleep(2000); now = now_microseconds(); } } { uint64_t stop_time = now_microseconds(); int total_delta = stop_time - start_time; printf("PRODUCER - Message count: %d\n", message_count); printf("Total time, milliseconds: %d\n", total_delta / 1000); printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0))); } } //int main(int argc, char const * const *argv) { int main_light(int argc, char const * const *argv) { char const *hostname; int port; int rate_limit; int message_count; int sockfd; amqp_connection_state_t conn; //amqp_bytes_t reply_to_queue; hostname = argv[1]; port = atoi(argv[2]); rate_limit = atoi(argv[3]); message_count = atoi(argv[4]); conn = amqp_new_connection(); die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); //JRA /* { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("test queue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); reply_to_queue = amqp_bytes_malloc_dup(r->queue); if (reply_to_queue.bytes == NULL) { fprintf(stderr, "Out of memory while copying queue name"); return 1; } } */ send_batch(conn, "test queue", rate_limit, message_count); // note that "test queue" here is used as the routing key die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } int read_file(char *f,char **source) { FILE *fp = fopen(f, "r"); if (fp != NULL) { /* Go to the end of the file. */ if (fseek(fp, 0L, SEEK_END) == 0) { /* Get the size of the file. */ long bufsize = ftell(fp); if (bufsize == -1) { /* Error */ } /* Allocate our buffer to that size. */ *source = malloc(sizeof(char) * (bufsize + 1)); /* Go back to the start of the file. */ if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ } /* Read the entire file into memory. */ size_t newLen = fread(*source, sizeof(char), bufsize, fp); if (newLen == 0) { fputs("Error reading file\n", stderr); return 1; } else { //source[++newLen] = '\0'; /* Just to be safe. */ } } fclose(fp); } else { fputs("File not found\n", stderr); return 1; } return 0; } int main(int argc, char * const *argv) { char const *hostname; int port; //char const *exchange; //char const *routingkey; char *body_tmp; char const *body; char *body_final = NULL; char *buf = NULL; int c; char *filepath = NULL; int file_flag=0; int sockfd; amqp_connection_state_t conn; //parsing args opterr = 0; while ((c = getopt (argc, argv, "b:ef:h:p:")) != -1) switch (c) { case 'b': body_tmp = optarg; break; case 'e': fprintf(stderr, "Usage: amqp_producer host port\n"); // --- example without config card --- // eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg => '{"jobid":"toto","code":"0000"}' fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg=='\n"); // --- example with config card --- // eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg => '{"jobid":"toto","code":"0000","file":""}' fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg==' -f /home/jripsl/snapshot/Monitoring/sample/config.card.base64\n"); // eyJzaW11aWQiOiJmb29iYXIiLCJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg== => '{"simuid":"foobar","jobid":"toto","code":"0000","file":""}' fprintf(stderr, "./sendAMQPMsg -h localhost -p 5672 -b 'eyJzaW11aWQiOiJmb29iYXIiLCJqb2JpZCI6InRvdG8iLCJjb2RlIjoiMDAwMCJ9Cg==' -f /home/jripsl/snapshot/Monitoring/sample/config.card.base64\n"); exit(0); case 'f': filepath = optarg; file_flag = 1; break; case 'h': hostname = optarg; break; case 'p': port = atoi(optarg); break; case '?': fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt); exit(EXIT_FAILURE); default: fprintf (stderr, "ERR002: incorrect argument\n"); exit(EXIT_FAILURE); } //retrieve non-option argument /* int index; for (index = optind; index < argc; index++) printf ("Non-option argument %s\n", argv[index]); */ // add checks here // (for example, body_tmp is mandatory) if(file_flag==1) { // retrieve file contents if( access( filepath, F_OK ) != -1 ) { // file exists ; } else { // file doesn't exist fprintf(stderr, "File not found (%s)\n",filepath); exit(EXIT_FAILURE); } int res = read_file(filepath,&buf); if (res != 0) { exit(EXIT_FAILURE); } body_final=malloc(5 + strlen(body_tmp) + 6 + strlen(buf) + 1); //strcpy(body_final,"\0"); strcat(body_final,"body:"); strcat(body_final,body_tmp); strcat(body_final,",file:"); strcat(body_final,buf); body=body_final; //debug //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath); } else { // retrieve msg body from argument body_final=malloc(5 + strlen(body_tmp) + 1); strcat(body_final,"body:"); strcat(body_final,body_tmp); body=body_final; } //exit(0); conn = amqp_new_connection(); die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding"); { amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /* persistent delivery mode */ die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing"); } die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); die_on_error(amqp_destroy_connection(conn), "Ending connection"); if(file_flag==1) free(buf); free(body_final); //this is to prevent overwhelming rabbitMQ server (and to prevent triggering rabbitMQ defensive behaviours (e.g. connection blocking)) //usleep( 200000 ); // O.2 second return 0; } // -- not used -- // static void dump_row(long count, int numinrow, int *chs) { int i; printf("%08lX:", count - numinrow); if (numinrow > 0) { for (i = 0; i < numinrow; i++) { if (i == 8) printf(" :"); printf(" %02X", chs[i]); } for (i = numinrow; i < 16; i++) { if (i == 8) printf(" :"); printf(" "); } printf(" "); for (i = 0; i < numinrow; i++) { if (isprint(chs[i])) printf("%c", chs[i]); else printf("."); } } printf("\n"); } static int rows_eq(int *a, int *b) { int i; for (i=0; i<16; i++) if (a[i] != b[i]) return 0; return 1; } void amqp_dump(void const *buffer, size_t len) { unsigned char *buf = (unsigned char *) buffer; long count = 0; int numinrow = 0; int chs[16]; int oldchs[16] = {0}; int showed_dots = 0; size_t i; for (i = 0; i < len; i++) { int ch = buf[i]; if (numinrow == 16) { int i; if (rows_eq(oldchs, chs)) { if (!showed_dots) { showed_dots = 1; printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); } } else { showed_dots = 0; dump_row(count, numinrow, chs); } for (i=0; i<16; i++) oldchs[i] = chs[i]; numinrow = 0; } count++; chs[numinrow++] = ch; } dump_row(count, numinrow, chs); if (numinrow != 0) printf("%08lX:\n", count); } /* vi: set et ts=2 sw=2: */