Changeset 879 for trunk/Monitoring
- Timestamp:
- 06/13/13 07:08:13 (11 years ago)
- Location:
- trunk/Monitoring
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Monitoring/Analyze/analyze
r877 r879 18 18 import smtplib 19 19 from email.mime.text import MIMEText 20 import time ;21 from datetimeimport datetime20 import time 21 import datetime 22 22 23 23 … … 31 31 32 32 class CheckList(): 33 max_time_between_msg=10 # unit => seconds 34 35 @classmethod 36 def datetime_to_epoch(cls,datetime): 37 epoch = time.mktime(time.strptime(datetime, "%d.%m.%Y %H:%M:%S")).time(); # assuming datetime format is "29.08.2011 11:05:02" 38 return epoch 33 max_time_between_msg=20 # unit => seconds 39 34 40 35 @classmethod 41 36 def msg_timeout(cls,message): 42 37 43 msg_time=cls.datetime_to_epoch(message.timestamp)44 current_ time=time.time()38 # get current epoch 39 current_epoch=time.time() 45 40 46 diff=current_time-msg_time 41 # get msg epoch 42 msg_time=time.strptime(str(message.crea_date), "%Y-%m-%d %H:%M:%S.%f") 43 msg_epoch=time.mktime(msg_time) 44 45 diff = current_epoch - msg_epoch 47 46 48 47 # debug 49 print "diff=%s"%diff 48 #print "cur=%i,ms=%s"%(current_epoch,message.crea_date) 49 50 # debug 51 #print "diff=%i"%int(diff) 50 52 51 53 if diff>cls.max_time_between_msg: … … 68 70 for simulation in repo_io.get_running_simulations(): 69 71 70 print "checking heartbeat for '%s'"%simulation.name 72 print "hhh" 73 74 print "checking heartbeat ('%s')"%simulation.name 71 75 72 76 try: 73 74 77 message=repo_io.retrieve_last_message(simulation) 75 78 79 # debug 80 #print "found" 81 76 82 except types.MessageNotFoundException, e: 83 # when we are here, it mean we are in the interval when a new simulation have just been inserted but the corresponding message have not been inserted yet 84 85 print "no message found for simulation ('%s')"%simulation.name 77 86 78 87 continue 79 88 80 89 81 if msg_timeout(message):90 if cls.msg_timeout(message): 82 91 83 92 simulation.status="error" … … 85 94 repo_io.update_simulation_status(simulation) 86 95 87 print "heartbeat NOK (simulation status set to 'error')"%simulation.name96 print "heartbeat NOK - simulation status set to 'error' (%s)"%simulation.name 88 97 89 98 90 99 else: 91 print "heartbeat OK "%simulation.name100 print "heartbeat OK (%s)"%simulation.name 92 101 93 102 class Analyzer(): … … 128 137 129 138 130 time.sleep( 1)139 time.sleep(3) 131 140 132 141 """ -
trunk/Monitoring/Watch/watch
r877 r879 21 21 import smtplib 22 22 from email.mime.text import MIMEText 23 from datetimeimport datetime23 import datetime 24 24 25 25 # line below is to include "smon" package in the search path … … 58 58 def store_msg(cls,message): 59 59 60 # the simu exists when we are here (see TAG0001 tag) 61 s=repo_io.retrieve_simulation(message.simuid) 62 63 repo_io.create_message(message,s) 60 try: 61 62 # the simu exists when we are here (see TAG0001 tag) 63 s=repo_io.retrieve_simulation(message.simuid) 64 65 repo_io.create_message(message,s) 66 67 sys.exit() 68 69 except: 70 traceback.print_exc() 71 raise 64 72 65 73 @classmethod … … 70 78 def set_sim_status_to_error(cls,message): 71 79 72 s=repo_io.retrieve_simulation(message.simuid) 73 74 s.status="error" 75 76 repo_io.update_simulation_status(s) 80 try: 81 82 s=repo_io.retrieve_simulation(message.simuid) 83 84 s.status="error" 85 86 repo_io.update_simulation_status(s) 87 88 except: 89 traceback.print_exc() 90 raise 77 91 78 92 @classmethod … … 103 117 # used for debug 104 118 119 """ 105 120 if message.file is not None: 106 121 print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file) 107 122 else: 108 123 print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp) 124 """ 125 126 print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp) 109 127 110 128 @classmethod 111 129 def log(cls,message): 112 130 with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file: 113 log_file.write("%s %s %s %s %s\n"%(datetime. now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))131 log_file.write("%s %s %s %s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command)) 114 132 115 133 @classmethod … … 239 257 message=smon.types.Message(JSON_msg) # all JSON object members will be available in smon.types.Message object 240 258 259 # non working 260 #print message.type 261 262 # working 263 #print message.code 264 241 265 242 266 … … 249 273 250 274 251 # manage config-card file which is attached to the "0000" type message (this file is base64 encoded and need to be unencoded)275 # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded) 252 276 # 253 277 if "file" in l__tmp_dic: -
trunk/Monitoring/smon/local_repo.py
r877 r879 59 59 _conn.execute("create unique index if not exists idx_simulation_1 on simulation (name)") 60 60 61 _conn.execute("create table if not exists message (id INTEGER PRIMARY KEY, simulation_id TEXT, body TEXT, crea_date TEXT)") # TODO: check how to use INT datatype for simulation_id column61 _conn.execute("create table if not exists message (id INTEGER PRIMARY KEY, simulation_id TEXT, body TEXT, timestamp TEXT, crea_date TEXT)") # TODO: check how to use INT datatype for simulation_id column 62 62 63 63 def cleanup(): … … 102 102 103 103 if rs is None: 104 raise Exception( )104 raise Exception("name=%s"%name) 105 105 106 106 return types.Simulation(name=rs[0],id=rs[1],status=rs[2]) … … 126 126 c=_conn.cursor() 127 127 128 _conn.execute("select id from message where simulation_id = ?",(simulation.id,))128 c.execute("select id from message where simulation_id = ?",(simulation.id,)) 129 129 130 130 rs=c.fetchone() … … 142 142 def create_message(message,simulation): 143 143 144 _conn.execute("insert into message (simulation_id, crea_date) values (?,?)",(simulation.id, message.timestamp))144 _conn.execute("insert into message (simulation_id,timestamp,crea_date) values (?,?,datetime('now', 'localtime'))",(simulation.id, message.timestamp)) 145 145 146 146 _conn.commit() … … 149 149 c=_conn.cursor() 150 150 151 _conn.execute("select id, simulation_id, body, crea_date from message where simulation_id=? order by crea_date desc limit 1",(simulation.id,)) 151 # debug 152 #print "simulation_id=%d"%simulation.id 153 154 c.execute("select id, simulation_id, body, timestamp, crea_date from message where simulation_id=? order by crea_date desc limit 1",(simulation.id,)) 152 155 153 156 rs=c.fetchone() 154 157 155 158 if rs is None: 159 160 # debug 161 #print "simulation not found (%d)"%simulation.id 162 156 163 raise types.MessageNotFoundException() 157 164 158 return types.Message(id=rs[0],simulation_id=rs[1],body=rs[2],timestamp=rs[3]) 165 # HACK (we want the Message constructor to support BOTH **kw AND kw) 166 di={} 167 di["id"]=rs[0] 168 di["simulation_id"]=rs[1] 169 di["body"]=rs[2] 170 di["timestamp"]=id=rs[3] 171 di["crea_date"]=rs[4] 172 173 m=types.Message(di) 174 175 return m 176 177 -
trunk/Monitoring/smon/repo_io.py
r877 r879 17 17 18 18 import sys 19 import datetime 19 20 20 21 # line below is to include Prodiguer database I/O library in the search path 21 sys.path.append("/home/jripsl/snapshot/src") 22 23 # import Prodiguer database I/O library 24 import elixir 25 import prodiguer_shared 26 27 28 29 30 22 sys.path.append("/home/jripsl/snapshot/src/prodiguer_shared/src") 23 24 25 import types 31 26 32 27 … … 37 32 CSTE_MODE_REMOTE_REPO_STUB="remote_repo_stub" 38 33 34 35 # HACK 36 import prodiguer_shared.repo.session as repo_session 37 import prodiguer_shared.models as models 38 39 40 # Test constants. 41 _SIM_ACTIVITY = 'IPSL' 42 _SIM_COMPUTE_NODE = 'TGCC' 43 _SIM_COMPUTE_NODE_LOGIN = 'p86denv' 44 _SIM_COMPUTE_NODE_MACHINE = 'TGCC - Curie' 45 _SIM_EXECUTION_START_DATE = datetime.datetime.now() 46 _SIM_EXECUTION_STATE = models.EXECUTION_STATE_RUNNING 47 _SIM_EXPERIMENT = '1pctCO2' 48 _SIM_MODEL_ENGINE = 'IPSL-CM5A-LR' 49 _SIM_SPACE = models.SIMULATION_SPACE_TEST 50 _MSG_TYPE = "0000" 51 _MSG_CONTENT1 = "12345690" 52 _MSG_CONTENT2 = "12345690" 53 54 55 56 57 58 59 39 60 # set mode 40 mode=CSTE_MODE_LOCAL_REPO # CSTE_MODE_LOCAL_REPO, CSTE_MODE_REMOTE_REPO, CSTE_MODE_REMOTE_REPO_STUB 61 mode=CSTE_MODE_REMOTE_REPO 62 #mode=CSTE_MODE_LOCAL_REPO 41 63 42 64 # set repository driver 43 65 if mode==CSTE_MODE_REMOTE_REPO_STUB: 44 import prodiguer_shared.repo.mq.hooks_stub as repo 66 raise Exception() 67 45 68 elif mode==CSTE_MODE_REMOTE_REPO: 46 import prodiguer_shared.repo.mq.hooks as repo 69 70 # import Prodiguer database I/O library 71 import prodiguer_shared.mq.hooks as repo 72 73 47 74 elif mode==CSTE_MODE_LOCAL_REPO: 48 75 import local_repo as repo … … 61 88 repo.connect() 62 89 elif mode==CSTE_MODE_REMOTE_REPO: 63 _CONNECTION = "postgresql://postgres:Silence107!@localhost:5432/prodiguer" 64 prodiguer_shared.connect(_CONNECTION) 90 _CONNECTION = "postgresql://postgres:Silence107!@ib-pp-db-dev.ipslnet:5432/pdgr_1_0b5" 91 repo_session.start(_CONNECTION) 92 65 93 elif mode==CSTE_MODE_REMOTE_REPO_STUB: 66 94 pass … … 72 100 repo.free() 73 101 elif mode==CSTE_MODE_REMOTE_REPO: 74 75 #prodiguer_shared.close() 76 pass 102 repo_session.end() 77 103 elif mode==CSTE_MODE_REMOTE_REPO_STUB: 78 104 pass … … 91 117 used by get_running_simulations 92 118 """ 93 return repo.retrieve_simulations() 119 simulations=None 120 121 # debug 122 #print "bla" 123 124 125 if mode==CSTE_MODE_LOCAL_REPO: 126 simulations=repo.retrieve_simulations() 127 elif mode==CSTE_MODE_REMOTE_REPO: 128 129 # prepare 130 simulations=[] 131 132 # execute 133 sims=repo.retrieve_simulations() 134 135 # process return values 136 137 for s in sims: 138 139 status=get_repo_execution_state(s.ExecutionState_ID) 140 141 # HACK 142 if status=="queued": 143 status="waiting" 144 145 146 147 simulations.append(types.Simulation(id=s.ID,name=s.Name,status=status.lower())) 148 149 150 elif mode==CSTE_MODE_REMOTE_REPO_STUB: 151 pass 152 else: 153 raise Exception("ERR115 - incorrect mode") 154 155 156 # debug 157 """ 158 if len(simulations)<1: 159 raise Exception("ERR915 - debug") 160 else: 161 print "%d"%len(simulations) 162 """ 163 164 165 return simulations 94 166 95 167 def test(): … … 111 183 repo.cleanup() 112 184 elif mode==CSTE_MODE_REMOTE_REPO: 113 raise Exception("ERR707") 185 186 187 simulations_to_delete=["BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE", 188 "BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE", 189 "v5cf.amipMR1.amip.PROD.LMDZOR.p86denv.TGCC.CURIE", 190 "v5cf.amipMR2.amip.PROD.LMDZOR.p86denv.TGCC.CURIE", 191 "v5cf.amipMR3.amip.PROD.LMDZOR.p86denv.TGCC.CURIE"] 192 193 194 for s in simulations_to_delete: 195 repo.delete_messages(s) 196 repo.delete_simulation(s) 197 198 commit() 199 200 201 114 202 elif mode==CSTE_MODE_REMOTE_REPO_STUB: 115 203 pass … … 121 209 repo.commit() 122 210 elif mode==CSTE_MODE_REMOTE_REPO: 123 elixir.session.commit()211 repo_session.commit() 124 212 elif mode==CSTE_MODE_REMOTE_REPO_STUB: 125 213 pass … … 131 219 repo.rollback() 132 220 elif mode==CSTE_MODE_REMOTE_REPO: 133 elixir.session.rollback()221 repo_session.commit() 134 222 elif mode==CSTE_MODE_REMOTE_REPO_STUB: 135 223 pass 136 224 else: 137 225 raise Exception("ERR003 - incorrect mode") 226 227 def get_repo_execution_state(state_id): 228 for key, value in models.EXECUTION_STATE_ID_SET.items(): 229 if value == state_id: 230 return key 231 return None 138 232 139 233 def retrieve_simulation(name): … … 141 235 142 236 if mode==CSTE_MODE_LOCAL_REPO: 143 simulation=repo.retrieve_simulation(name) 237 238 try: 239 simulation=repo.retrieve_simulation(name) 240 except: 241 traceback.print_exc() 242 144 243 elif mode==CSTE_MODE_REMOTE_REPO: 145 244 … … 150 249 s=repo.retrieve_simulation(name) 151 250 152 # process return values 153 simulation=smon.types.Simulation(exec_start_date=s.ExecutionStartDate,exec_end_date=s.ExecutionEndDate,status=s.ExecutionState) # ExecutionState example: EXECUTION_STATE_RUNNING, EXECUTION_STATE_SET.. 251 if s is None: 252 raise Exception("RG543534") 253 254 255 256 # process return values 257 258 status=get_repo_execution_state(s.ExecutionState_ID) 259 260 # HACK 261 if status=="queued": 262 status="waiting" 263 264 simulation=types.Simulation(id=s.ID,name=s.Name,status=status) 265 266 154 267 155 268 elif mode==CSTE_MODE_REMOTE_REPO_STUB: … … 185 298 186 299 # prepare args 187 # .. 188 189 # execute 190 repo.create_simulation(activity) 300 model_engine=None 301 space=None 302 exp=None 303 304 if "BIGBRO" in simulation.name: 305 306 model_engine="IPSL-CM5A-LR" 307 space="TEST" 308 exp="sstClim" 309 310 elif "v5cf.amipMR" in simulation.name: 311 312 model_engine="IPSL-CM5A-MR" 313 space="PROD" 314 exp="amip" 315 316 else: 317 318 model_engine=_SIM_MODEL_ENGINE 319 space=_SIM_SPACE 320 exp=_SIM_EXPERIMENT 321 322 323 # execute 324 repo.create_simulation(_SIM_ACTIVITY, 325 _SIM_COMPUTE_NODE, 326 _SIM_COMPUTE_NODE_LOGIN, 327 _SIM_COMPUTE_NODE_MACHINE, 328 _SIM_EXECUTION_START_DATE, 329 _SIM_EXECUTION_STATE, 330 exp, 331 model_engine, 332 simulation.name, 333 space, 334 parent_simulation_name=None) 335 191 336 192 337 # process return values … … 200 345 def update_simulation_status(simulation): 201 346 if mode==CSTE_MODE_LOCAL_REPO: 202 repo.update_simulation_status(simulation) 203 elif mode==CSTE_MODE_REMOTE_REPO: 204 205 # prepare args 206 # .. 207 208 # execute 209 repo.update_simulation_status(name) 210 211 # process return values 212 # .. 347 348 try: 349 repo.update_simulation_status(simulation) 350 except: 351 traceback.print_exc() 352 353 elif mode==CSTE_MODE_REMOTE_REPO: 354 355 # prepare args 356 # .. 357 358 359 # HACK 360 prodiguer_status=simulation.status 361 if simulation.status == "waiting": 362 prodiguer_status="queued" 363 364 365 # execute 366 repo.update_simulation_status(simulation.name, prodiguer_status.upper()) 367 368 # process return values 369 # .. 370 371 commit() 213 372 214 373 elif mode==CSTE_MODE_REMOTE_REPO_STUB: … … 267 426 # .. 268 427 269 # execute 270 repo.create_message() 428 print "%s %s %s"%(simulation.name,message.code,"message.body") 429 430 # execute 431 try: 432 repo.create_message(simulation.name,message.code,"message.body") 433 except ValueError as i: 434 print "bla (%s)"%str(i) 435 436 commit() 437 271 438 272 439 # process return values … … 277 444 else: 278 445 raise Exception("ERR020 - incorrect mode") 446 447 commit() 448 279 449 280 450 def retrieve_last_message(simulation): … … 285 455 elif mode==CSTE_MODE_REMOTE_REPO: 286 456 287 # prepare args 288 # .. 289 290 # execute 291 repo.retrieve_last_message(simulation) 292 293 # process return values 294 # .. 457 # execute 458 message=repo.retrieve_last_message(simulation.name) 459 460 if message is None: 461 raise Exception("ERR221 - null value") 462 463 # process return values 464 di={} 465 di["crea_date"]=message.CreateDate 466 message=types.Message(di) 295 467 296 468 elif mode==CSTE_MODE_REMOTE_REPO_STUB: … … 308 480 309 481 for s in retrieve_simulations(): 482 310 483 if s.status=="running": 311 484 running_simulation.append(s) -
trunk/Monitoring/smon/types.py
r876 r879 52 52 53 53 class Message(): 54 type=None 54 55 file=None 55 56 simuid=None 56 57 jobid=None 57 58 timestamp=None 59 crea_date=None 58 60 command=None 61 body=None 59 62 60 63 def __init__(self,JSON_KW):
Note: See TracChangeset
for help on using the changeset viewer.