New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2020/dev_r12558_HPC-08_epico_Extra_Halo/src/OCE/LBC – NEMO

source: NEMO/branches/2020/dev_r12558_HPC-08_epico_Extra_Halo/src/OCE/LBC/lib_mpp.F90 @ 13247

Last change on this file since 13247 was 13247, checked in by francesca, 4 years ago

dev_r12558_HPC-08_epico_Extra_Halo: merge with trunk@13227, see #2366

  • Property svn:keywords set to Id
File size: 69.2 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
[12377]34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
[2715]35   !!----------------------------------------------------------------------
[13]36   !!----------------------------------------------------------------------
[11536]37   !!   mpp_start     : get local communicator its size and rank
[2715]38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]40   !!   mpprecv       :
[9019]41   !!   mppsend       :
[2715]42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
[1344]51   !!   mpp_ini_north : initialisation of north fold
[9019]52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[12377]53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
[13]54   !!----------------------------------------------------------------------
[3764]55   USE dom_oce        ! ocean space and time domain
[2715]56   USE in_out_manager ! I/O manager
[3]57
[13]58   IMPLICIT NONE
[415]59   PRIVATE
[9019]60   !
[12377]61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
[11536]62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
[9019]63   PUBLIC   mpp_ini_north
[1344]64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10425]65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]66   PUBLIC   mppscatter, mppgather
[10425]67   PUBLIC   mpp_ini_znl
[3764]68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[13247]69   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
[11536]71   PUBLIC   mpp_report
[12377]72   PUBLIC   mpp_bcast_nml
[11536]73   PUBLIC   tic_tac
74#if ! defined key_mpp_mpi
75   PUBLIC MPI_Wtime
76#endif
[5429]77   
[13]78   !! * Interfaces
79   !! define generic interface for these routine as they are called sometimes
[1344]80   !! with scalar arguments instead of array arguments, which causes problems
81   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]82   INTERFACE mpp_min
[13247]83      MODULE PROCEDURE mppmin_a_int, mppmin_int
84      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
85      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
[13]86   END INTERFACE
87   INTERFACE mpp_max
[13247]88      MODULE PROCEDURE mppmax_a_int, mppmax_int
89      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
90      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
[13]91   END INTERFACE
92   INTERFACE mpp_sum
[13247]93      MODULE PROCEDURE mppsum_a_int, mppsum_int
94      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
95      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
96      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
[13]97   END INTERFACE
[1344]98   INTERFACE mpp_minloc
[13247]99      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
100      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
[1344]101   END INTERFACE
102   INTERFACE mpp_maxloc
[13247]103      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
104      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
[1344]105   END INTERFACE
[6490]106
[51]107   !! ========================= !!
108   !!  MPI  variable definition !!
109   !! ========================= !!
[11536]110#if   defined key_mpp_mpi
[1629]111!$AGRIF_DO_NOT_TREAT
[2004]112   INCLUDE 'mpif.h'
[1629]113!$AGRIF_END_DO_NOT_TREAT
[1344]114   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[11536]115#else   
116   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
117   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
118   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
119#endif
[3]120
[1344]121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]122
[10425]123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]125!$AGRIF_DO_NOT_TREAT
[9570]126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]127!$AGRIF_END_DO_NOT_TREAT
[3]128
[2480]129   INTEGER :: MPI_SUMDD
[1976]130
[1345]131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]137
[3764]138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]147
[10425]148   ! Communications summary report
[13229]149   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
150   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
151   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
[10425]152   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
153   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
154   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
155   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10781]156   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
[10425]157   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
158   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
[10437]159   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
[10425]160   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
161   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
162   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
163   !: name (used as id) of allreduce-delayed operations
[10521]164   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
165   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
[10425]166   !: component name where the allreduce-delayed operation is performed
167   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
168   TYPE, PUBLIC ::   DELAYARR
169      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
[13247]170      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
[10425]171   END TYPE DELAYARR
[10817]172   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
173   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
[10425]174
175   ! timing summary report
[13247]176   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
177   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
[10425]178   
[9019]179   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]180
[9019]181   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
182   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
[11536]183   
[12377]184   !! * Substitutions
185#  include "do_loop_substitute.h90"
[51]186   !!----------------------------------------------------------------------
[9598]187   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]188   !! $Id$
[10068]189   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]190   !!----------------------------------------------------------------------
[3]191CONTAINS
192
[11536]193   SUBROUTINE mpp_start( localComm )
[2715]194      !!----------------------------------------------------------------------
[11536]195      !!                  ***  routine mpp_start  ***
[3764]196      !!
[11536]197      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
[51]198      !!----------------------------------------------------------------------
[6140]199      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]200      !
[11536]201      INTEGER ::   ierr
202      LOGICAL ::   llmpi_init
[51]203      !!----------------------------------------------------------------------
[11536]204#if defined key_mpp_mpi
[1344]205      !
[11536]206      CALL mpi_initialized ( llmpi_init, ierr )
207      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
[2715]208
[11536]209      IF( .NOT. llmpi_init ) THEN
210         IF( PRESENT(localComm) ) THEN
211            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
212            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
213            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
214         ENDIF
215         CALL mpi_init( ierr )
216         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
[2480]217      ENDIF
[11536]218       
[3764]219      IF( PRESENT(localComm) ) THEN
[2480]220         IF( Agrif_Root() ) THEN
[9570]221            mpi_comm_oce = localComm
[2480]222         ENDIF
223      ELSE
[11536]224         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
225         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
[3764]226      ENDIF
[2480]227
[11536]228# if defined key_agrif
[9019]229      IF( Agrif_Root() ) THEN
[9570]230         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]231      ELSE
[9570]232         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]233      ENDIF
[11536]234# endif
[5656]235
[9570]236      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
237      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[3764]238      !
[1976]239      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
240      !
[11536]241#else
242      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
243      mppsize = 1
244      mpprank = 0
245#endif
246   END SUBROUTINE mpp_start
[3]247
[6140]248
[1344]249   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]250      !!----------------------------------------------------------------------
251      !!                  ***  routine mppsend  ***
[3764]252      !!
[51]253      !! ** Purpose :   Send messag passing array
254      !!
255      !!----------------------------------------------------------------------
[1344]256      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
257      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
258      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
259      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
260      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
261      !!
262      INTEGER ::   iflag
[13247]263      INTEGER :: mpi_working_type
[51]264      !!----------------------------------------------------------------------
[1344]265      !
[11536]266#if defined key_mpp_mpi
[13247]267      IF (wp == dp) THEN
268         mpi_working_type = mpi_double_precision
269      ELSE
270         mpi_working_type = mpi_real
271      END IF
272      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
[11536]273#endif
[1344]274      !
[51]275   END SUBROUTINE mppsend
[3]276
277
[13247]278   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
279      !!----------------------------------------------------------------------
280      !!                  ***  routine mppsend  ***
281      !!
282      !! ** Purpose :   Send messag passing array
283      !!
284      !!----------------------------------------------------------------------
285      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
286      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
287      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
288      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
289      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
290      !!
291      INTEGER ::   iflag
292      !!----------------------------------------------------------------------
293      !
294#if defined key_mpp_mpi
295      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
296#endif
297      !
298   END SUBROUTINE mppsend_dp
299
300
301   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
302      !!----------------------------------------------------------------------
303      !!                  ***  routine mppsend  ***
304      !!
305      !! ** Purpose :   Send messag passing array
306      !!
307      !!----------------------------------------------------------------------
308      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
309      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
310      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
311      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
312      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
313      !!
314      INTEGER ::   iflag
315      !!----------------------------------------------------------------------
316      !
317#if defined key_mpp_mpi
318      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
319#endif
320      !
321   END SUBROUTINE mppsend_sp
322
323
[3294]324   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]325      !!----------------------------------------------------------------------
326      !!                  ***  routine mpprecv  ***
327      !!
328      !! ** Purpose :   Receive messag passing array
329      !!
330      !!----------------------------------------------------------------------
[1344]331      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
332      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
333      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]334      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]335      !!
[51]336      INTEGER :: istatus(mpi_status_size)
337      INTEGER :: iflag
[3294]338      INTEGER :: use_source
[13247]339      INTEGER :: mpi_working_type
[1344]340      !!----------------------------------------------------------------------
341      !
[11536]342#if defined key_mpp_mpi
[3764]343      ! If a specific process number has been passed to the receive call,
[3294]344      ! use that one. Default is to use mpi_any_source
[6140]345      use_source = mpi_any_source
346      IF( PRESENT(ksource) )   use_source = ksource
347      !
[13247]348      IF (wp == dp) THEN
349         mpi_working_type = mpi_double_precision
350      ELSE
351         mpi_working_type = mpi_real
352      END IF
353      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[11536]354#endif
[1344]355      !
[51]356   END SUBROUTINE mpprecv
[3]357
[13247]358   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
359      !!----------------------------------------------------------------------
360      !!                  ***  routine mpprecv  ***
361      !!
362      !! ** Purpose :   Receive messag passing array
363      !!
364      !!----------------------------------------------------------------------
365      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
366      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
367      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
368      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
369      !!
370      INTEGER :: istatus(mpi_status_size)
371      INTEGER :: iflag
372      INTEGER :: use_source
373      !!----------------------------------------------------------------------
374      !
375#if defined key_mpp_mpi
376      ! If a specific process number has been passed to the receive call,
377      ! use that one. Default is to use mpi_any_source
378      use_source = mpi_any_source
379      IF( PRESENT(ksource) )   use_source = ksource
380      !
381      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
382#endif
383      !
384   END SUBROUTINE mpprecv_dp
[3]385
[13247]386
387   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
388      !!----------------------------------------------------------------------
389      !!                  ***  routine mpprecv  ***
390      !!
391      !! ** Purpose :   Receive messag passing array
392      !!
393      !!----------------------------------------------------------------------
394      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
395      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
396      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
397      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
398      !!
399      INTEGER :: istatus(mpi_status_size)
400      INTEGER :: iflag
401      INTEGER :: use_source
402      !!----------------------------------------------------------------------
403      !
404#if defined key_mpp_mpi
405      ! If a specific process number has been passed to the receive call,
406      ! use that one. Default is to use mpi_any_source
407      use_source = mpi_any_source
408      IF( PRESENT(ksource) )   use_source = ksource
409      !
410      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
411#endif
412      !
413   END SUBROUTINE mpprecv_sp
414
415
[51]416   SUBROUTINE mppgather( ptab, kp, pio )
417      !!----------------------------------------------------------------------
418      !!                   ***  routine mppgather  ***
[3764]419      !!
420      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]421      !!     array which is distributed following the vertical level.
422      !!
[1344]423      !!----------------------------------------------------------------------
[6140]424      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
425      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]426      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]427      !!
[1344]428      INTEGER :: itaille, ierror   ! temporary integer
[51]429      !!---------------------------------------------------------------------
[1344]430      !
431      itaille = jpi * jpj
[11536]432#if defined key_mpp_mpi
[1344]433      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]434         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[11536]435#else
436      pio(:,:,1) = ptab(:,:)
437#endif
[1344]438      !
[51]439   END SUBROUTINE mppgather
[3]440
441
[51]442   SUBROUTINE mppscatter( pio, kp, ptab )
443      !!----------------------------------------------------------------------
444      !!                  ***  routine mppscatter  ***
445      !!
[3764]446      !! ** Purpose :   Transfert between awork array which is distributed
[51]447      !!      following the vertical level and the local subdomain array.
448      !!
449      !!----------------------------------------------------------------------
[6140]450      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
451      INTEGER                             ::   kp     ! Tag (not used with MPI
452      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]453      !!
454      INTEGER :: itaille, ierror   ! temporary integer
[51]455      !!---------------------------------------------------------------------
[1344]456      !
[6140]457      itaille = jpi * jpj
[1344]458      !
[11536]459#if defined key_mpp_mpi
[1344]460      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]461         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[11536]462#else
463      ptab(:,:) = pio(:,:,1)
464#endif
[1344]465      !
[51]466   END SUBROUTINE mppscatter
[3]467
[10425]468   
469   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
470     !!----------------------------------------------------------------------
471      !!                   ***  routine mpp_delay_sum  ***
[1344]472      !!
[10425]473      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
474      !!
[1344]475      !!----------------------------------------------------------------------
[10425]476      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
477      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
[13247]478      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
[10425]479      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
480      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
481      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[1344]482      !!
[10425]483      INTEGER ::   ji, isz
484      INTEGER ::   idvar
485      INTEGER ::   ierr, ilocalcomm
[13247]486      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[1344]487      !!----------------------------------------------------------------------
[11536]488#if defined key_mpp_mpi
[9570]489      ilocalcomm = mpi_comm_oce
[9019]490      IF( PRESENT(kcom) )   ilocalcomm = kcom
[3]491
[10425]492      isz = SIZE(y_in)
493     
[10437]494      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]495
[10425]496      idvar = -1
497      DO ji = 1, nbdelay
498         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
499      END DO
500      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
501
502      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
503         !                                       --------------------------
504         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
505            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
506            DEALLOCATE(todelay(idvar)%z1d)
507            ndelayid(idvar) = -1                                      ! do as if we had no restart
508         ELSE
509            ALLOCATE(todelay(idvar)%y1d(isz))
510            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
511         END IF
512      ENDIF
513     
514      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
515         !                                       --------------------------
516         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
517         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
518         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
519      ENDIF
520
521      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
522
523      ! send back pout from todelay(idvar)%z1d defined at previous call
524      pout(:) = todelay(idvar)%z1d(:)
525
526      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
[11536]527# if defined key_mpi2
[10526]528      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[12512]529      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
530      ndelayid(idvar) = 1
[10526]531      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]532# else
533      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
534# endif
[10526]535#else
[11536]536      pout(:) = REAL(y_in(:), wp)
[10526]537#endif
[10425]538
539   END SUBROUTINE mpp_delay_sum
540
[9019]541   
[10425]542   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
[9019]543      !!----------------------------------------------------------------------
[10425]544      !!                   ***  routine mpp_delay_max  ***
[9019]545      !!
[10425]546      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
[9019]547      !!
548      !!----------------------------------------------------------------------
[10425]549      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
550      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
551      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
552      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
553      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
554      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
[9019]555      !!
[10425]556      INTEGER ::   ji, isz
557      INTEGER ::   idvar
558      INTEGER ::   ierr, ilocalcomm
[13247]559      INTEGER ::   MPI_TYPE
[9019]560      !!----------------------------------------------------------------------
[13247]561     
[11536]562#if defined key_mpp_mpi
[13247]563      if( wp == dp ) then
564         MPI_TYPE = MPI_DOUBLE_PRECISION
565      else if ( wp == sp ) then
566         MPI_TYPE = MPI_REAL
567      else
568        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
569   
570      end if
571
[9570]572      ilocalcomm = mpi_comm_oce
[9019]573      IF( PRESENT(kcom) )   ilocalcomm = kcom
[6140]574
[10425]575      isz = SIZE(p_in)
[9019]576
[10437]577      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]578
[10425]579      idvar = -1
580      DO ji = 1, nbdelay
581         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
582      END DO
583      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
[3]584
[10425]585      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
586         !                                       --------------------------
587         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
588            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
589            DEALLOCATE(todelay(idvar)%z1d)
590            ndelayid(idvar) = -1                                      ! do as if we had no restart
591         END IF
592      ENDIF
[13]593
[10425]594      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
595         !                                       --------------------------
596         ALLOCATE(todelay(idvar)%z1d(isz))
597         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
598      ENDIF
[3]599
[10425]600      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[3]601
[10425]602      ! send back pout from todelay(idvar)%z1d defined at previous call
603      pout(:) = todelay(idvar)%z1d(:)
[13]604
[10425]605      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
[11536]606# if defined key_mpi2
[10526]607      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[13247]608      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
[10526]609      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]610# else
[13247]611      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
[11536]612# endif
[10526]613#else
[11536]614      pout(:) = p_in(:)
[10526]615#endif
[10425]616
617   END SUBROUTINE mpp_delay_max
618
619   
620   SUBROUTINE mpp_delay_rcv( kid )
621      !!----------------------------------------------------------------------
622      !!                   ***  routine mpp_delay_rcv  ***
[1344]623      !!
[10425]624      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
[1344]625      !!
[10425]626      !!----------------------------------------------------------------------
627      INTEGER,INTENT(in   )      ::  kid 
628      INTEGER ::   ierr
629      !!----------------------------------------------------------------------
[11536]630#if defined key_mpp_mpi
[10425]631      IF( ndelayid(kid) /= -2 ) THEN 
[10526]632#if ! defined key_mpi2
[10425]633         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
634         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
635         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10526]636#endif
[10425]637         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
638         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
639      ENDIF
[11536]640#endif
[10425]641   END SUBROUTINE mpp_delay_rcv
[3]642
[12377]643   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
644      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
645      INTEGER                          , INTENT(INOUT) :: kleng
646      !!----------------------------------------------------------------------
647      !!                  ***  routine mpp_bcast_nml  ***
648      !!
649      !! ** Purpose :   broadcast namelist character buffer
650      !!
651      !!----------------------------------------------------------------------
652      !!
653      INTEGER ::   iflag
654      !!----------------------------------------------------------------------
655      !
656#if defined key_mpp_mpi
657      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
658      call MPI_BARRIER(mpi_comm_oce, iflag)
659!$AGRIF_DO_NOT_TREAT
660      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
661!$AGRIF_END_DO_NOT_TREAT
662      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
663      call MPI_BARRIER(mpi_comm_oce, iflag)
664#endif
665      !
666   END SUBROUTINE mpp_bcast_nml
667
[10425]668   
669   !!----------------------------------------------------------------------
670   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
671   !!   
672   !!----------------------------------------------------------------------
673   !!
674#  define OPERATION_MAX
675#  define INTEGER_TYPE
676#  define DIM_0d
677#     define ROUTINE_ALLREDUCE           mppmax_int
678#     include "mpp_allreduce_generic.h90"
679#     undef ROUTINE_ALLREDUCE
680#  undef DIM_0d
681#  define DIM_1d
682#     define ROUTINE_ALLREDUCE           mppmax_a_int
683#     include "mpp_allreduce_generic.h90"
684#     undef ROUTINE_ALLREDUCE
685#  undef DIM_1d
686#  undef INTEGER_TYPE
687!
[13247]688   !!
689   !!   ----   SINGLE PRECISION VERSIONS
690   !!
691#  define SINGLE_PRECISION
[10425]692#  define REAL_TYPE
693#  define DIM_0d
[13247]694#     define ROUTINE_ALLREDUCE           mppmax_real_sp
[10425]695#     include "mpp_allreduce_generic.h90"
696#     undef ROUTINE_ALLREDUCE
697#  undef DIM_0d
698#  define DIM_1d
[13247]699#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
[10425]700#     include "mpp_allreduce_generic.h90"
701#     undef ROUTINE_ALLREDUCE
702#  undef DIM_1d
[13247]703#  undef SINGLE_PRECISION
704   !!
705   !!
706   !!   ----   DOUBLE PRECISION VERSIONS
707   !!
708!
709#  define DIM_0d
710#     define ROUTINE_ALLREDUCE           mppmax_real_dp
711#     include "mpp_allreduce_generic.h90"
712#     undef ROUTINE_ALLREDUCE
713#  undef DIM_0d
714#  define DIM_1d
715#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
716#     include "mpp_allreduce_generic.h90"
717#     undef ROUTINE_ALLREDUCE
718#  undef DIM_1d
[10425]719#  undef REAL_TYPE
720#  undef OPERATION_MAX
721   !!----------------------------------------------------------------------
722   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
723   !!   
724   !!----------------------------------------------------------------------
725   !!
726#  define OPERATION_MIN
727#  define INTEGER_TYPE
728#  define DIM_0d
729#     define ROUTINE_ALLREDUCE           mppmin_int
730#     include "mpp_allreduce_generic.h90"
731#     undef ROUTINE_ALLREDUCE
732#  undef DIM_0d
733#  define DIM_1d
734#     define ROUTINE_ALLREDUCE           mppmin_a_int
735#     include "mpp_allreduce_generic.h90"
736#     undef ROUTINE_ALLREDUCE
737#  undef DIM_1d
738#  undef INTEGER_TYPE
739!
[13247]740   !!
741   !!   ----   SINGLE PRECISION VERSIONS
742   !!
743#  define SINGLE_PRECISION
[10425]744#  define REAL_TYPE
745#  define DIM_0d
[13247]746#     define ROUTINE_ALLREDUCE           mppmin_real_sp
[10425]747#     include "mpp_allreduce_generic.h90"
748#     undef ROUTINE_ALLREDUCE
749#  undef DIM_0d
750#  define DIM_1d
[13247]751#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
[10425]752#     include "mpp_allreduce_generic.h90"
753#     undef ROUTINE_ALLREDUCE
754#  undef DIM_1d
[13247]755#  undef SINGLE_PRECISION
756   !!
757   !!   ----   DOUBLE PRECISION VERSIONS
758   !!
759
760#  define DIM_0d
761#     define ROUTINE_ALLREDUCE           mppmin_real_dp
762#     include "mpp_allreduce_generic.h90"
763#     undef ROUTINE_ALLREDUCE
764#  undef DIM_0d
765#  define DIM_1d
766#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
767#     include "mpp_allreduce_generic.h90"
768#     undef ROUTINE_ALLREDUCE
769#  undef DIM_1d
[10425]770#  undef REAL_TYPE
771#  undef OPERATION_MIN
[869]772
[10425]773   !!----------------------------------------------------------------------
774   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
775   !!   
776   !!   Global sum of 1D array or a variable (integer, real or complex)
777   !!----------------------------------------------------------------------
778   !!
779#  define OPERATION_SUM
780#  define INTEGER_TYPE
781#  define DIM_0d
782#     define ROUTINE_ALLREDUCE           mppsum_int
783#     include "mpp_allreduce_generic.h90"
784#     undef ROUTINE_ALLREDUCE
785#  undef DIM_0d
786#  define DIM_1d
787#     define ROUTINE_ALLREDUCE           mppsum_a_int
788#     include "mpp_allreduce_generic.h90"
789#     undef ROUTINE_ALLREDUCE
790#  undef DIM_1d
791#  undef INTEGER_TYPE
[13247]792
793   !!
794   !!   ----   SINGLE PRECISION VERSIONS
795   !!
796#  define OPERATION_SUM
797#  define SINGLE_PRECISION
[10425]798#  define REAL_TYPE
799#  define DIM_0d
[13247]800#     define ROUTINE_ALLREDUCE           mppsum_real_sp
[10425]801#     include "mpp_allreduce_generic.h90"
802#     undef ROUTINE_ALLREDUCE
803#  undef DIM_0d
804#  define DIM_1d
[13247]805#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
[10425]806#     include "mpp_allreduce_generic.h90"
807#     undef ROUTINE_ALLREDUCE
808#  undef DIM_1d
809#  undef REAL_TYPE
810#  undef OPERATION_SUM
811
[13247]812#  undef SINGLE_PRECISION
813
814   !!
815   !!   ----   DOUBLE PRECISION VERSIONS
816   !!
817#  define OPERATION_SUM
818#  define REAL_TYPE
819#  define DIM_0d
820#     define ROUTINE_ALLREDUCE           mppsum_real_dp
821#     include "mpp_allreduce_generic.h90"
822#     undef ROUTINE_ALLREDUCE
823#  undef DIM_0d
824#  define DIM_1d
825#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
826#     include "mpp_allreduce_generic.h90"
827#     undef ROUTINE_ALLREDUCE
828#  undef DIM_1d
829#  undef REAL_TYPE
830#  undef OPERATION_SUM
831
[10425]832#  define OPERATION_SUM_DD
833#  define COMPLEX_TYPE
834#  define DIM_0d
835#     define ROUTINE_ALLREDUCE           mppsum_realdd
836#     include "mpp_allreduce_generic.h90"
837#     undef ROUTINE_ALLREDUCE
838#  undef DIM_0d
839#  define DIM_1d
840#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
841#     include "mpp_allreduce_generic.h90"
842#     undef ROUTINE_ALLREDUCE
843#  undef DIM_1d
844#  undef COMPLEX_TYPE
845#  undef OPERATION_SUM_DD
846
847   !!----------------------------------------------------------------------
848   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
849   !!   
850   !!----------------------------------------------------------------------
851   !!
[13247]852   !!
853   !!   ----   SINGLE PRECISION VERSIONS
854   !!
855#  define SINGLE_PRECISION
[10425]856#  define OPERATION_MINLOC
857#  define DIM_2d
[13247]858#     define ROUTINE_LOC           mpp_minloc2d_sp
[10425]859#     include "mpp_loc_generic.h90"
860#     undef ROUTINE_LOC
861#  undef DIM_2d
862#  define DIM_3d
[13247]863#     define ROUTINE_LOC           mpp_minloc3d_sp
[10425]864#     include "mpp_loc_generic.h90"
865#     undef ROUTINE_LOC
866#  undef DIM_3d
867#  undef OPERATION_MINLOC
868
869#  define OPERATION_MAXLOC
870#  define DIM_2d
[13247]871#     define ROUTINE_LOC           mpp_maxloc2d_sp
[10425]872#     include "mpp_loc_generic.h90"
873#     undef ROUTINE_LOC
874#  undef DIM_2d
875#  define DIM_3d
[13247]876#     define ROUTINE_LOC           mpp_maxloc3d_sp
[10425]877#     include "mpp_loc_generic.h90"
878#     undef ROUTINE_LOC
879#  undef DIM_3d
880#  undef OPERATION_MAXLOC
[13247]881#  undef SINGLE_PRECISION
882   !!
883   !!   ----   DOUBLE PRECISION VERSIONS
884   !!
885#  define OPERATION_MINLOC
886#  define DIM_2d
887#     define ROUTINE_LOC           mpp_minloc2d_dp
888#     include "mpp_loc_generic.h90"
889#     undef ROUTINE_LOC
890#  undef DIM_2d
891#  define DIM_3d
892#     define ROUTINE_LOC           mpp_minloc3d_dp
893#     include "mpp_loc_generic.h90"
894#     undef ROUTINE_LOC
895#  undef DIM_3d
896#  undef OPERATION_MINLOC
[10425]897
[13247]898#  define OPERATION_MAXLOC
899#  define DIM_2d
900#     define ROUTINE_LOC           mpp_maxloc2d_dp
901#     include "mpp_loc_generic.h90"
902#     undef ROUTINE_LOC
903#  undef DIM_2d
904#  define DIM_3d
905#     define ROUTINE_LOC           mpp_maxloc3d_dp
906#     include "mpp_loc_generic.h90"
907#     undef ROUTINE_LOC
908#  undef DIM_3d
909#  undef OPERATION_MAXLOC
910
911
[1344]912   SUBROUTINE mppsync()
913      !!----------------------------------------------------------------------
914      !!                  ***  routine mppsync  ***
[3764]915      !!
[1344]916      !! ** Purpose :   Massively parallel processors, synchroneous
917      !!
918      !!-----------------------------------------------------------------------
919      INTEGER :: ierror
920      !!-----------------------------------------------------------------------
921      !
[11536]922#if defined key_mpp_mpi
[9570]923      CALL mpi_barrier( mpi_comm_oce, ierror )
[11536]924#endif
[1344]925      !
926   END SUBROUTINE mppsync
[3]927
928
[11536]929   SUBROUTINE mppstop( ld_abort ) 
[1344]930      !!----------------------------------------------------------------------
931      !!                  ***  routine mppstop  ***
[3764]932      !!
[3294]933      !! ** purpose :   Stop massively parallel processors method
[1344]934      !!
935      !!----------------------------------------------------------------------
[11536]936      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
937      LOGICAL ::   ll_abort
[1344]938      INTEGER ::   info
939      !!----------------------------------------------------------------------
[11536]940      ll_abort = .FALSE.
941      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
[1344]942      !
[11536]943#if defined key_mpp_mpi
944      IF(ll_abort) THEN
[10425]945         CALL mpi_abort( MPI_COMM_WORLD )
946      ELSE
947         CALL mppsync
948         CALL mpi_finalize( info )
949      ENDIF
[11536]950#endif
951      IF( ll_abort ) STOP 123
[1344]952      !
953   END SUBROUTINE mppstop
[3]954
955
[1344]956   SUBROUTINE mpp_comm_free( kcom )
957      !!----------------------------------------------------------------------
958      INTEGER, INTENT(in) ::   kcom
959      !!
960      INTEGER :: ierr
961      !!----------------------------------------------------------------------
962      !
[11536]963#if defined key_mpp_mpi
[1344]964      CALL MPI_COMM_FREE(kcom, ierr)
[11536]965#endif
[1344]966      !
967   END SUBROUTINE mpp_comm_free
[3]968
[869]969
[2715]970   SUBROUTINE mpp_ini_znl( kumout )
[1345]971      !!----------------------------------------------------------------------
972      !!               ***  routine mpp_ini_znl  ***
973      !!
974      !! ** Purpose :   Initialize special communicator for computing zonal sum
975      !!
976      !! ** Method  : - Look for processors in the same row
977      !!              - Put their number in nrank_znl
978      !!              - Create group for the znl processors
979      !!              - Create a communicator for znl processors
980      !!              - Determine if processor should write znl files
981      !!
982      !! ** output
983      !!      ndim_rank_znl = number of processors on the same row
984      !!      ngrp_znl = group ID for the znl processors
985      !!      ncomm_znl = communicator for the ice procs.
986      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
987      !!
988      !!----------------------------------------------------------------------
[2715]989      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]990      !
[2715]991      INTEGER :: jproc      ! dummy loop integer
992      INTEGER :: ierr, ii   ! local integer
993      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
994      !!----------------------------------------------------------------------
[11536]995#if defined key_mpp_mpi
[1345]996      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
997      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]998      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]999      !
[2715]1000      ALLOCATE( kwork(jpnij), STAT=ierr )
[11536]1001      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
[2715]1002
1003      IF( jpnj == 1 ) THEN
[1345]1004         ngrp_znl  = ngrp_world
[9570]1005         ncomm_znl = mpi_comm_oce
[1345]1006      ELSE
1007         !
[9570]1008         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]1009         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1010         !-$$        CALL flush(numout)
1011         !
1012         ! Count number of processors on the same row
1013         ndim_rank_znl = 0
1014         DO jproc=1,jpnij
1015            IF ( kwork(jproc) == njmpp ) THEN
1016               ndim_rank_znl = ndim_rank_znl + 1
1017            ENDIF
1018         END DO
1019         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1020         !-$$        CALL flush(numout)
1021         ! Allocate the right size to nrank_znl
[1441]1022         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]1023         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]1024         ii = 0
[1345]1025         nrank_znl (:) = 0
1026         DO jproc=1,jpnij
1027            IF ( kwork(jproc) == njmpp) THEN
1028               ii = ii + 1
[3764]1029               nrank_znl(ii) = jproc -1
[1345]1030            ENDIF
1031         END DO
1032         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1033         !-$$        CALL flush(numout)
1034
1035         ! Create the opa group
[9570]1036         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]1037         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1038         !-$$        CALL flush(numout)
1039
1040         ! Create the znl group from the opa group
1041         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1042         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1043         !-$$        CALL flush(numout)
1044
1045         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]1046         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]1047         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1048         !-$$        CALL flush(numout)
1049         !
1050      END IF
1051
1052      ! Determines if processor if the first (starting from i=1) on the row
[3764]1053      IF ( jpni == 1 ) THEN
[1345]1054         l_znl_root = .TRUE.
1055      ELSE
1056         l_znl_root = .FALSE.
1057         kwork (1) = nimpp
[10425]1058         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]1059         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1060      END IF
1061
[2715]1062      DEALLOCATE(kwork)
[11536]1063#endif
[2715]1064
[1345]1065   END SUBROUTINE mpp_ini_znl
1066
1067
[1344]1068   SUBROUTINE mpp_ini_north
1069      !!----------------------------------------------------------------------
1070      !!               ***  routine mpp_ini_north  ***
1071      !!
[3764]1072      !! ** Purpose :   Initialize special communicator for north folding
[1344]1073      !!      condition together with global variables needed in the mpp folding
1074      !!
1075      !! ** Method  : - Look for northern processors
1076      !!              - Put their number in nrank_north
1077      !!              - Create groups for the world processors and the north processors
1078      !!              - Create a communicator for northern processors
1079      !!
1080      !! ** output
1081      !!      njmppmax = njmpp for northern procs
1082      !!      ndim_rank_north = number of processors in the northern line
1083      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1084      !!      ngrp_world = group ID for the world processors
1085      !!      ngrp_north = group ID for the northern processors
1086      !!      ncomm_north = communicator for the northern procs.
1087      !!      north_root = number (in the world) of proc 0 in the northern comm.
1088      !!
1089      !!----------------------------------------------------------------------
1090      INTEGER ::   ierr
1091      INTEGER ::   jjproc
1092      INTEGER ::   ii, ji
1093      !!----------------------------------------------------------------------
1094      !
[11536]1095#if defined key_mpp_mpi
[1344]1096      njmppmax = MAXVAL( njmppt )
1097      !
1098      ! Look for how many procs on the northern boundary
1099      ndim_rank_north = 0
1100      DO jjproc = 1, jpnij
1101         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1102      END DO
1103      !
1104      ! Allocate the right size to nrank_north
[1441]1105      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]1106      ALLOCATE( nrank_north(ndim_rank_north) )
[869]1107
[1344]1108      ! Fill the nrank_north array with proc. number of northern procs.
1109      ! Note : the rank start at 0 in MPI
1110      ii = 0
1111      DO ji = 1, jpnij
1112         IF ( njmppt(ji) == njmppmax   ) THEN
1113            ii=ii+1
1114            nrank_north(ii)=ji-1
1115         END IF
1116      END DO
1117      !
1118      ! create the world group
[9570]1119      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]1120      !
1121      ! Create the North group from the world group
1122      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1123      !
1124      ! Create the North communicator , ie the pool of procs in the north group
[9570]1125      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]1126      !
[11536]1127#endif
[1344]1128   END SUBROUTINE mpp_ini_north
[869]1129
1130
[9019]1131   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]1132      !!---------------------------------------------------------------------
1133      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1134      !!
1135      !!   Modification of original codes written by David H. Bailey
1136      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1137      !!---------------------------------------------------------------------
[9019]1138      INTEGER                     , INTENT(in)    ::   ilen, itype
[13247]1139      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1140      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]1141      !
[13247]1142      REAL(dp) :: zerr, zt1, zt2    ! local work variables
[9019]1143      INTEGER  :: ji, ztmp           ! local scalar
1144      !!---------------------------------------------------------------------
1145      !
[1976]1146      ztmp = itype   ! avoid compilation warning
[9019]1147      !
[1976]1148      DO ji=1,ilen
1149      ! Compute ydda + yddb using Knuth's trick.
1150         zt1  = real(ydda(ji)) + real(yddb(ji))
1151         zerr = zt1 - real(ydda(ji))
1152         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1153                + aimag(ydda(ji)) + aimag(yddb(ji))
1154
1155         ! The result is zt1 + zt2, after normalization.
1156         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1157      END DO
[9019]1158      !
[1976]1159   END SUBROUTINE DDPDD_MPI
1160
[6140]1161
[10437]1162   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
[10425]1163      !!----------------------------------------------------------------------
1164      !!                  ***  routine mpp_report  ***
1165      !!
1166      !! ** Purpose :   report use of mpp routines per time-setp
1167      !!
1168      !!----------------------------------------------------------------------
1169      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1170      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
[10437]1171      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
[10425]1172      !!
[10982]1173      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
[10437]1174      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
[10982]1175      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
[10425]1176      !!----------------------------------------------------------------------
[11536]1177#if defined key_mpp_mpi
[10425]1178      !
1179      ll_lbc = .FALSE.
1180      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1181      ll_glb = .FALSE.
1182      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
[10437]1183      ll_dlg = .FALSE.
1184      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
[10425]1185      !
1186      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1187      ncom_freq = ncom_fsbc
1188      !
1189      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1190         IF( ll_lbc ) THEN
1191            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1192            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1193            n_sequence_lbc = n_sequence_lbc + 1
1194            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1195            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1196            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1197            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1198         ENDIF
1199         IF( ll_glb ) THEN
1200            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1201            n_sequence_glb = n_sequence_glb + 1
1202            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1203            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1204         ENDIF
[10437]1205         IF( ll_dlg ) THEN
1206            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1207            n_sequence_dlg = n_sequence_dlg + 1
1208            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1209            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1210         ENDIF
[10425]1211      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1212         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1213         WRITE(numcom,*) ' '
1214         WRITE(numcom,*) ' ------------------------------------------------------------'
1215         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1216         WRITE(numcom,*) ' ------------------------------------------------------------'
1217         WRITE(numcom,*) ' '
1218         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1219         jj = 0; jk = 0; jf = 0; jh = 0
1220         DO ji = 1, n_sequence_lbc
1221            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1222            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1223            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1224            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1225         END DO
1226         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1227         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1228         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1229         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1230         WRITE(numcom,*) ' '
1231         WRITE(numcom,*) ' lbc_lnk called'
[10982]1232         DO ji = 1, n_sequence_lbc - 1
1233            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1234               ccountname = crname_lbc(ji)
1235               crname_lbc(ji) = 'already counted'
1236               jcount = 1
1237               DO jj = ji + 1, n_sequence_lbc
1238                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1239                     jcount = jcount + 1
1240                     crname_lbc(jj) = 'already counted'
1241                  END IF
1242               END DO
1243               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
[10425]1244            END IF
1245         END DO
[10982]1246         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1247            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1248         END IF
[10425]1249         WRITE(numcom,*) ' '
1250         IF ( n_sequence_glb > 0 ) THEN
1251            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1252            jj = 1
1253            DO ji = 2, n_sequence_glb
1254               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1255                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1256                  jj = 0
1257               END IF
1258               jj = jj + 1 
1259            END DO
1260            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1261            DEALLOCATE(crname_glb)
1262         ELSE
1263            WRITE(numcom,*) ' No MPI global communication '
1264         ENDIF
1265         WRITE(numcom,*) ' '
[10437]1266         IF ( n_sequence_dlg > 0 ) THEN
1267            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1268            jj = 1
1269            DO ji = 2, n_sequence_dlg
1270               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1271                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1272                  jj = 0
1273               END IF
1274               jj = jj + 1 
1275            END DO
1276            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1277            DEALLOCATE(crname_dlg)
1278         ELSE
1279            WRITE(numcom,*) ' No MPI delayed global communication '
1280         ENDIF
1281         WRITE(numcom,*) ' '
[10425]1282         WRITE(numcom,*) ' -----------------------------------------------'
1283         WRITE(numcom,*) ' '
1284         DEALLOCATE(ncomm_sequence)
1285         DEALLOCATE(crname_lbc)
1286      ENDIF
[11536]1287#endif
[10425]1288   END SUBROUTINE mpp_report
1289
[6140]1290   
[10425]1291   SUBROUTINE tic_tac (ld_tic, ld_global)
1292
1293    LOGICAL,           INTENT(IN) :: ld_tic
1294    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
[13247]1295    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1296    REAL(dp),               SAVE :: tic_ct = 0._dp
[10425]1297    INTEGER :: ii
[11536]1298#if defined key_mpp_mpi
[10425]1299
1300    IF( ncom_stp <= nit000 ) RETURN
1301    IF( ncom_stp == nitend ) RETURN
1302    ii = 1
1303    IF( PRESENT( ld_global ) ) THEN
1304       IF( ld_global ) ii = 2
1305    END IF
1306   
1307    IF ( ld_tic ) THEN
1308       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
[13247]1309       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
[10425]1310    ELSE
1311       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1312       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1313    ENDIF
[11536]1314#endif
[10425]1315   
1316   END SUBROUTINE tic_tac
1317
[11536]1318#if ! defined key_mpp_mpi
1319   SUBROUTINE mpi_wait(request, status, ierror)
1320      INTEGER                            , INTENT(in   ) ::   request
1321      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1322      INTEGER                            , INTENT(  out) ::   ierror
1323   END SUBROUTINE mpi_wait
[1976]1324
[10425]1325   
[11536]1326   FUNCTION MPI_Wtime()
1327      REAL(wp) ::  MPI_Wtime
1328      MPI_Wtime = -1.
1329   END FUNCTION MPI_Wtime
[3]1330#endif
[2715]1331
[13]1332   !!----------------------------------------------------------------------
[12377]1333   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
[2715]1334   !!----------------------------------------------------------------------
1335
1336   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1337      &                 cd6, cd7, cd8, cd9, cd10 )
1338      !!----------------------------------------------------------------------
1339      !!                  ***  ROUTINE  stop_opa  ***
1340      !!
[3764]1341      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1342      !!                increment the error number (nstop) by one.
1343      !!----------------------------------------------------------------------
[11536]1344      CHARACTER(len=*), INTENT(in   )           ::   cd1
1345      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1346      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
[12939]1347      !
1348      CHARACTER(LEN=8) ::   clfmt            ! writing format
[13015]1349      INTEGER          ::   inum
[2715]1350      !!----------------------------------------------------------------------
1351      !
[3764]1352      nstop = nstop + 1
[11536]1353      !
[13015]1354      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1355         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1356         WRITE(inum,*)
1357         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1358         CLOSE(inum)
[12939]1359      ENDIF
[13015]1360      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1361         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1362      ENDIF
[11536]1363      !
1364                            WRITE(numout,*)
1365                            WRITE(numout,*) ' ===>>> : E R R O R'
1366                            WRITE(numout,*)
1367                            WRITE(numout,*) '         ==========='
1368                            WRITE(numout,*)
1369                            WRITE(numout,*) TRIM(cd1)
[10425]1370      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1371      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1372      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1373      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1374      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1375      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1376      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1377      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1378      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[11536]1379                            WRITE(numout,*)
1380      !
[2715]1381                               CALL FLUSH(numout    )
1382      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1383      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1384      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1385      !
1386      IF( cd1 == 'STOP' ) THEN
[11536]1387         WRITE(numout,*) 
[10425]1388         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[11536]1389         WRITE(numout,*) 
[12939]1390         CALL FLUSH(numout)
1391         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
[11536]1392         CALL mppstop( ld_abort = .true. )
[2715]1393      ENDIF
1394      !
1395   END SUBROUTINE ctl_stop
1396
1397
1398   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1399      &                 cd6, cd7, cd8, cd9, cd10 )
1400      !!----------------------------------------------------------------------
1401      !!                  ***  ROUTINE  stop_warn  ***
1402      !!
[3764]1403      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1404      !!                increment the warning number (nwarn) by one.
1405      !!----------------------------------------------------------------------
1406      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1407      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1408      !!----------------------------------------------------------------------
[3764]1409      !
1410      nwarn = nwarn + 1
[11536]1411      !
[2715]1412      IF(lwp) THEN
[11536]1413                               WRITE(numout,*)
1414                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1415                               WRITE(numout,*)
1416                               WRITE(numout,*) '         ==============='
1417                               WRITE(numout,*)
1418         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1419         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1420         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1421         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1422         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1423         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1424         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1425         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1426         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1427         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1428                               WRITE(numout,*)
[2715]1429      ENDIF
1430      CALL FLUSH(numout)
1431      !
1432   END SUBROUTINE ctl_warn
1433
1434
1435   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1436      !!----------------------------------------------------------------------
1437      !!                  ***  ROUTINE ctl_opn  ***
1438      !!
1439      !! ** Purpose :   Open file and check if required file is available.
1440      !!
1441      !! ** Method  :   Fortan open
1442      !!----------------------------------------------------------------------
1443      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1444      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1445      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1446      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1447      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1448      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1449      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1450      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1451      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1452      !
[2715]1453      CHARACTER(len=80) ::   clfile
[12939]1454      CHARACTER(LEN=10) ::   clfmt            ! writing format
[2715]1455      INTEGER           ::   iost
[13124]1456      INTEGER           ::   idg              ! number of digits
[2715]1457      !!----------------------------------------------------------------------
[5836]1458      !
[2715]1459      ! adapt filename
1460      ! ----------------
1461      clfile = TRIM(cdfile)
1462      IF( PRESENT( karea ) ) THEN
[12939]1463         IF( karea > 1 ) THEN
[13015]1464            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1465            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1466            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
[12939]1467            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1468         ENDIF
[2715]1469      ENDIF
1470#if defined key_agrif
1471      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1472      knum=Agrif_Get_Unit()
1473#else
1474      knum=get_unit()
1475#endif
[10425]1476      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1477      !
[11536]1478      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
[10425]1479         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1480      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1481         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1482      ELSE
[10425]1483         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1484      ENDIF
[10425]1485      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1486         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1487      IF( iost == 0 ) THEN
[12377]1488         IF(ldwp .AND. kout > 0) THEN
[10425]1489            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1490            WRITE(kout,*) '     unit   = ', knum
1491            WRITE(kout,*) '     status = ', cdstat
1492            WRITE(kout,*) '     form   = ', cdform
1493            WRITE(kout,*) '     access = ', cdacce
1494            WRITE(kout,*)
1495         ENDIF
1496      ENDIF
1497100   CONTINUE
1498      IF( iost /= 0 ) THEN
[11536]1499         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1500         WRITE(ctmp2,*) ' =======   ===  '
1501         WRITE(ctmp3,*) '           unit   = ', knum
1502         WRITE(ctmp4,*) '           status = ', cdstat
1503         WRITE(ctmp5,*) '           form   = ', cdform
1504         WRITE(ctmp6,*) '           access = ', cdacce
1505         WRITE(ctmp7,*) '           iostat = ', iost
1506         WRITE(ctmp8,*) '           we stop. verify the file '
1507         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
[2715]1508      ENDIF
[5836]1509      !
[2715]1510   END SUBROUTINE ctl_opn
1511
[5836]1512
[11536]1513   SUBROUTINE ctl_nam ( kios, cdnam )
[4147]1514      !!----------------------------------------------------------------------
1515      !!                  ***  ROUTINE ctl_nam  ***
1516      !!
1517      !! ** Purpose :   Informations when error while reading a namelist
1518      !!
1519      !! ** Method  :   Fortan open
1520      !!----------------------------------------------------------------------
[11536]1521      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1522      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1523      !
1524      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
[4147]1525      !!----------------------------------------------------------------------
[5836]1526      !
[7646]1527      WRITE (clios, '(I5.0)')   kios
[4147]1528      IF( kios < 0 ) THEN         
[5836]1529         CALL ctl_warn( 'end of record or file while reading namelist '   &
1530            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1531      ENDIF
[5836]1532      !
[4147]1533      IF( kios > 0 ) THEN
[5836]1534         CALL ctl_stop( 'misspelled variable in namelist '   &
1535            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1536      ENDIF
1537      kios = 0
[5836]1538      !
[4147]1539   END SUBROUTINE ctl_nam
1540
[5836]1541
[2715]1542   INTEGER FUNCTION get_unit()
1543      !!----------------------------------------------------------------------
1544      !!                  ***  FUNCTION  get_unit  ***
1545      !!
1546      !! ** Purpose :   return the index of an unused logical unit
1547      !!----------------------------------------------------------------------
[3764]1548      LOGICAL :: llopn
[2715]1549      !!----------------------------------------------------------------------
1550      !
1551      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1552      llopn = .TRUE.
1553      DO WHILE( (get_unit < 998) .AND. llopn )
1554         get_unit = get_unit + 1
1555         INQUIRE( unit = get_unit, opened = llopn )
1556      END DO
1557      IF( (get_unit == 999) .AND. llopn ) THEN
[11536]1558         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
[2715]1559      ENDIF
1560      !
1561   END FUNCTION get_unit
1562
[12377]1563   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1564      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1565      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1566      CHARACTER(LEN=256)                           :: chline
1567      CHARACTER(LEN=1)                             :: csp
1568      INTEGER, INTENT(IN)                          :: kout
1569      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1570      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1571      !
1572      !csp = NEW_LINE('A')
1573      ! a new line character is the best seperator but some systems (e.g.Cray)
1574      ! seem to terminate namelist reads from internal files early if they
1575      ! encounter new-lines. Use a single space for safety.
1576      csp = ' '
1577      !
1578      ! Check if the namelist buffer has already been allocated. Return if it has.
1579      !
1580      IF ( ALLOCATED( cdnambuff ) ) RETURN
1581      IF( ldwp ) THEN
1582         !
1583         ! Open namelist file
1584         !
1585         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1586         !
1587         ! First pass: count characters excluding comments and trimable white space
1588         !
1589         itot=0
1590     10  READ(iun,'(A256)',END=20,ERR=20) chline
1591         iltc = LEN_TRIM(chline)
1592         IF ( iltc.GT.0 ) THEN
1593          inl = INDEX(chline, '!') 
1594          IF( inl.eq.0 ) THEN
1595           itot = itot + iltc + 1                                ! +1 for the newline character
1596          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1597           itot = itot + inl                                  !  includes +1 for the newline character
1598          ENDIF
1599         ENDIF
1600         GOTO 10
1601     20  CONTINUE
1602         !
1603         ! Allocate text cdnambuff for condensed namelist
1604         !
1605!$AGRIF_DO_NOT_TREAT
1606         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1607!$AGRIF_END_DO_NOT_TREAT
1608         itotsav = itot
1609         !
1610         ! Second pass: read and transfer pruned characters into cdnambuff
1611         !
1612         REWIND(iun)
1613         itot=1
1614     30  READ(iun,'(A256)',END=40,ERR=40) chline
1615         iltc = LEN_TRIM(chline)
1616         IF ( iltc.GT.0 ) THEN
1617          inl = INDEX(chline, '!')
1618          IF( inl.eq.0 ) THEN
1619           inl = iltc
1620          ELSE
1621           inl = inl - 1
1622          ENDIF
1623          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1624             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1625             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1626             itot = itot + inl + 1
1627          ENDIF
1628         ENDIF
1629         GOTO 30
1630     40  CONTINUE
1631         itot = itot - 1
1632         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1633         !
1634         ! Close namelist file
1635         !
1636         CLOSE(iun)
1637         !write(*,'(32A)') cdnambuff
1638      ENDIF
1639#if defined key_mpp_mpi
1640      CALL mpp_bcast_nml( cdnambuff, itot )
1641#endif
1642  END SUBROUTINE load_nml
1643
1644
[2715]1645   !!----------------------------------------------------------------------
[3]1646END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.