Changeset 333


Ignore:
Timestamp:
01/23/11 23:34:43 (13 years ago)
Author:
nanardon
Message:
  • massive rework of the base updater
Location:
server/trunk/web
Files:
6 added
5 edited
1 moved

Legend:

Unmodified
Added
Removed
  • server/trunk/web/bin/sophie_scan

    r332 r333  
    66use POSIX ":sys_wait_h"; 
    77use Getopt::Long; 
     8use Sophie::Scan; 
    89 
    910$ENV{LC_ALL} = 'C'; 
    10 require Sophie::Base; 
    11 require Sophie::Base::RpmsPath; 
     11require Sophie::Scan::RpmsPath; 
    1212 
    1313GetOptions( 
     
    2828} 
    2929 
     30my $update = 1; 
    3031alarm($updated_inotify); 
    3132while (1) { 
    3233    local $SIG{ALRM} = sub { 
    3334        alarm($updated_inotify); 
     35        $update = 1; 
    3436    }; 
    35     if (update_base(keys %modified_paths)) { 
    36         %modified_paths = (); 
     37    if ($update) { 
     38        if (update_base(keys %modified_paths)) { 
     39            %modified_paths = (); 
     40            $update = 0; 
     41        } 
    3742    } 
    3843    warn "$$ Inotify"; 
    3944    $inotify = inotify_path(); 
    40     $inotify->poll; 
     45    if ($inotify) { 
     46        $inotify->poll and $update = 1; 
     47    } else { 
     48        sleep(300); 
     49    } 
    4150} 
    4251 
     
    4453 
    4554    my $i = Linux::Inotify2->new; 
    46     my $sophie = Sophie::Base->connect; 
    4755     
    48     foreach ($sophie->resultset('Paths')->get_column('path')->all) { 
     56    my @paths = Sophie::Scan->new->list_paths; 
     57    if (!@paths) { 
     58        return; 
     59    } 
     60    foreach (@paths) { 
     61        -d $_ or next; 
    4962        $i->watch( 
    5063            $_, 
     
    7285    if (my $pid = fork()) { 
    7386    } else { 
    74  
    7587        alarm 0; 
     88        my $scan = Sophie::Scan->new; 
     89        $scan->update_meta_paths; 
    7690        my @pkey; 
    7791        { 
    78             my $sophie = Sophie::Base->connect or do { 
    79                 die "cannot read config file\n"; 
    80             }; 
    81             @pkey = $sophie->resultset('Paths')->search( 
    82                 path => [ @path ], 
    83             )->get_column('d_path_key')->all; 
    84             push(@pkey, $sophie->resultset('Paths')->search({ 
    85                 updated => [ undef,  
    86                     \[ " < now() - '24 hours'::interval"], 
    87                 ], 
    88             })->get_column('d_path_key')->all); 
    89             my %uniq = map { $_ => 1 } @pkey; 
    90             @pkey = keys %uniq; 
     92            @pkey = $scan->paths_to_keys(@path); 
     93             
     94            push(@pkey, $scan->list_unscanned_paths); 
    9195        } 
    9296 
    9397        exit(0) if (!@pkey); 
    9498 
     99        foreach my $pathkey (@pkey) { 
     100            my $time = time; 
     101            my @delta = Sophie::Scan::RpmsPath 
     102                ->new($pathkey, Sophie::Scan->new) 
     103                ->find_delta; 
     104            while (my @d = splice(@delta, 0, 25)) { 
     105                my $scan = Sophie::Scan->new; 
     106                my $path = Sophie::Scan::RpmsPath->new($pathkey, $scan); 
     107                $path->update_content(@d); 
     108                last if (time > $time + 15 * 60); 
     109            } 
    95110 
    96  
    97         my $NB_PAR = 1; 
    98         my @split; 
    99         my $div = @pkey / $NB_PAR; 
    100  
    101         for (my $i = 0; $i < $NB_PAR - 1; $i++) { 
    102             $split[$i] = [ splice(@pkey, 0, $div) ]; 
     111            Sophie::Scan::RpmsPath->new($pathkey, Sophie::Scan->new)->set_updated 
     112                if (!@delta); # update only if we finished 
    103113        } 
    104         $split[$NB_PAR -1] = [ @pkey ]; 
    105         foreach my $job (@split) { 
    106             @{$job || []} or next; 
    107  
    108             if (fork()) { 
    109             } else { 
    110  
    111                 foreach my $pathkey (@{ $job }) { 
    112                     my $time = time; 
    113                     my @delta = Sophie::Base::RpmsPath->new($pathkey, Sophie::Base->connect) 
    114                         ->find_delta; 
    115                     while (my @d = splice(@delta, 0, 25)) { 
    116                         my $path = Sophie::Base::RpmsPath->new($pathkey, Sophie::Base->connect); 
    117                         $path->update_content(@d); 
    118                         #$path->db->disconnect; 
    119                         # If it take too long time, next path 
    120                         last if (time > $time + 15 * 60); 
    121                     } 
    122                     Sophie::Base::RpmsPath->new($pathkey, Sophie::Base->connect)->set_updated 
    123                         if (!@delta); # update only if we finished 
    124                 } 
    125                 exit(0); 
    126             } 
    127         } 
    128         1 while(waitpid(-1, 0) <= 0); 
    129114        exit(0); 
    130115    } 
  • server/trunk/web/lib/Sophie/Base.pm

    r332 r333  
    2929        $connect_info->{user} = $config->{dbuser}; 
    3030        $connect_info->{password} = $config->{dbpassword}; 
    31         $connect_info->{unsafe} = 1; 
    3231    } 
    33     $connect_info->{PrintError} = 0; 
    34     $connect_info->{RaiseError} = 0; 
    35     exists($connect_info->{AutoCommit}) or $connect_info->{AutoCommit} = 0; 
    3632    $class->SUPER::connection( 
    3733        $connect_info, 
     
    4541} 
    4642 
    47  
    48  
    49431; 
  • server/trunk/web/lib/Sophie/Base/Result/Paths.pm

    r104 r333  
    66 
    77__PACKAGE__->table('d_path'); 
    8 __PACKAGE__->add_columns(qw/d_path_key path added updated/); 
     8__PACKAGE__->add_columns(qw/d_path_key path added updated meta_path exists/); 
    99__PACKAGE__->set_primary_key('d_path_key'); 
    1010__PACKAGE__->add_unique_constraint('path' => [ 'path' ]); 
    11 #__PACKAGE__->belongs_to(media => 'Sophie::Base::Result::Medias', 'media'); 
     11__PACKAGE__->belongs_to('MetaPaths' => 'Sophie::Base::Result::MetaPaths', 'meta_path'); 
    1212__PACKAGE__->has_many(MediasPaths => 'Sophie::Base::Result::MediasPaths', 'd_path'); 
    1313__PACKAGE__->has_many(Rpmfiles => 'Sophie::Base::Result::RpmFile', 'd_path'); 
  • server/trunk/web/lib/Sophie/Base/Result/Rpms.pm

    r94 r333  
    66 
    77__PACKAGE__->table('rpms'); 
    8 __PACKAGE__->add_columns(qw/pkgid summary description issrc name evr/); 
     8__PACKAGE__->add_columns(qw/pkgid summary description issrc name evr arch header/); 
    99__PACKAGE__->set_primary_key(qw/pkgid/); 
    1010__PACKAGE__->has_many(Rpmfile => 'Sophie::Base::Result::RpmFile', 'pkgid'); 
  • server/trunk/web/lib/Sophie/Base/Result/SrcFiles.pm

    r86 r333  
    77__PACKAGE__->table('srcfiles'); 
    88__PACKAGE__->add_columns(qw/pkgid count basename md5 user group linkto 
    9     mode fflags size class color vflags mtime nlink has_content/); 
     9    mode fflags size class color vflags mtime nlink has_content contents/); 
    1010__PACKAGE__->set_primary_key(qw/pkgid count/); 
    1111__PACKAGE__->belongs_to(Rpms => 'Sophie::Base::Result::Rpms', 'pkgid'); 
  • server/trunk/web/lib/Sophie/Scan/RpmsPath.pm

    r332 r333  
    1 package Sophie::Base::RpmsPath; 
     1package Sophie::Scan::RpmsPath; 
    22 
    33use strict; 
    44use warnings; 
    5 use base qw(Sophie::Base); 
    65use Sophie::Base::Header; 
    76use RPM4; 
     
    1211use Encode; 
    1312use Time::HiRes; 
     13use DBD::Pg qw(:pg_types); 
    1414 
    1515sub new { 
     
    2121sub key { $_[0]->{key} }  
    2222sub db {  
    23     $_[0]->{db}->storage->dbh 
     23    $_[0]->{db} 
    2424}  
    2525 
     
    2727    my ($self) = @_; 
    2828     
    29     my $sth = $self->db->prepare_cached( 
    30         q{select path from d_path where d_path_key = ?} 
    31     ); 
    32     $sth->execute($self->key); 
    33     my $res = $sth->fetchrow_hashref; 
    34     $sth->finish; 
    35     return $res->{path} 
     29    $self->db->base->resultset('Paths')->find( 
     30        { d_path_key => $self->key } 
     31    )->path; 
    3632} 
    3733 
     
    3935    my ($self) = @_; 
    4036 
    41     my $sth = $self->db->prepare_cached( 
    42         q{select * from rpmfiles where d_path = ?} 
    43     ); 
    44     $sth->execute($self->key); 
    45     $sth->fetchall_hashref([ 'filename' ]); 
     37    my %list; 
     38    foreach ($self->db->base->resultset('RpmFile')->search( 
     39        { d_path => $self->key } 
     40    )->get_column('filename')->all) { 
     41        $list{$_} = 1; 
     42    } 
     43    return \%list; 
    4644} 
    4745 
     
    9593    @delta; 
    9694} 
     95 
    9796sub update_content { 
    9897    my ($self, @delta) = @_; 
     
    117116sub set_exists { 
    118117    my ($self, $exists) = @_; 
    119     $self->db->prepare_cached(q{ 
    120         update d_path set exists = ? where d_path_key = ? 
    121         })->execute(($exists ? 1 : 0), $self->key); 
     118    $self->db->base->resultset('Paths')->find( 
     119        { d_path_key => $self->key } 
     120    )->update({ 'exists' => ($exists ? 1 : 0) }); 
    122121    $self->db->commit; 
    123122} 
     
    126125    my ($self) = @_; 
    127126    warn "$$ UPD"; 
    128     $self->db->prepare_cached(q{ 
    129         update d_path set updated = now() where d_path_key = ? 
    130         })->execute($self->key); 
     127    $self->db->base->resultset('Paths')->find( 
     128        { d_path_key => $self->key } 
     129    )->update({ 'updated' => \'now()' }); 
    131130    $self->db->commit; 
    132131} 
    133  
    134132 
    135133sub remove_rpm { 
    136134    my ($self, $rpm) = @_; 
    137135    warn "$$ deleting $rpm"; 
    138     my $remove = $self->db->prepare_cached( 
    139         q{ 
    140         DELETE FROM rpmfiles where d_path = ? and filename = ? 
     136    $self->db->base->storage->txn_do( 
     137        sub { 
     138 
     139            $self->db->base->resultset('RpmFile')->search( 
     140                { d_path => $self->key, filename => $rpm } 
     141            )->delete; 
    141142        } 
    142143    ); 
    143     for (1 .. 3) { 
    144         if ($remove->execute($self->key, $rpm)) {  
    145             $self->db->commit; 
    146             return 1; 
    147         } 
    148         $self->db->rollback; 
    149     } 
    150144} 
    151145 
     
    154148 
    155149    warn "$$ adding $rpm"; 
    156     for (1 .. 3) { 
    157         if (defined(my $pkgid = $self->_add_header($rpm))) { 
    158             $pkgid or return; 
    159             my $register = $self->db->prepare_cached( 
    160                 q{ 
    161                 INSERT INTO rpmfiles (d_path, filename, pkgid) 
    162                 values (?,?,?) 
    163                 } 
    164             ); 
    165             $register->execute($self->key, $rpm, $pkgid) and do { 
    166                 $self->db->commit; 
    167                 return 1; 
     150    eval { 
     151    my ($pkgid, $new) = $self->db->base->storage->txn_do( 
     152        sub { 
     153            my ($pkgid, $new) = $self->_add_header($rpm); 
     154            if (defined($pkgid)) { 
     155                $pkgid or return; 
     156                $self->db->base->resultset('RpmFile')->create( 
     157                    { 
     158                        d_path => $self->key, 
     159                        filename => $rpm, 
     160                        pkgid => $pkgid, 
     161                    } 
     162                );  
     163                return $pkgid, $new; 
    168164            } 
    169  
    170         } 
    171         $self->db->rollback; 
     165        }, 
     166    ); 
     167    foreach my $plugins (qw'sources') { 
     168        my $mod = ucfirst(lc($plugins)); 
     169        eval "require Sophie::Scan::RpmParser::$mod;"; 
     170        warn $@ if($@); 
     171        eval { 
     172            my $parser = "Sophie::Scan::RpmParser::$mod"->new($self->db); 
     173            $parser->run($self->path . '/' . $rpm, $pkgid, $new); 
     174        } 
     175    } 
    172176    } 
    173177} 
     
    186190 
    187191    { 
    188         my $find = $self->db->prepare_cached(q{ 
    189             select pkgid from rpms where pkgid = ? 
    190         }); 
    191         $find->execute($header->queryformat('%{PKGID}')); 
    192         my $rows = $find->rows; 
    193         $find->finish; 
    194         if ($rows) { 
     192        my $find = $self->db->base->resultset('Rpms')->search( 
     193            { pkgid => $header->queryformat('%{PKGID}') } 
     194        )->get_column('pkgid')->all; 
     195        if ($find) { 
    195196            warn "$$ Find"; 
    196             return $header->queryformat('%{PKGID}'); 
     197            return($header->queryformat('%{PKGID}'), 0); 
    197198        } 
    198199    } 
     
    204205    while (read($tmp, my $str, 1024)) { $string .= $str } 
    205206    $tmp = undef; 
    206     my $add_header = $self->db->prepare_cached( 
    207         q{ 
    208         INSERT into rpms (pkgid, name, header, evr, arch, issrc, description, summary) 
    209         values (?,?,rpmheader_in(decode(?, 'hex')::bytea),?,?,?,?,?) 
    210         } 
    211     ); 
    212207    my $description = $header->queryformat('%{DESCRIPTION}'); 
    213208    { 
     
    221216    } 
    222217 
    223     $add_header->execute( 
    224         $header->queryformat('%{PKGID}'), 
    225         $header->queryformat('%{name}'), 
    226         unpack('H*', $string), 
    227         $header->queryformat('%|EPOCH?{%{EPOCH}:}:{}|%{VERSION}-%{RELEASE}'), 
    228         $header->queryformat('%{ARCH}'), 
    229         $header->hastag('SOURCERPM') ? 'f' : 't', 
    230         $description, 
    231         $summary, 
    232     ) or return; 
    233     my $index_tag = $self->db->prepare_cached( 
     218    $self->db->base->resultset('Rpms')->create({ 
     219        pkgid  => $header->queryformat('%{PKGID}'), 
     220        name   => $header->queryformat('%{name}'), 
     221        header => \sprintf(qq{rpmheader_in(decode('%s', 'hex')::bytea)}, unpack('H*', $string)), 
     222        evr    => $header->queryformat('%|EPOCH?{%{EPOCH}:}:{}|%{VERSION}-%{RELEASE}'), 
     223        arch   => $header->queryformat('%{ARCH}'), 
     224        issrc  => $header->hastag('SOURCERPM') ? 'f' : 't', 
     225        description => $description, 
     226        summary => $summary, 
     227    }); 
     228    my $index_tag = $self->db->base->storage->dbh->prepare_cached( 
    234229        q{ 
    235230        select index_rpms(?); 
     
    238233    $index_tag->execute($header->queryformat('%{PKGID}')) or return; 
    239234    $index_tag->finish; 
    240     if (!$header->hastag('SOURCERPM')) { 
    241         Sophie::Base::Header->new($header->queryformat('%{PKGID}'), $self->{db}) 
    242             ->addfiles_content({ path => $self->path, filename => $rpm}) or return; 
    243     } 
    244  
    245     $header->queryformat('%{PKGID}'); 
     235 
     236    return($header->queryformat('%{PKGID}'), 1); 
    246237} 
    247238 
Note: See TracChangeset for help on using the changeset viewer.