Changeset 854 for trunk/Monitoring


Ignore:
Timestamp:
04/26/13 16:57:16 (11 years ago)
Author:
jripsl
Message:

add JSON deserialization, mail notification, logging.

Location:
trunk/Monitoring
Files:
10 added
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/Monitoring/Broker/README

    r852 r854  
    4242                database dir: /var/lib/rabbitmq/mnesia/rabbit 
    4343- run 
    44         - to start the daemon, use command below 
     44        - to start the daemon, use command below as root 
     45                - cd /opt/rabbitmq-server-3.0.2/sbin 
    4546                - ./rabbitmq-server -detached 
    4647        - to stop the daemon, use this 
  • trunk/Monitoring/CNClient/README

    r852 r854  
    11- This program uses rabbitmq-c library (v0.3.0) 
    22        - https://github.com/alanxz/rabbitmq-c 
    3         - https://github.com/alanxz/rabbitmq-c/archive/rabbitmq-c-v0.3.0.zip 
    43- Compilation (static) 
    54        - Library installation 
     
    1110                          so better use installation from source 
    1211                - from source 
    13                         tar xzvf rabbitmq-c-v0.3.0.zip 
    14                         cd rabbitmq-c-v0.3.0 
    15                         autoreconf -i 
    16                         ./configure --enable-static 
    17                         make 
    18                         make install 
     12                        - retrieve source 
     13                                - wget https://github.com/alanxz/rabbitmq-c/archive/rabbitmq-c-v0.3.0.zip -O rabbitmq-c-v0.3.0.zip 
     14                                - unzip rabbitmq-c-v0.3.0.zip 
     15                                - cd rabbitmq-c-v0.3.0 
     16                        - compilation using autoconf (automake v1.9+, and libtool v2.2+) 
     17                                - autoreconf -i 
     18                                        - if error occurs at this step, check requirement below 
     19                                                - we require autotools v2.63 or better to build. I think RHEL5 ships with v2.59 which lacks the AC_PROC_CC_C99 macro. 
     20                                                        - You can do one of two things; 
     21                                                                - Install a newer version of autotools 
     22                                                                - Build using cmake (v2.6 or better). 
     23                                - ./configure --enable-static 
     24                                - make 
     25                                - make install 
     26                        - compilation using cmake (CMake v2.6+) 
     27                                - mkdir build && cd build 
     28                                - cmake -DBUILD_STATIC_LIBS=True -DBUILD_SHARED_LIBS=True .. 
     29                                - cmake --build . 
     30                                - make 
     31                                        - you got error AAA below, it's normal 
     32                                                make[2]: *** Pas de rÚgle pour fabriquer la cible « librabbitmq/librabbitmq.so.1.0.1 », nécessaire pour « examples/amqp_bind ». Arrêt. 
     33                                                make[1]: *** [examples/CMakeFiles/amqp_bind.dir/all] Erreur 2 
     34                                                make: *** [all] Erreur 2 
     35                                - make 
     36                                  (because of the above error (AAA), you need to run make twice) 
     37                                - make install 
    1938        - gcc -static -I/usr/local/include -L/usr/local/lib -Wall -o sendAMQPMsg send_AMQP_msg.c -lrabbitmq 
    2039                - we get warning below during compilation 
     
    2746                        - it means that you may need to be sure all computing node have the same glibc version !!!! 
    2847                                - also means that a different binary must be use in each computing center 
    29 - Run 
    30         - ./sendAMQPMsg localhost 5672 1 10 
     48- usage 
     49        - to send a message in the queue, do 
     50                - ./sendAMQPMsg localhost 5672 test 
     51- note 
     52        - to show how many messages are in the queue, do as root 
     53                - ./rabbitmqctl list_queues 
  • trunk/Monitoring/CNClient/send_AMQP_msg.c

    r852 r854  
    1616 
    1717#define SUMMARY_EVERY_US 1000000 
    18  
    1918 
    2019 
     
    117116    } 
    118117 
    119         // note that "amq.direct" is a special exchange 
    120         // (The empty exchange name is an alias for amq.direct) 
     118          // note that "amq.direct" is a special exchange 
     119          // (The empty exchange name is an alias for amq.direct) 
    121120    //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 
    122121 
     
    158157  amqp_connection_state_t conn; 
    159158 
    160   amqp_bytes_t reply_to_queue; 
    161  
    162   if (argc < 5) { 
    163     fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 
    164     return 1; 
    165   } 
     159  //amqp_bytes_t reply_to_queue; 
     160 
    166161 
    167162  hostname = argv[1]; 
     
    204199} 
    205200 
    206  
    207 //int main_complex(int argc, char const * const *argv) { 
    208 int main(int argc, char const * const *argv) { 
     201int read_file(char *f,char **source) { 
     202        FILE *fp = fopen(f, "r"); 
     203        if (fp != NULL) { 
     204                /* Go to the end of the file. */ 
     205                if (fseek(fp, 0L, SEEK_END) == 0) { 
     206                        /* Get the size of the file. */ 
     207                        long bufsize = ftell(fp); 
     208                        if (bufsize == -1) { /* Error */ } 
     209 
     210                        /* Allocate our buffer to that size. */ 
     211                        *source = malloc(sizeof(char) * (bufsize + 1)); 
     212 
     213                        /* Go back to the start of the file. */ 
     214                        if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ } 
     215 
     216                        /* Read the entire file into memory. */ 
     217                        size_t newLen = fread(*source, sizeof(char), bufsize, fp); 
     218                        if (newLen == 0) { 
     219                                fputs("Error reading file\n", stderr); 
     220        return 1; 
     221                        } else { 
     222                                //source[++newLen] = '\0'; /* Just to be safe. */ 
     223                        } 
     224                } 
     225                fclose(fp); 
     226        } else { 
     227                fputs("File not found\n", stderr); 
     228                return 1; 
     229        } 
     230 
     231  return 0; 
     232} 
     233 
     234int main(int argc, char * const *argv) { 
    209235  char const *hostname; 
    210236  int port; 
    211   char const *exchange; 
    212   char const *routingkey; 
    213   char const *messagebody; 
     237  //char const *exchange; 
     238  //char const *routingkey; 
     239  char *body_tmp; 
     240  char const *body; 
     241  char *body_final = NULL; 
     242  char *buf = NULL; 
     243  int c; 
     244  char *filepath = NULL; 
     245  int file_flag=0; 
    214246 
    215247 
     
    218250  amqp_connection_state_t conn; 
    219251 
    220   if (argc < 4) { 
    221     fprintf(stderr, "Usage: amqp_sendstring host port messagebody\n"); 
    222     return 1; 
    223   } 
    224  
    225   hostname = argv[1]; 
    226   port = atoi(argv[2]); 
    227   //exchange = argv[3]; 
    228   //routingkey = argv[4]; 
    229   messagebody = argv[5]; 
     252  //parsing args 
     253  opterr = 0; 
     254  while ((c = getopt (argc, argv, "b:ef:h:p:")) != -1) 
     255    switch (c) 
     256      { 
     257      case 'b': 
     258        body_tmp = optarg; 
     259        break; 
     260      case 'e': 
     261        fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 
     262        fprintf(stderr, "Example 1: ./sendAMQPMsg -h localhost -p 5672 -b '{\"jobid\":\"toto\"}'\n"); 
     263        fprintf(stderr, "Example 2: ./sendAMQPMsg -h localhost -p 5672 -f /home/foobar/config.card -b '{\"jobid\":\"toto\"}'\n"); 
     264        exit(0); 
     265      case 'f': 
     266        filepath = optarg; 
     267        file_flag = 1; 
     268        break; 
     269      case 'h': 
     270        hostname = optarg; 
     271        break; 
     272      case 'p': 
     273        port = atoi(optarg); 
     274        break; 
     275      case '?': 
     276        fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt); 
     277        exit(EXIT_FAILURE); 
     278      default: 
     279        fprintf (stderr, "ERR002: incorrect argument\n"); 
     280        exit(EXIT_FAILURE); 
     281      } 
     282 
     283  //retrieve non-option argument 
     284  /* 
     285  int index; 
     286  for (index = optind; index < argc; index++) 
     287    printf ("Non-option argument %s\n", argv[index]); 
     288  */ 
     289 
     290  // add checks here 
     291  // (for example, body_tmp is mandatory) 
     292 
     293  if(file_flag==1) { 
     294    // retrieve file contents 
     295 
     296    if( access( filepath, F_OK ) != -1 ) { 
     297        // file exists 
     298         
     299        ; 
     300    } else { 
     301      // file doesn't exist 
     302 
     303      fprintf(stderr, "File not found (%s)\n",filepath); 
     304 
     305      exit(EXIT_FAILURE); 
     306    } 
     307 
     308    int res = read_file(filepath,&buf); 
     309    if (res != 0) { 
     310      exit(EXIT_FAILURE); 
     311    } 
     312 
     313    body_final=malloc(strlen(body_tmp) + 6 + strlen(buf) + 1); 
     314    //strcpy(body_final,"\0"); 
     315    strcat(body_final,body_tmp); 
     316    strcat(body_final,",file="); 
     317    strcat(body_final,buf); 
     318 
     319    body=body_final; 
     320 
     321    //debug 
     322    //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath); 
     323 
     324  } else { 
     325    // retrieve msg body from argument 
     326 
     327    body=body_tmp; 
     328 
     329  } 
     330 
     331  //exit(0); 
    230332 
    231333  conn = amqp_new_connection(); 
     
    237339  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 
    238340 
    239  
    240  
    241  
    242   //JRA 
    243341  amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table); 
    244342  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); 
    245343 
    246   //JRA 
    247   amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 
     344  amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 
    248345  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 
    249346 
    250347  amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type 
    251348  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding"); 
    252  
    253  
    254349 
    255350  {  
     
    258353    props.content_type = amqp_cstring_bytes("text/plain"); 
    259354    props.delivery_mode = 2; /* persistent delivery mode */ 
    260     die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(messagebody)), "Publishing"); 
     355    die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing"); 
    261356  } 
    262357 
     
    264359  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); 
    265360  die_on_error(amqp_destroy_connection(conn), "Ending connection"); 
     361 
     362  if(file_flag==1) 
     363    free(buf); 
     364 
    266365  return 0; 
    267366} 
     
    347446} 
    348447 
     448/* vi: set et ts=2 sw=2: */ 
  • trunk/Monitoring/Watch/watch

    r841 r854  
    1 #!/usr/bin/python -u 
     1#!/usr/bin/env python 
    22# -*- coding: ISO-8859-1 -*- 
    33 
     
    1313################################## 
    1414 
    15 from smon import dao 
     15#from smon import dao 
     16import pika 
     17import base64 
     18import json 
     19import sys 
     20import traceback 
     21import smtplib 
     22from email.mime.text import MIMEText 
    1623 
    1724class Watcher(): 
     25        message_code_action_mapping = {"0000":["log","mail"],"1000":["log"],"2000":["log"],"3000":["log"],"9000":["log"],"9999":["log","mail"]} 
    1826 
    1927        @classmethod 
     
    2331        @classmethod 
    2432        def start(cls): 
    25                 dao.insert_progress_messages(cls.get_fake_progress_messages()) 
     33                #dao.insert_progress_messages(cls.get_fake_progress_messages()) 
     34                pass 
    2635 
    2736        @classmethod 
    2837        def stop(cls): 
    2938                pass 
    30   
     39 
     40        @classmethod 
     41        def add(cls,message): 
     42                pass 
     43 
     44        @classmethod 
     45        def mail_example(cls): 
     46                me="jripsl@ipsl.jussieu.fr" 
     47                you="jripsl@ipsl.jussieu.fr" 
     48                body="Alarm" 
     49                object="Supervisor" 
     50 
     51                cls.mail(me,you,object,body) 
     52 
     53        @classmethod 
     54        def mail(cls): 
     55                cls.mail_example() 
     56 
     57        @classmethod 
     58        def send_mail(cls,me,you,object,body): 
     59                msg = MIMEText(body) 
     60                msg['Subject'] = object 
     61                msg['From'] = me 
     62                msg['To'] = you 
     63 
     64                # Send the message via our own SMTP server, but don't include the # envelope header. 
     65                s = smtplib.SMTP('localhost') 
     66                s.sendmail(me,[you], msg.as_string()) 
     67                s.quit() 
     68 
     69        @classmethod 
     70        def log(cls,message): 
     71 
     72                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file: 
     73                        log_file.write("%s %s\n"%(message["code"],message["jobid"])) 
     74 
     75        @classmethod 
     76        def execActions(cls,message): 
     77 
     78                message_code=message["code"] 
     79 
     80                for action in cls.message_code_action_mapping[message_code]: 
     81                        proc_name=action 
     82 
     83                        try: 
     84                                getattr(cls, proc_name)(message) 
     85                        except Exception,e: 
     86                                traceback.print_exc() 
     87 
     88                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e))) 
     89 
    3190def main(): 
    3291 
     92        """ 
    3393        # parse args 
    3494        parser = argparse.ArgumentParser(prog='watcher') 
     
    41101 
    42102        SMON.init_singleton() 
     103        """ 
     104 
     105        connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
     106        channel = connection.channel() 
     107 
     108        #channel.queue_declare(queue='myqueue') 
     109 
     110        print ' [*] Waiting for messages. To exit press CTRL+C' 
     111 
     112        def callback(ch, method, properties, raw_msg): 
     113                # 
     114                #return 
     115 
     116                base64_decoded_msg=base64.b64decode(raw_msg) 
     117 
     118                # debug 
     119                #print " [x] Received %s" % raw_msg 
     120                #print " [x] Received %s (uudecoded)" % base64_decoded_msg  
     121 
     122                try: 
     123                        message=json.loads(base64_decoded_msg) 
     124                except Exception,e: 
     125                        print base64_decoded_msg 
     126                 
     127                # message code based action 
     128                Watcher.execActions(message) 
     129 
     130        channel.basic_consume(callback, queue='myqueue', no_ack=True) 
     131 
     132        channel.start_consuming() 
    43133 
    44134 
    45  
     135        """ 
    46136        SMON.free_singleton() 
     137        """ 
    47138 
    48139if __name__ == '__main__': 
     
    54145        except Exception, e: 
    55146 
    56                 #traceback.print_exc() 
     147                traceback.print_exc() 
    57148 
    58149                sys.exit(1) 
  • trunk/Monitoring/script/libIGCM_mock.sh

    r842 r854  
    1212################################## 
    1313 
    14 stack_file="../sample/stack_light" 
     14stack_file=$1 
     15send_msg="/home/jripsl/snapshot/Monitoring/CNClient/sendAMQPMsg" 
     16 
     17if [ $# -lt 1 ]; then 
     18        echo "Usage $0 ../sample/stack_light" 
     19        exit 1 
     20fi 
    1521 
    1622IFS=$'\n' 
    1723for line in $(cat $stack_file); do 
    18         echo $line | awk -F" " '{print $4}' 
     24        #echo $line | awk -F" " '{print $4}' 
     25        callname=$(echo $line | awk -F" " '{print $4}' ) 
     26        $send_msg localhost 5672 string "$callname" 
    1927done 
  • trunk/Monitoring/test/README

    r852 r854  
    1 test_twisted.py   consumer/producer test showing how to use RabbitMQ, Twisted and txamqp together 
    2 recv_AMQP_msg.c   basic consumer test 
     1test_twisted.py    consumer/producer test showing how to use RabbitMQ, Twisted and txamqp together 
     2recv_AMQP_msg.c    basic consumer test 
     3librabbitmq-python consumer/producer using librabbitmq-Python library 
Note: See TracChangeset for help on using the changeset viewer.