source: trunk/Monitoring/Watch/watch @ 935

Last change on this file since 935 was 935, checked in by jripsl, 11 years ago
  • fix typo.
  • Property svn:executable set to *
File size: 7.7 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import traceback
21import smtplib
22from email.mime.text import MIMEText
23import datetime
24
25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
27
28import smon.repo_io as repo_io
29import smon.types
30
31
32class Mail():
33
34        @classmethod
35        def mail_example(cls):
36                me="jripsl@ipsl.jussieu.fr"
37                you="jripsl@ipsl.jussieu.fr"
38                body="Alarm"
39                object="Supervisor"
40
41                cls.mail(me,you,object,body)
42
43        @classmethod
44        def send_mail(cls,me,you,object,body):
45                msg = MIMEText(body)
46                msg['Subject'] = object
47                msg['From'] = me
48                msg['To'] = you
49
50                # Send the message via our own SMTP server, but don't include the # envelope header.
51                s = smtplib.SMTP('localhost')
52                s.sendmail(me,[you], msg.as_string())
53                s.quit()
54
55class Actions():
56
57        @classmethod
58        def store_msg(cls,message):
59
60                try:
61
62                        # the simu exists when we are here (see TAG0001 tag)
63                        s=repo_io.retrieve_simulation(message.simuid)
64
65                        if s is None:
66                                raise Exception("WATCH-ERR102","simulation not found")
67
68                        repo_io.create_message(message,s)
69
70                except:
71                        traceback.print_exc()
72                        raise
73
74        @classmethod
75        def cleanup(cls,message):
76                repo_io.cleanup() # truncate/delete everything
77
78        @classmethod
79        def set_sim_status_to_error(cls,message):
80
81                try:
82
83                        s=repo_io.retrieve_simulation(message.simuid)
84
85                        s.status="error"
86
87                        repo_io.update_simulation_status(s)
88
89                except:
90                        traceback.print_exc()
91                        raise
92
93        @classmethod
94        def set_sim_status_to_complete(cls,message):
95
96                s=repo_io.retrieve_simulation(message.simuid)
97
98                s.status="complete"
99
100                repo_io.update_simulation_status(s)
101
102        @classmethod
103        def crea_sim(cls,message):
104
105                s=repo_io.retrieve_simulation(message.simuid)
106
107                if s is not None:
108                        #repo_io.delete_simulation(name)
109
110                        s.status="running"
111                        repo_io.update_simulation_status(s)
112
113                else:
114                        simulation=smon.types.Simulation(name=message.simuid,status="running")
115
116
117                        repo_io.create_simulation(simulation)
118
119        @classmethod
120        def mail(cls):
121                cls.mail_example()
122
123        @classmethod
124        def print_stdout(cls,message):
125                # used for debug
126
127                """
128                if message.file is not None:
129                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
130                else:
131                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
132                """
133
134                print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
135                #pass
136
137        @classmethod
138        def log(cls,message):
139                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
140                        log_file.write("%s %s %s %s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))
141
142        @classmethod
143        def execActions(cls,message):
144
145                message_code=message.code
146
147                for action in MessageActionsMapping.mapping[message_code]:
148                        proc_name=action
149
150                        try:
151                                getattr(Actions, proc_name)(message)
152                        except Exception,e:
153                                traceback.print_exc()
154
155                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
156
157class MessageActionsMapping():
158
159        # debug
160        #
161        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
162        #
163        mapping = { "0000":["crea_sim", "log", "store_msg", "print_stdout"],
164                                "0100":["log", "store_msg", "print_stdout", "set_sim_status_to_complete"],
165                                "1000":["log", "store_msg", "print_stdout"],
166                                "1100":["log", "store_msg", "print_stdout"],
167                                "2000":["log", "store_msg", "print_stdout"],
168                                "3000":["log", "store_msg", "print_stdout"],
169                                "8888":["cleanup"],
170                                "9000":["log", "store_msg", "print_stdout"],
171                                "9999":["log", "store_msg", "print_stdout", "set_sim_status_to_error"] }
172
173        # prod
174        #
175        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
176        #
177        """
178        mapping = { "0000":["crea_sim", "log", "store_msg"],
179                                "0100":["log", "store_msg", "set_sim_status_to_complete"],
180                                "1000":["log", "store_msg"],
181                                "1100":["log", "store_msg"],
182                                "2000":["log", "store_msg"],
183                                "3000":["log", "store_msg"],
184                                "8888":["cleanup"],
185                                "9000":["log", "store_msg", "mail"],
186                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
187        """
188
189class Watcher():
190
191        @classmethod
192        def start(cls):
193                repo_io.init() # open DB connection
194
195        @classmethod
196        def stop(cls):
197                repo_io.free() # close DB connection
198
199        @classmethod
200        def main(self):
201
202                """
203                # parse args
204                parser = argparse.ArgumentParser(prog='watcher')
205                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
206                args = parser.parse_args()
207
208                # check
209                if not os.path.exists(SMON.smon_home):
210                        sys.exit(1)
211
212                SMON.init_singleton()
213                """
214
215                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
216                self.channel = connection.channel()
217
218                #self.channel.queue_declare(queue='myqueue')
219
220                print ' [*] Waiting for messages. To exit press CTRL+C'
221
222                def callback(ch, method, properties, raw_msg):
223
224                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
225
226
227
228
229                        # first deserialization (no JSON here !!!)
230
231                        fields=raw_msg.split(",")
232
233                        l__tmp_dic={}
234
235                        for field in fields:
236
237                                # debug
238                                #print " [x] Received %s" % field
239
240                                splitted_field=field.split(":")
241
242                                key=splitted_field[0]
243                                val=splitted_field[1]
244
245                                l__tmp_dic[key]=val
246
247
248                        # debug
249                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
250
251                       
252                        # base64 decode body
253                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
254
255
256                        # debug
257                        #print " [x] Received %s" % raw_msg
258                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
259                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
260
261
262                        # message deserialization
263                        message=None
264                        try:
265                                JSON_msg=json.loads(base64_decoded_msg)
266                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
267
268                                # non working
269                                #print message.type
270
271                                # working
272                                #print message.code
273
274
275
276                        except Exception,e:
277                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
278
279                                traceback.print_exc()
280                                raise
281
282
283
284                        # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded)
285                        #
286                        if "file" in l__tmp_dic:
287
288                                # base64 decode file
289                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
290
291                                # add as msg attribute
292                                message.file=base64_decoded_file
293
294
295
296                        # execute actions
297                        try:
298                                # message code based action
299                                Actions.execActions(message)
300
301                        except Exception,e:
302                                print "ERR019 - exception occurs (exception=%s)"%(str(e))
303                                #print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
304
305                                traceback.print_exc()
306
307                                raise
308
309
310                        # slow down consumer
311                        #time.sleep(0.5)
312
313
314                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
315
316                self.channel.start_consuming()
317
318
319                """
320                SMON.free_singleton()
321                """
322
323def signal_handler(signal, frame):
324                print 'You pressed Ctrl+C!'
325                Watcher.channel.stop_consuming()
326                Watcher.stop()
327                sys.exit(0)
328
329if __name__ == '__main__':
330
331        signal.signal(signal.SIGINT, signal_handler)
332
333        try:
334
335                Watcher.start()
336
337                Watcher.main()
338
339                Watcher.stop()
340
341                sys.exit(0)
342
343        except Exception, e:
344
345                traceback.print_exc()
346
347                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.