source: trunk/Monitoring/Watch/watch @ 866

Last change on this file since 866 was 866, checked in by jripsl, 11 years ago

Add demo scenarios.

  • Property svn:executable set to *
File size: 5.4 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import traceback
21import smtplib
22from email.mime.text import MIMEText
23from datetime import datetime
24
25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
27
28import smon.repo_io as repo_io
29import smon.types
30
31"""
32Code list reminder
33
340000 (la simulation démarre)
351000 (le job d'une simulation démarre)
362000 (PushStack)
373000 (PopStack OK)
389000 (PopStack NOK)
399999 (FATAL)
40"""
41
42class Mail():
43
44        @classmethod
45        def mail_example(cls):
46                me="jripsl@ipsl.jussieu.fr"
47                you="jripsl@ipsl.jussieu.fr"
48                body="Alarm"
49                object="Supervisor"
50
51                cls.mail(me,you,object,body)
52
53        @classmethod
54        def send_mail(cls,me,you,object,body):
55                msg = MIMEText(body)
56                msg['Subject'] = object
57                msg['From'] = me
58                msg['To'] = you
59
60                # Send the message via our own SMTP server, but don't include the # envelope header.
61                s = smtplib.SMTP('localhost')
62                s.sendmail(me,[you], msg.as_string())
63                s.quit()
64
65class Actions():
66
67        @classmethod
68        def store_msg(cls,message):
69                repo_io.create_message(message)
70
71        @classmethod
72        def set_sim_status_to_error(cls,message):
73                repo_io.update_simulation_status()
74
75        @classmethod
76        def crea_sim(cls,message):
77
78                #repo_io.retrieve_simulation(name)
79                #repo_io.delete_simulation(name)
80
81                simulation=smon.types.Simulation(name=message.simuid)
82
83                repo_io.create_simulation(simulation)
84
85        @classmethod
86        def mail(cls):
87                cls.mail_example()
88
89        @classmethod
90        def print_stdout(cls,message):
91                # used for debug
92
93                if "file" in message:
94                        print "%s %s %s\n"%(message.code,message.jobid,message.file)
95                else:
96                        print "%s %s\n"%(message.code,message.jobid)
97
98        @classmethod
99        def log(cls,message):
100                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
101                        log_file.write("%s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid))
102
103        @classmethod
104        def execActions(cls,message):
105
106                message_code=message.code
107
108                for action in MessageActionsMapping.mapping[message_code]:
109                        proc_name=action
110
111                        try:
112                                getattr(Actions, proc_name)(message)
113                        except Exception,e:
114                                traceback.print_exc()
115
116                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
117
118class MessageActionsMapping():
119
120        mapping = { "0000":["log", "store_msg", "crea_sim"],
121                                "1000":["log", "store_msg"],
122                                "2000":["log", "store_msg"],
123                                "3000":["log", "store_msg"],
124                                "9000":["log", "store_msg", "mail"],
125                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
126
127class Watcher():
128
129        @classmethod
130        def start(cls):
131                pass
132
133        @classmethod
134        def stop(cls):
135                pass
136
137        @classmethod
138        def main(self):
139
140                """
141                # parse args
142                parser = argparse.ArgumentParser(prog='watcher')
143                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
144                args = parser.parse_args()
145
146                # check
147                if not os.path.exists(SMON.smon_home):
148                        sys.exit(1)
149
150                SMON.init_singleton()
151                """
152
153                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
154                self.channel = connection.channel()
155
156                #self.channel.queue_declare(queue='myqueue')
157
158                print ' [*] Waiting for messages. To exit press CTRL+C'
159
160                def callback(ch, method, properties, raw_msg):
161
162                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
163
164
165
166
167                        # first deserialization (no JSON here !!!)
168
169                        fields=raw_msg.split(",")
170
171                        l__tmp_dic={}
172
173                        for field in fields:
174
175                                # debug
176                                print " [x] Received %s" % field
177
178                                splitted_field=field.split(":")
179
180                                key=splitted_field[0]
181                                val=splitted_field[1]
182
183                                l__tmp_dic[key]=val
184
185
186                        # debug
187                        print " [x] Received %s (encoded)" % l__tmp_dic["body"]
188
189                       
190                        # base64 decode body
191                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
192
193
194                        # debug
195                        #print " [x] Received %s" % raw_msg
196                        print " [x] Received %s (uudecoded)" % base64_decoded_msg
197                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
198
199
200                        message=None
201                        try:
202                                # body deserialization
203                                JSON_msg=json.loads(base64_decoded_msg)
204                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
205
206                                if "file" in l__tmp_dic:
207
208                                        # base64 decode file
209                                        base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
210
211                                        # add into msg
212                                        message.file=base64_decoded_file
213
214                                # message code based action
215                                Actions.execActions(message)
216
217                        except Exception,e:
218                                print "Exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
219
220                                #traceback.print_exc()
221
222                                raise
223
224                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
225
226                self.channel.start_consuming()
227
228
229                """
230                SMON.free_singleton()
231                """
232
233def signal_handler(signal, frame):
234                print 'You pressed Ctrl+C!'
235                Watcher.channel.stop_consuming()
236                sys.exit(0)
237
238if __name__ == '__main__':
239
240        signal.signal(signal.SIGINT, signal_handler)
241
242        try:
243                Watcher.main()
244
245                sys.exit(0)
246
247        except Exception, e:
248
249                #traceback.print_exc()
250
251                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.