source: trunk/LATMOS-Accounts/lib/LATMOS/Accounts/Synchro.pm @ 1354

Last change on this file since 1354 was 1354, checked in by nanardon, 9 years ago

Add filtering option on object to sync to base

  • Property svn:keywords set to Id Rev
File size: 13.3 KB
Line 
1package LATMOS::Accounts::Synchro;
2
3use 5.010000;
4use strict;
5use warnings;
6use base qw(Config::IniFiles);
7use LATMOS::Accounts::Bases;
8use LATMOS::Accounts::Log;
9use LATMOS::Accounts::Utils qw(exec_command);
10use Fcntl qw(:flock);
11
12=head1 NAME
13
14LATMOS::Accounts::Synchro - Perl extension for blah blah blah
15
16=head1 SYNOPSIS
17
18  use LATMOS::Accounts;
19  blah blah blah
20
21=head1 DESCRIPTION
22
23Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the
24author of the extension was negligent enough to leave the stub
25unedited.
26
27Blah blah blah.
28
29=head1 FUNCTIONS
30
31=cut
32
33our $VERSION = (q$Rev$ =~ /^Rev: (\d+) /)[0];
34
35=head2 new($from, $to, %options)
36
37Create a new synchronisation where $from and $to are LATMOS::Accounts::Base
38based objects. $to can be an array ref of objects.
39
40=cut
41
42sub new {
43    my ($class, $from, $to, %options) = @_;
44
45    my $state_file = $options{state_dir}
46        ? $options{state_dir} . '/synchronisation.ini'
47        : undef;
48    if ($state_file && ! -w $state_file) {
49        # don't exists, we have to create it
50        open(my $handle, '>', $state_file) or do {
51            la_log(LA_ERR, "Cannot open status file %s", $state_file);
52            return;
53        };
54        print $handle "[_default_]\n";
55        close($handle);
56    }
57
58    my $self = Config::IniFiles->new(
59        $state_file
60        ? (-file => $state_file)
61        : (),
62    );
63
64    if ($state_file && !$self->GetFileName) {
65        $self->SetFileName($state_file);
66    }
67   
68    $self->{from} = $from or do {
69        la_log(LA_ERR, "No database source");
70        return;
71    };
72    $self->{options} = { %options };
73
74    # allow ref and array ref of, eg
75    # to = $foo and $to = [ $foo, $bar ]
76    foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) {
77        push(@{$self->{to}}, $_);
78    }
79    bless($self, $class)
80}
81
82=head2 name
83
84Return the name of this synchronisation
85
86=cut
87
88sub name {
89    $_[0]->{options}{name}
90}
91
92=head2 from
93
94Return the base object source for this synchronisation
95
96=cut
97
98sub from {
99    my ($self) = @_;
100    return $self->{from}
101}
102
103=head2 to
104
105Return the list of base destination for this synchronisation
106
107=cut
108
109sub to {
110    my ($self) = @_;
111    return @{$self->{to} || []};
112}
113
114=head2 load_dest
115
116Try to loaded all base, return the count of filtrered base which cannot
117be loaded
118
119=cut
120
121sub load_dest {
122    my ($self) = @_;
123    my @loaded;
124    my $unloaded = 0;
125    foreach ($self->to) {
126        if($_->load) {
127            push(@loaded, $_);
128        } else {
129            $unloaded++;
130            warn "Cannot load $_";
131        }
132    }
133    $self->{to} = \@loaded;
134    return $unloaded;
135}
136
137=head2 enter_synch_mode
138
139Configure base for synchronisation
140
141=cut
142
143sub enter_synch_mode {
144    my ($self) = @_;
145    $self->from->load or return;
146    # if any cannot be loaded, return,
147    # TODO we need a way to force if some still can be sync
148    $self->load_dest and return;
149    my %state = ();
150    $state{$self->from->label} = $self->from->wexported(
151        $self->{options}{unexported} ? 1 : 0
152    );
153    foreach ($self->to) {
154        $state{$_->label} = $_->wexported(1);
155    }
156    la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map {
157            "$_ => $state{$_}" } sort keys %state));
158    %state
159}
160
161=head2 leave_synch_mode (%state)
162
163Retore base to previous state
164
165=cut
166
167sub leave_synch_mode {
168    my ($self, %state) = @_;
169    la_log(LA_DEBUG, "Leaving synch mode");
170    $self->from->wexported($state{$self->from->label});
171    foreach my $base (grep { $_ } $self->to) {
172        $base->commit;
173        $base->wexported($state{$base->label});
174    }
175}
176
177=head2 lock
178
179Create a lock to denied another synchronisation to run
180
181=cut
182
183sub lock {
184    my ($self) = @_;
185
186    $self->{lock}{handle} and return 1;
187    la_log(LA_DEBUG, "Trying to lock (pid $$)");
188    if ($self->{options}{state_dir}) {
189        my $lockfile = $self->{options}{state_dir} . '/synclock';
190        open(my $handle, '>>', $lockfile) or return;
191        flock($handle, LOCK_EX);
192        $self->{lock}{handle} = $handle;
193        $self->{lock}{filename} = $lockfile;
194        la_log(LA_DEBUG, "lock done (pid $$)");
195        return 1;
196    } else { return 1 }
197}
198
199=head2 unlock
200
201Remove lock
202
203=cut
204
205sub unlock {
206    my ($self) = @_;
207    if (my $handle = $self->{lock}{handle}) {
208        close($handle);
209        delete($self->{lock}{handle});
210        unlink($self->{lock}{filename});
211        delete($self->{lock}{filename});
212        return 1;
213    }
214    return;
215}
216
217=head2 sync_object ($otype, $uid, %options)
218
219Synchronise object type C<$otype> named C<$uid>
220
221=cut
222
223sub sync_object {
224    my ($self, $otype, $uid, %options) = @_;
225
226    $self->lock or return;
227
228    my %state = $self->enter_synch_mode;
229   
230    my $res = $self->_sync_object($otype, $uid, %options);
231
232    $self->leave_synch_mode(%state);
233
234    $self->unlock;
235
236    $res;
237}
238
239sub _sync_object {
240    my ($self, $otype, $uid, %options) = @_;
241    foreach ($self->to) {
242        my $res = $_->sync_object_from($self->from, $otype, $uid, %options);
243        if (defined $res) {
244            la_log(LA_NOTICE, $_->label . " $uid ($otype) $res") if ($res);
245            return 1;
246        } else {
247            la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label);
248            return;
249        }
250    }
251}
252
253=head2 process
254
255Run the syncronisation
256
257=cut
258
259sub process {
260    my ($self, %options) = @_;
261
262    $self->lock or return;
263   
264    if (!(my $res = $self->run_pre_synchro({}))) {
265        la_log(LA_ERR, "Pre synchro script failed, aborting");
266        $self->unlock;
267        return;
268    }
269
270    my %state = $self->enter_synch_mode;
271
272    # tracking current base revision:
273    $self->{current_rev} = $self->from->current_rev;
274
275    my %desterror;
276    my $updated = 0;
277    foreach my $destbase ($self->to) {
278        my %objlist;
279        foreach my $otype ($self->from->list_supported_objects) {
280            $destbase->is_supported_object($otype) or next;
281
282            my %existings;
283            my %filtering;
284
285            $existings{$otype} = { map { $_ => 1 }
286                $self->from->list_objects($otype) };
287
288            # Is there a filter to apply:
289            my $filtername = 'filter.' . $destbase->label . '.' . $otype;
290            if (my $filter = $self->{options}->{$filtername}) {
291                la_log(LA_DEBUG, "Found %s param, using it: %s", $filtername, $filter);
292                $filtering{$otype} = { map { $_ => 1 }
293                    $self->from->search_objects($otype, $filter) };
294            } else {
295                $filtering{$otype} = $existings{$otype};
296            }
297
298
299            # deleting non existing object in dest:
300            my $deletefiltered = 'deletefiltered.' . $destbase->label . '.' . $otype;
301            foreach ($destbase->list_objects($otype)) {
302
303                if ($filtering{$otype}{$_}) {
304                    # the object must exists
305                    next;
306                } elsif ($existings{$otype}{$_}) {
307                    # object exists but is filtered
308                    if (!$self->{options}->{$deletefiltered}) {
309                        next;
310                    }
311                }
312
313                if (my $res = $destbase->sync_object_from($self->from,
314                        $otype, $_, %options)) {
315                    la_log(LA_NOTICE, "%s::%s::%s => %s %s",
316                        $self->from->label, $otype, $_, $destbase->label, $res,
317                    );
318                    if ($destbase->is_transactionnal) {
319                        $destbase->commit;
320                    }
321                    $updated = 1;
322                } else {
323                    if ($destbase->is_transactionnal) {
324                        $destbase->rollback;
325                    }
326                }
327            }
328
329            # Finding object to synchronize:
330            @{$objlist{$otype}} = grep { $filtering{$otype}{$_} } $self->from->list_objects_from_rev(
331                $otype,
332                $self->val($self->from->label, $destbase->label, 0),
333            );
334        }
335
336        my %objectok;
337        foreach my $pass (1, 0) {
338            foreach my $otype ($destbase->ordered_objects) {
339                exists($objlist{$otype}) or next;
340                foreach (@{$objlist{$otype} || []}) {
341
342                    # If first synchro is not ok, don't retry to sync the object
343                    if ((!$pass) && (!$objectok{$otype}{$_})) {
344                        next;
345                    }
346
347                    my $res = $destbase->sync_object_from($self->from, $otype, $_,
348                        %options, firstpass => $pass);
349                    if (defined $res) {
350                        if ($res) {
351                            la_log(LA_NOTICE, "%s::%s::%s => %s %s",
352                                $self->from->label, $otype, $_,
353                                $destbase->label, $res,
354                            );
355                            if ($destbase->is_transactionnal) {
356                                $destbase->commit;
357                            }
358                            $updated = 1;
359                            $objectok{$otype}{$_} = 1;
360                        }
361                    } else {
362                        la_log(LA_ERR, "Cannot synch %s::%s::%s => %s",
363                            $self->from->label, $otype, $_,
364                            $destbase->label,
365                        );
366                        $desterror{$destbase->label} = 1;
367                        if ($destbase->is_transactionnal) {
368                            $destbase->rollback;
369                        }
370                    }
371                }
372            }
373        }
374    }
375
376    $self->leave_synch_mode(%state);
377    my $res = $self->run_post_synchro(
378        {
379            UPDATED => $updated || undef,
380        }
381    );
382    if ($res) {
383        foreach my $destbase ($self->to) {
384            # don't register savepoint on error
385            if ($desterror{$destbase->label}) { next; }
386            $self->newval($self->from->label, $destbase->label, $self->{current_rev});
387        }
388
389        if(!($self->{options}{nocreate} ||
390                $self->{options}{test})) {
391            $self->write_status;
392        }
393    } else {
394        la_log(LA_ERROR, "Not updating status because post script failed");
395    }
396
397    $self->unlock;
398
399    if (!$res) { # postscript failure
400        return;
401    } elsif (grep { $desterror{$_} } keys %desterror) {
402        # There were errors :\
403        return;
404    } else {
405        return 1;
406    }
407}
408
409=head2 write_status
410
411Write savepoint file
412
413=cut
414
415sub write_status {
416    my ($self) = @_;
417    if (my $file = $self->GetFileName) {
418        open(my $handle, '>', $file) or do {
419            la_log(LA_ERR, "Cannot open status file %s for writing: %s",
420                $file, $!);
421            return;
422        };
423        my $oldfh = select($handle);
424        $self->OutputConfig();
425        select($oldfh);
426        close($handle); 
427        return 1;
428    }
429
430    return 0;
431}
432
433=head2 reset_savepoint
434
435Reset savepoint in status file to force full synchronisation
436
437=cut
438
439sub reset_savepoint {
440    my ($self) = @_;
441    foreach my $destbase ($self->to) {
442            # don't register savepoint on error
443        $self->newval($self->from->label, $destbase->label, 0);
444    }
445    $self->write_status;
446}
447
448=head2 run_pre_synchro
449
450Run task done before synchronisation
451
452=cut
453
454sub run_pre_synchro {
455    my ($self, $env) = @_;
456
457    $env ||= {};
458    $env->{HOOK_TYPE} = 'PRE';
459
460    foreach my $base ($self->to) {
461        if ($base->config('presynchro')) {
462            la_log LA_DEBUG, "Executing base pre synchro `%s' for %s",
463                $base->config('presynchro'), $base->label;
464            exec_command(
465                $base->config('presynchro'),
466                {
467                    BASE => $base->label,
468                    BASETYPE => $base->type,
469                    %{ $env },
470                }
471            );
472        }
473    }
474
475    $self->{options}{pre} or return 1;
476
477    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{pre});
478
479    exec_command($self->{options}{post}, $env);
480}
481
482=head2 run_post_synchro
483
484Run task done after synchronisation
485
486=cut
487
488sub run_post_synchro {
489    my ($self, $env) = @_;
490   
491    $env ||= {};
492    $env->{HOOK_TYPE} = 'PRE';
493
494    foreach my $base ($self->to) {
495        if ($base->config('postsynchro')) {
496            la_log LA_DEBUG, "Executing base post synchro `%s' for %s",
497                $base->config('postsynchro'), $base->label;
498            exec_command(
499                $base->config('postsynchro'),
500                {
501                    BASE => $base->label,
502                    BASETYPE => $base->type,
503                    %{ $env },
504                }
505            );
506        }
507    }
508
509    $self->{options}{post} or return 1;
510
511    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{post});
512   
513    exec_command($self->{options}{post}, $env);
514}
515
516
5171;
518
519__END__
520# Below is stub documentation for your module. You'd better edit it!
521
522=head1 SEE ALSO
523
524Mention other useful documentation such as the documentation of
525related modules or operating system documentation (such as man pages
526in UNIX), or any relevant external documentation such as RFCs or
527standards.
528
529If you have a mailing list set up for your module, mention it here.
530
531If you have a web site set up for your module, mention it here.
532
533=head1 AUTHOR
534
535Thauvin Olivier, E<lt>olivier.thauvin@latmosipsl.frE<gt>
536
537=head1 COPYRIGHT AND LICENSE
538
539Copyright (C) 2009 by Thauvin Olivier
540
541This library is free software; you can redistribute it and/or modify
542it under the same terms as Perl itself, either Perl version 5.10.0 or,
543at your option, any later version of Perl 5 you may have available.
544
545
546=cut
Note: See TracBrowser for help on using the repository browser.