Changeset 879 for trunk/Monitoring


Ignore:
Timestamp:
06/13/13 07:08:13 (11 years ago)
Author:
jripsl
Message:

Fix heartbeat test.

Location:
trunk/Monitoring
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/Monitoring/Analyze/analyze

    r877 r879  
    1818import smtplib 
    1919from email.mime.text import MIMEText 
    20 import time; 
    21 from datetime import datetime 
     20import time 
     21import datetime 
    2222 
    2323 
     
    3131 
    3232class CheckList(): 
    33         max_time_between_msg=10 # unit => seconds 
    34  
    35         @classmethod 
    36         def datetime_to_epoch(cls,datetime): 
    37                 epoch = time.mktime(time.strptime(datetime, "%d.%m.%Y %H:%M:%S")).time(); # assuming datetime format is "29.08.2011 11:05:02" 
    38                 return epoch 
     33        max_time_between_msg=20 # unit => seconds 
    3934 
    4035        @classmethod 
    4136        def msg_timeout(cls,message): 
    4237 
    43                 msg_time=cls.datetime_to_epoch(message.timestamp) 
    44                 current_time=time.time() 
     38                # get current epoch 
     39                current_epoch=time.time() 
    4540 
    46                 diff=current_time-msg_time 
     41                # get msg epoch 
     42                msg_time=time.strptime(str(message.crea_date), "%Y-%m-%d %H:%M:%S.%f") 
     43                msg_epoch=time.mktime(msg_time) 
     44 
     45                diff =  current_epoch - msg_epoch 
    4746 
    4847                # debug 
    49                 print "diff=%s"%diff 
     48                #print "cur=%i,ms=%s"%(current_epoch,message.crea_date) 
     49 
     50                # debug 
     51                #print "diff=%i"%int(diff) 
    5052 
    5153                if diff>cls.max_time_between_msg: 
     
    6870                for simulation in repo_io.get_running_simulations(): 
    6971 
    70                         print "checking heartbeat for '%s'"%simulation.name 
     72                        print "hhh" 
     73 
     74                        print "checking heartbeat ('%s')"%simulation.name 
    7175 
    7276                        try: 
    73  
    7477                                message=repo_io.retrieve_last_message(simulation) 
    7578 
     79                                # debug 
     80                                #print "found" 
     81 
    7682                        except types.MessageNotFoundException, e: 
     83                                # when we are here, it mean we are in the interval when a new simulation have just been inserted but the corresponding message have not been inserted yet 
     84 
     85                                print "no message found for simulation ('%s')"%simulation.name 
    7786 
    7887                                continue 
    7988 
    8089 
    81                         if msg_timeout(message): 
     90                        if cls.msg_timeout(message): 
    8291 
    8392                                simulation.status="error" 
     
    8594                                repo_io.update_simulation_status(simulation) 
    8695 
    87                                 print "heartbeat NOK (simulation status set to 'error')"%simulation.name 
     96                                print "heartbeat NOK - simulation status set to 'error' (%s)"%simulation.name 
    8897 
    8998 
    9099                        else: 
    91                                 print "heartbeat OK"%simulation.name 
     100                                print "heartbeat OK (%s)"%simulation.name 
    92101 
    93102class Analyzer(): 
     
    128137 
    129138 
    130                         time.sleep(1) 
     139                        time.sleep(3) 
    131140 
    132141                """ 
  • trunk/Monitoring/Watch/watch

    r877 r879  
    2121import smtplib 
    2222from email.mime.text import MIMEText 
    23 from datetime import datetime 
     23import datetime 
    2424 
    2525# line below is to include "smon" package in the search path 
     
    5858        def store_msg(cls,message): 
    5959 
    60                 # the simu exists when we are here (see TAG0001 tag) 
    61                 s=repo_io.retrieve_simulation(message.simuid) 
    62  
    63                 repo_io.create_message(message,s) 
     60                try: 
     61 
     62                        # the simu exists when we are here (see TAG0001 tag) 
     63                        s=repo_io.retrieve_simulation(message.simuid) 
     64 
     65                        repo_io.create_message(message,s) 
     66 
     67                        sys.exit() 
     68 
     69                except: 
     70                        traceback.print_exc() 
     71                        raise 
    6472 
    6573        @classmethod 
     
    7078        def set_sim_status_to_error(cls,message): 
    7179 
    72                 s=repo_io.retrieve_simulation(message.simuid) 
    73  
    74                 s.status="error" 
    75  
    76                 repo_io.update_simulation_status(s) 
     80                try: 
     81 
     82                        s=repo_io.retrieve_simulation(message.simuid) 
     83 
     84                        s.status="error" 
     85 
     86                        repo_io.update_simulation_status(s) 
     87 
     88                except: 
     89                        traceback.print_exc() 
     90                        raise 
    7791 
    7892        @classmethod 
     
    103117                # used for debug 
    104118 
     119                """ 
    105120                if message.file is not None: 
    106121                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file) 
    107122                else: 
    108123                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp) 
     124                """ 
     125 
     126                print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp) 
    109127 
    110128        @classmethod 
    111129        def log(cls,message): 
    112130                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file: 
    113                         log_file.write("%s %s %s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command)) 
     131                        log_file.write("%s %s %s %s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command)) 
    114132 
    115133        @classmethod 
     
    239257                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object 
    240258 
     259                                # non working 
     260                                #print message.type 
     261 
     262                                # working 
     263                                #print message.code 
     264 
    241265 
    242266 
     
    249273 
    250274 
    251                         # manage config-card file which is attached to the "0000" type message (this file is base64 encoded and need to be unencoded) 
     275                        # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded) 
    252276                        # 
    253277                        if "file" in l__tmp_dic: 
  • trunk/Monitoring/smon/local_repo.py

    r877 r879  
    5959        _conn.execute("create unique index if not exists idx_simulation_1 on simulation (name)") 
    6060 
    61         _conn.execute("create table if not exists message (id INTEGER PRIMARY KEY, simulation_id TEXT, body TEXT, crea_date TEXT)") # TODO: check how to use INT datatype for simulation_id column 
     61        _conn.execute("create table if not exists message (id INTEGER PRIMARY KEY, simulation_id TEXT, body TEXT, timestamp TEXT, crea_date TEXT)") # TODO: check how to use INT datatype for simulation_id column 
    6262 
    6363def cleanup(): 
     
    102102 
    103103        if rs is None: 
    104                 raise Exception() 
     104                raise Exception("name=%s"%name) 
    105105 
    106106        return types.Simulation(name=rs[0],id=rs[1],status=rs[2]) 
     
    126126        c=_conn.cursor() 
    127127 
    128         _conn.execute("select id from message where simulation_id = ?",(simulation.id,)) 
     128        c.execute("select id from message where simulation_id = ?",(simulation.id,)) 
    129129 
    130130        rs=c.fetchone() 
     
    142142def create_message(message,simulation): 
    143143         
    144         _conn.execute("insert into message (simulation_id,crea_date) values (?,?)",(simulation.id, message.timestamp)) 
     144        _conn.execute("insert into message (simulation_id,timestamp,crea_date) values (?,?,datetime('now', 'localtime'))",(simulation.id, message.timestamp)) 
    145145 
    146146        _conn.commit() 
     
    149149        c=_conn.cursor() 
    150150 
    151         _conn.execute("select id, simulation_id, body, crea_date from message where simulation_id=? order by crea_date desc limit 1",(simulation.id,)) 
     151        # debug 
     152        #print "simulation_id=%d"%simulation.id 
     153 
     154        c.execute("select id, simulation_id, body, timestamp, crea_date from message where simulation_id=? order by crea_date desc limit 1",(simulation.id,)) 
    152155 
    153156        rs=c.fetchone() 
    154157 
    155158        if rs is None: 
     159 
     160                # debug 
     161                #print "simulation not found (%d)"%simulation.id 
     162 
    156163                raise types.MessageNotFoundException() 
    157164 
    158         return types.Message(id=rs[0],simulation_id=rs[1],body=rs[2],timestamp=rs[3]) 
     165        # HACK (we want the Message constructor to support BOTH **kw AND kw) 
     166        di={} 
     167        di["id"]=rs[0] 
     168        di["simulation_id"]=rs[1] 
     169        di["body"]=rs[2] 
     170        di["timestamp"]=id=rs[3] 
     171        di["crea_date"]=rs[4] 
     172 
     173        m=types.Message(di) 
     174 
     175        return m 
     176 
     177 
  • trunk/Monitoring/smon/repo_io.py

    r877 r879  
    1717 
    1818import sys 
     19import datetime 
    1920 
    2021# line below is to include Prodiguer database I/O library in the search path 
    21 sys.path.append("/home/jripsl/snapshot/src") 
    22  
    23 # import Prodiguer database I/O library 
    24 import elixir 
    25 import prodiguer_shared 
    26  
    27  
    28  
    29  
    30  
     22sys.path.append("/home/jripsl/snapshot/src/prodiguer_shared/src") 
     23 
     24 
     25import types 
    3126 
    3227 
     
    3732CSTE_MODE_REMOTE_REPO_STUB="remote_repo_stub" 
    3833 
     34 
     35# HACK 
     36import prodiguer_shared.repo.session as repo_session 
     37import prodiguer_shared.models as models 
     38 
     39 
     40# Test constants. 
     41_SIM_ACTIVITY = 'IPSL' 
     42_SIM_COMPUTE_NODE = 'TGCC' 
     43_SIM_COMPUTE_NODE_LOGIN = 'p86denv' 
     44_SIM_COMPUTE_NODE_MACHINE = 'TGCC - Curie' 
     45_SIM_EXECUTION_START_DATE = datetime.datetime.now() 
     46_SIM_EXECUTION_STATE = models.EXECUTION_STATE_RUNNING 
     47_SIM_EXPERIMENT = '1pctCO2' 
     48_SIM_MODEL_ENGINE = 'IPSL-CM5A-LR' 
     49_SIM_SPACE = models.SIMULATION_SPACE_TEST 
     50_MSG_TYPE = "0000" 
     51_MSG_CONTENT1 = "12345690" 
     52_MSG_CONTENT2 = "12345690" 
     53 
     54 
     55 
     56 
     57 
     58 
     59 
    3960# set mode 
    40 mode=CSTE_MODE_LOCAL_REPO # CSTE_MODE_LOCAL_REPO, CSTE_MODE_REMOTE_REPO, CSTE_MODE_REMOTE_REPO_STUB 
     61mode=CSTE_MODE_REMOTE_REPO 
     62#mode=CSTE_MODE_LOCAL_REPO 
    4163 
    4264# set repository driver 
    4365if mode==CSTE_MODE_REMOTE_REPO_STUB: 
    44         import prodiguer_shared.repo.mq.hooks_stub as repo 
     66        raise Exception() 
     67 
    4568elif mode==CSTE_MODE_REMOTE_REPO: 
    46         import prodiguer_shared.repo.mq.hooks as repo 
     69 
     70        # import Prodiguer database I/O library 
     71        import prodiguer_shared.mq.hooks as repo 
     72 
     73 
    4774elif mode==CSTE_MODE_LOCAL_REPO: 
    4875        import local_repo as repo 
     
    6188                repo.connect() 
    6289        elif mode==CSTE_MODE_REMOTE_REPO: 
    63                 _CONNECTION = "postgresql://postgres:Silence107!@localhost:5432/prodiguer" 
    64                 prodiguer_shared.connect(_CONNECTION) 
     90                _CONNECTION = "postgresql://postgres:Silence107!@ib-pp-db-dev.ipslnet:5432/pdgr_1_0b5" 
     91                repo_session.start(_CONNECTION) 
     92 
    6593        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
    6694                pass 
     
    72100                repo.free() 
    73101        elif mode==CSTE_MODE_REMOTE_REPO: 
    74  
    75                 #prodiguer_shared.close() 
    76                 pass 
     102                repo_session.end() 
    77103        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
    78104                pass 
     
    91117        used by get_running_simulations 
    92118        """ 
    93         return repo.retrieve_simulations() 
     119        simulations=None 
     120 
     121        # debug 
     122        #print "bla" 
     123 
     124 
     125        if mode==CSTE_MODE_LOCAL_REPO: 
     126                simulations=repo.retrieve_simulations() 
     127        elif mode==CSTE_MODE_REMOTE_REPO: 
     128 
     129                # prepare 
     130                simulations=[] 
     131 
     132                # execute 
     133                sims=repo.retrieve_simulations() 
     134 
     135                # process return values 
     136 
     137                for s in sims: 
     138 
     139                        status=get_repo_execution_state(s.ExecutionState_ID) 
     140 
     141                        # HACK 
     142                        if status=="queued": 
     143                                status="waiting" 
     144 
     145                         
     146 
     147                        simulations.append(types.Simulation(id=s.ID,name=s.Name,status=status.lower())) 
     148 
     149 
     150        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
     151                pass 
     152        else: 
     153                raise Exception("ERR115 - incorrect mode") 
     154 
     155 
     156        # debug 
     157        """ 
     158        if len(simulations)<1: 
     159                raise Exception("ERR915 - debug") 
     160        else: 
     161                print "%d"%len(simulations) 
     162        """ 
     163 
     164 
     165        return simulations 
    94166 
    95167def test(): 
     
    111183                repo.cleanup() 
    112184        elif mode==CSTE_MODE_REMOTE_REPO: 
    113                 raise Exception("ERR707") 
     185 
     186 
     187                simulations_to_delete=["BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE", 
     188                                                                "BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE", 
     189                                                                "v5cf.amipMR1.amip.PROD.LMDZOR.p86denv.TGCC.CURIE", 
     190                                                                "v5cf.amipMR2.amip.PROD.LMDZOR.p86denv.TGCC.CURIE", 
     191                                                                "v5cf.amipMR3.amip.PROD.LMDZOR.p86denv.TGCC.CURIE"] 
     192 
     193 
     194                for s in simulations_to_delete: 
     195                        repo.delete_messages(s) 
     196                        repo.delete_simulation(s) 
     197 
     198                        commit() 
     199 
     200 
     201 
    114202        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
    115203                pass 
     
    121209                repo.commit() 
    122210        elif mode==CSTE_MODE_REMOTE_REPO: 
    123                 elixir.session.commit() 
     211                repo_session.commit() 
    124212        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
    125213                pass 
     
    131219                repo.rollback() 
    132220        elif mode==CSTE_MODE_REMOTE_REPO: 
    133                 elixir.session.rollback() 
     221                repo_session.commit() 
    134222        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
    135223                pass 
    136224        else: 
    137225                raise Exception("ERR003 - incorrect mode") 
     226 
     227def get_repo_execution_state(state_id): 
     228    for key, value in models.EXECUTION_STATE_ID_SET.items(): 
     229        if value == state_id: 
     230            return key 
     231    return None 
    138232 
    139233def retrieve_simulation(name): 
     
    141235 
    142236        if mode==CSTE_MODE_LOCAL_REPO: 
    143                 simulation=repo.retrieve_simulation(name) 
     237 
     238                try: 
     239                        simulation=repo.retrieve_simulation(name) 
     240                except: 
     241                        traceback.print_exc() 
     242 
    144243        elif mode==CSTE_MODE_REMOTE_REPO: 
    145244 
     
    150249                s=repo.retrieve_simulation(name) 
    151250 
    152                 # process return values 
    153                 simulation=smon.types.Simulation(exec_start_date=s.ExecutionStartDate,exec_end_date=s.ExecutionEndDate,status=s.ExecutionState) # ExecutionState example: EXECUTION_STATE_RUNNING, EXECUTION_STATE_SET.. 
     251                if s is None: 
     252                        raise Exception("RG543534") 
     253 
     254 
     255 
     256                # process return values 
     257 
     258                status=get_repo_execution_state(s.ExecutionState_ID) 
     259 
     260                # HACK 
     261                if status=="queued": 
     262                        status="waiting" 
     263 
     264                simulation=types.Simulation(id=s.ID,name=s.Name,status=status)  
     265 
     266 
    154267 
    155268        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
     
    185298 
    186299                # prepare args 
    187                 # .. 
    188  
    189                 # execute 
    190                 repo.create_simulation(activity) 
     300                model_engine=None 
     301                space=None 
     302                exp=None 
     303 
     304                if "BIGBRO" in simulation.name: 
     305 
     306                        model_engine="IPSL-CM5A-LR" 
     307                        space="TEST" 
     308                        exp="sstClim" 
     309 
     310                elif "v5cf.amipMR" in simulation.name: 
     311 
     312                        model_engine="IPSL-CM5A-MR" 
     313                        space="PROD" 
     314                        exp="amip" 
     315 
     316                else: 
     317 
     318                        model_engine=_SIM_MODEL_ENGINE 
     319                        space=_SIM_SPACE 
     320                        exp=_SIM_EXPERIMENT 
     321 
     322 
     323                # execute 
     324                repo.create_simulation(_SIM_ACTIVITY, 
     325                                                                _SIM_COMPUTE_NODE, 
     326                                                                _SIM_COMPUTE_NODE_LOGIN, 
     327                                                                _SIM_COMPUTE_NODE_MACHINE, 
     328                                                                _SIM_EXECUTION_START_DATE, 
     329                                                                _SIM_EXECUTION_STATE, 
     330                                                                exp, 
     331                                                                model_engine, 
     332                                                                simulation.name, 
     333                                                                space, 
     334                                                                parent_simulation_name=None) 
     335 
    191336 
    192337                # process return values 
     
    200345def update_simulation_status(simulation): 
    201346        if mode==CSTE_MODE_LOCAL_REPO: 
    202                 repo.update_simulation_status(simulation) 
    203         elif mode==CSTE_MODE_REMOTE_REPO: 
    204  
    205                 # prepare args 
    206                 # .. 
    207  
    208                 # execute 
    209                 repo.update_simulation_status(name) 
    210  
    211                 # process return values 
    212                 # .. 
     347 
     348                try: 
     349                        repo.update_simulation_status(simulation) 
     350                except: 
     351                        traceback.print_exc() 
     352 
     353        elif mode==CSTE_MODE_REMOTE_REPO: 
     354 
     355                # prepare args 
     356                # .. 
     357 
     358 
     359                # HACK 
     360                prodiguer_status=simulation.status 
     361                if simulation.status == "waiting": 
     362                        prodiguer_status="queued" 
     363 
     364 
     365                # execute 
     366                repo.update_simulation_status(simulation.name, prodiguer_status.upper()) 
     367 
     368                # process return values 
     369                # .. 
     370 
     371                commit() 
    213372 
    214373        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
     
    267426                # .. 
    268427 
    269                 # execute 
    270                 repo.create_message() 
     428                print "%s %s %s"%(simulation.name,message.code,"message.body") 
     429 
     430                # execute 
     431                try: 
     432                        repo.create_message(simulation.name,message.code,"message.body") 
     433                except ValueError as i: 
     434                        print "bla (%s)"%str(i) 
     435 
     436                commit() 
     437 
    271438 
    272439                # process return values 
     
    277444        else: 
    278445                raise Exception("ERR020 - incorrect mode") 
     446 
     447        commit() 
     448 
    279449 
    280450def retrieve_last_message(simulation): 
     
    285455        elif mode==CSTE_MODE_REMOTE_REPO: 
    286456 
    287                 # prepare args 
    288                 # .. 
    289  
    290                 # execute 
    291                 repo.retrieve_last_message(simulation) 
    292  
    293                 # process return values 
    294                 # .. 
     457                # execute 
     458                message=repo.retrieve_last_message(simulation.name) 
     459 
     460                if message is None: 
     461                        raise Exception("ERR221 - null value") 
     462 
     463                # process return values 
     464                di={} 
     465                di["crea_date"]=message.CreateDate 
     466                message=types.Message(di) 
    295467 
    296468        elif mode==CSTE_MODE_REMOTE_REPO_STUB: 
     
    308480 
    309481        for s in retrieve_simulations(): 
     482 
    310483                if s.status=="running": 
    311484                        running_simulation.append(s) 
  • trunk/Monitoring/smon/types.py

    r876 r879  
    5252 
    5353class Message(): 
     54        type=None 
    5455        file=None 
    5556        simuid=None 
    5657        jobid=None 
    5758        timestamp=None 
     59        crea_date=None 
    5860        command=None 
     61        body=None 
    5962 
    6063        def __init__(self,JSON_KW): 
Note: See TracChangeset for help on using the changeset viewer.