source: trunk/Monitoring/Watch/watch @ 937

Last change on this file since 937 was 937, checked in by jripsl, 11 years ago

Create AMQP queue on supervisor side if not exists already.

  • Property svn:executable set to *
File size: 7.8 KB
RevLine 
[854]1#!/usr/bin/env python
[840]2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
[854]15import pika
16import base64
17import json
18import sys
[857]19import signal
[854]20import traceback
21import smtplib
22from email.mime.text import MIMEText
[879]23import datetime
[840]24
[865]25# line below is to include "smon" package in the search path
[937]26sys.path.append("/opt/supervisor/Monitoring")
[857]27
[866]28import smon.repo_io as repo_io
29import smon.types
[857]30
[937]31CSTE_BROKER_HOST='cstest-broker.ipsl.jussieu.fr' # cstest
32#CSTE_BROKER_HOST='localhost' # vesg4
[875]33
[859]34class Mail():
[857]35
[840]36        @classmethod
[854]37        def mail_example(cls):
38                me="jripsl@ipsl.jussieu.fr"
39                you="jripsl@ipsl.jussieu.fr"
40                body="Alarm"
41                object="Supervisor"
42
43                cls.mail(me,you,object,body)
44
45        @classmethod
46        def send_mail(cls,me,you,object,body):
47                msg = MIMEText(body)
48                msg['Subject'] = object
49                msg['From'] = me
50                msg['To'] = you
51
52                # Send the message via our own SMTP server, but don't include the # envelope header.
53                s = smtplib.SMTP('localhost')
54                s.sendmail(me,[you], msg.as_string())
55                s.quit()
56
[859]57class Actions():
58
[854]59        @classmethod
[866]60        def store_msg(cls,message):
[859]61
[879]62                try:
[877]63
[879]64                        # the simu exists when we are here (see TAG0001 tag)
65                        s=repo_io.retrieve_simulation(message.simuid)
[877]66
[935]67                        if s is None:
68                                raise Exception("WATCH-ERR102","simulation not found")
69
[879]70                        repo_io.create_message(message,s)
71
72                except:
73                        traceback.print_exc()
74                        raise
75
[859]76        @classmethod
[875]77        def cleanup(cls,message):
78                repo_io.cleanup() # truncate/delete everything
79
80        @classmethod
[866]81        def set_sim_status_to_error(cls,message):
[859]82
[879]83                try:
[875]84
[879]85                        s=repo_io.retrieve_simulation(message.simuid)
[875]86
[879]87                        s.status="error"
[875]88
[879]89                        repo_io.update_simulation_status(s)
90
91                except:
92                        traceback.print_exc()
93                        raise
94
[859]95        @classmethod
[875]96        def set_sim_status_to_complete(cls,message):
97
98                s=repo_io.retrieve_simulation(message.simuid)
99
100                s.status="complete"
101
102                repo_io.update_simulation_status(s)
103
104        @classmethod
[866]105        def crea_sim(cls,message):
106
[935]107                s=repo_io.retrieve_simulation(message.simuid)
[859]108
[935]109                if s is not None:
110                        #repo_io.delete_simulation(name)
[866]111
[935]112                        s.status="running"
113                        repo_io.update_simulation_status(s)
[866]114
[935]115                else:
116                        simulation=smon.types.Simulation(name=message.simuid,status="running")
117
118
119                        repo_io.create_simulation(simulation)
120
[859]121        @classmethod
122        def mail(cls):
123                cls.mail_example()
124
125        @classmethod
[857]126        def print_stdout(cls,message):
127                # used for debug
128
[879]129                """
[875]130                if message.file is not None:
[876]131                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
[857]132                else:
[876]133                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
[879]134                """
[857]135
[879]136                print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
[935]137                #pass
[879]138
[857]139        @classmethod
[854]140        def log(cls,message):
[937]141                with open("/opt/supervisor/log/supervisor.log", "a") as log_file:
[879]142                        log_file.write("%s %s %s %s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))
[854]143
144        @classmethod
145        def execActions(cls,message):
146
[866]147                message_code=message.code
[854]148
[859]149                for action in MessageActionsMapping.mapping[message_code]:
[854]150                        proc_name=action
151
152                        try:
[859]153                                getattr(Actions, proc_name)(message)
[854]154                        except Exception,e:
155                                traceback.print_exc()
156
157                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
158
[859]159class MessageActionsMapping():
160
[875]161        # debug
[877]162        #
163        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
164        #
165        mapping = { "0000":["crea_sim", "log", "store_msg", "print_stdout"],
[876]166                                "0100":["log", "store_msg", "print_stdout", "set_sim_status_to_complete"],
167                                "1000":["log", "store_msg", "print_stdout"],
168                                "1100":["log", "store_msg", "print_stdout"],
169                                "2000":["log", "store_msg", "print_stdout"],
170                                "3000":["log", "store_msg", "print_stdout"],
[875]171                                "8888":["cleanup"],
[876]172                                "9000":["log", "store_msg", "print_stdout"],
173                                "9999":["log", "store_msg", "print_stdout", "set_sim_status_to_error"] }
[875]174
175        # prod
[877]176        #
177        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
178        #
[875]179        """
[877]180        mapping = { "0000":["crea_sim", "log", "store_msg"],
[875]181                                "0100":["log", "store_msg", "set_sim_status_to_complete"],
[859]182                                "1000":["log", "store_msg"],
[875]183                                "1100":["log", "store_msg"],
[859]184                                "2000":["log", "store_msg"],
185                                "3000":["log", "store_msg"],
[875]186                                "8888":["cleanup"],
[859]187                                "9000":["log", "store_msg", "mail"],
188                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
[875]189        """
[859]190
191class Watcher():
192
[857]193        @classmethod
[859]194        def start(cls):
[871]195                repo_io.init() # open DB connection
[859]196
197        @classmethod
198        def stop(cls):
[871]199                repo_io.free() # close DB connection
[859]200
201        @classmethod
[857]202        def main(self):
[840]203
[857]204                """
205                # parse args
206                parser = argparse.ArgumentParser(prog='watcher')
207                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
208                args = parser.parse_args()
[840]209
[857]210                # check
211                if not os.path.exists(SMON.smon_home):
212                        sys.exit(1)
[840]213
[857]214                SMON.init_singleton()
215                """
[840]216
[937]217                connection = pika.BlockingConnection(pika.ConnectionParameters(host=CSTE_BROKER_HOST))
[857]218                self.channel = connection.channel()
[840]219
[854]220
[857]221                print ' [*] Waiting for messages. To exit press CTRL+C'
[854]222
[857]223                def callback(ch, method, properties, raw_msg):
[854]224
[866]225                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
[854]226
[866]227
228
229
230                        # first deserialization (no JSON here !!!)
231
232                        fields=raw_msg.split(",")
233
234                        l__tmp_dic={}
235
236                        for field in fields:
237
238                                # debug
[871]239                                #print " [x] Received %s" % field
[866]240
241                                splitted_field=field.split(":")
242
243                                key=splitted_field[0]
244                                val=splitted_field[1]
245
246                                l__tmp_dic[key]=val
247
248
[857]249                        # debug
[871]250                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
[866]251
252                       
253                        # base64 decode body
254                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
255
256
257                        # debug
[857]258                        #print " [x] Received %s" % raw_msg
259                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
[871]260                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
[854]261
[866]262
[871]263                        # message deserialization
[857]264                        message=None
265                        try:
[866]266                                JSON_msg=json.loads(base64_decoded_msg)
267                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
[854]268
[879]269                                # non working
270                                #print message.type
[866]271
[879]272                                # working
273                                #print message.code
[866]274
[879]275
276
[871]277                        except Exception,e:
278                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
[866]279
[875]280                                traceback.print_exc()
281                                raise
[871]282
283
284
[879]285                        # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded)
[871]286                        #
287                        if "file" in l__tmp_dic:
288
289                                # base64 decode file
290                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
291
292                                # add as msg attribute
293                                message.file=base64_decoded_file
294
295
296
297                        # execute actions
298                        try:
[857]299                                # message code based action
[875]300                                Actions.execActions(message)
301
[857]302                        except Exception,e:
[935]303                                print "ERR019 - exception occurs (exception=%s)"%(str(e))
304                                #print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
[854]305
[871]306                                traceback.print_exc()
[866]307
308                                raise
309
[871]310
[875]311                        # slow down consumer
312                        #time.sleep(0.5)
[871]313
[937]314                self.channel.queue_declare(queue='myqueue')
[875]315
[857]316                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
[854]317
[857]318                self.channel.start_consuming()
[840]319
[857]320
321                """
322                SMON.free_singleton()
323                """
324
325def signal_handler(signal, frame):
326                print 'You pressed Ctrl+C!'
327                Watcher.channel.stop_consuming()
[875]328                Watcher.stop()
[857]329                sys.exit(0)
330
[840]331if __name__ == '__main__':
[857]332
333        signal.signal(signal.SIGINT, signal_handler)
334
[840]335        try:
[875]336
337                Watcher.start()
338
[857]339                Watcher.main()
[840]340
[875]341                Watcher.stop()
342
[840]343                sys.exit(0)
344
345        except Exception, e:
346
[875]347                traceback.print_exc()
[840]348
349                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.