Changeset 852 for trunk/Monitoring


Ignore:
Timestamp:
04/23/13 20:26:50 (11 years ago)
Author:
jripsl
Message:
  • AMQP C-client implementation.
Location:
trunk/Monitoring
Files:
3 added
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/Monitoring/CNClient/README

    r851 r852  
    55        - Library installation 
    66                - using system package 
    7                         - aptitude install librabbitmq0 
    8                         - aptitude install librabbitmq-dev 
     7                        - install using commands below 
     8                                - aptitude install librabbitmq0 
     9                                - aptitude install librabbitmq-dev 
     10                        - note that it is likely that system packages versions are out of date (we need v0.3.0) 
     11                          so better use installation from source 
    912                - from source 
    1013                        tar xzvf rabbitmq-c-v0.3.0.zip 
    1114                        cd rabbitmq-c-v0.3.0 
    1215                        autoreconf -i 
    13                         ./configure 
     16                        ./configure --enable-static 
    1417                        make 
    1518                        make install 
    16         - gcc -I/usr/local/include -L/usr/local/lib -Wall send_AMQP_msg.c -lrabbitmq -o sendAMQPMsg 
     19        - gcc -static -I/usr/local/include -L/usr/local/lib -Wall -o sendAMQPMsg send_AMQP_msg.c -lrabbitmq 
     20                - we get warning below during compilation 
     21                        -  
     22                          <--- 
     23                          /usr/local/lib/librabbitmq.a(librabbitmq_librabbitmq_la-amqp_socket.o): In function `amqp_open_socket': 
     24                          rabbitmq-c-rabbitmq-c-v0.3.0/librabbitmq/amqp_socket.c:66: warning: Using 'getaddrinfo' in statically  
     25                          linked applications requires at runtime the shared libraries from the glibc version used for linking 
     26                          ---> 
     27                        - it means that you may need to be sure all computing node have the same glibc version !!!! 
     28                                - also means that a different binary must be use in each computing center 
    1729- Run 
    1830        - ./sendAMQPMsg localhost 5672 1 10 
  • trunk/Monitoring/CNClient/send_AMQP_msg.c

    r851 r852  
    104104    uint64_t now = now_microseconds(); 
    105105 
    106     die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 
     106    {  
     107      amqp_basic_properties_t props; 
     108      props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; 
     109      props.content_type = amqp_cstring_bytes("text/plain"); 
     110      props.delivery_mode = 2; /* persistent delivery mode */ 
     111 
     112      // works ! (queue get populated even if no consumer) 
     113      die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing"); 
     114 
     115      // don't works ! (queue get populated only if consumer up) 
     116      //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing"); 
     117    } 
     118 
     119        // note that "amq.direct" is a special exchange 
     120        // (The empty exchange name is an alias for amq.direct) 
     121    //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 
     122 
     123 
    107124    sent++; 
    108125    if (now > next_summary_time) { 
     
    132149} 
    133150 
    134 int main(int argc, char const * const *argv) { 
     151//int main(int argc, char const * const *argv) { 
     152int main_light(int argc, char const * const *argv) { 
    135153  char const *hostname; 
    136154  int port; 
     
    140158  amqp_connection_state_t conn; 
    141159 
     160  amqp_bytes_t reply_to_queue; 
     161 
    142162  if (argc < 5) { 
    143163    fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 
     
    162182  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 
    163183 
    164   send_batch(conn, "test queue", rate_limit, message_count); 
     184  //JRA 
     185  /* 
     186  { 
     187    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("test queue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 
     188    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 
     189    reply_to_queue = amqp_bytes_malloc_dup(r->queue); 
     190    if (reply_to_queue.bytes == NULL) { 
     191      fprintf(stderr, "Out of memory while copying queue name"); 
     192      return 1; 
     193    } 
     194  } 
     195  */ 
     196 
     197  send_batch(conn, "test queue", rate_limit, message_count); // note that "test queue" here is used as the routing key 
    165198 
    166199  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); 
     
    170203  return 0; 
    171204} 
     205 
     206 
     207//int main_complex(int argc, char const * const *argv) { 
     208int main(int argc, char const * const *argv) { 
     209  char const *hostname; 
     210  int port; 
     211  char const *exchange; 
     212  char const *routingkey; 
     213  char const *messagebody; 
     214 
     215 
     216   
     217  int sockfd; 
     218  amqp_connection_state_t conn; 
     219 
     220  if (argc < 4) { 
     221    fprintf(stderr, "Usage: amqp_sendstring host port messagebody\n"); 
     222    return 1; 
     223  } 
     224 
     225  hostname = argv[1]; 
     226  port = atoi(argv[2]); 
     227  //exchange = argv[3]; 
     228  //routingkey = argv[4]; 
     229  messagebody = argv[5]; 
     230 
     231  conn = amqp_new_connection(); 
     232 
     233  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); 
     234  amqp_set_sockfd(conn, sockfd); 
     235  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); 
     236  amqp_channel_open(conn, 1); 
     237  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 
     238 
     239 
     240 
     241 
     242  //JRA 
     243  amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table); 
     244  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); 
     245 
     246  //JRA 
     247  amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 
     248  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 
     249 
     250  amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type 
     251  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding"); 
     252 
     253 
     254 
     255  {  
     256    amqp_basic_properties_t props; 
     257    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; 
     258    props.content_type = amqp_cstring_bytes("text/plain"); 
     259    props.delivery_mode = 2; /* persistent delivery mode */ 
     260    die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(messagebody)), "Publishing"); 
     261  } 
     262 
     263  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); 
     264  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); 
     265  die_on_error(amqp_destroy_connection(conn), "Ending connection"); 
     266  return 0; 
     267} 
     268 
     269// -- not used -- // 
     270 
     271static void dump_row(long count, int numinrow, int *chs) { 
     272  int i; 
     273 
     274  printf("%08lX:", count - numinrow); 
     275 
     276  if (numinrow > 0) { 
     277    for (i = 0; i < numinrow; i++) { 
     278      if (i == 8) 
     279    printf(" :"); 
     280      printf(" %02X", chs[i]); 
     281    } 
     282    for (i = numinrow; i < 16; i++) { 
     283      if (i == 8) 
     284    printf(" :"); 
     285      printf("   "); 
     286    } 
     287    printf("  "); 
     288    for (i = 0; i < numinrow; i++) { 
     289      if (isprint(chs[i])) 
     290    printf("%c", chs[i]); 
     291      else 
     292    printf("."); 
     293    } 
     294  } 
     295  printf("\n"); 
     296} 
     297 
     298static int rows_eq(int *a, int *b) { 
     299  int i; 
     300 
     301  for (i=0; i<16; i++) 
     302    if (a[i] != b[i]) 
     303      return 0; 
     304 
     305  return 1; 
     306} 
     307 
     308void amqp_dump(void const *buffer, size_t len) { 
     309  unsigned char *buf = (unsigned char *) buffer; 
     310  long count = 0; 
     311  int numinrow = 0; 
     312  int chs[16]; 
     313  int oldchs[16] = {0}; 
     314  int showed_dots = 0; 
     315  size_t i; 
     316 
     317  for (i = 0; i < len; i++) { 
     318    int ch = buf[i]; 
     319 
     320    if (numinrow == 16) { 
     321      int i; 
     322 
     323      if (rows_eq(oldchs, chs)) { 
     324    if (!showed_dots) { 
     325      showed_dots = 1; 
     326      printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); 
     327    }  
     328      } else { 
     329    showed_dots = 0; 
     330    dump_row(count, numinrow, chs); 
     331      } 
     332 
     333      for (i=0; i<16; i++) 
     334    oldchs[i] = chs[i]; 
     335 
     336      numinrow = 0; 
     337    } 
     338 
     339    count++; 
     340    chs[numinrow++] = ch; 
     341  } 
     342 
     343  dump_row(count, numinrow, chs); 
     344 
     345  if (numinrow != 0) 
     346    printf("%08lX:\n", count); 
     347} 
     348 
  • trunk/Monitoring/test/README

    r851 r852  
    11test_twisted.py   consumer/producer test showing how to use RabbitMQ, Twisted and txamqp together 
     2recv_AMQP_msg.c   basic consumer test 
Note: See TracChangeset for help on using the changeset viewer.