Changeset 852 for trunk/Monitoring
- Timestamp:
- 04/23/13 20:26:50 (11 years ago)
- Location:
- trunk/Monitoring
- Files:
-
- 3 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Monitoring/CNClient/README
r851 r852 5 5 - Library installation 6 6 - using system package 7 - aptitude install librabbitmq0 8 - aptitude install librabbitmq-dev 7 - install using commands below 8 - aptitude install librabbitmq0 9 - aptitude install librabbitmq-dev 10 - note that it is likely that system packages versions are out of date (we need v0.3.0) 11 so better use installation from source 9 12 - from source 10 13 tar xzvf rabbitmq-c-v0.3.0.zip 11 14 cd rabbitmq-c-v0.3.0 12 15 autoreconf -i 13 ./configure 16 ./configure --enable-static 14 17 make 15 18 make install 16 - gcc -I/usr/local/include -L/usr/local/lib -Wall send_AMQP_msg.c -lrabbitmq -o sendAMQPMsg 19 - gcc -static -I/usr/local/include -L/usr/local/lib -Wall -o sendAMQPMsg send_AMQP_msg.c -lrabbitmq 20 - we get warning below during compilation 21 - 22 <--- 23 /usr/local/lib/librabbitmq.a(librabbitmq_librabbitmq_la-amqp_socket.o): In function `amqp_open_socket': 24 rabbitmq-c-rabbitmq-c-v0.3.0/librabbitmq/amqp_socket.c:66: warning: Using 'getaddrinfo' in statically 25 linked applications requires at runtime the shared libraries from the glibc version used for linking 26 ---> 27 - it means that you may need to be sure all computing node have the same glibc version !!!! 28 - also means that a different binary must be use in each computing center 17 29 - Run 18 30 - ./sendAMQPMsg localhost 5672 1 10 -
trunk/Monitoring/CNClient/send_AMQP_msg.c
r851 r852 104 104 uint64_t now = now_microseconds(); 105 105 106 die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 106 { 107 amqp_basic_properties_t props; 108 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; 109 props.content_type = amqp_cstring_bytes("text/plain"); 110 props.delivery_mode = 2; /* persistent delivery mode */ 111 112 // works ! (queue get populated even if no consumer) 113 die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing"); 114 115 // don't works ! (queue get populated only if consumer up) 116 //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing"); 117 } 118 119 // note that "amq.direct" is a special exchange 120 // (The empty exchange name is an alias for amq.direct) 121 //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 122 123 107 124 sent++; 108 125 if (now > next_summary_time) { … … 132 149 } 133 150 134 int main(int argc, char const * const *argv) { 151 //int main(int argc, char const * const *argv) { 152 int main_light(int argc, char const * const *argv) { 135 153 char const *hostname; 136 154 int port; … … 140 158 amqp_connection_state_t conn; 141 159 160 amqp_bytes_t reply_to_queue; 161 142 162 if (argc < 5) { 143 163 fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); … … 162 182 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 163 183 164 send_batch(conn, "test queue", rate_limit, message_count); 184 //JRA 185 /* 186 { 187 amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("test queue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 188 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 189 reply_to_queue = amqp_bytes_malloc_dup(r->queue); 190 if (reply_to_queue.bytes == NULL) { 191 fprintf(stderr, "Out of memory while copying queue name"); 192 return 1; 193 } 194 } 195 */ 196 197 send_batch(conn, "test queue", rate_limit, message_count); // note that "test queue" here is used as the routing key 165 198 166 199 die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); … … 170 203 return 0; 171 204 } 205 206 207 //int main_complex(int argc, char const * const *argv) { 208 int main(int argc, char const * const *argv) { 209 char const *hostname; 210 int port; 211 char const *exchange; 212 char const *routingkey; 213 char const *messagebody; 214 215 216 217 int sockfd; 218 amqp_connection_state_t conn; 219 220 if (argc < 4) { 221 fprintf(stderr, "Usage: amqp_sendstring host port messagebody\n"); 222 return 1; 223 } 224 225 hostname = argv[1]; 226 port = atoi(argv[2]); 227 //exchange = argv[3]; 228 //routingkey = argv[4]; 229 messagebody = argv[5]; 230 231 conn = amqp_new_connection(); 232 233 die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); 234 amqp_set_sockfd(conn, sockfd); 235 die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); 236 amqp_channel_open(conn, 1); 237 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 238 239 240 241 242 //JRA 243 amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table); 244 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); 245 246 //JRA 247 amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 248 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 249 250 amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type 251 die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding"); 252 253 254 255 { 256 amqp_basic_properties_t props; 257 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; 258 props.content_type = amqp_cstring_bytes("text/plain"); 259 props.delivery_mode = 2; /* persistent delivery mode */ 260 die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(messagebody)), "Publishing"); 261 } 262 263 die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); 264 die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); 265 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 266 return 0; 267 } 268 269 // -- not used -- // 270 271 static void dump_row(long count, int numinrow, int *chs) { 272 int i; 273 274 printf("%08lX:", count - numinrow); 275 276 if (numinrow > 0) { 277 for (i = 0; i < numinrow; i++) { 278 if (i == 8) 279 printf(" :"); 280 printf(" %02X", chs[i]); 281 } 282 for (i = numinrow; i < 16; i++) { 283 if (i == 8) 284 printf(" :"); 285 printf(" "); 286 } 287 printf(" "); 288 for (i = 0; i < numinrow; i++) { 289 if (isprint(chs[i])) 290 printf("%c", chs[i]); 291 else 292 printf("."); 293 } 294 } 295 printf("\n"); 296 } 297 298 static int rows_eq(int *a, int *b) { 299 int i; 300 301 for (i=0; i<16; i++) 302 if (a[i] != b[i]) 303 return 0; 304 305 return 1; 306 } 307 308 void amqp_dump(void const *buffer, size_t len) { 309 unsigned char *buf = (unsigned char *) buffer; 310 long count = 0; 311 int numinrow = 0; 312 int chs[16]; 313 int oldchs[16] = {0}; 314 int showed_dots = 0; 315 size_t i; 316 317 for (i = 0; i < len; i++) { 318 int ch = buf[i]; 319 320 if (numinrow == 16) { 321 int i; 322 323 if (rows_eq(oldchs, chs)) { 324 if (!showed_dots) { 325 showed_dots = 1; 326 printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); 327 } 328 } else { 329 showed_dots = 0; 330 dump_row(count, numinrow, chs); 331 } 332 333 for (i=0; i<16; i++) 334 oldchs[i] = chs[i]; 335 336 numinrow = 0; 337 } 338 339 count++; 340 chs[numinrow++] = ch; 341 } 342 343 dump_row(count, numinrow, chs); 344 345 if (numinrow != 0) 346 printf("%08lX:\n", count); 347 } 348 -
trunk/Monitoring/test/README
r851 r852 1 1 test_twisted.py consumer/producer test showing how to use RabbitMQ, Twisted and txamqp together 2 recv_AMQP_msg.c basic consumer test
Note: See TracChangeset
for help on using the changeset viewer.