New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
TaskRunner.pm in vendors/lib/FCM/Util – NEMO

source: vendors/lib/FCM/Util/TaskRunner.pm @ 10669

Last change on this file since 10669 was 10669, checked in by nicolasmartin, 5 years ago

Import latest FCM release from Github into the repository for testing

File size: 11.7 KB
Line 
1# ------------------------------------------------------------------------------
2# (C) British Crown Copyright 2006-17 Met Office.
3#
4# This file is part of FCM, tools for managing and building source code.
5#
6# FCM is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# FCM is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with FCM. If not, see <http://www.gnu.org/licenses/>.
18# ------------------------------------------------------------------------------
19use strict;
20use warnings;
21# ------------------------------------------------------------------------------
22package FCM::Util::TaskRunner;
23use base qw{FCM::Class::CODE};
24
25my $P = 'FCM::Util::TaskRunner::Parallel';
26my $S = 'FCM::Util::TaskRunner::Serial';
27
28__PACKAGE__->class({util => '&'}, {action_of => {main => \&_main}});
29
30sub _main {
31    my ($attrib_ref, $action_ref, $n_workers) = @_;
32    $n_workers ||= 1;
33    my $class = $n_workers > 1 ? $P : $S;
34    $attrib_ref->{runner} = $class->new({
35        action    => $action_ref,
36        n_workers => $n_workers,
37        util      => $attrib_ref->{util},
38    });
39}
40
41# ------------------------------------------------------------------------------
42package FCM::Util::TaskRunner::Serial;
43use base qw{FCM::Class::CODE};
44
45__PACKAGE__->class(
46    {action => '&', util => '&'},
47    {action_of => {destroy => sub {}, main => \&_main}},
48);
49
50sub _main {
51    my ($attrib_ref, $get_ref, $put_ref) = @_;
52    my $n_done = 0;
53    while (my $task = $get_ref->()) {
54        my $timer = $attrib_ref->{util}->timer();
55        eval {
56            $task->set_state($task->ST_WORKING);
57            $attrib_ref->{action}->($task->get_ctx());
58            $task->set_state($task->ST_OK);
59        };
60        if ($@) {
61            $task->set_error($@);
62            $task->set_state($task->ST_FAILED);
63        }
64        $task->set_elapse($timer->());
65        $put_ref->($task);
66        ++$n_done;
67    }
68    $n_done;
69}
70
71# ------------------------------------------------------------------------------
72package FCM::Util::TaskRunner::Parallel;
73use base qw{FCM::Class::CODE};
74
75use FCM::Context::Event;
76use IO::Select;
77use IO::Socket;
78use List::Util qw{first};
79use POSIX qw{WNOHANG};
80use Socket qw{AF_UNIX SOCK_STREAM PF_UNSPEC};
81use Storable qw{freeze thaw};
82
83# Package name of worker event and state
84my $CTX_EVENT = 'FCM::Context::Event';
85my $CTX_STATE = 'FCM::Util::TaskRunner::WorkerState';
86
87# Length of a packed long integer
88my $LEN_OF_LONG = length(pack('N', 0));
89
90# Time out for polling sockets to child processes
91my $TIME_OUT = 0.05;
92
93# Creates the class.
94__PACKAGE__->class(
95    {   action        => '&',
96        n_workers     => '$',
97        worker_states => '@',
98        util          => '&',
99    },
100    {init => \&_init, action_of => {destroy => \&_destroy, main => \&_main}},
101);
102
103# Destroys the child processes.
104sub _destroy {
105    my $attrib_ref = shift();
106    local($SIG{CHLD}) = 'IGNORE';
107    my $select = IO::Select->new();
108    my @worker_states = @{$attrib_ref->{worker_states}};
109    for my $worker_state (@worker_states) {
110        $select->add($worker_state->get_socket());
111    }
112    # TBD: reads $socket for any left over event etc?
113    for my $socket ($select->can_write(0)) {
114        my $worker_state = first {$_->get_socket() eq $socket} @worker_states;
115        _item_send($socket);
116        close($socket);
117        waitpid($worker_state->get_pid(), 0);
118    }
119    while (waitpid(-1, WNOHANG) > 0) {
120    }
121    $attrib_ref->{util}->event(
122        FCM::Context::Event->TASK_WORKERS, 'destroy', $attrib_ref->{n_workers},
123    );
124    1;
125}
126
127# On initialisation.
128sub _init {
129    my $attrib_ref = shift();
130    for my $i (1 .. $attrib_ref->{n_workers}) {
131        my ($from_boss, $from_worker)
132            = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
133        if (!defined($from_boss) || !defined($from_worker)) {
134            die("socketpair: $!");
135        }
136        $from_worker->autoflush(1);
137        $from_boss->autoflush(1);
138        if (my $pid = fork()) {
139            # I am the boss
140            if ($pid < 0) {
141                die("fork: $!");
142            }
143            local($SIG{CHLD}, $SIG{INT}, $SIG{KILL}, $SIG{TERM}, $SIG{XCPU});
144            for my $key (qw{CHLD INT KILL TERM XCPU}) {
145                local($SIG{$key}) = sub {_destroy($attrib_ref, @_); die($!)};
146            }
147            close($from_worker);
148            push(
149                @{$attrib_ref->{worker_states}},
150                $CTX_STATE->new($pid, $from_boss),
151            );
152        }
153        elsif (defined($pid)) {
154            # I am a worker
155            close($from_boss);
156            $attrib_ref->{worker_states} = [];
157            open(STDIN, '/dev/null');
158            # Ensures that events are sent back to the boss process
159            my $util_of_event = bless(
160                sub {_item_send($from_worker, @_)},
161                __PACKAGE__ . '::WorkerEvent',
162            );
163            no strict 'refs';
164            *{__PACKAGE__ . '::WorkerEvent::main'}
165                = sub {my $self = shift(); $self->(@_)};
166            use strict 'refs';
167            $attrib_ref->{util}->util_of_event($util_of_event);
168            _worker(
169                $from_worker,
170                $attrib_ref->{action},
171                $attrib_ref->{util},
172            );
173            close($from_worker);
174            exit();
175        }
176        else {
177            die("fork: $!");
178        }
179    }
180    $attrib_ref->{util}->event(
181        FCM::Context::Event->TASK_WORKERS, 'init', $attrib_ref->{n_workers},
182    );
183}
184
185# Main function of the class.
186sub _main {
187    my ($attrib_ref, $get_ref, $put_ref) = @_;
188    my $n_done = 0;
189    my $n_wait = 0;
190    my $done_something = 1;
191    my $get_task_ref = _get_task_func($get_ref, $attrib_ref->{n_workers});
192    my $select = IO::Select->new();
193    my @worker_states = @{$attrib_ref->{worker_states}};
194    for my $worker_state (@worker_states) {
195        $select->add($worker_state->get_socket());
196    }
197    while ($n_wait || $done_something) {
198        $done_something = 0;
199        # Handles tasks back from workers
200        while (my @sockets = $select->can_read($TIME_OUT)) {
201            for my $socket (@sockets) {
202                my $worker_state
203                    = first {$socket eq $_->get_socket()} @worker_states;
204                my $item = _item_receive($socket);
205                if (defined($item)) {
206                    $done_something = 1;
207                    if ($item->isa('FCM::Context::Event')) {
208                        # Item is only an event, handles it
209                        $attrib_ref->{util}->event($item);
210                    }
211                    else {
212                        # Sends something back to the worker immediately
213                        if (defined(my $task = $get_task_ref->())) {
214                            _item_send($socket, $task);
215                        }
216                        else {
217                            --$n_wait;
218                            $worker_state->set_idle(1);
219                        }
220                        $put_ref->($item);
221                        ++$n_done;
222                    }
223                }
224            }
225        }
226        # Sends something to the idle workers
227        my @idle_worker_states = grep {$_->get_idle()} @worker_states;
228        if (@idle_worker_states) {
229            for my $worker_state (@idle_worker_states) {
230                if (defined(my $task = $get_task_ref->())) {
231                    _item_send($worker_state->get_socket(), $task);
232                    ++$n_wait;
233                    $done_something = 1;
234                    $worker_state->set_idle(0);
235                }
236            }
237        }
238        else {
239            $get_task_ref->(); # only adds more tasks to queue
240        }
241    }
242    $n_done;
243}
244
245# Returns a function to fetch more tasks into a queue.
246sub _get_task_func {
247    my ($get_ref, $n_workers) = @_;
248    my $max_n_in_queue = $n_workers * 2;
249    my @queue;
250    sub {
251        while (@queue < $max_n_in_queue && defined(my $task = $get_ref->())) {
252            push(@queue, $task);
253        }
254        if (!defined(wantarray())) {
255            return;
256        }
257        shift(@queue);
258    };
259}
260
261# Receives an item from a socket.
262sub _item_receive {
263    my ($socket) = @_;
264    my $len_of_data = unpack('N', _item_travel($socket, $LEN_OF_LONG));
265    $len_of_data ? thaw(_item_travel($socket, $len_of_data)) : undef;
266}
267
268# Sends an item to a socket.
269sub _item_send {
270    my ($socket, $item) = @_;
271    my $item_as_data = $item ? freeze($item) : q{};
272    my $message = pack('N', length($item_as_data)) . $item_as_data;
273    _item_travel($socket, length($message), $message);
274}
275
276# Helper for _item_receive/_item_send.
277sub _item_travel {
278    my ($socket, $len_to_travel, $data) = @_;
279    my $action
280        = defined($data) ? sub {syswrite($socket, $data, $_[0], $_[1])}
281        :                  sub {sysread( $socket, $data, $_[0], $_[1])}
282        ;
283    $data ||= q{};
284    my $n_bytes = 0;
285    while ($n_bytes < $len_to_travel) {
286        my $len_remain = $len_to_travel - $n_bytes;
287        my $n = $action->($len_remain, $n_bytes);
288        if (!defined($n)) {
289            die($!);
290        }
291        $n_bytes += $n;
292    }
293    $data;
294}
295
296# Performs the function of a worker. Receives a task. Actions it. Sends it back.
297sub _worker {
298    my ($socket, $action, $util) = @_;
299    while (defined(my $task = _item_receive($socket))) {
300        my $timer = $util->timer();
301        eval {
302            $task->set_state($task->ST_WORKING);
303            $action->($task->get_ctx());
304            $task->set_state($task->ST_OK);
305        };
306        if ($@) {
307            $task->set_state($task->ST_FAILED);
308            $task->set_error($@);
309        }
310        $task->set_elapse($timer->());
311        _item_send($socket, $task);
312    }
313    1;
314}
315
316# ------------------------------------------------------------------------------
317# The state of a worker.
318package FCM::Util::TaskRunner::WorkerState;
319use base qw{FCM::Class::HASH};
320
321__PACKAGE__->class(
322    {   'idle'   => {isa => '$', default => 1}, # worker is idle?
323        'pid'    => '$',                        # worker's PID
324        'socket' => '*',                        # socket to worker
325    },
326    {   init_attrib => sub {
327            my ($pid, $socket) = @_;
328            {'pid' => $pid, 'socket' => $socket};
329        },
330    },
331);
332
333# ------------------------------------------------------------------------------
3341;
335__END__
336
337=head1 NAME
338
339FCM::Util::TaskRunner
340
341=head1 SYNOPSIS
342
343    use FCM::Context::Task;
344    use FCM::Util;
345    my $util = FCM::Util->new(\%attrib);
346    # ... time passes
347    my $runner = $util->task_runner(\&do_task, 4); # run with 4 workers
348    # ... time passes
349    my $get_ref = sub {
350        # ... an iterator to return an FCM::Context::Task object
351        # one at a time, returns undef if there is no currently available task
352    };
353    my $put_ref = sub {
354        my ($task) = @_;
355        # ... callback at end of each task
356    };
357    my $n_done = $runner->main($get_ref, $put_ref);
358
359=head1 DESCRIPTION
360
361This module is part of L<FCM::Util|FCM::Util>. See the description of the
362task_runner() method for details.
363
364An instance of this class is a runner of tasks. It can be configured to work in
365serial (default) or parallel. The class is a sub-class of
366L<FCM::Class::CODE|FCM::Class::CODE>.
367
368=head1 SEE ALSO
369
370This module is inspired by the CPAN modules Parallel::Fork::BossWorker and
371Parallel::Fork::BossWorkerAsync.
372
373L<FCM::Context::Task|FCM::Context::Task>,
374L<FCM::Util::TaskManager|FCM::Util::TaskManager>
375
376=head1 COPYRIGHT
377
378(C) Crown copyright Met Office. All rights reserved.
379
380=cut
Note: See TracBrowser for help on using the repository browser.