source: trunk/Monitoring/Watch/watch @ 871

Last change on this file since 871 was 871, checked in by jripsl, 11 years ago

Improve "config-card" file handling.
Use explicite function for "repo_io" module initialization

  • Property svn:executable set to *
File size: 5.6 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import traceback
21import smtplib
22from email.mime.text import MIMEText
23from datetime import datetime
24
25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
27
28import smon.repo_io as repo_io
29import smon.types
30
31class Mail():
32
33        @classmethod
34        def mail_example(cls):
35                me="jripsl@ipsl.jussieu.fr"
36                you="jripsl@ipsl.jussieu.fr"
37                body="Alarm"
38                object="Supervisor"
39
40                cls.mail(me,you,object,body)
41
42        @classmethod
43        def send_mail(cls,me,you,object,body):
44                msg = MIMEText(body)
45                msg['Subject'] = object
46                msg['From'] = me
47                msg['To'] = you
48
49                # Send the message via our own SMTP server, but don't include the # envelope header.
50                s = smtplib.SMTP('localhost')
51                s.sendmail(me,[you], msg.as_string())
52                s.quit()
53
54class Actions():
55
56        @classmethod
57        def store_msg(cls,message):
58                repo_io.create_message(message)
59
60        @classmethod
61        def set_sim_status_to_error(cls,message):
62                repo_io.update_simulation_status()
63
64        @classmethod
65        def crea_sim(cls,message):
66
67                #repo_io.retrieve_simulation(name)
68                #repo_io.delete_simulation(name)
69
70                simulation=smon.types.Simulation(name=message.simuid)
71
72                repo_io.create_simulation(simulation)
73
74        @classmethod
75        def mail(cls):
76                cls.mail_example()
77
78        @classmethod
79        def print_stdout(cls,message):
80                # used for debug
81
82                if "file" in message:
83                        print "%s %s %s\n"%(message.code,message.jobid,message.file)
84                else:
85                        print "%s %s\n"%(message.code,message.jobid)
86
87        @classmethod
88        def log(cls,message):
89                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
90                        log_file.write("%s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid))
91
92        @classmethod
93        def execActions(cls,message):
94
95                message_code=message.code
96
97                for action in MessageActionsMapping.mapping[message_code]:
98                        proc_name=action
99
100                        try:
101                                getattr(Actions, proc_name)(message)
102                        except Exception,e:
103                                traceback.print_exc()
104
105                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
106
107class MessageActionsMapping():
108
109        mapping = { "0000":["log", "store_msg", "crea_sim"],
110                                "1000":["log", "store_msg"],
111                                "2000":["log", "store_msg"],
112                                "3000":["log", "store_msg"],
113                                "9000":["log", "store_msg", "mail"],
114                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
115
116class Watcher():
117
118        @classmethod
119        def start(cls):
120                repo_io.init() # open DB connection
121
122        @classmethod
123        def stop(cls):
124                repo_io.free() # close DB connection
125
126        @classmethod
127        def main(self):
128
129                """
130                # parse args
131                parser = argparse.ArgumentParser(prog='watcher')
132                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
133                args = parser.parse_args()
134
135                # check
136                if not os.path.exists(SMON.smon_home):
137                        sys.exit(1)
138
139                SMON.init_singleton()
140                """
141
142                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
143                self.channel = connection.channel()
144
145                #self.channel.queue_declare(queue='myqueue')
146
147                print ' [*] Waiting for messages. To exit press CTRL+C'
148
149                def callback(ch, method, properties, raw_msg):
150
151                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
152
153
154
155
156                        # first deserialization (no JSON here !!!)
157
158                        fields=raw_msg.split(",")
159
160                        l__tmp_dic={}
161
162                        for field in fields:
163
164                                # debug
165                                #print " [x] Received %s" % field
166
167                                splitted_field=field.split(":")
168
169                                key=splitted_field[0]
170                                val=splitted_field[1]
171
172                                l__tmp_dic[key]=val
173
174
175                        # debug
176                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
177
178                       
179                        # base64 decode body
180                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
181
182
183                        # debug
184                        #print " [x] Received %s" % raw_msg
185                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
186                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
187
188
189                        # message deserialization
190                        message=None
191                        try:
192                                JSON_msg=json.loads(base64_decoded_msg)
193                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
194
195
196
197                        except Exception,e:
198                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
199
200                                #traceback.print_exc()
201                                #raise
202
203
204
205                        # manage config-card file which is attached to the "0000" type message (this file is base64 encoded and need to be unencoded)
206                        #
207                        if "file" in l__tmp_dic:
208
209                                # base64 decode file
210                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
211
212                                # add as msg attribute
213                                message.file=base64_decoded_file
214
215
216
217                        # execute actions
218                        try:
219                                # message code based action
220                                #Actions.execActions(message)
221                                pass
222                        except Exception,e:
223                                print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
224
225                                traceback.print_exc()
226
227                                raise
228
229
230
231                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
232
233                self.channel.start_consuming()
234
235
236                """
237                SMON.free_singleton()
238                """
239
240def signal_handler(signal, frame):
241                print 'You pressed Ctrl+C!'
242                Watcher.channel.stop_consuming()
243                sys.exit(0)
244
245if __name__ == '__main__':
246
247        signal.signal(signal.SIGINT, signal_handler)
248
249        try:
250                Watcher.main()
251
252                sys.exit(0)
253
254        except Exception, e:
255
256                #traceback.print_exc()
257
258                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.