New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2021/ticket2680_C1D_PAPA/src/OCE/LBC – NEMO

source: NEMO/branches/2021/ticket2680_C1D_PAPA/src/OCE/LBC/lib_mpp.F90 @ 14984

Last change on this file since 14984 was 14984, checked in by smasson, 3 years ago

ticket2680_C1D_PAPA: fix to work with key_mpi_off, #2680

  • Property svn:keywords set to Id
File size: 74.2 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57#if ! defined key_mpi_off
58   USE MPI
59#endif
60
61   IMPLICIT NONE
62   PRIVATE
63   !
64   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
65   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
66   PUBLIC   mpp_ini_north
67   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
68   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
69   PUBLIC   mppscatter, mppgather
70   PUBLIC   mpp_ini_znl
71   PUBLIC   mpp_ini_nc
72   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
73   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
74   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
75   PUBLIC   mpp_report
76   PUBLIC   mpp_bcast_nml
77   PUBLIC   tic_tac
78#if defined key_mpi_off
79   PUBLIC   MPI_wait
80   PUBLIC   MPI_waitall
81   PUBLIC   MPI_Wtime
82#endif
83
84   !! * Interfaces
85   !! define generic interface for these routine as they are called sometimes
86   !! with scalar arguments instead of array arguments, which causes problems
87   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
88   INTERFACE mpp_min
89      MODULE PROCEDURE mppmin_a_int, mppmin_int
90      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
91      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
92   END INTERFACE
93   INTERFACE mpp_max
94      MODULE PROCEDURE mppmax_a_int, mppmax_int
95      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
96      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
97   END INTERFACE
98   INTERFACE mpp_sum
99      MODULE PROCEDURE mppsum_a_int, mppsum_int
100      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
101      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
102      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
103   END INTERFACE
104   INTERFACE mpp_minloc
105      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
106      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
110      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
111   END INTERFACE
112
113   TYPE, PUBLIC ::   PTR_4D_sp   !: array of 4D pointers (used in lbclnk and lbcnfd)
114      REAL(sp), DIMENSION (:,:,:,:), POINTER ::   pt4d
115   END TYPE PTR_4D_sp
116
117   TYPE, PUBLIC ::   PTR_4D_dp   !: array of 4D pointers (used in lbclnk and lbcnfd)
118      REAL(dp), DIMENSION (:,:,:,:), POINTER ::   pt4d
119   END TYPE PTR_4D_dp
120
121   !! ========================= !!
122   !!  MPI  variable definition !!
123   !! ========================= !!
124#if ! defined key_mpi_off
125   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
126#else
127   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
128   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
129   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
130   INTEGER, PUBLIC, PARAMETER ::   MPI_REQUEST_NULL = 1
131   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
132   INTEGER, PUBLIC, DIMENSION(MPI_STATUS_SIZE) ::   MPI_STATUS_IGNORE = 1     ! out from mpi_wait
133   INTEGER, PUBLIC, DIMENSION(MPI_STATUS_SIZE) ::   MPI_STATUSES_IGNORE = 1   ! out from mpi_waitall
134#endif
135
136   INTEGER, PUBLIC ::   mppsize        ! number of process
137   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
138!$AGRIF_DO_NOT_TREAT
139   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
140!$AGRIF_END_DO_NOT_TREAT
141
142   INTEGER :: MPI_SUMDD
143
144   ! Neighbourgs informations
145   INTEGER,    PARAMETER, PUBLIC ::   n_hlsmax = 3
146   INTEGER, DIMENSION(         8), PUBLIC ::   mpinei      !: 8-neighbourg MPI indexes (starting at 0, -1 if no neighbourg)
147   INTEGER, DIMENSION(n_hlsmax,8), PUBLIC ::   mpiSnei     !: 8-neighbourg Send MPI indexes (starting at 0, -1 if no neighbourg)
148   INTEGER, DIMENSION(n_hlsmax,8), PUBLIC ::   mpiRnei     !: 8-neighbourg Recv MPI indexes (starting at 0, -1 if no neighbourg)
149   INTEGER,    PARAMETER, PUBLIC ::   jpwe = 1   !: WEst
150   INTEGER,    PARAMETER, PUBLIC ::   jpea = 2   !: EAst
151   INTEGER,    PARAMETER, PUBLIC ::   jpso = 3   !: SOuth
152   INTEGER,    PARAMETER, PUBLIC ::   jpno = 4   !: NOrth
153   INTEGER,    PARAMETER, PUBLIC ::   jpsw = 5   !: South-West
154   INTEGER,    PARAMETER, PUBLIC ::   jpse = 6   !: South-East
155   INTEGER,    PARAMETER, PUBLIC ::   jpnw = 7   !: North-West
156   INTEGER,    PARAMETER, PUBLIC ::   jpne = 8   !: North-East
157
158   LOGICAL, DIMENSION(8), PUBLIC ::   l_SelfPerio  !   should we explicitely take care of I/J periodicity
159   LOGICAL,               PUBLIC ::   l_IdoNFold
160
161   ! variables used for zonal integration
162   INTEGER, PUBLIC ::   ncomm_znl         !: communicator made by the processors on the same zonal average
163   LOGICAL, PUBLIC ::   l_znl_root        !: True on the 'left'most processor on the same row
164   INTEGER         ::   ngrp_znl          !: group ID for the znl processors
165   INTEGER         ::   ndim_rank_znl     !: number of processors on the same zonal average
166   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
167
168   ! variables used for MPI3 neighbourhood collectives
169   INTEGER, DIMENSION(n_hlsmax), PUBLIC ::   mpi_nc_com4       ! MPI3 neighbourhood collectives communicator
170   INTEGER, DIMENSION(n_hlsmax), PUBLIC ::   mpi_nc_com8       ! MPI3 neighbourhood collectives communicator (with diagionals)
171
172   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
173   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
174   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
175   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
176   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
177   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
178   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
179   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
180   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
181
182   ! Communications summary report
183   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
184   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
185   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
186   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
187   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
188   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
189   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
190   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
191   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
192   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
193   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
194   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
195   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
196   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
197   !: name (used as id) of allreduce-delayed operations
198   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
199   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
200   !: component name where the allreduce-delayed operation is performed
201   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
202   TYPE, PUBLIC ::   DELAYARR
203      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
204      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
205   END TYPE DELAYARR
206   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
207   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
208
209   ! timing summary report
210   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
211   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
212
213   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
214
215   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
216   INTEGER, PUBLIC ::   nn_comm                     !: namelist control of comms
217
218   INTEGER, PUBLIC, PARAMETER ::   jpfillnothing = 1
219   INTEGER, PUBLIC, PARAMETER ::   jpfillcst     = 2
220   INTEGER, PUBLIC, PARAMETER ::   jpfillcopy    = 3
221   INTEGER, PUBLIC, PARAMETER ::   jpfillperio   = 4
222   INTEGER, PUBLIC, PARAMETER ::   jpfillmpi     = 5
223
224   !! * Substitutions
225#  include "do_loop_substitute.h90"
226   !!----------------------------------------------------------------------
227   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
228   !! $Id$
229   !! Software governed by the CeCILL license (see ./LICENSE)
230   !!----------------------------------------------------------------------
231CONTAINS
232
233   SUBROUTINE mpp_start( localComm )
234      !!----------------------------------------------------------------------
235      !!                  ***  routine mpp_start  ***
236      !!
237      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
238      !!----------------------------------------------------------------------
239      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
240      !
241      INTEGER ::   ierr
242      LOGICAL ::   llmpi_init
243      !!----------------------------------------------------------------------
244#if ! defined key_mpi_off
245      !
246      CALL mpi_initialized ( llmpi_init, ierr )
247      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
248
249      IF( .NOT. llmpi_init ) THEN
250         IF( PRESENT(localComm) ) THEN
251            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
252            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
253            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
254         ENDIF
255         CALL mpi_init( ierr )
256         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
257      ENDIF
258
259      IF( PRESENT(localComm) ) THEN
260         IF( Agrif_Root() ) THEN
261            mpi_comm_oce = localComm
262         ENDIF
263      ELSE
264         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
265         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
266      ENDIF
267
268# if defined key_agrif
269      IF( Agrif_Root() ) THEN
270         CALL Agrif_MPI_Init(mpi_comm_oce)
271      ELSE
272         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
273      ENDIF
274# endif
275
276      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
277      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
278      !
279      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
280      !
281#else
282      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
283      mppsize = 1
284      mpprank = 0
285#endif
286   END SUBROUTINE mpp_start
287
288
289   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
290      !!----------------------------------------------------------------------
291      !!                  ***  routine mppsend  ***
292      !!
293      !! ** Purpose :   Send messag passing array
294      !!
295      !!----------------------------------------------------------------------
296      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
297      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
298      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
299      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
300      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
301      !!
302      INTEGER ::   iflag
303      INTEGER :: mpi_working_type
304      !!----------------------------------------------------------------------
305      !
306#if ! defined key_mpi_off
307      IF (wp == dp) THEN
308         mpi_working_type = mpi_double_precision
309      ELSE
310         mpi_working_type = mpi_real
311      END IF
312      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
313#endif
314      !
315   END SUBROUTINE mppsend
316
317
318   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
319      !!----------------------------------------------------------------------
320      !!                  ***  routine mppsend  ***
321      !!
322      !! ** Purpose :   Send messag passing array
323      !!
324      !!----------------------------------------------------------------------
325      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
326      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
327      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
328      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
329      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
330      !!
331      INTEGER ::   iflag
332      !!----------------------------------------------------------------------
333      !
334#if ! defined key_mpi_off
335      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
336#endif
337      !
338   END SUBROUTINE mppsend_dp
339
340
341   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
342      !!----------------------------------------------------------------------
343      !!                  ***  routine mppsend  ***
344      !!
345      !! ** Purpose :   Send messag passing array
346      !!
347      !!----------------------------------------------------------------------
348      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
349      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
350      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
351      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
352      INTEGER , INTENT(inout) ::   md_req     ! argument for isend
353      !!
354      INTEGER ::   iflag
355      !!----------------------------------------------------------------------
356      !
357#if ! defined key_mpi_off
358      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
359#endif
360      !
361   END SUBROUTINE mppsend_sp
362
363
364   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
365      !!----------------------------------------------------------------------
366      !!                  ***  routine mpprecv  ***
367      !!
368      !! ** Purpose :   Receive messag passing array
369      !!
370      !!----------------------------------------------------------------------
371      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
372      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
373      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
374      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
375      !!
376      INTEGER :: istatus(mpi_status_size)
377      INTEGER :: iflag
378      INTEGER :: use_source
379      INTEGER :: mpi_working_type
380      !!----------------------------------------------------------------------
381      !
382#if ! defined key_mpi_off
383      ! If a specific process number has been passed to the receive call,
384      ! use that one. Default is to use mpi_any_source
385      use_source = mpi_any_source
386      IF( PRESENT(ksource) )   use_source = ksource
387      !
388      IF (wp == dp) THEN
389         mpi_working_type = mpi_double_precision
390      ELSE
391         mpi_working_type = mpi_real
392      END IF
393      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
394#endif
395      !
396   END SUBROUTINE mpprecv
397
398   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
399      !!----------------------------------------------------------------------
400      !!                  ***  routine mpprecv  ***
401      !!
402      !! ** Purpose :   Receive messag passing array
403      !!
404      !!----------------------------------------------------------------------
405      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
406      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
407      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
408      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
409      !!
410      INTEGER :: istatus(mpi_status_size)
411      INTEGER :: iflag
412      INTEGER :: use_source
413      !!----------------------------------------------------------------------
414      !
415#if ! defined key_mpi_off
416      ! If a specific process number has been passed to the receive call,
417      ! use that one. Default is to use mpi_any_source
418      use_source = mpi_any_source
419      IF( PRESENT(ksource) )   use_source = ksource
420      !
421      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
422#endif
423      !
424   END SUBROUTINE mpprecv_dp
425
426
427   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
428      !!----------------------------------------------------------------------
429      !!                  ***  routine mpprecv  ***
430      !!
431      !! ** Purpose :   Receive messag passing array
432      !!
433      !!----------------------------------------------------------------------
434      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
435      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
436      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
437      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
438      !!
439      INTEGER :: istatus(mpi_status_size)
440      INTEGER :: iflag
441      INTEGER :: use_source
442      !!----------------------------------------------------------------------
443      !
444#if ! defined key_mpi_off
445      ! If a specific process number has been passed to the receive call,
446      ! use that one. Default is to use mpi_any_source
447      use_source = mpi_any_source
448      IF( PRESENT(ksource) )   use_source = ksource
449      !
450      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
451#endif
452      !
453   END SUBROUTINE mpprecv_sp
454
455
456   SUBROUTINE mppgather( ptab, kp, pio )
457      !!----------------------------------------------------------------------
458      !!                   ***  routine mppgather  ***
459      !!
460      !! ** Purpose :   Transfert between a local subdomain array and a work
461      !!     array which is distributed following the vertical level.
462      !!
463      !!----------------------------------------------------------------------
464      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
465      INTEGER                           , INTENT(in   ) ::   kp     ! record length
466      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
467      !!
468      INTEGER :: itaille, ierror   ! temporary integer
469      !!---------------------------------------------------------------------
470      !
471      itaille = jpi * jpj
472#if ! defined key_mpi_off
473      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
474         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
475#else
476      pio(:,:,1) = ptab(:,:)
477#endif
478      !
479   END SUBROUTINE mppgather
480
481
482   SUBROUTINE mppscatter( pio, kp, ptab )
483      !!----------------------------------------------------------------------
484      !!                  ***  routine mppscatter  ***
485      !!
486      !! ** Purpose :   Transfert between awork array which is distributed
487      !!      following the vertical level and the local subdomain array.
488      !!
489      !!----------------------------------------------------------------------
490      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
491      INTEGER                             ::   kp     ! Tag (not used with MPI
492      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
493      !!
494      INTEGER :: itaille, ierror   ! temporary integer
495      !!---------------------------------------------------------------------
496      !
497      itaille = jpi * jpj
498      !
499#if ! defined key_mpi_off
500      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
501         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
502#else
503      ptab(:,:) = pio(:,:,1)
504#endif
505      !
506   END SUBROUTINE mppscatter
507
508
509   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
510     !!----------------------------------------------------------------------
511      !!                   ***  routine mpp_delay_sum  ***
512      !!
513      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
514      !!
515      !!----------------------------------------------------------------------
516      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
517      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
518      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
519      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
520      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
521      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
522      !!
523      INTEGER ::   ji, isz
524      INTEGER ::   idvar
525      INTEGER ::   ierr, ilocalcomm
526      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
527      !!----------------------------------------------------------------------
528#if ! defined key_mpi_off
529      ilocalcomm = mpi_comm_oce
530      IF( PRESENT(kcom) )   ilocalcomm = kcom
531
532      isz = SIZE(y_in)
533
534      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
535
536      idvar = -1
537      DO ji = 1, nbdelay
538         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
539      END DO
540      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
541
542      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
543         !                                       --------------------------
544         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
545            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
546            DEALLOCATE(todelay(idvar)%z1d)
547            ndelayid(idvar) = -1                                      ! do as if we had no restart
548         ELSE
549            ALLOCATE(todelay(idvar)%y1d(isz))
550            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
551            ndelayid(idvar) = MPI_REQUEST_NULL                             ! initialised request to a valid value
552         END IF
553      ENDIF
554
555      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
556         !                                       --------------------------
557         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
558         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
559         ndelayid(idvar) = MPI_REQUEST_NULL
560      ENDIF
561
562      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
563
564      ! send back pout from todelay(idvar)%z1d defined at previous call
565      pout(:) = todelay(idvar)%z1d(:)
566
567      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
568# if defined key_mpi2
569      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
570      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
571      ndelayid(idvar) = MPI_REQUEST_NULL
572      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
573# else
574      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
575# endif
576#else
577      pout(:) = REAL(y_in(:), wp)
578#endif
579
580   END SUBROUTINE mpp_delay_sum
581
582
583   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
584      !!----------------------------------------------------------------------
585      !!                   ***  routine mpp_delay_max  ***
586      !!
587      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
588      !!
589      !!----------------------------------------------------------------------
590      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
591      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
592      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
593      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
594      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
595      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
596      !!
597      INTEGER ::   ji, isz
598      INTEGER ::   idvar
599      INTEGER ::   ierr, ilocalcomm
600      INTEGER ::   MPI_TYPE
601      !!----------------------------------------------------------------------
602
603#if ! defined key_mpi_off
604      if( wp == dp ) then
605         MPI_TYPE = MPI_DOUBLE_PRECISION
606      else if ( wp == sp ) then
607         MPI_TYPE = MPI_REAL
608      else
609        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
610
611      end if
612
613      ilocalcomm = mpi_comm_oce
614      IF( PRESENT(kcom) )   ilocalcomm = kcom
615
616      isz = SIZE(p_in)
617
618      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
619
620      idvar = -1
621      DO ji = 1, nbdelay
622         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
623      END DO
624      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
625
626      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
627         !                                       --------------------------
628         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
629            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
630            DEALLOCATE(todelay(idvar)%z1d)
631            ndelayid(idvar) = -1                                      ! do as if we had no restart
632         ELSE
633            ndelayid(idvar) = MPI_REQUEST_NULL
634         END IF
635      ENDIF
636
637      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
638         !                                       --------------------------
639         ALLOCATE(todelay(idvar)%z1d(isz))
640         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
641         ndelayid(idvar) = MPI_REQUEST_NULL
642      ENDIF
643
644      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
645
646      ! send back pout from todelay(idvar)%z1d defined at previous call
647      pout(:) = todelay(idvar)%z1d(:)
648
649      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
650      ! (PM) Should we get rid of MPI2 option ? MPI3 was release in 2013. Who is still using MPI2 ?
651# if defined key_mpi2
652      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
653      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ierr )
654      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
655# else
656      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
657# endif
658#else
659      pout(:) = p_in(:)
660#endif
661
662   END SUBROUTINE mpp_delay_max
663
664
665   SUBROUTINE mpp_delay_rcv( kid )
666      !!----------------------------------------------------------------------
667      !!                   ***  routine mpp_delay_rcv  ***
668      !!
669      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
670      !!
671      !!----------------------------------------------------------------------
672      INTEGER,INTENT(in   )      ::  kid
673      INTEGER ::   ierr
674      !!----------------------------------------------------------------------
675#if ! defined key_mpi_off
676      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
677      ! test on ndelayid(kid) useless as mpi_wait return immediatly if the request handle is MPI_REQUEST_NULL
678      CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr ) ! after this ndelayid(kid) = MPI_REQUEST_NULL
679      IF( ln_timing ) CALL tic_tac( .FALSE., ld_global = .TRUE.)
680      IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
681#endif
682   END SUBROUTINE mpp_delay_rcv
683
684   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
685      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
686      INTEGER                          , INTENT(INOUT) :: kleng
687      !!----------------------------------------------------------------------
688      !!                  ***  routine mpp_bcast_nml  ***
689      !!
690      !! ** Purpose :   broadcast namelist character buffer
691      !!
692      !!----------------------------------------------------------------------
693      !!
694      INTEGER ::   iflag
695      !!----------------------------------------------------------------------
696      !
697#if ! defined key_mpi_off
698      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
699      call MPI_BARRIER(mpi_comm_oce, iflag)
700!$AGRIF_DO_NOT_TREAT
701      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
702!$AGRIF_END_DO_NOT_TREAT
703      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
704      call MPI_BARRIER(mpi_comm_oce, iflag)
705#endif
706      !
707   END SUBROUTINE mpp_bcast_nml
708
709
710   !!----------------------------------------------------------------------
711   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
712   !!
713   !!----------------------------------------------------------------------
714   !!
715#  define OPERATION_MAX
716#  define INTEGER_TYPE
717#  define DIM_0d
718#     define ROUTINE_ALLREDUCE           mppmax_int
719#     include "mpp_allreduce_generic.h90"
720#     undef ROUTINE_ALLREDUCE
721#  undef DIM_0d
722#  define DIM_1d
723#     define ROUTINE_ALLREDUCE           mppmax_a_int
724#     include "mpp_allreduce_generic.h90"
725#     undef ROUTINE_ALLREDUCE
726#  undef DIM_1d
727#  undef INTEGER_TYPE
728!
729   !!
730   !!   ----   SINGLE PRECISION VERSIONS
731   !!
732#  define SINGLE_PRECISION
733#  define REAL_TYPE
734#  define DIM_0d
735#     define ROUTINE_ALLREDUCE           mppmax_real_sp
736#     include "mpp_allreduce_generic.h90"
737#     undef ROUTINE_ALLREDUCE
738#  undef DIM_0d
739#  define DIM_1d
740#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
741#     include "mpp_allreduce_generic.h90"
742#     undef ROUTINE_ALLREDUCE
743#  undef DIM_1d
744#  undef SINGLE_PRECISION
745   !!
746   !!
747   !!   ----   DOUBLE PRECISION VERSIONS
748   !!
749!
750#  define DIM_0d
751#     define ROUTINE_ALLREDUCE           mppmax_real_dp
752#     include "mpp_allreduce_generic.h90"
753#     undef ROUTINE_ALLREDUCE
754#  undef DIM_0d
755#  define DIM_1d
756#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
757#     include "mpp_allreduce_generic.h90"
758#     undef ROUTINE_ALLREDUCE
759#  undef DIM_1d
760#  undef REAL_TYPE
761#  undef OPERATION_MAX
762   !!----------------------------------------------------------------------
763   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
764   !!
765   !!----------------------------------------------------------------------
766   !!
767#  define OPERATION_MIN
768#  define INTEGER_TYPE
769#  define DIM_0d
770#     define ROUTINE_ALLREDUCE           mppmin_int
771#     include "mpp_allreduce_generic.h90"
772#     undef ROUTINE_ALLREDUCE
773#  undef DIM_0d
774#  define DIM_1d
775#     define ROUTINE_ALLREDUCE           mppmin_a_int
776#     include "mpp_allreduce_generic.h90"
777#     undef ROUTINE_ALLREDUCE
778#  undef DIM_1d
779#  undef INTEGER_TYPE
780!
781   !!
782   !!   ----   SINGLE PRECISION VERSIONS
783   !!
784#  define SINGLE_PRECISION
785#  define REAL_TYPE
786#  define DIM_0d
787#     define ROUTINE_ALLREDUCE           mppmin_real_sp
788#     include "mpp_allreduce_generic.h90"
789#     undef ROUTINE_ALLREDUCE
790#  undef DIM_0d
791#  define DIM_1d
792#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
793#     include "mpp_allreduce_generic.h90"
794#     undef ROUTINE_ALLREDUCE
795#  undef DIM_1d
796#  undef SINGLE_PRECISION
797   !!
798   !!   ----   DOUBLE PRECISION VERSIONS
799   !!
800
801#  define DIM_0d
802#     define ROUTINE_ALLREDUCE           mppmin_real_dp
803#     include "mpp_allreduce_generic.h90"
804#     undef ROUTINE_ALLREDUCE
805#  undef DIM_0d
806#  define DIM_1d
807#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
808#     include "mpp_allreduce_generic.h90"
809#     undef ROUTINE_ALLREDUCE
810#  undef DIM_1d
811#  undef REAL_TYPE
812#  undef OPERATION_MIN
813
814   !!----------------------------------------------------------------------
815   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
816   !!
817   !!   Global sum of 1D array or a variable (integer, real or complex)
818   !!----------------------------------------------------------------------
819   !!
820#  define OPERATION_SUM
821#  define INTEGER_TYPE
822#  define DIM_0d
823#     define ROUTINE_ALLREDUCE           mppsum_int
824#     include "mpp_allreduce_generic.h90"
825#     undef ROUTINE_ALLREDUCE
826#  undef DIM_0d
827#  define DIM_1d
828#     define ROUTINE_ALLREDUCE           mppsum_a_int
829#     include "mpp_allreduce_generic.h90"
830#     undef ROUTINE_ALLREDUCE
831#  undef DIM_1d
832#  undef INTEGER_TYPE
833
834   !!
835   !!   ----   SINGLE PRECISION VERSIONS
836   !!
837#  define OPERATION_SUM
838#  define SINGLE_PRECISION
839#  define REAL_TYPE
840#  define DIM_0d
841#     define ROUTINE_ALLREDUCE           mppsum_real_sp
842#     include "mpp_allreduce_generic.h90"
843#     undef ROUTINE_ALLREDUCE
844#  undef DIM_0d
845#  define DIM_1d
846#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
847#     include "mpp_allreduce_generic.h90"
848#     undef ROUTINE_ALLREDUCE
849#  undef DIM_1d
850#  undef REAL_TYPE
851#  undef OPERATION_SUM
852
853#  undef SINGLE_PRECISION
854
855   !!
856   !!   ----   DOUBLE PRECISION VERSIONS
857   !!
858#  define OPERATION_SUM
859#  define REAL_TYPE
860#  define DIM_0d
861#     define ROUTINE_ALLREDUCE           mppsum_real_dp
862#     include "mpp_allreduce_generic.h90"
863#     undef ROUTINE_ALLREDUCE
864#  undef DIM_0d
865#  define DIM_1d
866#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
867#     include "mpp_allreduce_generic.h90"
868#     undef ROUTINE_ALLREDUCE
869#  undef DIM_1d
870#  undef REAL_TYPE
871#  undef OPERATION_SUM
872
873#  define OPERATION_SUM_DD
874#  define COMPLEX_TYPE
875#  define DIM_0d
876#     define ROUTINE_ALLREDUCE           mppsum_realdd
877#     include "mpp_allreduce_generic.h90"
878#     undef ROUTINE_ALLREDUCE
879#  undef DIM_0d
880#  define DIM_1d
881#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
882#     include "mpp_allreduce_generic.h90"
883#     undef ROUTINE_ALLREDUCE
884#  undef DIM_1d
885#  undef COMPLEX_TYPE
886#  undef OPERATION_SUM_DD
887
888   !!----------------------------------------------------------------------
889   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
890   !!
891   !!----------------------------------------------------------------------
892   !!
893   !!
894   !!   ----   SINGLE PRECISION VERSIONS
895   !!
896#  define SINGLE_PRECISION
897#  define OPERATION_MINLOC
898#  define DIM_2d
899#     define ROUTINE_LOC           mpp_minloc2d_sp
900#     include "mpp_loc_generic.h90"
901#     undef ROUTINE_LOC
902#  undef DIM_2d
903#  define DIM_3d
904#     define ROUTINE_LOC           mpp_minloc3d_sp
905#     include "mpp_loc_generic.h90"
906#     undef ROUTINE_LOC
907#  undef DIM_3d
908#  undef OPERATION_MINLOC
909
910#  define OPERATION_MAXLOC
911#  define DIM_2d
912#     define ROUTINE_LOC           mpp_maxloc2d_sp
913#     include "mpp_loc_generic.h90"
914#     undef ROUTINE_LOC
915#  undef DIM_2d
916#  define DIM_3d
917#     define ROUTINE_LOC           mpp_maxloc3d_sp
918#     include "mpp_loc_generic.h90"
919#     undef ROUTINE_LOC
920#  undef DIM_3d
921#  undef OPERATION_MAXLOC
922#  undef SINGLE_PRECISION
923   !!
924   !!   ----   DOUBLE PRECISION VERSIONS
925   !!
926#  define OPERATION_MINLOC
927#  define DIM_2d
928#     define ROUTINE_LOC           mpp_minloc2d_dp
929#     include "mpp_loc_generic.h90"
930#     undef ROUTINE_LOC
931#  undef DIM_2d
932#  define DIM_3d
933#     define ROUTINE_LOC           mpp_minloc3d_dp
934#     include "mpp_loc_generic.h90"
935#     undef ROUTINE_LOC
936#  undef DIM_3d
937#  undef OPERATION_MINLOC
938
939#  define OPERATION_MAXLOC
940#  define DIM_2d
941#     define ROUTINE_LOC           mpp_maxloc2d_dp
942#     include "mpp_loc_generic.h90"
943#     undef ROUTINE_LOC
944#  undef DIM_2d
945#  define DIM_3d
946#     define ROUTINE_LOC           mpp_maxloc3d_dp
947#     include "mpp_loc_generic.h90"
948#     undef ROUTINE_LOC
949#  undef DIM_3d
950#  undef OPERATION_MAXLOC
951
952
953   SUBROUTINE mppsync()
954      !!----------------------------------------------------------------------
955      !!                  ***  routine mppsync  ***
956      !!
957      !! ** Purpose :   Massively parallel processors, synchroneous
958      !!
959      !!-----------------------------------------------------------------------
960      INTEGER :: ierror
961      !!-----------------------------------------------------------------------
962      !
963#if ! defined key_mpi_off
964      CALL mpi_barrier( mpi_comm_oce, ierror )
965#endif
966      !
967   END SUBROUTINE mppsync
968
969
970   SUBROUTINE mppstop( ld_abort )
971      !!----------------------------------------------------------------------
972      !!                  ***  routine mppstop  ***
973      !!
974      !! ** purpose :   Stop massively parallel processors method
975      !!
976      !!----------------------------------------------------------------------
977      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
978      LOGICAL ::   ll_abort
979      INTEGER ::   info, ierr
980      !!----------------------------------------------------------------------
981      ll_abort = .FALSE.
982      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
983      !
984#if ! defined key_mpi_off
985      IF(ll_abort) THEN
986         CALL mpi_abort( MPI_COMM_WORLD, 123, info )
987      ELSE
988         CALL mppsync
989         CALL mpi_finalize( info )
990      ENDIF
991#endif
992      IF( ll_abort ) STOP 123
993      !
994   END SUBROUTINE mppstop
995
996
997   SUBROUTINE mpp_comm_free( kcom )
998      !!----------------------------------------------------------------------
999      INTEGER, INTENT(inout) ::   kcom
1000      !!
1001      INTEGER :: ierr
1002      !!----------------------------------------------------------------------
1003      !
1004#if ! defined key_mpi_off
1005      CALL MPI_COMM_FREE(kcom, ierr)
1006#endif
1007      !
1008   END SUBROUTINE mpp_comm_free
1009
1010
1011   SUBROUTINE mpp_ini_znl( kumout )
1012      !!----------------------------------------------------------------------
1013      !!               ***  routine mpp_ini_znl  ***
1014      !!
1015      !! ** Purpose :   Initialize special communicator for computing zonal sum
1016      !!
1017      !! ** Method  : - Look for processors in the same row
1018      !!              - Put their number in nrank_znl
1019      !!              - Create group for the znl processors
1020      !!              - Create a communicator for znl processors
1021      !!              - Determine if processor should write znl files
1022      !!
1023      !! ** output
1024      !!      ndim_rank_znl = number of processors on the same row
1025      !!      ngrp_znl = group ID for the znl processors
1026      !!      ncomm_znl = communicator for the ice procs.
1027      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1028      !!
1029      !!----------------------------------------------------------------------
1030      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
1031      !
1032      INTEGER :: jproc      ! dummy loop integer
1033      INTEGER :: ierr, ii   ! local integer
1034      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1035      !!----------------------------------------------------------------------
1036#if ! defined key_mpi_off
1037      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_world     : ', ngrp_world
1038      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_world : ', mpi_comm_world
1039      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_oce   : ', mpi_comm_oce
1040      !
1041      ALLOCATE( kwork(jpnij), STAT=ierr )
1042      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1043
1044      IF( jpnj == 1 ) THEN
1045         ngrp_znl  = ngrp_world
1046         ncomm_znl = mpi_comm_oce
1047      ELSE
1048         !
1049         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1050         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - kwork pour njmpp : ', kwork
1051         !-$$        CALL flush(numout)
1052         !
1053         ! Count number of processors on the same row
1054         ndim_rank_znl = 0
1055         DO jproc=1,jpnij
1056            IF ( kwork(jproc) == njmpp ) THEN
1057               ndim_rank_znl = ndim_rank_znl + 1
1058            ENDIF
1059         END DO
1060         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ndim_rank_znl : ', ndim_rank_znl
1061         !-$$        CALL flush(numout)
1062         ! Allocate the right size to nrank_znl
1063         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1064         ALLOCATE(nrank_znl(ndim_rank_znl))
1065         ii = 0
1066         nrank_znl (:) = 0
1067         DO jproc=1,jpnij
1068            IF ( kwork(jproc) == njmpp) THEN
1069               ii = ii + 1
1070               nrank_znl(ii) = jproc -1
1071            ENDIF
1072         END DO
1073         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - nrank_znl : ', nrank_znl
1074         !-$$        CALL flush(numout)
1075
1076         ! Create the opa group
1077         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1078         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_opa : ', ngrp_opa
1079         !-$$        CALL flush(numout)
1080
1081         ! Create the znl group from the opa group
1082         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1083         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_znl ', ngrp_znl
1084         !-$$        CALL flush(numout)
1085
1086         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1087         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1088         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ncomm_znl ', ncomm_znl
1089         !-$$        CALL flush(numout)
1090         !
1091      END IF
1092
1093      ! Determines if processor if the first (starting from i=1) on the row
1094      IF ( jpni == 1 ) THEN
1095         l_znl_root = .TRUE.
1096      ELSE
1097         l_znl_root = .FALSE.
1098         kwork (1) = nimpp
1099         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1100         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1101      END IF
1102
1103      DEALLOCATE(kwork)
1104#endif
1105
1106   END SUBROUTINE mpp_ini_znl
1107
1108   
1109   SUBROUTINE mpp_ini_nc( khls )
1110      !!----------------------------------------------------------------------
1111      !!               ***  routine mpp_ini_nc  ***
1112      !!
1113      !! ** Purpose :   Initialize special communicators for MPI3 neighbourhood
1114      !!                collectives
1115      !!
1116      !! ** Method  : - Create graph communicators starting from the processes
1117      !!                distribution along i and j directions
1118      !
1119      !! ** output
1120      !!         mpi_nc_com4 = MPI3 neighbourhood collectives communicator
1121      !!         mpi_nc_com8 = MPI3 neighbourhood collectives communicator (with diagonals)
1122      !!----------------------------------------------------------------------
1123      INTEGER,             INTENT(in   ) ::   khls        ! halo size, default = nn_hls
1124      !
1125      INTEGER, DIMENSION(:), ALLOCATABLE :: iSnei4, iRnei4, iSnei8, iRnei8
1126      INTEGER                            :: iScnt4, iRcnt4, iScnt8, iRcnt8
1127      INTEGER                            :: ierr
1128      LOGICAL, PARAMETER                 :: ireord = .FALSE.
1129      !!----------------------------------------------------------------------
1130#if ! defined key_mpi_off && ! defined key_mpi2
1131     
1132      iScnt4 = COUNT( mpiSnei(khls,1:4) >= 0 )
1133      iRcnt4 = COUNT( mpiRnei(khls,1:4) >= 0 )
1134      iScnt8 = COUNT( mpiSnei(khls,1:8) >= 0 )
1135      iRcnt8 = COUNT( mpiRnei(khls,1:8) >= 0 )
1136
1137      ALLOCATE( iSnei4(iScnt4), iRnei4(iRcnt4), iSnei8(iScnt8), iRnei8(iRcnt8) )   ! ok if icnt4 or icnt8 = 0
1138
1139      iSnei4 = PACK( mpiSnei(khls,1:4), mask = mpiSnei(khls,1:4) >= 0 )
1140      iRnei4 = PACK( mpiRnei(khls,1:4), mask = mpiRnei(khls,1:4) >= 0 )
1141      iSnei8 = PACK( mpiSnei(khls,1:8), mask = mpiSnei(khls,1:8) >= 0 )
1142      iRnei8 = PACK( mpiRnei(khls,1:8), mask = mpiRnei(khls,1:8) >= 0 )
1143
1144      CALL MPI_Dist_graph_create_adjacent( mpi_comm_oce, iScnt4, iSnei4, MPI_UNWEIGHTED, iRcnt4, iRnei4, MPI_UNWEIGHTED,   &
1145         &                                 MPI_INFO_NULL, ireord, mpi_nc_com4(khls), ierr )
1146      CALL MPI_Dist_graph_create_adjacent( mpi_comm_oce, iScnt8, iSnei8, MPI_UNWEIGHTED, iRcnt8, iRnei8, MPI_UNWEIGHTED,   &
1147         &                                 MPI_INFO_NULL, ireord, mpi_nc_com8(khls), ierr)
1148
1149      DEALLOCATE( iSnei4, iRnei4, iSnei8, iRnei8 )
1150#endif
1151   END SUBROUTINE mpp_ini_nc
1152
1153
1154   SUBROUTINE mpp_ini_north
1155      !!----------------------------------------------------------------------
1156      !!               ***  routine mpp_ini_north  ***
1157      !!
1158      !! ** Purpose :   Initialize special communicator for north folding
1159      !!      condition together with global variables needed in the mpp folding
1160      !!
1161      !! ** Method  : - Look for northern processors
1162      !!              - Put their number in nrank_north
1163      !!              - Create groups for the world processors and the north processors
1164      !!              - Create a communicator for northern processors
1165      !!
1166      !! ** output
1167      !!      ndim_rank_north = number of processors in the northern line
1168      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1169      !!      ngrp_world = group ID for the world processors
1170      !!      ngrp_north = group ID for the northern processors
1171      !!      ncomm_north = communicator for the northern procs.
1172      !!      north_root = number (in the world) of proc 0 in the northern comm.
1173      !!
1174      !!----------------------------------------------------------------------
1175      INTEGER ::   ierr
1176      INTEGER ::   jjproc
1177      INTEGER ::   ii, ji
1178      !!----------------------------------------------------------------------
1179      !
1180#if ! defined key_mpi_off
1181      !
1182      ! Look for how many procs on the northern boundary
1183      ndim_rank_north = 0
1184      DO jjproc = 1, jpni
1185         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1186      END DO
1187      !
1188      ! Allocate the right size to nrank_north
1189      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1190      ALLOCATE( nrank_north(ndim_rank_north) )
1191
1192      ! Fill the nrank_north array with proc. number of northern procs.
1193      ! Note : the rank start at 0 in MPI
1194      ii = 0
1195      DO ji = 1, jpni
1196         IF ( nfproc(ji) /= -1   ) THEN
1197            ii=ii+1
1198            nrank_north(ii)=nfproc(ji)
1199         END IF
1200      END DO
1201      !
1202      ! create the world group
1203      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1204      !
1205      ! Create the North group from the world group
1206      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1207      !
1208      ! Create the North communicator , ie the pool of procs in the north group
1209      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1210      !
1211#endif
1212   END SUBROUTINE mpp_ini_north
1213
1214
1215   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1216      !!---------------------------------------------------------------------
1217      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1218      !!
1219      !!   Modification of original codes written by David H. Bailey
1220      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1221      !!---------------------------------------------------------------------
1222      INTEGER                     , INTENT(in)    ::   ilen, itype
1223      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1224      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1225      !
1226      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1227      INTEGER  :: ji, ztmp           ! local scalar
1228      !!---------------------------------------------------------------------
1229      !
1230      ztmp = itype   ! avoid compilation warning
1231      !
1232      DO ji=1,ilen
1233      ! Compute ydda + yddb using Knuth's trick.
1234         zt1  = real(ydda(ji)) + real(yddb(ji))
1235         zerr = zt1 - real(ydda(ji))
1236         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1237                + aimag(ydda(ji)) + aimag(yddb(ji))
1238
1239         ! The result is zt1 + zt2, after normalization.
1240         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1241      END DO
1242      !
1243   END SUBROUTINE DDPDD_MPI
1244
1245
1246   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1247      !!----------------------------------------------------------------------
1248      !!                  ***  routine mpp_report  ***
1249      !!
1250      !! ** Purpose :   report use of mpp routines per time-setp
1251      !!
1252      !!----------------------------------------------------------------------
1253      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1254      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1255      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1256      !!
1257      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1258      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1259      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1260      !!----------------------------------------------------------------------
1261#if ! defined key_mpi_off
1262      !
1263      ll_lbc = .FALSE.
1264      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1265      ll_glb = .FALSE.
1266      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1267      ll_dlg = .FALSE.
1268      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1269      !
1270      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1271      ncom_freq = ncom_fsbc
1272      !
1273      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1274         IF( ll_lbc ) THEN
1275            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1276            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1277            n_sequence_lbc = n_sequence_lbc + 1
1278            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1279            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1280            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1281            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1282         ENDIF
1283         IF( ll_glb ) THEN
1284            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1285            n_sequence_glb = n_sequence_glb + 1
1286            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1287            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1288         ENDIF
1289         IF( ll_dlg ) THEN
1290            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1291            n_sequence_dlg = n_sequence_dlg + 1
1292            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1293            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1294         ENDIF
1295      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1296         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1297         WRITE(numcom,*) ' '
1298         WRITE(numcom,*) ' ------------------------------------------------------------'
1299         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1300         WRITE(numcom,*) ' ------------------------------------------------------------'
1301         WRITE(numcom,*) ' '
1302         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1303         jj = 0; jk = 0; jf = 0; jh = 0
1304         DO ji = 1, n_sequence_lbc
1305            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1306            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1307            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1308            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1309         END DO
1310         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1311         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1312         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1313         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1314         WRITE(numcom,*) ' '
1315         WRITE(numcom,*) ' lbc_lnk called'
1316         DO ji = 1, n_sequence_lbc - 1
1317            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1318               ccountname = crname_lbc(ji)
1319               crname_lbc(ji) = 'already counted'
1320               jcount = 1
1321               DO jj = ji + 1, n_sequence_lbc
1322                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1323                     jcount = jcount + 1
1324                     crname_lbc(jj) = 'already counted'
1325                  END IF
1326               END DO
1327               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1328            END IF
1329         END DO
1330         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1331            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1332         END IF
1333         WRITE(numcom,*) ' '
1334         IF ( n_sequence_glb > 0 ) THEN
1335            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1336            jj = 1
1337            DO ji = 2, n_sequence_glb
1338               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1339                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1340                  jj = 0
1341               END IF
1342               jj = jj + 1
1343            END DO
1344            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1345            DEALLOCATE(crname_glb)
1346         ELSE
1347            WRITE(numcom,*) ' No MPI global communication '
1348         ENDIF
1349         WRITE(numcom,*) ' '
1350         IF ( n_sequence_dlg > 0 ) THEN
1351            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1352            jj = 1
1353            DO ji = 2, n_sequence_dlg
1354               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1355                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1356                  jj = 0
1357               END IF
1358               jj = jj + 1
1359            END DO
1360            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1361            DEALLOCATE(crname_dlg)
1362         ELSE
1363            WRITE(numcom,*) ' No MPI delayed global communication '
1364         ENDIF
1365         WRITE(numcom,*) ' '
1366         WRITE(numcom,*) ' -----------------------------------------------'
1367         WRITE(numcom,*) ' '
1368         DEALLOCATE(ncomm_sequence)
1369         DEALLOCATE(crname_lbc)
1370      ENDIF
1371#endif
1372   END SUBROUTINE mpp_report
1373
1374
1375   SUBROUTINE tic_tac (ld_tic, ld_global)
1376
1377    LOGICAL,           INTENT(IN) :: ld_tic
1378    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1379    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1380    REAL(dp),               SAVE :: tic_ct = 0._dp
1381    INTEGER :: ii
1382#if ! defined key_mpi_off
1383
1384    IF( ncom_stp <= nit000 ) RETURN
1385    IF( ncom_stp == nitend ) RETURN
1386    ii = 1
1387    IF( PRESENT( ld_global ) ) THEN
1388       IF( ld_global ) ii = 2
1389    END IF
1390
1391    IF ( ld_tic ) THEN
1392       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1393       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1394    ELSE
1395       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1396       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1397    ENDIF
1398#endif
1399
1400   END SUBROUTINE tic_tac
1401
1402#if defined key_mpi_off
1403   SUBROUTINE mpi_wait(request, status, ierror)
1404      INTEGER                            , INTENT(in   ) ::   request
1405      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1406      INTEGER                            , INTENT(  out) ::   ierror
1407      IF (.FALSE.) THEN   ! to avoid compilation warning
1408         status(:) = -1
1409         ierror = -1
1410      ENDIF
1411   END SUBROUTINE mpi_wait
1412
1413   SUBROUTINE mpi_waitall(count, request, status, ierror)
1414      INTEGER                            , INTENT(in   ) :: count
1415      INTEGER, DIMENSION(count)          , INTENT(in   ) :: request
1416      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) :: status
1417      INTEGER                            , INTENT(  out) :: ierror
1418      IF (.FALSE.) THEN   ! to avoid compilation warning
1419         status(:) = -1
1420         ierror = -1
1421      ENDIF
1422   END SUBROUTINE mpi_waitall
1423
1424   FUNCTION MPI_Wtime()
1425      REAL(wp) ::  MPI_Wtime
1426      MPI_Wtime = -1.
1427   END FUNCTION MPI_Wtime
1428#endif
1429
1430   !!----------------------------------------------------------------------
1431   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1432   !!----------------------------------------------------------------------
1433
1434   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1435      &                 cd6, cd7, cd8, cd9, cd10 )
1436      !!----------------------------------------------------------------------
1437      !!                  ***  ROUTINE  stop_opa  ***
1438      !!
1439      !! ** Purpose :   print in ocean.outpput file a error message and
1440      !!                increment the error number (nstop) by one.
1441      !!----------------------------------------------------------------------
1442      CHARACTER(len=*), INTENT(in   )           ::   cd1
1443      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1444      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1445      !
1446      CHARACTER(LEN=8) ::   clfmt            ! writing format
1447      INTEGER          ::   inum
1448      !!----------------------------------------------------------------------
1449      !
1450      nstop = nstop + 1
1451      !
1452      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1453         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1454         WRITE(inum,*)
1455         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1456         CLOSE(inum)
1457      ENDIF
1458      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1459         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1460      ENDIF
1461      !
1462                            WRITE(numout,*)
1463                            WRITE(numout,*) ' ===>>> : E R R O R'
1464                            WRITE(numout,*)
1465                            WRITE(numout,*) '         ==========='
1466                            WRITE(numout,*)
1467                            WRITE(numout,*) TRIM(cd1)
1468      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1469      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1470      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1471      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1472      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1473      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1474      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1475      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1476      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1477                            WRITE(numout,*)
1478      !
1479                               CALL FLUSH(numout    )
1480      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1481      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1482      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1483      !
1484      IF( cd1 == 'STOP' ) THEN
1485         WRITE(numout,*)
1486         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1487         WRITE(numout,*)
1488         CALL FLUSH(numout)
1489         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1490         CALL mppstop( ld_abort = .true. )
1491      ENDIF
1492      !
1493   END SUBROUTINE ctl_stop
1494
1495
1496   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1497      &                 cd6, cd7, cd8, cd9, cd10 )
1498      !!----------------------------------------------------------------------
1499      !!                  ***  ROUTINE  stop_warn  ***
1500      !!
1501      !! ** Purpose :   print in ocean.outpput file a error message and
1502      !!                increment the warning number (nwarn) by one.
1503      !!----------------------------------------------------------------------
1504      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1505      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1506      !!----------------------------------------------------------------------
1507      !
1508      nwarn = nwarn + 1
1509      !
1510      IF(lwp) THEN
1511                               WRITE(numout,*)
1512                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1513                               WRITE(numout,*)
1514                               WRITE(numout,*) '         ==============='
1515                               WRITE(numout,*)
1516         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1517         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1518         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1519         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1520         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1521         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1522         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1523         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1524         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1525         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1526                               WRITE(numout,*)
1527      ENDIF
1528      CALL FLUSH(numout)
1529      !
1530   END SUBROUTINE ctl_warn
1531
1532
1533   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1534      !!----------------------------------------------------------------------
1535      !!                  ***  ROUTINE ctl_opn  ***
1536      !!
1537      !! ** Purpose :   Open file and check if required file is available.
1538      !!
1539      !! ** Method  :   Fortan open
1540      !!----------------------------------------------------------------------
1541      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1542      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1543      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1544      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1545      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1546      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1547      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1548      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1549      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1550      !
1551      CHARACTER(len=80) ::   clfile
1552      CHARACTER(LEN=10) ::   clfmt            ! writing format
1553      INTEGER           ::   iost
1554      INTEGER           ::   idg              ! number of digits
1555      !!----------------------------------------------------------------------
1556      !
1557      ! adapt filename
1558      ! ----------------
1559      clfile = TRIM(cdfile)
1560      IF( PRESENT( karea ) ) THEN
1561         IF( karea > 1 ) THEN
1562            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1563            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1564            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1565            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1566         ENDIF
1567      ENDIF
1568#if defined key_agrif
1569      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1570      knum=Agrif_Get_Unit()
1571#else
1572      knum=get_unit()
1573#endif
1574      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1575      !
1576      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1577         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1578      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1579         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1580      ELSE
1581         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1582      ENDIF
1583      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1584         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1585      IF( iost == 0 ) THEN
1586         IF(ldwp .AND. kout > 0) THEN
1587            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1588            WRITE(kout,*) '     unit   = ', knum
1589            WRITE(kout,*) '     status = ', cdstat
1590            WRITE(kout,*) '     form   = ', cdform
1591            WRITE(kout,*) '     access = ', cdacce
1592            WRITE(kout,*)
1593         ENDIF
1594      ENDIF
1595100   CONTINUE
1596      IF( iost /= 0 ) THEN
1597         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1598         WRITE(ctmp2,*) ' =======   ===  '
1599         WRITE(ctmp3,*) '           unit   = ', knum
1600         WRITE(ctmp4,*) '           status = ', cdstat
1601         WRITE(ctmp5,*) '           form   = ', cdform
1602         WRITE(ctmp6,*) '           access = ', cdacce
1603         WRITE(ctmp7,*) '           iostat = ', iost
1604         WRITE(ctmp8,*) '           we stop. verify the file '
1605         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1606      ENDIF
1607      !
1608   END SUBROUTINE ctl_opn
1609
1610
1611   SUBROUTINE ctl_nam ( kios, cdnam )
1612      !!----------------------------------------------------------------------
1613      !!                  ***  ROUTINE ctl_nam  ***
1614      !!
1615      !! ** Purpose :   Informations when error while reading a namelist
1616      !!
1617      !! ** Method  :   Fortan open
1618      !!----------------------------------------------------------------------
1619      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1620      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1621      !
1622      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1623      !!----------------------------------------------------------------------
1624      !
1625      WRITE (clios, '(I5.0)')   kios
1626      IF( kios < 0 ) THEN
1627         CALL ctl_warn( 'end of record or file while reading namelist '   &
1628            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1629      ENDIF
1630      !
1631      IF( kios > 0 ) THEN
1632         CALL ctl_stop( 'misspelled variable in namelist '   &
1633            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1634      ENDIF
1635      kios = 0
1636      !
1637   END SUBROUTINE ctl_nam
1638
1639
1640   INTEGER FUNCTION get_unit()
1641      !!----------------------------------------------------------------------
1642      !!                  ***  FUNCTION  get_unit  ***
1643      !!
1644      !! ** Purpose :   return the index of an unused logical unit
1645      !!----------------------------------------------------------------------
1646      LOGICAL :: llopn
1647      !!----------------------------------------------------------------------
1648      !
1649      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1650      llopn = .TRUE.
1651      DO WHILE( (get_unit < 998) .AND. llopn )
1652         get_unit = get_unit + 1
1653         INQUIRE( unit = get_unit, opened = llopn )
1654      END DO
1655      IF( (get_unit == 999) .AND. llopn ) THEN
1656         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1657      ENDIF
1658      !
1659   END FUNCTION get_unit
1660
1661   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1662      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1663      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1664      CHARACTER(LEN=256)                           :: chline
1665      CHARACTER(LEN=1)                             :: csp
1666      INTEGER, INTENT(IN)                          :: kout
1667      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1668      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1669      !
1670      !csp = NEW_LINE('A')
1671      ! a new line character is the best seperator but some systems (e.g.Cray)
1672      ! seem to terminate namelist reads from internal files early if they
1673      ! encounter new-lines. Use a single space for safety.
1674      csp = ' '
1675      !
1676      ! Check if the namelist buffer has already been allocated. Return if it has.
1677      !
1678      IF ( ALLOCATED( cdnambuff ) ) RETURN
1679      IF( ldwp ) THEN
1680         !
1681         ! Open namelist file
1682         !
1683         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1684         !
1685         ! First pass: count characters excluding comments and trimable white space
1686         !
1687         itot=0
1688     10  READ(iun,'(A256)',END=20,ERR=20) chline
1689         iltc = LEN_TRIM(chline)
1690         IF ( iltc.GT.0 ) THEN
1691          inl = INDEX(chline, '!')
1692          IF( inl.eq.0 ) THEN
1693           itot = itot + iltc + 1                                ! +1 for the newline character
1694          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1695           itot = itot + inl                                  !  includes +1 for the newline character
1696          ENDIF
1697         ENDIF
1698         GOTO 10
1699     20  CONTINUE
1700         !
1701         ! Allocate text cdnambuff for condensed namelist
1702         !
1703!$AGRIF_DO_NOT_TREAT
1704         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1705!$AGRIF_END_DO_NOT_TREAT
1706         itotsav = itot
1707         !
1708         ! Second pass: read and transfer pruned characters into cdnambuff
1709         !
1710         REWIND(iun)
1711         itot=1
1712     30  READ(iun,'(A256)',END=40,ERR=40) chline
1713         iltc = LEN_TRIM(chline)
1714         IF ( iltc.GT.0 ) THEN
1715          inl = INDEX(chline, '!')
1716          IF( inl.eq.0 ) THEN
1717           inl = iltc
1718          ELSE
1719           inl = inl - 1
1720          ENDIF
1721          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1722             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1723             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1724             itot = itot + inl + 1
1725          ENDIF
1726         ENDIF
1727         GOTO 30
1728     40  CONTINUE
1729         itot = itot - 1
1730         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1731         !
1732         ! Close namelist file
1733         !
1734         CLOSE(iun)
1735         !write(*,'(32A)') cdnambuff
1736      ENDIF
1737#if ! defined key_mpi_off
1738      CALL mpp_bcast_nml( cdnambuff, itot )
1739#endif
1740  END SUBROUTINE load_nml
1741
1742
1743   !!----------------------------------------------------------------------
1744END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.