package LATMOS::Accounts::Synchro; use 5.010000; use strict; use warnings; use base qw(Config::IniFiles); use LATMOS::Accounts::Bases; use LATMOS::Accounts::Log; use LATMOS::Accounts::Utils qw(exec_command); use Fcntl qw(:flock); =head1 NAME LATMOS::Accounts::Synchro - Perl extension for blah blah blah =head1 SYNOPSIS use LATMOS::Accounts; blah blah blah =head1 DESCRIPTION Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the author of the extension was negligent enough to leave the stub unedited. Blah blah blah. =head1 FUNCTIONS =cut our $VERSION = (q$Rev$ =~ /^Rev: (\d+) /)[0]; =head2 new($from, $to, %options) Create a new synchronisation where $from and $to are LATMOS::Accounts::Base based objects. $to can be an array ref of objects. =cut sub new { my ($class, $from, $to, %options) = @_; my $state_file = $options{state_dir} ? $options{state_dir} . '/synchronisation.ini' : undef; if ($state_file && ! -w $state_file) { # don't exists, we have to create it open(my $handle, '>', $state_file) or do { la_log(LA_ERR, "Cannot open status file %s", $state_file); return; }; print $handle "[_default_]\n"; close($handle); } my $self = Config::IniFiles->new( $state_file ? (-file => $state_file) : (), ); if ($state_file && !$self->GetFileName) { $self->SetFileName($state_file); } $self->{from} = $from or do { la_log(LA_ERR, "No database source"); return; }; $self->{options} = { %options }; # allow ref and array ref of, eg # to = $foo and $to = [ $foo, $bar ] foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) { push(@{$self->{to}}, $_); } bless($self, $class) } =head2 name Return the name of this synchronisation =cut sub name { $_[0]->{options}{name} } =head2 from Return the base object source for this synchronisation =cut sub from { my ($self) = @_; return $self->{from} } =head2 to Return the list of base destination for this synchronisation =cut sub to { my ($self) = @_; return @{$self->{to} || []}; } =head2 load_dest Try to loaded all base, return the count of filtrered base which cannot be loaded =cut sub load_dest { my ($self) = @_; my @loaded; my $unloaded = 0; foreach ($self->to) { if($_->load) { push(@loaded, $_); } else { $unloaded++; warn "Cannot load $_"; } } $self->{to} = \@loaded; return $unloaded; } =head2 enter_synch_mode Configure base for synchronisation =cut sub enter_synch_mode { my ($self) = @_; $self->from->load or return; # if any cannot be loaded, return, # TODO we need a way to force if some still can be sync $self->load_dest and return; my %state = (); $state{$self->from->label} = $self->from->wexported( $self->{options}{unexported} ? 1 : 0 ); foreach ($self->to) { $state{$_->label} = $_->wexported(1); } la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map { "$_ => $state{$_}" } sort keys %state)); %state } =head2 leave_synch_mode (%state) Retore base to previous state =cut sub leave_synch_mode { my ($self, %state) = @_; la_log(LA_DEBUG, "Leaving synch mode"); $self->from->wexported($state{$self->from->label}); foreach my $base (grep { $_ } $self->to) { $base->wexported($state{$base->label}); } } =head2 lock Create a lock to denied another synchronisation to run =cut sub lock { my ($self) = @_; $self->{lock}{handle} and return 1; la_log(LA_DEBUG, "Trying to lock (pid $$)"); if ($self->{options}{state_dir}) { my $lockfile = $self->{options}{state_dir} . '/synclock'; open(my $handle, '>>', $lockfile) or return; flock($handle, LOCK_EX); $self->{lock}{handle} = $handle; $self->{lock}{filename} = $lockfile; la_log(LA_DEBUG, "lock done (pid $$)"); return 1; } else { return 1 } } =head2 unlock Remove lock =cut sub unlock { my ($self) = @_; if (my $handle = $self->{lock}{handle}) { close($handle); delete($self->{lock}{handle}); unlink($self->{lock}{filename}); delete($self->{lock}{filename}); return 1; } return; } =head2 sync_object ($otype, $uid, %options) Synchronise object type C<$otype> named C<$uid> =cut sub sync_object { my ($self, $otype, $uid, %options) = @_; $self->lock or return; my %state = $self->enter_synch_mode; my $res = $self->_sync_object($otype, $uid, %options); $self->leave_synch_mode(%state); $self->unlock; $res; } sub _sync_object { my ($self, $otype, $uid, %options) = @_; foreach ($self->to) { my $res = $_->sync_object_from($self->from, $otype, $uid, %options); if (defined $res) { la_log(LA_NOTICE, $_->label . " $uid ($otype) $res") if ($res); return 1; } else { la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label); return; } } } =head2 process Run the syncronisation =cut sub process { my ($self, %options) = @_; $self->lock or return; if (!(my $res = $self->run_pre_synchro({}))) { la_log(LA_ERR, "Pre synchro script failed, aborting"); $self->unlock; return; } my %state = $self->enter_synch_mode; # tracking current base revision: $self->{current_rev} = $self->from->current_rev; my %desterror; my $updated = 0; # We do base one by one foreach my $destbase ($self->to) { my %objlist; foreach my $otype ($self->from->list_supported_objects) { $destbase->is_supported_object($otype) or next; my %existings; my %filtering; $existings{$otype} = { map { $_ => 1 } $self->from->listRealObjects($otype) }; # Is there a filter to apply: my $filtername = 'filter.' . $destbase->label . '.' . $otype; if (my $filter = $self->{options}->{$filtername}) { la_log(LA_DEBUG, "Found %s param, using it: %s", $filtername, $filter); $filtering{$otype} = { map { $_ => 1 } $self->from->search_objects($otype, $filter, 'oalias=NULL') }; } else { $filtering{$otype} = $existings{$otype}; } # deleting non existing object in dest: # Sync: noDelete.Base = yes mean no delete # noDelete.Base.Otype = yes mean no delete for this object if ($self->{options}->{'noDelete'} || $self->{options}->{'noDelete.' . $destbase->label} || $self->{options}->{'noDelete.' . $destbase->label . '.' . $otype}) { la_log(LA_INFO, 'Not deleting object type \'%s` from base \'%s` because %s is set', $otype, $destbase->label, $self->{options}->{'noDelete.' . $destbase->label . '.' . $otype} ? 'noDelete.' . $destbase->label . '.' . $otype : $self->{options}->{'noDelete.' . $destbase->label} ? 'noDelete.' . $destbase->label : 'noDelete' ); } else { my $deletefiltered = 'deletefiltered.' . $destbase->label . '.' . $otype; foreach ($destbase->listRealObjects($otype)) { if ($filtering{$otype}{$_}) { # the object must exists next; } elsif ($existings{$otype}{$_}) { # object exists but is filtered if (!$self->{options}->{$deletefiltered}) { next; } } if (my $res = $destbase->sync_object_from($self->from, $otype, $_, %options)) { la_log(LA_NOTICE, "%s::%s::%s => %s %s", $self->from->label, $otype, $_, $destbase->label, $res, ); if ($destbase->is_transactionnal) { $destbase->commit; } $updated = 1; } else { if ($destbase->is_transactionnal) { $destbase->rollback; } } } } # noDelete.Base # Finding object to synchronize: @{$objlist{$otype}} = grep { $filtering{$otype}{$_} } $self->from->list_objects_from_rev( $otype, $self->val($self->from->label, $destbase->label, 0), ); } my %objectok; foreach my $pass (1, 0) { foreach my $otype ($destbase->ordered_objects) { exists($objlist{$otype}) or next; foreach (@{$objlist{$otype} || []}) { # If first synchro is not ok, don't retry to sync the object if ((!$pass) && (!$objectok{$otype}{$_})) { next; } my $res = $destbase->sync_object_from($self->from, $otype, $_, %options, firstpass => $pass); if (defined $res) { if ($res) { la_log(LA_NOTICE, "%s::%s::%s => %s %s", $self->from->label, $otype, $_, $destbase->label, $res, ); if ($destbase->is_transactionnal) { $destbase->commit; } $updated = 1; $objectok{$otype}{$_} = 1; } } else { la_log(LA_ERR, "Cannot synch %s::%s::%s => %s", $self->from->label, $otype, $_, $destbase->label, ); $desterror{$destbase->label} = 1; if ($destbase->is_transactionnal) { $destbase->rollback; } } } } } if (!$desterror{$destbase->label}) { $destbase->commit if(!$destbase->is_transactionnal); $self->newval($self->from->label, $destbase->label, $self->{current_rev}); if(!($self->{options}{nocreate} || $self->{options}{test})) { $self->write_status; la_log(LA_NOTICE, "Update synch. status to %s for base %s to %s", $self->{current_rev} || '(none)', $self->from->label, $destbase->label ); } } } $self->leave_synch_mode(%state); my $res = $self->run_post_synchro( { UPDATED => $updated || undef, } ); if ($res) { } else { la_log(LA_ERROR, "Post synchronization script failed: %s", $res); } $self->unlock; if (!$res) { # postscript failure return; } elsif (grep { $desterror{$_} } keys %desterror) { # There were errors :\ return; } else { return 1; } } =head2 write_status Write savepoint file =cut sub write_status { my ($self) = @_; if (my $file = $self->GetFileName) { open(my $handle, '>', $file) or do { la_log(LA_ERR, "Cannot open status file %s for writing: %s", $file, $!); return; }; my $oldfh = select($handle); $self->OutputConfig(); select($oldfh); close($handle); return 1; } return 0; } =head2 reset_savepoint Reset savepoint in status file to force full synchronisation =cut sub reset_savepoint { my ($self) = @_; foreach my $destbase ($self->to) { # don't register savepoint on error $self->newval($self->from->label, $destbase->label, 0); } $self->write_status; } =head2 run_pre_synchro Run task done before synchronisation =cut sub run_pre_synchro { my ($self, $env) = @_; $env ||= {}; $env->{HOOK_TYPE} = 'PRE'; foreach my $base ($self->to) { if ($base->config('presynchro')) { la_log LA_DEBUG, "Executing base pre synchro `%s' for %s", $base->config('presynchro'), $base->label; exec_command( $base->config('presynchro'), { BASE => $base->label, BASETYPE => $base->type, %{ $env }, } ); } } $self->{options}{pre} or return 1; la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{pre}); exec_command($self->{options}{post}, $env); } =head2 run_post_synchro Run task done after synchronisation =cut sub run_post_synchro { my ($self, $env) = @_; $env ||= {}; $env->{HOOK_TYPE} = 'PRE'; foreach my $base ($self->to) { if ($base->config('postsynchro')) { la_log LA_DEBUG, "Executing base post synchro `%s' for %s", $base->config('postsynchro'), $base->label; exec_command( $base->config('postsynchro'), { BASE => $base->label, BASETYPE => $base->type, %{ $env }, } ); } } $self->{options}{post} or return 1; la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{post}); exec_command($self->{options}{post}, $env); } 1; __END__ # Below is stub documentation for your module. You'd better edit it! =head1 SEE ALSO Mention other useful documentation such as the documentation of related modules or operating system documentation (such as man pages in UNIX), or any relevant external documentation such as RFCs or standards. If you have a mailing list set up for your module, mention it here. If you have a web site set up for your module, mention it here. =head1 AUTHOR Thauvin Olivier, Eolivier.thauvin@latmosipsl.frE =head1 COPYRIGHT AND LICENSE Copyright (C) 2009 by Thauvin Olivier This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.10.0 or, at your option, any later version of Perl 5 you may have available. =cut