# -*- coding: ISO-8859-1 -*- ################################## # @program smon # @description simulation monitor # @copyright Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS. # All Rights Reserved” # @svn_file $Id: repo_io.py 2599 2013-03-24 19:01:23Z jripsl $ # @version $Rev: 2599 $ # @lastrevision $Date: 2013-03-24 20:01:23 +0100 (Sun, 24 Mar 2013) $ # @license CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE) ################################## """ This module contains repository I/O code """ import sys import datetime # --- module static initialization --- # CSTE_MODE_LOCAL_REPO="local_repo" CSTE_MODE_REMOTE_REPO="remote_repo" CSTE_MODE_REMOTE_REPO_STUB="remote_repo_stub" g__remote_database_host="ib-pp-db-dev.ipslnet" g__remote_database_port="5432" g__remote_database_name="pdgr_1_0b5" # set mode mode=CSTE_MODE_REMOTE_REPO #mode=CSTE_MODE_LOCAL_REPO # mode specific init. (e.g. repository driver) if mode==CSTE_MODE_REMOTE_REPO_STUB: raise Exception() elif mode==CSTE_MODE_REMOTE_REPO: # line below is to include Prodiguer database I/O library in the search path sys.path.append("/opt/supervisor/prodiguer_lib/src") import types # import Prodiguer database I/O library import prodiguer_shared.mq.hooks as repo import prodiguer_shared.repo.session as repo_session import prodiguer_shared.models as models # Test constants. _SIM_ACTIVITY = 'IPSL' _SIM_COMPUTE_NODE = 'TGCC' _SIM_COMPUTE_NODE_LOGIN = 'p86denv' _SIM_COMPUTE_NODE_MACHINE = 'TGCC - Curie' _SIM_EXECUTION_START_DATE = datetime.datetime.now() _SIM_EXECUTION_STATE = models.EXECUTION_STATE_RUNNING _SIM_EXPERIMENT = '1pctCO2' _SIM_MODEL_ENGINE = 'IPSL-CM5A-LR' _SIM_SPACE = models.SIMULATION_SPACE_TEST _MSG_TYPE = "0000" _MSG_CONTENT1 = "12345690" _MSG_CONTENT2 = "12345690" elif mode==CSTE_MODE_LOCAL_REPO: import local_repo as repo else: raise Exception("ERR001 - incorrect mode") # -- methods -- # def init(): if mode==CSTE_MODE_LOCAL_REPO: repo.connect() elif mode==CSTE_MODE_REMOTE_REPO: _CONNECTION = "postgresql://postgres:Silence107!@%s:%s/%s"%(g__remote_database_host,g__remote_database_port,g__remote_database_name) repo_session.start(_CONNECTION) elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR004 - incorrect mode") def free(): if mode==CSTE_MODE_LOCAL_REPO: repo.free() elif mode==CSTE_MODE_REMOTE_REPO: repo_session.end() elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR009 - incorrect mode") def populate_tables_with_sample(): """ used only by "repo-state" pgm """ repo.populate_tables_with_sample() def retrieve_simulations(): """ used by get_running_simulations """ simulations=None # debug #print "bla" if mode==CSTE_MODE_LOCAL_REPO: simulations=repo.retrieve_simulations() elif mode==CSTE_MODE_REMOTE_REPO: # prepare simulations=[] # execute sims=repo.retrieve_simulations() # process return values for s in sims: status=get_repo_execution_state(s.ExecutionState_ID) # HACK if status=="queued": status="waiting" simulations.append(types.Simulation(id=s.ID,name=s.Name,status=status.lower())) elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR115 - incorrect mode") # debug """ if len(simulations)<1: raise Exception("ERR915 - debug") else: print "%d"%len(simulations) """ return simulations def test(): """ not used """ repo.create_message("test2", 2, "bla2") commit() repo.update_simulation_status('1pctCO22', 'ERROR') commit() repo.create_message("test3", 3, "bla3") rollback() def cleanup(): if mode==CSTE_MODE_LOCAL_REPO: repo.cleanup() elif mode==CSTE_MODE_REMOTE_REPO: simulations_to_delete=["BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE", "BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE", "v5cf.amipMR1.amip.PROD.LMDZOR.p86denv.TGCC.CURIE", "v5cf.amipMR2.amip.PROD.LMDZOR.p86denv.TGCC.CURIE", "v5cf.amipMR3.amip.PROD.LMDZOR.p86denv.TGCC.CURIE"] for s in simulations_to_delete: repo.delete_messages(s) repo.delete_simulation(s) commit() elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR007 - incorrect mode") def commit(): if mode==CSTE_MODE_LOCAL_REPO: repo.commit() elif mode==CSTE_MODE_REMOTE_REPO: repo_session.commit() elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR002 - incorrect mode") def rollback(): if mode==CSTE_MODE_LOCAL_REPO: repo.rollback() elif mode==CSTE_MODE_REMOTE_REPO: repo_session.commit() elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR003 - incorrect mode") def get_repo_execution_state(state_id): for key, value in models.EXECUTION_STATE_ID_SET.items(): if value == state_id: return key return None def retrieve_simulation(name): simulation=None if mode==CSTE_MODE_LOCAL_REPO: try: simulation=repo.retrieve_simulation(name) except: raise elif mode==CSTE_MODE_REMOTE_REPO: # prepare args # .. # execute s=repo.retrieve_simulation(name) if s is None: #raise Exception("RG543534") return None # process return values status=get_repo_execution_state(s.ExecutionState_ID) # HACK if status=="queued": status="waiting" simulation=types.Simulation(id=s.ID,name=s.Name,status=status) elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR014 - incorrect mode") return simulation def delete_simulation(name): if mode==CSTE_MODE_LOCAL_REPO: repo.delete_simulation(name) elif mode==CSTE_MODE_REMOTE_REPO: # prepare args # .. # execute repo.delete_simulation(name) # process return values # .. elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR015 - incorrect mode") def create_simulation(simulation): if mode==CSTE_MODE_LOCAL_REPO: repo.create_simulation(simulation) elif mode==CSTE_MODE_REMOTE_REPO: # prepare args model_engine=None space=None exp=None if "BIGBRO" in simulation.name: model_engine="IPSL-CM5A-LR" space="TEST" exp="sstClim" elif "v5cf.amipMR" in simulation.name: model_engine="IPSL-CM5A-MR" space="PROD" exp="amip" else: model_engine=_SIM_MODEL_ENGINE space=_SIM_SPACE exp=_SIM_EXPERIMENT # execute repo.create_simulation(_SIM_ACTIVITY, _SIM_COMPUTE_NODE, _SIM_COMPUTE_NODE_LOGIN, _SIM_COMPUTE_NODE_MACHINE, _SIM_EXECUTION_START_DATE, _SIM_EXECUTION_STATE, exp, model_engine, simulation.name, space, parent_simulation_name=None) # process return values # .. elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR016 - incorrect mode") def update_simulation_status(simulation): if mode==CSTE_MODE_LOCAL_REPO: try: repo.update_simulation_status(simulation) except: raise elif mode==CSTE_MODE_REMOTE_REPO: # prepare args # .. # HACK prodiguer_status=simulation.status if simulation.status == "waiting": prodiguer_status="queued" # execute repo.update_simulation_status(simulation.name, prodiguer_status.upper()) # process return values # .. commit() elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR017 - incorrect mode") def retrieve_messages(simulation): message=None if mode==CSTE_MODE_LOCAL_REPO: message=repo.retrieve_messages(simulation) elif mode==CSTE_MODE_REMOTE_REPO: # prepare args # .. # execute repo.retrieve_messages(name) # process return values # .. elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR018 - incorrect mode") return message def delete_messages(simulation): if mode==CSTE_MODE_LOCAL_REPO: repo.delete_messages(name) elif mode==CSTE_MODE_REMOTE_REPO: # prepare args # .. # execute repo.delete_messages(name) # process return values # .. elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR019 - incorrect mode") def create_message(message,simulation): if mode==CSTE_MODE_LOCAL_REPO: repo.create_message(message,simulation) elif mode==CSTE_MODE_REMOTE_REPO: # prepare args # .. # debug #print "%s %s %s"%(simulation.name,message.code,"message.body") # execute try: repo.create_message(simulation.name,message.code,"message.body") except ValueError as i: print "bla (%s)"%str(i) commit() # process return values # .. elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR020 - incorrect mode") commit() def retrieve_last_message(simulation): message=None if mode==CSTE_MODE_LOCAL_REPO: message=repo.retrieve_last_message(simulation) elif mode==CSTE_MODE_REMOTE_REPO: # execute message=repo.retrieve_last_message(simulation.name) if message is None: raise Exception("ERR221 - null value") # process return values di={} di["crea_date"]=message.CreateDate message=types.Message(di) elif mode==CSTE_MODE_REMOTE_REPO_STUB: pass else: raise Exception("ERR021 - incorrect mode") return message # --- higher level methods --- # def get_running_simulations(): running_simulation=[] for s in retrieve_simulations(): if s.status=="running": running_simulation.append(s) return running_simulation