New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 14072

Last change on this file since 14072 was 14072, checked in by laurent, 3 years ago

Merging branch "2020/dev_r13648_ASINTER-04_laurent_bulk_ice", ticket #2369

  • Property svn:keywords set to Id
File size: 73.9 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mpp_ini_nc
69   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
71   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
72   PUBLIC   mpp_report
73   PUBLIC   mpp_bcast_nml
74   PUBLIC   tic_tac
75#if ! defined key_mpp_mpi
76   PUBLIC MPI_wait
77   PUBLIC MPI_Wtime
78#endif
79
80   !! * Interfaces
81   !! define generic interface for these routine as they are called sometimes
82   !! with scalar arguments instead of array arguments, which causes problems
83   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
84   INTERFACE mpp_min
85      MODULE PROCEDURE mppmin_a_int, mppmin_int
86      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
87      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
88   END INTERFACE
89   INTERFACE mpp_max
90      MODULE PROCEDURE mppmax_a_int, mppmax_int
91      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
92      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
93   END INTERFACE
94   INTERFACE mpp_sum
95      MODULE PROCEDURE mppsum_a_int, mppsum_int
96      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
97      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
98      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
99   END INTERFACE
100   INTERFACE mpp_minloc
101      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
102      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
103   END INTERFACE
104   INTERFACE mpp_maxloc
105      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
106      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
107   END INTERFACE
108
109   !! ========================= !!
110   !!  MPI  variable definition !!
111   !! ========================= !!
112#if   defined key_mpp_mpi
113!$AGRIF_DO_NOT_TREAT
114   INCLUDE 'mpif.h'
115!$AGRIF_END_DO_NOT_TREAT
116   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
117#else
118   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
119   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
120   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
121   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
122#endif
123
124   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
125
126   INTEGER, PUBLIC ::   mppsize        ! number of process
127   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
128!$AGRIF_DO_NOT_TREAT
129   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
130!$AGRIF_END_DO_NOT_TREAT
131
132   INTEGER :: MPI_SUMDD
133
134   ! variables used for zonal integration
135   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
136   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
137   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
138   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
139   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
140
141   ! variables used for MPI3 neighbourhood collectives
142   INTEGER, PUBLIC :: mpi_nc_com                   ! MPI3 neighbourhood collectives communicator
143   INTEGER, PUBLIC :: mpi_nc_all_com               ! MPI3 neighbourhood collectives communicator (with diagionals)
144
145   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
146   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
147   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
148   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
149   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
150   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
151   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
152   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
153   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
154
155   ! Communications summary report
156   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
157   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
158   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
159   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
160   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
161   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
162   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
163   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
164   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
165   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
166   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
167   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
168   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
169   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
170   !: name (used as id) of allreduce-delayed operations
171   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
172   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
173   !: component name where the allreduce-delayed operation is performed
174   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
175   TYPE, PUBLIC ::   DELAYARR
176      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
177      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
178   END TYPE DELAYARR
179   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
180   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
181
182   ! timing summary report
183   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
184   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
185
186   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
187
188   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
189   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
190
191   !! * Substitutions
192#  include "do_loop_substitute.h90"
193   !!----------------------------------------------------------------------
194   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
195   !! $Id$
196   !! Software governed by the CeCILL license (see ./LICENSE)
197   !!----------------------------------------------------------------------
198CONTAINS
199
200   SUBROUTINE mpp_start( localComm )
201      !!----------------------------------------------------------------------
202      !!                  ***  routine mpp_start  ***
203      !!
204      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
205      !!----------------------------------------------------------------------
206      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
207      !
208      INTEGER ::   ierr
209      LOGICAL ::   llmpi_init
210      !!----------------------------------------------------------------------
211#if defined key_mpp_mpi
212      !
213      CALL mpi_initialized ( llmpi_init, ierr )
214      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
215
216      IF( .NOT. llmpi_init ) THEN
217         IF( PRESENT(localComm) ) THEN
218            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
219            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
220            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
221         ENDIF
222         CALL mpi_init( ierr )
223         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
224      ENDIF
225
226      IF( PRESENT(localComm) ) THEN
227         IF( Agrif_Root() ) THEN
228            mpi_comm_oce = localComm
229         ENDIF
230      ELSE
231         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
232         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
233      ENDIF
234
235# if defined key_agrif
236      IF( Agrif_Root() ) THEN
237         CALL Agrif_MPI_Init(mpi_comm_oce)
238      ELSE
239         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
240      ENDIF
241# endif
242
243      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
244      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
245      !
246      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
247      !
248#else
249      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
250      mppsize = 1
251      mpprank = 0
252#endif
253   END SUBROUTINE mpp_start
254
255
256   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
257      !!----------------------------------------------------------------------
258      !!                  ***  routine mppsend  ***
259      !!
260      !! ** Purpose :   Send messag passing array
261      !!
262      !!----------------------------------------------------------------------
263      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
264      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
265      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
266      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
267      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
268      !!
269      INTEGER ::   iflag
270      INTEGER :: mpi_working_type
271      !!----------------------------------------------------------------------
272      !
273#if defined key_mpp_mpi
274      IF (wp == dp) THEN
275         mpi_working_type = mpi_double_precision
276      ELSE
277         mpi_working_type = mpi_real
278      END IF
279      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
280#endif
281      !
282   END SUBROUTINE mppsend
283
284
285   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
286      !!----------------------------------------------------------------------
287      !!                  ***  routine mppsend  ***
288      !!
289      !! ** Purpose :   Send messag passing array
290      !!
291      !!----------------------------------------------------------------------
292      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
293      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
294      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
295      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
296      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
297      !!
298      INTEGER ::   iflag
299      !!----------------------------------------------------------------------
300      !
301#if defined key_mpp_mpi
302      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
303#endif
304      !
305   END SUBROUTINE mppsend_dp
306
307
308   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
309      !!----------------------------------------------------------------------
310      !!                  ***  routine mppsend  ***
311      !!
312      !! ** Purpose :   Send messag passing array
313      !!
314      !!----------------------------------------------------------------------
315      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
316      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
317      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
318      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
319      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
320      !!
321      INTEGER ::   iflag
322      !!----------------------------------------------------------------------
323      !
324#if defined key_mpp_mpi
325      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
326#endif
327      !
328   END SUBROUTINE mppsend_sp
329
330
331   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
332      !!----------------------------------------------------------------------
333      !!                  ***  routine mpprecv  ***
334      !!
335      !! ** Purpose :   Receive messag passing array
336      !!
337      !!----------------------------------------------------------------------
338      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
339      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
340      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
341      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
342      !!
343      INTEGER :: istatus(mpi_status_size)
344      INTEGER :: iflag
345      INTEGER :: use_source
346      INTEGER :: mpi_working_type
347      !!----------------------------------------------------------------------
348      !
349#if defined key_mpp_mpi
350      ! If a specific process number has been passed to the receive call,
351      ! use that one. Default is to use mpi_any_source
352      use_source = mpi_any_source
353      IF( PRESENT(ksource) )   use_source = ksource
354      !
355      IF (wp == dp) THEN
356         mpi_working_type = mpi_double_precision
357      ELSE
358         mpi_working_type = mpi_real
359      END IF
360      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
361#endif
362      !
363   END SUBROUTINE mpprecv
364
365   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
366      !!----------------------------------------------------------------------
367      !!                  ***  routine mpprecv  ***
368      !!
369      !! ** Purpose :   Receive messag passing array
370      !!
371      !!----------------------------------------------------------------------
372      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
373      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
374      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
375      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
376      !!
377      INTEGER :: istatus(mpi_status_size)
378      INTEGER :: iflag
379      INTEGER :: use_source
380      !!----------------------------------------------------------------------
381      !
382#if defined key_mpp_mpi
383      ! If a specific process number has been passed to the receive call,
384      ! use that one. Default is to use mpi_any_source
385      use_source = mpi_any_source
386      IF( PRESENT(ksource) )   use_source = ksource
387      !
388      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
389#endif
390      !
391   END SUBROUTINE mpprecv_dp
392
393
394   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
395      !!----------------------------------------------------------------------
396      !!                  ***  routine mpprecv  ***
397      !!
398      !! ** Purpose :   Receive messag passing array
399      !!
400      !!----------------------------------------------------------------------
401      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
402      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
403      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
404      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
405      !!
406      INTEGER :: istatus(mpi_status_size)
407      INTEGER :: iflag
408      INTEGER :: use_source
409      !!----------------------------------------------------------------------
410      !
411#if defined key_mpp_mpi
412      ! If a specific process number has been passed to the receive call,
413      ! use that one. Default is to use mpi_any_source
414      use_source = mpi_any_source
415      IF( PRESENT(ksource) )   use_source = ksource
416      !
417      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
418#endif
419      !
420   END SUBROUTINE mpprecv_sp
421
422
423   SUBROUTINE mppgather( ptab, kp, pio )
424      !!----------------------------------------------------------------------
425      !!                   ***  routine mppgather  ***
426      !!
427      !! ** Purpose :   Transfert between a local subdomain array and a work
428      !!     array which is distributed following the vertical level.
429      !!
430      !!----------------------------------------------------------------------
431      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
432      INTEGER                           , INTENT(in   ) ::   kp     ! record length
433      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
434      !!
435      INTEGER :: itaille, ierror   ! temporary integer
436      !!---------------------------------------------------------------------
437      !
438      itaille = jpi * jpj
439#if defined key_mpp_mpi
440      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
441         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
442#else
443      pio(:,:,1) = ptab(:,:)
444#endif
445      !
446   END SUBROUTINE mppgather
447
448
449   SUBROUTINE mppscatter( pio, kp, ptab )
450      !!----------------------------------------------------------------------
451      !!                  ***  routine mppscatter  ***
452      !!
453      !! ** Purpose :   Transfert between awork array which is distributed
454      !!      following the vertical level and the local subdomain array.
455      !!
456      !!----------------------------------------------------------------------
457      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
458      INTEGER                             ::   kp     ! Tag (not used with MPI
459      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
460      !!
461      INTEGER :: itaille, ierror   ! temporary integer
462      !!---------------------------------------------------------------------
463      !
464      itaille = jpi * jpj
465      !
466#if defined key_mpp_mpi
467      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
468         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
469#else
470      ptab(:,:) = pio(:,:,1)
471#endif
472      !
473   END SUBROUTINE mppscatter
474
475
476   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
477     !!----------------------------------------------------------------------
478      !!                   ***  routine mpp_delay_sum  ***
479      !!
480      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
481      !!
482      !!----------------------------------------------------------------------
483      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
484      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
485      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
486      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
487      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
488      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
489      !!
490      INTEGER ::   ji, isz
491      INTEGER ::   idvar
492      INTEGER ::   ierr, ilocalcomm
493      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
494      !!----------------------------------------------------------------------
495#if defined key_mpp_mpi
496      ilocalcomm = mpi_comm_oce
497      IF( PRESENT(kcom) )   ilocalcomm = kcom
498
499      isz = SIZE(y_in)
500
501      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
502
503      idvar = -1
504      DO ji = 1, nbdelay
505         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
506      END DO
507      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
508
509      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
510         !                                       --------------------------
511         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
512            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
513            DEALLOCATE(todelay(idvar)%z1d)
514            ndelayid(idvar) = -1                                      ! do as if we had no restart
515         ELSE
516            ALLOCATE(todelay(idvar)%y1d(isz))
517            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
518            ndelayid(idvar) = MPI_REQUEST_NULL                             ! initialised request to a valid value
519         END IF
520      ENDIF
521
522      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
523         !                                       --------------------------
524         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
525         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
526         ndelayid(idvar) = MPI_REQUEST_NULL
527      ENDIF
528
529      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
530
531      ! send back pout from todelay(idvar)%z1d defined at previous call
532      pout(:) = todelay(idvar)%z1d(:)
533
534      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
535# if defined key_mpi2
536      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
537      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
538      ndelayid(idvar) = MPI_REQUEST_NULL
539      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
540# else
541      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
542# endif
543#else
544      pout(:) = REAL(y_in(:), wp)
545#endif
546
547   END SUBROUTINE mpp_delay_sum
548
549
550   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
551      !!----------------------------------------------------------------------
552      !!                   ***  routine mpp_delay_max  ***
553      !!
554      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
555      !!
556      !!----------------------------------------------------------------------
557      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
558      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
559      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
560      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
561      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
562      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
563      !!
564      INTEGER ::   ji, isz
565      INTEGER ::   idvar
566      INTEGER ::   ierr, ilocalcomm
567      INTEGER ::   MPI_TYPE
568      !!----------------------------------------------------------------------
569
570#if defined key_mpp_mpi
571      if( wp == dp ) then
572         MPI_TYPE = MPI_DOUBLE_PRECISION
573      else if ( wp == sp ) then
574         MPI_TYPE = MPI_REAL
575      else
576        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
577
578      end if
579
580      ilocalcomm = mpi_comm_oce
581      IF( PRESENT(kcom) )   ilocalcomm = kcom
582
583      isz = SIZE(p_in)
584
585      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
586
587      idvar = -1
588      DO ji = 1, nbdelay
589         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
590      END DO
591      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
592
593      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
594         !                                       --------------------------
595         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
596            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
597            DEALLOCATE(todelay(idvar)%z1d)
598            ndelayid(idvar) = -1                                      ! do as if we had no restart
599         ELSE
600            ndelayid(idvar) = MPI_REQUEST_NULL
601         END IF
602      ENDIF
603
604      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
605         !                                       --------------------------
606         ALLOCATE(todelay(idvar)%z1d(isz))
607         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
608         ndelayid(idvar) = MPI_REQUEST_NULL
609      ENDIF
610
611      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
612
613      ! send back pout from todelay(idvar)%z1d defined at previous call
614      pout(:) = todelay(idvar)%z1d(:)
615
616      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
617      ! (PM) Should we get rid of MPI2 option ? MPI3 was release in 2013. Who is still using MPI2 ?
618# if defined key_mpi2
619      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
620      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ierr )
621      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
622# else
623      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
624# endif
625#else
626      pout(:) = p_in(:)
627#endif
628
629   END SUBROUTINE mpp_delay_max
630
631
632   SUBROUTINE mpp_delay_rcv( kid )
633      !!----------------------------------------------------------------------
634      !!                   ***  routine mpp_delay_rcv  ***
635      !!
636      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
637      !!
638      !!----------------------------------------------------------------------
639      INTEGER,INTENT(in   )      ::  kid
640      INTEGER ::   ierr
641      !!----------------------------------------------------------------------
642#if defined key_mpp_mpi
643      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
644      ! test on ndelayid(kid) useless as mpi_wait return immediatly if the request handle is MPI_REQUEST_NULL
645      CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr ) ! after this ndelayid(kid) = MPI_REQUEST_NULL
646      IF( ln_timing ) CALL tic_tac( .FALSE., ld_global = .TRUE.)
647      IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
648#endif
649   END SUBROUTINE mpp_delay_rcv
650
651   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
652      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
653      INTEGER                          , INTENT(INOUT) :: kleng
654      !!----------------------------------------------------------------------
655      !!                  ***  routine mpp_bcast_nml  ***
656      !!
657      !! ** Purpose :   broadcast namelist character buffer
658      !!
659      !!----------------------------------------------------------------------
660      !!
661      INTEGER ::   iflag
662      !!----------------------------------------------------------------------
663      !
664#if defined key_mpp_mpi
665      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
666      call MPI_BARRIER(mpi_comm_oce, iflag)
667!$AGRIF_DO_NOT_TREAT
668      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
669!$AGRIF_END_DO_NOT_TREAT
670      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
671      call MPI_BARRIER(mpi_comm_oce, iflag)
672#endif
673      !
674   END SUBROUTINE mpp_bcast_nml
675
676
677   !!----------------------------------------------------------------------
678   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
679   !!
680   !!----------------------------------------------------------------------
681   !!
682#  define OPERATION_MAX
683#  define INTEGER_TYPE
684#  define DIM_0d
685#     define ROUTINE_ALLREDUCE           mppmax_int
686#     include "mpp_allreduce_generic.h90"
687#     undef ROUTINE_ALLREDUCE
688#  undef DIM_0d
689#  define DIM_1d
690#     define ROUTINE_ALLREDUCE           mppmax_a_int
691#     include "mpp_allreduce_generic.h90"
692#     undef ROUTINE_ALLREDUCE
693#  undef DIM_1d
694#  undef INTEGER_TYPE
695!
696   !!
697   !!   ----   SINGLE PRECISION VERSIONS
698   !!
699#  define SINGLE_PRECISION
700#  define REAL_TYPE
701#  define DIM_0d
702#     define ROUTINE_ALLREDUCE           mppmax_real_sp
703#     include "mpp_allreduce_generic.h90"
704#     undef ROUTINE_ALLREDUCE
705#  undef DIM_0d
706#  define DIM_1d
707#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
708#     include "mpp_allreduce_generic.h90"
709#     undef ROUTINE_ALLREDUCE
710#  undef DIM_1d
711#  undef SINGLE_PRECISION
712   !!
713   !!
714   !!   ----   DOUBLE PRECISION VERSIONS
715   !!
716!
717#  define DIM_0d
718#     define ROUTINE_ALLREDUCE           mppmax_real_dp
719#     include "mpp_allreduce_generic.h90"
720#     undef ROUTINE_ALLREDUCE
721#  undef DIM_0d
722#  define DIM_1d
723#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
724#     include "mpp_allreduce_generic.h90"
725#     undef ROUTINE_ALLREDUCE
726#  undef DIM_1d
727#  undef REAL_TYPE
728#  undef OPERATION_MAX
729   !!----------------------------------------------------------------------
730   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
731   !!
732   !!----------------------------------------------------------------------
733   !!
734#  define OPERATION_MIN
735#  define INTEGER_TYPE
736#  define DIM_0d
737#     define ROUTINE_ALLREDUCE           mppmin_int
738#     include "mpp_allreduce_generic.h90"
739#     undef ROUTINE_ALLREDUCE
740#  undef DIM_0d
741#  define DIM_1d
742#     define ROUTINE_ALLREDUCE           mppmin_a_int
743#     include "mpp_allreduce_generic.h90"
744#     undef ROUTINE_ALLREDUCE
745#  undef DIM_1d
746#  undef INTEGER_TYPE
747!
748   !!
749   !!   ----   SINGLE PRECISION VERSIONS
750   !!
751#  define SINGLE_PRECISION
752#  define REAL_TYPE
753#  define DIM_0d
754#     define ROUTINE_ALLREDUCE           mppmin_real_sp
755#     include "mpp_allreduce_generic.h90"
756#     undef ROUTINE_ALLREDUCE
757#  undef DIM_0d
758#  define DIM_1d
759#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
760#     include "mpp_allreduce_generic.h90"
761#     undef ROUTINE_ALLREDUCE
762#  undef DIM_1d
763#  undef SINGLE_PRECISION
764   !!
765   !!   ----   DOUBLE PRECISION VERSIONS
766   !!
767
768#  define DIM_0d
769#     define ROUTINE_ALLREDUCE           mppmin_real_dp
770#     include "mpp_allreduce_generic.h90"
771#     undef ROUTINE_ALLREDUCE
772#  undef DIM_0d
773#  define DIM_1d
774#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
775#     include "mpp_allreduce_generic.h90"
776#     undef ROUTINE_ALLREDUCE
777#  undef DIM_1d
778#  undef REAL_TYPE
779#  undef OPERATION_MIN
780
781   !!----------------------------------------------------------------------
782   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
783   !!
784   !!   Global sum of 1D array or a variable (integer, real or complex)
785   !!----------------------------------------------------------------------
786   !!
787#  define OPERATION_SUM
788#  define INTEGER_TYPE
789#  define DIM_0d
790#     define ROUTINE_ALLREDUCE           mppsum_int
791#     include "mpp_allreduce_generic.h90"
792#     undef ROUTINE_ALLREDUCE
793#  undef DIM_0d
794#  define DIM_1d
795#     define ROUTINE_ALLREDUCE           mppsum_a_int
796#     include "mpp_allreduce_generic.h90"
797#     undef ROUTINE_ALLREDUCE
798#  undef DIM_1d
799#  undef INTEGER_TYPE
800
801   !!
802   !!   ----   SINGLE PRECISION VERSIONS
803   !!
804#  define OPERATION_SUM
805#  define SINGLE_PRECISION
806#  define REAL_TYPE
807#  define DIM_0d
808#     define ROUTINE_ALLREDUCE           mppsum_real_sp
809#     include "mpp_allreduce_generic.h90"
810#     undef ROUTINE_ALLREDUCE
811#  undef DIM_0d
812#  define DIM_1d
813#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
814#     include "mpp_allreduce_generic.h90"
815#     undef ROUTINE_ALLREDUCE
816#  undef DIM_1d
817#  undef REAL_TYPE
818#  undef OPERATION_SUM
819
820#  undef SINGLE_PRECISION
821
822   !!
823   !!   ----   DOUBLE PRECISION VERSIONS
824   !!
825#  define OPERATION_SUM
826#  define REAL_TYPE
827#  define DIM_0d
828#     define ROUTINE_ALLREDUCE           mppsum_real_dp
829#     include "mpp_allreduce_generic.h90"
830#     undef ROUTINE_ALLREDUCE
831#  undef DIM_0d
832#  define DIM_1d
833#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
834#     include "mpp_allreduce_generic.h90"
835#     undef ROUTINE_ALLREDUCE
836#  undef DIM_1d
837#  undef REAL_TYPE
838#  undef OPERATION_SUM
839
840#  define OPERATION_SUM_DD
841#  define COMPLEX_TYPE
842#  define DIM_0d
843#     define ROUTINE_ALLREDUCE           mppsum_realdd
844#     include "mpp_allreduce_generic.h90"
845#     undef ROUTINE_ALLREDUCE
846#  undef DIM_0d
847#  define DIM_1d
848#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
849#     include "mpp_allreduce_generic.h90"
850#     undef ROUTINE_ALLREDUCE
851#  undef DIM_1d
852#  undef COMPLEX_TYPE
853#  undef OPERATION_SUM_DD
854
855   !!----------------------------------------------------------------------
856   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
857   !!
858   !!----------------------------------------------------------------------
859   !!
860   !!
861   !!   ----   SINGLE PRECISION VERSIONS
862   !!
863#  define SINGLE_PRECISION
864#  define OPERATION_MINLOC
865#  define DIM_2d
866#     define ROUTINE_LOC           mpp_minloc2d_sp
867#     include "mpp_loc_generic.h90"
868#     undef ROUTINE_LOC
869#  undef DIM_2d
870#  define DIM_3d
871#     define ROUTINE_LOC           mpp_minloc3d_sp
872#     include "mpp_loc_generic.h90"
873#     undef ROUTINE_LOC
874#  undef DIM_3d
875#  undef OPERATION_MINLOC
876
877#  define OPERATION_MAXLOC
878#  define DIM_2d
879#     define ROUTINE_LOC           mpp_maxloc2d_sp
880#     include "mpp_loc_generic.h90"
881#     undef ROUTINE_LOC
882#  undef DIM_2d
883#  define DIM_3d
884#     define ROUTINE_LOC           mpp_maxloc3d_sp
885#     include "mpp_loc_generic.h90"
886#     undef ROUTINE_LOC
887#  undef DIM_3d
888#  undef OPERATION_MAXLOC
889#  undef SINGLE_PRECISION
890   !!
891   !!   ----   DOUBLE PRECISION VERSIONS
892   !!
893#  define OPERATION_MINLOC
894#  define DIM_2d
895#     define ROUTINE_LOC           mpp_minloc2d_dp
896#     include "mpp_loc_generic.h90"
897#     undef ROUTINE_LOC
898#  undef DIM_2d
899#  define DIM_3d
900#     define ROUTINE_LOC           mpp_minloc3d_dp
901#     include "mpp_loc_generic.h90"
902#     undef ROUTINE_LOC
903#  undef DIM_3d
904#  undef OPERATION_MINLOC
905
906#  define OPERATION_MAXLOC
907#  define DIM_2d
908#     define ROUTINE_LOC           mpp_maxloc2d_dp
909#     include "mpp_loc_generic.h90"
910#     undef ROUTINE_LOC
911#  undef DIM_2d
912#  define DIM_3d
913#     define ROUTINE_LOC           mpp_maxloc3d_dp
914#     include "mpp_loc_generic.h90"
915#     undef ROUTINE_LOC
916#  undef DIM_3d
917#  undef OPERATION_MAXLOC
918
919
920   SUBROUTINE mppsync()
921      !!----------------------------------------------------------------------
922      !!                  ***  routine mppsync  ***
923      !!
924      !! ** Purpose :   Massively parallel processors, synchroneous
925      !!
926      !!-----------------------------------------------------------------------
927      INTEGER :: ierror
928      !!-----------------------------------------------------------------------
929      !
930#if defined key_mpp_mpi
931      CALL mpi_barrier( mpi_comm_oce, ierror )
932#endif
933      !
934   END SUBROUTINE mppsync
935
936
937   SUBROUTINE mppstop( ld_abort )
938      !!----------------------------------------------------------------------
939      !!                  ***  routine mppstop  ***
940      !!
941      !! ** purpose :   Stop massively parallel processors method
942      !!
943      !!----------------------------------------------------------------------
944      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
945      LOGICAL ::   ll_abort
946      INTEGER ::   info
947      !!----------------------------------------------------------------------
948      ll_abort = .FALSE.
949      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
950      !
951#if defined key_mpp_mpi
952      IF(ll_abort) THEN
953         CALL mpi_abort( MPI_COMM_WORLD )
954      ELSE
955         CALL mppsync
956         CALL mpi_finalize( info )
957      ENDIF
958#endif
959      IF( ll_abort ) STOP 123
960      !
961   END SUBROUTINE mppstop
962
963
964   SUBROUTINE mpp_comm_free( kcom )
965      !!----------------------------------------------------------------------
966      INTEGER, INTENT(in) ::   kcom
967      !!
968      INTEGER :: ierr
969      !!----------------------------------------------------------------------
970      !
971#if defined key_mpp_mpi
972      CALL MPI_COMM_FREE(kcom, ierr)
973#endif
974      !
975   END SUBROUTINE mpp_comm_free
976
977
978   SUBROUTINE mpp_ini_znl( kumout )
979      !!----------------------------------------------------------------------
980      !!               ***  routine mpp_ini_znl  ***
981      !!
982      !! ** Purpose :   Initialize special communicator for computing zonal sum
983      !!
984      !! ** Method  : - Look for processors in the same row
985      !!              - Put their number in nrank_znl
986      !!              - Create group for the znl processors
987      !!              - Create a communicator for znl processors
988      !!              - Determine if processor should write znl files
989      !!
990      !! ** output
991      !!      ndim_rank_znl = number of processors on the same row
992      !!      ngrp_znl = group ID for the znl processors
993      !!      ncomm_znl = communicator for the ice procs.
994      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
995      !!
996      !!----------------------------------------------------------------------
997      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
998      !
999      INTEGER :: jproc      ! dummy loop integer
1000      INTEGER :: ierr, ii   ! local integer
1001      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1002      !!----------------------------------------------------------------------
1003#if defined key_mpp_mpi
1004      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1005      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1006      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
1007      !
1008      ALLOCATE( kwork(jpnij), STAT=ierr )
1009      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1010
1011      IF( jpnj == 1 ) THEN
1012         ngrp_znl  = ngrp_world
1013         ncomm_znl = mpi_comm_oce
1014      ELSE
1015         !
1016         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1017         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1018         !-$$        CALL flush(numout)
1019         !
1020         ! Count number of processors on the same row
1021         ndim_rank_znl = 0
1022         DO jproc=1,jpnij
1023            IF ( kwork(jproc) == njmpp ) THEN
1024               ndim_rank_znl = ndim_rank_znl + 1
1025            ENDIF
1026         END DO
1027         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1028         !-$$        CALL flush(numout)
1029         ! Allocate the right size to nrank_znl
1030         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1031         ALLOCATE(nrank_znl(ndim_rank_znl))
1032         ii = 0
1033         nrank_znl (:) = 0
1034         DO jproc=1,jpnij
1035            IF ( kwork(jproc) == njmpp) THEN
1036               ii = ii + 1
1037               nrank_znl(ii) = jproc -1
1038            ENDIF
1039         END DO
1040         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1041         !-$$        CALL flush(numout)
1042
1043         ! Create the opa group
1044         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1045         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1046         !-$$        CALL flush(numout)
1047
1048         ! Create the znl group from the opa group
1049         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1050         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1051         !-$$        CALL flush(numout)
1052
1053         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1054         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1055         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1056         !-$$        CALL flush(numout)
1057         !
1058      END IF
1059
1060      ! Determines if processor if the first (starting from i=1) on the row
1061      IF ( jpni == 1 ) THEN
1062         l_znl_root = .TRUE.
1063      ELSE
1064         l_znl_root = .FALSE.
1065         kwork (1) = nimpp
1066         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1067         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1068      END IF
1069
1070      DEALLOCATE(kwork)
1071#endif
1072
1073   END SUBROUTINE mpp_ini_znl
1074
1075   SUBROUTINE mpp_ini_nc
1076      !!----------------------------------------------------------------------
1077      !!               ***  routine mpp_ini_nc  ***
1078      !!
1079      !! ** Purpose :   Initialize special communicators for MPI3 neighbourhood
1080      !!                collectives
1081      !!
1082      !! ** Method  : - Create graph communicators starting from the processes
1083      !!                distribution along i and j directions
1084      !
1085      !! ** output
1086      !!         mpi_nc_com = MPI3 neighbourhood collectives communicator
1087      !!         mpi_nc_all_com = MPI3 neighbourhood collectives communicator
1088      !!                          (with diagonals)
1089      !!
1090      !!----------------------------------------------------------------------
1091      INTEGER, DIMENSION(:), ALLOCATABLE :: ineigh, ineighalls, ineighallr
1092      INTEGER :: ideg, idegalls, idegallr, icont, icont1
1093      INTEGER :: ierr
1094      LOGICAL, PARAMETER :: ireord = .FALSE.
1095
1096#if defined key_mpp_mpi
1097
1098      ideg = 0
1099      idegalls = 0
1100      idegallr = 0
1101      icont = 0
1102      icont1 = 0
1103
1104      IF (nbondi .eq. 1) THEN
1105         ideg = ideg + 1
1106      ELSEIF (nbondi .eq. -1) THEN
1107         ideg = ideg + 1
1108      ELSEIF (nbondi .eq. 0) THEN
1109         ideg = ideg + 2
1110      ENDIF
1111
1112      IF (nbondj .eq. 1) THEN
1113         ideg = ideg + 1
1114      ELSEIF (nbondj .eq. -1) THEN
1115         ideg = ideg + 1
1116      ELSEIF (nbondj .eq. 0) THEN
1117         ideg = ideg + 2
1118      ENDIF
1119
1120      idegalls = ideg
1121      idegallr = ideg
1122
1123      IF (nones .ne. -1) idegalls = idegalls + 1
1124      IF (nonws .ne. -1) idegalls = idegalls + 1
1125      IF (noses .ne. -1) idegalls = idegalls + 1
1126      IF (nosws .ne. -1) idegalls = idegalls + 1
1127      IF (noner .ne. -1) idegallr = idegallr + 1
1128      IF (nonwr .ne. -1) idegallr = idegallr + 1
1129      IF (noser .ne. -1) idegallr = idegallr + 1
1130      IF (noswr .ne. -1) idegallr = idegallr + 1
1131
1132      ALLOCATE(ineigh(ideg))
1133      ALLOCATE(ineighalls(idegalls))
1134      ALLOCATE(ineighallr(idegallr))
1135
1136      IF (nbondi .eq. 1) THEN
1137         icont = icont + 1
1138         ineigh(icont) = nowe
1139         ineighalls(icont) = nowe
1140         ineighallr(icont) = nowe
1141      ELSEIF (nbondi .eq. -1) THEN
1142         icont = icont + 1
1143         ineigh(icont) = noea
1144         ineighalls(icont) = noea
1145         ineighallr(icont) = noea
1146      ELSEIF (nbondi .eq. 0) THEN
1147         icont = icont + 1
1148         ineigh(icont) = nowe
1149         ineighalls(icont) = nowe
1150         ineighallr(icont) = nowe
1151         icont = icont + 1
1152         ineigh(icont) = noea
1153         ineighalls(icont) = noea
1154         ineighallr(icont) = noea
1155      ENDIF
1156
1157      IF (nbondj .eq. 1) THEN
1158         icont = icont + 1
1159         ineigh(icont) = noso
1160         ineighalls(icont) = noso
1161         ineighallr(icont) = noso
1162      ELSEIF (nbondj .eq. -1) THEN
1163         icont = icont + 1
1164         ineigh(icont) = nono
1165         ineighalls(icont) = nono
1166         ineighallr(icont) = nono
1167      ELSEIF (nbondj .eq. 0) THEN
1168         icont = icont + 1
1169         ineigh(icont) = noso
1170         ineighalls(icont) = noso
1171         ineighallr(icont) = noso
1172         icont = icont + 1
1173         ineigh(icont) = nono
1174         ineighalls(icont) = nono
1175         ineighallr(icont) = nono
1176      ENDIF
1177
1178      icont1 = icont
1179      IF (nosws .ne. -1) THEN
1180         icont = icont + 1
1181         ineighalls(icont) = nosws
1182      ENDIF
1183      IF (noses .ne. -1) THEN
1184         icont = icont + 1
1185         ineighalls(icont) = noses
1186      ENDIF
1187      IF (nonws .ne. -1) THEN
1188         icont = icont + 1
1189         ineighalls(icont) = nonws
1190      ENDIF
1191      IF (nones .ne. -1) THEN
1192         icont = icont + 1
1193         ineighalls(icont) = nones
1194      ENDIF
1195      IF (noswr .ne. -1) THEN
1196         icont1 = icont1 + 1
1197         ineighallr(icont1) = noswr
1198      ENDIF
1199      IF (noser .ne. -1) THEN
1200         icont1 = icont1 + 1
1201         ineighallr(icont1) = noser
1202      ENDIF
1203      IF (nonwr .ne. -1) THEN
1204         icont1 = icont1 + 1
1205         ineighallr(icont1) = nonwr
1206      ENDIF
1207      IF (noner .ne. -1) THEN
1208         icont1 = icont1 + 1
1209         ineighallr(icont1) = noner
1210      ENDIF
1211
1212      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, ideg, ineigh, MPI_UNWEIGHTED, ideg, ineigh, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_com, ierr)
1213      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, idegallr, ineighallr, MPI_UNWEIGHTED, idegalls, ineighalls, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_all_com, ierr)
1214
1215      DEALLOCATE (ineigh)
1216      DEALLOCATE (ineighalls)
1217      DEALLOCATE (ineighallr)
1218#endif
1219   END SUBROUTINE mpp_ini_nc
1220
1221
1222
1223   SUBROUTINE mpp_ini_north
1224      !!----------------------------------------------------------------------
1225      !!               ***  routine mpp_ini_north  ***
1226      !!
1227      !! ** Purpose :   Initialize special communicator for north folding
1228      !!      condition together with global variables needed in the mpp folding
1229      !!
1230      !! ** Method  : - Look for northern processors
1231      !!              - Put their number in nrank_north
1232      !!              - Create groups for the world processors and the north processors
1233      !!              - Create a communicator for northern processors
1234      !!
1235      !! ** output
1236      !!      njmppmax = njmpp for northern procs
1237      !!      ndim_rank_north = number of processors in the northern line
1238      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1239      !!      ngrp_world = group ID for the world processors
1240      !!      ngrp_north = group ID for the northern processors
1241      !!      ncomm_north = communicator for the northern procs.
1242      !!      north_root = number (in the world) of proc 0 in the northern comm.
1243      !!
1244      !!----------------------------------------------------------------------
1245      INTEGER ::   ierr
1246      INTEGER ::   jjproc
1247      INTEGER ::   ii, ji
1248      !!----------------------------------------------------------------------
1249      !
1250#if defined key_mpp_mpi
1251      njmppmax = MAXVAL( njmppt )
1252      !
1253      ! Look for how many procs on the northern boundary
1254      ndim_rank_north = 0
1255      DO jjproc = 1, jpni
1256         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1257      END DO
1258      !
1259      ! Allocate the right size to nrank_north
1260      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1261      ALLOCATE( nrank_north(ndim_rank_north) )
1262
1263      ! Fill the nrank_north array with proc. number of northern procs.
1264      ! Note : the rank start at 0 in MPI
1265      ii = 0
1266      DO ji = 1, jpni
1267         IF ( nfproc(ji) /= -1   ) THEN
1268            ii=ii+1
1269            nrank_north(ii)=nfproc(ji)
1270         END IF
1271      END DO
1272      !
1273      ! create the world group
1274      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1275      !
1276      ! Create the North group from the world group
1277      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1278      !
1279      ! Create the North communicator , ie the pool of procs in the north group
1280      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1281      !
1282#endif
1283   END SUBROUTINE mpp_ini_north
1284
1285
1286   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1287      !!---------------------------------------------------------------------
1288      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1289      !!
1290      !!   Modification of original codes written by David H. Bailey
1291      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1292      !!---------------------------------------------------------------------
1293      INTEGER                     , INTENT(in)    ::   ilen, itype
1294      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1295      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1296      !
1297      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1298      INTEGER  :: ji, ztmp           ! local scalar
1299      !!---------------------------------------------------------------------
1300      !
1301      ztmp = itype   ! avoid compilation warning
1302      !
1303      DO ji=1,ilen
1304      ! Compute ydda + yddb using Knuth's trick.
1305         zt1  = real(ydda(ji)) + real(yddb(ji))
1306         zerr = zt1 - real(ydda(ji))
1307         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1308                + aimag(ydda(ji)) + aimag(yddb(ji))
1309
1310         ! The result is zt1 + zt2, after normalization.
1311         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1312      END DO
1313      !
1314   END SUBROUTINE DDPDD_MPI
1315
1316
1317   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1318      !!----------------------------------------------------------------------
1319      !!                  ***  routine mpp_report  ***
1320      !!
1321      !! ** Purpose :   report use of mpp routines per time-setp
1322      !!
1323      !!----------------------------------------------------------------------
1324      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1325      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1326      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1327      !!
1328      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1329      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1330      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1331      !!----------------------------------------------------------------------
1332#if defined key_mpp_mpi
1333      !
1334      ll_lbc = .FALSE.
1335      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1336      ll_glb = .FALSE.
1337      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1338      ll_dlg = .FALSE.
1339      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1340      !
1341      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1342      ncom_freq = ncom_fsbc
1343      !
1344      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1345         IF( ll_lbc ) THEN
1346            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1347            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1348            n_sequence_lbc = n_sequence_lbc + 1
1349            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1350            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1351            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1352            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1353         ENDIF
1354         IF( ll_glb ) THEN
1355            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1356            n_sequence_glb = n_sequence_glb + 1
1357            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1358            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1359         ENDIF
1360         IF( ll_dlg ) THEN
1361            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1362            n_sequence_dlg = n_sequence_dlg + 1
1363            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1364            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1365         ENDIF
1366      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1367         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1368         WRITE(numcom,*) ' '
1369         WRITE(numcom,*) ' ------------------------------------------------------------'
1370         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1371         WRITE(numcom,*) ' ------------------------------------------------------------'
1372         WRITE(numcom,*) ' '
1373         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1374         jj = 0; jk = 0; jf = 0; jh = 0
1375         DO ji = 1, n_sequence_lbc
1376            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1377            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1378            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1379            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1380         END DO
1381         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1382         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1383         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1384         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1385         WRITE(numcom,*) ' '
1386         WRITE(numcom,*) ' lbc_lnk called'
1387         DO ji = 1, n_sequence_lbc - 1
1388            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1389               ccountname = crname_lbc(ji)
1390               crname_lbc(ji) = 'already counted'
1391               jcount = 1
1392               DO jj = ji + 1, n_sequence_lbc
1393                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1394                     jcount = jcount + 1
1395                     crname_lbc(jj) = 'already counted'
1396                  END IF
1397               END DO
1398               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1399            END IF
1400         END DO
1401         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1402            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1403         END IF
1404         WRITE(numcom,*) ' '
1405         IF ( n_sequence_glb > 0 ) THEN
1406            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1407            jj = 1
1408            DO ji = 2, n_sequence_glb
1409               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1410                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1411                  jj = 0
1412               END IF
1413               jj = jj + 1
1414            END DO
1415            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1416            DEALLOCATE(crname_glb)
1417         ELSE
1418            WRITE(numcom,*) ' No MPI global communication '
1419         ENDIF
1420         WRITE(numcom,*) ' '
1421         IF ( n_sequence_dlg > 0 ) THEN
1422            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1423            jj = 1
1424            DO ji = 2, n_sequence_dlg
1425               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1426                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1427                  jj = 0
1428               END IF
1429               jj = jj + 1
1430            END DO
1431            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1432            DEALLOCATE(crname_dlg)
1433         ELSE
1434            WRITE(numcom,*) ' No MPI delayed global communication '
1435         ENDIF
1436         WRITE(numcom,*) ' '
1437         WRITE(numcom,*) ' -----------------------------------------------'
1438         WRITE(numcom,*) ' '
1439         DEALLOCATE(ncomm_sequence)
1440         DEALLOCATE(crname_lbc)
1441      ENDIF
1442#endif
1443   END SUBROUTINE mpp_report
1444
1445
1446   SUBROUTINE tic_tac (ld_tic, ld_global)
1447
1448    LOGICAL,           INTENT(IN) :: ld_tic
1449    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1450    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1451    REAL(dp),               SAVE :: tic_ct = 0._dp
1452    INTEGER :: ii
1453#if defined key_mpp_mpi
1454
1455    IF( ncom_stp <= nit000 ) RETURN
1456    IF( ncom_stp == nitend ) RETURN
1457    ii = 1
1458    IF( PRESENT( ld_global ) ) THEN
1459       IF( ld_global ) ii = 2
1460    END IF
1461
1462    IF ( ld_tic ) THEN
1463       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1464       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1465    ELSE
1466       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1467       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1468    ENDIF
1469#endif
1470
1471   END SUBROUTINE tic_tac
1472
1473#if ! defined key_mpp_mpi
1474   SUBROUTINE mpi_wait(request, status, ierror)
1475      INTEGER                            , INTENT(in   ) ::   request
1476      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1477      INTEGER                            , INTENT(  out) ::   ierror
1478   END SUBROUTINE mpi_wait
1479
1480
1481   FUNCTION MPI_Wtime()
1482      REAL(wp) ::  MPI_Wtime
1483      MPI_Wtime = -1.
1484   END FUNCTION MPI_Wtime
1485#endif
1486
1487   !!----------------------------------------------------------------------
1488   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1489   !!----------------------------------------------------------------------
1490
1491   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1492      &                 cd6, cd7, cd8, cd9, cd10 )
1493      !!----------------------------------------------------------------------
1494      !!                  ***  ROUTINE  stop_opa  ***
1495      !!
1496      !! ** Purpose :   print in ocean.outpput file a error message and
1497      !!                increment the error number (nstop) by one.
1498      !!----------------------------------------------------------------------
1499      CHARACTER(len=*), INTENT(in   )           ::   cd1
1500      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1501      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1502      !
1503      CHARACTER(LEN=8) ::   clfmt            ! writing format
1504      INTEGER          ::   inum
1505      !!----------------------------------------------------------------------
1506      !
1507      nstop = nstop + 1
1508      !
1509      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1510         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1511         WRITE(inum,*)
1512         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1513         CLOSE(inum)
1514      ENDIF
1515      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1516         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1517      ENDIF
1518      !
1519                            WRITE(numout,*)
1520                            WRITE(numout,*) ' ===>>> : E R R O R'
1521                            WRITE(numout,*)
1522                            WRITE(numout,*) '         ==========='
1523                            WRITE(numout,*)
1524                            WRITE(numout,*) TRIM(cd1)
1525      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1526      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1527      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1528      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1529      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1530      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1531      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1532      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1533      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1534                            WRITE(numout,*)
1535      !
1536                               CALL FLUSH(numout    )
1537      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1538      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1539      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1540      !
1541      IF( cd1 == 'STOP' ) THEN
1542         WRITE(numout,*)
1543         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1544         WRITE(numout,*)
1545         CALL FLUSH(numout)
1546         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1547         CALL mppstop( ld_abort = .true. )
1548      ENDIF
1549      !
1550   END SUBROUTINE ctl_stop
1551
1552
1553   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1554      &                 cd6, cd7, cd8, cd9, cd10 )
1555      !!----------------------------------------------------------------------
1556      !!                  ***  ROUTINE  stop_warn  ***
1557      !!
1558      !! ** Purpose :   print in ocean.outpput file a error message and
1559      !!                increment the warning number (nwarn) by one.
1560      !!----------------------------------------------------------------------
1561      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1562      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1563      !!----------------------------------------------------------------------
1564      !
1565      nwarn = nwarn + 1
1566      !
1567      IF(lwp) THEN
1568                               WRITE(numout,*)
1569                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1570                               WRITE(numout,*)
1571                               WRITE(numout,*) '         ==============='
1572                               WRITE(numout,*)
1573         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1574         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1575         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1576         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1577         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1578         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1579         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1580         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1581         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1582         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1583                               WRITE(numout,*)
1584      ENDIF
1585      CALL FLUSH(numout)
1586      !
1587   END SUBROUTINE ctl_warn
1588
1589
1590   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1591      !!----------------------------------------------------------------------
1592      !!                  ***  ROUTINE ctl_opn  ***
1593      !!
1594      !! ** Purpose :   Open file and check if required file is available.
1595      !!
1596      !! ** Method  :   Fortan open
1597      !!----------------------------------------------------------------------
1598      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1599      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1600      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1601      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1602      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1603      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1604      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1605      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1606      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1607      !
1608      CHARACTER(len=80) ::   clfile
1609      CHARACTER(LEN=10) ::   clfmt            ! writing format
1610      INTEGER           ::   iost
1611      INTEGER           ::   idg              ! number of digits
1612      !!----------------------------------------------------------------------
1613      !
1614      ! adapt filename
1615      ! ----------------
1616      clfile = TRIM(cdfile)
1617      IF( PRESENT( karea ) ) THEN
1618         IF( karea > 1 ) THEN
1619            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1620            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1621            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1622            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1623         ENDIF
1624      ENDIF
1625#if defined key_agrif
1626      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1627      knum=Agrif_Get_Unit()
1628#else
1629      knum=get_unit()
1630#endif
1631      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1632      !
1633      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1634         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1635      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1636         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1637      ELSE
1638         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1639      ENDIF
1640      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1641         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1642      IF( iost == 0 ) THEN
1643         IF(ldwp .AND. kout > 0) THEN
1644            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1645            WRITE(kout,*) '     unit   = ', knum
1646            WRITE(kout,*) '     status = ', cdstat
1647            WRITE(kout,*) '     form   = ', cdform
1648            WRITE(kout,*) '     access = ', cdacce
1649            WRITE(kout,*)
1650         ENDIF
1651      ENDIF
1652100   CONTINUE
1653      IF( iost /= 0 ) THEN
1654         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1655         WRITE(ctmp2,*) ' =======   ===  '
1656         WRITE(ctmp3,*) '           unit   = ', knum
1657         WRITE(ctmp4,*) '           status = ', cdstat
1658         WRITE(ctmp5,*) '           form   = ', cdform
1659         WRITE(ctmp6,*) '           access = ', cdacce
1660         WRITE(ctmp7,*) '           iostat = ', iost
1661         WRITE(ctmp8,*) '           we stop. verify the file '
1662         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1663      ENDIF
1664      !
1665   END SUBROUTINE ctl_opn
1666
1667
1668   SUBROUTINE ctl_nam ( kios, cdnam )
1669      !!----------------------------------------------------------------------
1670      !!                  ***  ROUTINE ctl_nam  ***
1671      !!
1672      !! ** Purpose :   Informations when error while reading a namelist
1673      !!
1674      !! ** Method  :   Fortan open
1675      !!----------------------------------------------------------------------
1676      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1677      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1678      !
1679      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1680      !!----------------------------------------------------------------------
1681      !
1682      WRITE (clios, '(I5.0)')   kios
1683      IF( kios < 0 ) THEN
1684         CALL ctl_warn( 'end of record or file while reading namelist '   &
1685            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1686      ENDIF
1687      !
1688      IF( kios > 0 ) THEN
1689         CALL ctl_stop( 'misspelled variable in namelist '   &
1690            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1691      ENDIF
1692      kios = 0
1693      !
1694   END SUBROUTINE ctl_nam
1695
1696
1697   INTEGER FUNCTION get_unit()
1698      !!----------------------------------------------------------------------
1699      !!                  ***  FUNCTION  get_unit  ***
1700      !!
1701      !! ** Purpose :   return the index of an unused logical unit
1702      !!----------------------------------------------------------------------
1703      LOGICAL :: llopn
1704      !!----------------------------------------------------------------------
1705      !
1706      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1707      llopn = .TRUE.
1708      DO WHILE( (get_unit < 998) .AND. llopn )
1709         get_unit = get_unit + 1
1710         INQUIRE( unit = get_unit, opened = llopn )
1711      END DO
1712      IF( (get_unit == 999) .AND. llopn ) THEN
1713         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1714      ENDIF
1715      !
1716   END FUNCTION get_unit
1717
1718   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1719      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1720      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1721      CHARACTER(LEN=256)                           :: chline
1722      CHARACTER(LEN=1)                             :: csp
1723      INTEGER, INTENT(IN)                          :: kout
1724      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1725      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1726      !
1727      !csp = NEW_LINE('A')
1728      ! a new line character is the best seperator but some systems (e.g.Cray)
1729      ! seem to terminate namelist reads from internal files early if they
1730      ! encounter new-lines. Use a single space for safety.
1731      csp = ' '
1732      !
1733      ! Check if the namelist buffer has already been allocated. Return if it has.
1734      !
1735      IF ( ALLOCATED( cdnambuff ) ) RETURN
1736      IF( ldwp ) THEN
1737         !
1738         ! Open namelist file
1739         !
1740         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1741         !
1742         ! First pass: count characters excluding comments and trimable white space
1743         !
1744         itot=0
1745     10  READ(iun,'(A256)',END=20,ERR=20) chline
1746         iltc = LEN_TRIM(chline)
1747         IF ( iltc.GT.0 ) THEN
1748          inl = INDEX(chline, '!')
1749          IF( inl.eq.0 ) THEN
1750           itot = itot + iltc + 1                                ! +1 for the newline character
1751          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1752           itot = itot + inl                                  !  includes +1 for the newline character
1753          ENDIF
1754         ENDIF
1755         GOTO 10
1756     20  CONTINUE
1757         !
1758         ! Allocate text cdnambuff for condensed namelist
1759         !
1760!$AGRIF_DO_NOT_TREAT
1761         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1762!$AGRIF_END_DO_NOT_TREAT
1763         itotsav = itot
1764         !
1765         ! Second pass: read and transfer pruned characters into cdnambuff
1766         !
1767         REWIND(iun)
1768         itot=1
1769     30  READ(iun,'(A256)',END=40,ERR=40) chline
1770         iltc = LEN_TRIM(chline)
1771         IF ( iltc.GT.0 ) THEN
1772          inl = INDEX(chline, '!')
1773          IF( inl.eq.0 ) THEN
1774           inl = iltc
1775          ELSE
1776           inl = inl - 1
1777          ENDIF
1778          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1779             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1780             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1781             itot = itot + inl + 1
1782          ENDIF
1783         ENDIF
1784         GOTO 30
1785     40  CONTINUE
1786         itot = itot - 1
1787         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1788         !
1789         ! Close namelist file
1790         !
1791         CLOSE(iun)
1792         !write(*,'(32A)') cdnambuff
1793      ENDIF
1794#if defined key_mpp_mpi
1795      CALL mpp_bcast_nml( cdnambuff, itot )
1796#endif
1797  END SUBROUTINE load_nml
1798
1799
1800   !!----------------------------------------------------------------------
1801END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.