source: trunk/LATMOS-Accounts/lib/LATMOS/Accounts/Synchro.pm @ 2436

Last change on this file since 2436 was 2436, checked in by nanardon, 4 years ago

Backup sync status file

  • Property svn:keywords set to Id Rev
File size: 15.1 KB
Line 
1package LATMOS::Accounts::Synchro;
2
3use 5.010000;
4use strict;
5use warnings;
6use base qw(Config::IniFiles);
7use LATMOS::Accounts::Bases;
8use LATMOS::Accounts::Log;
9use LATMOS::Accounts::Utils qw(exec_command);
10use Fcntl qw(:flock);
11use File::Copy;
12
13=head1 NAME
14
15LATMOS::Accounts::Synchro - Perl extension for blah blah blah
16
17=head1 SYNOPSIS
18
19  use LATMOS::Accounts;
20  blah blah blah
21
22=head1 DESCRIPTION
23
24Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the
25author of the extension was negligent enough to leave the stub
26unedited.
27
28Blah blah blah.
29
30=head1 FUNCTIONS
31
32=cut
33
34our $VERSION = (q$Rev: 1915 $ =~ /^Rev: (\d+) /)[0];
35
36=head2 new($from, $to, %options)
37
38Create a new synchronisation where $from and $to are LATMOS::Accounts::Base
39based objects. $to can be an array ref of objects.
40
41=cut
42
43sub new {
44    my ($class, $from, $to, %options) = @_;
45
46    my $state_file = $options{state_dir}
47        ? $options{state_dir} . '/synchronisation.ini'
48        : undef;
49    if ($state_file && ! -w $state_file) {
50        # don't exists, we have to create it
51        open(my $handle, '>', $state_file) or do {
52            la_log(LA_ERR, "Cannot open status file %s", $state_file);
53            return;
54        };
55        print $handle "[_default_]\n";
56        close($handle);
57    }
58
59    my $self = Config::IniFiles->new(
60        $state_file
61        ? (-file => $state_file)
62        : (),
63    );
64
65    if ($state_file && !$self->GetFileName) {
66        $self->SetFileName($state_file);
67    }
68   
69    $self->{from} = $from or do {
70        la_log(LA_ERR, "No database source");
71        return;
72    };
73    $self->{options} = { %options };
74
75    # allow ref and array ref of, eg
76    # to = $foo and $to = [ $foo, $bar ]
77    foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) {
78        push(@{$self->{to}}, $_);
79    }
80    bless($self, $class)
81}
82
83=head2 name
84
85Return the name of this synchronisation
86
87=cut
88
89sub name {
90    $_[0]->{options}{name}
91}
92
93=head2 from
94
95Return the base object source for this synchronisation
96
97=cut
98
99sub from {
100    my ($self) = @_;
101    return $self->{from}
102}
103
104=head2 to
105
106Return the list of base destination for this synchronisation
107
108=cut
109
110sub to {
111    my ($self) = @_;
112    return @{$self->{to} || []};
113}
114
115=head2 load_dest
116
117Try to loaded all base, return the count of filtrered base which cannot
118be loaded
119
120=cut
121
122sub load_dest {
123    my ($self) = @_;
124    my @loaded;
125    my $unloaded = 0;
126    foreach ($self->to) {
127        if($_->load) {
128            push(@loaded, $_);
129        } else {
130            $unloaded++;
131            warn "Cannot load $_";
132        }
133    }
134    $self->{to} = \@loaded;
135    return $unloaded;
136}
137
138=head2 enter_synch_mode
139
140Configure base for synchronisation
141
142=cut
143
144sub enter_synch_mode {
145    my ($self) = @_;
146    $self->from->load or return;
147    # if any cannot be loaded, return,
148    # TODO we need a way to force if some still can be sync
149    $self->load_dest and return;
150    my %state = ();
151    $state{$self->from->label} = $self->from->wexported(
152        $self->{options}{unexported} ? 1 : 0
153    );
154    foreach ($self->to) {
155        $state{$_->label} = $_->wexported(1);
156    }
157    la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map {
158            "$_ => $state{$_}" } sort keys %state));
159    %state
160}
161
162=head2 leave_synch_mode (%state)
163
164Retore base to previous state
165
166=cut
167
168sub leave_synch_mode {
169    my ($self, %state) = @_;
170    la_log(LA_DEBUG, "Leaving synch mode");
171    $self->from->wexported($state{$self->from->label});
172    foreach my $base (grep { $_ } $self->to) {
173        $base->wexported($state{$base->label});
174    }
175}
176
177=head2 lock
178
179Create a lock to denied another synchronisation to run
180
181=cut
182
183sub lock {
184    my ($self) = @_;
185
186    $self->{lock}{handle} and return 1;
187    la_log(LA_DEBUG, "Trying to lock (pid $$)");
188    if ($self->{options}{state_dir}) {
189        my $lockfile = $self->{options}{state_dir} . '/synclock';
190        open(my $handle, '>>', $lockfile) or return;
191        flock($handle, LOCK_EX);
192        $self->{lock}{handle} = $handle;
193        $self->{lock}{filename} = $lockfile;
194        la_log(LA_DEBUG, "lock done (pid $$)");
195        return 1;
196    } else { return 1 }
197}
198
199=head2 unlock
200
201Remove lock
202
203=cut
204
205sub unlock {
206    my ($self) = @_;
207    if (my $handle = $self->{lock}{handle}) {
208        close($handle);
209        delete($self->{lock}{handle});
210        unlink($self->{lock}{filename});
211        delete($self->{lock}{filename});
212        return 1;
213    }
214    return;
215}
216
217=head2 sync_object ($otype, $uid, %options)
218
219Synchronise object type C<$otype> named C<$uid>
220
221=cut
222
223sub sync_object {
224    my ($self, $otype, $uid, %options) = @_;
225
226    $self->lock or return;
227
228    my %state = $self->enter_synch_mode;
229   
230    my $res = $self->_sync_object($otype, $uid, %options);
231
232    $self->leave_synch_mode(%state);
233
234    $self->unlock;
235
236    $res;
237}
238
239sub _sync_object {
240    my ($self, $otype, $uid, %options) = @_;
241    foreach ($self->to) {
242        my $res = $_->sync_object_from($self->from, $otype, $uid, %options);
243        if (defined $res) {
244            la_log(LA_NOTICE, $_->label . " $uid ($otype) $res") if ($res);
245            return 1;
246        } else {
247            la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label);
248            return;
249        }
250    }
251}
252
253=head2 process
254
255Run the syncronisation
256
257=cut
258
259sub process {
260    my ($self, %options) = @_;
261
262    $self->lock or return;
263   
264    if (!(my $res = $self->run_pre_synchro({}))) {
265        la_log(LA_ERR, "Pre synchro script failed, aborting");
266        $self->unlock;
267        return;
268    }
269
270    my %state = $self->enter_synch_mode;
271
272    # tracking current base revision:
273    $self->{current_rev} = $self->from->current_rev;
274
275    my %desterror;
276    my $updated = 0;
277
278    # We do base one by one
279
280    foreach my $destbase ($self->to) {
281        my %objlist;
282        foreach my $otype ($self->from->list_supported_objects) {
283            $destbase->is_supported_object($otype) or next;
284
285            my %existings;
286            my %filtering;
287
288            $existings{$otype} = { map { $_ => 1 }
289                $self->from->listRealObjects($otype) };
290
291            # Is there a filter to apply:
292            my $filtername = 'filter.' . $destbase->label . '.' . $otype;
293            if (my $filter = $self->{options}->{$filtername}) {
294                la_log(LA_DEBUG, "Found %s param, using it: %s", $filtername, $filter);
295                $filtering{$otype} = { map { $_ => 1 }
296                    $self->from->search_objects($otype, $filter, 'oalias=NULL') };
297            } else {
298                $filtering{$otype} = $existings{$otype};
299            }
300
301
302            # deleting non existing object in dest:
303
304            # Sync: noDelete.Base = yes          mean no delete
305            #       noDelete.Base.Otype = yes    mean no delete for this object
306            if ($self->{options}->{'noDelete'} ||
307                $self->{options}->{'noDelete.' . $destbase->label} ||
308                $self->{options}->{'noDelete.' . $destbase->label . '.' . $otype}) {
309                la_log(LA_INFO, 'Not deleting object type \'%s` from base \'%s` because %s is set',
310                    $otype,
311                    $destbase->label,
312                    $self->{options}->{'noDelete.' . $destbase->label . '.' . $otype}
313                        ? 'noDelete.' . $destbase->label . '.' . $otype
314                        : $self->{options}->{'noDelete.' . $destbase->label}
315                            ? 'noDelete.' . $destbase->label
316                            : 'noDelete'
317                );
318            } else {
319
320            my $deletefiltered = 'deletefiltered.' . $destbase->label . '.' . $otype;
321
322            foreach ($destbase->listRealObjects($otype)) {
323
324                if ($filtering{$otype}{$_}) {
325                    # the object must exists
326                    next;
327                } elsif ($existings{$otype}{$_}) {
328                    # object exists but is filtered
329                    if (!$self->{options}->{$deletefiltered}) {
330                        next;
331                    }
332                }
333
334                if (my $res = $destbase->sync_object_from($self->from,
335                        $otype, $_, %options)) {
336                    la_log(LA_NOTICE, "%s::%s::%s => %s %s",
337                        $self->from->label, $otype, $_, $destbase->label, $res,
338                    );
339                    if ($destbase->is_transactionnal) {
340                        $destbase->commit;
341                    }
342                    $updated = 1;
343                } else {
344                    if ($destbase->is_transactionnal) {
345                        $destbase->rollback;
346                    }
347                }
348            }
349            } # noDelete.Base
350
351
352            # Finding object to synchronize:
353            {
354                my %ObjList = ();
355
356                # Objects to refresh
357                $ObjList{ $_ } = 1 foreach(grep { $filtering{$otype}{$_} } $self->from->list_objects_from_rev(
358                    $otype,
359                    $self->val($self->from->label, $destbase->label, 0),
360                ));
361
362                my %destExists = ( map { $_ => 1 }
363                    $destbase->listRealObjects($otype) );
364
365                # Objects missing in dest, we try to sync it
366                foreach (keys %{ $existings{ $otype } || {} }) {
367                    $destExists{ $_ } and next;
368                    $filtering{$otype}{$_} or next;
369                    $ObjList{ $_ } = 1;
370                }
371
372                @{$objlist{$otype}} = sort keys %ObjList;
373            }
374        }
375
376        my %objectok;
377        foreach my $pass (1, 0) {
378            foreach my $otype ($destbase->ordered_objects) {
379                exists($objlist{$otype}) or next;
380                foreach (@{$objlist{$otype} || []}) {
381
382                    # If first synchro is not ok, don't retry to sync the object
383                    if ((!$pass) && (!$objectok{$otype}{$_})) {
384                        next;
385                    }
386
387                    my $res = $destbase->sync_object_from($self->from, $otype, $_,
388                        %options, firstpass => $pass);
389                    if (defined $res) {
390                        if ($res) {
391                            la_log(LA_NOTICE, "%s::%s::%s => %s %s",
392                                $self->from->label, $otype, $_,
393                                $destbase->label, $res,
394                            );
395                            if ($destbase->is_transactionnal) {
396                                $destbase->commit;
397                            }
398                            $updated = 1;
399                            $objectok{$otype}{$_} = 1;
400                        }
401                    } else {
402                        la_log(LA_ERR, "Cannot synch %s::%s::%s => %s",
403                            $self->from->label, $otype, $_,
404                            $destbase->label,
405                        );
406                        $desterror{$destbase->label} = 1;
407                        if ($destbase->is_transactionnal) {
408                            $destbase->rollback;
409                        }
410                    }
411                }
412            }
413        }
414        if (!$desterror{$destbase->label}) {
415            $destbase->commit if(!$destbase->is_transactionnal);
416            $self->newval($self->from->label, $destbase->label, $self->{current_rev});
417            if(!($self->{options}{nocreate} || $self->{options}{test})) {
418                $self->write_status;
419                la_log(LA_NOTICE,
420                    "Update synch. status to %s for base %s to %s",
421                    $self->{current_rev} || '(none)', $self->from->label, $destbase->label
422                ); 
423            }
424        }
425    }
426
427    $self->leave_synch_mode(%state);
428    my $res = $self->run_post_synchro(
429        {
430            UPDATED => $updated || undef,
431        }
432    );
433    if ($res) { 
434    } else {
435        la_log(LA_ERROR, "Post synchronization script failed: %s", $res);
436    }
437
438    $self->unlock;
439
440    if (!$res) { # postscript failure
441        return;
442    } elsif (grep { $desterror{$_} } keys %desterror) {
443        # There were errors :\
444        return;
445    } else {
446        return 1;
447    }
448}
449
450=head2 write_status
451
452Write savepoint file
453
454=cut
455
456sub write_status {
457    my ($self) = @_;
458    if (my $file = $self->GetFileName) {
459        copy($file, "$file.old") or die "Cannot backup status $file";
460        open(my $handle, '>', $file) or do {
461            la_log(LA_ERR, "Cannot open status file %s for writing: %s",
462                $file, $!);
463            return;
464        };
465        my $oldfh = select($handle);
466        $self->OutputConfig();
467        select($oldfh);
468        close($handle); 
469        return 1;
470    }
471
472    return 0;
473}
474
475=head2 reset_savepoint
476
477Reset savepoint in status file to force full synchronisation
478
479=cut
480
481sub reset_savepoint {
482    my ($self) = @_;
483    foreach my $destbase ($self->to) {
484            # don't register savepoint on error
485        $self->newval($self->from->label, $destbase->label, 0);
486    }
487    $self->write_status;
488}
489
490=head2 run_pre_synchro
491
492Run task done before synchronisation
493
494=cut
495
496sub run_pre_synchro {
497    my ($self, $env) = @_;
498
499    $env ||= {};
500    $env->{HOOK_TYPE} = 'PRE';
501
502    foreach my $base ($self->to) {
503        if ($base->config('presynchro')) {
504            la_log LA_DEBUG, "Executing base pre synchro `%s' for %s",
505                $base->config('presynchro'), $base->label;
506            exec_command(
507                $base->config('presynchro'),
508                {
509                    BASE => $base->label,
510                    BASETYPE => $base->type,
511                    %{ $env },
512                }
513            );
514        }
515    }
516
517    $self->{options}{pre} or return 1;
518
519    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{pre});
520
521    exec_command($self->{options}{post}, $env);
522}
523
524=head2 run_post_synchro
525
526Run task done after synchronisation
527
528=cut
529
530sub run_post_synchro {
531    my ($self, $env) = @_;
532   
533    $env ||= {};
534    $env->{HOOK_TYPE} = 'PRE';
535
536    foreach my $base ($self->to) {
537        if ($base->config('postsynchro')) {
538            la_log LA_DEBUG, "Executing base post synchro `%s' for %s",
539                $base->config('postsynchro'), $base->label;
540            exec_command(
541                $base->config('postsynchro'),
542                {
543                    BASE => $base->label,
544                    BASETYPE => $base->type,
545                    %{ $env },
546                }
547            );
548        }
549    }
550
551    $self->{options}{post} or return 1;
552
553    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{post});
554   
555    exec_command($self->{options}{post}, $env);
556}
557
558
5591;
560
561__END__
562# Below is stub documentation for your module. You'd better edit it!
563
564=head1 SEE ALSO
565
566Mention other useful documentation such as the documentation of
567related modules or operating system documentation (such as man pages
568in UNIX), or any relevant external documentation such as RFCs or
569standards.
570
571If you have a mailing list set up for your module, mention it here.
572
573If you have a web site set up for your module, mention it here.
574
575=head1 AUTHOR
576
577Thauvin Olivier, E<lt>olivier.thauvin@latmosipsl.frE<gt>
578
579=head1 COPYRIGHT AND LICENSE
580
581Copyright (C) 2009 by Thauvin Olivier
582
583This library is free software; you can redistribute it and/or modify
584it under the same terms as Perl itself, either Perl version 5.10.0 or,
585at your option, any later version of Perl 5 you may have available.
586
587
588=cut
Note: See TracBrowser for help on using the repository browser.