New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles/src/OCE/LBC – NEMO

source: NEMO/branches/2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles/src/OCE/LBC/lib_mpp.F90 @ 11648

Last change on this file since 11648 was 11648, checked in by acc, 5 years ago

Branch 2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles. Introduce broadcast of namelist character buffer from single reader to all others. This completes the second stage but there is still an issue with AGRIF that may scupper this whole concept

  • Property svn:keywords set to Id
File size: 60.0 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mpp_report
70   PUBLIC   mpp_bcast_nml
71   PUBLIC   tic_tac
72#if ! defined key_mpp_mpi
73   PUBLIC MPI_Wtime
74#endif
75   
76   !! * Interfaces
77   !! define generic interface for these routine as they are called sometimes
78   !! with scalar arguments instead of array arguments, which causes problems
79   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
80   INTERFACE mpp_min
81      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
82   END INTERFACE
83   INTERFACE mpp_max
84      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
85   END INTERFACE
86   INTERFACE mpp_sum
87      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
88         &             mppsum_realdd, mppsum_a_realdd
89   END INTERFACE
90   INTERFACE mpp_minloc
91      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
92   END INTERFACE
93   INTERFACE mpp_maxloc
94      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
95   END INTERFACE
96
97   !! ========================= !!
98   !!  MPI  variable definition !!
99   !! ========================= !!
100#if   defined key_mpp_mpi
101!$AGRIF_DO_NOT_TREAT
102   INCLUDE 'mpif.h'
103!$AGRIF_END_DO_NOT_TREAT
104   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
105#else   
106   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
107   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
109#endif
110
111   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
112
113   INTEGER, PUBLIC ::   mppsize        ! number of process
114   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
115!$AGRIF_DO_NOT_TREAT
116   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
117!$AGRIF_END_DO_NOT_TREAT
118
119   INTEGER :: MPI_SUMDD
120
121   ! variables used for zonal integration
122   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
123   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
124   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
125   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
127
128   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
129   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
130   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
131   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
132   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
133   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
134   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
135   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
136   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
137
138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
145   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
146   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
147   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
148   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
149   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
150   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
151   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
152   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
153   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
154   !: name (used as id) of allreduce-delayed operations
155   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
156   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
157   !: component name where the allreduce-delayed operation is performed
158   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
159   TYPE, PUBLIC ::   DELAYARR
160      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
161      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
162   END TYPE DELAYARR
163   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
164   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
165
166   ! timing summary report
167   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
168   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
169   
170   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
171
172   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
173   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
174   
175   !!----------------------------------------------------------------------
176   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
177   !! $Id$
178   !! Software governed by the CeCILL license (see ./LICENSE)
179   !!----------------------------------------------------------------------
180CONTAINS
181
182   SUBROUTINE mpp_start( localComm )
183      !!----------------------------------------------------------------------
184      !!                  ***  routine mpp_start  ***
185      !!
186      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
187      !!----------------------------------------------------------------------
188      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
189      !
190      INTEGER ::   ierr
191      LOGICAL ::   llmpi_init
192      !!----------------------------------------------------------------------
193#if defined key_mpp_mpi
194      !
195      CALL mpi_initialized ( llmpi_init, ierr )
196      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
197
198      IF( .NOT. llmpi_init ) THEN
199         IF( PRESENT(localComm) ) THEN
200            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
201            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
202            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
203         ENDIF
204         CALL mpi_init( ierr )
205         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
206      ENDIF
207       
208      IF( PRESENT(localComm) ) THEN
209         IF( Agrif_Root() ) THEN
210            mpi_comm_oce = localComm
211         ENDIF
212      ELSE
213         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
214         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
215      ENDIF
216
217# if defined key_agrif
218      IF( Agrif_Root() ) THEN
219         CALL Agrif_MPI_Init(mpi_comm_oce)
220      ELSE
221         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
222      ENDIF
223# endif
224
225      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
226      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
227      !
228      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
229      !
230#else
231      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
232      mppsize = 1
233      mpprank = 0
234#endif
235   END SUBROUTINE mpp_start
236
237
238   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
239      !!----------------------------------------------------------------------
240      !!                  ***  routine mppsend  ***
241      !!
242      !! ** Purpose :   Send messag passing array
243      !!
244      !!----------------------------------------------------------------------
245      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
246      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
247      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
248      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
249      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
250      !!
251      INTEGER ::   iflag
252      !!----------------------------------------------------------------------
253      !
254#if defined key_mpp_mpi
255      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
256#endif
257      !
258   END SUBROUTINE mppsend
259
260
261   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
262      !!----------------------------------------------------------------------
263      !!                  ***  routine mpprecv  ***
264      !!
265      !! ** Purpose :   Receive messag passing array
266      !!
267      !!----------------------------------------------------------------------
268      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
269      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
270      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
271      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
272      !!
273      INTEGER :: istatus(mpi_status_size)
274      INTEGER :: iflag
275      INTEGER :: use_source
276      !!----------------------------------------------------------------------
277      !
278#if defined key_mpp_mpi
279      ! If a specific process number has been passed to the receive call,
280      ! use that one. Default is to use mpi_any_source
281      use_source = mpi_any_source
282      IF( PRESENT(ksource) )   use_source = ksource
283      !
284      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
285#endif
286      !
287   END SUBROUTINE mpprecv
288
289
290   SUBROUTINE mppgather( ptab, kp, pio )
291      !!----------------------------------------------------------------------
292      !!                   ***  routine mppgather  ***
293      !!
294      !! ** Purpose :   Transfert between a local subdomain array and a work
295      !!     array which is distributed following the vertical level.
296      !!
297      !!----------------------------------------------------------------------
298      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
299      INTEGER                           , INTENT(in   ) ::   kp     ! record length
300      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
301      !!
302      INTEGER :: itaille, ierror   ! temporary integer
303      !!---------------------------------------------------------------------
304      !
305      itaille = jpi * jpj
306#if defined key_mpp_mpi
307      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
308         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
309#else
310      pio(:,:,1) = ptab(:,:)
311#endif
312      !
313   END SUBROUTINE mppgather
314
315
316   SUBROUTINE mppscatter( pio, kp, ptab )
317      !!----------------------------------------------------------------------
318      !!                  ***  routine mppscatter  ***
319      !!
320      !! ** Purpose :   Transfert between awork array which is distributed
321      !!      following the vertical level and the local subdomain array.
322      !!
323      !!----------------------------------------------------------------------
324      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
325      INTEGER                             ::   kp     ! Tag (not used with MPI
326      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
327      !!
328      INTEGER :: itaille, ierror   ! temporary integer
329      !!---------------------------------------------------------------------
330      !
331      itaille = jpi * jpj
332      !
333#if defined key_mpp_mpi
334      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
335         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
336#else
337      ptab(:,:) = pio(:,:,1)
338#endif
339      !
340   END SUBROUTINE mppscatter
341
342   
343   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
344     !!----------------------------------------------------------------------
345      !!                   ***  routine mpp_delay_sum  ***
346      !!
347      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
348      !!
349      !!----------------------------------------------------------------------
350      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
351      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
352      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
353      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
354      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
355      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
356      !!
357      INTEGER ::   ji, isz
358      INTEGER ::   idvar
359      INTEGER ::   ierr, ilocalcomm
360      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
361      !!----------------------------------------------------------------------
362#if defined key_mpp_mpi
363      ilocalcomm = mpi_comm_oce
364      IF( PRESENT(kcom) )   ilocalcomm = kcom
365
366      isz = SIZE(y_in)
367     
368      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
369
370      idvar = -1
371      DO ji = 1, nbdelay
372         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
373      END DO
374      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
375
376      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
377         !                                       --------------------------
378         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
379            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
380            DEALLOCATE(todelay(idvar)%z1d)
381            ndelayid(idvar) = -1                                      ! do as if we had no restart
382         ELSE
383            ALLOCATE(todelay(idvar)%y1d(isz))
384            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
385         END IF
386      ENDIF
387     
388      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
389         !                                       --------------------------
390         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
391         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
392         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
393      ENDIF
394
395      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
396
397      ! send back pout from todelay(idvar)%z1d defined at previous call
398      pout(:) = todelay(idvar)%z1d(:)
399
400      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
401# if defined key_mpi2
402      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
403      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
404      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
405# else
406      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
407# endif
408#else
409      pout(:) = REAL(y_in(:), wp)
410#endif
411
412   END SUBROUTINE mpp_delay_sum
413
414   
415   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
416      !!----------------------------------------------------------------------
417      !!                   ***  routine mpp_delay_max  ***
418      !!
419      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
420      !!
421      !!----------------------------------------------------------------------
422      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
423      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
424      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
425      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
426      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
427      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
428      !!
429      INTEGER ::   ji, isz
430      INTEGER ::   idvar
431      INTEGER ::   ierr, ilocalcomm
432      !!----------------------------------------------------------------------
433#if defined key_mpp_mpi
434      ilocalcomm = mpi_comm_oce
435      IF( PRESENT(kcom) )   ilocalcomm = kcom
436
437      isz = SIZE(p_in)
438
439      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
440
441      idvar = -1
442      DO ji = 1, nbdelay
443         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
444      END DO
445      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
446
447      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
448         !                                       --------------------------
449         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
450            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
451            DEALLOCATE(todelay(idvar)%z1d)
452            ndelayid(idvar) = -1                                      ! do as if we had no restart
453         END IF
454      ENDIF
455
456      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
457         !                                       --------------------------
458         ALLOCATE(todelay(idvar)%z1d(isz))
459         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
460      ENDIF
461
462      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
463
464      ! send back pout from todelay(idvar)%z1d defined at previous call
465      pout(:) = todelay(idvar)%z1d(:)
466
467      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
468# if defined key_mpi2
469      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
470      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
471      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
472# else
473      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
474# endif
475#else
476      pout(:) = p_in(:)
477#endif
478
479   END SUBROUTINE mpp_delay_max
480
481   
482   SUBROUTINE mpp_delay_rcv( kid )
483      !!----------------------------------------------------------------------
484      !!                   ***  routine mpp_delay_rcv  ***
485      !!
486      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
487      !!
488      !!----------------------------------------------------------------------
489      INTEGER,INTENT(in   )      ::  kid 
490      INTEGER ::   ierr
491      !!----------------------------------------------------------------------
492#if defined key_mpp_mpi
493      IF( ndelayid(kid) /= -2 ) THEN 
494#if ! defined key_mpi2
495         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
496         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
497         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
498#endif
499         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
500         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
501      ENDIF
502#endif
503   END SUBROUTINE mpp_delay_rcv
504
505   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
506      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
507      INTEGER                          , INTENT(INOUT) :: kleng
508      !!----------------------------------------------------------------------
509      !!                  ***  routine mpp_bcast_nml  ***
510      !!
511      !! ** Purpose :   broadcast namelist character buffer
512      !!
513      !!----------------------------------------------------------------------
514      !!
515      INTEGER ::   iflag
516      !!----------------------------------------------------------------------
517      !
518#if defined key_mpp_mpi
519      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
520      call MPI_BARRIER(mpi_comm_oce, iflag)
521      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
522      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
523      call MPI_BARRIER(mpi_comm_oce, iflag)
524#endif
525      !
526   END SUBROUTINE mpp_bcast_nml
527
528   
529   !!----------------------------------------------------------------------
530   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
531   !!   
532   !!----------------------------------------------------------------------
533   !!
534#  define OPERATION_MAX
535#  define INTEGER_TYPE
536#  define DIM_0d
537#     define ROUTINE_ALLREDUCE           mppmax_int
538#     include "mpp_allreduce_generic.h90"
539#     undef ROUTINE_ALLREDUCE
540#  undef DIM_0d
541#  define DIM_1d
542#     define ROUTINE_ALLREDUCE           mppmax_a_int
543#     include "mpp_allreduce_generic.h90"
544#     undef ROUTINE_ALLREDUCE
545#  undef DIM_1d
546#  undef INTEGER_TYPE
547!
548#  define REAL_TYPE
549#  define DIM_0d
550#     define ROUTINE_ALLREDUCE           mppmax_real
551#     include "mpp_allreduce_generic.h90"
552#     undef ROUTINE_ALLREDUCE
553#  undef DIM_0d
554#  define DIM_1d
555#     define ROUTINE_ALLREDUCE           mppmax_a_real
556#     include "mpp_allreduce_generic.h90"
557#     undef ROUTINE_ALLREDUCE
558#  undef DIM_1d
559#  undef REAL_TYPE
560#  undef OPERATION_MAX
561   !!----------------------------------------------------------------------
562   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
563   !!   
564   !!----------------------------------------------------------------------
565   !!
566#  define OPERATION_MIN
567#  define INTEGER_TYPE
568#  define DIM_0d
569#     define ROUTINE_ALLREDUCE           mppmin_int
570#     include "mpp_allreduce_generic.h90"
571#     undef ROUTINE_ALLREDUCE
572#  undef DIM_0d
573#  define DIM_1d
574#     define ROUTINE_ALLREDUCE           mppmin_a_int
575#     include "mpp_allreduce_generic.h90"
576#     undef ROUTINE_ALLREDUCE
577#  undef DIM_1d
578#  undef INTEGER_TYPE
579!
580#  define REAL_TYPE
581#  define DIM_0d
582#     define ROUTINE_ALLREDUCE           mppmin_real
583#     include "mpp_allreduce_generic.h90"
584#     undef ROUTINE_ALLREDUCE
585#  undef DIM_0d
586#  define DIM_1d
587#     define ROUTINE_ALLREDUCE           mppmin_a_real
588#     include "mpp_allreduce_generic.h90"
589#     undef ROUTINE_ALLREDUCE
590#  undef DIM_1d
591#  undef REAL_TYPE
592#  undef OPERATION_MIN
593
594   !!----------------------------------------------------------------------
595   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
596   !!   
597   !!   Global sum of 1D array or a variable (integer, real or complex)
598   !!----------------------------------------------------------------------
599   !!
600#  define OPERATION_SUM
601#  define INTEGER_TYPE
602#  define DIM_0d
603#     define ROUTINE_ALLREDUCE           mppsum_int
604#     include "mpp_allreduce_generic.h90"
605#     undef ROUTINE_ALLREDUCE
606#  undef DIM_0d
607#  define DIM_1d
608#     define ROUTINE_ALLREDUCE           mppsum_a_int
609#     include "mpp_allreduce_generic.h90"
610#     undef ROUTINE_ALLREDUCE
611#  undef DIM_1d
612#  undef INTEGER_TYPE
613!
614#  define REAL_TYPE
615#  define DIM_0d
616#     define ROUTINE_ALLREDUCE           mppsum_real
617#     include "mpp_allreduce_generic.h90"
618#     undef ROUTINE_ALLREDUCE
619#  undef DIM_0d
620#  define DIM_1d
621#     define ROUTINE_ALLREDUCE           mppsum_a_real
622#     include "mpp_allreduce_generic.h90"
623#     undef ROUTINE_ALLREDUCE
624#  undef DIM_1d
625#  undef REAL_TYPE
626#  undef OPERATION_SUM
627
628#  define OPERATION_SUM_DD
629#  define COMPLEX_TYPE
630#  define DIM_0d
631#     define ROUTINE_ALLREDUCE           mppsum_realdd
632#     include "mpp_allreduce_generic.h90"
633#     undef ROUTINE_ALLREDUCE
634#  undef DIM_0d
635#  define DIM_1d
636#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
637#     include "mpp_allreduce_generic.h90"
638#     undef ROUTINE_ALLREDUCE
639#  undef DIM_1d
640#  undef COMPLEX_TYPE
641#  undef OPERATION_SUM_DD
642
643   !!----------------------------------------------------------------------
644   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
645   !!   
646   !!----------------------------------------------------------------------
647   !!
648#  define OPERATION_MINLOC
649#  define DIM_2d
650#     define ROUTINE_LOC           mpp_minloc2d
651#     include "mpp_loc_generic.h90"
652#     undef ROUTINE_LOC
653#  undef DIM_2d
654#  define DIM_3d
655#     define ROUTINE_LOC           mpp_minloc3d
656#     include "mpp_loc_generic.h90"
657#     undef ROUTINE_LOC
658#  undef DIM_3d
659#  undef OPERATION_MINLOC
660
661#  define OPERATION_MAXLOC
662#  define DIM_2d
663#     define ROUTINE_LOC           mpp_maxloc2d
664#     include "mpp_loc_generic.h90"
665#     undef ROUTINE_LOC
666#  undef DIM_2d
667#  define DIM_3d
668#     define ROUTINE_LOC           mpp_maxloc3d
669#     include "mpp_loc_generic.h90"
670#     undef ROUTINE_LOC
671#  undef DIM_3d
672#  undef OPERATION_MAXLOC
673
674   SUBROUTINE mppsync()
675      !!----------------------------------------------------------------------
676      !!                  ***  routine mppsync  ***
677      !!
678      !! ** Purpose :   Massively parallel processors, synchroneous
679      !!
680      !!-----------------------------------------------------------------------
681      INTEGER :: ierror
682      !!-----------------------------------------------------------------------
683      !
684#if defined key_mpp_mpi
685      CALL mpi_barrier( mpi_comm_oce, ierror )
686#endif
687      !
688   END SUBROUTINE mppsync
689
690
691   SUBROUTINE mppstop( ld_abort ) 
692      !!----------------------------------------------------------------------
693      !!                  ***  routine mppstop  ***
694      !!
695      !! ** purpose :   Stop massively parallel processors method
696      !!
697      !!----------------------------------------------------------------------
698      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
699      LOGICAL ::   ll_abort
700      INTEGER ::   info
701      !!----------------------------------------------------------------------
702      ll_abort = .FALSE.
703      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
704      !
705#if defined key_mpp_mpi
706      IF(ll_abort) THEN
707         CALL mpi_abort( MPI_COMM_WORLD )
708      ELSE
709         CALL mppsync
710         CALL mpi_finalize( info )
711      ENDIF
712#endif
713      IF( ll_abort ) STOP 123
714      !
715   END SUBROUTINE mppstop
716
717
718   SUBROUTINE mpp_comm_free( kcom )
719      !!----------------------------------------------------------------------
720      INTEGER, INTENT(in) ::   kcom
721      !!
722      INTEGER :: ierr
723      !!----------------------------------------------------------------------
724      !
725#if defined key_mpp_mpi
726      CALL MPI_COMM_FREE(kcom, ierr)
727#endif
728      !
729   END SUBROUTINE mpp_comm_free
730
731
732   SUBROUTINE mpp_ini_znl( kumout )
733      !!----------------------------------------------------------------------
734      !!               ***  routine mpp_ini_znl  ***
735      !!
736      !! ** Purpose :   Initialize special communicator for computing zonal sum
737      !!
738      !! ** Method  : - Look for processors in the same row
739      !!              - Put their number in nrank_znl
740      !!              - Create group for the znl processors
741      !!              - Create a communicator for znl processors
742      !!              - Determine if processor should write znl files
743      !!
744      !! ** output
745      !!      ndim_rank_znl = number of processors on the same row
746      !!      ngrp_znl = group ID for the znl processors
747      !!      ncomm_znl = communicator for the ice procs.
748      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
749      !!
750      !!----------------------------------------------------------------------
751      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
752      !
753      INTEGER :: jproc      ! dummy loop integer
754      INTEGER :: ierr, ii   ! local integer
755      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
756      !!----------------------------------------------------------------------
757#if defined key_mpp_mpi
758      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
759      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
760      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
761      !
762      ALLOCATE( kwork(jpnij), STAT=ierr )
763      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
764
765      IF( jpnj == 1 ) THEN
766         ngrp_znl  = ngrp_world
767         ncomm_znl = mpi_comm_oce
768      ELSE
769         !
770         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
771         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
772         !-$$        CALL flush(numout)
773         !
774         ! Count number of processors on the same row
775         ndim_rank_znl = 0
776         DO jproc=1,jpnij
777            IF ( kwork(jproc) == njmpp ) THEN
778               ndim_rank_znl = ndim_rank_znl + 1
779            ENDIF
780         END DO
781         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
782         !-$$        CALL flush(numout)
783         ! Allocate the right size to nrank_znl
784         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
785         ALLOCATE(nrank_znl(ndim_rank_znl))
786         ii = 0
787         nrank_znl (:) = 0
788         DO jproc=1,jpnij
789            IF ( kwork(jproc) == njmpp) THEN
790               ii = ii + 1
791               nrank_znl(ii) = jproc -1
792            ENDIF
793         END DO
794         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
795         !-$$        CALL flush(numout)
796
797         ! Create the opa group
798         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
799         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
800         !-$$        CALL flush(numout)
801
802         ! Create the znl group from the opa group
803         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
804         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
805         !-$$        CALL flush(numout)
806
807         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
808         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
809         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
810         !-$$        CALL flush(numout)
811         !
812      END IF
813
814      ! Determines if processor if the first (starting from i=1) on the row
815      IF ( jpni == 1 ) THEN
816         l_znl_root = .TRUE.
817      ELSE
818         l_znl_root = .FALSE.
819         kwork (1) = nimpp
820         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
821         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
822      END IF
823
824      DEALLOCATE(kwork)
825#endif
826
827   END SUBROUTINE mpp_ini_znl
828
829
830   SUBROUTINE mpp_ini_north
831      !!----------------------------------------------------------------------
832      !!               ***  routine mpp_ini_north  ***
833      !!
834      !! ** Purpose :   Initialize special communicator for north folding
835      !!      condition together with global variables needed in the mpp folding
836      !!
837      !! ** Method  : - Look for northern processors
838      !!              - Put their number in nrank_north
839      !!              - Create groups for the world processors and the north processors
840      !!              - Create a communicator for northern processors
841      !!
842      !! ** output
843      !!      njmppmax = njmpp for northern procs
844      !!      ndim_rank_north = number of processors in the northern line
845      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
846      !!      ngrp_world = group ID for the world processors
847      !!      ngrp_north = group ID for the northern processors
848      !!      ncomm_north = communicator for the northern procs.
849      !!      north_root = number (in the world) of proc 0 in the northern comm.
850      !!
851      !!----------------------------------------------------------------------
852      INTEGER ::   ierr
853      INTEGER ::   jjproc
854      INTEGER ::   ii, ji
855      !!----------------------------------------------------------------------
856      !
857#if defined key_mpp_mpi
858      njmppmax = MAXVAL( njmppt )
859      !
860      ! Look for how many procs on the northern boundary
861      ndim_rank_north = 0
862      DO jjproc = 1, jpnij
863         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
864      END DO
865      !
866      ! Allocate the right size to nrank_north
867      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
868      ALLOCATE( nrank_north(ndim_rank_north) )
869
870      ! Fill the nrank_north array with proc. number of northern procs.
871      ! Note : the rank start at 0 in MPI
872      ii = 0
873      DO ji = 1, jpnij
874         IF ( njmppt(ji) == njmppmax   ) THEN
875            ii=ii+1
876            nrank_north(ii)=ji-1
877         END IF
878      END DO
879      !
880      ! create the world group
881      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
882      !
883      ! Create the North group from the world group
884      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
885      !
886      ! Create the North communicator , ie the pool of procs in the north group
887      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
888      !
889#endif
890   END SUBROUTINE mpp_ini_north
891
892
893   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
894      !!---------------------------------------------------------------------
895      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
896      !!
897      !!   Modification of original codes written by David H. Bailey
898      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
899      !!---------------------------------------------------------------------
900      INTEGER                     , INTENT(in)    ::   ilen, itype
901      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
902      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
903      !
904      REAL(wp) :: zerr, zt1, zt2    ! local work variables
905      INTEGER  :: ji, ztmp           ! local scalar
906      !!---------------------------------------------------------------------
907      !
908      ztmp = itype   ! avoid compilation warning
909      !
910      DO ji=1,ilen
911      ! Compute ydda + yddb using Knuth's trick.
912         zt1  = real(ydda(ji)) + real(yddb(ji))
913         zerr = zt1 - real(ydda(ji))
914         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
915                + aimag(ydda(ji)) + aimag(yddb(ji))
916
917         ! The result is zt1 + zt2, after normalization.
918         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
919      END DO
920      !
921   END SUBROUTINE DDPDD_MPI
922
923
924   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
925      !!----------------------------------------------------------------------
926      !!                  ***  routine mpp_report  ***
927      !!
928      !! ** Purpose :   report use of mpp routines per time-setp
929      !!
930      !!----------------------------------------------------------------------
931      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
932      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
933      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
934      !!
935      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
936      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
937      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
938      !!----------------------------------------------------------------------
939#if defined key_mpp_mpi
940      !
941      ll_lbc = .FALSE.
942      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
943      ll_glb = .FALSE.
944      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
945      ll_dlg = .FALSE.
946      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
947      !
948      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
949      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
950      ncom_freq = ncom_fsbc
951      !
952      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
953         IF( ll_lbc ) THEN
954            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
955            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
956            n_sequence_lbc = n_sequence_lbc + 1
957            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
958            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
959            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
960            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
961         ENDIF
962         IF( ll_glb ) THEN
963            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
964            n_sequence_glb = n_sequence_glb + 1
965            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
966            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
967         ENDIF
968         IF( ll_dlg ) THEN
969            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
970            n_sequence_dlg = n_sequence_dlg + 1
971            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
972            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
973         ENDIF
974      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
975         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
976         WRITE(numcom,*) ' '
977         WRITE(numcom,*) ' ------------------------------------------------------------'
978         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
979         WRITE(numcom,*) ' ------------------------------------------------------------'
980         WRITE(numcom,*) ' '
981         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
982         jj = 0; jk = 0; jf = 0; jh = 0
983         DO ji = 1, n_sequence_lbc
984            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
985            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
986            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
987            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
988         END DO
989         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
990         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
991         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
992         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
993         WRITE(numcom,*) ' '
994         WRITE(numcom,*) ' lbc_lnk called'
995         DO ji = 1, n_sequence_lbc - 1
996            IF ( crname_lbc(ji) /= 'already counted' ) THEN
997               ccountname = crname_lbc(ji)
998               crname_lbc(ji) = 'already counted'
999               jcount = 1
1000               DO jj = ji + 1, n_sequence_lbc
1001                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1002                     jcount = jcount + 1
1003                     crname_lbc(jj) = 'already counted'
1004                  END IF
1005               END DO
1006               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1007            END IF
1008         END DO
1009         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1010            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1011         END IF
1012         WRITE(numcom,*) ' '
1013         IF ( n_sequence_glb > 0 ) THEN
1014            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1015            jj = 1
1016            DO ji = 2, n_sequence_glb
1017               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1018                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1019                  jj = 0
1020               END IF
1021               jj = jj + 1 
1022            END DO
1023            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1024            DEALLOCATE(crname_glb)
1025         ELSE
1026            WRITE(numcom,*) ' No MPI global communication '
1027         ENDIF
1028         WRITE(numcom,*) ' '
1029         IF ( n_sequence_dlg > 0 ) THEN
1030            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1031            jj = 1
1032            DO ji = 2, n_sequence_dlg
1033               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1034                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1035                  jj = 0
1036               END IF
1037               jj = jj + 1 
1038            END DO
1039            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1040            DEALLOCATE(crname_dlg)
1041         ELSE
1042            WRITE(numcom,*) ' No MPI delayed global communication '
1043         ENDIF
1044         WRITE(numcom,*) ' '
1045         WRITE(numcom,*) ' -----------------------------------------------'
1046         WRITE(numcom,*) ' '
1047         DEALLOCATE(ncomm_sequence)
1048         DEALLOCATE(crname_lbc)
1049      ENDIF
1050#endif
1051   END SUBROUTINE mpp_report
1052
1053   
1054   SUBROUTINE tic_tac (ld_tic, ld_global)
1055
1056    LOGICAL,           INTENT(IN) :: ld_tic
1057    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1058    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1059    REAL(wp),               SAVE :: tic_ct = 0._wp
1060    INTEGER :: ii
1061#if defined key_mpp_mpi
1062
1063    IF( ncom_stp <= nit000 ) RETURN
1064    IF( ncom_stp == nitend ) RETURN
1065    ii = 1
1066    IF( PRESENT( ld_global ) ) THEN
1067       IF( ld_global ) ii = 2
1068    END IF
1069   
1070    IF ( ld_tic ) THEN
1071       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1072       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1073    ELSE
1074       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1075       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1076    ENDIF
1077#endif
1078   
1079   END SUBROUTINE tic_tac
1080
1081#if ! defined key_mpp_mpi
1082   SUBROUTINE mpi_wait(request, status, ierror)
1083      INTEGER                            , INTENT(in   ) ::   request
1084      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1085      INTEGER                            , INTENT(  out) ::   ierror
1086   END SUBROUTINE mpi_wait
1087
1088   
1089   FUNCTION MPI_Wtime()
1090      REAL(wp) ::  MPI_Wtime
1091      MPI_Wtime = -1.
1092   END FUNCTION MPI_Wtime
1093#endif
1094
1095   !!----------------------------------------------------------------------
1096   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1097   !!----------------------------------------------------------------------
1098
1099   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1100      &                 cd6, cd7, cd8, cd9, cd10 )
1101      !!----------------------------------------------------------------------
1102      !!                  ***  ROUTINE  stop_opa  ***
1103      !!
1104      !! ** Purpose :   print in ocean.outpput file a error message and
1105      !!                increment the error number (nstop) by one.
1106      !!----------------------------------------------------------------------
1107      CHARACTER(len=*), INTENT(in   )           ::   cd1
1108      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1109      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1110      !!----------------------------------------------------------------------
1111      !
1112      nstop = nstop + 1
1113      !
1114      ! force to open ocean.output file if not already opened
1115      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1116      !
1117                            WRITE(numout,*)
1118                            WRITE(numout,*) ' ===>>> : E R R O R'
1119                            WRITE(numout,*)
1120                            WRITE(numout,*) '         ==========='
1121                            WRITE(numout,*)
1122                            WRITE(numout,*) TRIM(cd1)
1123      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1124      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1125      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1126      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1127      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1128      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1129      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1130      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1131      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1132                            WRITE(numout,*)
1133      !
1134                               CALL FLUSH(numout    )
1135      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1136      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1137      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1138      !
1139      IF( cd1 == 'STOP' ) THEN
1140         WRITE(numout,*) 
1141         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1142         WRITE(numout,*) 
1143         CALL mppstop( ld_abort = .true. )
1144      ENDIF
1145      !
1146   END SUBROUTINE ctl_stop
1147
1148
1149   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1150      &                 cd6, cd7, cd8, cd9, cd10 )
1151      !!----------------------------------------------------------------------
1152      !!                  ***  ROUTINE  stop_warn  ***
1153      !!
1154      !! ** Purpose :   print in ocean.outpput file a error message and
1155      !!                increment the warning number (nwarn) by one.
1156      !!----------------------------------------------------------------------
1157      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1158      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1159      !!----------------------------------------------------------------------
1160      !
1161      nwarn = nwarn + 1
1162      !
1163      IF(lwp) THEN
1164                               WRITE(numout,*)
1165                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1166                               WRITE(numout,*)
1167                               WRITE(numout,*) '         ==============='
1168                               WRITE(numout,*)
1169         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1170         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1171         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1172         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1173         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1174         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1175         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1176         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1177         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1178         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1179                               WRITE(numout,*)
1180      ENDIF
1181      CALL FLUSH(numout)
1182      !
1183   END SUBROUTINE ctl_warn
1184
1185
1186   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1187      !!----------------------------------------------------------------------
1188      !!                  ***  ROUTINE ctl_opn  ***
1189      !!
1190      !! ** Purpose :   Open file and check if required file is available.
1191      !!
1192      !! ** Method  :   Fortan open
1193      !!----------------------------------------------------------------------
1194      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1195      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1196      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1197      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1198      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1199      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1200      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1201      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1202      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1203      !
1204      CHARACTER(len=80) ::   clfile
1205      INTEGER           ::   iost
1206      !!----------------------------------------------------------------------
1207      !
1208      ! adapt filename
1209      ! ----------------
1210      clfile = TRIM(cdfile)
1211      IF( PRESENT( karea ) ) THEN
1212         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1213      ENDIF
1214#if defined key_agrif
1215      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1216      knum=Agrif_Get_Unit()
1217#else
1218      knum=get_unit()
1219#endif
1220      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1221      !
1222      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1223         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1224      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1225         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1226      ELSE
1227         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1228      ENDIF
1229      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1230         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1231      IF( iost == 0 ) THEN
1232         IF(ldwp) THEN
1233            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1234            WRITE(kout,*) '     unit   = ', knum
1235            WRITE(kout,*) '     status = ', cdstat
1236            WRITE(kout,*) '     form   = ', cdform
1237            WRITE(kout,*) '     access = ', cdacce
1238            WRITE(kout,*)
1239         ENDIF
1240      ENDIF
1241100   CONTINUE
1242      IF( iost /= 0 ) THEN
1243         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1244         WRITE(ctmp2,*) ' =======   ===  '
1245         WRITE(ctmp3,*) '           unit   = ', knum
1246         WRITE(ctmp4,*) '           status = ', cdstat
1247         WRITE(ctmp5,*) '           form   = ', cdform
1248         WRITE(ctmp6,*) '           access = ', cdacce
1249         WRITE(ctmp7,*) '           iostat = ', iost
1250         WRITE(ctmp8,*) '           we stop. verify the file '
1251         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1252      ENDIF
1253      !
1254   END SUBROUTINE ctl_opn
1255
1256
1257   SUBROUTINE ctl_nam ( kios, cdnam )
1258      !!----------------------------------------------------------------------
1259      !!                  ***  ROUTINE ctl_nam  ***
1260      !!
1261      !! ** Purpose :   Informations when error while reading a namelist
1262      !!
1263      !! ** Method  :   Fortan open
1264      !!----------------------------------------------------------------------
1265      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1266      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1267      !
1268      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1269      !!----------------------------------------------------------------------
1270      !
1271      WRITE (clios, '(I5.0)')   kios
1272      IF( kios < 0 ) THEN         
1273         CALL ctl_warn( 'end of record or file while reading namelist '   &
1274            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1275      ENDIF
1276      !
1277      IF( kios > 0 ) THEN
1278         CALL ctl_stop( 'misspelled variable in namelist '   &
1279            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1280      ENDIF
1281      kios = 0
1282      !
1283   END SUBROUTINE ctl_nam
1284
1285
1286   INTEGER FUNCTION get_unit()
1287      !!----------------------------------------------------------------------
1288      !!                  ***  FUNCTION  get_unit  ***
1289      !!
1290      !! ** Purpose :   return the index of an unused logical unit
1291      !!----------------------------------------------------------------------
1292      LOGICAL :: llopn
1293      !!----------------------------------------------------------------------
1294      !
1295      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1296      llopn = .TRUE.
1297      DO WHILE( (get_unit < 998) .AND. llopn )
1298         get_unit = get_unit + 1
1299         INQUIRE( unit = get_unit, opened = llopn )
1300      END DO
1301      IF( (get_unit == 999) .AND. llopn ) THEN
1302         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1303      ENDIF
1304      !
1305   END FUNCTION get_unit
1306
1307   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1308      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1309      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1310      CHARACTER(LEN=256)                           :: chline
1311      INTEGER, INTENT(IN)                          :: kout
1312      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1313      INTEGER                                      :: itot, iun, iltc, inl, ios
1314      !
1315      ! Check if the namelist buffer has already been allocated. Return if it has.
1316      !
1317      IF ( ALLOCATED( cdnambuff ) ) RETURN
1318      IF( ldwp ) THEN
1319         !
1320         ! Open namelist file
1321         !
1322         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1323         !
1324         ! First pass: count characters excluding comments and trimable white space
1325         !
1326         itot=0
1327     10  READ(iun,'(A256)',END=20,ERR=20) chline
1328         iltc = LEN_TRIM(chline)
1329         IF ( iltc.GT.0 ) THEN
1330          inl = INDEX(chline, '!') 
1331          IF( inl.eq.0 ) THEN
1332           itot = itot + iltc + 1                                ! +1 for the newline character
1333          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1334           itot = itot + inl                                  !  includes +1 for the newline character
1335          ENDIF
1336         ENDIF
1337         GOTO 10
1338     20  CONTINUE
1339         !
1340         ! Allocate text cdnambuff for condensed namelist
1341         !
1342         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1343         WRITE(*,*) 'ALLOCATED ', itot
1344         !
1345         ! Second pass: read and transfer pruned characters into cdnambuff
1346         !
1347         REWIND(iun)
1348         itot=1
1349     30  READ(iun,'(A256)',END=40,ERR=40) chline
1350         iltc = LEN_TRIM(chline)
1351         IF ( iltc.GT.0 ) THEN
1352          inl = INDEX(chline, '!')
1353          IF( inl.eq.0 ) THEN
1354           inl = iltc
1355          ELSE
1356           inl = inl - 1
1357          ENDIF
1358          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1359             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1360             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) NEW_LINE('A')
1361             itot = itot + inl + 1
1362          ENDIF
1363         ENDIF
1364         GOTO 30
1365     40  CONTINUE
1366         itot = itot - 1
1367         WRITE(*,*) 'ASSIGNED ',itot
1368         !
1369         ! Close namelist file
1370         !
1371         CLOSE(iun)
1372         !write(*,'(32A)') cdnambuff
1373      ENDIF
1374#if defined key_mpp_mpi
1375      CALL mpp_bcast_nml( cdnambuff, itot )
1376#endif
1377  END SUBROUTINE load_nml
1378
1379
1380   !!----------------------------------------------------------------------
1381END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.