New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/releases/r4.0/r4.0-HEAD/src/OCE/LBC – NEMO

source: NEMO/releases/r4.0/r4.0-HEAD/src/OCE/LBC/lib_mpp.F90 @ 12901

Last change on this file since 12901 was 12859, checked in by smasson, 4 years ago

r4-HEAD: stpctl bugfix, see #2456

  • Property svn:keywords set to Id
File size: 56.8 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
[2715]34   !!----------------------------------------------------------------------
[13]35   !!----------------------------------------------------------------------
[11536]36   !!   mpp_start     : get local communicator its size and rank
[2715]37   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]38   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]39   !!   mpprecv       :
[9019]40   !!   mppsend       :
[2715]41   !!   mppscatter    :
42   !!   mppgather     :
43   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
44   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
45   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
46   !!   mpp_minloc    :
47   !!   mpp_maxloc    :
48   !!   mppsync       :
49   !!   mppstop       :
[1344]50   !!   mpp_ini_north : initialisation of north fold
[9019]51   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[13]52   !!----------------------------------------------------------------------
[3764]53   USE dom_oce        ! ocean space and time domain
[2715]54   USE in_out_manager ! I/O manager
[3]55
[13]56   IMPLICIT NONE
[415]57   PRIVATE
[9019]58   !
[11536]59   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam
60   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
[9019]61   PUBLIC   mpp_ini_north
[1344]62   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10425]63   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]64   PUBLIC   mppscatter, mppgather
[10425]65   PUBLIC   mpp_ini_znl
[3764]66   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[11536]67   PUBLIC   mpp_report
68   PUBLIC   tic_tac
69#if ! defined key_mpp_mpi
70   PUBLIC MPI_Wtime
71#endif
[5429]72   
[13]73   !! * Interfaces
74   !! define generic interface for these routine as they are called sometimes
[1344]75   !! with scalar arguments instead of array arguments, which causes problems
76   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]77   INTERFACE mpp_min
78      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
79   END INTERFACE
80   INTERFACE mpp_max
[681]81      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]82   END INTERFACE
83   INTERFACE mpp_sum
[6140]84      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]85         &             mppsum_realdd, mppsum_a_realdd
[13]86   END INTERFACE
[1344]87   INTERFACE mpp_minloc
88      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
89   END INTERFACE
90   INTERFACE mpp_maxloc
91      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
92   END INTERFACE
[6490]93
[51]94   !! ========================= !!
95   !!  MPI  variable definition !!
96   !! ========================= !!
[11536]97#if   defined key_mpp_mpi
[1629]98!$AGRIF_DO_NOT_TREAT
[2004]99   INCLUDE 'mpif.h'
[1629]100!$AGRIF_END_DO_NOT_TREAT
[1344]101   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[11536]102#else   
103   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
104   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
105   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
106#endif
[3]107
[1344]108   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]109
[10425]110   INTEGER, PUBLIC ::   mppsize        ! number of process
111   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]112!$AGRIF_DO_NOT_TREAT
[9570]113   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]114!$AGRIF_END_DO_NOT_TREAT
[3]115
[2480]116   INTEGER :: MPI_SUMDD
[1976]117
[1345]118   ! variables used for zonal integration
119   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]120   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
121   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
122   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]123   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]124
[3764]125   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]126   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
127   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
128   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
129   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
130   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
131   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
132   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
133   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]134
[10425]135   ! Communications summary report
136   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
[10437]137   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
138   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
[10425]139   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
140   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
141   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
142   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
143   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10781]144   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
[10425]145   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
146   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
[10437]147   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
[10425]148   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
149   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
150   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
151   !: name (used as id) of allreduce-delayed operations
[10521]152   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
153   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
[10425]154   !: component name where the allreduce-delayed operation is performed
155   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
156   TYPE, PUBLIC ::   DELAYARR
157      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
158      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
159   END TYPE DELAYARR
[10817]160   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
161   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
[10425]162
163   ! timing summary report
164   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
165   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
166   
[9019]167   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]168
[9019]169   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
170   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
[11536]171   
[51]172   !!----------------------------------------------------------------------
[9598]173   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]174   !! $Id$
[10068]175   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]176   !!----------------------------------------------------------------------
[3]177CONTAINS
178
[11536]179   SUBROUTINE mpp_start( localComm )
[2715]180      !!----------------------------------------------------------------------
[11536]181      !!                  ***  routine mpp_start  ***
[3764]182      !!
[11536]183      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
[51]184      !!----------------------------------------------------------------------
[6140]185      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]186      !
[11536]187      INTEGER ::   ierr
188      LOGICAL ::   llmpi_init
[51]189      !!----------------------------------------------------------------------
[11536]190#if defined key_mpp_mpi
[1344]191      !
[11536]192      CALL mpi_initialized ( llmpi_init, ierr )
193      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
[2715]194
[11536]195      IF( .NOT. llmpi_init ) THEN
196         IF( PRESENT(localComm) ) THEN
197            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
198            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
199            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
200         ENDIF
201         CALL mpi_init( ierr )
202         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
[2480]203      ENDIF
[11536]204       
[3764]205      IF( PRESENT(localComm) ) THEN
[2480]206         IF( Agrif_Root() ) THEN
[9570]207            mpi_comm_oce = localComm
[2480]208         ENDIF
209      ELSE
[11536]210         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
211         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
[3764]212      ENDIF
[2480]213
[11536]214# if defined key_agrif
[9019]215      IF( Agrif_Root() ) THEN
[9570]216         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]217      ELSE
[9570]218         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]219      ENDIF
[11536]220# endif
[5656]221
[9570]222      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
223      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[3764]224      !
[1976]225      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
226      !
[11536]227#else
228      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
229      mppsize = 1
230      mpprank = 0
231#endif
232   END SUBROUTINE mpp_start
[3]233
[6140]234
[1344]235   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]236      !!----------------------------------------------------------------------
237      !!                  ***  routine mppsend  ***
[3764]238      !!
[51]239      !! ** Purpose :   Send messag passing array
240      !!
241      !!----------------------------------------------------------------------
[1344]242      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
243      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
244      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
245      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
246      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
247      !!
248      INTEGER ::   iflag
[51]249      !!----------------------------------------------------------------------
[1344]250      !
[11536]251#if defined key_mpp_mpi
252      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
253#endif
[1344]254      !
[51]255   END SUBROUTINE mppsend
[3]256
257
[3294]258   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]259      !!----------------------------------------------------------------------
260      !!                  ***  routine mpprecv  ***
261      !!
262      !! ** Purpose :   Receive messag passing array
263      !!
264      !!----------------------------------------------------------------------
[1344]265      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
266      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
267      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]268      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]269      !!
[51]270      INTEGER :: istatus(mpi_status_size)
271      INTEGER :: iflag
[3294]272      INTEGER :: use_source
[1344]273      !!----------------------------------------------------------------------
274      !
[11536]275#if defined key_mpp_mpi
[3764]276      ! If a specific process number has been passed to the receive call,
[3294]277      ! use that one. Default is to use mpi_any_source
[6140]278      use_source = mpi_any_source
279      IF( PRESENT(ksource) )   use_source = ksource
280      !
[9570]281      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[11536]282#endif
[1344]283      !
[51]284   END SUBROUTINE mpprecv
[3]285
286
[51]287   SUBROUTINE mppgather( ptab, kp, pio )
288      !!----------------------------------------------------------------------
289      !!                   ***  routine mppgather  ***
[3764]290      !!
291      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]292      !!     array which is distributed following the vertical level.
293      !!
[1344]294      !!----------------------------------------------------------------------
[6140]295      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
296      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]297      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]298      !!
[1344]299      INTEGER :: itaille, ierror   ! temporary integer
[51]300      !!---------------------------------------------------------------------
[1344]301      !
302      itaille = jpi * jpj
[11536]303#if defined key_mpp_mpi
[1344]304      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]305         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[11536]306#else
307      pio(:,:,1) = ptab(:,:)
308#endif
[1344]309      !
[51]310   END SUBROUTINE mppgather
[3]311
312
[51]313   SUBROUTINE mppscatter( pio, kp, ptab )
314      !!----------------------------------------------------------------------
315      !!                  ***  routine mppscatter  ***
316      !!
[3764]317      !! ** Purpose :   Transfert between awork array which is distributed
[51]318      !!      following the vertical level and the local subdomain array.
319      !!
320      !!----------------------------------------------------------------------
[6140]321      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
322      INTEGER                             ::   kp     ! Tag (not used with MPI
323      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]324      !!
325      INTEGER :: itaille, ierror   ! temporary integer
[51]326      !!---------------------------------------------------------------------
[1344]327      !
[6140]328      itaille = jpi * jpj
[1344]329      !
[11536]330#if defined key_mpp_mpi
[1344]331      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]332         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[11536]333#else
334      ptab(:,:) = pio(:,:,1)
335#endif
[1344]336      !
[51]337   END SUBROUTINE mppscatter
[3]338
[10425]339   
340   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
341     !!----------------------------------------------------------------------
342      !!                   ***  routine mpp_delay_sum  ***
[1344]343      !!
[10425]344      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
345      !!
[1344]346      !!----------------------------------------------------------------------
[10425]347      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
348      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
349      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
350      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
351      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
352      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[1344]353      !!
[10425]354      INTEGER ::   ji, isz
355      INTEGER ::   idvar
356      INTEGER ::   ierr, ilocalcomm
357      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[1344]358      !!----------------------------------------------------------------------
[11536]359#if defined key_mpp_mpi
[9570]360      ilocalcomm = mpi_comm_oce
[9019]361      IF( PRESENT(kcom) )   ilocalcomm = kcom
[3]362
[10425]363      isz = SIZE(y_in)
364     
[10437]365      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]366
[10425]367      idvar = -1
368      DO ji = 1, nbdelay
369         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
370      END DO
371      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
372
373      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
374         !                                       --------------------------
375         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
376            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
377            DEALLOCATE(todelay(idvar)%z1d)
378            ndelayid(idvar) = -1                                      ! do as if we had no restart
379         ELSE
380            ALLOCATE(todelay(idvar)%y1d(isz))
381            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
382         END IF
383      ENDIF
384     
385      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
386         !                                       --------------------------
387         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
388         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
389         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
390      ENDIF
391
392      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
393
394      ! send back pout from todelay(idvar)%z1d defined at previous call
395      pout(:) = todelay(idvar)%z1d(:)
396
397      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
[11536]398# if defined key_mpi2
[10526]399      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[12518]400      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
401      ndelayid(idvar) = 1
[10526]402      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]403# else
404      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
405# endif
[10526]406#else
[11536]407      pout(:) = REAL(y_in(:), wp)
[10526]408#endif
[10425]409
410   END SUBROUTINE mpp_delay_sum
411
[9019]412   
[10425]413   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
[9019]414      !!----------------------------------------------------------------------
[10425]415      !!                   ***  routine mpp_delay_max  ***
[9019]416      !!
[10425]417      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
[9019]418      !!
419      !!----------------------------------------------------------------------
[10425]420      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
421      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
422      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
423      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
424      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
425      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
[9019]426      !!
[10425]427      INTEGER ::   ji, isz
428      INTEGER ::   idvar
429      INTEGER ::   ierr, ilocalcomm
[9019]430      !!----------------------------------------------------------------------
[11536]431#if defined key_mpp_mpi
[9570]432      ilocalcomm = mpi_comm_oce
[9019]433      IF( PRESENT(kcom) )   ilocalcomm = kcom
[6140]434
[10425]435      isz = SIZE(p_in)
[9019]436
[10437]437      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]438
[10425]439      idvar = -1
440      DO ji = 1, nbdelay
441         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
442      END DO
443      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
[3]444
[10425]445      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
446         !                                       --------------------------
447         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
448            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
449            DEALLOCATE(todelay(idvar)%z1d)
450            ndelayid(idvar) = -1                                      ! do as if we had no restart
451         END IF
452      ENDIF
[13]453
[10425]454      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
455         !                                       --------------------------
456         ALLOCATE(todelay(idvar)%z1d(isz))
457         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
458      ENDIF
[3]459
[10425]460      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[3]461
[10425]462      ! send back pout from todelay(idvar)%z1d defined at previous call
463      pout(:) = todelay(idvar)%z1d(:)
[13]464
[10425]465      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
[11536]466# if defined key_mpi2
[10526]467      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[12518]468      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )
469      ndelayid(idvar) = 1
[10526]470      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]471# else
472      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
473# endif
[10526]474#else
[11536]475      pout(:) = p_in(:)
[10526]476#endif
[10425]477
478   END SUBROUTINE mpp_delay_max
479
480   
481   SUBROUTINE mpp_delay_rcv( kid )
482      !!----------------------------------------------------------------------
483      !!                   ***  routine mpp_delay_rcv  ***
[1344]484      !!
[10425]485      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
[1344]486      !!
[10425]487      !!----------------------------------------------------------------------
488      INTEGER,INTENT(in   )      ::  kid 
489      INTEGER ::   ierr
490      !!----------------------------------------------------------------------
[11536]491#if defined key_mpp_mpi
[10425]492      IF( ndelayid(kid) /= -2 ) THEN 
[10526]493#if ! defined key_mpi2
[10425]494         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
495         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
496         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10526]497#endif
[10425]498         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
499         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
500      ENDIF
[11536]501#endif
[10425]502   END SUBROUTINE mpp_delay_rcv
[3]503
[10425]504   
505   !!----------------------------------------------------------------------
506   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
507   !!   
508   !!----------------------------------------------------------------------
509   !!
510#  define OPERATION_MAX
511#  define INTEGER_TYPE
512#  define DIM_0d
513#     define ROUTINE_ALLREDUCE           mppmax_int
514#     include "mpp_allreduce_generic.h90"
515#     undef ROUTINE_ALLREDUCE
516#  undef DIM_0d
517#  define DIM_1d
518#     define ROUTINE_ALLREDUCE           mppmax_a_int
519#     include "mpp_allreduce_generic.h90"
520#     undef ROUTINE_ALLREDUCE
521#  undef DIM_1d
522#  undef INTEGER_TYPE
523!
524#  define REAL_TYPE
525#  define DIM_0d
526#     define ROUTINE_ALLREDUCE           mppmax_real
527#     include "mpp_allreduce_generic.h90"
528#     undef ROUTINE_ALLREDUCE
529#  undef DIM_0d
530#  define DIM_1d
531#     define ROUTINE_ALLREDUCE           mppmax_a_real
532#     include "mpp_allreduce_generic.h90"
533#     undef ROUTINE_ALLREDUCE
534#  undef DIM_1d
535#  undef REAL_TYPE
536#  undef OPERATION_MAX
537   !!----------------------------------------------------------------------
538   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
539   !!   
540   !!----------------------------------------------------------------------
541   !!
542#  define OPERATION_MIN
543#  define INTEGER_TYPE
544#  define DIM_0d
545#     define ROUTINE_ALLREDUCE           mppmin_int
546#     include "mpp_allreduce_generic.h90"
547#     undef ROUTINE_ALLREDUCE
548#  undef DIM_0d
549#  define DIM_1d
550#     define ROUTINE_ALLREDUCE           mppmin_a_int
551#     include "mpp_allreduce_generic.h90"
552#     undef ROUTINE_ALLREDUCE
553#  undef DIM_1d
554#  undef INTEGER_TYPE
555!
556#  define REAL_TYPE
557#  define DIM_0d
558#     define ROUTINE_ALLREDUCE           mppmin_real
559#     include "mpp_allreduce_generic.h90"
560#     undef ROUTINE_ALLREDUCE
561#  undef DIM_0d
562#  define DIM_1d
563#     define ROUTINE_ALLREDUCE           mppmin_a_real
564#     include "mpp_allreduce_generic.h90"
565#     undef ROUTINE_ALLREDUCE
566#  undef DIM_1d
567#  undef REAL_TYPE
568#  undef OPERATION_MIN
[869]569
[10425]570   !!----------------------------------------------------------------------
571   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
572   !!   
573   !!   Global sum of 1D array or a variable (integer, real or complex)
574   !!----------------------------------------------------------------------
575   !!
576#  define OPERATION_SUM
577#  define INTEGER_TYPE
578#  define DIM_0d
579#     define ROUTINE_ALLREDUCE           mppsum_int
580#     include "mpp_allreduce_generic.h90"
581#     undef ROUTINE_ALLREDUCE
582#  undef DIM_0d
583#  define DIM_1d
584#     define ROUTINE_ALLREDUCE           mppsum_a_int
585#     include "mpp_allreduce_generic.h90"
586#     undef ROUTINE_ALLREDUCE
587#  undef DIM_1d
588#  undef INTEGER_TYPE
589!
590#  define REAL_TYPE
591#  define DIM_0d
592#     define ROUTINE_ALLREDUCE           mppsum_real
593#     include "mpp_allreduce_generic.h90"
594#     undef ROUTINE_ALLREDUCE
595#  undef DIM_0d
596#  define DIM_1d
597#     define ROUTINE_ALLREDUCE           mppsum_a_real
598#     include "mpp_allreduce_generic.h90"
599#     undef ROUTINE_ALLREDUCE
600#  undef DIM_1d
601#  undef REAL_TYPE
602#  undef OPERATION_SUM
603
604#  define OPERATION_SUM_DD
605#  define COMPLEX_TYPE
606#  define DIM_0d
607#     define ROUTINE_ALLREDUCE           mppsum_realdd
608#     include "mpp_allreduce_generic.h90"
609#     undef ROUTINE_ALLREDUCE
610#  undef DIM_0d
611#  define DIM_1d
612#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
613#     include "mpp_allreduce_generic.h90"
614#     undef ROUTINE_ALLREDUCE
615#  undef DIM_1d
616#  undef COMPLEX_TYPE
617#  undef OPERATION_SUM_DD
618
619   !!----------------------------------------------------------------------
620   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
621   !!   
622   !!----------------------------------------------------------------------
623   !!
624#  define OPERATION_MINLOC
625#  define DIM_2d
626#     define ROUTINE_LOC           mpp_minloc2d
627#     include "mpp_loc_generic.h90"
628#     undef ROUTINE_LOC
629#  undef DIM_2d
630#  define DIM_3d
631#     define ROUTINE_LOC           mpp_minloc3d
632#     include "mpp_loc_generic.h90"
633#     undef ROUTINE_LOC
634#  undef DIM_3d
635#  undef OPERATION_MINLOC
636
637#  define OPERATION_MAXLOC
638#  define DIM_2d
639#     define ROUTINE_LOC           mpp_maxloc2d
640#     include "mpp_loc_generic.h90"
641#     undef ROUTINE_LOC
642#  undef DIM_2d
643#  define DIM_3d
644#     define ROUTINE_LOC           mpp_maxloc3d
645#     include "mpp_loc_generic.h90"
646#     undef ROUTINE_LOC
647#  undef DIM_3d
648#  undef OPERATION_MAXLOC
649
[1344]650   SUBROUTINE mppsync()
651      !!----------------------------------------------------------------------
652      !!                  ***  routine mppsync  ***
[3764]653      !!
[1344]654      !! ** Purpose :   Massively parallel processors, synchroneous
655      !!
656      !!-----------------------------------------------------------------------
657      INTEGER :: ierror
658      !!-----------------------------------------------------------------------
659      !
[11536]660#if defined key_mpp_mpi
[9570]661      CALL mpi_barrier( mpi_comm_oce, ierror )
[11536]662#endif
[1344]663      !
664   END SUBROUTINE mppsync
[3]665
666
[11536]667   SUBROUTINE mppstop( ld_abort ) 
[1344]668      !!----------------------------------------------------------------------
669      !!                  ***  routine mppstop  ***
[3764]670      !!
[3294]671      !! ** purpose :   Stop massively parallel processors method
[1344]672      !!
673      !!----------------------------------------------------------------------
[11536]674      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
675      LOGICAL ::   ll_abort
[1344]676      INTEGER ::   info
677      !!----------------------------------------------------------------------
[11536]678      ll_abort = .FALSE.
679      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
[1344]680      !
[11536]681#if defined key_mpp_mpi
682      IF(ll_abort) THEN
[10425]683         CALL mpi_abort( MPI_COMM_WORLD )
684      ELSE
685         CALL mppsync
686         CALL mpi_finalize( info )
687      ENDIF
[11536]688#endif
689      IF( ll_abort ) STOP 123
[1344]690      !
691   END SUBROUTINE mppstop
[3]692
693
[1344]694   SUBROUTINE mpp_comm_free( kcom )
695      !!----------------------------------------------------------------------
696      INTEGER, INTENT(in) ::   kcom
697      !!
698      INTEGER :: ierr
699      !!----------------------------------------------------------------------
700      !
[11536]701#if defined key_mpp_mpi
[1344]702      CALL MPI_COMM_FREE(kcom, ierr)
[11536]703#endif
[1344]704      !
705   END SUBROUTINE mpp_comm_free
[3]706
[869]707
[2715]708   SUBROUTINE mpp_ini_znl( kumout )
[1345]709      !!----------------------------------------------------------------------
710      !!               ***  routine mpp_ini_znl  ***
711      !!
712      !! ** Purpose :   Initialize special communicator for computing zonal sum
713      !!
714      !! ** Method  : - Look for processors in the same row
715      !!              - Put their number in nrank_znl
716      !!              - Create group for the znl processors
717      !!              - Create a communicator for znl processors
718      !!              - Determine if processor should write znl files
719      !!
720      !! ** output
721      !!      ndim_rank_znl = number of processors on the same row
722      !!      ngrp_znl = group ID for the znl processors
723      !!      ncomm_znl = communicator for the ice procs.
724      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
725      !!
726      !!----------------------------------------------------------------------
[2715]727      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]728      !
[2715]729      INTEGER :: jproc      ! dummy loop integer
730      INTEGER :: ierr, ii   ! local integer
731      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
732      !!----------------------------------------------------------------------
[11536]733#if defined key_mpp_mpi
[1345]734      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
735      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]736      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]737      !
[2715]738      ALLOCATE( kwork(jpnij), STAT=ierr )
[11536]739      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
[2715]740
741      IF( jpnj == 1 ) THEN
[1345]742         ngrp_znl  = ngrp_world
[9570]743         ncomm_znl = mpi_comm_oce
[1345]744      ELSE
745         !
[9570]746         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]747         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
748         !-$$        CALL flush(numout)
749         !
750         ! Count number of processors on the same row
751         ndim_rank_znl = 0
752         DO jproc=1,jpnij
753            IF ( kwork(jproc) == njmpp ) THEN
754               ndim_rank_znl = ndim_rank_znl + 1
755            ENDIF
756         END DO
757         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
758         !-$$        CALL flush(numout)
759         ! Allocate the right size to nrank_znl
[1441]760         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]761         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]762         ii = 0
[1345]763         nrank_znl (:) = 0
764         DO jproc=1,jpnij
765            IF ( kwork(jproc) == njmpp) THEN
766               ii = ii + 1
[3764]767               nrank_znl(ii) = jproc -1
[1345]768            ENDIF
769         END DO
770         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
771         !-$$        CALL flush(numout)
772
773         ! Create the opa group
[9570]774         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]775         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
776         !-$$        CALL flush(numout)
777
778         ! Create the znl group from the opa group
779         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
780         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
781         !-$$        CALL flush(numout)
782
783         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]784         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]785         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
786         !-$$        CALL flush(numout)
787         !
788      END IF
789
790      ! Determines if processor if the first (starting from i=1) on the row
[3764]791      IF ( jpni == 1 ) THEN
[1345]792         l_znl_root = .TRUE.
793      ELSE
794         l_znl_root = .FALSE.
795         kwork (1) = nimpp
[10425]796         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]797         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
798      END IF
799
[2715]800      DEALLOCATE(kwork)
[11536]801#endif
[2715]802
[1345]803   END SUBROUTINE mpp_ini_znl
804
805
[1344]806   SUBROUTINE mpp_ini_north
807      !!----------------------------------------------------------------------
808      !!               ***  routine mpp_ini_north  ***
809      !!
[3764]810      !! ** Purpose :   Initialize special communicator for north folding
[1344]811      !!      condition together with global variables needed in the mpp folding
812      !!
813      !! ** Method  : - Look for northern processors
814      !!              - Put their number in nrank_north
815      !!              - Create groups for the world processors and the north processors
816      !!              - Create a communicator for northern processors
817      !!
818      !! ** output
819      !!      njmppmax = njmpp for northern procs
820      !!      ndim_rank_north = number of processors in the northern line
821      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
822      !!      ngrp_world = group ID for the world processors
823      !!      ngrp_north = group ID for the northern processors
824      !!      ncomm_north = communicator for the northern procs.
825      !!      north_root = number (in the world) of proc 0 in the northern comm.
826      !!
827      !!----------------------------------------------------------------------
828      INTEGER ::   ierr
829      INTEGER ::   jjproc
830      INTEGER ::   ii, ji
831      !!----------------------------------------------------------------------
832      !
[11536]833#if defined key_mpp_mpi
[1344]834      njmppmax = MAXVAL( njmppt )
835      !
836      ! Look for how many procs on the northern boundary
837      ndim_rank_north = 0
838      DO jjproc = 1, jpnij
839         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
840      END DO
841      !
842      ! Allocate the right size to nrank_north
[1441]843      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]844      ALLOCATE( nrank_north(ndim_rank_north) )
[869]845
[1344]846      ! Fill the nrank_north array with proc. number of northern procs.
847      ! Note : the rank start at 0 in MPI
848      ii = 0
849      DO ji = 1, jpnij
850         IF ( njmppt(ji) == njmppmax   ) THEN
851            ii=ii+1
852            nrank_north(ii)=ji-1
853         END IF
854      END DO
855      !
856      ! create the world group
[9570]857      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]858      !
859      ! Create the North group from the world group
860      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
861      !
862      ! Create the North communicator , ie the pool of procs in the north group
[9570]863      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]864      !
[11536]865#endif
[1344]866   END SUBROUTINE mpp_ini_north
[869]867
868
[9019]869   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]870      !!---------------------------------------------------------------------
871      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
872      !!
873      !!   Modification of original codes written by David H. Bailey
874      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
875      !!---------------------------------------------------------------------
[9019]876      INTEGER                     , INTENT(in)    ::   ilen, itype
877      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
878      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]879      !
880      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]881      INTEGER  :: ji, ztmp           ! local scalar
882      !!---------------------------------------------------------------------
883      !
[1976]884      ztmp = itype   ! avoid compilation warning
[9019]885      !
[1976]886      DO ji=1,ilen
887      ! Compute ydda + yddb using Knuth's trick.
888         zt1  = real(ydda(ji)) + real(yddb(ji))
889         zerr = zt1 - real(ydda(ji))
890         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
891                + aimag(ydda(ji)) + aimag(yddb(ji))
892
893         ! The result is zt1 + zt2, after normalization.
894         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
895      END DO
[9019]896      !
[1976]897   END SUBROUTINE DDPDD_MPI
898
[6140]899
[10437]900   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
[10425]901      !!----------------------------------------------------------------------
902      !!                  ***  routine mpp_report  ***
903      !!
904      !! ** Purpose :   report use of mpp routines per time-setp
905      !!
906      !!----------------------------------------------------------------------
907      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
908      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
[10437]909      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
[10425]910      !!
[10982]911      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
[10437]912      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
[10982]913      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
[10425]914      !!----------------------------------------------------------------------
[11536]915#if defined key_mpp_mpi
[10425]916      !
917      ll_lbc = .FALSE.
918      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
919      ll_glb = .FALSE.
920      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
[10437]921      ll_dlg = .FALSE.
922      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
[10425]923      !
924      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
925      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
926      ncom_freq = ncom_fsbc
927      !
928      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
929         IF( ll_lbc ) THEN
930            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
931            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
932            n_sequence_lbc = n_sequence_lbc + 1
933            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
934            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
935            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
936            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
937         ENDIF
938         IF( ll_glb ) THEN
939            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
940            n_sequence_glb = n_sequence_glb + 1
941            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
942            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
943         ENDIF
[10437]944         IF( ll_dlg ) THEN
945            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
946            n_sequence_dlg = n_sequence_dlg + 1
947            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
948            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
949         ENDIF
[10425]950      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
951         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
952         WRITE(numcom,*) ' '
953         WRITE(numcom,*) ' ------------------------------------------------------------'
954         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
955         WRITE(numcom,*) ' ------------------------------------------------------------'
956         WRITE(numcom,*) ' '
957         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
958         jj = 0; jk = 0; jf = 0; jh = 0
959         DO ji = 1, n_sequence_lbc
960            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
961            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
962            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
963            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
964         END DO
965         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
966         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
967         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
968         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
969         WRITE(numcom,*) ' '
970         WRITE(numcom,*) ' lbc_lnk called'
[10982]971         DO ji = 1, n_sequence_lbc - 1
972            IF ( crname_lbc(ji) /= 'already counted' ) THEN
973               ccountname = crname_lbc(ji)
974               crname_lbc(ji) = 'already counted'
975               jcount = 1
976               DO jj = ji + 1, n_sequence_lbc
977                  IF ( ccountname ==  crname_lbc(jj) ) THEN
978                     jcount = jcount + 1
979                     crname_lbc(jj) = 'already counted'
980                  END IF
981               END DO
982               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
[10425]983            END IF
984         END DO
[10982]985         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
986            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
987         END IF
[10425]988         WRITE(numcom,*) ' '
989         IF ( n_sequence_glb > 0 ) THEN
990            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
991            jj = 1
992            DO ji = 2, n_sequence_glb
993               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
994                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
995                  jj = 0
996               END IF
997               jj = jj + 1 
998            END DO
999            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1000            DEALLOCATE(crname_glb)
1001         ELSE
1002            WRITE(numcom,*) ' No MPI global communication '
1003         ENDIF
1004         WRITE(numcom,*) ' '
[10437]1005         IF ( n_sequence_dlg > 0 ) THEN
1006            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1007            jj = 1
1008            DO ji = 2, n_sequence_dlg
1009               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1010                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1011                  jj = 0
1012               END IF
1013               jj = jj + 1 
1014            END DO
1015            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1016            DEALLOCATE(crname_dlg)
1017         ELSE
1018            WRITE(numcom,*) ' No MPI delayed global communication '
1019         ENDIF
1020         WRITE(numcom,*) ' '
[10425]1021         WRITE(numcom,*) ' -----------------------------------------------'
1022         WRITE(numcom,*) ' '
1023         DEALLOCATE(ncomm_sequence)
1024         DEALLOCATE(crname_lbc)
1025      ENDIF
[11536]1026#endif
[10425]1027   END SUBROUTINE mpp_report
1028
[6140]1029   
[10425]1030   SUBROUTINE tic_tac (ld_tic, ld_global)
1031
1032    LOGICAL,           INTENT(IN) :: ld_tic
1033    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1034    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1035    REAL(wp),               SAVE :: tic_ct = 0._wp
1036    INTEGER :: ii
[11536]1037#if defined key_mpp_mpi
[10425]1038
1039    IF( ncom_stp <= nit000 ) RETURN
1040    IF( ncom_stp == nitend ) RETURN
1041    ii = 1
1042    IF( PRESENT( ld_global ) ) THEN
1043       IF( ld_global ) ii = 2
1044    END IF
1045   
1046    IF ( ld_tic ) THEN
1047       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1048       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1049    ELSE
1050       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1051       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1052    ENDIF
[11536]1053#endif
[10425]1054   
1055   END SUBROUTINE tic_tac
1056
[11536]1057#if ! defined key_mpp_mpi
1058   SUBROUTINE mpi_wait(request, status, ierror)
1059      INTEGER                            , INTENT(in   ) ::   request
1060      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1061      INTEGER                            , INTENT(  out) ::   ierror
1062   END SUBROUTINE mpi_wait
[1976]1063
[10425]1064   
[11536]1065   FUNCTION MPI_Wtime()
1066      REAL(wp) ::  MPI_Wtime
1067      MPI_Wtime = -1.
1068   END FUNCTION MPI_Wtime
[3]1069#endif
[2715]1070
[13]1071   !!----------------------------------------------------------------------
[11536]1072   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
[2715]1073   !!----------------------------------------------------------------------
1074
1075   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1076      &                 cd6, cd7, cd8, cd9, cd10 )
1077      !!----------------------------------------------------------------------
1078      !!                  ***  ROUTINE  stop_opa  ***
1079      !!
[3764]1080      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1081      !!                increment the error number (nstop) by one.
1082      !!----------------------------------------------------------------------
[11536]1083      CHARACTER(len=*), INTENT(in   )           ::   cd1
1084      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1085      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
[12859]1086      !
1087      INTEGER ::   inum
[2715]1088      !!----------------------------------------------------------------------
1089      !
[3764]1090      nstop = nstop + 1
[11536]1091      !
[12859]1092      IF( numout == 6 ) THEN                          ! force to open ocean.output file if not already opened
1093         CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1094      ELSE
1095         IF( narea > 1 .AND. cd1 == 'STOP' ) THEN     ! add an error message in ocean.output
1096            CALL ctl_opn( inum,'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1097            WRITE(inum,*)
1098            WRITE(inum,'(a,i4.4)') ' ===>>> : see E R R O R in ocean.output_', narea - 1
1099         ENDIF
1100      ENDIF
[11536]1101      !
1102                            WRITE(numout,*)
1103                            WRITE(numout,*) ' ===>>> : E R R O R'
1104                            WRITE(numout,*)
1105                            WRITE(numout,*) '         ==========='
1106                            WRITE(numout,*)
1107                            WRITE(numout,*) TRIM(cd1)
[10425]1108      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1109      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1110      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1111      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1112      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1113      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1114      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1115      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1116      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[11536]1117                            WRITE(numout,*)
1118      !
[2715]1119                               CALL FLUSH(numout    )
1120      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1121      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1122      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1123      !
1124      IF( cd1 == 'STOP' ) THEN
[11536]1125         WRITE(numout,*) 
[10425]1126         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[11536]1127         WRITE(numout,*) 
[12859]1128         CALL FLUSH(numout)
1129         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
[11536]1130         CALL mppstop( ld_abort = .true. )
[2715]1131      ENDIF
1132      !
1133   END SUBROUTINE ctl_stop
1134
1135
1136   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1137      &                 cd6, cd7, cd8, cd9, cd10 )
1138      !!----------------------------------------------------------------------
1139      !!                  ***  ROUTINE  stop_warn  ***
1140      !!
[3764]1141      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1142      !!                increment the warning number (nwarn) by one.
1143      !!----------------------------------------------------------------------
1144      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1145      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1146      !!----------------------------------------------------------------------
[3764]1147      !
1148      nwarn = nwarn + 1
[11536]1149      !
[2715]1150      IF(lwp) THEN
[11536]1151                               WRITE(numout,*)
1152                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1153                               WRITE(numout,*)
1154                               WRITE(numout,*) '         ==============='
1155                               WRITE(numout,*)
1156         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1157         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1158         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1159         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1160         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1161         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1162         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1163         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1164         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1165         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1166                               WRITE(numout,*)
[2715]1167      ENDIF
1168      CALL FLUSH(numout)
1169      !
1170   END SUBROUTINE ctl_warn
1171
1172
1173   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1174      !!----------------------------------------------------------------------
1175      !!                  ***  ROUTINE ctl_opn  ***
1176      !!
1177      !! ** Purpose :   Open file and check if required file is available.
1178      !!
1179      !! ** Method  :   Fortan open
1180      !!----------------------------------------------------------------------
1181      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1182      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1183      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1184      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1185      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1186      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1187      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1188      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1189      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1190      !
[2715]1191      CHARACTER(len=80) ::   clfile
1192      INTEGER           ::   iost
1193      !!----------------------------------------------------------------------
[5836]1194      !
[2715]1195      ! adapt filename
1196      ! ----------------
1197      clfile = TRIM(cdfile)
1198      IF( PRESENT( karea ) ) THEN
1199         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1200      ENDIF
1201#if defined key_agrif
1202      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1203      knum=Agrif_Get_Unit()
1204#else
1205      knum=get_unit()
1206#endif
[10425]1207      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1208      !
[11536]1209      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
[10425]1210         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1211      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1212         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1213      ELSE
[10425]1214         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1215      ENDIF
[10425]1216      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1217         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1218      IF( iost == 0 ) THEN
[12859]1219         IF(ldwp .AND. kout > 0) THEN
[10425]1220            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1221            WRITE(kout,*) '     unit   = ', knum
1222            WRITE(kout,*) '     status = ', cdstat
1223            WRITE(kout,*) '     form   = ', cdform
1224            WRITE(kout,*) '     access = ', cdacce
1225            WRITE(kout,*)
1226         ENDIF
1227      ENDIF
1228100   CONTINUE
1229      IF( iost /= 0 ) THEN
[11536]1230         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1231         WRITE(ctmp2,*) ' =======   ===  '
1232         WRITE(ctmp3,*) '           unit   = ', knum
1233         WRITE(ctmp4,*) '           status = ', cdstat
1234         WRITE(ctmp5,*) '           form   = ', cdform
1235         WRITE(ctmp6,*) '           access = ', cdacce
1236         WRITE(ctmp7,*) '           iostat = ', iost
1237         WRITE(ctmp8,*) '           we stop. verify the file '
1238         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
[2715]1239      ENDIF
[5836]1240      !
[2715]1241   END SUBROUTINE ctl_opn
1242
[5836]1243
[11536]1244   SUBROUTINE ctl_nam ( kios, cdnam )
[4147]1245      !!----------------------------------------------------------------------
1246      !!                  ***  ROUTINE ctl_nam  ***
1247      !!
1248      !! ** Purpose :   Informations when error while reading a namelist
1249      !!
1250      !! ** Method  :   Fortan open
1251      !!----------------------------------------------------------------------
[11536]1252      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1253      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1254      !
1255      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
[4147]1256      !!----------------------------------------------------------------------
[5836]1257      !
[7646]1258      WRITE (clios, '(I5.0)')   kios
[4147]1259      IF( kios < 0 ) THEN         
[5836]1260         CALL ctl_warn( 'end of record or file while reading namelist '   &
1261            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1262      ENDIF
[5836]1263      !
[4147]1264      IF( kios > 0 ) THEN
[5836]1265         CALL ctl_stop( 'misspelled variable in namelist '   &
1266            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1267      ENDIF
1268      kios = 0
[5836]1269      !
[4147]1270   END SUBROUTINE ctl_nam
1271
[5836]1272
[2715]1273   INTEGER FUNCTION get_unit()
1274      !!----------------------------------------------------------------------
1275      !!                  ***  FUNCTION  get_unit  ***
1276      !!
1277      !! ** Purpose :   return the index of an unused logical unit
1278      !!----------------------------------------------------------------------
[3764]1279      LOGICAL :: llopn
[2715]1280      !!----------------------------------------------------------------------
1281      !
1282      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1283      llopn = .TRUE.
1284      DO WHILE( (get_unit < 998) .AND. llopn )
1285         get_unit = get_unit + 1
1286         INQUIRE( unit = get_unit, opened = llopn )
1287      END DO
1288      IF( (get_unit == 999) .AND. llopn ) THEN
[11536]1289         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
[2715]1290      ENDIF
1291      !
1292   END FUNCTION get_unit
1293
1294   !!----------------------------------------------------------------------
[3]1295END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.