New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2020/dev_r12563_ASINTER-06_ABL_improvement/src/OCE/LBC – NEMO

source: NEMO/branches/2020/dev_r12563_ASINTER-06_ABL_improvement/src/OCE/LBC/lib_mpp.F90 @ 13159

Last change on this file since 13159 was 13159, checked in by gsamson, 4 years ago

merge trunk@r13136 into ASINTER-06 branch; pass all SETTE tests; results identical to trunk@r13136; ticket #2419

  • Property svn:keywords set to Id
File size: 61.4 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
[12377]34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
[2715]35   !!----------------------------------------------------------------------
[13]36   !!----------------------------------------------------------------------
[11536]37   !!   mpp_start     : get local communicator its size and rank
[2715]38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]40   !!   mpprecv       :
[9019]41   !!   mppsend       :
[2715]42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
[1344]51   !!   mpp_ini_north : initialisation of north fold
[9019]52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[12377]53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
[13]54   !!----------------------------------------------------------------------
[3764]55   USE dom_oce        ! ocean space and time domain
[2715]56   USE in_out_manager ! I/O manager
[3]57
[13]58   IMPLICIT NONE
[415]59   PRIVATE
[9019]60   !
[12377]61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
[11536]62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
[9019]63   PUBLIC   mpp_ini_north
[1344]64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10425]65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]66   PUBLIC   mppscatter, mppgather
[10425]67   PUBLIC   mpp_ini_znl
[3764]68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[11536]69   PUBLIC   mpp_report
[12377]70   PUBLIC   mpp_bcast_nml
[11536]71   PUBLIC   tic_tac
72#if ! defined key_mpp_mpi
73   PUBLIC MPI_Wtime
74#endif
[5429]75   
[13]76   !! * Interfaces
77   !! define generic interface for these routine as they are called sometimes
[1344]78   !! with scalar arguments instead of array arguments, which causes problems
79   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]80   INTERFACE mpp_min
81      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
82   END INTERFACE
83   INTERFACE mpp_max
[681]84      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]85   END INTERFACE
86   INTERFACE mpp_sum
[6140]87      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]88         &             mppsum_realdd, mppsum_a_realdd
[13]89   END INTERFACE
[1344]90   INTERFACE mpp_minloc
91      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
92   END INTERFACE
93   INTERFACE mpp_maxloc
94      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
95   END INTERFACE
[6490]96
[51]97   !! ========================= !!
98   !!  MPI  variable definition !!
99   !! ========================= !!
[11536]100#if   defined key_mpp_mpi
[1629]101!$AGRIF_DO_NOT_TREAT
[2004]102   INCLUDE 'mpif.h'
[1629]103!$AGRIF_END_DO_NOT_TREAT
[1344]104   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[11536]105#else   
106   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
107   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
109#endif
[3]110
[1344]111   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]112
[10425]113   INTEGER, PUBLIC ::   mppsize        ! number of process
114   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]115!$AGRIF_DO_NOT_TREAT
[9570]116   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]117!$AGRIF_END_DO_NOT_TREAT
[3]118
[2480]119   INTEGER :: MPI_SUMDD
[1976]120
[1345]121   ! variables used for zonal integration
122   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]123   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
124   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
125   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]127
[3764]128   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]129   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
130   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
131   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
132   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
133   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
134   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
135   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
136   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]137
[10425]138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
[10437]140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
[10425]142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
145   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10781]146   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
[10425]147   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
148   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
[10437]149   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
[10425]150   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
151   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
152   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
153   !: name (used as id) of allreduce-delayed operations
[10521]154   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
155   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
[10425]156   !: component name where the allreduce-delayed operation is performed
157   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
158   TYPE, PUBLIC ::   DELAYARR
159      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
160      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
161   END TYPE DELAYARR
[10817]162   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
163   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
[10425]164
165   ! timing summary report
166   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
167   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
168   
[9019]169   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]170
[9019]171   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
172   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
[11536]173   
[12377]174   !! * Substitutions
175#  include "do_loop_substitute.h90"
[51]176   !!----------------------------------------------------------------------
[9598]177   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]178   !! $Id$
[10068]179   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]180   !!----------------------------------------------------------------------
[3]181CONTAINS
182
[11536]183   SUBROUTINE mpp_start( localComm )
[2715]184      !!----------------------------------------------------------------------
[11536]185      !!                  ***  routine mpp_start  ***
[3764]186      !!
[11536]187      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
[51]188      !!----------------------------------------------------------------------
[6140]189      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]190      !
[11536]191      INTEGER ::   ierr
192      LOGICAL ::   llmpi_init
[51]193      !!----------------------------------------------------------------------
[11536]194#if defined key_mpp_mpi
[1344]195      !
[11536]196      CALL mpi_initialized ( llmpi_init, ierr )
197      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
[2715]198
[11536]199      IF( .NOT. llmpi_init ) THEN
200         IF( PRESENT(localComm) ) THEN
201            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
202            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
203            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
204         ENDIF
205         CALL mpi_init( ierr )
206         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
[2480]207      ENDIF
[11536]208       
[3764]209      IF( PRESENT(localComm) ) THEN
[2480]210         IF( Agrif_Root() ) THEN
[9570]211            mpi_comm_oce = localComm
[2480]212         ENDIF
213      ELSE
[11536]214         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
215         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
[3764]216      ENDIF
[2480]217
[11536]218# if defined key_agrif
[9019]219      IF( Agrif_Root() ) THEN
[9570]220         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]221      ELSE
[9570]222         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]223      ENDIF
[11536]224# endif
[5656]225
[9570]226      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
227      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[3764]228      !
[1976]229      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
230      !
[11536]231#else
232      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
233      mppsize = 1
234      mpprank = 0
235#endif
236   END SUBROUTINE mpp_start
[3]237
[6140]238
[1344]239   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]240      !!----------------------------------------------------------------------
241      !!                  ***  routine mppsend  ***
[3764]242      !!
[51]243      !! ** Purpose :   Send messag passing array
244      !!
245      !!----------------------------------------------------------------------
[1344]246      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
247      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
248      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
249      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
250      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
251      !!
252      INTEGER ::   iflag
[51]253      !!----------------------------------------------------------------------
[1344]254      !
[11536]255#if defined key_mpp_mpi
256      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
257#endif
[1344]258      !
[51]259   END SUBROUTINE mppsend
[3]260
261
[3294]262   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]263      !!----------------------------------------------------------------------
264      !!                  ***  routine mpprecv  ***
265      !!
266      !! ** Purpose :   Receive messag passing array
267      !!
268      !!----------------------------------------------------------------------
[1344]269      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
270      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
271      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]272      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]273      !!
[51]274      INTEGER :: istatus(mpi_status_size)
275      INTEGER :: iflag
[3294]276      INTEGER :: use_source
[1344]277      !!----------------------------------------------------------------------
278      !
[11536]279#if defined key_mpp_mpi
[3764]280      ! If a specific process number has been passed to the receive call,
[3294]281      ! use that one. Default is to use mpi_any_source
[6140]282      use_source = mpi_any_source
283      IF( PRESENT(ksource) )   use_source = ksource
284      !
[9570]285      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[11536]286#endif
[1344]287      !
[51]288   END SUBROUTINE mpprecv
[3]289
290
[51]291   SUBROUTINE mppgather( ptab, kp, pio )
292      !!----------------------------------------------------------------------
293      !!                   ***  routine mppgather  ***
[3764]294      !!
295      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]296      !!     array which is distributed following the vertical level.
297      !!
[1344]298      !!----------------------------------------------------------------------
[6140]299      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
300      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]301      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]302      !!
[1344]303      INTEGER :: itaille, ierror   ! temporary integer
[51]304      !!---------------------------------------------------------------------
[1344]305      !
306      itaille = jpi * jpj
[11536]307#if defined key_mpp_mpi
[1344]308      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]309         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[11536]310#else
311      pio(:,:,1) = ptab(:,:)
312#endif
[1344]313      !
[51]314   END SUBROUTINE mppgather
[3]315
316
[51]317   SUBROUTINE mppscatter( pio, kp, ptab )
318      !!----------------------------------------------------------------------
319      !!                  ***  routine mppscatter  ***
320      !!
[3764]321      !! ** Purpose :   Transfert between awork array which is distributed
[51]322      !!      following the vertical level and the local subdomain array.
323      !!
324      !!----------------------------------------------------------------------
[6140]325      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
326      INTEGER                             ::   kp     ! Tag (not used with MPI
327      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]328      !!
329      INTEGER :: itaille, ierror   ! temporary integer
[51]330      !!---------------------------------------------------------------------
[1344]331      !
[6140]332      itaille = jpi * jpj
[1344]333      !
[11536]334#if defined key_mpp_mpi
[1344]335      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]336         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[11536]337#else
338      ptab(:,:) = pio(:,:,1)
339#endif
[1344]340      !
[51]341   END SUBROUTINE mppscatter
[3]342
[10425]343   
344   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
345     !!----------------------------------------------------------------------
346      !!                   ***  routine mpp_delay_sum  ***
[1344]347      !!
[10425]348      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
349      !!
[1344]350      !!----------------------------------------------------------------------
[10425]351      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
352      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
353      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
354      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
355      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
356      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[1344]357      !!
[10425]358      INTEGER ::   ji, isz
359      INTEGER ::   idvar
360      INTEGER ::   ierr, ilocalcomm
361      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[1344]362      !!----------------------------------------------------------------------
[11536]363#if defined key_mpp_mpi
[9570]364      ilocalcomm = mpi_comm_oce
[9019]365      IF( PRESENT(kcom) )   ilocalcomm = kcom
[3]366
[10425]367      isz = SIZE(y_in)
368     
[10437]369      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]370
[10425]371      idvar = -1
372      DO ji = 1, nbdelay
373         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
374      END DO
375      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
376
377      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
378         !                                       --------------------------
379         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
380            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
381            DEALLOCATE(todelay(idvar)%z1d)
382            ndelayid(idvar) = -1                                      ! do as if we had no restart
383         ELSE
384            ALLOCATE(todelay(idvar)%y1d(isz))
385            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
386         END IF
387      ENDIF
388     
389      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
390         !                                       --------------------------
391         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
392         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
393         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
394      ENDIF
395
396      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
397
398      ! send back pout from todelay(idvar)%z1d defined at previous call
399      pout(:) = todelay(idvar)%z1d(:)
400
401      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
[11536]402# if defined key_mpi2
[10526]403      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[12512]404      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
405      ndelayid(idvar) = 1
[10526]406      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]407# else
408      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
409# endif
[10526]410#else
[11536]411      pout(:) = REAL(y_in(:), wp)
[10526]412#endif
[10425]413
414   END SUBROUTINE mpp_delay_sum
415
[9019]416   
[10425]417   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
[9019]418      !!----------------------------------------------------------------------
[10425]419      !!                   ***  routine mpp_delay_max  ***
[9019]420      !!
[10425]421      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
[9019]422      !!
423      !!----------------------------------------------------------------------
[10425]424      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
425      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
426      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
427      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
428      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
429      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
[9019]430      !!
[10425]431      INTEGER ::   ji, isz
432      INTEGER ::   idvar
433      INTEGER ::   ierr, ilocalcomm
[9019]434      !!----------------------------------------------------------------------
[11536]435#if defined key_mpp_mpi
[9570]436      ilocalcomm = mpi_comm_oce
[9019]437      IF( PRESENT(kcom) )   ilocalcomm = kcom
[6140]438
[10425]439      isz = SIZE(p_in)
[9019]440
[10437]441      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]442
[10425]443      idvar = -1
444      DO ji = 1, nbdelay
445         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
446      END DO
447      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
[3]448
[10425]449      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
450         !                                       --------------------------
451         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
452            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
453            DEALLOCATE(todelay(idvar)%z1d)
454            ndelayid(idvar) = -1                                      ! do as if we had no restart
455         END IF
456      ENDIF
[13]457
[10425]458      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
459         !                                       --------------------------
460         ALLOCATE(todelay(idvar)%z1d(isz))
461         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
462      ENDIF
[3]463
[10425]464      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[3]465
[10425]466      ! send back pout from todelay(idvar)%z1d defined at previous call
467      pout(:) = todelay(idvar)%z1d(:)
[13]468
[10425]469      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
[11536]470# if defined key_mpi2
[10526]471      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[12512]472      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )
473      ndelayid(idvar) = 1
[10526]474      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]475# else
476      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
477# endif
[10526]478#else
[11536]479      pout(:) = p_in(:)
[10526]480#endif
[10425]481
482   END SUBROUTINE mpp_delay_max
483
484   
485   SUBROUTINE mpp_delay_rcv( kid )
486      !!----------------------------------------------------------------------
487      !!                   ***  routine mpp_delay_rcv  ***
[1344]488      !!
[10425]489      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
[1344]490      !!
[10425]491      !!----------------------------------------------------------------------
492      INTEGER,INTENT(in   )      ::  kid 
493      INTEGER ::   ierr
494      !!----------------------------------------------------------------------
[11536]495#if defined key_mpp_mpi
[10425]496      IF( ndelayid(kid) /= -2 ) THEN 
[10526]497#if ! defined key_mpi2
[10425]498         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
499         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
500         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10526]501#endif
[10425]502         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
503         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
504      ENDIF
[11536]505#endif
[10425]506   END SUBROUTINE mpp_delay_rcv
[3]507
[12377]508   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
509      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
510      INTEGER                          , INTENT(INOUT) :: kleng
511      !!----------------------------------------------------------------------
512      !!                  ***  routine mpp_bcast_nml  ***
513      !!
514      !! ** Purpose :   broadcast namelist character buffer
515      !!
516      !!----------------------------------------------------------------------
517      !!
518      INTEGER ::   iflag
519      !!----------------------------------------------------------------------
520      !
521#if defined key_mpp_mpi
522      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
523      call MPI_BARRIER(mpi_comm_oce, iflag)
524!$AGRIF_DO_NOT_TREAT
525      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
526!$AGRIF_END_DO_NOT_TREAT
527      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
528      call MPI_BARRIER(mpi_comm_oce, iflag)
529#endif
530      !
531   END SUBROUTINE mpp_bcast_nml
532
[10425]533   
534   !!----------------------------------------------------------------------
535   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
536   !!   
537   !!----------------------------------------------------------------------
538   !!
539#  define OPERATION_MAX
540#  define INTEGER_TYPE
541#  define DIM_0d
542#     define ROUTINE_ALLREDUCE           mppmax_int
543#     include "mpp_allreduce_generic.h90"
544#     undef ROUTINE_ALLREDUCE
545#  undef DIM_0d
546#  define DIM_1d
547#     define ROUTINE_ALLREDUCE           mppmax_a_int
548#     include "mpp_allreduce_generic.h90"
549#     undef ROUTINE_ALLREDUCE
550#  undef DIM_1d
551#  undef INTEGER_TYPE
552!
553#  define REAL_TYPE
554#  define DIM_0d
555#     define ROUTINE_ALLREDUCE           mppmax_real
556#     include "mpp_allreduce_generic.h90"
557#     undef ROUTINE_ALLREDUCE
558#  undef DIM_0d
559#  define DIM_1d
560#     define ROUTINE_ALLREDUCE           mppmax_a_real
561#     include "mpp_allreduce_generic.h90"
562#     undef ROUTINE_ALLREDUCE
563#  undef DIM_1d
564#  undef REAL_TYPE
565#  undef OPERATION_MAX
566   !!----------------------------------------------------------------------
567   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
568   !!   
569   !!----------------------------------------------------------------------
570   !!
571#  define OPERATION_MIN
572#  define INTEGER_TYPE
573#  define DIM_0d
574#     define ROUTINE_ALLREDUCE           mppmin_int
575#     include "mpp_allreduce_generic.h90"
576#     undef ROUTINE_ALLREDUCE
577#  undef DIM_0d
578#  define DIM_1d
579#     define ROUTINE_ALLREDUCE           mppmin_a_int
580#     include "mpp_allreduce_generic.h90"
581#     undef ROUTINE_ALLREDUCE
582#  undef DIM_1d
583#  undef INTEGER_TYPE
584!
585#  define REAL_TYPE
586#  define DIM_0d
587#     define ROUTINE_ALLREDUCE           mppmin_real
588#     include "mpp_allreduce_generic.h90"
589#     undef ROUTINE_ALLREDUCE
590#  undef DIM_0d
591#  define DIM_1d
592#     define ROUTINE_ALLREDUCE           mppmin_a_real
593#     include "mpp_allreduce_generic.h90"
594#     undef ROUTINE_ALLREDUCE
595#  undef DIM_1d
596#  undef REAL_TYPE
597#  undef OPERATION_MIN
[869]598
[10425]599   !!----------------------------------------------------------------------
600   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
601   !!   
602   !!   Global sum of 1D array or a variable (integer, real or complex)
603   !!----------------------------------------------------------------------
604   !!
605#  define OPERATION_SUM
606#  define INTEGER_TYPE
607#  define DIM_0d
608#     define ROUTINE_ALLREDUCE           mppsum_int
609#     include "mpp_allreduce_generic.h90"
610#     undef ROUTINE_ALLREDUCE
611#  undef DIM_0d
612#  define DIM_1d
613#     define ROUTINE_ALLREDUCE           mppsum_a_int
614#     include "mpp_allreduce_generic.h90"
615#     undef ROUTINE_ALLREDUCE
616#  undef DIM_1d
617#  undef INTEGER_TYPE
618!
619#  define REAL_TYPE
620#  define DIM_0d
621#     define ROUTINE_ALLREDUCE           mppsum_real
622#     include "mpp_allreduce_generic.h90"
623#     undef ROUTINE_ALLREDUCE
624#  undef DIM_0d
625#  define DIM_1d
626#     define ROUTINE_ALLREDUCE           mppsum_a_real
627#     include "mpp_allreduce_generic.h90"
628#     undef ROUTINE_ALLREDUCE
629#  undef DIM_1d
630#  undef REAL_TYPE
631#  undef OPERATION_SUM
632
633#  define OPERATION_SUM_DD
634#  define COMPLEX_TYPE
635#  define DIM_0d
636#     define ROUTINE_ALLREDUCE           mppsum_realdd
637#     include "mpp_allreduce_generic.h90"
638#     undef ROUTINE_ALLREDUCE
639#  undef DIM_0d
640#  define DIM_1d
641#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
642#     include "mpp_allreduce_generic.h90"
643#     undef ROUTINE_ALLREDUCE
644#  undef DIM_1d
645#  undef COMPLEX_TYPE
646#  undef OPERATION_SUM_DD
647
648   !!----------------------------------------------------------------------
649   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
650   !!   
651   !!----------------------------------------------------------------------
652   !!
653#  define OPERATION_MINLOC
654#  define DIM_2d
655#     define ROUTINE_LOC           mpp_minloc2d
656#     include "mpp_loc_generic.h90"
657#     undef ROUTINE_LOC
658#  undef DIM_2d
659#  define DIM_3d
660#     define ROUTINE_LOC           mpp_minloc3d
661#     include "mpp_loc_generic.h90"
662#     undef ROUTINE_LOC
663#  undef DIM_3d
664#  undef OPERATION_MINLOC
665
666#  define OPERATION_MAXLOC
667#  define DIM_2d
668#     define ROUTINE_LOC           mpp_maxloc2d
669#     include "mpp_loc_generic.h90"
670#     undef ROUTINE_LOC
671#  undef DIM_2d
672#  define DIM_3d
673#     define ROUTINE_LOC           mpp_maxloc3d
674#     include "mpp_loc_generic.h90"
675#     undef ROUTINE_LOC
676#  undef DIM_3d
677#  undef OPERATION_MAXLOC
678
[1344]679   SUBROUTINE mppsync()
680      !!----------------------------------------------------------------------
681      !!                  ***  routine mppsync  ***
[3764]682      !!
[1344]683      !! ** Purpose :   Massively parallel processors, synchroneous
684      !!
685      !!-----------------------------------------------------------------------
686      INTEGER :: ierror
687      !!-----------------------------------------------------------------------
688      !
[11536]689#if defined key_mpp_mpi
[9570]690      CALL mpi_barrier( mpi_comm_oce, ierror )
[11536]691#endif
[1344]692      !
693   END SUBROUTINE mppsync
[3]694
695
[11536]696   SUBROUTINE mppstop( ld_abort ) 
[1344]697      !!----------------------------------------------------------------------
698      !!                  ***  routine mppstop  ***
[3764]699      !!
[3294]700      !! ** purpose :   Stop massively parallel processors method
[1344]701      !!
702      !!----------------------------------------------------------------------
[11536]703      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
704      LOGICAL ::   ll_abort
[1344]705      INTEGER ::   info
706      !!----------------------------------------------------------------------
[11536]707      ll_abort = .FALSE.
708      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
[1344]709      !
[11536]710#if defined key_mpp_mpi
711      IF(ll_abort) THEN
[10425]712         CALL mpi_abort( MPI_COMM_WORLD )
713      ELSE
714         CALL mppsync
715         CALL mpi_finalize( info )
716      ENDIF
[11536]717#endif
718      IF( ll_abort ) STOP 123
[1344]719      !
720   END SUBROUTINE mppstop
[3]721
722
[1344]723   SUBROUTINE mpp_comm_free( kcom )
724      !!----------------------------------------------------------------------
725      INTEGER, INTENT(in) ::   kcom
726      !!
727      INTEGER :: ierr
728      !!----------------------------------------------------------------------
729      !
[11536]730#if defined key_mpp_mpi
[1344]731      CALL MPI_COMM_FREE(kcom, ierr)
[11536]732#endif
[1344]733      !
734   END SUBROUTINE mpp_comm_free
[3]735
[869]736
[2715]737   SUBROUTINE mpp_ini_znl( kumout )
[1345]738      !!----------------------------------------------------------------------
739      !!               ***  routine mpp_ini_znl  ***
740      !!
741      !! ** Purpose :   Initialize special communicator for computing zonal sum
742      !!
743      !! ** Method  : - Look for processors in the same row
744      !!              - Put their number in nrank_znl
745      !!              - Create group for the znl processors
746      !!              - Create a communicator for znl processors
747      !!              - Determine if processor should write znl files
748      !!
749      !! ** output
750      !!      ndim_rank_znl = number of processors on the same row
751      !!      ngrp_znl = group ID for the znl processors
752      !!      ncomm_znl = communicator for the ice procs.
753      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
754      !!
755      !!----------------------------------------------------------------------
[2715]756      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]757      !
[2715]758      INTEGER :: jproc      ! dummy loop integer
759      INTEGER :: ierr, ii   ! local integer
760      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
761      !!----------------------------------------------------------------------
[11536]762#if defined key_mpp_mpi
[1345]763      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
764      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]765      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]766      !
[2715]767      ALLOCATE( kwork(jpnij), STAT=ierr )
[11536]768      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
[2715]769
770      IF( jpnj == 1 ) THEN
[1345]771         ngrp_znl  = ngrp_world
[9570]772         ncomm_znl = mpi_comm_oce
[1345]773      ELSE
774         !
[9570]775         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]776         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
777         !-$$        CALL flush(numout)
778         !
779         ! Count number of processors on the same row
780         ndim_rank_znl = 0
781         DO jproc=1,jpnij
782            IF ( kwork(jproc) == njmpp ) THEN
783               ndim_rank_znl = ndim_rank_znl + 1
784            ENDIF
785         END DO
786         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
787         !-$$        CALL flush(numout)
788         ! Allocate the right size to nrank_znl
[1441]789         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]790         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]791         ii = 0
[1345]792         nrank_znl (:) = 0
793         DO jproc=1,jpnij
794            IF ( kwork(jproc) == njmpp) THEN
795               ii = ii + 1
[3764]796               nrank_znl(ii) = jproc -1
[1345]797            ENDIF
798         END DO
799         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
800         !-$$        CALL flush(numout)
801
802         ! Create the opa group
[9570]803         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]804         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
805         !-$$        CALL flush(numout)
806
807         ! Create the znl group from the opa group
808         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
809         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
810         !-$$        CALL flush(numout)
811
812         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]813         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]814         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
815         !-$$        CALL flush(numout)
816         !
817      END IF
818
819      ! Determines if processor if the first (starting from i=1) on the row
[3764]820      IF ( jpni == 1 ) THEN
[1345]821         l_znl_root = .TRUE.
822      ELSE
823         l_znl_root = .FALSE.
824         kwork (1) = nimpp
[10425]825         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]826         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
827      END IF
828
[2715]829      DEALLOCATE(kwork)
[11536]830#endif
[2715]831
[1345]832   END SUBROUTINE mpp_ini_znl
833
834
[1344]835   SUBROUTINE mpp_ini_north
836      !!----------------------------------------------------------------------
837      !!               ***  routine mpp_ini_north  ***
838      !!
[3764]839      !! ** Purpose :   Initialize special communicator for north folding
[1344]840      !!      condition together with global variables needed in the mpp folding
841      !!
842      !! ** Method  : - Look for northern processors
843      !!              - Put their number in nrank_north
844      !!              - Create groups for the world processors and the north processors
845      !!              - Create a communicator for northern processors
846      !!
847      !! ** output
848      !!      njmppmax = njmpp for northern procs
849      !!      ndim_rank_north = number of processors in the northern line
850      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
851      !!      ngrp_world = group ID for the world processors
852      !!      ngrp_north = group ID for the northern processors
853      !!      ncomm_north = communicator for the northern procs.
854      !!      north_root = number (in the world) of proc 0 in the northern comm.
855      !!
856      !!----------------------------------------------------------------------
857      INTEGER ::   ierr
858      INTEGER ::   jjproc
859      INTEGER ::   ii, ji
860      !!----------------------------------------------------------------------
861      !
[11536]862#if defined key_mpp_mpi
[1344]863      njmppmax = MAXVAL( njmppt )
864      !
865      ! Look for how many procs on the northern boundary
866      ndim_rank_north = 0
867      DO jjproc = 1, jpnij
868         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
869      END DO
870      !
871      ! Allocate the right size to nrank_north
[1441]872      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]873      ALLOCATE( nrank_north(ndim_rank_north) )
[869]874
[1344]875      ! Fill the nrank_north array with proc. number of northern procs.
876      ! Note : the rank start at 0 in MPI
877      ii = 0
878      DO ji = 1, jpnij
879         IF ( njmppt(ji) == njmppmax   ) THEN
880            ii=ii+1
881            nrank_north(ii)=ji-1
882         END IF
883      END DO
884      !
885      ! create the world group
[9570]886      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]887      !
888      ! Create the North group from the world group
889      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
890      !
891      ! Create the North communicator , ie the pool of procs in the north group
[9570]892      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]893      !
[11536]894#endif
[1344]895   END SUBROUTINE mpp_ini_north
[869]896
897
[9019]898   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]899      !!---------------------------------------------------------------------
900      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
901      !!
902      !!   Modification of original codes written by David H. Bailey
903      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
904      !!---------------------------------------------------------------------
[9019]905      INTEGER                     , INTENT(in)    ::   ilen, itype
906      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
907      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]908      !
909      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]910      INTEGER  :: ji, ztmp           ! local scalar
911      !!---------------------------------------------------------------------
912      !
[1976]913      ztmp = itype   ! avoid compilation warning
[9019]914      !
[1976]915      DO ji=1,ilen
916      ! Compute ydda + yddb using Knuth's trick.
917         zt1  = real(ydda(ji)) + real(yddb(ji))
918         zerr = zt1 - real(ydda(ji))
919         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
920                + aimag(ydda(ji)) + aimag(yddb(ji))
921
922         ! The result is zt1 + zt2, after normalization.
923         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
924      END DO
[9019]925      !
[1976]926   END SUBROUTINE DDPDD_MPI
927
[6140]928
[10437]929   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
[10425]930      !!----------------------------------------------------------------------
931      !!                  ***  routine mpp_report  ***
932      !!
933      !! ** Purpose :   report use of mpp routines per time-setp
934      !!
935      !!----------------------------------------------------------------------
936      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
937      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
[10437]938      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
[10425]939      !!
[10982]940      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
[10437]941      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
[10982]942      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
[10425]943      !!----------------------------------------------------------------------
[11536]944#if defined key_mpp_mpi
[10425]945      !
946      ll_lbc = .FALSE.
947      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
948      ll_glb = .FALSE.
949      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
[10437]950      ll_dlg = .FALSE.
951      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
[10425]952      !
953      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
954      ncom_freq = ncom_fsbc
955      !
956      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
957         IF( ll_lbc ) THEN
958            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
959            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
960            n_sequence_lbc = n_sequence_lbc + 1
961            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
962            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
963            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
964            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
965         ENDIF
966         IF( ll_glb ) THEN
967            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
968            n_sequence_glb = n_sequence_glb + 1
969            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
970            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
971         ENDIF
[10437]972         IF( ll_dlg ) THEN
973            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
974            n_sequence_dlg = n_sequence_dlg + 1
975            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
976            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
977         ENDIF
[10425]978      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
979         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
980         WRITE(numcom,*) ' '
981         WRITE(numcom,*) ' ------------------------------------------------------------'
982         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
983         WRITE(numcom,*) ' ------------------------------------------------------------'
984         WRITE(numcom,*) ' '
985         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
986         jj = 0; jk = 0; jf = 0; jh = 0
987         DO ji = 1, n_sequence_lbc
988            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
989            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
990            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
991            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
992         END DO
993         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
994         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
995         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
996         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
997         WRITE(numcom,*) ' '
998         WRITE(numcom,*) ' lbc_lnk called'
[10982]999         DO ji = 1, n_sequence_lbc - 1
1000            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1001               ccountname = crname_lbc(ji)
1002               crname_lbc(ji) = 'already counted'
1003               jcount = 1
1004               DO jj = ji + 1, n_sequence_lbc
1005                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1006                     jcount = jcount + 1
1007                     crname_lbc(jj) = 'already counted'
1008                  END IF
1009               END DO
1010               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
[10425]1011            END IF
1012         END DO
[10982]1013         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1014            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1015         END IF
[10425]1016         WRITE(numcom,*) ' '
1017         IF ( n_sequence_glb > 0 ) THEN
1018            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1019            jj = 1
1020            DO ji = 2, n_sequence_glb
1021               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1022                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1023                  jj = 0
1024               END IF
1025               jj = jj + 1 
1026            END DO
1027            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1028            DEALLOCATE(crname_glb)
1029         ELSE
1030            WRITE(numcom,*) ' No MPI global communication '
1031         ENDIF
1032         WRITE(numcom,*) ' '
[10437]1033         IF ( n_sequence_dlg > 0 ) THEN
1034            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1035            jj = 1
1036            DO ji = 2, n_sequence_dlg
1037               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1038                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1039                  jj = 0
1040               END IF
1041               jj = jj + 1 
1042            END DO
1043            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1044            DEALLOCATE(crname_dlg)
1045         ELSE
1046            WRITE(numcom,*) ' No MPI delayed global communication '
1047         ENDIF
1048         WRITE(numcom,*) ' '
[10425]1049         WRITE(numcom,*) ' -----------------------------------------------'
1050         WRITE(numcom,*) ' '
1051         DEALLOCATE(ncomm_sequence)
1052         DEALLOCATE(crname_lbc)
1053      ENDIF
[11536]1054#endif
[10425]1055   END SUBROUTINE mpp_report
1056
[6140]1057   
[10425]1058   SUBROUTINE tic_tac (ld_tic, ld_global)
1059
1060    LOGICAL,           INTENT(IN) :: ld_tic
1061    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1062    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1063    REAL(wp),               SAVE :: tic_ct = 0._wp
1064    INTEGER :: ii
[11536]1065#if defined key_mpp_mpi
[10425]1066
1067    IF( ncom_stp <= nit000 ) RETURN
1068    IF( ncom_stp == nitend ) RETURN
1069    ii = 1
1070    IF( PRESENT( ld_global ) ) THEN
1071       IF( ld_global ) ii = 2
1072    END IF
1073   
1074    IF ( ld_tic ) THEN
1075       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1076       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1077    ELSE
1078       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1079       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1080    ENDIF
[11536]1081#endif
[10425]1082   
1083   END SUBROUTINE tic_tac
1084
[11536]1085#if ! defined key_mpp_mpi
1086   SUBROUTINE mpi_wait(request, status, ierror)
1087      INTEGER                            , INTENT(in   ) ::   request
1088      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1089      INTEGER                            , INTENT(  out) ::   ierror
1090   END SUBROUTINE mpi_wait
[1976]1091
[10425]1092   
[11536]1093   FUNCTION MPI_Wtime()
1094      REAL(wp) ::  MPI_Wtime
1095      MPI_Wtime = -1.
1096   END FUNCTION MPI_Wtime
[3]1097#endif
[2715]1098
[13]1099   !!----------------------------------------------------------------------
[12377]1100   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
[2715]1101   !!----------------------------------------------------------------------
1102
1103   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1104      &                 cd6, cd7, cd8, cd9, cd10 )
1105      !!----------------------------------------------------------------------
1106      !!                  ***  ROUTINE  stop_opa  ***
1107      !!
[3764]1108      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1109      !!                increment the error number (nstop) by one.
1110      !!----------------------------------------------------------------------
[11536]1111      CHARACTER(len=*), INTENT(in   )           ::   cd1
1112      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1113      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
[13159]1114      !
1115      CHARACTER(LEN=8) ::   clfmt            ! writing format
1116      INTEGER          ::   inum
[2715]1117      !!----------------------------------------------------------------------
1118      !
[3764]1119      nstop = nstop + 1
[11536]1120      !
[13159]1121      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1122         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1123         WRITE(inum,*)
1124         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1125         CLOSE(inum)
1126      ENDIF
1127      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1128         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1129      ENDIF
[11536]1130      !
1131                            WRITE(numout,*)
1132                            WRITE(numout,*) ' ===>>> : E R R O R'
1133                            WRITE(numout,*)
1134                            WRITE(numout,*) '         ==========='
1135                            WRITE(numout,*)
1136                            WRITE(numout,*) TRIM(cd1)
[10425]1137      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1138      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1139      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1140      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1141      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1142      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1143      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1144      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1145      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[11536]1146                            WRITE(numout,*)
1147      !
[2715]1148                               CALL FLUSH(numout    )
1149      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1150      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1151      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1152      !
1153      IF( cd1 == 'STOP' ) THEN
[11536]1154         WRITE(numout,*) 
[10425]1155         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[11536]1156         WRITE(numout,*) 
[13159]1157         CALL FLUSH(numout)
1158         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
[11536]1159         CALL mppstop( ld_abort = .true. )
[2715]1160      ENDIF
1161      !
1162   END SUBROUTINE ctl_stop
1163
1164
1165   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1166      &                 cd6, cd7, cd8, cd9, cd10 )
1167      !!----------------------------------------------------------------------
1168      !!                  ***  ROUTINE  stop_warn  ***
1169      !!
[3764]1170      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1171      !!                increment the warning number (nwarn) by one.
1172      !!----------------------------------------------------------------------
1173      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1174      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1175      !!----------------------------------------------------------------------
[3764]1176      !
1177      nwarn = nwarn + 1
[11536]1178      !
[2715]1179      IF(lwp) THEN
[11536]1180                               WRITE(numout,*)
1181                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1182                               WRITE(numout,*)
1183                               WRITE(numout,*) '         ==============='
1184                               WRITE(numout,*)
1185         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1186         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1187         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1188         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1189         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1190         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1191         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1192         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1193         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1194         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1195                               WRITE(numout,*)
[2715]1196      ENDIF
1197      CALL FLUSH(numout)
1198      !
1199   END SUBROUTINE ctl_warn
1200
1201
1202   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1203      !!----------------------------------------------------------------------
1204      !!                  ***  ROUTINE ctl_opn  ***
1205      !!
1206      !! ** Purpose :   Open file and check if required file is available.
1207      !!
1208      !! ** Method  :   Fortan open
1209      !!----------------------------------------------------------------------
1210      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1211      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1212      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1213      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1214      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1215      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1216      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1217      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1218      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1219      !
[2715]1220      CHARACTER(len=80) ::   clfile
[13159]1221      CHARACTER(LEN=10) ::   clfmt            ! writing format
[2715]1222      INTEGER           ::   iost
[13159]1223      INTEGER           ::   idg              ! number of digits
[2715]1224      !!----------------------------------------------------------------------
[5836]1225      !
[2715]1226      ! adapt filename
1227      ! ----------------
1228      clfile = TRIM(cdfile)
1229      IF( PRESENT( karea ) ) THEN
[13159]1230         IF( karea > 1 ) THEN
1231            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1232            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1233            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1234            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1235         ENDIF
[2715]1236      ENDIF
1237#if defined key_agrif
1238      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1239      knum=Agrif_Get_Unit()
1240#else
1241      knum=get_unit()
1242#endif
[10425]1243      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1244      !
[11536]1245      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
[10425]1246         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1247      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1248         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1249      ELSE
[10425]1250         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1251      ENDIF
[10425]1252      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1253         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1254      IF( iost == 0 ) THEN
[12377]1255         IF(ldwp .AND. kout > 0) THEN
[10425]1256            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1257            WRITE(kout,*) '     unit   = ', knum
1258            WRITE(kout,*) '     status = ', cdstat
1259            WRITE(kout,*) '     form   = ', cdform
1260            WRITE(kout,*) '     access = ', cdacce
1261            WRITE(kout,*)
1262         ENDIF
1263      ENDIF
1264100   CONTINUE
1265      IF( iost /= 0 ) THEN
[11536]1266         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1267         WRITE(ctmp2,*) ' =======   ===  '
1268         WRITE(ctmp3,*) '           unit   = ', knum
1269         WRITE(ctmp4,*) '           status = ', cdstat
1270         WRITE(ctmp5,*) '           form   = ', cdform
1271         WRITE(ctmp6,*) '           access = ', cdacce
1272         WRITE(ctmp7,*) '           iostat = ', iost
1273         WRITE(ctmp8,*) '           we stop. verify the file '
1274         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
[2715]1275      ENDIF
[5836]1276      !
[2715]1277   END SUBROUTINE ctl_opn
1278
[5836]1279
[11536]1280   SUBROUTINE ctl_nam ( kios, cdnam )
[4147]1281      !!----------------------------------------------------------------------
1282      !!                  ***  ROUTINE ctl_nam  ***
1283      !!
1284      !! ** Purpose :   Informations when error while reading a namelist
1285      !!
1286      !! ** Method  :   Fortan open
1287      !!----------------------------------------------------------------------
[11536]1288      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1289      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1290      !
1291      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
[4147]1292      !!----------------------------------------------------------------------
[5836]1293      !
[7646]1294      WRITE (clios, '(I5.0)')   kios
[4147]1295      IF( kios < 0 ) THEN         
[5836]1296         CALL ctl_warn( 'end of record or file while reading namelist '   &
1297            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1298      ENDIF
[5836]1299      !
[4147]1300      IF( kios > 0 ) THEN
[5836]1301         CALL ctl_stop( 'misspelled variable in namelist '   &
1302            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1303      ENDIF
1304      kios = 0
[5836]1305      !
[4147]1306   END SUBROUTINE ctl_nam
1307
[5836]1308
[2715]1309   INTEGER FUNCTION get_unit()
1310      !!----------------------------------------------------------------------
1311      !!                  ***  FUNCTION  get_unit  ***
1312      !!
1313      !! ** Purpose :   return the index of an unused logical unit
1314      !!----------------------------------------------------------------------
[3764]1315      LOGICAL :: llopn
[2715]1316      !!----------------------------------------------------------------------
1317      !
1318      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1319      llopn = .TRUE.
1320      DO WHILE( (get_unit < 998) .AND. llopn )
1321         get_unit = get_unit + 1
1322         INQUIRE( unit = get_unit, opened = llopn )
1323      END DO
1324      IF( (get_unit == 999) .AND. llopn ) THEN
[11536]1325         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
[2715]1326      ENDIF
1327      !
1328   END FUNCTION get_unit
1329
[12377]1330   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1331      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1332      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1333      CHARACTER(LEN=256)                           :: chline
1334      CHARACTER(LEN=1)                             :: csp
1335      INTEGER, INTENT(IN)                          :: kout
1336      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1337      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1338      !
1339      !csp = NEW_LINE('A')
1340      ! a new line character is the best seperator but some systems (e.g.Cray)
1341      ! seem to terminate namelist reads from internal files early if they
1342      ! encounter new-lines. Use a single space for safety.
1343      csp = ' '
1344      !
1345      ! Check if the namelist buffer has already been allocated. Return if it has.
1346      !
1347      IF ( ALLOCATED( cdnambuff ) ) RETURN
1348      IF( ldwp ) THEN
1349         !
1350         ! Open namelist file
1351         !
1352         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1353         !
1354         ! First pass: count characters excluding comments and trimable white space
1355         !
1356         itot=0
1357     10  READ(iun,'(A256)',END=20,ERR=20) chline
1358         iltc = LEN_TRIM(chline)
1359         IF ( iltc.GT.0 ) THEN
1360          inl = INDEX(chline, '!') 
1361          IF( inl.eq.0 ) THEN
1362           itot = itot + iltc + 1                                ! +1 for the newline character
1363          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1364           itot = itot + inl                                  !  includes +1 for the newline character
1365          ENDIF
1366         ENDIF
1367         GOTO 10
1368     20  CONTINUE
1369         !
1370         ! Allocate text cdnambuff for condensed namelist
1371         !
1372!$AGRIF_DO_NOT_TREAT
1373         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1374!$AGRIF_END_DO_NOT_TREAT
1375         itotsav = itot
1376         !
1377         ! Second pass: read and transfer pruned characters into cdnambuff
1378         !
1379         REWIND(iun)
1380         itot=1
1381     30  READ(iun,'(A256)',END=40,ERR=40) chline
1382         iltc = LEN_TRIM(chline)
1383         IF ( iltc.GT.0 ) THEN
1384          inl = INDEX(chline, '!')
1385          IF( inl.eq.0 ) THEN
1386           inl = iltc
1387          ELSE
1388           inl = inl - 1
1389          ENDIF
1390          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1391             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1392             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1393             itot = itot + inl + 1
1394          ENDIF
1395         ENDIF
1396         GOTO 30
1397     40  CONTINUE
1398         itot = itot - 1
1399         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1400         !
1401         ! Close namelist file
1402         !
1403         CLOSE(iun)
1404         !write(*,'(32A)') cdnambuff
1405      ENDIF
1406#if defined key_mpp_mpi
1407      CALL mpp_bcast_nml( cdnambuff, itot )
1408#endif
1409  END SUBROUTINE load_nml
1410
1411
[2715]1412   !!----------------------------------------------------------------------
[3]1413END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.