New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 13438

Last change on this file since 13438 was 13438, checked in by smasson, 4 years ago

trunk: bugfix to compile and run the code without key_mpp_mpi, see #2495

  • Property svn:keywords set to Id
File size: 69.2 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
[12377]34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
[2715]35   !!----------------------------------------------------------------------
[13]36   !!----------------------------------------------------------------------
[11536]37   !!   mpp_start     : get local communicator its size and rank
[2715]38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]40   !!   mpprecv       :
[9019]41   !!   mppsend       :
[2715]42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
[1344]51   !!   mpp_ini_north : initialisation of north fold
[9019]52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[12377]53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
[13]54   !!----------------------------------------------------------------------
[3764]55   USE dom_oce        ! ocean space and time domain
[2715]56   USE in_out_manager ! I/O manager
[3]57
[13]58   IMPLICIT NONE
[415]59   PRIVATE
[9019]60   !
[12377]61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
[11536]62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
[9019]63   PUBLIC   mpp_ini_north
[1344]64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10425]65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]66   PUBLIC   mppscatter, mppgather
[10425]67   PUBLIC   mpp_ini_znl
[3764]68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[13226]69   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
[11536]71   PUBLIC   mpp_report
[12377]72   PUBLIC   mpp_bcast_nml
[11536]73   PUBLIC   tic_tac
74#if ! defined key_mpp_mpi
[13438]75   PUBLIC MPI_wait
[11536]76   PUBLIC MPI_Wtime
77#endif
[5429]78   
[13]79   !! * Interfaces
80   !! define generic interface for these routine as they are called sometimes
[1344]81   !! with scalar arguments instead of array arguments, which causes problems
82   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]83   INTERFACE mpp_min
[13226]84      MODULE PROCEDURE mppmin_a_int, mppmin_int
85      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
86      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
[13]87   END INTERFACE
88   INTERFACE mpp_max
[13226]89      MODULE PROCEDURE mppmax_a_int, mppmax_int
90      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
91      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
[13]92   END INTERFACE
93   INTERFACE mpp_sum
[13226]94      MODULE PROCEDURE mppsum_a_int, mppsum_int
95      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
96      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
97      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
[13]98   END INTERFACE
[1344]99   INTERFACE mpp_minloc
[13226]100      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
101      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
[1344]102   END INTERFACE
103   INTERFACE mpp_maxloc
[13226]104      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
105      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
[1344]106   END INTERFACE
[6490]107
[51]108   !! ========================= !!
109   !!  MPI  variable definition !!
110   !! ========================= !!
[11536]111#if   defined key_mpp_mpi
[1629]112!$AGRIF_DO_NOT_TREAT
[2004]113   INCLUDE 'mpif.h'
[1629]114!$AGRIF_END_DO_NOT_TREAT
[1344]115   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[11536]116#else   
117   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
[13438]118   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
[11536]119   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
120   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
121#endif
[3]122
[1344]123   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]124
[10425]125   INTEGER, PUBLIC ::   mppsize        ! number of process
126   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]127!$AGRIF_DO_NOT_TREAT
[9570]128   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]129!$AGRIF_END_DO_NOT_TREAT
[3]130
[2480]131   INTEGER :: MPI_SUMDD
[1976]132
[1345]133   ! variables used for zonal integration
134   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]135   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
136   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
137   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]138   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]139
[3764]140   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]141   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
142   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
143   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
144   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
145   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
146   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
147   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
148   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]149
[10425]150   ! Communications summary report
[13216]151   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
152   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
153   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
[10425]154   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
155   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
156   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
157   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10781]158   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
[10425]159   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
160   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
[10437]161   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
[10425]162   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
163   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
164   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
165   !: name (used as id) of allreduce-delayed operations
[10521]166   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
167   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
[10425]168   !: component name where the allreduce-delayed operation is performed
169   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
170   TYPE, PUBLIC ::   DELAYARR
171      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
[13226]172      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
[10425]173   END TYPE DELAYARR
[10817]174   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
175   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
[10425]176
177   ! timing summary report
[13226]178   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
179   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
[10425]180   
[9019]181   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]182
[9019]183   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
184   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
[11536]185   
[12377]186   !! * Substitutions
187#  include "do_loop_substitute.h90"
[51]188   !!----------------------------------------------------------------------
[9598]189   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]190   !! $Id$
[10068]191   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]192   !!----------------------------------------------------------------------
[3]193CONTAINS
194
[11536]195   SUBROUTINE mpp_start( localComm )
[2715]196      !!----------------------------------------------------------------------
[11536]197      !!                  ***  routine mpp_start  ***
[3764]198      !!
[11536]199      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
[51]200      !!----------------------------------------------------------------------
[6140]201      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]202      !
[11536]203      INTEGER ::   ierr
204      LOGICAL ::   llmpi_init
[51]205      !!----------------------------------------------------------------------
[11536]206#if defined key_mpp_mpi
[1344]207      !
[11536]208      CALL mpi_initialized ( llmpi_init, ierr )
209      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
[2715]210
[11536]211      IF( .NOT. llmpi_init ) THEN
212         IF( PRESENT(localComm) ) THEN
213            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
214            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
215            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
216         ENDIF
217         CALL mpi_init( ierr )
218         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
[2480]219      ENDIF
[11536]220       
[3764]221      IF( PRESENT(localComm) ) THEN
[2480]222         IF( Agrif_Root() ) THEN
[9570]223            mpi_comm_oce = localComm
[2480]224         ENDIF
225      ELSE
[11536]226         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
227         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
[3764]228      ENDIF
[2480]229
[11536]230# if defined key_agrif
[9019]231      IF( Agrif_Root() ) THEN
[9570]232         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]233      ELSE
[9570]234         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]235      ENDIF
[11536]236# endif
[5656]237
[9570]238      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
239      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[3764]240      !
[1976]241      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
242      !
[11536]243#else
244      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
245      mppsize = 1
246      mpprank = 0
247#endif
248   END SUBROUTINE mpp_start
[3]249
[6140]250
[1344]251   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]252      !!----------------------------------------------------------------------
253      !!                  ***  routine mppsend  ***
[3764]254      !!
[51]255      !! ** Purpose :   Send messag passing array
256      !!
257      !!----------------------------------------------------------------------
[1344]258      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
259      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
260      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
261      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
262      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
263      !!
264      INTEGER ::   iflag
[13226]265      INTEGER :: mpi_working_type
[51]266      !!----------------------------------------------------------------------
[1344]267      !
[11536]268#if defined key_mpp_mpi
[13226]269      IF (wp == dp) THEN
270         mpi_working_type = mpi_double_precision
271      ELSE
272         mpi_working_type = mpi_real
273      END IF
274      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
[11536]275#endif
[1344]276      !
[51]277   END SUBROUTINE mppsend
[3]278
279
[13226]280   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
281      !!----------------------------------------------------------------------
282      !!                  ***  routine mppsend  ***
283      !!
284      !! ** Purpose :   Send messag passing array
285      !!
286      !!----------------------------------------------------------------------
287      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
288      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
289      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
290      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
291      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
292      !!
293      INTEGER ::   iflag
294      !!----------------------------------------------------------------------
295      !
296#if defined key_mpp_mpi
297      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
298#endif
299      !
300   END SUBROUTINE mppsend_dp
301
302
303   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
304      !!----------------------------------------------------------------------
305      !!                  ***  routine mppsend  ***
306      !!
307      !! ** Purpose :   Send messag passing array
308      !!
309      !!----------------------------------------------------------------------
310      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
311      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
312      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
313      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
314      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
315      !!
316      INTEGER ::   iflag
317      !!----------------------------------------------------------------------
318      !
319#if defined key_mpp_mpi
320      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
321#endif
322      !
323   END SUBROUTINE mppsend_sp
324
325
[3294]326   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]327      !!----------------------------------------------------------------------
328      !!                  ***  routine mpprecv  ***
329      !!
330      !! ** Purpose :   Receive messag passing array
331      !!
332      !!----------------------------------------------------------------------
[1344]333      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
334      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
335      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]336      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]337      !!
[51]338      INTEGER :: istatus(mpi_status_size)
339      INTEGER :: iflag
[3294]340      INTEGER :: use_source
[13226]341      INTEGER :: mpi_working_type
[1344]342      !!----------------------------------------------------------------------
343      !
[11536]344#if defined key_mpp_mpi
[3764]345      ! If a specific process number has been passed to the receive call,
[3294]346      ! use that one. Default is to use mpi_any_source
[6140]347      use_source = mpi_any_source
348      IF( PRESENT(ksource) )   use_source = ksource
349      !
[13226]350      IF (wp == dp) THEN
351         mpi_working_type = mpi_double_precision
352      ELSE
353         mpi_working_type = mpi_real
354      END IF
355      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[11536]356#endif
[1344]357      !
[51]358   END SUBROUTINE mpprecv
[3]359
[13226]360   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
361      !!----------------------------------------------------------------------
362      !!                  ***  routine mpprecv  ***
363      !!
364      !! ** Purpose :   Receive messag passing array
365      !!
366      !!----------------------------------------------------------------------
367      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
368      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
369      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
370      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
371      !!
372      INTEGER :: istatus(mpi_status_size)
373      INTEGER :: iflag
374      INTEGER :: use_source
375      !!----------------------------------------------------------------------
376      !
377#if defined key_mpp_mpi
378      ! If a specific process number has been passed to the receive call,
379      ! use that one. Default is to use mpi_any_source
380      use_source = mpi_any_source
381      IF( PRESENT(ksource) )   use_source = ksource
382      !
383      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
384#endif
385      !
386   END SUBROUTINE mpprecv_dp
[3]387
[13226]388
389   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
390      !!----------------------------------------------------------------------
391      !!                  ***  routine mpprecv  ***
392      !!
393      !! ** Purpose :   Receive messag passing array
394      !!
395      !!----------------------------------------------------------------------
396      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
397      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
398      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
399      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
400      !!
401      INTEGER :: istatus(mpi_status_size)
402      INTEGER :: iflag
403      INTEGER :: use_source
404      !!----------------------------------------------------------------------
405      !
406#if defined key_mpp_mpi
407      ! If a specific process number has been passed to the receive call,
408      ! use that one. Default is to use mpi_any_source
409      use_source = mpi_any_source
410      IF( PRESENT(ksource) )   use_source = ksource
411      !
412      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
413#endif
414      !
415   END SUBROUTINE mpprecv_sp
416
417
[51]418   SUBROUTINE mppgather( ptab, kp, pio )
419      !!----------------------------------------------------------------------
420      !!                   ***  routine mppgather  ***
[3764]421      !!
422      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]423      !!     array which is distributed following the vertical level.
424      !!
[1344]425      !!----------------------------------------------------------------------
[6140]426      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
427      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]428      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]429      !!
[1344]430      INTEGER :: itaille, ierror   ! temporary integer
[51]431      !!---------------------------------------------------------------------
[1344]432      !
433      itaille = jpi * jpj
[11536]434#if defined key_mpp_mpi
[1344]435      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]436         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[11536]437#else
438      pio(:,:,1) = ptab(:,:)
439#endif
[1344]440      !
[51]441   END SUBROUTINE mppgather
[3]442
443
[51]444   SUBROUTINE mppscatter( pio, kp, ptab )
445      !!----------------------------------------------------------------------
446      !!                  ***  routine mppscatter  ***
447      !!
[3764]448      !! ** Purpose :   Transfert between awork array which is distributed
[51]449      !!      following the vertical level and the local subdomain array.
450      !!
451      !!----------------------------------------------------------------------
[6140]452      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
453      INTEGER                             ::   kp     ! Tag (not used with MPI
454      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]455      !!
456      INTEGER :: itaille, ierror   ! temporary integer
[51]457      !!---------------------------------------------------------------------
[1344]458      !
[6140]459      itaille = jpi * jpj
[1344]460      !
[11536]461#if defined key_mpp_mpi
[1344]462      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]463         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[11536]464#else
465      ptab(:,:) = pio(:,:,1)
466#endif
[1344]467      !
[51]468   END SUBROUTINE mppscatter
[3]469
[10425]470   
471   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
472     !!----------------------------------------------------------------------
473      !!                   ***  routine mpp_delay_sum  ***
[1344]474      !!
[10425]475      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
476      !!
[1344]477      !!----------------------------------------------------------------------
[10425]478      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
479      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
[13226]480      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
[10425]481      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
482      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
483      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[1344]484      !!
[10425]485      INTEGER ::   ji, isz
486      INTEGER ::   idvar
487      INTEGER ::   ierr, ilocalcomm
[13226]488      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[1344]489      !!----------------------------------------------------------------------
[11536]490#if defined key_mpp_mpi
[9570]491      ilocalcomm = mpi_comm_oce
[9019]492      IF( PRESENT(kcom) )   ilocalcomm = kcom
[3]493
[10425]494      isz = SIZE(y_in)
495     
[10437]496      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]497
[10425]498      idvar = -1
499      DO ji = 1, nbdelay
500         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
501      END DO
502      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
503
504      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
505         !                                       --------------------------
506         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
507            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
508            DEALLOCATE(todelay(idvar)%z1d)
509            ndelayid(idvar) = -1                                      ! do as if we had no restart
510         ELSE
511            ALLOCATE(todelay(idvar)%y1d(isz))
512            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
513         END IF
514      ENDIF
515     
516      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
517         !                                       --------------------------
518         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
519         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
520         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
521      ENDIF
522
523      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
524
525      ! send back pout from todelay(idvar)%z1d defined at previous call
526      pout(:) = todelay(idvar)%z1d(:)
527
528      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
[11536]529# if defined key_mpi2
[10526]530      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[12512]531      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
532      ndelayid(idvar) = 1
[10526]533      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]534# else
535      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
536# endif
[10526]537#else
[11536]538      pout(:) = REAL(y_in(:), wp)
[10526]539#endif
[10425]540
541   END SUBROUTINE mpp_delay_sum
542
[9019]543   
[10425]544   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
[9019]545      !!----------------------------------------------------------------------
[10425]546      !!                   ***  routine mpp_delay_max  ***
[9019]547      !!
[10425]548      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
[9019]549      !!
550      !!----------------------------------------------------------------------
[10425]551      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
552      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
553      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
554      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
555      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
556      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
[9019]557      !!
[10425]558      INTEGER ::   ji, isz
559      INTEGER ::   idvar
560      INTEGER ::   ierr, ilocalcomm
[13226]561      INTEGER ::   MPI_TYPE
[9019]562      !!----------------------------------------------------------------------
[13226]563     
[11536]564#if defined key_mpp_mpi
[13226]565      if( wp == dp ) then
566         MPI_TYPE = MPI_DOUBLE_PRECISION
567      else if ( wp == sp ) then
568         MPI_TYPE = MPI_REAL
569      else
570        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
571   
572      end if
573
[9570]574      ilocalcomm = mpi_comm_oce
[9019]575      IF( PRESENT(kcom) )   ilocalcomm = kcom
[6140]576
[10425]577      isz = SIZE(p_in)
[9019]578
[10437]579      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]580
[10425]581      idvar = -1
582      DO ji = 1, nbdelay
583         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
584      END DO
585      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
[3]586
[10425]587      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
588         !                                       --------------------------
589         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
590            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
591            DEALLOCATE(todelay(idvar)%z1d)
592            ndelayid(idvar) = -1                                      ! do as if we had no restart
593         END IF
594      ENDIF
[13]595
[10425]596      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
597         !                                       --------------------------
598         ALLOCATE(todelay(idvar)%z1d(isz))
599         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
600      ENDIF
[3]601
[10425]602      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[3]603
[10425]604      ! send back pout from todelay(idvar)%z1d defined at previous call
605      pout(:) = todelay(idvar)%z1d(:)
[13]606
[10425]607      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
[11536]608# if defined key_mpi2
[10526]609      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
[13226]610      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
[10526]611      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]612# else
[13226]613      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
[11536]614# endif
[10526]615#else
[11536]616      pout(:) = p_in(:)
[10526]617#endif
[10425]618
619   END SUBROUTINE mpp_delay_max
620
621   
622   SUBROUTINE mpp_delay_rcv( kid )
623      !!----------------------------------------------------------------------
624      !!                   ***  routine mpp_delay_rcv  ***
[1344]625      !!
[10425]626      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
[1344]627      !!
[10425]628      !!----------------------------------------------------------------------
629      INTEGER,INTENT(in   )      ::  kid 
630      INTEGER ::   ierr
631      !!----------------------------------------------------------------------
[11536]632#if defined key_mpp_mpi
[10425]633      IF( ndelayid(kid) /= -2 ) THEN 
[10526]634#if ! defined key_mpi2
[10425]635         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
636         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
637         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10526]638#endif
[10425]639         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
640         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
641      ENDIF
[11536]642#endif
[10425]643   END SUBROUTINE mpp_delay_rcv
[3]644
[12377]645   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
646      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
647      INTEGER                          , INTENT(INOUT) :: kleng
648      !!----------------------------------------------------------------------
649      !!                  ***  routine mpp_bcast_nml  ***
650      !!
651      !! ** Purpose :   broadcast namelist character buffer
652      !!
653      !!----------------------------------------------------------------------
654      !!
655      INTEGER ::   iflag
656      !!----------------------------------------------------------------------
657      !
658#if defined key_mpp_mpi
659      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
660      call MPI_BARRIER(mpi_comm_oce, iflag)
661!$AGRIF_DO_NOT_TREAT
662      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
663!$AGRIF_END_DO_NOT_TREAT
664      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
665      call MPI_BARRIER(mpi_comm_oce, iflag)
666#endif
667      !
668   END SUBROUTINE mpp_bcast_nml
669
[10425]670   
671   !!----------------------------------------------------------------------
672   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
673   !!   
674   !!----------------------------------------------------------------------
675   !!
676#  define OPERATION_MAX
677#  define INTEGER_TYPE
678#  define DIM_0d
679#     define ROUTINE_ALLREDUCE           mppmax_int
680#     include "mpp_allreduce_generic.h90"
681#     undef ROUTINE_ALLREDUCE
682#  undef DIM_0d
683#  define DIM_1d
684#     define ROUTINE_ALLREDUCE           mppmax_a_int
685#     include "mpp_allreduce_generic.h90"
686#     undef ROUTINE_ALLREDUCE
687#  undef DIM_1d
688#  undef INTEGER_TYPE
689!
[13226]690   !!
691   !!   ----   SINGLE PRECISION VERSIONS
692   !!
693#  define SINGLE_PRECISION
[10425]694#  define REAL_TYPE
695#  define DIM_0d
[13226]696#     define ROUTINE_ALLREDUCE           mppmax_real_sp
[10425]697#     include "mpp_allreduce_generic.h90"
698#     undef ROUTINE_ALLREDUCE
699#  undef DIM_0d
700#  define DIM_1d
[13226]701#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
[10425]702#     include "mpp_allreduce_generic.h90"
703#     undef ROUTINE_ALLREDUCE
704#  undef DIM_1d
[13226]705#  undef SINGLE_PRECISION
706   !!
707   !!
708   !!   ----   DOUBLE PRECISION VERSIONS
709   !!
710!
711#  define DIM_0d
712#     define ROUTINE_ALLREDUCE           mppmax_real_dp
713#     include "mpp_allreduce_generic.h90"
714#     undef ROUTINE_ALLREDUCE
715#  undef DIM_0d
716#  define DIM_1d
717#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
718#     include "mpp_allreduce_generic.h90"
719#     undef ROUTINE_ALLREDUCE
720#  undef DIM_1d
[10425]721#  undef REAL_TYPE
722#  undef OPERATION_MAX
723   !!----------------------------------------------------------------------
724   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
725   !!   
726   !!----------------------------------------------------------------------
727   !!
728#  define OPERATION_MIN
729#  define INTEGER_TYPE
730#  define DIM_0d
731#     define ROUTINE_ALLREDUCE           mppmin_int
732#     include "mpp_allreduce_generic.h90"
733#     undef ROUTINE_ALLREDUCE
734#  undef DIM_0d
735#  define DIM_1d
736#     define ROUTINE_ALLREDUCE           mppmin_a_int
737#     include "mpp_allreduce_generic.h90"
738#     undef ROUTINE_ALLREDUCE
739#  undef DIM_1d
740#  undef INTEGER_TYPE
741!
[13226]742   !!
743   !!   ----   SINGLE PRECISION VERSIONS
744   !!
745#  define SINGLE_PRECISION
[10425]746#  define REAL_TYPE
747#  define DIM_0d
[13226]748#     define ROUTINE_ALLREDUCE           mppmin_real_sp
[10425]749#     include "mpp_allreduce_generic.h90"
750#     undef ROUTINE_ALLREDUCE
751#  undef DIM_0d
752#  define DIM_1d
[13226]753#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
[10425]754#     include "mpp_allreduce_generic.h90"
755#     undef ROUTINE_ALLREDUCE
756#  undef DIM_1d
[13226]757#  undef SINGLE_PRECISION
758   !!
759   !!   ----   DOUBLE PRECISION VERSIONS
760   !!
761
762#  define DIM_0d
763#     define ROUTINE_ALLREDUCE           mppmin_real_dp
764#     include "mpp_allreduce_generic.h90"
765#     undef ROUTINE_ALLREDUCE
766#  undef DIM_0d
767#  define DIM_1d
768#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
769#     include "mpp_allreduce_generic.h90"
770#     undef ROUTINE_ALLREDUCE
771#  undef DIM_1d
[10425]772#  undef REAL_TYPE
773#  undef OPERATION_MIN
[869]774
[10425]775   !!----------------------------------------------------------------------
776   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
777   !!   
778   !!   Global sum of 1D array or a variable (integer, real or complex)
779   !!----------------------------------------------------------------------
780   !!
781#  define OPERATION_SUM
782#  define INTEGER_TYPE
783#  define DIM_0d
784#     define ROUTINE_ALLREDUCE           mppsum_int
785#     include "mpp_allreduce_generic.h90"
786#     undef ROUTINE_ALLREDUCE
787#  undef DIM_0d
788#  define DIM_1d
789#     define ROUTINE_ALLREDUCE           mppsum_a_int
790#     include "mpp_allreduce_generic.h90"
791#     undef ROUTINE_ALLREDUCE
792#  undef DIM_1d
793#  undef INTEGER_TYPE
[13226]794
795   !!
796   !!   ----   SINGLE PRECISION VERSIONS
797   !!
798#  define OPERATION_SUM
799#  define SINGLE_PRECISION
[10425]800#  define REAL_TYPE
801#  define DIM_0d
[13226]802#     define ROUTINE_ALLREDUCE           mppsum_real_sp
[10425]803#     include "mpp_allreduce_generic.h90"
804#     undef ROUTINE_ALLREDUCE
805#  undef DIM_0d
806#  define DIM_1d
[13226]807#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
[10425]808#     include "mpp_allreduce_generic.h90"
809#     undef ROUTINE_ALLREDUCE
810#  undef DIM_1d
811#  undef REAL_TYPE
812#  undef OPERATION_SUM
813
[13226]814#  undef SINGLE_PRECISION
815
816   !!
817   !!   ----   DOUBLE PRECISION VERSIONS
818   !!
819#  define OPERATION_SUM
820#  define REAL_TYPE
821#  define DIM_0d
822#     define ROUTINE_ALLREDUCE           mppsum_real_dp
823#     include "mpp_allreduce_generic.h90"
824#     undef ROUTINE_ALLREDUCE
825#  undef DIM_0d
826#  define DIM_1d
827#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
828#     include "mpp_allreduce_generic.h90"
829#     undef ROUTINE_ALLREDUCE
830#  undef DIM_1d
831#  undef REAL_TYPE
832#  undef OPERATION_SUM
833
[10425]834#  define OPERATION_SUM_DD
835#  define COMPLEX_TYPE
836#  define DIM_0d
837#     define ROUTINE_ALLREDUCE           mppsum_realdd
838#     include "mpp_allreduce_generic.h90"
839#     undef ROUTINE_ALLREDUCE
840#  undef DIM_0d
841#  define DIM_1d
842#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
843#     include "mpp_allreduce_generic.h90"
844#     undef ROUTINE_ALLREDUCE
845#  undef DIM_1d
846#  undef COMPLEX_TYPE
847#  undef OPERATION_SUM_DD
848
849   !!----------------------------------------------------------------------
850   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
851   !!   
852   !!----------------------------------------------------------------------
853   !!
[13226]854   !!
855   !!   ----   SINGLE PRECISION VERSIONS
856   !!
857#  define SINGLE_PRECISION
[10425]858#  define OPERATION_MINLOC
859#  define DIM_2d
[13226]860#     define ROUTINE_LOC           mpp_minloc2d_sp
[10425]861#     include "mpp_loc_generic.h90"
862#     undef ROUTINE_LOC
863#  undef DIM_2d
864#  define DIM_3d
[13226]865#     define ROUTINE_LOC           mpp_minloc3d_sp
[10425]866#     include "mpp_loc_generic.h90"
867#     undef ROUTINE_LOC
868#  undef DIM_3d
869#  undef OPERATION_MINLOC
870
871#  define OPERATION_MAXLOC
872#  define DIM_2d
[13226]873#     define ROUTINE_LOC           mpp_maxloc2d_sp
[10425]874#     include "mpp_loc_generic.h90"
875#     undef ROUTINE_LOC
876#  undef DIM_2d
877#  define DIM_3d
[13226]878#     define ROUTINE_LOC           mpp_maxloc3d_sp
[10425]879#     include "mpp_loc_generic.h90"
880#     undef ROUTINE_LOC
881#  undef DIM_3d
882#  undef OPERATION_MAXLOC
[13226]883#  undef SINGLE_PRECISION
884   !!
885   !!   ----   DOUBLE PRECISION VERSIONS
886   !!
887#  define OPERATION_MINLOC
888#  define DIM_2d
889#     define ROUTINE_LOC           mpp_minloc2d_dp
890#     include "mpp_loc_generic.h90"
891#     undef ROUTINE_LOC
892#  undef DIM_2d
893#  define DIM_3d
894#     define ROUTINE_LOC           mpp_minloc3d_dp
895#     include "mpp_loc_generic.h90"
896#     undef ROUTINE_LOC
897#  undef DIM_3d
898#  undef OPERATION_MINLOC
[10425]899
[13226]900#  define OPERATION_MAXLOC
901#  define DIM_2d
902#     define ROUTINE_LOC           mpp_maxloc2d_dp
903#     include "mpp_loc_generic.h90"
904#     undef ROUTINE_LOC
905#  undef DIM_2d
906#  define DIM_3d
907#     define ROUTINE_LOC           mpp_maxloc3d_dp
908#     include "mpp_loc_generic.h90"
909#     undef ROUTINE_LOC
910#  undef DIM_3d
911#  undef OPERATION_MAXLOC
912
913
[1344]914   SUBROUTINE mppsync()
915      !!----------------------------------------------------------------------
916      !!                  ***  routine mppsync  ***
[3764]917      !!
[1344]918      !! ** Purpose :   Massively parallel processors, synchroneous
919      !!
920      !!-----------------------------------------------------------------------
921      INTEGER :: ierror
922      !!-----------------------------------------------------------------------
923      !
[11536]924#if defined key_mpp_mpi
[9570]925      CALL mpi_barrier( mpi_comm_oce, ierror )
[11536]926#endif
[1344]927      !
928   END SUBROUTINE mppsync
[3]929
930
[11536]931   SUBROUTINE mppstop( ld_abort ) 
[1344]932      !!----------------------------------------------------------------------
933      !!                  ***  routine mppstop  ***
[3764]934      !!
[3294]935      !! ** purpose :   Stop massively parallel processors method
[1344]936      !!
937      !!----------------------------------------------------------------------
[11536]938      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
939      LOGICAL ::   ll_abort
[1344]940      INTEGER ::   info
941      !!----------------------------------------------------------------------
[11536]942      ll_abort = .FALSE.
943      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
[1344]944      !
[11536]945#if defined key_mpp_mpi
946      IF(ll_abort) THEN
[10425]947         CALL mpi_abort( MPI_COMM_WORLD )
948      ELSE
949         CALL mppsync
950         CALL mpi_finalize( info )
951      ENDIF
[11536]952#endif
953      IF( ll_abort ) STOP 123
[1344]954      !
955   END SUBROUTINE mppstop
[3]956
957
[1344]958   SUBROUTINE mpp_comm_free( kcom )
959      !!----------------------------------------------------------------------
960      INTEGER, INTENT(in) ::   kcom
961      !!
962      INTEGER :: ierr
963      !!----------------------------------------------------------------------
964      !
[11536]965#if defined key_mpp_mpi
[1344]966      CALL MPI_COMM_FREE(kcom, ierr)
[11536]967#endif
[1344]968      !
969   END SUBROUTINE mpp_comm_free
[3]970
[869]971
[2715]972   SUBROUTINE mpp_ini_znl( kumout )
[1345]973      !!----------------------------------------------------------------------
974      !!               ***  routine mpp_ini_znl  ***
975      !!
976      !! ** Purpose :   Initialize special communicator for computing zonal sum
977      !!
978      !! ** Method  : - Look for processors in the same row
979      !!              - Put their number in nrank_znl
980      !!              - Create group for the znl processors
981      !!              - Create a communicator for znl processors
982      !!              - Determine if processor should write znl files
983      !!
984      !! ** output
985      !!      ndim_rank_znl = number of processors on the same row
986      !!      ngrp_znl = group ID for the znl processors
987      !!      ncomm_znl = communicator for the ice procs.
988      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
989      !!
990      !!----------------------------------------------------------------------
[2715]991      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]992      !
[2715]993      INTEGER :: jproc      ! dummy loop integer
994      INTEGER :: ierr, ii   ! local integer
995      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
996      !!----------------------------------------------------------------------
[11536]997#if defined key_mpp_mpi
[1345]998      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
999      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]1000      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]1001      !
[2715]1002      ALLOCATE( kwork(jpnij), STAT=ierr )
[11536]1003      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
[2715]1004
1005      IF( jpnj == 1 ) THEN
[1345]1006         ngrp_znl  = ngrp_world
[9570]1007         ncomm_znl = mpi_comm_oce
[1345]1008      ELSE
1009         !
[9570]1010         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]1011         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1012         !-$$        CALL flush(numout)
1013         !
1014         ! Count number of processors on the same row
1015         ndim_rank_znl = 0
1016         DO jproc=1,jpnij
1017            IF ( kwork(jproc) == njmpp ) THEN
1018               ndim_rank_znl = ndim_rank_znl + 1
1019            ENDIF
1020         END DO
1021         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1022         !-$$        CALL flush(numout)
1023         ! Allocate the right size to nrank_znl
[1441]1024         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]1025         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]1026         ii = 0
[1345]1027         nrank_znl (:) = 0
1028         DO jproc=1,jpnij
1029            IF ( kwork(jproc) == njmpp) THEN
1030               ii = ii + 1
[3764]1031               nrank_znl(ii) = jproc -1
[1345]1032            ENDIF
1033         END DO
1034         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1035         !-$$        CALL flush(numout)
1036
1037         ! Create the opa group
[9570]1038         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]1039         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1040         !-$$        CALL flush(numout)
1041
1042         ! Create the znl group from the opa group
1043         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1044         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1045         !-$$        CALL flush(numout)
1046
1047         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]1048         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]1049         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1050         !-$$        CALL flush(numout)
1051         !
1052      END IF
1053
1054      ! Determines if processor if the first (starting from i=1) on the row
[3764]1055      IF ( jpni == 1 ) THEN
[1345]1056         l_znl_root = .TRUE.
1057      ELSE
1058         l_znl_root = .FALSE.
1059         kwork (1) = nimpp
[10425]1060         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]1061         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1062      END IF
1063
[2715]1064      DEALLOCATE(kwork)
[11536]1065#endif
[2715]1066
[1345]1067   END SUBROUTINE mpp_ini_znl
1068
1069
[1344]1070   SUBROUTINE mpp_ini_north
1071      !!----------------------------------------------------------------------
1072      !!               ***  routine mpp_ini_north  ***
1073      !!
[3764]1074      !! ** Purpose :   Initialize special communicator for north folding
[1344]1075      !!      condition together with global variables needed in the mpp folding
1076      !!
1077      !! ** Method  : - Look for northern processors
1078      !!              - Put their number in nrank_north
1079      !!              - Create groups for the world processors and the north processors
1080      !!              - Create a communicator for northern processors
1081      !!
1082      !! ** output
1083      !!      njmppmax = njmpp for northern procs
1084      !!      ndim_rank_north = number of processors in the northern line
1085      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1086      !!      ngrp_world = group ID for the world processors
1087      !!      ngrp_north = group ID for the northern processors
1088      !!      ncomm_north = communicator for the northern procs.
1089      !!      north_root = number (in the world) of proc 0 in the northern comm.
1090      !!
1091      !!----------------------------------------------------------------------
1092      INTEGER ::   ierr
1093      INTEGER ::   jjproc
1094      INTEGER ::   ii, ji
1095      !!----------------------------------------------------------------------
1096      !
[11536]1097#if defined key_mpp_mpi
[1344]1098      njmppmax = MAXVAL( njmppt )
1099      !
1100      ! Look for how many procs on the northern boundary
1101      ndim_rank_north = 0
[13286]1102      DO jjproc = 1, jpni
1103         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
[1344]1104      END DO
1105      !
1106      ! Allocate the right size to nrank_north
[1441]1107      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]1108      ALLOCATE( nrank_north(ndim_rank_north) )
[869]1109
[1344]1110      ! Fill the nrank_north array with proc. number of northern procs.
1111      ! Note : the rank start at 0 in MPI
1112      ii = 0
[13286]1113      DO ji = 1, jpni
1114         IF ( nfproc(ji) /= -1   ) THEN
[1344]1115            ii=ii+1
[13286]1116            nrank_north(ii)=nfproc(ji)
[1344]1117         END IF
1118      END DO
1119      !
1120      ! create the world group
[9570]1121      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]1122      !
1123      ! Create the North group from the world group
1124      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1125      !
1126      ! Create the North communicator , ie the pool of procs in the north group
[9570]1127      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]1128      !
[11536]1129#endif
[1344]1130   END SUBROUTINE mpp_ini_north
[869]1131
1132
[9019]1133   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]1134      !!---------------------------------------------------------------------
1135      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1136      !!
1137      !!   Modification of original codes written by David H. Bailey
1138      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1139      !!---------------------------------------------------------------------
[9019]1140      INTEGER                     , INTENT(in)    ::   ilen, itype
[13226]1141      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1142      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]1143      !
[13226]1144      REAL(dp) :: zerr, zt1, zt2    ! local work variables
[9019]1145      INTEGER  :: ji, ztmp           ! local scalar
1146      !!---------------------------------------------------------------------
1147      !
[1976]1148      ztmp = itype   ! avoid compilation warning
[9019]1149      !
[1976]1150      DO ji=1,ilen
1151      ! Compute ydda + yddb using Knuth's trick.
1152         zt1  = real(ydda(ji)) + real(yddb(ji))
1153         zerr = zt1 - real(ydda(ji))
1154         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1155                + aimag(ydda(ji)) + aimag(yddb(ji))
1156
1157         ! The result is zt1 + zt2, after normalization.
1158         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1159      END DO
[9019]1160      !
[1976]1161   END SUBROUTINE DDPDD_MPI
1162
[6140]1163
[10437]1164   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
[10425]1165      !!----------------------------------------------------------------------
1166      !!                  ***  routine mpp_report  ***
1167      !!
1168      !! ** Purpose :   report use of mpp routines per time-setp
1169      !!
1170      !!----------------------------------------------------------------------
1171      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1172      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
[10437]1173      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
[10425]1174      !!
[10982]1175      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
[10437]1176      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
[10982]1177      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
[10425]1178      !!----------------------------------------------------------------------
[11536]1179#if defined key_mpp_mpi
[10425]1180      !
1181      ll_lbc = .FALSE.
1182      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1183      ll_glb = .FALSE.
1184      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
[10437]1185      ll_dlg = .FALSE.
1186      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
[10425]1187      !
1188      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1189      ncom_freq = ncom_fsbc
1190      !
1191      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1192         IF( ll_lbc ) THEN
1193            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1194            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1195            n_sequence_lbc = n_sequence_lbc + 1
1196            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1197            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1198            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1199            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1200         ENDIF
1201         IF( ll_glb ) THEN
1202            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1203            n_sequence_glb = n_sequence_glb + 1
1204            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1205            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1206         ENDIF
[10437]1207         IF( ll_dlg ) THEN
1208            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1209            n_sequence_dlg = n_sequence_dlg + 1
1210            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1211            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1212         ENDIF
[10425]1213      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1214         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1215         WRITE(numcom,*) ' '
1216         WRITE(numcom,*) ' ------------------------------------------------------------'
1217         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1218         WRITE(numcom,*) ' ------------------------------------------------------------'
1219         WRITE(numcom,*) ' '
1220         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1221         jj = 0; jk = 0; jf = 0; jh = 0
1222         DO ji = 1, n_sequence_lbc
1223            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1224            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1225            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1226            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1227         END DO
1228         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1229         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1230         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1231         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1232         WRITE(numcom,*) ' '
1233         WRITE(numcom,*) ' lbc_lnk called'
[10982]1234         DO ji = 1, n_sequence_lbc - 1
1235            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1236               ccountname = crname_lbc(ji)
1237               crname_lbc(ji) = 'already counted'
1238               jcount = 1
1239               DO jj = ji + 1, n_sequence_lbc
1240                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1241                     jcount = jcount + 1
1242                     crname_lbc(jj) = 'already counted'
1243                  END IF
1244               END DO
1245               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
[10425]1246            END IF
1247         END DO
[10982]1248         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1249            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1250         END IF
[10425]1251         WRITE(numcom,*) ' '
1252         IF ( n_sequence_glb > 0 ) THEN
1253            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1254            jj = 1
1255            DO ji = 2, n_sequence_glb
1256               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1257                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1258                  jj = 0
1259               END IF
1260               jj = jj + 1 
1261            END DO
1262            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1263            DEALLOCATE(crname_glb)
1264         ELSE
1265            WRITE(numcom,*) ' No MPI global communication '
1266         ENDIF
1267         WRITE(numcom,*) ' '
[10437]1268         IF ( n_sequence_dlg > 0 ) THEN
1269            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1270            jj = 1
1271            DO ji = 2, n_sequence_dlg
1272               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1273                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1274                  jj = 0
1275               END IF
1276               jj = jj + 1 
1277            END DO
1278            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1279            DEALLOCATE(crname_dlg)
1280         ELSE
1281            WRITE(numcom,*) ' No MPI delayed global communication '
1282         ENDIF
1283         WRITE(numcom,*) ' '
[10425]1284         WRITE(numcom,*) ' -----------------------------------------------'
1285         WRITE(numcom,*) ' '
1286         DEALLOCATE(ncomm_sequence)
1287         DEALLOCATE(crname_lbc)
1288      ENDIF
[11536]1289#endif
[10425]1290   END SUBROUTINE mpp_report
1291
[6140]1292   
[10425]1293   SUBROUTINE tic_tac (ld_tic, ld_global)
1294
1295    LOGICAL,           INTENT(IN) :: ld_tic
1296    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
[13226]1297    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1298    REAL(dp),               SAVE :: tic_ct = 0._dp
[10425]1299    INTEGER :: ii
[11536]1300#if defined key_mpp_mpi
[10425]1301
1302    IF( ncom_stp <= nit000 ) RETURN
1303    IF( ncom_stp == nitend ) RETURN
1304    ii = 1
1305    IF( PRESENT( ld_global ) ) THEN
1306       IF( ld_global ) ii = 2
1307    END IF
1308   
1309    IF ( ld_tic ) THEN
1310       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
[13226]1311       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
[10425]1312    ELSE
1313       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1314       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1315    ENDIF
[11536]1316#endif
[10425]1317   
1318   END SUBROUTINE tic_tac
1319
[11536]1320#if ! defined key_mpp_mpi
1321   SUBROUTINE mpi_wait(request, status, ierror)
1322      INTEGER                            , INTENT(in   ) ::   request
1323      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1324      INTEGER                            , INTENT(  out) ::   ierror
1325   END SUBROUTINE mpi_wait
[1976]1326
[10425]1327   
[11536]1328   FUNCTION MPI_Wtime()
1329      REAL(wp) ::  MPI_Wtime
1330      MPI_Wtime = -1.
1331   END FUNCTION MPI_Wtime
[3]1332#endif
[2715]1333
[13]1334   !!----------------------------------------------------------------------
[12377]1335   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
[2715]1336   !!----------------------------------------------------------------------
1337
1338   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1339      &                 cd6, cd7, cd8, cd9, cd10 )
1340      !!----------------------------------------------------------------------
1341      !!                  ***  ROUTINE  stop_opa  ***
1342      !!
[3764]1343      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1344      !!                increment the error number (nstop) by one.
1345      !!----------------------------------------------------------------------
[11536]1346      CHARACTER(len=*), INTENT(in   )           ::   cd1
1347      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1348      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
[12933]1349      !
1350      CHARACTER(LEN=8) ::   clfmt            ! writing format
[13011]1351      INTEGER          ::   inum
[2715]1352      !!----------------------------------------------------------------------
1353      !
[3764]1354      nstop = nstop + 1
[11536]1355      !
[13011]1356      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1357         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1358         WRITE(inum,*)
1359         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1360         CLOSE(inum)
[12933]1361      ENDIF
[13011]1362      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1363         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1364      ENDIF
[11536]1365      !
1366                            WRITE(numout,*)
1367                            WRITE(numout,*) ' ===>>> : E R R O R'
1368                            WRITE(numout,*)
1369                            WRITE(numout,*) '         ==========='
1370                            WRITE(numout,*)
1371                            WRITE(numout,*) TRIM(cd1)
[10425]1372      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1373      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1374      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1375      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1376      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1377      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1378      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1379      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1380      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[11536]1381                            WRITE(numout,*)
1382      !
[2715]1383                               CALL FLUSH(numout    )
1384      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1385      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1386      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1387      !
1388      IF( cd1 == 'STOP' ) THEN
[11536]1389         WRITE(numout,*) 
[10425]1390         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[11536]1391         WRITE(numout,*) 
[12933]1392         CALL FLUSH(numout)
1393         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
[11536]1394         CALL mppstop( ld_abort = .true. )
[2715]1395      ENDIF
1396      !
1397   END SUBROUTINE ctl_stop
1398
1399
1400   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1401      &                 cd6, cd7, cd8, cd9, cd10 )
1402      !!----------------------------------------------------------------------
1403      !!                  ***  ROUTINE  stop_warn  ***
1404      !!
[3764]1405      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1406      !!                increment the warning number (nwarn) by one.
1407      !!----------------------------------------------------------------------
1408      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1409      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1410      !!----------------------------------------------------------------------
[3764]1411      !
1412      nwarn = nwarn + 1
[11536]1413      !
[2715]1414      IF(lwp) THEN
[11536]1415                               WRITE(numout,*)
1416                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1417                               WRITE(numout,*)
1418                               WRITE(numout,*) '         ==============='
1419                               WRITE(numout,*)
1420         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1421         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1422         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1423         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1424         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1425         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1426         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1427         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1428         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1429         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1430                               WRITE(numout,*)
[2715]1431      ENDIF
1432      CALL FLUSH(numout)
1433      !
1434   END SUBROUTINE ctl_warn
1435
1436
1437   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1438      !!----------------------------------------------------------------------
1439      !!                  ***  ROUTINE ctl_opn  ***
1440      !!
1441      !! ** Purpose :   Open file and check if required file is available.
1442      !!
1443      !! ** Method  :   Fortan open
1444      !!----------------------------------------------------------------------
1445      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1446      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1447      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1448      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1449      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1450      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1451      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1452      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1453      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1454      !
[2715]1455      CHARACTER(len=80) ::   clfile
[12933]1456      CHARACTER(LEN=10) ::   clfmt            ! writing format
[2715]1457      INTEGER           ::   iost
[13062]1458      INTEGER           ::   idg              ! number of digits
[2715]1459      !!----------------------------------------------------------------------
[5836]1460      !
[2715]1461      ! adapt filename
1462      ! ----------------
1463      clfile = TRIM(cdfile)
1464      IF( PRESENT( karea ) ) THEN
[12933]1465         IF( karea > 1 ) THEN
[13009]1466            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1467            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1468            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
[12933]1469            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1470         ENDIF
[2715]1471      ENDIF
1472#if defined key_agrif
1473      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1474      knum=Agrif_Get_Unit()
1475#else
1476      knum=get_unit()
1477#endif
[10425]1478      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1479      !
[11536]1480      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
[10425]1481         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1482      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1483         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1484      ELSE
[10425]1485         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1486      ENDIF
[10425]1487      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1488         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1489      IF( iost == 0 ) THEN
[12377]1490         IF(ldwp .AND. kout > 0) THEN
[10425]1491            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1492            WRITE(kout,*) '     unit   = ', knum
1493            WRITE(kout,*) '     status = ', cdstat
1494            WRITE(kout,*) '     form   = ', cdform
1495            WRITE(kout,*) '     access = ', cdacce
1496            WRITE(kout,*)
1497         ENDIF
1498      ENDIF
1499100   CONTINUE
1500      IF( iost /= 0 ) THEN
[11536]1501         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1502         WRITE(ctmp2,*) ' =======   ===  '
1503         WRITE(ctmp3,*) '           unit   = ', knum
1504         WRITE(ctmp4,*) '           status = ', cdstat
1505         WRITE(ctmp5,*) '           form   = ', cdform
1506         WRITE(ctmp6,*) '           access = ', cdacce
1507         WRITE(ctmp7,*) '           iostat = ', iost
1508         WRITE(ctmp8,*) '           we stop. verify the file '
1509         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
[2715]1510      ENDIF
[5836]1511      !
[2715]1512   END SUBROUTINE ctl_opn
1513
[5836]1514
[11536]1515   SUBROUTINE ctl_nam ( kios, cdnam )
[4147]1516      !!----------------------------------------------------------------------
1517      !!                  ***  ROUTINE ctl_nam  ***
1518      !!
1519      !! ** Purpose :   Informations when error while reading a namelist
1520      !!
1521      !! ** Method  :   Fortan open
1522      !!----------------------------------------------------------------------
[11536]1523      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1524      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1525      !
1526      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
[4147]1527      !!----------------------------------------------------------------------
[5836]1528      !
[7646]1529      WRITE (clios, '(I5.0)')   kios
[4147]1530      IF( kios < 0 ) THEN         
[5836]1531         CALL ctl_warn( 'end of record or file while reading namelist '   &
1532            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1533      ENDIF
[5836]1534      !
[4147]1535      IF( kios > 0 ) THEN
[5836]1536         CALL ctl_stop( 'misspelled variable in namelist '   &
1537            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1538      ENDIF
1539      kios = 0
[5836]1540      !
[4147]1541   END SUBROUTINE ctl_nam
1542
[5836]1543
[2715]1544   INTEGER FUNCTION get_unit()
1545      !!----------------------------------------------------------------------
1546      !!                  ***  FUNCTION  get_unit  ***
1547      !!
1548      !! ** Purpose :   return the index of an unused logical unit
1549      !!----------------------------------------------------------------------
[3764]1550      LOGICAL :: llopn
[2715]1551      !!----------------------------------------------------------------------
1552      !
1553      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1554      llopn = .TRUE.
1555      DO WHILE( (get_unit < 998) .AND. llopn )
1556         get_unit = get_unit + 1
1557         INQUIRE( unit = get_unit, opened = llopn )
1558      END DO
1559      IF( (get_unit == 999) .AND. llopn ) THEN
[11536]1560         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
[2715]1561      ENDIF
1562      !
1563   END FUNCTION get_unit
1564
[12377]1565   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1566      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1567      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1568      CHARACTER(LEN=256)                           :: chline
1569      CHARACTER(LEN=1)                             :: csp
1570      INTEGER, INTENT(IN)                          :: kout
1571      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1572      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1573      !
1574      !csp = NEW_LINE('A')
1575      ! a new line character is the best seperator but some systems (e.g.Cray)
1576      ! seem to terminate namelist reads from internal files early if they
1577      ! encounter new-lines. Use a single space for safety.
1578      csp = ' '
1579      !
1580      ! Check if the namelist buffer has already been allocated. Return if it has.
1581      !
1582      IF ( ALLOCATED( cdnambuff ) ) RETURN
1583      IF( ldwp ) THEN
1584         !
1585         ! Open namelist file
1586         !
1587         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1588         !
1589         ! First pass: count characters excluding comments and trimable white space
1590         !
1591         itot=0
1592     10  READ(iun,'(A256)',END=20,ERR=20) chline
1593         iltc = LEN_TRIM(chline)
1594         IF ( iltc.GT.0 ) THEN
1595          inl = INDEX(chline, '!') 
1596          IF( inl.eq.0 ) THEN
1597           itot = itot + iltc + 1                                ! +1 for the newline character
1598          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1599           itot = itot + inl                                  !  includes +1 for the newline character
1600          ENDIF
1601         ENDIF
1602         GOTO 10
1603     20  CONTINUE
1604         !
1605         ! Allocate text cdnambuff for condensed namelist
1606         !
1607!$AGRIF_DO_NOT_TREAT
1608         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1609!$AGRIF_END_DO_NOT_TREAT
1610         itotsav = itot
1611         !
1612         ! Second pass: read and transfer pruned characters into cdnambuff
1613         !
1614         REWIND(iun)
1615         itot=1
1616     30  READ(iun,'(A256)',END=40,ERR=40) chline
1617         iltc = LEN_TRIM(chline)
1618         IF ( iltc.GT.0 ) THEN
1619          inl = INDEX(chline, '!')
1620          IF( inl.eq.0 ) THEN
1621           inl = iltc
1622          ELSE
1623           inl = inl - 1
1624          ENDIF
1625          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1626             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1627             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1628             itot = itot + inl + 1
1629          ENDIF
1630         ENDIF
1631         GOTO 30
1632     40  CONTINUE
1633         itot = itot - 1
1634         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1635         !
1636         ! Close namelist file
1637         !
1638         CLOSE(iun)
1639         !write(*,'(32A)') cdnambuff
1640      ENDIF
1641#if defined key_mpp_mpi
1642      CALL mpp_bcast_nml( cdnambuff, itot )
1643#endif
1644  END SUBROUTINE load_nml
1645
1646
[2715]1647   !!----------------------------------------------------------------------
[3]1648END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.