source: trunk/Monitoring/smon/local_repo.py

Last change on this file was 937, checked in by jripsl, 11 years ago

Create AMQP queue on supervisor side if not exists already.

File size: 3.9 KB
RevLine 
[865]1# -*- coding: ISO-8859-1 -*-
2
3##################################
4#  @program        smon
5#  @description    simulation monitor
6#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
7#                             All Rights Reserved”
8#  @svn_file       $Id: repo_io.py 2599 2013-03-24 19:01:23Z jripsl $
9#  @version        $Rev: 2599 $
10#  @lastrevision   $Date: 2013-03-24 20:01:23 +0100 (Sun, 24 Mar 2013) $
11#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
12##################################
13
14import sys
15import sqlite3
16
[871]17import types
18
[865]19_conn=None
20timeout=1
21
22def connect():
23        global _conn
24
25        if _conn is not None:
26                raise Exception()
27
[937]28        db_file="/opt/supervisor/local_db/supervisor.db"
[875]29        #db_file=":memory:"
[865]30
[875]31        _conn=sqlite3.connect(db_file,timeout)
32
[865]33        create_tables()
34
35def free():
36        global _conn
37
38        if _conn is None:
39                raise Exception()
40
41        _conn.close()
42
43        _conn=None
44
[875]45def commit():
46        """
47        public method
48
49        not used for now
50        """
51        _conn.commit()
52       
53def rollback():
54        _conn.rollback()
55
[865]56def create_tables():
[871]57
[875]58        _conn.execute("create table if not exists simulation (id INTEGER PRIMARY KEY, name TEXT, status TEXT)")
59        _conn.execute("create unique index if not exists idx_simulation_1 on simulation (name)")
[871]60
[879]61        _conn.execute("create table if not exists message (id INTEGER PRIMARY KEY, simulation_id TEXT, body TEXT, timestamp TEXT, crea_date TEXT)") # TODO: check how to use INT datatype for simulation_id column
[865]62
[875]63def cleanup():
64        _conn.execute("delete from simulation")
65        _conn.execute("delete from message")
66        _conn.commit()
67
[871]68def populate_tables_with_sample():
69
[875]70        rows = [('SIMU-001','running'),
71                        ('SIMU-002','running'),
72                        ('SIMU-003','running'),]
[871]73
[875]74        _conn.executemany('INSERT INTO simulation (name,status) VALUES (?,?)', rows)
75
76        _conn.commit()
77
[871]78def retrieve_simulations():
79        li=[]
80        c=_conn.cursor()
81
[875]82        c.execute("select name,id,status from simulation")
[871]83
84        rs=c.fetchone()
85        while rs is not None:
[876]86
87                s=types.Simulation(name=rs[0],id=rs[1],status=rs[2])
88
89                li.append(s)
90
91
[871]92                rs=c.fetchone()
93
94        return li
95
[865]96def retrieve_simulation(name):
97        c=_conn.cursor()
98
[875]99        c.execute("select name,id,status from simulation where name = ?",(name,))
[865]100
101        rs=c.fetchone()
102
103        if rs is None:
[879]104                raise Exception("name=%s"%name)
[865]105
[875]106        return types.Simulation(name=rs[0],id=rs[1],status=rs[2])
[865]107   
108def delete_simulation(simulation):
109        _conn.execute("delete from simulation where name = ?",(simulation.name,))
110
[875]111        _conn.commit()
112
[865]113def create_simulation(simulation):
[875]114        _conn.execute("insert into simulation (name,status) values (?,?)",(simulation.name,simulation.status))
115
116        _conn.commit()
[865]117   
118def update_simulation_status(simulation):
119        _conn.execute("update simulation set status=? where name = ?",(simulation.status,simulation.name))
120
[875]121        _conn.commit()
122
[865]123def retrieve_messages(simulation):
124        li=[]
125
126        c=_conn.cursor()
127
[879]128        c.execute("select id from message where simulation_id = ?",(simulation.id,))
[865]129
130        rs=c.fetchone()
131        while rs is not None:
[871]132                li.append(types.Message(id=rs[0]))
[865]133                rs=c.fetchone()
134
135        return li
136
137def delete_messages(simulation):
138        _conn.execute("delete from message where simulation_id = ?",(simulation.id,))
139
[875]140        _conn.commit()
141
[877]142def create_message(message,simulation):
[876]143       
[879]144        _conn.execute("insert into message (simulation_id,timestamp,crea_date) values (?,?,datetime('now', 'localtime'))",(simulation.id, message.timestamp))
[865]145
[875]146        _conn.commit()
147
[865]148def retrieve_last_message(simulation):
149        c=_conn.cursor()
150
[879]151        # debug
152        #print "simulation_id=%d"%simulation.id
[865]153
[879]154        c.execute("select id, simulation_id, body, timestamp, crea_date from message where simulation_id=? order by crea_date desc limit 1",(simulation.id,))
155
[865]156        rs=c.fetchone()
157
158        if rs is None:
[879]159
160                # debug
161                #print "simulation not found (%d)"%simulation.id
162
[876]163                raise types.MessageNotFoundException()
[865]164
[879]165        # HACK (we want the Message constructor to support BOTH **kw AND kw)
166        di={}
167        di["id"]=rs[0]
168        di["simulation_id"]=rs[1]
169        di["body"]=rs[2]
170        di["timestamp"]=id=rs[3]
171        di["crea_date"]=rs[4]
172
173        m=types.Message(di)
174
175        return m
176
177
Note: See TracBrowser for help on using the repository browser.