New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 14275

Last change on this file since 14275 was 14275, checked in by smasson, 3 years ago

trunk: suppress nproc ( = mpprank = narea-1)

  • Property svn:keywords set to Id
File size: 73.9 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mpp_ini_nc
69   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
71   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
72   PUBLIC   mpp_report
73   PUBLIC   mpp_bcast_nml
74   PUBLIC   tic_tac
75#if defined key_mpp_off
76   PUBLIC MPI_wait
77   PUBLIC MPI_Wtime
78#endif
79
80   !! * Interfaces
81   !! define generic interface for these routine as they are called sometimes
82   !! with scalar arguments instead of array arguments, which causes problems
83   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
84   INTERFACE mpp_min
85      MODULE PROCEDURE mppmin_a_int, mppmin_int
86      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
87      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
88   END INTERFACE
89   INTERFACE mpp_max
90      MODULE PROCEDURE mppmax_a_int, mppmax_int
91      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
92      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
93   END INTERFACE
94   INTERFACE mpp_sum
95      MODULE PROCEDURE mppsum_a_int, mppsum_int
96      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
97      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
98      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
99   END INTERFACE
100   INTERFACE mpp_minloc
101      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
102      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
103   END INTERFACE
104   INTERFACE mpp_maxloc
105      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
106      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
107   END INTERFACE
108
109   !! ========================= !!
110   !!  MPI  variable definition !!
111   !! ========================= !!
112#if ! defined key_mpi_off
113!$AGRIF_DO_NOT_TREAT
114   INCLUDE 'mpif.h'
115!$AGRIF_END_DO_NOT_TREAT
116   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
117#else
118   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
119   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
120   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
121   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
122#endif
123
124   INTEGER, PUBLIC ::   mppsize        ! number of process
125   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
126!$AGRIF_DO_NOT_TREAT
127   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
128!$AGRIF_END_DO_NOT_TREAT
129
130   INTEGER :: MPI_SUMDD
131
132   ! variables used for zonal integration
133   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
134   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
135   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
136   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
137   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
138
139   ! variables used for MPI3 neighbourhood collectives
140   INTEGER, PUBLIC :: mpi_nc_com                   ! MPI3 neighbourhood collectives communicator
141   INTEGER, PUBLIC :: mpi_nc_all_com               ! MPI3 neighbourhood collectives communicator (with diagionals)
142
143   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
144   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
145   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
146   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
147   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
148   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
149   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
150   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
151   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
152
153   ! Communications summary report
154   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
156   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
157   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
158   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
165   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
166   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
167   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
168   !: name (used as id) of allreduce-delayed operations
169   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
170   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
171   !: component name where the allreduce-delayed operation is performed
172   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
173   TYPE, PUBLIC ::   DELAYARR
174      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
175      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
176   END TYPE DELAYARR
177   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
178   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
179
180   ! timing summary report
181   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
182   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
183
184   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
185
186   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
187   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
188
189   !! * Substitutions
190#  include "do_loop_substitute.h90"
191   !!----------------------------------------------------------------------
192   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
193   !! $Id$
194   !! Software governed by the CeCILL license (see ./LICENSE)
195   !!----------------------------------------------------------------------
196CONTAINS
197
198   SUBROUTINE mpp_start( localComm )
199      !!----------------------------------------------------------------------
200      !!                  ***  routine mpp_start  ***
201      !!
202      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
203      !!----------------------------------------------------------------------
204      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
205      !
206      INTEGER ::   ierr
207      LOGICAL ::   llmpi_init
208      !!----------------------------------------------------------------------
209#if ! defined key_mpi_off
210      !
211      CALL mpi_initialized ( llmpi_init, ierr )
212      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
213
214      IF( .NOT. llmpi_init ) THEN
215         IF( PRESENT(localComm) ) THEN
216            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
217            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
218            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
219         ENDIF
220         CALL mpi_init( ierr )
221         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
222      ENDIF
223
224      IF( PRESENT(localComm) ) THEN
225         IF( Agrif_Root() ) THEN
226            mpi_comm_oce = localComm
227         ENDIF
228      ELSE
229         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
230         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
231      ENDIF
232
233# if defined key_agrif
234      IF( Agrif_Root() ) THEN
235         CALL Agrif_MPI_Init(mpi_comm_oce)
236      ELSE
237         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
238      ENDIF
239# endif
240
241      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
242      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
243      !
244      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
245      !
246#else
247      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
248      mppsize = 1
249      mpprank = 0
250#endif
251   END SUBROUTINE mpp_start
252
253
254   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
255      !!----------------------------------------------------------------------
256      !!                  ***  routine mppsend  ***
257      !!
258      !! ** Purpose :   Send messag passing array
259      !!
260      !!----------------------------------------------------------------------
261      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
262      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
263      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
264      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
265      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
266      !!
267      INTEGER ::   iflag
268      INTEGER :: mpi_working_type
269      !!----------------------------------------------------------------------
270      !
271#if ! defined key_mpi_off
272      IF (wp == dp) THEN
273         mpi_working_type = mpi_double_precision
274      ELSE
275         mpi_working_type = mpi_real
276      END IF
277      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
278#endif
279      !
280   END SUBROUTINE mppsend
281
282
283   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
284      !!----------------------------------------------------------------------
285      !!                  ***  routine mppsend  ***
286      !!
287      !! ** Purpose :   Send messag passing array
288      !!
289      !!----------------------------------------------------------------------
290      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
291      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
292      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
293      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
294      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
295      !!
296      INTEGER ::   iflag
297      !!----------------------------------------------------------------------
298      !
299#if ! defined key_mpi_off
300      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
301#endif
302      !
303   END SUBROUTINE mppsend_dp
304
305
306   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
307      !!----------------------------------------------------------------------
308      !!                  ***  routine mppsend  ***
309      !!
310      !! ** Purpose :   Send messag passing array
311      !!
312      !!----------------------------------------------------------------------
313      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
314      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
315      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
316      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
317      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
318      !!
319      INTEGER ::   iflag
320      !!----------------------------------------------------------------------
321      !
322#if ! defined key_mpi_off
323      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
324#endif
325      !
326   END SUBROUTINE mppsend_sp
327
328
329   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
330      !!----------------------------------------------------------------------
331      !!                  ***  routine mpprecv  ***
332      !!
333      !! ** Purpose :   Receive messag passing array
334      !!
335      !!----------------------------------------------------------------------
336      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
337      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
338      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
339      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
340      !!
341      INTEGER :: istatus(mpi_status_size)
342      INTEGER :: iflag
343      INTEGER :: use_source
344      INTEGER :: mpi_working_type
345      !!----------------------------------------------------------------------
346      !
347#if ! defined key_mpi_off
348      ! If a specific process number has been passed to the receive call,
349      ! use that one. Default is to use mpi_any_source
350      use_source = mpi_any_source
351      IF( PRESENT(ksource) )   use_source = ksource
352      !
353      IF (wp == dp) THEN
354         mpi_working_type = mpi_double_precision
355      ELSE
356         mpi_working_type = mpi_real
357      END IF
358      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
359#endif
360      !
361   END SUBROUTINE mpprecv
362
363   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
364      !!----------------------------------------------------------------------
365      !!                  ***  routine mpprecv  ***
366      !!
367      !! ** Purpose :   Receive messag passing array
368      !!
369      !!----------------------------------------------------------------------
370      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
371      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
372      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
373      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
374      !!
375      INTEGER :: istatus(mpi_status_size)
376      INTEGER :: iflag
377      INTEGER :: use_source
378      !!----------------------------------------------------------------------
379      !
380#if ! defined key_mpi_off
381      ! If a specific process number has been passed to the receive call,
382      ! use that one. Default is to use mpi_any_source
383      use_source = mpi_any_source
384      IF( PRESENT(ksource) )   use_source = ksource
385      !
386      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
387#endif
388      !
389   END SUBROUTINE mpprecv_dp
390
391
392   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
393      !!----------------------------------------------------------------------
394      !!                  ***  routine mpprecv  ***
395      !!
396      !! ** Purpose :   Receive messag passing array
397      !!
398      !!----------------------------------------------------------------------
399      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
400      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
401      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
402      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
403      !!
404      INTEGER :: istatus(mpi_status_size)
405      INTEGER :: iflag
406      INTEGER :: use_source
407      !!----------------------------------------------------------------------
408      !
409#if ! defined key_mpi_off
410      ! If a specific process number has been passed to the receive call,
411      ! use that one. Default is to use mpi_any_source
412      use_source = mpi_any_source
413      IF( PRESENT(ksource) )   use_source = ksource
414      !
415      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
416#endif
417      !
418   END SUBROUTINE mpprecv_sp
419
420
421   SUBROUTINE mppgather( ptab, kp, pio )
422      !!----------------------------------------------------------------------
423      !!                   ***  routine mppgather  ***
424      !!
425      !! ** Purpose :   Transfert between a local subdomain array and a work
426      !!     array which is distributed following the vertical level.
427      !!
428      !!----------------------------------------------------------------------
429      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
430      INTEGER                           , INTENT(in   ) ::   kp     ! record length
431      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
432      !!
433      INTEGER :: itaille, ierror   ! temporary integer
434      !!---------------------------------------------------------------------
435      !
436      itaille = jpi * jpj
437#if ! defined key_mpi_off
438      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
439         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
440#else
441      pio(:,:,1) = ptab(:,:)
442#endif
443      !
444   END SUBROUTINE mppgather
445
446
447   SUBROUTINE mppscatter( pio, kp, ptab )
448      !!----------------------------------------------------------------------
449      !!                  ***  routine mppscatter  ***
450      !!
451      !! ** Purpose :   Transfert between awork array which is distributed
452      !!      following the vertical level and the local subdomain array.
453      !!
454      !!----------------------------------------------------------------------
455      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
456      INTEGER                             ::   kp     ! Tag (not used with MPI
457      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
458      !!
459      INTEGER :: itaille, ierror   ! temporary integer
460      !!---------------------------------------------------------------------
461      !
462      itaille = jpi * jpj
463      !
464#if ! defined key_mpi_off
465      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
466         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
467#else
468      ptab(:,:) = pio(:,:,1)
469#endif
470      !
471   END SUBROUTINE mppscatter
472
473
474   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
475     !!----------------------------------------------------------------------
476      !!                   ***  routine mpp_delay_sum  ***
477      !!
478      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
479      !!
480      !!----------------------------------------------------------------------
481      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
482      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
483      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
484      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
485      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
486      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
487      !!
488      INTEGER ::   ji, isz
489      INTEGER ::   idvar
490      INTEGER ::   ierr, ilocalcomm
491      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
492      !!----------------------------------------------------------------------
493#if ! defined key_mpi_off
494      ilocalcomm = mpi_comm_oce
495      IF( PRESENT(kcom) )   ilocalcomm = kcom
496
497      isz = SIZE(y_in)
498
499      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
500
501      idvar = -1
502      DO ji = 1, nbdelay
503         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
504      END DO
505      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
506
507      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
508         !                                       --------------------------
509         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
510            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
511            DEALLOCATE(todelay(idvar)%z1d)
512            ndelayid(idvar) = -1                                      ! do as if we had no restart
513         ELSE
514            ALLOCATE(todelay(idvar)%y1d(isz))
515            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
516            ndelayid(idvar) = MPI_REQUEST_NULL                             ! initialised request to a valid value
517         END IF
518      ENDIF
519
520      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
521         !                                       --------------------------
522         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
523         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
524         ndelayid(idvar) = MPI_REQUEST_NULL
525      ENDIF
526
527      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
528
529      ! send back pout from todelay(idvar)%z1d defined at previous call
530      pout(:) = todelay(idvar)%z1d(:)
531
532      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
533# if defined key_mpi2
534      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
535      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
536      ndelayid(idvar) = MPI_REQUEST_NULL
537      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
538# else
539      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
540# endif
541#else
542      pout(:) = REAL(y_in(:), wp)
543#endif
544
545   END SUBROUTINE mpp_delay_sum
546
547
548   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
549      !!----------------------------------------------------------------------
550      !!                   ***  routine mpp_delay_max  ***
551      !!
552      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
553      !!
554      !!----------------------------------------------------------------------
555      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
556      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
557      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
558      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
559      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
560      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
561      !!
562      INTEGER ::   ji, isz
563      INTEGER ::   idvar
564      INTEGER ::   ierr, ilocalcomm
565      INTEGER ::   MPI_TYPE
566      !!----------------------------------------------------------------------
567
568#if ! defined key_mpi_off
569      if( wp == dp ) then
570         MPI_TYPE = MPI_DOUBLE_PRECISION
571      else if ( wp == sp ) then
572         MPI_TYPE = MPI_REAL
573      else
574        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
575
576      end if
577
578      ilocalcomm = mpi_comm_oce
579      IF( PRESENT(kcom) )   ilocalcomm = kcom
580
581      isz = SIZE(p_in)
582
583      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
584
585      idvar = -1
586      DO ji = 1, nbdelay
587         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
588      END DO
589      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
590
591      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
592         !                                       --------------------------
593         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
594            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
595            DEALLOCATE(todelay(idvar)%z1d)
596            ndelayid(idvar) = -1                                      ! do as if we had no restart
597         ELSE
598            ndelayid(idvar) = MPI_REQUEST_NULL
599         END IF
600      ENDIF
601
602      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
603         !                                       --------------------------
604         ALLOCATE(todelay(idvar)%z1d(isz))
605         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
606         ndelayid(idvar) = MPI_REQUEST_NULL
607      ENDIF
608
609      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
610
611      ! send back pout from todelay(idvar)%z1d defined at previous call
612      pout(:) = todelay(idvar)%z1d(:)
613
614      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
615      ! (PM) Should we get rid of MPI2 option ? MPI3 was release in 2013. Who is still using MPI2 ?
616# if defined key_mpi2
617      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
618      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ierr )
619      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
620# else
621      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
622# endif
623#else
624      pout(:) = p_in(:)
625#endif
626
627   END SUBROUTINE mpp_delay_max
628
629
630   SUBROUTINE mpp_delay_rcv( kid )
631      !!----------------------------------------------------------------------
632      !!                   ***  routine mpp_delay_rcv  ***
633      !!
634      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
635      !!
636      !!----------------------------------------------------------------------
637      INTEGER,INTENT(in   )      ::  kid
638      INTEGER ::   ierr
639      !!----------------------------------------------------------------------
640#if ! defined key_mpi_off
641      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
642      ! test on ndelayid(kid) useless as mpi_wait return immediatly if the request handle is MPI_REQUEST_NULL
643      CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr ) ! after this ndelayid(kid) = MPI_REQUEST_NULL
644      IF( ln_timing ) CALL tic_tac( .FALSE., ld_global = .TRUE.)
645      IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
646#endif
647   END SUBROUTINE mpp_delay_rcv
648
649   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
650      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
651      INTEGER                          , INTENT(INOUT) :: kleng
652      !!----------------------------------------------------------------------
653      !!                  ***  routine mpp_bcast_nml  ***
654      !!
655      !! ** Purpose :   broadcast namelist character buffer
656      !!
657      !!----------------------------------------------------------------------
658      !!
659      INTEGER ::   iflag
660      !!----------------------------------------------------------------------
661      !
662#if ! defined key_mpi_off
663      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
664      call MPI_BARRIER(mpi_comm_oce, iflag)
665!$AGRIF_DO_NOT_TREAT
666      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
667!$AGRIF_END_DO_NOT_TREAT
668      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
669      call MPI_BARRIER(mpi_comm_oce, iflag)
670#endif
671      !
672   END SUBROUTINE mpp_bcast_nml
673
674
675   !!----------------------------------------------------------------------
676   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
677   !!
678   !!----------------------------------------------------------------------
679   !!
680#  define OPERATION_MAX
681#  define INTEGER_TYPE
682#  define DIM_0d
683#     define ROUTINE_ALLREDUCE           mppmax_int
684#     include "mpp_allreduce_generic.h90"
685#     undef ROUTINE_ALLREDUCE
686#  undef DIM_0d
687#  define DIM_1d
688#     define ROUTINE_ALLREDUCE           mppmax_a_int
689#     include "mpp_allreduce_generic.h90"
690#     undef ROUTINE_ALLREDUCE
691#  undef DIM_1d
692#  undef INTEGER_TYPE
693!
694   !!
695   !!   ----   SINGLE PRECISION VERSIONS
696   !!
697#  define SINGLE_PRECISION
698#  define REAL_TYPE
699#  define DIM_0d
700#     define ROUTINE_ALLREDUCE           mppmax_real_sp
701#     include "mpp_allreduce_generic.h90"
702#     undef ROUTINE_ALLREDUCE
703#  undef DIM_0d
704#  define DIM_1d
705#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
706#     include "mpp_allreduce_generic.h90"
707#     undef ROUTINE_ALLREDUCE
708#  undef DIM_1d
709#  undef SINGLE_PRECISION
710   !!
711   !!
712   !!   ----   DOUBLE PRECISION VERSIONS
713   !!
714!
715#  define DIM_0d
716#     define ROUTINE_ALLREDUCE           mppmax_real_dp
717#     include "mpp_allreduce_generic.h90"
718#     undef ROUTINE_ALLREDUCE
719#  undef DIM_0d
720#  define DIM_1d
721#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
722#     include "mpp_allreduce_generic.h90"
723#     undef ROUTINE_ALLREDUCE
724#  undef DIM_1d
725#  undef REAL_TYPE
726#  undef OPERATION_MAX
727   !!----------------------------------------------------------------------
728   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
729   !!
730   !!----------------------------------------------------------------------
731   !!
732#  define OPERATION_MIN
733#  define INTEGER_TYPE
734#  define DIM_0d
735#     define ROUTINE_ALLREDUCE           mppmin_int
736#     include "mpp_allreduce_generic.h90"
737#     undef ROUTINE_ALLREDUCE
738#  undef DIM_0d
739#  define DIM_1d
740#     define ROUTINE_ALLREDUCE           mppmin_a_int
741#     include "mpp_allreduce_generic.h90"
742#     undef ROUTINE_ALLREDUCE
743#  undef DIM_1d
744#  undef INTEGER_TYPE
745!
746   !!
747   !!   ----   SINGLE PRECISION VERSIONS
748   !!
749#  define SINGLE_PRECISION
750#  define REAL_TYPE
751#  define DIM_0d
752#     define ROUTINE_ALLREDUCE           mppmin_real_sp
753#     include "mpp_allreduce_generic.h90"
754#     undef ROUTINE_ALLREDUCE
755#  undef DIM_0d
756#  define DIM_1d
757#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
758#     include "mpp_allreduce_generic.h90"
759#     undef ROUTINE_ALLREDUCE
760#  undef DIM_1d
761#  undef SINGLE_PRECISION
762   !!
763   !!   ----   DOUBLE PRECISION VERSIONS
764   !!
765
766#  define DIM_0d
767#     define ROUTINE_ALLREDUCE           mppmin_real_dp
768#     include "mpp_allreduce_generic.h90"
769#     undef ROUTINE_ALLREDUCE
770#  undef DIM_0d
771#  define DIM_1d
772#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
773#     include "mpp_allreduce_generic.h90"
774#     undef ROUTINE_ALLREDUCE
775#  undef DIM_1d
776#  undef REAL_TYPE
777#  undef OPERATION_MIN
778
779   !!----------------------------------------------------------------------
780   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
781   !!
782   !!   Global sum of 1D array or a variable (integer, real or complex)
783   !!----------------------------------------------------------------------
784   !!
785#  define OPERATION_SUM
786#  define INTEGER_TYPE
787#  define DIM_0d
788#     define ROUTINE_ALLREDUCE           mppsum_int
789#     include "mpp_allreduce_generic.h90"
790#     undef ROUTINE_ALLREDUCE
791#  undef DIM_0d
792#  define DIM_1d
793#     define ROUTINE_ALLREDUCE           mppsum_a_int
794#     include "mpp_allreduce_generic.h90"
795#     undef ROUTINE_ALLREDUCE
796#  undef DIM_1d
797#  undef INTEGER_TYPE
798
799   !!
800   !!   ----   SINGLE PRECISION VERSIONS
801   !!
802#  define OPERATION_SUM
803#  define SINGLE_PRECISION
804#  define REAL_TYPE
805#  define DIM_0d
806#     define ROUTINE_ALLREDUCE           mppsum_real_sp
807#     include "mpp_allreduce_generic.h90"
808#     undef ROUTINE_ALLREDUCE
809#  undef DIM_0d
810#  define DIM_1d
811#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
812#     include "mpp_allreduce_generic.h90"
813#     undef ROUTINE_ALLREDUCE
814#  undef DIM_1d
815#  undef REAL_TYPE
816#  undef OPERATION_SUM
817
818#  undef SINGLE_PRECISION
819
820   !!
821   !!   ----   DOUBLE PRECISION VERSIONS
822   !!
823#  define OPERATION_SUM
824#  define REAL_TYPE
825#  define DIM_0d
826#     define ROUTINE_ALLREDUCE           mppsum_real_dp
827#     include "mpp_allreduce_generic.h90"
828#     undef ROUTINE_ALLREDUCE
829#  undef DIM_0d
830#  define DIM_1d
831#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
832#     include "mpp_allreduce_generic.h90"
833#     undef ROUTINE_ALLREDUCE
834#  undef DIM_1d
835#  undef REAL_TYPE
836#  undef OPERATION_SUM
837
838#  define OPERATION_SUM_DD
839#  define COMPLEX_TYPE
840#  define DIM_0d
841#     define ROUTINE_ALLREDUCE           mppsum_realdd
842#     include "mpp_allreduce_generic.h90"
843#     undef ROUTINE_ALLREDUCE
844#  undef DIM_0d
845#  define DIM_1d
846#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
847#     include "mpp_allreduce_generic.h90"
848#     undef ROUTINE_ALLREDUCE
849#  undef DIM_1d
850#  undef COMPLEX_TYPE
851#  undef OPERATION_SUM_DD
852
853   !!----------------------------------------------------------------------
854   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
855   !!
856   !!----------------------------------------------------------------------
857   !!
858   !!
859   !!   ----   SINGLE PRECISION VERSIONS
860   !!
861#  define SINGLE_PRECISION
862#  define OPERATION_MINLOC
863#  define DIM_2d
864#     define ROUTINE_LOC           mpp_minloc2d_sp
865#     include "mpp_loc_generic.h90"
866#     undef ROUTINE_LOC
867#  undef DIM_2d
868#  define DIM_3d
869#     define ROUTINE_LOC           mpp_minloc3d_sp
870#     include "mpp_loc_generic.h90"
871#     undef ROUTINE_LOC
872#  undef DIM_3d
873#  undef OPERATION_MINLOC
874
875#  define OPERATION_MAXLOC
876#  define DIM_2d
877#     define ROUTINE_LOC           mpp_maxloc2d_sp
878#     include "mpp_loc_generic.h90"
879#     undef ROUTINE_LOC
880#  undef DIM_2d
881#  define DIM_3d
882#     define ROUTINE_LOC           mpp_maxloc3d_sp
883#     include "mpp_loc_generic.h90"
884#     undef ROUTINE_LOC
885#  undef DIM_3d
886#  undef OPERATION_MAXLOC
887#  undef SINGLE_PRECISION
888   !!
889   !!   ----   DOUBLE PRECISION VERSIONS
890   !!
891#  define OPERATION_MINLOC
892#  define DIM_2d
893#     define ROUTINE_LOC           mpp_minloc2d_dp
894#     include "mpp_loc_generic.h90"
895#     undef ROUTINE_LOC
896#  undef DIM_2d
897#  define DIM_3d
898#     define ROUTINE_LOC           mpp_minloc3d_dp
899#     include "mpp_loc_generic.h90"
900#     undef ROUTINE_LOC
901#  undef DIM_3d
902#  undef OPERATION_MINLOC
903
904#  define OPERATION_MAXLOC
905#  define DIM_2d
906#     define ROUTINE_LOC           mpp_maxloc2d_dp
907#     include "mpp_loc_generic.h90"
908#     undef ROUTINE_LOC
909#  undef DIM_2d
910#  define DIM_3d
911#     define ROUTINE_LOC           mpp_maxloc3d_dp
912#     include "mpp_loc_generic.h90"
913#     undef ROUTINE_LOC
914#  undef DIM_3d
915#  undef OPERATION_MAXLOC
916
917
918   SUBROUTINE mppsync()
919      !!----------------------------------------------------------------------
920      !!                  ***  routine mppsync  ***
921      !!
922      !! ** Purpose :   Massively parallel processors, synchroneous
923      !!
924      !!-----------------------------------------------------------------------
925      INTEGER :: ierror
926      !!-----------------------------------------------------------------------
927      !
928#if ! defined key_mpi_off
929      CALL mpi_barrier( mpi_comm_oce, ierror )
930#endif
931      !
932   END SUBROUTINE mppsync
933
934
935   SUBROUTINE mppstop( ld_abort )
936      !!----------------------------------------------------------------------
937      !!                  ***  routine mppstop  ***
938      !!
939      !! ** purpose :   Stop massively parallel processors method
940      !!
941      !!----------------------------------------------------------------------
942      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
943      LOGICAL ::   ll_abort
944      INTEGER ::   info
945      !!----------------------------------------------------------------------
946      ll_abort = .FALSE.
947      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
948      !
949#if ! defined key_mpi_off
950      IF(ll_abort) THEN
951         CALL mpi_abort( MPI_COMM_WORLD )
952      ELSE
953         CALL mppsync
954         CALL mpi_finalize( info )
955      ENDIF
956#endif
957      IF( ll_abort ) STOP 123
958      !
959   END SUBROUTINE mppstop
960
961
962   SUBROUTINE mpp_comm_free( kcom )
963      !!----------------------------------------------------------------------
964      INTEGER, INTENT(in) ::   kcom
965      !!
966      INTEGER :: ierr
967      !!----------------------------------------------------------------------
968      !
969#if ! defined key_mpi_off
970      CALL MPI_COMM_FREE(kcom, ierr)
971#endif
972      !
973   END SUBROUTINE mpp_comm_free
974
975
976   SUBROUTINE mpp_ini_znl( kumout )
977      !!----------------------------------------------------------------------
978      !!               ***  routine mpp_ini_znl  ***
979      !!
980      !! ** Purpose :   Initialize special communicator for computing zonal sum
981      !!
982      !! ** Method  : - Look for processors in the same row
983      !!              - Put their number in nrank_znl
984      !!              - Create group for the znl processors
985      !!              - Create a communicator for znl processors
986      !!              - Determine if processor should write znl files
987      !!
988      !! ** output
989      !!      ndim_rank_znl = number of processors on the same row
990      !!      ngrp_znl = group ID for the znl processors
991      !!      ncomm_znl = communicator for the ice procs.
992      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
993      !!
994      !!----------------------------------------------------------------------
995      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
996      !
997      INTEGER :: jproc      ! dummy loop integer
998      INTEGER :: ierr, ii   ! local integer
999      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1000      !!----------------------------------------------------------------------
1001#if ! defined key_mpi_off
1002      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_world     : ', ngrp_world
1003      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_world : ', mpi_comm_world
1004      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_oce   : ', mpi_comm_oce
1005      !
1006      ALLOCATE( kwork(jpnij), STAT=ierr )
1007      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1008
1009      IF( jpnj == 1 ) THEN
1010         ngrp_znl  = ngrp_world
1011         ncomm_znl = mpi_comm_oce
1012      ELSE
1013         !
1014         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1015         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - kwork pour njmpp : ', kwork
1016         !-$$        CALL flush(numout)
1017         !
1018         ! Count number of processors on the same row
1019         ndim_rank_znl = 0
1020         DO jproc=1,jpnij
1021            IF ( kwork(jproc) == njmpp ) THEN
1022               ndim_rank_znl = ndim_rank_znl + 1
1023            ENDIF
1024         END DO
1025         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ndim_rank_znl : ', ndim_rank_znl
1026         !-$$        CALL flush(numout)
1027         ! Allocate the right size to nrank_znl
1028         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1029         ALLOCATE(nrank_znl(ndim_rank_znl))
1030         ii = 0
1031         nrank_znl (:) = 0
1032         DO jproc=1,jpnij
1033            IF ( kwork(jproc) == njmpp) THEN
1034               ii = ii + 1
1035               nrank_znl(ii) = jproc -1
1036            ENDIF
1037         END DO
1038         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - nrank_znl : ', nrank_znl
1039         !-$$        CALL flush(numout)
1040
1041         ! Create the opa group
1042         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1043         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_opa : ', ngrp_opa
1044         !-$$        CALL flush(numout)
1045
1046         ! Create the znl group from the opa group
1047         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1048         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_znl ', ngrp_znl
1049         !-$$        CALL flush(numout)
1050
1051         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1052         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1053         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ncomm_znl ', ncomm_znl
1054         !-$$        CALL flush(numout)
1055         !
1056      END IF
1057
1058      ! Determines if processor if the first (starting from i=1) on the row
1059      IF ( jpni == 1 ) THEN
1060         l_znl_root = .TRUE.
1061      ELSE
1062         l_znl_root = .FALSE.
1063         kwork (1) = nimpp
1064         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1065         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1066      END IF
1067
1068      DEALLOCATE(kwork)
1069#endif
1070
1071   END SUBROUTINE mpp_ini_znl
1072
1073   SUBROUTINE mpp_ini_nc
1074      !!----------------------------------------------------------------------
1075      !!               ***  routine mpp_ini_nc  ***
1076      !!
1077      !! ** Purpose :   Initialize special communicators for MPI3 neighbourhood
1078      !!                collectives
1079      !!
1080      !! ** Method  : - Create graph communicators starting from the processes
1081      !!                distribution along i and j directions
1082      !
1083      !! ** output
1084      !!         mpi_nc_com = MPI3 neighbourhood collectives communicator
1085      !!         mpi_nc_all_com = MPI3 neighbourhood collectives communicator
1086      !!                          (with diagonals)
1087      !!
1088      !!----------------------------------------------------------------------
1089      INTEGER, DIMENSION(:), ALLOCATABLE :: ineigh, ineighalls, ineighallr
1090      INTEGER :: ideg, idegalls, idegallr, icont, icont1
1091      INTEGER :: ierr
1092      LOGICAL, PARAMETER :: ireord = .FALSE.
1093
1094#if ! defined key_mpi_off
1095
1096      ideg = 0
1097      idegalls = 0
1098      idegallr = 0
1099      icont = 0
1100      icont1 = 0
1101
1102      IF (nbondi .eq. 1) THEN
1103         ideg = ideg + 1
1104      ELSEIF (nbondi .eq. -1) THEN
1105         ideg = ideg + 1
1106      ELSEIF (nbondi .eq. 0) THEN
1107         ideg = ideg + 2
1108      ENDIF
1109
1110      IF (nbondj .eq. 1) THEN
1111         ideg = ideg + 1
1112      ELSEIF (nbondj .eq. -1) THEN
1113         ideg = ideg + 1
1114      ELSEIF (nbondj .eq. 0) THEN
1115         ideg = ideg + 2
1116      ENDIF
1117
1118      idegalls = ideg
1119      idegallr = ideg
1120
1121      IF (nones .ne. -1) idegalls = idegalls + 1
1122      IF (nonws .ne. -1) idegalls = idegalls + 1
1123      IF (noses .ne. -1) idegalls = idegalls + 1
1124      IF (nosws .ne. -1) idegalls = idegalls + 1
1125      IF (noner .ne. -1) idegallr = idegallr + 1
1126      IF (nonwr .ne. -1) idegallr = idegallr + 1
1127      IF (noser .ne. -1) idegallr = idegallr + 1
1128      IF (noswr .ne. -1) idegallr = idegallr + 1
1129
1130      ALLOCATE(ineigh(ideg))
1131      ALLOCATE(ineighalls(idegalls))
1132      ALLOCATE(ineighallr(idegallr))
1133
1134      IF (nbondi .eq. 1) THEN
1135         icont = icont + 1
1136         ineigh(icont) = nowe
1137         ineighalls(icont) = nowe
1138         ineighallr(icont) = nowe
1139      ELSEIF (nbondi .eq. -1) THEN
1140         icont = icont + 1
1141         ineigh(icont) = noea
1142         ineighalls(icont) = noea
1143         ineighallr(icont) = noea
1144      ELSEIF (nbondi .eq. 0) THEN
1145         icont = icont + 1
1146         ineigh(icont) = nowe
1147         ineighalls(icont) = nowe
1148         ineighallr(icont) = nowe
1149         icont = icont + 1
1150         ineigh(icont) = noea
1151         ineighalls(icont) = noea
1152         ineighallr(icont) = noea
1153      ENDIF
1154
1155      IF (nbondj .eq. 1) THEN
1156         icont = icont + 1
1157         ineigh(icont) = noso
1158         ineighalls(icont) = noso
1159         ineighallr(icont) = noso
1160      ELSEIF (nbondj .eq. -1) THEN
1161         icont = icont + 1
1162         ineigh(icont) = nono
1163         ineighalls(icont) = nono
1164         ineighallr(icont) = nono
1165      ELSEIF (nbondj .eq. 0) THEN
1166         icont = icont + 1
1167         ineigh(icont) = noso
1168         ineighalls(icont) = noso
1169         ineighallr(icont) = noso
1170         icont = icont + 1
1171         ineigh(icont) = nono
1172         ineighalls(icont) = nono
1173         ineighallr(icont) = nono
1174      ENDIF
1175
1176      icont1 = icont
1177      IF (nosws .ne. -1) THEN
1178         icont = icont + 1
1179         ineighalls(icont) = nosws
1180      ENDIF
1181      IF (noses .ne. -1) THEN
1182         icont = icont + 1
1183         ineighalls(icont) = noses
1184      ENDIF
1185      IF (nonws .ne. -1) THEN
1186         icont = icont + 1
1187         ineighalls(icont) = nonws
1188      ENDIF
1189      IF (nones .ne. -1) THEN
1190         icont = icont + 1
1191         ineighalls(icont) = nones
1192      ENDIF
1193      IF (noswr .ne. -1) THEN
1194         icont1 = icont1 + 1
1195         ineighallr(icont1) = noswr
1196      ENDIF
1197      IF (noser .ne. -1) THEN
1198         icont1 = icont1 + 1
1199         ineighallr(icont1) = noser
1200      ENDIF
1201      IF (nonwr .ne. -1) THEN
1202         icont1 = icont1 + 1
1203         ineighallr(icont1) = nonwr
1204      ENDIF
1205      IF (noner .ne. -1) THEN
1206         icont1 = icont1 + 1
1207         ineighallr(icont1) = noner
1208      ENDIF
1209
1210      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, ideg, ineigh, MPI_UNWEIGHTED, ideg, ineigh, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_com, ierr)
1211      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, idegallr, ineighallr, MPI_UNWEIGHTED, idegalls, ineighalls, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_all_com, ierr)
1212
1213      DEALLOCATE (ineigh)
1214      DEALLOCATE (ineighalls)
1215      DEALLOCATE (ineighallr)
1216#endif
1217   END SUBROUTINE mpp_ini_nc
1218
1219
1220
1221   SUBROUTINE mpp_ini_north
1222      !!----------------------------------------------------------------------
1223      !!               ***  routine mpp_ini_north  ***
1224      !!
1225      !! ** Purpose :   Initialize special communicator for north folding
1226      !!      condition together with global variables needed in the mpp folding
1227      !!
1228      !! ** Method  : - Look for northern processors
1229      !!              - Put their number in nrank_north
1230      !!              - Create groups for the world processors and the north processors
1231      !!              - Create a communicator for northern processors
1232      !!
1233      !! ** output
1234      !!      njmppmax = njmpp for northern procs
1235      !!      ndim_rank_north = number of processors in the northern line
1236      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1237      !!      ngrp_world = group ID for the world processors
1238      !!      ngrp_north = group ID for the northern processors
1239      !!      ncomm_north = communicator for the northern procs.
1240      !!      north_root = number (in the world) of proc 0 in the northern comm.
1241      !!
1242      !!----------------------------------------------------------------------
1243      INTEGER ::   ierr
1244      INTEGER ::   jjproc
1245      INTEGER ::   ii, ji
1246      !!----------------------------------------------------------------------
1247      !
1248#if ! defined key_mpi_off
1249      njmppmax = MAXVAL( njmppt )
1250      !
1251      ! Look for how many procs on the northern boundary
1252      ndim_rank_north = 0
1253      DO jjproc = 1, jpni
1254         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1255      END DO
1256      !
1257      ! Allocate the right size to nrank_north
1258      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1259      ALLOCATE( nrank_north(ndim_rank_north) )
1260
1261      ! Fill the nrank_north array with proc. number of northern procs.
1262      ! Note : the rank start at 0 in MPI
1263      ii = 0
1264      DO ji = 1, jpni
1265         IF ( nfproc(ji) /= -1   ) THEN
1266            ii=ii+1
1267            nrank_north(ii)=nfproc(ji)
1268         END IF
1269      END DO
1270      !
1271      ! create the world group
1272      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1273      !
1274      ! Create the North group from the world group
1275      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1276      !
1277      ! Create the North communicator , ie the pool of procs in the north group
1278      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1279      !
1280#endif
1281   END SUBROUTINE mpp_ini_north
1282
1283
1284   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1285      !!---------------------------------------------------------------------
1286      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1287      !!
1288      !!   Modification of original codes written by David H. Bailey
1289      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1290      !!---------------------------------------------------------------------
1291      INTEGER                     , INTENT(in)    ::   ilen, itype
1292      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1293      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1294      !
1295      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1296      INTEGER  :: ji, ztmp           ! local scalar
1297      !!---------------------------------------------------------------------
1298      !
1299      ztmp = itype   ! avoid compilation warning
1300      !
1301      DO ji=1,ilen
1302      ! Compute ydda + yddb using Knuth's trick.
1303         zt1  = real(ydda(ji)) + real(yddb(ji))
1304         zerr = zt1 - real(ydda(ji))
1305         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1306                + aimag(ydda(ji)) + aimag(yddb(ji))
1307
1308         ! The result is zt1 + zt2, after normalization.
1309         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1310      END DO
1311      !
1312   END SUBROUTINE DDPDD_MPI
1313
1314
1315   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1316      !!----------------------------------------------------------------------
1317      !!                  ***  routine mpp_report  ***
1318      !!
1319      !! ** Purpose :   report use of mpp routines per time-setp
1320      !!
1321      !!----------------------------------------------------------------------
1322      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1323      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1324      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1325      !!
1326      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1327      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1328      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1329      !!----------------------------------------------------------------------
1330#if ! defined key_mpi_off
1331      !
1332      ll_lbc = .FALSE.
1333      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1334      ll_glb = .FALSE.
1335      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1336      ll_dlg = .FALSE.
1337      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1338      !
1339      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1340      ncom_freq = ncom_fsbc
1341      !
1342      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1343         IF( ll_lbc ) THEN
1344            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1345            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1346            n_sequence_lbc = n_sequence_lbc + 1
1347            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1348            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1349            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1350            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1351         ENDIF
1352         IF( ll_glb ) THEN
1353            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1354            n_sequence_glb = n_sequence_glb + 1
1355            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1356            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1357         ENDIF
1358         IF( ll_dlg ) THEN
1359            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1360            n_sequence_dlg = n_sequence_dlg + 1
1361            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1362            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1363         ENDIF
1364      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1365         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1366         WRITE(numcom,*) ' '
1367         WRITE(numcom,*) ' ------------------------------------------------------------'
1368         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1369         WRITE(numcom,*) ' ------------------------------------------------------------'
1370         WRITE(numcom,*) ' '
1371         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1372         jj = 0; jk = 0; jf = 0; jh = 0
1373         DO ji = 1, n_sequence_lbc
1374            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1375            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1376            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1377            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1378         END DO
1379         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1380         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1381         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1382         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1383         WRITE(numcom,*) ' '
1384         WRITE(numcom,*) ' lbc_lnk called'
1385         DO ji = 1, n_sequence_lbc - 1
1386            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1387               ccountname = crname_lbc(ji)
1388               crname_lbc(ji) = 'already counted'
1389               jcount = 1
1390               DO jj = ji + 1, n_sequence_lbc
1391                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1392                     jcount = jcount + 1
1393                     crname_lbc(jj) = 'already counted'
1394                  END IF
1395               END DO
1396               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1397            END IF
1398         END DO
1399         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1400            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1401         END IF
1402         WRITE(numcom,*) ' '
1403         IF ( n_sequence_glb > 0 ) THEN
1404            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1405            jj = 1
1406            DO ji = 2, n_sequence_glb
1407               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1408                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1409                  jj = 0
1410               END IF
1411               jj = jj + 1
1412            END DO
1413            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1414            DEALLOCATE(crname_glb)
1415         ELSE
1416            WRITE(numcom,*) ' No MPI global communication '
1417         ENDIF
1418         WRITE(numcom,*) ' '
1419         IF ( n_sequence_dlg > 0 ) THEN
1420            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1421            jj = 1
1422            DO ji = 2, n_sequence_dlg
1423               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1424                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1425                  jj = 0
1426               END IF
1427               jj = jj + 1
1428            END DO
1429            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1430            DEALLOCATE(crname_dlg)
1431         ELSE
1432            WRITE(numcom,*) ' No MPI delayed global communication '
1433         ENDIF
1434         WRITE(numcom,*) ' '
1435         WRITE(numcom,*) ' -----------------------------------------------'
1436         WRITE(numcom,*) ' '
1437         DEALLOCATE(ncomm_sequence)
1438         DEALLOCATE(crname_lbc)
1439      ENDIF
1440#endif
1441   END SUBROUTINE mpp_report
1442
1443
1444   SUBROUTINE tic_tac (ld_tic, ld_global)
1445
1446    LOGICAL,           INTENT(IN) :: ld_tic
1447    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1448    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1449    REAL(dp),               SAVE :: tic_ct = 0._dp
1450    INTEGER :: ii
1451#if ! defined key_mpi_off
1452
1453    IF( ncom_stp <= nit000 ) RETURN
1454    IF( ncom_stp == nitend ) RETURN
1455    ii = 1
1456    IF( PRESENT( ld_global ) ) THEN
1457       IF( ld_global ) ii = 2
1458    END IF
1459
1460    IF ( ld_tic ) THEN
1461       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1462       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1463    ELSE
1464       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1465       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1466    ENDIF
1467#endif
1468
1469   END SUBROUTINE tic_tac
1470
1471#if defined key_mpi_off
1472   SUBROUTINE mpi_wait(request, status, ierror)
1473      INTEGER                            , INTENT(in   ) ::   request
1474      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1475      INTEGER                            , INTENT(  out) ::   ierror
1476   END SUBROUTINE mpi_wait
1477
1478
1479   FUNCTION MPI_Wtime()
1480      REAL(wp) ::  MPI_Wtime
1481      MPI_Wtime = -1.
1482   END FUNCTION MPI_Wtime
1483#endif
1484
1485   !!----------------------------------------------------------------------
1486   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1487   !!----------------------------------------------------------------------
1488
1489   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1490      &                 cd6, cd7, cd8, cd9, cd10 )
1491      !!----------------------------------------------------------------------
1492      !!                  ***  ROUTINE  stop_opa  ***
1493      !!
1494      !! ** Purpose :   print in ocean.outpput file a error message and
1495      !!                increment the error number (nstop) by one.
1496      !!----------------------------------------------------------------------
1497      CHARACTER(len=*), INTENT(in   )           ::   cd1
1498      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1499      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1500      !
1501      CHARACTER(LEN=8) ::   clfmt            ! writing format
1502      INTEGER          ::   inum
1503      !!----------------------------------------------------------------------
1504      !
1505      nstop = nstop + 1
1506      !
1507      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1508         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1509         WRITE(inum,*)
1510         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1511         CLOSE(inum)
1512      ENDIF
1513      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1514         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1515      ENDIF
1516      !
1517                            WRITE(numout,*)
1518                            WRITE(numout,*) ' ===>>> : E R R O R'
1519                            WRITE(numout,*)
1520                            WRITE(numout,*) '         ==========='
1521                            WRITE(numout,*)
1522                            WRITE(numout,*) TRIM(cd1)
1523      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1524      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1525      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1526      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1527      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1528      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1529      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1530      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1531      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1532                            WRITE(numout,*)
1533      !
1534                               CALL FLUSH(numout    )
1535      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1536      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1537      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1538      !
1539      IF( cd1 == 'STOP' ) THEN
1540         WRITE(numout,*)
1541         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1542         WRITE(numout,*)
1543         CALL FLUSH(numout)
1544         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1545         CALL mppstop( ld_abort = .true. )
1546      ENDIF
1547      !
1548   END SUBROUTINE ctl_stop
1549
1550
1551   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1552      &                 cd6, cd7, cd8, cd9, cd10 )
1553      !!----------------------------------------------------------------------
1554      !!                  ***  ROUTINE  stop_warn  ***
1555      !!
1556      !! ** Purpose :   print in ocean.outpput file a error message and
1557      !!                increment the warning number (nwarn) by one.
1558      !!----------------------------------------------------------------------
1559      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1560      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1561      !!----------------------------------------------------------------------
1562      !
1563      nwarn = nwarn + 1
1564      !
1565      IF(lwp) THEN
1566                               WRITE(numout,*)
1567                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1568                               WRITE(numout,*)
1569                               WRITE(numout,*) '         ==============='
1570                               WRITE(numout,*)
1571         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1572         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1573         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1574         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1575         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1576         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1577         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1578         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1579         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1580         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1581                               WRITE(numout,*)
1582      ENDIF
1583      CALL FLUSH(numout)
1584      !
1585   END SUBROUTINE ctl_warn
1586
1587
1588   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1589      !!----------------------------------------------------------------------
1590      !!                  ***  ROUTINE ctl_opn  ***
1591      !!
1592      !! ** Purpose :   Open file and check if required file is available.
1593      !!
1594      !! ** Method  :   Fortan open
1595      !!----------------------------------------------------------------------
1596      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1597      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1598      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1599      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1600      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1601      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1602      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1603      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1604      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1605      !
1606      CHARACTER(len=80) ::   clfile
1607      CHARACTER(LEN=10) ::   clfmt            ! writing format
1608      INTEGER           ::   iost
1609      INTEGER           ::   idg              ! number of digits
1610      !!----------------------------------------------------------------------
1611      !
1612      ! adapt filename
1613      ! ----------------
1614      clfile = TRIM(cdfile)
1615      IF( PRESENT( karea ) ) THEN
1616         IF( karea > 1 ) THEN
1617            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1618            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1619            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1620            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1621         ENDIF
1622      ENDIF
1623#if defined key_agrif
1624      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1625      knum=Agrif_Get_Unit()
1626#else
1627      knum=get_unit()
1628#endif
1629      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1630      !
1631      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1632         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1633      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1634         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1635      ELSE
1636         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1637      ENDIF
1638      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1639         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1640      IF( iost == 0 ) THEN
1641         IF(ldwp .AND. kout > 0) THEN
1642            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1643            WRITE(kout,*) '     unit   = ', knum
1644            WRITE(kout,*) '     status = ', cdstat
1645            WRITE(kout,*) '     form   = ', cdform
1646            WRITE(kout,*) '     access = ', cdacce
1647            WRITE(kout,*)
1648         ENDIF
1649      ENDIF
1650100   CONTINUE
1651      IF( iost /= 0 ) THEN
1652         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1653         WRITE(ctmp2,*) ' =======   ===  '
1654         WRITE(ctmp3,*) '           unit   = ', knum
1655         WRITE(ctmp4,*) '           status = ', cdstat
1656         WRITE(ctmp5,*) '           form   = ', cdform
1657         WRITE(ctmp6,*) '           access = ', cdacce
1658         WRITE(ctmp7,*) '           iostat = ', iost
1659         WRITE(ctmp8,*) '           we stop. verify the file '
1660         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1661      ENDIF
1662      !
1663   END SUBROUTINE ctl_opn
1664
1665
1666   SUBROUTINE ctl_nam ( kios, cdnam )
1667      !!----------------------------------------------------------------------
1668      !!                  ***  ROUTINE ctl_nam  ***
1669      !!
1670      !! ** Purpose :   Informations when error while reading a namelist
1671      !!
1672      !! ** Method  :   Fortan open
1673      !!----------------------------------------------------------------------
1674      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1675      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1676      !
1677      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1678      !!----------------------------------------------------------------------
1679      !
1680      WRITE (clios, '(I5.0)')   kios
1681      IF( kios < 0 ) THEN
1682         CALL ctl_warn( 'end of record or file while reading namelist '   &
1683            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1684      ENDIF
1685      !
1686      IF( kios > 0 ) THEN
1687         CALL ctl_stop( 'misspelled variable in namelist '   &
1688            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1689      ENDIF
1690      kios = 0
1691      !
1692   END SUBROUTINE ctl_nam
1693
1694
1695   INTEGER FUNCTION get_unit()
1696      !!----------------------------------------------------------------------
1697      !!                  ***  FUNCTION  get_unit  ***
1698      !!
1699      !! ** Purpose :   return the index of an unused logical unit
1700      !!----------------------------------------------------------------------
1701      LOGICAL :: llopn
1702      !!----------------------------------------------------------------------
1703      !
1704      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1705      llopn = .TRUE.
1706      DO WHILE( (get_unit < 998) .AND. llopn )
1707         get_unit = get_unit + 1
1708         INQUIRE( unit = get_unit, opened = llopn )
1709      END DO
1710      IF( (get_unit == 999) .AND. llopn ) THEN
1711         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1712      ENDIF
1713      !
1714   END FUNCTION get_unit
1715
1716   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1717      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1718      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1719      CHARACTER(LEN=256)                           :: chline
1720      CHARACTER(LEN=1)                             :: csp
1721      INTEGER, INTENT(IN)                          :: kout
1722      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1723      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1724      !
1725      !csp = NEW_LINE('A')
1726      ! a new line character is the best seperator but some systems (e.g.Cray)
1727      ! seem to terminate namelist reads from internal files early if they
1728      ! encounter new-lines. Use a single space for safety.
1729      csp = ' '
1730      !
1731      ! Check if the namelist buffer has already been allocated. Return if it has.
1732      !
1733      IF ( ALLOCATED( cdnambuff ) ) RETURN
1734      IF( ldwp ) THEN
1735         !
1736         ! Open namelist file
1737         !
1738         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1739         !
1740         ! First pass: count characters excluding comments and trimable white space
1741         !
1742         itot=0
1743     10  READ(iun,'(A256)',END=20,ERR=20) chline
1744         iltc = LEN_TRIM(chline)
1745         IF ( iltc.GT.0 ) THEN
1746          inl = INDEX(chline, '!')
1747          IF( inl.eq.0 ) THEN
1748           itot = itot + iltc + 1                                ! +1 for the newline character
1749          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1750           itot = itot + inl                                  !  includes +1 for the newline character
1751          ENDIF
1752         ENDIF
1753         GOTO 10
1754     20  CONTINUE
1755         !
1756         ! Allocate text cdnambuff for condensed namelist
1757         !
1758!$AGRIF_DO_NOT_TREAT
1759         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1760!$AGRIF_END_DO_NOT_TREAT
1761         itotsav = itot
1762         !
1763         ! Second pass: read and transfer pruned characters into cdnambuff
1764         !
1765         REWIND(iun)
1766         itot=1
1767     30  READ(iun,'(A256)',END=40,ERR=40) chline
1768         iltc = LEN_TRIM(chline)
1769         IF ( iltc.GT.0 ) THEN
1770          inl = INDEX(chline, '!')
1771          IF( inl.eq.0 ) THEN
1772           inl = iltc
1773          ELSE
1774           inl = inl - 1
1775          ENDIF
1776          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1777             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1778             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1779             itot = itot + inl + 1
1780          ENDIF
1781         ENDIF
1782         GOTO 30
1783     40  CONTINUE
1784         itot = itot - 1
1785         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1786         !
1787         ! Close namelist file
1788         !
1789         CLOSE(iun)
1790         !write(*,'(32A)') cdnambuff
1791      ENDIF
1792#if ! defined key_mpi_off
1793      CALL mpp_bcast_nml( cdnambuff, itot )
1794#endif
1795  END SUBROUTINE load_nml
1796
1797
1798   !!----------------------------------------------------------------------
1799END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.