source: trunk/Monitoring/Watch/watch @ 974

Last change on this file since 974 was 974, checked in by jripsl, 10 years ago

Add configuration file.

  • Property svn:executable set to *
File size: 8.7 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import smtplib
21from email.mime.text import MIMEText
22import datetime
23import logging
24import ConfigParser
25
26# line below is to include "smon" package in the search path
27sys.path.append("/opt/supervisor/Monitoring")
28
29import smon.repo_io as repo_io
30import smon.types
31
32CSTE_BROKER_HOST='cstest-broker.ipsl.jussieu.fr' # cstest
33#CSTE_BROKER_HOST='localhost' # vesg4
34CSTE_LOG_DIR='/var/log/cssupervisor'
35CSTE_LOG_FILENAME_MAIN='supervisor.log'
36CSTE_LOG_FILENAME_DEBUG='debug.log'
37CSTE_LOG_FILENAME_MSG='message.log' # log AMQP msgs
38CSTE_LOG_FILE_MAIN="%s/%s"%(CSTE_LOG_DIR,CSTE_LOG_FILENAME_MAIN)
39CSTE_LOG_FILE_MSG="%s/%s"%(CSTE_LOG_DIR,CSTE_LOG_FILENAME_MSG)
40
41# read ini file
42config = ConfigParser.ConfigParser()
43config.read("/opt/supervisor/Monitoring/Watch/watch.ini")
44g__user = config.get("main","user")
45g__password = config.get("main","password")
46
47def create_logger(name,fullpath_filename):
48
49        # new logger instance
50        logger = logging.getLogger(name)
51        logger.setLevel(logging.DEBUG)
52
53        # create associated file
54        fh = logging.FileHandler(fullpath_filename)
55        fh.setLevel(logging.DEBUG)
56
57        # create formatter
58        formatter = logging.Formatter('%(asctime)-15s - %(message)s')
59        fh.setFormatter(formatter)
60
61        # binding
62        logger.addHandler(fh)
63
64        return logger
65
66
67
68
69# loggers init.
70logger=create_logger(CSTE_LOG_FILENAME_MAIN,CSTE_LOG_FILE_MAIN)
71msg_logger=create_logger(CSTE_LOG_FILENAME_MSG,CSTE_LOG_FILE_MSG)
72
73class Mail():
74
75        @classmethod
76        def mail_example(cls):
77                me="jripsl@ipsl.jussieu.fr"
78                you="jripsl@ipsl.jussieu.fr"
79                body="Alarm"
80                object="Supervisor"
81
82                cls.mail(me,you,object,body)
83
84        @classmethod
85        def send_mail(cls,me,you,object,body):
86                msg = MIMEText(body)
87                msg['Subject'] = object
88                msg['From'] = me
89                msg['To'] = you
90
91                # Send the message via our own SMTP server, but don't include the # envelope header.
92                s = smtplib.SMTP('localhost')
93                s.sendmail(me,[you], msg.as_string())
94                s.quit()
95
96class Actions():
97
98        @classmethod
99        def store_msg(cls,message):
100
101                try:
102
103                        # the simu exists when we are here (see TAG0001 tag)
104                        s=repo_io.retrieve_simulation(message.simuid)
105
106                        if s is None:
107                                raise Exception("WATCH-ERR102","simulation not found")
108
109                        repo_io.create_message(message,s)
110
111                except:
112                        raise
113
114        @classmethod
115        def cleanup(cls,message):
116                repo_io.cleanup() # truncate/delete everything
117
118        @classmethod
119        def set_sim_status_to_error(cls,message):
120
121                try:
122
123                        s=repo_io.retrieve_simulation(message.simuid)
124
125                        s.status="error"
126
127                        repo_io.update_simulation_status(s)
128
129                except:
130                        raise
131
132        @classmethod
133        def set_sim_status_to_complete(cls,message):
134
135                s=repo_io.retrieve_simulation(message.simuid)
136
137                s.status="complete"
138
139                repo_io.update_simulation_status(s)
140
141        @classmethod
142        def crea_sim(cls,message):
143
144                s=repo_io.retrieve_simulation(message.simuid)
145
146                if s is not None:
147                        #repo_io.delete_simulation(name)
148
149                        s.status="running"
150                        repo_io.update_simulation_status(s)
151
152                else:
153                        simulation=smon.types.Simulation(name=message.simuid,status="running")
154
155                        repo_io.create_simulation(simulation)
156
157        @classmethod
158        def mail(cls):
159                cls.mail_example()
160
161        @classmethod
162        def log_debug(cls,line):
163                cls.log(CSTE_LOG_FILENAME_DEBUG,line)
164
165        @classmethod
166        def log(cls,filename,line):
167                with open("%s/%s"%(CSTE_LOG_DIR,filename), "a") as log_file:
168                        log_file.write("%s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), line))
169
170        @classmethod
171        def log_msg(cls,message):
172                line="%s %s %s %s"%(message.code,message.jobid,message.timestamp,message.command)
173                msg_logger.info(line)
174
175                """
176                if message.file is not None:
177                        "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
178                else:
179                "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
180                """
181
182        @classmethod
183        def execActions(cls,message):
184
185                message_code=message.code
186
187                for action in MessageActionsMapping.mapping[message_code]:
188                        proc_name=action
189
190                        try:
191                                getattr(Actions, proc_name)(message)
192                        except Exception,e:
193                                logger.exception("ERR909 - exception occurs")
194
195                                raise Exception("WATCH-ERR002","procedure error (%s)"%(proc_name,))
196
197class MessageActionsMapping():
198
199        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
200        #
201        mapping = { "0000":["crea_sim", "log_msg", "store_msg"],
202                                "0100":["log_msg", "store_msg", "set_sim_status_to_complete"],
203                                "1000":["log_msg", "store_msg"],
204                                "1100":["log_msg", "store_msg"],
205                                "2000":["log_msg", "store_msg"],
206                                "3000":["log_msg", "store_msg"],
207                                "7000":["log_msg"],
208                                "8888":["cleanup"],
209                                "9000":["log_msg", "store_msg"],
210                                "9999":["log_msg", "store_msg", "set_sim_status_to_error"] }
211
212        # prod (mail added for some action)
213        """
214                                "9000":["log_msg", "store_msg", "mail"],
215                                "9999":["log_msg", "store_msg", "set_sim_status_to_error", "mail"] }
216        """
217
218class Watcher():
219
220        @classmethod
221        def start(cls):
222                repo_io.init() # open DB connection
223
224        @classmethod
225        def stop(cls):
226                repo_io.free() # close DB connection
227
228        @classmethod
229        def main(self):
230
231                """
232                # parse args
233                parser = argparse.ArgumentParser(prog='watcher')
234                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
235                args = parser.parse_args()
236
237                # check
238                if not os.path.exists(SMON.smon_home):
239                        sys.exit(1)
240
241                SMON.init_singleton()
242                """
243
244                credentials = pika.PlainCredentials(g__user, g__password)
245                connection = pika.BlockingConnection(pika.ConnectionParameters(host=CSTE_BROKER_HOST,credentials=credentials,virtual_host="/prodiguer"))
246                self.channel = connection.channel()
247
248                logger.info("[*] Waiting for messages")
249
250                def callback(ch, method, properties, raw_msg):
251
252                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
253
254
255
256
257                        # first deserialization (no JSON here !!!)
258
259                        fields=raw_msg.split(",")
260
261                        l__tmp_dic={}
262
263                        for field in fields:
264
265                                # debug
266                                #logger.debug(" [x] Received %s"%field)
267
268                                splitted_field=field.split(":")
269
270                                key=splitted_field[0]
271                                val=splitted_field[1]
272
273                                l__tmp_dic[key]=val
274
275
276                        # debug
277                        #logger.debug(" [x] Received %s (encoded)" % l__tmp_dic["body"])
278
279                       
280                        # base64 decode body
281                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
282
283
284                        # debug
285                        #logger.debug(" [x] Received %s" % raw_msg)
286                        #logger.debug(" [x] Received %s (uudecoded)" % base64_decoded_msg )
287                        #logger.debug(" [x] Received %s (uudecoded)" % base64_decoded_msg )
288
289
290                        # message deserialization
291                        message=None
292                        try:
293                                JSON_msg=json.loads(base64_decoded_msg)
294                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
295
296                                # non working
297                                #logger.debug("DEB003 - %s"%message.type)
298
299                                # working
300                                #logger.debug("DEB009 - %s"%message.code)
301
302
303
304                        except Exception,e:
305
306                                logger.exception("ERR009 - exception occurs")
307
308                                Actions.log_debug("DEB021 - %s"%base64_decoded_msg) 
309
310                                raise
311
312
313
314                        # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded)
315                        #
316                        if "file" in l__tmp_dic:
317
318                                # base64 decode file
319                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
320
321                                # add as msg attribute
322                                message.file=base64_decoded_file
323
324
325
326                        # execute actions
327                        try:
328                                # message code based action
329                                Actions.execActions(message)
330
331
332                                self.channel.basic_ack(delivery_tag = method.delivery_tag) # "msg acknowledgment" stuff
333
334
335                        except Exception,e:
336                                logger.exception("ERR019 - exception occurs")
337
338                                Actions.log_debug("DEB020 - %s"%base64_decoded_msg) 
339
340                                raise
341
342
343                        # slow down consumer
344                        #time.sleep(0.5)
345
346                self.channel.queue_declare(queue='myqueue',durable=True)
347
348                self.channel.basic_consume(callback, queue='myqueue') # do not set "no_ack=True" anymore here so to enable "msg acknowledgment" (i.e. no_ack default is false)
349
350                self.channel.start_consuming()
351
352
353                """
354                SMON.free_singleton()
355                """
356
357def signal_handler(signal, frame):
358                logger.info("TERM signal received: exiting.")
359                Watcher.channel.stop_consuming()
360                Watcher.stop()
361                sys.exit(0)
362
363if __name__ == '__main__':
364
365        signal.signal(signal.SIGTERM, signal_handler)
366        signal.signal(signal.SIGINT, signal_handler)
367
368        try:
369
370                Watcher.start()
371
372                Watcher.main()
373
374                Watcher.stop()
375
376                sys.exit(0)
377
378        except Exception, e:
379
380                logger.exception("ERR904 - exception occurred")
381
382                sys.exit(1)
383# vim: set ts=4 sw=4 :
Note: See TracBrowser for help on using the repository browser.