New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 15814

Last change on this file since 15814 was 15267, checked in by smasson, 3 years ago

trunk: new nogather nolding, #2724

  • Property svn:keywords set to Id
File size: 74.2 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
[14072]22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
[12377]34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
[2715]35   !!----------------------------------------------------------------------
[13]36   !!----------------------------------------------------------------------
[11536]37   !!   mpp_start     : get local communicator its size and rank
[2715]38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]40   !!   mpprecv       :
[9019]41   !!   mppsend       :
[2715]42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
[15267]51   !!   mpp_ini_northgather : initialisation of north fold with gathering of the communications
[9019]52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[12377]53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
[13]54   !!----------------------------------------------------------------------
[3764]55   USE dom_oce        ! ocean space and time domain
[2715]56   USE in_out_manager ! I/O manager
[14433]57#if ! defined key_mpi_off
58   USE MPI
59#endif
[3]60
[13]61   IMPLICIT NONE
[415]62   PRIVATE
[9019]63   !
[12377]64   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
[11536]65   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
[15267]66   PUBLIC   mpp_ini_northgather
[1344]67   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10425]68   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]69   PUBLIC   mppscatter, mppgather
[10425]70   PUBLIC   mpp_ini_znl
[13982]71   PUBLIC   mpp_ini_nc
[3764]72   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[13226]73   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
74   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
[11536]75   PUBLIC   mpp_report
[12377]76   PUBLIC   mpp_bcast_nml
[11536]77   PUBLIC   tic_tac
[15023]78#if defined key_mpi_off
79   PUBLIC   MPI_wait
80   PUBLIC   MPI_waitall
81   PUBLIC   MPI_Wtime
[11536]82#endif
[14072]83
[13]84   !! * Interfaces
85   !! define generic interface for these routine as they are called sometimes
[1344]86   !! with scalar arguments instead of array arguments, which causes problems
87   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]88   INTERFACE mpp_min
[13226]89      MODULE PROCEDURE mppmin_a_int, mppmin_int
90      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
91      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
[13]92   END INTERFACE
93   INTERFACE mpp_max
[13226]94      MODULE PROCEDURE mppmax_a_int, mppmax_int
95      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
96      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
[13]97   END INTERFACE
98   INTERFACE mpp_sum
[13226]99      MODULE PROCEDURE mppsum_a_int, mppsum_int
100      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
101      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
102      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
[13]103   END INTERFACE
[1344]104   INTERFACE mpp_minloc
[13226]105      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
106      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
[1344]107   END INTERFACE
108   INTERFACE mpp_maxloc
[13226]109      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
110      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
[1344]111   END INTERFACE
[6490]112
[14433]113   TYPE, PUBLIC ::   PTR_4D_sp   !: array of 4D pointers (used in lbclnk and lbcnfd)
114      REAL(sp), DIMENSION (:,:,:,:), POINTER ::   pt4d
115   END TYPE PTR_4D_sp
116
117   TYPE, PUBLIC ::   PTR_4D_dp   !: array of 4D pointers (used in lbclnk and lbcnfd)
118      REAL(dp), DIMENSION (:,:,:,:), POINTER ::   pt4d
119   END TYPE PTR_4D_dp
120
[51]121   !! ========================= !!
122   !!  MPI  variable definition !!
123   !! ========================= !!
[14229]124#if ! defined key_mpi_off
[1344]125   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[14072]126#else
[11536]127   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
[13438]128   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
[11536]129   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
[15023]130   INTEGER, PUBLIC, PARAMETER ::   MPI_REQUEST_NULL = 1
[11536]131   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
[15023]132   INTEGER, PUBLIC, DIMENSION(MPI_STATUS_SIZE) ::   MPI_STATUS_IGNORE = 1     ! out from mpi_wait
133   INTEGER, PUBLIC, DIMENSION(MPI_STATUS_SIZE) ::   MPI_STATUSES_IGNORE = 1   ! out from mpi_waitall
[11536]134#endif
[3]135
[10425]136   INTEGER, PUBLIC ::   mppsize        ! number of process
137   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]138!$AGRIF_DO_NOT_TREAT
[9570]139   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]140!$AGRIF_END_DO_NOT_TREAT
[3]141
[2480]142   INTEGER :: MPI_SUMDD
[1976]143
[14433]144   ! Neighbourgs informations
145   INTEGER,    PARAMETER, PUBLIC ::   n_hlsmax = 3
146   INTEGER, DIMENSION(         8), PUBLIC ::   mpinei      !: 8-neighbourg MPI indexes (starting at 0, -1 if no neighbourg)
147   INTEGER, DIMENSION(n_hlsmax,8), PUBLIC ::   mpiSnei     !: 8-neighbourg Send MPI indexes (starting at 0, -1 if no neighbourg)
148   INTEGER, DIMENSION(n_hlsmax,8), PUBLIC ::   mpiRnei     !: 8-neighbourg Recv MPI indexes (starting at 0, -1 if no neighbourg)
149   INTEGER,    PARAMETER, PUBLIC ::   jpwe = 1   !: WEst
150   INTEGER,    PARAMETER, PUBLIC ::   jpea = 2   !: EAst
151   INTEGER,    PARAMETER, PUBLIC ::   jpso = 3   !: SOuth
152   INTEGER,    PARAMETER, PUBLIC ::   jpno = 4   !: NOrth
153   INTEGER,    PARAMETER, PUBLIC ::   jpsw = 5   !: South-West
154   INTEGER,    PARAMETER, PUBLIC ::   jpse = 6   !: South-East
155   INTEGER,    PARAMETER, PUBLIC ::   jpnw = 7   !: North-West
156   INTEGER,    PARAMETER, PUBLIC ::   jpne = 8   !: North-East
157
158   LOGICAL, DIMENSION(8), PUBLIC ::   l_SelfPerio  !   should we explicitely take care of I/J periodicity
159   LOGICAL,               PUBLIC ::   l_IdoNFold
160
[1345]161   ! variables used for zonal integration
[14433]162   INTEGER, PUBLIC ::   ncomm_znl         !: communicator made by the processors on the same zonal average
163   LOGICAL, PUBLIC ::   l_znl_root        !: True on the 'left'most processor on the same row
164   INTEGER         ::   ngrp_znl          !: group ID for the znl processors
165   INTEGER         ::   ndim_rank_znl     !: number of processors on the same zonal average
[2715]166   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]167
[13982]168   ! variables used for MPI3 neighbourhood collectives
[14433]169   INTEGER, DIMENSION(n_hlsmax), PUBLIC ::   mpi_nc_com4       ! MPI3 neighbourhood collectives communicator
170   INTEGER, DIMENSION(n_hlsmax), PUBLIC ::   mpi_nc_com8       ! MPI3 neighbourhood collectives communicator (with diagionals)
[13982]171
[3764]172   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]173   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
174   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
175   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
176   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
177   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
178   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
179   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
180   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]181
[10425]182   ! Communications summary report
[13216]183   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
184   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
185   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
[10425]186   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
187   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
188   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
189   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10781]190   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
[10425]191   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
192   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
[10437]193   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
[10425]194   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
195   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
196   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
197   !: name (used as id) of allreduce-delayed operations
[10521]198   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
199   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
[10425]200   !: component name where the allreduce-delayed operation is performed
201   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
202   TYPE, PUBLIC ::   DELAYARR
203      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
[13226]204      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
[10425]205   END TYPE DELAYARR
[10817]206   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
207   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
[10425]208
209   ! timing summary report
[13226]210   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
211   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
[14072]212
[9019]213   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]214
[9019]215   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
[14433]216   INTEGER, PUBLIC ::   nn_comm                     !: namelist control of comms
[14072]217
[14433]218   INTEGER, PUBLIC, PARAMETER ::   jpfillnothing = 1
219   INTEGER, PUBLIC, PARAMETER ::   jpfillcst     = 2
220   INTEGER, PUBLIC, PARAMETER ::   jpfillcopy    = 3
221   INTEGER, PUBLIC, PARAMETER ::   jpfillperio   = 4
222   INTEGER, PUBLIC, PARAMETER ::   jpfillmpi     = 5
223
[12377]224   !! * Substitutions
225#  include "do_loop_substitute.h90"
[51]226   !!----------------------------------------------------------------------
[9598]227   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]228   !! $Id$
[10068]229   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]230   !!----------------------------------------------------------------------
[3]231CONTAINS
232
[11536]233   SUBROUTINE mpp_start( localComm )
[2715]234      !!----------------------------------------------------------------------
[11536]235      !!                  ***  routine mpp_start  ***
[3764]236      !!
[11536]237      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
[51]238      !!----------------------------------------------------------------------
[6140]239      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]240      !
[11536]241      INTEGER ::   ierr
242      LOGICAL ::   llmpi_init
[51]243      !!----------------------------------------------------------------------
[14229]244#if ! defined key_mpi_off
[1344]245      !
[11536]246      CALL mpi_initialized ( llmpi_init, ierr )
247      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
[2715]248
[11536]249      IF( .NOT. llmpi_init ) THEN
250         IF( PRESENT(localComm) ) THEN
251            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
252            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
253            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
254         ENDIF
255         CALL mpi_init( ierr )
256         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
[2480]257      ENDIF
[14072]258
[3764]259      IF( PRESENT(localComm) ) THEN
[2480]260         IF( Agrif_Root() ) THEN
[9570]261            mpi_comm_oce = localComm
[2480]262         ENDIF
263      ELSE
[11536]264         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
265         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
[3764]266      ENDIF
[2480]267
[11536]268# if defined key_agrif
[9019]269      IF( Agrif_Root() ) THEN
[9570]270         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]271      ELSE
[9570]272         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]273      ENDIF
[11536]274# endif
[5656]275
[9570]276      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
277      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[3764]278      !
[1976]279      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
280      !
[11536]281#else
282      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
283      mppsize = 1
284      mpprank = 0
285#endif
286   END SUBROUTINE mpp_start
[3]287
[6140]288
[1344]289   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]290      !!----------------------------------------------------------------------
291      !!                  ***  routine mppsend  ***
[3764]292      !!
[51]293      !! ** Purpose :   Send messag passing array
294      !!
295      !!----------------------------------------------------------------------
[1344]296      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
297      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
298      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
299      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
[14433]300      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
[1344]301      !!
302      INTEGER ::   iflag
[13226]303      INTEGER :: mpi_working_type
[51]304      !!----------------------------------------------------------------------
[1344]305      !
[14229]306#if ! defined key_mpi_off
[13226]307      IF (wp == dp) THEN
308         mpi_working_type = mpi_double_precision
309      ELSE
310         mpi_working_type = mpi_real
311      END IF
312      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
[11536]313#endif
[1344]314      !
[51]315   END SUBROUTINE mppsend
[3]316
317
[13226]318   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
319      !!----------------------------------------------------------------------
320      !!                  ***  routine mppsend  ***
321      !!
322      !! ** Purpose :   Send messag passing array
323      !!
324      !!----------------------------------------------------------------------
325      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
326      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
327      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
328      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
[14433]329      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
[13226]330      !!
331      INTEGER ::   iflag
332      !!----------------------------------------------------------------------
333      !
[14229]334#if ! defined key_mpi_off
[13226]335      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
336#endif
337      !
338   END SUBROUTINE mppsend_dp
339
340
341   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
342      !!----------------------------------------------------------------------
343      !!                  ***  routine mppsend  ***
344      !!
345      !! ** Purpose :   Send messag passing array
346      !!
347      !!----------------------------------------------------------------------
348      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
349      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
350      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
351      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
[14433]352      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
[13226]353      !!
354      INTEGER ::   iflag
355      !!----------------------------------------------------------------------
356      !
[14229]357#if ! defined key_mpi_off
[13226]358      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
359#endif
360      !
361   END SUBROUTINE mppsend_sp
362
363
[3294]364   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]365      !!----------------------------------------------------------------------
366      !!                  ***  routine mpprecv  ***
367      !!
368      !! ** Purpose :   Receive messag passing array
369      !!
370      !!----------------------------------------------------------------------
[1344]371      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
372      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
373      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]374      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]375      !!
[51]376      INTEGER :: istatus(mpi_status_size)
377      INTEGER :: iflag
[3294]378      INTEGER :: use_source
[13226]379      INTEGER :: mpi_working_type
[1344]380      !!----------------------------------------------------------------------
381      !
[14229]382#if ! defined key_mpi_off
[3764]383      ! If a specific process number has been passed to the receive call,
[3294]384      ! use that one. Default is to use mpi_any_source
[6140]385      use_source = mpi_any_source
386      IF( PRESENT(ksource) )   use_source = ksource
387      !
[13226]388      IF (wp == dp) THEN
389         mpi_working_type = mpi_double_precision
390      ELSE
391         mpi_working_type = mpi_real
392      END IF
393      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[11536]394#endif
[1344]395      !
[51]396   END SUBROUTINE mpprecv
[3]397
[13226]398   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
399      !!----------------------------------------------------------------------
400      !!                  ***  routine mpprecv  ***
401      !!
402      !! ** Purpose :   Receive messag passing array
403      !!
404      !!----------------------------------------------------------------------
405      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
406      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
407      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
408      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
409      !!
410      INTEGER :: istatus(mpi_status_size)
411      INTEGER :: iflag
412      INTEGER :: use_source
413      !!----------------------------------------------------------------------
414      !
[14229]415#if ! defined key_mpi_off
[13226]416      ! If a specific process number has been passed to the receive call,
417      ! use that one. Default is to use mpi_any_source
418      use_source = mpi_any_source
419      IF( PRESENT(ksource) )   use_source = ksource
420      !
421      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
422#endif
423      !
424   END SUBROUTINE mpprecv_dp
[3]425
[13226]426
427   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
428      !!----------------------------------------------------------------------
429      !!                  ***  routine mpprecv  ***
430      !!
431      !! ** Purpose :   Receive messag passing array
432      !!
433      !!----------------------------------------------------------------------
434      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
435      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
436      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
437      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
438      !!
439      INTEGER :: istatus(mpi_status_size)
440      INTEGER :: iflag
441      INTEGER :: use_source
442      !!----------------------------------------------------------------------
443      !
[14229]444#if ! defined key_mpi_off
[13226]445      ! If a specific process number has been passed to the receive call,
446      ! use that one. Default is to use mpi_any_source
447      use_source = mpi_any_source
448      IF( PRESENT(ksource) )   use_source = ksource
449      !
450      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
451#endif
452      !
453   END SUBROUTINE mpprecv_sp
454
455
[51]456   SUBROUTINE mppgather( ptab, kp, pio )
457      !!----------------------------------------------------------------------
458      !!                   ***  routine mppgather  ***
[3764]459      !!
460      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]461      !!     array which is distributed following the vertical level.
462      !!
[1344]463      !!----------------------------------------------------------------------
[6140]464      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
465      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]466      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]467      !!
[1344]468      INTEGER :: itaille, ierror   ! temporary integer
[51]469      !!---------------------------------------------------------------------
[1344]470      !
471      itaille = jpi * jpj
[14229]472#if ! defined key_mpi_off
[1344]473      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]474         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[11536]475#else
476      pio(:,:,1) = ptab(:,:)
477#endif
[1344]478      !
[51]479   END SUBROUTINE mppgather
[3]480
481
[51]482   SUBROUTINE mppscatter( pio, kp, ptab )
483      !!----------------------------------------------------------------------
484      !!                  ***  routine mppscatter  ***
485      !!
[3764]486      !! ** Purpose :   Transfert between awork array which is distributed
[51]487      !!      following the vertical level and the local subdomain array.
488      !!
489      !!----------------------------------------------------------------------
[6140]490      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
491      INTEGER                             ::   kp     ! Tag (not used with MPI
492      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]493      !!
494      INTEGER :: itaille, ierror   ! temporary integer
[51]495      !!---------------------------------------------------------------------
[1344]496      !
[6140]497      itaille = jpi * jpj
[1344]498      !
[14229]499#if ! defined key_mpi_off
[1344]500      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]501         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[11536]502#else
503      ptab(:,:) = pio(:,:,1)
504#endif
[1344]505      !
[51]506   END SUBROUTINE mppscatter
[3]507
[14072]508
[10425]509   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
510     !!----------------------------------------------------------------------
511      !!                   ***  routine mpp_delay_sum  ***
[1344]512      !!
[10425]513      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
514      !!
[1344]515      !!----------------------------------------------------------------------
[10425]516      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
517      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
[13226]518      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
[10425]519      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
520      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
521      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[1344]522      !!
[10425]523      INTEGER ::   ji, isz
524      INTEGER ::   idvar
525      INTEGER ::   ierr, ilocalcomm
[13226]526      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[1344]527      !!----------------------------------------------------------------------
[14229]528#if ! defined key_mpi_off
[9570]529      ilocalcomm = mpi_comm_oce
[9019]530      IF( PRESENT(kcom) )   ilocalcomm = kcom
[3]531
[10425]532      isz = SIZE(y_in)
[14072]533
[10437]534      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]535
[10425]536      idvar = -1
537      DO ji = 1, nbdelay
538         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
539      END DO
540      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
541
542      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
543         !                                       --------------------------
544         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
545            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
546            DEALLOCATE(todelay(idvar)%z1d)
547            ndelayid(idvar) = -1                                      ! do as if we had no restart
548         ELSE
549            ALLOCATE(todelay(idvar)%y1d(isz))
550            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
[13636]551            ndelayid(idvar) = MPI_REQUEST_NULL                             ! initialised request to a valid value
[10425]552         END IF
553      ENDIF
[14072]554
[10425]555      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
556         !                                       --------------------------
557         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
558         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
[13636]559         ndelayid(idvar) = MPI_REQUEST_NULL
[10425]560      ENDIF
561
[13636]562      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[10425]563
564      ! send back pout from todelay(idvar)%z1d defined at previous call
565      pout(:) = todelay(idvar)%z1d(:)
566
567      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
[11536]568# if defined key_mpi2
[10526]569      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[12512]570      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
[13636]571      ndelayid(idvar) = MPI_REQUEST_NULL
[10526]572      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]573# else
574      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
575# endif
[10526]576#else
[11536]577      pout(:) = REAL(y_in(:), wp)
[10526]578#endif
[10425]579
580   END SUBROUTINE mpp_delay_sum
581
[14072]582
[10425]583   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
[9019]584      !!----------------------------------------------------------------------
[10425]585      !!                   ***  routine mpp_delay_max  ***
[9019]586      !!
[10425]587      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
[9019]588      !!
589      !!----------------------------------------------------------------------
[10425]590      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
591      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
[14072]592      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
593      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
[10425]594      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
595      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
[9019]596      !!
[10425]597      INTEGER ::   ji, isz
598      INTEGER ::   idvar
599      INTEGER ::   ierr, ilocalcomm
[13226]600      INTEGER ::   MPI_TYPE
[9019]601      !!----------------------------------------------------------------------
[14072]602
[14229]603#if ! defined key_mpi_off
[13226]604      if( wp == dp ) then
605         MPI_TYPE = MPI_DOUBLE_PRECISION
606      else if ( wp == sp ) then
607         MPI_TYPE = MPI_REAL
608      else
609        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
[14072]610
[13226]611      end if
612
[9570]613      ilocalcomm = mpi_comm_oce
[9019]614      IF( PRESENT(kcom) )   ilocalcomm = kcom
[6140]615
[10425]616      isz = SIZE(p_in)
[9019]617
[10437]618      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]619
[10425]620      idvar = -1
621      DO ji = 1, nbdelay
622         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
623      END DO
624      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
[3]625
[10425]626      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
627         !                                       --------------------------
628         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
629            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
630            DEALLOCATE(todelay(idvar)%z1d)
631            ndelayid(idvar) = -1                                      ! do as if we had no restart
[13636]632         ELSE
633            ndelayid(idvar) = MPI_REQUEST_NULL
[10425]634         END IF
635      ENDIF
[13]636
[10425]637      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
638         !                                       --------------------------
639         ALLOCATE(todelay(idvar)%z1d(isz))
640         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
[13636]641         ndelayid(idvar) = MPI_REQUEST_NULL
[10425]642      ENDIF
[3]643
[13636]644      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[3]645
[10425]646      ! send back pout from todelay(idvar)%z1d defined at previous call
647      pout(:) = todelay(idvar)%z1d(:)
[13]648
[10425]649      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
[13636]650      ! (PM) Should we get rid of MPI2 option ? MPI3 was release in 2013. Who is still using MPI2 ?
[11536]651# if defined key_mpi2
[10526]652      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[13636]653      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ierr )
[10526]654      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]655# else
[13226]656      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
[11536]657# endif
[10526]658#else
[11536]659      pout(:) = p_in(:)
[10526]660#endif
[10425]661
662   END SUBROUTINE mpp_delay_max
663
[14072]664
[10425]665   SUBROUTINE mpp_delay_rcv( kid )
666      !!----------------------------------------------------------------------
667      !!                   ***  routine mpp_delay_rcv  ***
[1344]668      !!
[14072]669      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
[1344]670      !!
[10425]671      !!----------------------------------------------------------------------
[14072]672      INTEGER,INTENT(in   )      ::  kid
[10425]673      INTEGER ::   ierr
674      !!----------------------------------------------------------------------
[14229]675#if ! defined key_mpi_off
[13636]676      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
677      ! test on ndelayid(kid) useless as mpi_wait return immediatly if the request handle is MPI_REQUEST_NULL
678      CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr ) ! after this ndelayid(kid) = MPI_REQUEST_NULL
679      IF( ln_timing ) CALL tic_tac( .FALSE., ld_global = .TRUE.)
680      IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
[10526]681#endif
[10425]682   END SUBROUTINE mpp_delay_rcv
[3]683
[12377]684   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
685      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
686      INTEGER                          , INTENT(INOUT) :: kleng
687      !!----------------------------------------------------------------------
688      !!                  ***  routine mpp_bcast_nml  ***
689      !!
690      !! ** Purpose :   broadcast namelist character buffer
691      !!
692      !!----------------------------------------------------------------------
693      !!
694      INTEGER ::   iflag
695      !!----------------------------------------------------------------------
696      !
[14229]697#if ! defined key_mpi_off
[12377]698      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
699      call MPI_BARRIER(mpi_comm_oce, iflag)
700!$AGRIF_DO_NOT_TREAT
701      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
702!$AGRIF_END_DO_NOT_TREAT
703      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
704      call MPI_BARRIER(mpi_comm_oce, iflag)
705#endif
706      !
707   END SUBROUTINE mpp_bcast_nml
708
[14072]709
[10425]710   !!----------------------------------------------------------------------
711   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
[14072]712   !!
[10425]713   !!----------------------------------------------------------------------
714   !!
715#  define OPERATION_MAX
716#  define INTEGER_TYPE
717#  define DIM_0d
718#     define ROUTINE_ALLREDUCE           mppmax_int
719#     include "mpp_allreduce_generic.h90"
720#     undef ROUTINE_ALLREDUCE
721#  undef DIM_0d
722#  define DIM_1d
723#     define ROUTINE_ALLREDUCE           mppmax_a_int
724#     include "mpp_allreduce_generic.h90"
725#     undef ROUTINE_ALLREDUCE
726#  undef DIM_1d
727#  undef INTEGER_TYPE
728!
[13226]729   !!
730   !!   ----   SINGLE PRECISION VERSIONS
731   !!
732#  define SINGLE_PRECISION
[10425]733#  define REAL_TYPE
734#  define DIM_0d
[13226]735#     define ROUTINE_ALLREDUCE           mppmax_real_sp
[10425]736#     include "mpp_allreduce_generic.h90"
737#     undef ROUTINE_ALLREDUCE
738#  undef DIM_0d
739#  define DIM_1d
[13226]740#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
[10425]741#     include "mpp_allreduce_generic.h90"
742#     undef ROUTINE_ALLREDUCE
743#  undef DIM_1d
[13226]744#  undef SINGLE_PRECISION
745   !!
746   !!
747   !!   ----   DOUBLE PRECISION VERSIONS
748   !!
749!
750#  define DIM_0d
751#     define ROUTINE_ALLREDUCE           mppmax_real_dp
752#     include "mpp_allreduce_generic.h90"
753#     undef ROUTINE_ALLREDUCE
754#  undef DIM_0d
755#  define DIM_1d
756#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
757#     include "mpp_allreduce_generic.h90"
758#     undef ROUTINE_ALLREDUCE
759#  undef DIM_1d
[10425]760#  undef REAL_TYPE
761#  undef OPERATION_MAX
762   !!----------------------------------------------------------------------
763   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
[14072]764   !!
[10425]765   !!----------------------------------------------------------------------
766   !!
767#  define OPERATION_MIN
768#  define INTEGER_TYPE
769#  define DIM_0d
770#     define ROUTINE_ALLREDUCE           mppmin_int
771#     include "mpp_allreduce_generic.h90"
772#     undef ROUTINE_ALLREDUCE
773#  undef DIM_0d
774#  define DIM_1d
775#     define ROUTINE_ALLREDUCE           mppmin_a_int
776#     include "mpp_allreduce_generic.h90"
777#     undef ROUTINE_ALLREDUCE
778#  undef DIM_1d
779#  undef INTEGER_TYPE
780!
[13226]781   !!
782   !!   ----   SINGLE PRECISION VERSIONS
783   !!
784#  define SINGLE_PRECISION
[10425]785#  define REAL_TYPE
786#  define DIM_0d
[13226]787#     define ROUTINE_ALLREDUCE           mppmin_real_sp
[10425]788#     include "mpp_allreduce_generic.h90"
789#     undef ROUTINE_ALLREDUCE
790#  undef DIM_0d
791#  define DIM_1d
[13226]792#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
[10425]793#     include "mpp_allreduce_generic.h90"
794#     undef ROUTINE_ALLREDUCE
795#  undef DIM_1d
[13226]796#  undef SINGLE_PRECISION
797   !!
798   !!   ----   DOUBLE PRECISION VERSIONS
799   !!
800
801#  define DIM_0d
802#     define ROUTINE_ALLREDUCE           mppmin_real_dp
803#     include "mpp_allreduce_generic.h90"
804#     undef ROUTINE_ALLREDUCE
805#  undef DIM_0d
806#  define DIM_1d
807#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
808#     include "mpp_allreduce_generic.h90"
809#     undef ROUTINE_ALLREDUCE
810#  undef DIM_1d
[10425]811#  undef REAL_TYPE
812#  undef OPERATION_MIN
[869]813
[10425]814   !!----------------------------------------------------------------------
815   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
[14072]816   !!
[10425]817   !!   Global sum of 1D array or a variable (integer, real or complex)
818   !!----------------------------------------------------------------------
819   !!
820#  define OPERATION_SUM
821#  define INTEGER_TYPE
822#  define DIM_0d
823#     define ROUTINE_ALLREDUCE           mppsum_int
824#     include "mpp_allreduce_generic.h90"
825#     undef ROUTINE_ALLREDUCE
826#  undef DIM_0d
827#  define DIM_1d
828#     define ROUTINE_ALLREDUCE           mppsum_a_int
829#     include "mpp_allreduce_generic.h90"
830#     undef ROUTINE_ALLREDUCE
831#  undef DIM_1d
832#  undef INTEGER_TYPE
[13226]833
834   !!
835   !!   ----   SINGLE PRECISION VERSIONS
836   !!
837#  define OPERATION_SUM
838#  define SINGLE_PRECISION
[10425]839#  define REAL_TYPE
840#  define DIM_0d
[13226]841#     define ROUTINE_ALLREDUCE           mppsum_real_sp
[10425]842#     include "mpp_allreduce_generic.h90"
843#     undef ROUTINE_ALLREDUCE
844#  undef DIM_0d
845#  define DIM_1d
[13226]846#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
[10425]847#     include "mpp_allreduce_generic.h90"
848#     undef ROUTINE_ALLREDUCE
849#  undef DIM_1d
850#  undef REAL_TYPE
851#  undef OPERATION_SUM
852
[13226]853#  undef SINGLE_PRECISION
854
855   !!
856   !!   ----   DOUBLE PRECISION VERSIONS
857   !!
858#  define OPERATION_SUM
859#  define REAL_TYPE
860#  define DIM_0d
861#     define ROUTINE_ALLREDUCE           mppsum_real_dp
862#     include "mpp_allreduce_generic.h90"
863#     undef ROUTINE_ALLREDUCE
864#  undef DIM_0d
865#  define DIM_1d
866#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
867#     include "mpp_allreduce_generic.h90"
868#     undef ROUTINE_ALLREDUCE
869#  undef DIM_1d
870#  undef REAL_TYPE
871#  undef OPERATION_SUM
872
[10425]873#  define OPERATION_SUM_DD
874#  define COMPLEX_TYPE
875#  define DIM_0d
876#     define ROUTINE_ALLREDUCE           mppsum_realdd
877#     include "mpp_allreduce_generic.h90"
878#     undef ROUTINE_ALLREDUCE
879#  undef DIM_0d
880#  define DIM_1d
881#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
882#     include "mpp_allreduce_generic.h90"
883#     undef ROUTINE_ALLREDUCE
884#  undef DIM_1d
885#  undef COMPLEX_TYPE
886#  undef OPERATION_SUM_DD
887
888   !!----------------------------------------------------------------------
889   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
[14072]890   !!
[10425]891   !!----------------------------------------------------------------------
892   !!
[13226]893   !!
894   !!   ----   SINGLE PRECISION VERSIONS
895   !!
896#  define SINGLE_PRECISION
[10425]897#  define OPERATION_MINLOC
898#  define DIM_2d
[13226]899#     define ROUTINE_LOC           mpp_minloc2d_sp
[10425]900#     include "mpp_loc_generic.h90"
901#     undef ROUTINE_LOC
902#  undef DIM_2d
903#  define DIM_3d
[13226]904#     define ROUTINE_LOC           mpp_minloc3d_sp
[10425]905#     include "mpp_loc_generic.h90"
906#     undef ROUTINE_LOC
907#  undef DIM_3d
908#  undef OPERATION_MINLOC
909
910#  define OPERATION_MAXLOC
911#  define DIM_2d
[13226]912#     define ROUTINE_LOC           mpp_maxloc2d_sp
[10425]913#     include "mpp_loc_generic.h90"
914#     undef ROUTINE_LOC
915#  undef DIM_2d
916#  define DIM_3d
[13226]917#     define ROUTINE_LOC           mpp_maxloc3d_sp
[10425]918#     include "mpp_loc_generic.h90"
919#     undef ROUTINE_LOC
920#  undef DIM_3d
921#  undef OPERATION_MAXLOC
[13226]922#  undef SINGLE_PRECISION
923   !!
924   !!   ----   DOUBLE PRECISION VERSIONS
925   !!
926#  define OPERATION_MINLOC
927#  define DIM_2d
928#     define ROUTINE_LOC           mpp_minloc2d_dp
929#     include "mpp_loc_generic.h90"
930#     undef ROUTINE_LOC
931#  undef DIM_2d
932#  define DIM_3d
933#     define ROUTINE_LOC           mpp_minloc3d_dp
934#     include "mpp_loc_generic.h90"
935#     undef ROUTINE_LOC
936#  undef DIM_3d
937#  undef OPERATION_MINLOC
[10425]938
[13226]939#  define OPERATION_MAXLOC
940#  define DIM_2d
941#     define ROUTINE_LOC           mpp_maxloc2d_dp
942#     include "mpp_loc_generic.h90"
943#     undef ROUTINE_LOC
944#  undef DIM_2d
945#  define DIM_3d
946#     define ROUTINE_LOC           mpp_maxloc3d_dp
947#     include "mpp_loc_generic.h90"
948#     undef ROUTINE_LOC
949#  undef DIM_3d
950#  undef OPERATION_MAXLOC
951
952
[1344]953   SUBROUTINE mppsync()
954      !!----------------------------------------------------------------------
955      !!                  ***  routine mppsync  ***
[3764]956      !!
[1344]957      !! ** Purpose :   Massively parallel processors, synchroneous
958      !!
959      !!-----------------------------------------------------------------------
960      INTEGER :: ierror
961      !!-----------------------------------------------------------------------
962      !
[14229]963#if ! defined key_mpi_off
[9570]964      CALL mpi_barrier( mpi_comm_oce, ierror )
[11536]965#endif
[1344]966      !
967   END SUBROUTINE mppsync
[3]968
969
[14072]970   SUBROUTINE mppstop( ld_abort )
[1344]971      !!----------------------------------------------------------------------
972      !!                  ***  routine mppstop  ***
[3764]973      !!
[3294]974      !! ** purpose :   Stop massively parallel processors method
[1344]975      !!
976      !!----------------------------------------------------------------------
[11536]977      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
978      LOGICAL ::   ll_abort
[14433]979      INTEGER ::   info, ierr
[1344]980      !!----------------------------------------------------------------------
[11536]981      ll_abort = .FALSE.
982      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
[1344]983      !
[14229]984#if ! defined key_mpi_off
[11536]985      IF(ll_abort) THEN
[14433]986         CALL mpi_abort( MPI_COMM_WORLD, 123, info )
[10425]987      ELSE
988         CALL mppsync
989         CALL mpi_finalize( info )
990      ENDIF
[11536]991#endif
992      IF( ll_abort ) STOP 123
[1344]993      !
994   END SUBROUTINE mppstop
[3]995
996
[1344]997   SUBROUTINE mpp_comm_free( kcom )
998      !!----------------------------------------------------------------------
[14433]999      INTEGER, INTENT(inout) ::   kcom
[1344]1000      !!
1001      INTEGER :: ierr
1002      !!----------------------------------------------------------------------
1003      !
[14229]1004#if ! defined key_mpi_off
[1344]1005      CALL MPI_COMM_FREE(kcom, ierr)
[11536]1006#endif
[1344]1007      !
1008   END SUBROUTINE mpp_comm_free
[3]1009
[869]1010
[2715]1011   SUBROUTINE mpp_ini_znl( kumout )
[1345]1012      !!----------------------------------------------------------------------
1013      !!               ***  routine mpp_ini_znl  ***
1014      !!
1015      !! ** Purpose :   Initialize special communicator for computing zonal sum
1016      !!
1017      !! ** Method  : - Look for processors in the same row
1018      !!              - Put their number in nrank_znl
1019      !!              - Create group for the znl processors
1020      !!              - Create a communicator for znl processors
1021      !!              - Determine if processor should write znl files
1022      !!
1023      !! ** output
1024      !!      ndim_rank_znl = number of processors on the same row
1025      !!      ngrp_znl = group ID for the znl processors
1026      !!      ncomm_znl = communicator for the ice procs.
1027      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1028      !!
1029      !!----------------------------------------------------------------------
[2715]1030      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]1031      !
[2715]1032      INTEGER :: jproc      ! dummy loop integer
1033      INTEGER :: ierr, ii   ! local integer
1034      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1035      !!----------------------------------------------------------------------
[14229]1036#if ! defined key_mpi_off
[14275]1037      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_world     : ', ngrp_world
1038      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_world : ', mpi_comm_world
1039      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]1040      !
[2715]1041      ALLOCATE( kwork(jpnij), STAT=ierr )
[11536]1042      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
[2715]1043
1044      IF( jpnj == 1 ) THEN
[1345]1045         ngrp_znl  = ngrp_world
[9570]1046         ncomm_znl = mpi_comm_oce
[1345]1047      ELSE
1048         !
[9570]1049         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[14275]1050         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - kwork pour njmpp : ', kwork
[1345]1051         !-$$        CALL flush(numout)
1052         !
1053         ! Count number of processors on the same row
1054         ndim_rank_znl = 0
1055         DO jproc=1,jpnij
1056            IF ( kwork(jproc) == njmpp ) THEN
1057               ndim_rank_znl = ndim_rank_znl + 1
1058            ENDIF
1059         END DO
[14275]1060         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ndim_rank_znl : ', ndim_rank_znl
[1345]1061         !-$$        CALL flush(numout)
1062         ! Allocate the right size to nrank_znl
[1441]1063         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]1064         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]1065         ii = 0
[1345]1066         nrank_znl (:) = 0
1067         DO jproc=1,jpnij
1068            IF ( kwork(jproc) == njmpp) THEN
1069               ii = ii + 1
[3764]1070               nrank_znl(ii) = jproc -1
[1345]1071            ENDIF
1072         END DO
[14275]1073         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - nrank_znl : ', nrank_znl
[1345]1074         !-$$        CALL flush(numout)
1075
1076         ! Create the opa group
[9570]1077         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[14275]1078         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_opa : ', ngrp_opa
[1345]1079         !-$$        CALL flush(numout)
1080
1081         ! Create the znl group from the opa group
1082         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
[14275]1083         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_znl ', ngrp_znl
[1345]1084         !-$$        CALL flush(numout)
1085
1086         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]1087         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[14275]1088         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ncomm_znl ', ncomm_znl
[1345]1089         !-$$        CALL flush(numout)
1090         !
1091      END IF
1092
1093      ! Determines if processor if the first (starting from i=1) on the row
[3764]1094      IF ( jpni == 1 ) THEN
[1345]1095         l_znl_root = .TRUE.
1096      ELSE
1097         l_znl_root = .FALSE.
1098         kwork (1) = nimpp
[10425]1099         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]1100         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1101      END IF
1102
[2715]1103      DEALLOCATE(kwork)
[11536]1104#endif
[2715]1105
[1345]1106   END SUBROUTINE mpp_ini_znl
1107
[14433]1108   
1109   SUBROUTINE mpp_ini_nc( khls )
[13982]1110      !!----------------------------------------------------------------------
1111      !!               ***  routine mpp_ini_nc  ***
1112      !!
1113      !! ** Purpose :   Initialize special communicators for MPI3 neighbourhood
1114      !!                collectives
1115      !!
[14072]1116      !! ** Method  : - Create graph communicators starting from the processes
[13982]1117      !!                distribution along i and j directions
1118      !
1119      !! ** output
[14433]1120      !!         mpi_nc_com4 = MPI3 neighbourhood collectives communicator
1121      !!         mpi_nc_com8 = MPI3 neighbourhood collectives communicator (with diagonals)
[13982]1122      !!----------------------------------------------------------------------
[14433]1123      INTEGER,             INTENT(in   ) ::   khls        ! halo size, default = nn_hls
1124      !
1125      INTEGER, DIMENSION(:), ALLOCATABLE :: iSnei4, iRnei4, iSnei8, iRnei8
1126      INTEGER                            :: iScnt4, iRcnt4, iScnt8, iRcnt8
1127      INTEGER                            :: ierr
1128      LOGICAL, PARAMETER                 :: ireord = .FALSE.
1129      !!----------------------------------------------------------------------
1130#if ! defined key_mpi_off && ! defined key_mpi2
1131     
1132      iScnt4 = COUNT( mpiSnei(khls,1:4) >= 0 )
1133      iRcnt4 = COUNT( mpiRnei(khls,1:4) >= 0 )
1134      iScnt8 = COUNT( mpiSnei(khls,1:8) >= 0 )
1135      iRcnt8 = COUNT( mpiRnei(khls,1:8) >= 0 )
[1345]1136
[14433]1137      ALLOCATE( iSnei4(iScnt4), iRnei4(iRcnt4), iSnei8(iScnt8), iRnei8(iRcnt8) )   ! ok if icnt4 or icnt8 = 0
[13982]1138
[14433]1139      iSnei4 = PACK( mpiSnei(khls,1:4), mask = mpiSnei(khls,1:4) >= 0 )
1140      iRnei4 = PACK( mpiRnei(khls,1:4), mask = mpiRnei(khls,1:4) >= 0 )
1141      iSnei8 = PACK( mpiSnei(khls,1:8), mask = mpiSnei(khls,1:8) >= 0 )
1142      iRnei8 = PACK( mpiRnei(khls,1:8), mask = mpiRnei(khls,1:8) >= 0 )
[13982]1143
[14433]1144      CALL MPI_Dist_graph_create_adjacent( mpi_comm_oce, iScnt4, iSnei4, MPI_UNWEIGHTED, iRcnt4, iRnei4, MPI_UNWEIGHTED,   &
1145         &                                 MPI_INFO_NULL, ireord, mpi_nc_com4(khls), ierr )
1146      CALL MPI_Dist_graph_create_adjacent( mpi_comm_oce, iScnt8, iSnei8, MPI_UNWEIGHTED, iRcnt8, iRnei8, MPI_UNWEIGHTED,   &
1147         &                                 MPI_INFO_NULL, ireord, mpi_nc_com8(khls), ierr)
[13982]1148
[14433]1149      DEALLOCATE( iSnei4, iRnei4, iSnei8, iRnei8 )
[13982]1150#endif
1151   END SUBROUTINE mpp_ini_nc
1152
1153
[15267]1154   SUBROUTINE mpp_ini_northgather
[1344]1155      !!----------------------------------------------------------------------
[15267]1156      !!               ***  routine mpp_ini_northgather  ***
[1344]1157      !!
[3764]1158      !! ** Purpose :   Initialize special communicator for north folding
[1344]1159      !!      condition together with global variables needed in the mpp folding
1160      !!
1161      !! ** Method  : - Look for northern processors
1162      !!              - Put their number in nrank_north
1163      !!              - Create groups for the world processors and the north processors
1164      !!              - Create a communicator for northern processors
1165      !!
1166      !! ** output
1167      !!      ndim_rank_north = number of processors in the northern line
1168      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1169      !!      ngrp_world = group ID for the world processors
1170      !!      ngrp_north = group ID for the northern processors
1171      !!      ncomm_north = communicator for the northern procs.
1172      !!      north_root = number (in the world) of proc 0 in the northern comm.
1173      !!
1174      !!----------------------------------------------------------------------
1175      INTEGER ::   ierr
1176      INTEGER ::   jjproc
1177      INTEGER ::   ii, ji
1178      !!----------------------------------------------------------------------
1179      !
[14229]1180#if ! defined key_mpi_off
[1344]1181      !
1182      ! Look for how many procs on the northern boundary
1183      ndim_rank_north = 0
[13286]1184      DO jjproc = 1, jpni
1185         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
[1344]1186      END DO
1187      !
1188      ! Allocate the right size to nrank_north
[1441]1189      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]1190      ALLOCATE( nrank_north(ndim_rank_north) )
[869]1191
[1344]1192      ! Fill the nrank_north array with proc. number of northern procs.
1193      ! Note : the rank start at 0 in MPI
1194      ii = 0
[13286]1195      DO ji = 1, jpni
1196         IF ( nfproc(ji) /= -1   ) THEN
[1344]1197            ii=ii+1
[13286]1198            nrank_north(ii)=nfproc(ji)
[1344]1199         END IF
1200      END DO
1201      !
1202      ! create the world group
[9570]1203      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]1204      !
1205      ! Create the North group from the world group
1206      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1207      !
1208      ! Create the North communicator , ie the pool of procs in the north group
[9570]1209      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]1210      !
[11536]1211#endif
[15267]1212   END SUBROUTINE mpp_ini_northgather
[869]1213
1214
[9019]1215   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]1216      !!---------------------------------------------------------------------
1217      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1218      !!
1219      !!   Modification of original codes written by David H. Bailey
1220      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1221      !!---------------------------------------------------------------------
[9019]1222      INTEGER                     , INTENT(in)    ::   ilen, itype
[13226]1223      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1224      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]1225      !
[13226]1226      REAL(dp) :: zerr, zt1, zt2    ! local work variables
[9019]1227      INTEGER  :: ji, ztmp           ! local scalar
1228      !!---------------------------------------------------------------------
1229      !
[1976]1230      ztmp = itype   ! avoid compilation warning
[9019]1231      !
[1976]1232      DO ji=1,ilen
1233      ! Compute ydda + yddb using Knuth's trick.
1234         zt1  = real(ydda(ji)) + real(yddb(ji))
1235         zerr = zt1 - real(ydda(ji))
1236         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1237                + aimag(ydda(ji)) + aimag(yddb(ji))
1238
1239         ! The result is zt1 + zt2, after normalization.
1240         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1241      END DO
[9019]1242      !
[1976]1243   END SUBROUTINE DDPDD_MPI
1244
[6140]1245
[10437]1246   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
[10425]1247      !!----------------------------------------------------------------------
1248      !!                  ***  routine mpp_report  ***
1249      !!
1250      !! ** Purpose :   report use of mpp routines per time-setp
1251      !!
1252      !!----------------------------------------------------------------------
1253      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1254      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
[10437]1255      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
[10425]1256      !!
[10982]1257      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
[10437]1258      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
[10982]1259      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
[10425]1260      !!----------------------------------------------------------------------
[14229]1261#if ! defined key_mpi_off
[10425]1262      !
1263      ll_lbc = .FALSE.
1264      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1265      ll_glb = .FALSE.
1266      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
[10437]1267      ll_dlg = .FALSE.
1268      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
[10425]1269      !
1270      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1271      ncom_freq = ncom_fsbc
1272      !
1273      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1274         IF( ll_lbc ) THEN
1275            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1276            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1277            n_sequence_lbc = n_sequence_lbc + 1
1278            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1279            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1280            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1281            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1282         ENDIF
1283         IF( ll_glb ) THEN
1284            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1285            n_sequence_glb = n_sequence_glb + 1
1286            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1287            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1288         ENDIF
[10437]1289         IF( ll_dlg ) THEN
1290            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1291            n_sequence_dlg = n_sequence_dlg + 1
1292            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1293            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1294         ENDIF
[10425]1295      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1296         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1297         WRITE(numcom,*) ' '
1298         WRITE(numcom,*) ' ------------------------------------------------------------'
1299         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1300         WRITE(numcom,*) ' ------------------------------------------------------------'
1301         WRITE(numcom,*) ' '
1302         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1303         jj = 0; jk = 0; jf = 0; jh = 0
1304         DO ji = 1, n_sequence_lbc
1305            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1306            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1307            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1308            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1309         END DO
1310         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1311         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1312         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1313         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1314         WRITE(numcom,*) ' '
1315         WRITE(numcom,*) ' lbc_lnk called'
[10982]1316         DO ji = 1, n_sequence_lbc - 1
1317            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1318               ccountname = crname_lbc(ji)
1319               crname_lbc(ji) = 'already counted'
1320               jcount = 1
1321               DO jj = ji + 1, n_sequence_lbc
1322                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1323                     jcount = jcount + 1
1324                     crname_lbc(jj) = 'already counted'
1325                  END IF
1326               END DO
1327               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
[10425]1328            END IF
1329         END DO
[10982]1330         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
[14354]1331            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
[10982]1332         END IF
[10425]1333         WRITE(numcom,*) ' '
1334         IF ( n_sequence_glb > 0 ) THEN
1335            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1336            jj = 1
1337            DO ji = 2, n_sequence_glb
1338               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1339                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1340                  jj = 0
1341               END IF
[14072]1342               jj = jj + 1
[10425]1343            END DO
1344            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1345            DEALLOCATE(crname_glb)
1346         ELSE
1347            WRITE(numcom,*) ' No MPI global communication '
1348         ENDIF
1349         WRITE(numcom,*) ' '
[10437]1350         IF ( n_sequence_dlg > 0 ) THEN
1351            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1352            jj = 1
1353            DO ji = 2, n_sequence_dlg
1354               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1355                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1356                  jj = 0
1357               END IF
[14072]1358               jj = jj + 1
[10437]1359            END DO
1360            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1361            DEALLOCATE(crname_dlg)
1362         ELSE
1363            WRITE(numcom,*) ' No MPI delayed global communication '
1364         ENDIF
1365         WRITE(numcom,*) ' '
[10425]1366         WRITE(numcom,*) ' -----------------------------------------------'
1367         WRITE(numcom,*) ' '
1368         DEALLOCATE(ncomm_sequence)
1369         DEALLOCATE(crname_lbc)
1370      ENDIF
[11536]1371#endif
[10425]1372   END SUBROUTINE mpp_report
1373
[14072]1374
[10425]1375   SUBROUTINE tic_tac (ld_tic, ld_global)
1376
1377    LOGICAL,           INTENT(IN) :: ld_tic
1378    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
[13226]1379    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1380    REAL(dp),               SAVE :: tic_ct = 0._dp
[10425]1381    INTEGER :: ii
[14229]1382#if ! defined key_mpi_off
[10425]1383
1384    IF( ncom_stp <= nit000 ) RETURN
1385    IF( ncom_stp == nitend ) RETURN
1386    ii = 1
1387    IF( PRESENT( ld_global ) ) THEN
1388       IF( ld_global ) ii = 2
1389    END IF
[14072]1390
[10425]1391    IF ( ld_tic ) THEN
1392       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
[13226]1393       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
[10425]1394    ELSE
1395       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1396       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1397    ENDIF
[11536]1398#endif
[14072]1399
[10425]1400   END SUBROUTINE tic_tac
1401
[14229]1402#if defined key_mpi_off
[11536]1403   SUBROUTINE mpi_wait(request, status, ierror)
1404      INTEGER                            , INTENT(in   ) ::   request
1405      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1406      INTEGER                            , INTENT(  out) ::   ierror
[15023]1407      IF (.FALSE.) THEN   ! to avoid compilation warning
1408         status(:) = -1
1409         ierror = -1
1410      ENDIF
[11536]1411   END SUBROUTINE mpi_wait
[1976]1412
[15023]1413   SUBROUTINE mpi_waitall(count, request, status, ierror)
1414      INTEGER                            , INTENT(in   ) :: count
1415      INTEGER, DIMENSION(count)          , INTENT(in   ) :: request
1416      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) :: status
1417      INTEGER                            , INTENT(  out) :: ierror
1418      IF (.FALSE.) THEN   ! to avoid compilation warning
1419         status(:) = -1
1420         ierror = -1
1421      ENDIF
1422   END SUBROUTINE mpi_waitall
[14072]1423
[11536]1424   FUNCTION MPI_Wtime()
1425      REAL(wp) ::  MPI_Wtime
1426      MPI_Wtime = -1.
1427   END FUNCTION MPI_Wtime
[3]1428#endif
[2715]1429
[13]1430   !!----------------------------------------------------------------------
[12377]1431   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
[2715]1432   !!----------------------------------------------------------------------
1433
1434   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1435      &                 cd6, cd7, cd8, cd9, cd10 )
1436      !!----------------------------------------------------------------------
1437      !!                  ***  ROUTINE  stop_opa  ***
1438      !!
[3764]1439      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1440      !!                increment the error number (nstop) by one.
1441      !!----------------------------------------------------------------------
[11536]1442      CHARACTER(len=*), INTENT(in   )           ::   cd1
1443      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1444      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
[12933]1445      !
1446      CHARACTER(LEN=8) ::   clfmt            ! writing format
[13011]1447      INTEGER          ::   inum
[2715]1448      !!----------------------------------------------------------------------
1449      !
[3764]1450      nstop = nstop + 1
[11536]1451      !
[13011]1452      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1453         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1454         WRITE(inum,*)
1455         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1456         CLOSE(inum)
[12933]1457      ENDIF
[13011]1458      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1459         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1460      ENDIF
[11536]1461      !
1462                            WRITE(numout,*)
1463                            WRITE(numout,*) ' ===>>> : E R R O R'
1464                            WRITE(numout,*)
1465                            WRITE(numout,*) '         ==========='
1466                            WRITE(numout,*)
1467                            WRITE(numout,*) TRIM(cd1)
[10425]1468      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1469      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1470      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1471      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1472      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1473      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1474      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1475      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1476      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[11536]1477                            WRITE(numout,*)
1478      !
[2715]1479                               CALL FLUSH(numout    )
1480      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1481      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1482      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1483      !
1484      IF( cd1 == 'STOP' ) THEN
[14072]1485         WRITE(numout,*)
[10425]1486         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[14072]1487         WRITE(numout,*)
[12933]1488         CALL FLUSH(numout)
1489         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
[11536]1490         CALL mppstop( ld_abort = .true. )
[2715]1491      ENDIF
1492      !
1493   END SUBROUTINE ctl_stop
1494
1495
1496   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1497      &                 cd6, cd7, cd8, cd9, cd10 )
1498      !!----------------------------------------------------------------------
1499      !!                  ***  ROUTINE  stop_warn  ***
1500      !!
[3764]1501      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1502      !!                increment the warning number (nwarn) by one.
1503      !!----------------------------------------------------------------------
1504      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1505      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1506      !!----------------------------------------------------------------------
[3764]1507      !
1508      nwarn = nwarn + 1
[11536]1509      !
[2715]1510      IF(lwp) THEN
[11536]1511                               WRITE(numout,*)
1512                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1513                               WRITE(numout,*)
1514                               WRITE(numout,*) '         ==============='
1515                               WRITE(numout,*)
1516         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1517         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1518         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1519         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1520         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1521         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1522         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1523         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1524         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1525         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1526                               WRITE(numout,*)
[2715]1527      ENDIF
1528      CALL FLUSH(numout)
1529      !
1530   END SUBROUTINE ctl_warn
1531
1532
1533   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1534      !!----------------------------------------------------------------------
1535      !!                  ***  ROUTINE ctl_opn  ***
1536      !!
1537      !! ** Purpose :   Open file and check if required file is available.
1538      !!
1539      !! ** Method  :   Fortan open
1540      !!----------------------------------------------------------------------
1541      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1542      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1543      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1544      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1545      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1546      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1547      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1548      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1549      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1550      !
[2715]1551      CHARACTER(len=80) ::   clfile
[12933]1552      CHARACTER(LEN=10) ::   clfmt            ! writing format
[2715]1553      INTEGER           ::   iost
[13062]1554      INTEGER           ::   idg              ! number of digits
[2715]1555      !!----------------------------------------------------------------------
[5836]1556      !
[2715]1557      ! adapt filename
1558      ! ----------------
1559      clfile = TRIM(cdfile)
1560      IF( PRESENT( karea ) ) THEN
[12933]1561         IF( karea > 1 ) THEN
[13009]1562            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1563            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1564            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
[12933]1565            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1566         ENDIF
[2715]1567      ENDIF
1568#if defined key_agrif
1569      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1570      knum=Agrif_Get_Unit()
1571#else
1572      knum=get_unit()
1573#endif
[10425]1574      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1575      !
[11536]1576      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
[10425]1577         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1578      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1579         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1580      ELSE
[10425]1581         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1582      ENDIF
[10425]1583      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
[14072]1584         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1585      IF( iost == 0 ) THEN
[12377]1586         IF(ldwp .AND. kout > 0) THEN
[10425]1587            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1588            WRITE(kout,*) '     unit   = ', knum
1589            WRITE(kout,*) '     status = ', cdstat
1590            WRITE(kout,*) '     form   = ', cdform
1591            WRITE(kout,*) '     access = ', cdacce
1592            WRITE(kout,*)
1593         ENDIF
1594      ENDIF
1595100   CONTINUE
1596      IF( iost /= 0 ) THEN
[11536]1597         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1598         WRITE(ctmp2,*) ' =======   ===  '
1599         WRITE(ctmp3,*) '           unit   = ', knum
1600         WRITE(ctmp4,*) '           status = ', cdstat
1601         WRITE(ctmp5,*) '           form   = ', cdform
1602         WRITE(ctmp6,*) '           access = ', cdacce
1603         WRITE(ctmp7,*) '           iostat = ', iost
1604         WRITE(ctmp8,*) '           we stop. verify the file '
1605         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
[2715]1606      ENDIF
[5836]1607      !
[2715]1608   END SUBROUTINE ctl_opn
1609
[5836]1610
[11536]1611   SUBROUTINE ctl_nam ( kios, cdnam )
[4147]1612      !!----------------------------------------------------------------------
1613      !!                  ***  ROUTINE ctl_nam  ***
1614      !!
1615      !! ** Purpose :   Informations when error while reading a namelist
1616      !!
1617      !! ** Method  :   Fortan open
1618      !!----------------------------------------------------------------------
[11536]1619      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1620      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1621      !
1622      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
[4147]1623      !!----------------------------------------------------------------------
[5836]1624      !
[7646]1625      WRITE (clios, '(I5.0)')   kios
[14072]1626      IF( kios < 0 ) THEN
[5836]1627         CALL ctl_warn( 'end of record or file while reading namelist '   &
1628            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1629      ENDIF
[5836]1630      !
[4147]1631      IF( kios > 0 ) THEN
[5836]1632         CALL ctl_stop( 'misspelled variable in namelist '   &
1633            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1634      ENDIF
1635      kios = 0
[5836]1636      !
[4147]1637   END SUBROUTINE ctl_nam
1638
[5836]1639
[2715]1640   INTEGER FUNCTION get_unit()
1641      !!----------------------------------------------------------------------
1642      !!                  ***  FUNCTION  get_unit  ***
1643      !!
1644      !! ** Purpose :   return the index of an unused logical unit
1645      !!----------------------------------------------------------------------
[3764]1646      LOGICAL :: llopn
[2715]1647      !!----------------------------------------------------------------------
1648      !
1649      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1650      llopn = .TRUE.
1651      DO WHILE( (get_unit < 998) .AND. llopn )
1652         get_unit = get_unit + 1
1653         INQUIRE( unit = get_unit, opened = llopn )
1654      END DO
1655      IF( (get_unit == 999) .AND. llopn ) THEN
[11536]1656         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
[2715]1657      ENDIF
1658      !
1659   END FUNCTION get_unit
1660
[12377]1661   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1662      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1663      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1664      CHARACTER(LEN=256)                           :: chline
1665      CHARACTER(LEN=1)                             :: csp
1666      INTEGER, INTENT(IN)                          :: kout
1667      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1668      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1669      !
1670      !csp = NEW_LINE('A')
1671      ! a new line character is the best seperator but some systems (e.g.Cray)
[14072]1672      ! seem to terminate namelist reads from internal files early if they
[12377]1673      ! encounter new-lines. Use a single space for safety.
1674      csp = ' '
1675      !
1676      ! Check if the namelist buffer has already been allocated. Return if it has.
1677      !
1678      IF ( ALLOCATED( cdnambuff ) ) RETURN
1679      IF( ldwp ) THEN
1680         !
1681         ! Open namelist file
1682         !
1683         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1684         !
1685         ! First pass: count characters excluding comments and trimable white space
1686         !
1687         itot=0
1688     10  READ(iun,'(A256)',END=20,ERR=20) chline
1689         iltc = LEN_TRIM(chline)
1690         IF ( iltc.GT.0 ) THEN
[14072]1691          inl = INDEX(chline, '!')
[12377]1692          IF( inl.eq.0 ) THEN
1693           itot = itot + iltc + 1                                ! +1 for the newline character
1694          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1695           itot = itot + inl                                  !  includes +1 for the newline character
1696          ENDIF
1697         ENDIF
1698         GOTO 10
1699     20  CONTINUE
1700         !
1701         ! Allocate text cdnambuff for condensed namelist
1702         !
1703!$AGRIF_DO_NOT_TREAT
1704         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1705!$AGRIF_END_DO_NOT_TREAT
1706         itotsav = itot
1707         !
1708         ! Second pass: read and transfer pruned characters into cdnambuff
1709         !
1710         REWIND(iun)
1711         itot=1
1712     30  READ(iun,'(A256)',END=40,ERR=40) chline
1713         iltc = LEN_TRIM(chline)
1714         IF ( iltc.GT.0 ) THEN
1715          inl = INDEX(chline, '!')
1716          IF( inl.eq.0 ) THEN
1717           inl = iltc
1718          ELSE
1719           inl = inl - 1
1720          ENDIF
1721          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1722             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1723             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1724             itot = itot + inl + 1
1725          ENDIF
1726         ENDIF
1727         GOTO 30
1728     40  CONTINUE
1729         itot = itot - 1
1730         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1731         !
1732         ! Close namelist file
1733         !
1734         CLOSE(iun)
1735         !write(*,'(32A)') cdnambuff
1736      ENDIF
[14229]1737#if ! defined key_mpi_off
[12377]1738      CALL mpp_bcast_nml( cdnambuff, itot )
1739#endif
1740  END SUBROUTINE load_nml
1741
1742
[2715]1743   !!----------------------------------------------------------------------
[3]1744END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.