New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 14433

Last change on this file since 14433 was 14433, checked in by smasson, 3 years ago

trunk: merge dev_r14312_MPI_Interface into the trunk, #2598

  • Property svn:keywords set to Id
File size: 73.3 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57#if ! defined key_mpi_off
58   USE MPI
59#endif
60
61   IMPLICIT NONE
62   PRIVATE
63   !
64   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
65   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
66   PUBLIC   mpp_ini_north
67   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
68   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
69   PUBLIC   mppscatter, mppgather
70   PUBLIC   mpp_ini_znl
71   PUBLIC   mpp_ini_nc
72   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
73   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
74   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
75   PUBLIC   mpp_report
76   PUBLIC   mpp_bcast_nml
77   PUBLIC   tic_tac
78#if defined key_mpp_off
79   PUBLIC MPI_wait
80   PUBLIC MPI_Wtime
81#endif
82
83   !! * Interfaces
84   !! define generic interface for these routine as they are called sometimes
85   !! with scalar arguments instead of array arguments, which causes problems
86   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
87   INTERFACE mpp_min
88      MODULE PROCEDURE mppmin_a_int, mppmin_int
89      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
90      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
91   END INTERFACE
92   INTERFACE mpp_max
93      MODULE PROCEDURE mppmax_a_int, mppmax_int
94      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
95      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
96   END INTERFACE
97   INTERFACE mpp_sum
98      MODULE PROCEDURE mppsum_a_int, mppsum_int
99      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
100      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
101      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
102   END INTERFACE
103   INTERFACE mpp_minloc
104      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
105      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
106   END INTERFACE
107   INTERFACE mpp_maxloc
108      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
109      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
110   END INTERFACE
111
112   TYPE, PUBLIC ::   PTR_4D_sp   !: array of 4D pointers (used in lbclnk and lbcnfd)
113      REAL(sp), DIMENSION (:,:,:,:), POINTER ::   pt4d
114   END TYPE PTR_4D_sp
115
116   TYPE, PUBLIC ::   PTR_4D_dp   !: array of 4D pointers (used in lbclnk and lbcnfd)
117      REAL(dp), DIMENSION (:,:,:,:), POINTER ::   pt4d
118   END TYPE PTR_4D_dp
119
120   !! ========================= !!
121   !!  MPI  variable definition !!
122   !! ========================= !!
123#if ! defined key_mpi_off
124   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
125#else
126   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
127   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
128   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
129   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
130#endif
131
132   INTEGER, PUBLIC ::   mppsize        ! number of process
133   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
134!$AGRIF_DO_NOT_TREAT
135   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
136!$AGRIF_END_DO_NOT_TREAT
137
138   INTEGER :: MPI_SUMDD
139
140   ! Neighbourgs informations
141   INTEGER,    PARAMETER, PUBLIC ::   n_hlsmax = 3
142   INTEGER, DIMENSION(         8), PUBLIC ::   mpinei      !: 8-neighbourg MPI indexes (starting at 0, -1 if no neighbourg)
143   INTEGER, DIMENSION(n_hlsmax,8), PUBLIC ::   mpiSnei     !: 8-neighbourg Send MPI indexes (starting at 0, -1 if no neighbourg)
144   INTEGER, DIMENSION(n_hlsmax,8), PUBLIC ::   mpiRnei     !: 8-neighbourg Recv MPI indexes (starting at 0, -1 if no neighbourg)
145   INTEGER,    PARAMETER, PUBLIC ::   jpwe = 1   !: WEst
146   INTEGER,    PARAMETER, PUBLIC ::   jpea = 2   !: EAst
147   INTEGER,    PARAMETER, PUBLIC ::   jpso = 3   !: SOuth
148   INTEGER,    PARAMETER, PUBLIC ::   jpno = 4   !: NOrth
149   INTEGER,    PARAMETER, PUBLIC ::   jpsw = 5   !: South-West
150   INTEGER,    PARAMETER, PUBLIC ::   jpse = 6   !: South-East
151   INTEGER,    PARAMETER, PUBLIC ::   jpnw = 7   !: North-West
152   INTEGER,    PARAMETER, PUBLIC ::   jpne = 8   !: North-East
153
154   LOGICAL, DIMENSION(8), PUBLIC ::   l_SelfPerio  !   should we explicitely take care of I/J periodicity
155   LOGICAL,               PUBLIC ::   l_IdoNFold
156
157   ! variables used for zonal integration
158   INTEGER, PUBLIC ::   ncomm_znl         !: communicator made by the processors on the same zonal average
159   LOGICAL, PUBLIC ::   l_znl_root        !: True on the 'left'most processor on the same row
160   INTEGER         ::   ngrp_znl          !: group ID for the znl processors
161   INTEGER         ::   ndim_rank_znl     !: number of processors on the same zonal average
162   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
163
164   ! variables used for MPI3 neighbourhood collectives
165   INTEGER, DIMENSION(n_hlsmax), PUBLIC ::   mpi_nc_com4       ! MPI3 neighbourhood collectives communicator
166   INTEGER, DIMENSION(n_hlsmax), PUBLIC ::   mpi_nc_com8       ! MPI3 neighbourhood collectives communicator (with diagionals)
167
168   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
169   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
170   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
171   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
172   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
173   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
174   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
175   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
176   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
177
178   ! Communications summary report
179   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
180   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
181   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
182   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
183   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
184   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
185   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
186   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
187   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
188   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
189   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
190   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
191   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
192   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
193   !: name (used as id) of allreduce-delayed operations
194   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
195   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
196   !: component name where the allreduce-delayed operation is performed
197   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
198   TYPE, PUBLIC ::   DELAYARR
199      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
200      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
201   END TYPE DELAYARR
202   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
203   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
204
205   ! timing summary report
206   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
207   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
208
209   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
210
211   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
212   INTEGER, PUBLIC ::   nn_comm                     !: namelist control of comms
213
214   INTEGER, PUBLIC, PARAMETER ::   jpfillnothing = 1
215   INTEGER, PUBLIC, PARAMETER ::   jpfillcst     = 2
216   INTEGER, PUBLIC, PARAMETER ::   jpfillcopy    = 3
217   INTEGER, PUBLIC, PARAMETER ::   jpfillperio   = 4
218   INTEGER, PUBLIC, PARAMETER ::   jpfillmpi     = 5
219
220   !! * Substitutions
221#  include "do_loop_substitute.h90"
222   !!----------------------------------------------------------------------
223   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
224   !! $Id$
225   !! Software governed by the CeCILL license (see ./LICENSE)
226   !!----------------------------------------------------------------------
227CONTAINS
228
229   SUBROUTINE mpp_start( localComm )
230      !!----------------------------------------------------------------------
231      !!                  ***  routine mpp_start  ***
232      !!
233      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
234      !!----------------------------------------------------------------------
235      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
236      !
237      INTEGER ::   ierr
238      LOGICAL ::   llmpi_init
239      !!----------------------------------------------------------------------
240#if ! defined key_mpi_off
241      !
242      CALL mpi_initialized ( llmpi_init, ierr )
243      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
244
245      IF( .NOT. llmpi_init ) THEN
246         IF( PRESENT(localComm) ) THEN
247            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
248            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
249            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
250         ENDIF
251         CALL mpi_init( ierr )
252         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
253      ENDIF
254
255      IF( PRESENT(localComm) ) THEN
256         IF( Agrif_Root() ) THEN
257            mpi_comm_oce = localComm
258         ENDIF
259      ELSE
260         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
261         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
262      ENDIF
263
264# if defined key_agrif
265      IF( Agrif_Root() ) THEN
266         CALL Agrif_MPI_Init(mpi_comm_oce)
267      ELSE
268         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
269      ENDIF
270# endif
271
272      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
273      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
274      !
275      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
276      !
277#else
278      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
279      mppsize = 1
280      mpprank = 0
281#endif
282   END SUBROUTINE mpp_start
283
284
285   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
286      !!----------------------------------------------------------------------
287      !!                  ***  routine mppsend  ***
288      !!
289      !! ** Purpose :   Send messag passing array
290      !!
291      !!----------------------------------------------------------------------
292      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
293      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
294      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
295      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
296      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
297      !!
298      INTEGER ::   iflag
299      INTEGER :: mpi_working_type
300      !!----------------------------------------------------------------------
301      !
302#if ! defined key_mpi_off
303      IF (wp == dp) THEN
304         mpi_working_type = mpi_double_precision
305      ELSE
306         mpi_working_type = mpi_real
307      END IF
308      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
309#endif
310      !
311   END SUBROUTINE mppsend
312
313
314   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
315      !!----------------------------------------------------------------------
316      !!                  ***  routine mppsend  ***
317      !!
318      !! ** Purpose :   Send messag passing array
319      !!
320      !!----------------------------------------------------------------------
321      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
322      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
323      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
324      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
325      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
326      !!
327      INTEGER ::   iflag
328      !!----------------------------------------------------------------------
329      !
330#if ! defined key_mpi_off
331      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
332#endif
333      !
334   END SUBROUTINE mppsend_dp
335
336
337   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
338      !!----------------------------------------------------------------------
339      !!                  ***  routine mppsend  ***
340      !!
341      !! ** Purpose :   Send messag passing array
342      !!
343      !!----------------------------------------------------------------------
344      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
345      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
346      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
347      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
348      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
349      !!
350      INTEGER ::   iflag
351      !!----------------------------------------------------------------------
352      !
353#if ! defined key_mpi_off
354      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
355#endif
356      !
357   END SUBROUTINE mppsend_sp
358
359
360   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
361      !!----------------------------------------------------------------------
362      !!                  ***  routine mpprecv  ***
363      !!
364      !! ** Purpose :   Receive messag passing array
365      !!
366      !!----------------------------------------------------------------------
367      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
368      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
369      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
370      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
371      !!
372      INTEGER :: istatus(mpi_status_size)
373      INTEGER :: iflag
374      INTEGER :: use_source
375      INTEGER :: mpi_working_type
376      !!----------------------------------------------------------------------
377      !
378#if ! defined key_mpi_off
379      ! If a specific process number has been passed to the receive call,
380      ! use that one. Default is to use mpi_any_source
381      use_source = mpi_any_source
382      IF( PRESENT(ksource) )   use_source = ksource
383      !
384      IF (wp == dp) THEN
385         mpi_working_type = mpi_double_precision
386      ELSE
387         mpi_working_type = mpi_real
388      END IF
389      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
390#endif
391      !
392   END SUBROUTINE mpprecv
393
394   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
395      !!----------------------------------------------------------------------
396      !!                  ***  routine mpprecv  ***
397      !!
398      !! ** Purpose :   Receive messag passing array
399      !!
400      !!----------------------------------------------------------------------
401      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
402      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
403      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
404      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
405      !!
406      INTEGER :: istatus(mpi_status_size)
407      INTEGER :: iflag
408      INTEGER :: use_source
409      !!----------------------------------------------------------------------
410      !
411#if ! defined key_mpi_off
412      ! If a specific process number has been passed to the receive call,
413      ! use that one. Default is to use mpi_any_source
414      use_source = mpi_any_source
415      IF( PRESENT(ksource) )   use_source = ksource
416      !
417      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
418#endif
419      !
420   END SUBROUTINE mpprecv_dp
421
422
423   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
424      !!----------------------------------------------------------------------
425      !!                  ***  routine mpprecv  ***
426      !!
427      !! ** Purpose :   Receive messag passing array
428      !!
429      !!----------------------------------------------------------------------
430      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
431      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
432      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
433      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
434      !!
435      INTEGER :: istatus(mpi_status_size)
436      INTEGER :: iflag
437      INTEGER :: use_source
438      !!----------------------------------------------------------------------
439      !
440#if ! defined key_mpi_off
441      ! If a specific process number has been passed to the receive call,
442      ! use that one. Default is to use mpi_any_source
443      use_source = mpi_any_source
444      IF( PRESENT(ksource) )   use_source = ksource
445      !
446      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
447#endif
448      !
449   END SUBROUTINE mpprecv_sp
450
451
452   SUBROUTINE mppgather( ptab, kp, pio )
453      !!----------------------------------------------------------------------
454      !!                   ***  routine mppgather  ***
455      !!
456      !! ** Purpose :   Transfert between a local subdomain array and a work
457      !!     array which is distributed following the vertical level.
458      !!
459      !!----------------------------------------------------------------------
460      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
461      INTEGER                           , INTENT(in   ) ::   kp     ! record length
462      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
463      !!
464      INTEGER :: itaille, ierror   ! temporary integer
465      !!---------------------------------------------------------------------
466      !
467      itaille = jpi * jpj
468#if ! defined key_mpi_off
469      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
470         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
471#else
472      pio(:,:,1) = ptab(:,:)
473#endif
474      !
475   END SUBROUTINE mppgather
476
477
478   SUBROUTINE mppscatter( pio, kp, ptab )
479      !!----------------------------------------------------------------------
480      !!                  ***  routine mppscatter  ***
481      !!
482      !! ** Purpose :   Transfert between awork array which is distributed
483      !!      following the vertical level and the local subdomain array.
484      !!
485      !!----------------------------------------------------------------------
486      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
487      INTEGER                             ::   kp     ! Tag (not used with MPI
488      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
489      !!
490      INTEGER :: itaille, ierror   ! temporary integer
491      !!---------------------------------------------------------------------
492      !
493      itaille = jpi * jpj
494      !
495#if ! defined key_mpi_off
496      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
497         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
498#else
499      ptab(:,:) = pio(:,:,1)
500#endif
501      !
502   END SUBROUTINE mppscatter
503
504
505   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
506     !!----------------------------------------------------------------------
507      !!                   ***  routine mpp_delay_sum  ***
508      !!
509      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
510      !!
511      !!----------------------------------------------------------------------
512      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
513      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
514      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
515      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
516      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
517      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
518      !!
519      INTEGER ::   ji, isz
520      INTEGER ::   idvar
521      INTEGER ::   ierr, ilocalcomm
522      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
523      !!----------------------------------------------------------------------
524#if ! defined key_mpi_off
525      ilocalcomm = mpi_comm_oce
526      IF( PRESENT(kcom) )   ilocalcomm = kcom
527
528      isz = SIZE(y_in)
529
530      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
531
532      idvar = -1
533      DO ji = 1, nbdelay
534         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
535      END DO
536      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
537
538      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
539         !                                       --------------------------
540         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
541            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
542            DEALLOCATE(todelay(idvar)%z1d)
543            ndelayid(idvar) = -1                                      ! do as if we had no restart
544         ELSE
545            ALLOCATE(todelay(idvar)%y1d(isz))
546            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
547            ndelayid(idvar) = MPI_REQUEST_NULL                             ! initialised request to a valid value
548         END IF
549      ENDIF
550
551      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
552         !                                       --------------------------
553         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
554         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
555         ndelayid(idvar) = MPI_REQUEST_NULL
556      ENDIF
557
558      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
559
560      ! send back pout from todelay(idvar)%z1d defined at previous call
561      pout(:) = todelay(idvar)%z1d(:)
562
563      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
564# if defined key_mpi2
565      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
566      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
567      ndelayid(idvar) = MPI_REQUEST_NULL
568      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
569# else
570      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
571# endif
572#else
573      pout(:) = REAL(y_in(:), wp)
574#endif
575
576   END SUBROUTINE mpp_delay_sum
577
578
579   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
580      !!----------------------------------------------------------------------
581      !!                   ***  routine mpp_delay_max  ***
582      !!
583      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
584      !!
585      !!----------------------------------------------------------------------
586      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
587      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
588      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
589      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
590      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
591      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
592      !!
593      INTEGER ::   ji, isz
594      INTEGER ::   idvar
595      INTEGER ::   ierr, ilocalcomm
596      INTEGER ::   MPI_TYPE
597      !!----------------------------------------------------------------------
598
599#if ! defined key_mpi_off
600      if( wp == dp ) then
601         MPI_TYPE = MPI_DOUBLE_PRECISION
602      else if ( wp == sp ) then
603         MPI_TYPE = MPI_REAL
604      else
605        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
606
607      end if
608
609      ilocalcomm = mpi_comm_oce
610      IF( PRESENT(kcom) )   ilocalcomm = kcom
611
612      isz = SIZE(p_in)
613
614      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
615
616      idvar = -1
617      DO ji = 1, nbdelay
618         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
619      END DO
620      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
621
622      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
623         !                                       --------------------------
624         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
625            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
626            DEALLOCATE(todelay(idvar)%z1d)
627            ndelayid(idvar) = -1                                      ! do as if we had no restart
628         ELSE
629            ndelayid(idvar) = MPI_REQUEST_NULL
630         END IF
631      ENDIF
632
633      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
634         !                                       --------------------------
635         ALLOCATE(todelay(idvar)%z1d(isz))
636         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
637         ndelayid(idvar) = MPI_REQUEST_NULL
638      ENDIF
639
640      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
641
642      ! send back pout from todelay(idvar)%z1d defined at previous call
643      pout(:) = todelay(idvar)%z1d(:)
644
645      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
646      ! (PM) Should we get rid of MPI2 option ? MPI3 was release in 2013. Who is still using MPI2 ?
647# if defined key_mpi2
648      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
649      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ierr )
650      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
651# else
652      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
653# endif
654#else
655      pout(:) = p_in(:)
656#endif
657
658   END SUBROUTINE mpp_delay_max
659
660
661   SUBROUTINE mpp_delay_rcv( kid )
662      !!----------------------------------------------------------------------
663      !!                   ***  routine mpp_delay_rcv  ***
664      !!
665      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
666      !!
667      !!----------------------------------------------------------------------
668      INTEGER,INTENT(in   )      ::  kid
669      INTEGER ::   ierr
670      !!----------------------------------------------------------------------
671#if ! defined key_mpi_off
672      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
673      ! test on ndelayid(kid) useless as mpi_wait return immediatly if the request handle is MPI_REQUEST_NULL
674      CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr ) ! after this ndelayid(kid) = MPI_REQUEST_NULL
675      IF( ln_timing ) CALL tic_tac( .FALSE., ld_global = .TRUE.)
676      IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
677#endif
678   END SUBROUTINE mpp_delay_rcv
679
680   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
681      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
682      INTEGER                          , INTENT(INOUT) :: kleng
683      !!----------------------------------------------------------------------
684      !!                  ***  routine mpp_bcast_nml  ***
685      !!
686      !! ** Purpose :   broadcast namelist character buffer
687      !!
688      !!----------------------------------------------------------------------
689      !!
690      INTEGER ::   iflag
691      !!----------------------------------------------------------------------
692      !
693#if ! defined key_mpi_off
694      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
695      call MPI_BARRIER(mpi_comm_oce, iflag)
696!$AGRIF_DO_NOT_TREAT
697      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
698!$AGRIF_END_DO_NOT_TREAT
699      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
700      call MPI_BARRIER(mpi_comm_oce, iflag)
701#endif
702      !
703   END SUBROUTINE mpp_bcast_nml
704
705
706   !!----------------------------------------------------------------------
707   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
708   !!
709   !!----------------------------------------------------------------------
710   !!
711#  define OPERATION_MAX
712#  define INTEGER_TYPE
713#  define DIM_0d
714#     define ROUTINE_ALLREDUCE           mppmax_int
715#     include "mpp_allreduce_generic.h90"
716#     undef ROUTINE_ALLREDUCE
717#  undef DIM_0d
718#  define DIM_1d
719#     define ROUTINE_ALLREDUCE           mppmax_a_int
720#     include "mpp_allreduce_generic.h90"
721#     undef ROUTINE_ALLREDUCE
722#  undef DIM_1d
723#  undef INTEGER_TYPE
724!
725   !!
726   !!   ----   SINGLE PRECISION VERSIONS
727   !!
728#  define SINGLE_PRECISION
729#  define REAL_TYPE
730#  define DIM_0d
731#     define ROUTINE_ALLREDUCE           mppmax_real_sp
732#     include "mpp_allreduce_generic.h90"
733#     undef ROUTINE_ALLREDUCE
734#  undef DIM_0d
735#  define DIM_1d
736#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
737#     include "mpp_allreduce_generic.h90"
738#     undef ROUTINE_ALLREDUCE
739#  undef DIM_1d
740#  undef SINGLE_PRECISION
741   !!
742   !!
743   !!   ----   DOUBLE PRECISION VERSIONS
744   !!
745!
746#  define DIM_0d
747#     define ROUTINE_ALLREDUCE           mppmax_real_dp
748#     include "mpp_allreduce_generic.h90"
749#     undef ROUTINE_ALLREDUCE
750#  undef DIM_0d
751#  define DIM_1d
752#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
753#     include "mpp_allreduce_generic.h90"
754#     undef ROUTINE_ALLREDUCE
755#  undef DIM_1d
756#  undef REAL_TYPE
757#  undef OPERATION_MAX
758   !!----------------------------------------------------------------------
759   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
760   !!
761   !!----------------------------------------------------------------------
762   !!
763#  define OPERATION_MIN
764#  define INTEGER_TYPE
765#  define DIM_0d
766#     define ROUTINE_ALLREDUCE           mppmin_int
767#     include "mpp_allreduce_generic.h90"
768#     undef ROUTINE_ALLREDUCE
769#  undef DIM_0d
770#  define DIM_1d
771#     define ROUTINE_ALLREDUCE           mppmin_a_int
772#     include "mpp_allreduce_generic.h90"
773#     undef ROUTINE_ALLREDUCE
774#  undef DIM_1d
775#  undef INTEGER_TYPE
776!
777   !!
778   !!   ----   SINGLE PRECISION VERSIONS
779   !!
780#  define SINGLE_PRECISION
781#  define REAL_TYPE
782#  define DIM_0d
783#     define ROUTINE_ALLREDUCE           mppmin_real_sp
784#     include "mpp_allreduce_generic.h90"
785#     undef ROUTINE_ALLREDUCE
786#  undef DIM_0d
787#  define DIM_1d
788#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
789#     include "mpp_allreduce_generic.h90"
790#     undef ROUTINE_ALLREDUCE
791#  undef DIM_1d
792#  undef SINGLE_PRECISION
793   !!
794   !!   ----   DOUBLE PRECISION VERSIONS
795   !!
796
797#  define DIM_0d
798#     define ROUTINE_ALLREDUCE           mppmin_real_dp
799#     include "mpp_allreduce_generic.h90"
800#     undef ROUTINE_ALLREDUCE
801#  undef DIM_0d
802#  define DIM_1d
803#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
804#     include "mpp_allreduce_generic.h90"
805#     undef ROUTINE_ALLREDUCE
806#  undef DIM_1d
807#  undef REAL_TYPE
808#  undef OPERATION_MIN
809
810   !!----------------------------------------------------------------------
811   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
812   !!
813   !!   Global sum of 1D array or a variable (integer, real or complex)
814   !!----------------------------------------------------------------------
815   !!
816#  define OPERATION_SUM
817#  define INTEGER_TYPE
818#  define DIM_0d
819#     define ROUTINE_ALLREDUCE           mppsum_int
820#     include "mpp_allreduce_generic.h90"
821#     undef ROUTINE_ALLREDUCE
822#  undef DIM_0d
823#  define DIM_1d
824#     define ROUTINE_ALLREDUCE           mppsum_a_int
825#     include "mpp_allreduce_generic.h90"
826#     undef ROUTINE_ALLREDUCE
827#  undef DIM_1d
828#  undef INTEGER_TYPE
829
830   !!
831   !!   ----   SINGLE PRECISION VERSIONS
832   !!
833#  define OPERATION_SUM
834#  define SINGLE_PRECISION
835#  define REAL_TYPE
836#  define DIM_0d
837#     define ROUTINE_ALLREDUCE           mppsum_real_sp
838#     include "mpp_allreduce_generic.h90"
839#     undef ROUTINE_ALLREDUCE
840#  undef DIM_0d
841#  define DIM_1d
842#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
843#     include "mpp_allreduce_generic.h90"
844#     undef ROUTINE_ALLREDUCE
845#  undef DIM_1d
846#  undef REAL_TYPE
847#  undef OPERATION_SUM
848
849#  undef SINGLE_PRECISION
850
851   !!
852   !!   ----   DOUBLE PRECISION VERSIONS
853   !!
854#  define OPERATION_SUM
855#  define REAL_TYPE
856#  define DIM_0d
857#     define ROUTINE_ALLREDUCE           mppsum_real_dp
858#     include "mpp_allreduce_generic.h90"
859#     undef ROUTINE_ALLREDUCE
860#  undef DIM_0d
861#  define DIM_1d
862#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
863#     include "mpp_allreduce_generic.h90"
864#     undef ROUTINE_ALLREDUCE
865#  undef DIM_1d
866#  undef REAL_TYPE
867#  undef OPERATION_SUM
868
869#  define OPERATION_SUM_DD
870#  define COMPLEX_TYPE
871#  define DIM_0d
872#     define ROUTINE_ALLREDUCE           mppsum_realdd
873#     include "mpp_allreduce_generic.h90"
874#     undef ROUTINE_ALLREDUCE
875#  undef DIM_0d
876#  define DIM_1d
877#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
878#     include "mpp_allreduce_generic.h90"
879#     undef ROUTINE_ALLREDUCE
880#  undef DIM_1d
881#  undef COMPLEX_TYPE
882#  undef OPERATION_SUM_DD
883
884   !!----------------------------------------------------------------------
885   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
886   !!
887   !!----------------------------------------------------------------------
888   !!
889   !!
890   !!   ----   SINGLE PRECISION VERSIONS
891   !!
892#  define SINGLE_PRECISION
893#  define OPERATION_MINLOC
894#  define DIM_2d
895#     define ROUTINE_LOC           mpp_minloc2d_sp
896#     include "mpp_loc_generic.h90"
897#     undef ROUTINE_LOC
898#  undef DIM_2d
899#  define DIM_3d
900#     define ROUTINE_LOC           mpp_minloc3d_sp
901#     include "mpp_loc_generic.h90"
902#     undef ROUTINE_LOC
903#  undef DIM_3d
904#  undef OPERATION_MINLOC
905
906#  define OPERATION_MAXLOC
907#  define DIM_2d
908#     define ROUTINE_LOC           mpp_maxloc2d_sp
909#     include "mpp_loc_generic.h90"
910#     undef ROUTINE_LOC
911#  undef DIM_2d
912#  define DIM_3d
913#     define ROUTINE_LOC           mpp_maxloc3d_sp
914#     include "mpp_loc_generic.h90"
915#     undef ROUTINE_LOC
916#  undef DIM_3d
917#  undef OPERATION_MAXLOC
918#  undef SINGLE_PRECISION
919   !!
920   !!   ----   DOUBLE PRECISION VERSIONS
921   !!
922#  define OPERATION_MINLOC
923#  define DIM_2d
924#     define ROUTINE_LOC           mpp_minloc2d_dp
925#     include "mpp_loc_generic.h90"
926#     undef ROUTINE_LOC
927#  undef DIM_2d
928#  define DIM_3d
929#     define ROUTINE_LOC           mpp_minloc3d_dp
930#     include "mpp_loc_generic.h90"
931#     undef ROUTINE_LOC
932#  undef DIM_3d
933#  undef OPERATION_MINLOC
934
935#  define OPERATION_MAXLOC
936#  define DIM_2d
937#     define ROUTINE_LOC           mpp_maxloc2d_dp
938#     include "mpp_loc_generic.h90"
939#     undef ROUTINE_LOC
940#  undef DIM_2d
941#  define DIM_3d
942#     define ROUTINE_LOC           mpp_maxloc3d_dp
943#     include "mpp_loc_generic.h90"
944#     undef ROUTINE_LOC
945#  undef DIM_3d
946#  undef OPERATION_MAXLOC
947
948
949   SUBROUTINE mppsync()
950      !!----------------------------------------------------------------------
951      !!                  ***  routine mppsync  ***
952      !!
953      !! ** Purpose :   Massively parallel processors, synchroneous
954      !!
955      !!-----------------------------------------------------------------------
956      INTEGER :: ierror
957      !!-----------------------------------------------------------------------
958      !
959#if ! defined key_mpi_off
960      CALL mpi_barrier( mpi_comm_oce, ierror )
961#endif
962      !
963   END SUBROUTINE mppsync
964
965
966   SUBROUTINE mppstop( ld_abort )
967      !!----------------------------------------------------------------------
968      !!                  ***  routine mppstop  ***
969      !!
970      !! ** purpose :   Stop massively parallel processors method
971      !!
972      !!----------------------------------------------------------------------
973      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
974      LOGICAL ::   ll_abort
975      INTEGER ::   info, ierr
976      !!----------------------------------------------------------------------
977      ll_abort = .FALSE.
978      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
979      !
980#if ! defined key_mpi_off
981      IF(ll_abort) THEN
982         CALL mpi_abort( MPI_COMM_WORLD, 123, info )
983      ELSE
984         CALL mppsync
985         CALL mpi_finalize( info )
986      ENDIF
987#endif
988      IF( ll_abort ) STOP 123
989      !
990   END SUBROUTINE mppstop
991
992
993   SUBROUTINE mpp_comm_free( kcom )
994      !!----------------------------------------------------------------------
995      INTEGER, INTENT(inout) ::   kcom
996      !!
997      INTEGER :: ierr
998      !!----------------------------------------------------------------------
999      !
1000#if ! defined key_mpi_off
1001      CALL MPI_COMM_FREE(kcom, ierr)
1002#endif
1003      !
1004   END SUBROUTINE mpp_comm_free
1005
1006
1007   SUBROUTINE mpp_ini_znl( kumout )
1008      !!----------------------------------------------------------------------
1009      !!               ***  routine mpp_ini_znl  ***
1010      !!
1011      !! ** Purpose :   Initialize special communicator for computing zonal sum
1012      !!
1013      !! ** Method  : - Look for processors in the same row
1014      !!              - Put their number in nrank_znl
1015      !!              - Create group for the znl processors
1016      !!              - Create a communicator for znl processors
1017      !!              - Determine if processor should write znl files
1018      !!
1019      !! ** output
1020      !!      ndim_rank_znl = number of processors on the same row
1021      !!      ngrp_znl = group ID for the znl processors
1022      !!      ncomm_znl = communicator for the ice procs.
1023      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1024      !!
1025      !!----------------------------------------------------------------------
1026      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
1027      !
1028      INTEGER :: jproc      ! dummy loop integer
1029      INTEGER :: ierr, ii   ! local integer
1030      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1031      !!----------------------------------------------------------------------
1032#if ! defined key_mpi_off
1033      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_world     : ', ngrp_world
1034      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_world : ', mpi_comm_world
1035      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_oce   : ', mpi_comm_oce
1036      !
1037      ALLOCATE( kwork(jpnij), STAT=ierr )
1038      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1039
1040      IF( jpnj == 1 ) THEN
1041         ngrp_znl  = ngrp_world
1042         ncomm_znl = mpi_comm_oce
1043      ELSE
1044         !
1045         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1046         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - kwork pour njmpp : ', kwork
1047         !-$$        CALL flush(numout)
1048         !
1049         ! Count number of processors on the same row
1050         ndim_rank_znl = 0
1051         DO jproc=1,jpnij
1052            IF ( kwork(jproc) == njmpp ) THEN
1053               ndim_rank_znl = ndim_rank_znl + 1
1054            ENDIF
1055         END DO
1056         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ndim_rank_znl : ', ndim_rank_znl
1057         !-$$        CALL flush(numout)
1058         ! Allocate the right size to nrank_znl
1059         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1060         ALLOCATE(nrank_znl(ndim_rank_znl))
1061         ii = 0
1062         nrank_znl (:) = 0
1063         DO jproc=1,jpnij
1064            IF ( kwork(jproc) == njmpp) THEN
1065               ii = ii + 1
1066               nrank_znl(ii) = jproc -1
1067            ENDIF
1068         END DO
1069         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - nrank_znl : ', nrank_znl
1070         !-$$        CALL flush(numout)
1071
1072         ! Create the opa group
1073         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1074         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_opa : ', ngrp_opa
1075         !-$$        CALL flush(numout)
1076
1077         ! Create the znl group from the opa group
1078         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1079         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_znl ', ngrp_znl
1080         !-$$        CALL flush(numout)
1081
1082         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1083         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1084         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ncomm_znl ', ncomm_znl
1085         !-$$        CALL flush(numout)
1086         !
1087      END IF
1088
1089      ! Determines if processor if the first (starting from i=1) on the row
1090      IF ( jpni == 1 ) THEN
1091         l_znl_root = .TRUE.
1092      ELSE
1093         l_znl_root = .FALSE.
1094         kwork (1) = nimpp
1095         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1096         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1097      END IF
1098
1099      DEALLOCATE(kwork)
1100#endif
1101
1102   END SUBROUTINE mpp_ini_znl
1103
1104   
1105   SUBROUTINE mpp_ini_nc( khls )
1106      !!----------------------------------------------------------------------
1107      !!               ***  routine mpp_ini_nc  ***
1108      !!
1109      !! ** Purpose :   Initialize special communicators for MPI3 neighbourhood
1110      !!                collectives
1111      !!
1112      !! ** Method  : - Create graph communicators starting from the processes
1113      !!                distribution along i and j directions
1114      !
1115      !! ** output
1116      !!         mpi_nc_com4 = MPI3 neighbourhood collectives communicator
1117      !!         mpi_nc_com8 = MPI3 neighbourhood collectives communicator (with diagonals)
1118      !!----------------------------------------------------------------------
1119      INTEGER,             INTENT(in   ) ::   khls        ! halo size, default = nn_hls
1120      !
1121      INTEGER, DIMENSION(:), ALLOCATABLE :: iSnei4, iRnei4, iSnei8, iRnei8
1122      INTEGER                            :: iScnt4, iRcnt4, iScnt8, iRcnt8
1123      INTEGER                            :: ierr
1124      LOGICAL, PARAMETER                 :: ireord = .FALSE.
1125      !!----------------------------------------------------------------------
1126#if ! defined key_mpi_off && ! defined key_mpi2
1127     
1128      iScnt4 = COUNT( mpiSnei(khls,1:4) >= 0 )
1129      iRcnt4 = COUNT( mpiRnei(khls,1:4) >= 0 )
1130      iScnt8 = COUNT( mpiSnei(khls,1:8) >= 0 )
1131      iRcnt8 = COUNT( mpiRnei(khls,1:8) >= 0 )
1132
1133      ALLOCATE( iSnei4(iScnt4), iRnei4(iRcnt4), iSnei8(iScnt8), iRnei8(iRcnt8) )   ! ok if icnt4 or icnt8 = 0
1134
1135      iSnei4 = PACK( mpiSnei(khls,1:4), mask = mpiSnei(khls,1:4) >= 0 )
1136      iRnei4 = PACK( mpiRnei(khls,1:4), mask = mpiRnei(khls,1:4) >= 0 )
1137      iSnei8 = PACK( mpiSnei(khls,1:8), mask = mpiSnei(khls,1:8) >= 0 )
1138      iRnei8 = PACK( mpiRnei(khls,1:8), mask = mpiRnei(khls,1:8) >= 0 )
1139
1140      CALL MPI_Dist_graph_create_adjacent( mpi_comm_oce, iScnt4, iSnei4, MPI_UNWEIGHTED, iRcnt4, iRnei4, MPI_UNWEIGHTED,   &
1141         &                                 MPI_INFO_NULL, ireord, mpi_nc_com4(khls), ierr )
1142      CALL MPI_Dist_graph_create_adjacent( mpi_comm_oce, iScnt8, iSnei8, MPI_UNWEIGHTED, iRcnt8, iRnei8, MPI_UNWEIGHTED,   &
1143         &                                 MPI_INFO_NULL, ireord, mpi_nc_com8(khls), ierr)
1144
1145      DEALLOCATE( iSnei4, iRnei4, iSnei8, iRnei8 )
1146#endif
1147   END SUBROUTINE mpp_ini_nc
1148
1149
1150   SUBROUTINE mpp_ini_north
1151      !!----------------------------------------------------------------------
1152      !!               ***  routine mpp_ini_north  ***
1153      !!
1154      !! ** Purpose :   Initialize special communicator for north folding
1155      !!      condition together with global variables needed in the mpp folding
1156      !!
1157      !! ** Method  : - Look for northern processors
1158      !!              - Put their number in nrank_north
1159      !!              - Create groups for the world processors and the north processors
1160      !!              - Create a communicator for northern processors
1161      !!
1162      !! ** output
1163      !!      ndim_rank_north = number of processors in the northern line
1164      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1165      !!      ngrp_world = group ID for the world processors
1166      !!      ngrp_north = group ID for the northern processors
1167      !!      ncomm_north = communicator for the northern procs.
1168      !!      north_root = number (in the world) of proc 0 in the northern comm.
1169      !!
1170      !!----------------------------------------------------------------------
1171      INTEGER ::   ierr
1172      INTEGER ::   jjproc
1173      INTEGER ::   ii, ji
1174      !!----------------------------------------------------------------------
1175      !
1176#if ! defined key_mpi_off
1177      !
1178      ! Look for how many procs on the northern boundary
1179      ndim_rank_north = 0
1180      DO jjproc = 1, jpni
1181         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1182      END DO
1183      !
1184      ! Allocate the right size to nrank_north
1185      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1186      ALLOCATE( nrank_north(ndim_rank_north) )
1187
1188      ! Fill the nrank_north array with proc. number of northern procs.
1189      ! Note : the rank start at 0 in MPI
1190      ii = 0
1191      DO ji = 1, jpni
1192         IF ( nfproc(ji) /= -1   ) THEN
1193            ii=ii+1
1194            nrank_north(ii)=nfproc(ji)
1195         END IF
1196      END DO
1197      !
1198      ! create the world group
1199      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1200      !
1201      ! Create the North group from the world group
1202      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1203      !
1204      ! Create the North communicator , ie the pool of procs in the north group
1205      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1206      !
1207#endif
1208   END SUBROUTINE mpp_ini_north
1209
1210
1211   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1212      !!---------------------------------------------------------------------
1213      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1214      !!
1215      !!   Modification of original codes written by David H. Bailey
1216      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1217      !!---------------------------------------------------------------------
1218      INTEGER                     , INTENT(in)    ::   ilen, itype
1219      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1220      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1221      !
1222      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1223      INTEGER  :: ji, ztmp           ! local scalar
1224      !!---------------------------------------------------------------------
1225      !
1226      ztmp = itype   ! avoid compilation warning
1227      !
1228      DO ji=1,ilen
1229      ! Compute ydda + yddb using Knuth's trick.
1230         zt1  = real(ydda(ji)) + real(yddb(ji))
1231         zerr = zt1 - real(ydda(ji))
1232         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1233                + aimag(ydda(ji)) + aimag(yddb(ji))
1234
1235         ! The result is zt1 + zt2, after normalization.
1236         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1237      END DO
1238      !
1239   END SUBROUTINE DDPDD_MPI
1240
1241
1242   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1243      !!----------------------------------------------------------------------
1244      !!                  ***  routine mpp_report  ***
1245      !!
1246      !! ** Purpose :   report use of mpp routines per time-setp
1247      !!
1248      !!----------------------------------------------------------------------
1249      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1250      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1251      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1252      !!
1253      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1254      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1255      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1256      !!----------------------------------------------------------------------
1257#if ! defined key_mpi_off
1258      !
1259      ll_lbc = .FALSE.
1260      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1261      ll_glb = .FALSE.
1262      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1263      ll_dlg = .FALSE.
1264      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1265      !
1266      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1267      ncom_freq = ncom_fsbc
1268      !
1269      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1270         IF( ll_lbc ) THEN
1271            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1272            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1273            n_sequence_lbc = n_sequence_lbc + 1
1274            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1275            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1276            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1277            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1278         ENDIF
1279         IF( ll_glb ) THEN
1280            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1281            n_sequence_glb = n_sequence_glb + 1
1282            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1283            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1284         ENDIF
1285         IF( ll_dlg ) THEN
1286            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1287            n_sequence_dlg = n_sequence_dlg + 1
1288            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1289            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1290         ENDIF
1291      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1292         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1293         WRITE(numcom,*) ' '
1294         WRITE(numcom,*) ' ------------------------------------------------------------'
1295         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1296         WRITE(numcom,*) ' ------------------------------------------------------------'
1297         WRITE(numcom,*) ' '
1298         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1299         jj = 0; jk = 0; jf = 0; jh = 0
1300         DO ji = 1, n_sequence_lbc
1301            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1302            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1303            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1304            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1305         END DO
1306         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1307         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1308         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1309         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1310         WRITE(numcom,*) ' '
1311         WRITE(numcom,*) ' lbc_lnk called'
1312         DO ji = 1, n_sequence_lbc - 1
1313            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1314               ccountname = crname_lbc(ji)
1315               crname_lbc(ji) = 'already counted'
1316               jcount = 1
1317               DO jj = ji + 1, n_sequence_lbc
1318                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1319                     jcount = jcount + 1
1320                     crname_lbc(jj) = 'already counted'
1321                  END IF
1322               END DO
1323               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1324            END IF
1325         END DO
1326         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1327            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1328         END IF
1329         WRITE(numcom,*) ' '
1330         IF ( n_sequence_glb > 0 ) THEN
1331            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1332            jj = 1
1333            DO ji = 2, n_sequence_glb
1334               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1335                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1336                  jj = 0
1337               END IF
1338               jj = jj + 1
1339            END DO
1340            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1341            DEALLOCATE(crname_glb)
1342         ELSE
1343            WRITE(numcom,*) ' No MPI global communication '
1344         ENDIF
1345         WRITE(numcom,*) ' '
1346         IF ( n_sequence_dlg > 0 ) THEN
1347            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1348            jj = 1
1349            DO ji = 2, n_sequence_dlg
1350               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1351                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1352                  jj = 0
1353               END IF
1354               jj = jj + 1
1355            END DO
1356            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1357            DEALLOCATE(crname_dlg)
1358         ELSE
1359            WRITE(numcom,*) ' No MPI delayed global communication '
1360         ENDIF
1361         WRITE(numcom,*) ' '
1362         WRITE(numcom,*) ' -----------------------------------------------'
1363         WRITE(numcom,*) ' '
1364         DEALLOCATE(ncomm_sequence)
1365         DEALLOCATE(crname_lbc)
1366      ENDIF
1367#endif
1368   END SUBROUTINE mpp_report
1369
1370
1371   SUBROUTINE tic_tac (ld_tic, ld_global)
1372
1373    LOGICAL,           INTENT(IN) :: ld_tic
1374    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1375    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1376    REAL(dp),               SAVE :: tic_ct = 0._dp
1377    INTEGER :: ii
1378#if ! defined key_mpi_off
1379
1380    IF( ncom_stp <= nit000 ) RETURN
1381    IF( ncom_stp == nitend ) RETURN
1382    ii = 1
1383    IF( PRESENT( ld_global ) ) THEN
1384       IF( ld_global ) ii = 2
1385    END IF
1386
1387    IF ( ld_tic ) THEN
1388       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1389       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1390    ELSE
1391       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1392       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1393    ENDIF
1394#endif
1395
1396   END SUBROUTINE tic_tac
1397
1398#if defined key_mpi_off
1399   SUBROUTINE mpi_wait(request, status, ierror)
1400      INTEGER                            , INTENT(in   ) ::   request
1401      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1402      INTEGER                            , INTENT(  out) ::   ierror
1403   END SUBROUTINE mpi_wait
1404
1405
1406   FUNCTION MPI_Wtime()
1407      REAL(wp) ::  MPI_Wtime
1408      MPI_Wtime = -1.
1409   END FUNCTION MPI_Wtime
1410#endif
1411
1412   !!----------------------------------------------------------------------
1413   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1414   !!----------------------------------------------------------------------
1415
1416   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1417      &                 cd6, cd7, cd8, cd9, cd10 )
1418      !!----------------------------------------------------------------------
1419      !!                  ***  ROUTINE  stop_opa  ***
1420      !!
1421      !! ** Purpose :   print in ocean.outpput file a error message and
1422      !!                increment the error number (nstop) by one.
1423      !!----------------------------------------------------------------------
1424      CHARACTER(len=*), INTENT(in   )           ::   cd1
1425      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1426      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1427      !
1428      CHARACTER(LEN=8) ::   clfmt            ! writing format
1429      INTEGER          ::   inum
1430      !!----------------------------------------------------------------------
1431      !
1432      nstop = nstop + 1
1433      !
1434      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1435         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1436         WRITE(inum,*)
1437         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1438         CLOSE(inum)
1439      ENDIF
1440      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1441         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1442      ENDIF
1443      !
1444                            WRITE(numout,*)
1445                            WRITE(numout,*) ' ===>>> : E R R O R'
1446                            WRITE(numout,*)
1447                            WRITE(numout,*) '         ==========='
1448                            WRITE(numout,*)
1449                            WRITE(numout,*) TRIM(cd1)
1450      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1451      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1452      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1453      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1454      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1455      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1456      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1457      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1458      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1459                            WRITE(numout,*)
1460      !
1461                               CALL FLUSH(numout    )
1462      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1463      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1464      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1465      !
1466      IF( cd1 == 'STOP' ) THEN
1467         WRITE(numout,*)
1468         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1469         WRITE(numout,*)
1470         CALL FLUSH(numout)
1471         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1472         CALL mppstop( ld_abort = .true. )
1473      ENDIF
1474      !
1475   END SUBROUTINE ctl_stop
1476
1477
1478   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1479      &                 cd6, cd7, cd8, cd9, cd10 )
1480      !!----------------------------------------------------------------------
1481      !!                  ***  ROUTINE  stop_warn  ***
1482      !!
1483      !! ** Purpose :   print in ocean.outpput file a error message and
1484      !!                increment the warning number (nwarn) by one.
1485      !!----------------------------------------------------------------------
1486      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1487      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1488      !!----------------------------------------------------------------------
1489      !
1490      nwarn = nwarn + 1
1491      !
1492      IF(lwp) THEN
1493                               WRITE(numout,*)
1494                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1495                               WRITE(numout,*)
1496                               WRITE(numout,*) '         ==============='
1497                               WRITE(numout,*)
1498         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1499         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1500         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1501         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1502         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1503         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1504         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1505         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1506         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1507         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1508                               WRITE(numout,*)
1509      ENDIF
1510      CALL FLUSH(numout)
1511      !
1512   END SUBROUTINE ctl_warn
1513
1514
1515   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1516      !!----------------------------------------------------------------------
1517      !!                  ***  ROUTINE ctl_opn  ***
1518      !!
1519      !! ** Purpose :   Open file and check if required file is available.
1520      !!
1521      !! ** Method  :   Fortan open
1522      !!----------------------------------------------------------------------
1523      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1524      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1525      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1526      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1527      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1528      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1529      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1530      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1531      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1532      !
1533      CHARACTER(len=80) ::   clfile
1534      CHARACTER(LEN=10) ::   clfmt            ! writing format
1535      INTEGER           ::   iost
1536      INTEGER           ::   idg              ! number of digits
1537      !!----------------------------------------------------------------------
1538      !
1539      ! adapt filename
1540      ! ----------------
1541      clfile = TRIM(cdfile)
1542      IF( PRESENT( karea ) ) THEN
1543         IF( karea > 1 ) THEN
1544            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1545            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1546            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1547            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1548         ENDIF
1549      ENDIF
1550#if defined key_agrif
1551      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1552      knum=Agrif_Get_Unit()
1553#else
1554      knum=get_unit()
1555#endif
1556      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1557      !
1558      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1559         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1560      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1561         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1562      ELSE
1563         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1564      ENDIF
1565      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1566         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1567      IF( iost == 0 ) THEN
1568         IF(ldwp .AND. kout > 0) THEN
1569            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1570            WRITE(kout,*) '     unit   = ', knum
1571            WRITE(kout,*) '     status = ', cdstat
1572            WRITE(kout,*) '     form   = ', cdform
1573            WRITE(kout,*) '     access = ', cdacce
1574            WRITE(kout,*)
1575         ENDIF
1576      ENDIF
1577100   CONTINUE
1578      IF( iost /= 0 ) THEN
1579         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1580         WRITE(ctmp2,*) ' =======   ===  '
1581         WRITE(ctmp3,*) '           unit   = ', knum
1582         WRITE(ctmp4,*) '           status = ', cdstat
1583         WRITE(ctmp5,*) '           form   = ', cdform
1584         WRITE(ctmp6,*) '           access = ', cdacce
1585         WRITE(ctmp7,*) '           iostat = ', iost
1586         WRITE(ctmp8,*) '           we stop. verify the file '
1587         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1588      ENDIF
1589      !
1590   END SUBROUTINE ctl_opn
1591
1592
1593   SUBROUTINE ctl_nam ( kios, cdnam )
1594      !!----------------------------------------------------------------------
1595      !!                  ***  ROUTINE ctl_nam  ***
1596      !!
1597      !! ** Purpose :   Informations when error while reading a namelist
1598      !!
1599      !! ** Method  :   Fortan open
1600      !!----------------------------------------------------------------------
1601      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1602      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1603      !
1604      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1605      !!----------------------------------------------------------------------
1606      !
1607      WRITE (clios, '(I5.0)')   kios
1608      IF( kios < 0 ) THEN
1609         CALL ctl_warn( 'end of record or file while reading namelist '   &
1610            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1611      ENDIF
1612      !
1613      IF( kios > 0 ) THEN
1614         CALL ctl_stop( 'misspelled variable in namelist '   &
1615            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1616      ENDIF
1617      kios = 0
1618      !
1619   END SUBROUTINE ctl_nam
1620
1621
1622   INTEGER FUNCTION get_unit()
1623      !!----------------------------------------------------------------------
1624      !!                  ***  FUNCTION  get_unit  ***
1625      !!
1626      !! ** Purpose :   return the index of an unused logical unit
1627      !!----------------------------------------------------------------------
1628      LOGICAL :: llopn
1629      !!----------------------------------------------------------------------
1630      !
1631      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1632      llopn = .TRUE.
1633      DO WHILE( (get_unit < 998) .AND. llopn )
1634         get_unit = get_unit + 1
1635         INQUIRE( unit = get_unit, opened = llopn )
1636      END DO
1637      IF( (get_unit == 999) .AND. llopn ) THEN
1638         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1639      ENDIF
1640      !
1641   END FUNCTION get_unit
1642
1643   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1644      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1645      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1646      CHARACTER(LEN=256)                           :: chline
1647      CHARACTER(LEN=1)                             :: csp
1648      INTEGER, INTENT(IN)                          :: kout
1649      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1650      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1651      !
1652      !csp = NEW_LINE('A')
1653      ! a new line character is the best seperator but some systems (e.g.Cray)
1654      ! seem to terminate namelist reads from internal files early if they
1655      ! encounter new-lines. Use a single space for safety.
1656      csp = ' '
1657      !
1658      ! Check if the namelist buffer has already been allocated. Return if it has.
1659      !
1660      IF ( ALLOCATED( cdnambuff ) ) RETURN
1661      IF( ldwp ) THEN
1662         !
1663         ! Open namelist file
1664         !
1665         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1666         !
1667         ! First pass: count characters excluding comments and trimable white space
1668         !
1669         itot=0
1670     10  READ(iun,'(A256)',END=20,ERR=20) chline
1671         iltc = LEN_TRIM(chline)
1672         IF ( iltc.GT.0 ) THEN
1673          inl = INDEX(chline, '!')
1674          IF( inl.eq.0 ) THEN
1675           itot = itot + iltc + 1                                ! +1 for the newline character
1676          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1677           itot = itot + inl                                  !  includes +1 for the newline character
1678          ENDIF
1679         ENDIF
1680         GOTO 10
1681     20  CONTINUE
1682         !
1683         ! Allocate text cdnambuff for condensed namelist
1684         !
1685!$AGRIF_DO_NOT_TREAT
1686         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1687!$AGRIF_END_DO_NOT_TREAT
1688         itotsav = itot
1689         !
1690         ! Second pass: read and transfer pruned characters into cdnambuff
1691         !
1692         REWIND(iun)
1693         itot=1
1694     30  READ(iun,'(A256)',END=40,ERR=40) chline
1695         iltc = LEN_TRIM(chline)
1696         IF ( iltc.GT.0 ) THEN
1697          inl = INDEX(chline, '!')
1698          IF( inl.eq.0 ) THEN
1699           inl = iltc
1700          ELSE
1701           inl = inl - 1
1702          ENDIF
1703          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1704             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1705             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1706             itot = itot + inl + 1
1707          ENDIF
1708         ENDIF
1709         GOTO 30
1710     40  CONTINUE
1711         itot = itot - 1
1712         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1713         !
1714         ! Close namelist file
1715         !
1716         CLOSE(iun)
1717         !write(*,'(32A)') cdnambuff
1718      ENDIF
1719#if ! defined key_mpi_off
1720      CALL mpp_bcast_nml( cdnambuff, itot )
1721#endif
1722  END SUBROUTINE load_nml
1723
1724
1725   !!----------------------------------------------------------------------
1726END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.