source: NEMO/branches/2019/dev_r11943_MERGE_2019/src/OCE/LBC/lib_mpp.F90 @ 11960

Last change on this file since 11960 was 11960, checked in by acc, 10 months ago

Branch 2019/dev_r11943_MERGE_2019. Merge in changes from 2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles. (svn merge -r 11614:11954). Resolved tree conflicts and one actual conflict. Sette tested(these changes alter the ext/AGRIF reference; remember to update). See ticket #2341

  • Property svn:keywords set to Id
File size: 59.9 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mpp_report
70   PUBLIC   mpp_bcast_nml
71   PUBLIC   tic_tac
72#if ! defined key_mpp_mpi
73   PUBLIC MPI_Wtime
74#endif
75   
76   !! * Interfaces
77   !! define generic interface for these routine as they are called sometimes
78   !! with scalar arguments instead of array arguments, which causes problems
79   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
80   INTERFACE mpp_min
81      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
82   END INTERFACE
83   INTERFACE mpp_max
84      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
85   END INTERFACE
86   INTERFACE mpp_sum
87      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
88         &             mppsum_realdd, mppsum_a_realdd
89   END INTERFACE
90   INTERFACE mpp_minloc
91      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
92   END INTERFACE
93   INTERFACE mpp_maxloc
94      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
95   END INTERFACE
96
97   !! ========================= !!
98   !!  MPI  variable definition !!
99   !! ========================= !!
100#if   defined key_mpp_mpi
101!$AGRIF_DO_NOT_TREAT
102   INCLUDE 'mpif.h'
103!$AGRIF_END_DO_NOT_TREAT
104   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
105#else   
106   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
107   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
109#endif
110
111   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
112
113   INTEGER, PUBLIC ::   mppsize        ! number of process
114   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
115!$AGRIF_DO_NOT_TREAT
116   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
117!$AGRIF_END_DO_NOT_TREAT
118
119   INTEGER :: MPI_SUMDD
120
121   ! variables used for zonal integration
122   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
123   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
124   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
125   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
127
128   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
129   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
130   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
131   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
132   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
133   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
134   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
135   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
136   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
137
138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
145   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
146   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
147   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
148   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
149   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
150   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
151   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
152   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
153   !: name (used as id) of allreduce-delayed operations
154   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
155   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
156   !: component name where the allreduce-delayed operation is performed
157   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
158   TYPE, PUBLIC ::   DELAYARR
159      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
160      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
161   END TYPE DELAYARR
162   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
163   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
164
165   ! timing summary report
166   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
167   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
168   
169   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
170
171   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
172   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
173   
174   !!----------------------------------------------------------------------
175   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
176   !! $Id$
177   !! Software governed by the CeCILL license (see ./LICENSE)
178   !!----------------------------------------------------------------------
179CONTAINS
180
181   SUBROUTINE mpp_start( localComm )
182      !!----------------------------------------------------------------------
183      !!                  ***  routine mpp_start  ***
184      !!
185      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
186      !!----------------------------------------------------------------------
187      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
188      !
189      INTEGER ::   ierr
190      LOGICAL ::   llmpi_init
191      !!----------------------------------------------------------------------
192#if defined key_mpp_mpi
193      !
194      CALL mpi_initialized ( llmpi_init, ierr )
195      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
196
197      IF( .NOT. llmpi_init ) THEN
198         IF( PRESENT(localComm) ) THEN
199            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
200            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
201            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
202         ENDIF
203         CALL mpi_init( ierr )
204         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
205      ENDIF
206       
207      IF( PRESENT(localComm) ) THEN
208         IF( Agrif_Root() ) THEN
209            mpi_comm_oce = localComm
210         ENDIF
211      ELSE
212         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
213         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
214      ENDIF
215
216# if defined key_agrif
217      IF( Agrif_Root() ) THEN
218         CALL Agrif_MPI_Init(mpi_comm_oce)
219      ELSE
220         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
221      ENDIF
222# endif
223
224      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
225      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
226      !
227      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
228      !
229#else
230      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
231      mppsize = 1
232      mpprank = 0
233#endif
234   END SUBROUTINE mpp_start
235
236
237   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
238      !!----------------------------------------------------------------------
239      !!                  ***  routine mppsend  ***
240      !!
241      !! ** Purpose :   Send messag passing array
242      !!
243      !!----------------------------------------------------------------------
244      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
245      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
246      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
247      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
248      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
249      !!
250      INTEGER ::   iflag
251      !!----------------------------------------------------------------------
252      !
253#if defined key_mpp_mpi
254      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
255#endif
256      !
257   END SUBROUTINE mppsend
258
259
260   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
261      !!----------------------------------------------------------------------
262      !!                  ***  routine mpprecv  ***
263      !!
264      !! ** Purpose :   Receive messag passing array
265      !!
266      !!----------------------------------------------------------------------
267      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
268      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
269      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
270      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
271      !!
272      INTEGER :: istatus(mpi_status_size)
273      INTEGER :: iflag
274      INTEGER :: use_source
275      !!----------------------------------------------------------------------
276      !
277#if defined key_mpp_mpi
278      ! If a specific process number has been passed to the receive call,
279      ! use that one. Default is to use mpi_any_source
280      use_source = mpi_any_source
281      IF( PRESENT(ksource) )   use_source = ksource
282      !
283      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
284#endif
285      !
286   END SUBROUTINE mpprecv
287
288
289   SUBROUTINE mppgather( ptab, kp, pio )
290      !!----------------------------------------------------------------------
291      !!                   ***  routine mppgather  ***
292      !!
293      !! ** Purpose :   Transfert between a local subdomain array and a work
294      !!     array which is distributed following the vertical level.
295      !!
296      !!----------------------------------------------------------------------
297      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
298      INTEGER                           , INTENT(in   ) ::   kp     ! record length
299      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
300      !!
301      INTEGER :: itaille, ierror   ! temporary integer
302      !!---------------------------------------------------------------------
303      !
304      itaille = jpi * jpj
305#if defined key_mpp_mpi
306      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
307         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
308#else
309      pio(:,:,1) = ptab(:,:)
310#endif
311      !
312   END SUBROUTINE mppgather
313
314
315   SUBROUTINE mppscatter( pio, kp, ptab )
316      !!----------------------------------------------------------------------
317      !!                  ***  routine mppscatter  ***
318      !!
319      !! ** Purpose :   Transfert between awork array which is distributed
320      !!      following the vertical level and the local subdomain array.
321      !!
322      !!----------------------------------------------------------------------
323      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
324      INTEGER                             ::   kp     ! Tag (not used with MPI
325      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
326      !!
327      INTEGER :: itaille, ierror   ! temporary integer
328      !!---------------------------------------------------------------------
329      !
330      itaille = jpi * jpj
331      !
332#if defined key_mpp_mpi
333      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
334         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
335#else
336      ptab(:,:) = pio(:,:,1)
337#endif
338      !
339   END SUBROUTINE mppscatter
340
341   
342   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
343     !!----------------------------------------------------------------------
344      !!                   ***  routine mpp_delay_sum  ***
345      !!
346      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
347      !!
348      !!----------------------------------------------------------------------
349      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
350      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
351      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
352      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
353      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
354      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
355      !!
356      INTEGER ::   ji, isz
357      INTEGER ::   idvar
358      INTEGER ::   ierr, ilocalcomm
359      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
360      !!----------------------------------------------------------------------
361#if defined key_mpp_mpi
362      ilocalcomm = mpi_comm_oce
363      IF( PRESENT(kcom) )   ilocalcomm = kcom
364
365      isz = SIZE(y_in)
366     
367      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
368
369      idvar = -1
370      DO ji = 1, nbdelay
371         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
372      END DO
373      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
374
375      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
376         !                                       --------------------------
377         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
378            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
379            DEALLOCATE(todelay(idvar)%z1d)
380            ndelayid(idvar) = -1                                      ! do as if we had no restart
381         ELSE
382            ALLOCATE(todelay(idvar)%y1d(isz))
383            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
384         END IF
385      ENDIF
386     
387      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
388         !                                       --------------------------
389         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
390         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
391         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
392      ENDIF
393
394      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
395
396      ! send back pout from todelay(idvar)%z1d defined at previous call
397      pout(:) = todelay(idvar)%z1d(:)
398
399      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
400# if defined key_mpi2
401      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
402      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
403      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
404# else
405      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
406# endif
407#else
408      pout(:) = REAL(y_in(:), wp)
409#endif
410
411   END SUBROUTINE mpp_delay_sum
412
413   
414   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
415      !!----------------------------------------------------------------------
416      !!                   ***  routine mpp_delay_max  ***
417      !!
418      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
419      !!
420      !!----------------------------------------------------------------------
421      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
422      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
423      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
424      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
425      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
426      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
427      !!
428      INTEGER ::   ji, isz
429      INTEGER ::   idvar
430      INTEGER ::   ierr, ilocalcomm
431      !!----------------------------------------------------------------------
432#if defined key_mpp_mpi
433      ilocalcomm = mpi_comm_oce
434      IF( PRESENT(kcom) )   ilocalcomm = kcom
435
436      isz = SIZE(p_in)
437
438      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
439
440      idvar = -1
441      DO ji = 1, nbdelay
442         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
443      END DO
444      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
445
446      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
447         !                                       --------------------------
448         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
449            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
450            DEALLOCATE(todelay(idvar)%z1d)
451            ndelayid(idvar) = -1                                      ! do as if we had no restart
452         END IF
453      ENDIF
454
455      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
456         !                                       --------------------------
457         ALLOCATE(todelay(idvar)%z1d(isz))
458         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
459      ENDIF
460
461      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
462
463      ! send back pout from todelay(idvar)%z1d defined at previous call
464      pout(:) = todelay(idvar)%z1d(:)
465
466      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
467# if defined key_mpi2
468      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
469      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
470      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
471# else
472      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
473# endif
474#else
475      pout(:) = p_in(:)
476#endif
477
478   END SUBROUTINE mpp_delay_max
479
480   
481   SUBROUTINE mpp_delay_rcv( kid )
482      !!----------------------------------------------------------------------
483      !!                   ***  routine mpp_delay_rcv  ***
484      !!
485      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
486      !!
487      !!----------------------------------------------------------------------
488      INTEGER,INTENT(in   )      ::  kid 
489      INTEGER ::   ierr
490      !!----------------------------------------------------------------------
491#if defined key_mpp_mpi
492      IF( ndelayid(kid) /= -2 ) THEN 
493#if ! defined key_mpi2
494         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
495         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
496         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
497#endif
498         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
499         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
500      ENDIF
501#endif
502   END SUBROUTINE mpp_delay_rcv
503
504   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
505      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
506      INTEGER                          , INTENT(INOUT) :: kleng
507      !!----------------------------------------------------------------------
508      !!                  ***  routine mpp_bcast_nml  ***
509      !!
510      !! ** Purpose :   broadcast namelist character buffer
511      !!
512      !!----------------------------------------------------------------------
513      !!
514      INTEGER ::   iflag
515      !!----------------------------------------------------------------------
516      !
517#if defined key_mpp_mpi
518      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
519      call MPI_BARRIER(mpi_comm_oce, iflag)
520!$AGRIF_DO_NOT_TREAT
521      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
522!$AGRIF_END_DO_NOT_TREAT
523      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
524      call MPI_BARRIER(mpi_comm_oce, iflag)
525#endif
526      !
527   END SUBROUTINE mpp_bcast_nml
528
529   
530   !!----------------------------------------------------------------------
531   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
532   !!   
533   !!----------------------------------------------------------------------
534   !!
535#  define OPERATION_MAX
536#  define INTEGER_TYPE
537#  define DIM_0d
538#     define ROUTINE_ALLREDUCE           mppmax_int
539#     include "mpp_allreduce_generic.h90"
540#     undef ROUTINE_ALLREDUCE
541#  undef DIM_0d
542#  define DIM_1d
543#     define ROUTINE_ALLREDUCE           mppmax_a_int
544#     include "mpp_allreduce_generic.h90"
545#     undef ROUTINE_ALLREDUCE
546#  undef DIM_1d
547#  undef INTEGER_TYPE
548!
549#  define REAL_TYPE
550#  define DIM_0d
551#     define ROUTINE_ALLREDUCE           mppmax_real
552#     include "mpp_allreduce_generic.h90"
553#     undef ROUTINE_ALLREDUCE
554#  undef DIM_0d
555#  define DIM_1d
556#     define ROUTINE_ALLREDUCE           mppmax_a_real
557#     include "mpp_allreduce_generic.h90"
558#     undef ROUTINE_ALLREDUCE
559#  undef DIM_1d
560#  undef REAL_TYPE
561#  undef OPERATION_MAX
562   !!----------------------------------------------------------------------
563   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
564   !!   
565   !!----------------------------------------------------------------------
566   !!
567#  define OPERATION_MIN
568#  define INTEGER_TYPE
569#  define DIM_0d
570#     define ROUTINE_ALLREDUCE           mppmin_int
571#     include "mpp_allreduce_generic.h90"
572#     undef ROUTINE_ALLREDUCE
573#  undef DIM_0d
574#  define DIM_1d
575#     define ROUTINE_ALLREDUCE           mppmin_a_int
576#     include "mpp_allreduce_generic.h90"
577#     undef ROUTINE_ALLREDUCE
578#  undef DIM_1d
579#  undef INTEGER_TYPE
580!
581#  define REAL_TYPE
582#  define DIM_0d
583#     define ROUTINE_ALLREDUCE           mppmin_real
584#     include "mpp_allreduce_generic.h90"
585#     undef ROUTINE_ALLREDUCE
586#  undef DIM_0d
587#  define DIM_1d
588#     define ROUTINE_ALLREDUCE           mppmin_a_real
589#     include "mpp_allreduce_generic.h90"
590#     undef ROUTINE_ALLREDUCE
591#  undef DIM_1d
592#  undef REAL_TYPE
593#  undef OPERATION_MIN
594
595   !!----------------------------------------------------------------------
596   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
597   !!   
598   !!   Global sum of 1D array or a variable (integer, real or complex)
599   !!----------------------------------------------------------------------
600   !!
601#  define OPERATION_SUM
602#  define INTEGER_TYPE
603#  define DIM_0d
604#     define ROUTINE_ALLREDUCE           mppsum_int
605#     include "mpp_allreduce_generic.h90"
606#     undef ROUTINE_ALLREDUCE
607#  undef DIM_0d
608#  define DIM_1d
609#     define ROUTINE_ALLREDUCE           mppsum_a_int
610#     include "mpp_allreduce_generic.h90"
611#     undef ROUTINE_ALLREDUCE
612#  undef DIM_1d
613#  undef INTEGER_TYPE
614!
615#  define REAL_TYPE
616#  define DIM_0d
617#     define ROUTINE_ALLREDUCE           mppsum_real
618#     include "mpp_allreduce_generic.h90"
619#     undef ROUTINE_ALLREDUCE
620#  undef DIM_0d
621#  define DIM_1d
622#     define ROUTINE_ALLREDUCE           mppsum_a_real
623#     include "mpp_allreduce_generic.h90"
624#     undef ROUTINE_ALLREDUCE
625#  undef DIM_1d
626#  undef REAL_TYPE
627#  undef OPERATION_SUM
628
629#  define OPERATION_SUM_DD
630#  define COMPLEX_TYPE
631#  define DIM_0d
632#     define ROUTINE_ALLREDUCE           mppsum_realdd
633#     include "mpp_allreduce_generic.h90"
634#     undef ROUTINE_ALLREDUCE
635#  undef DIM_0d
636#  define DIM_1d
637#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
638#     include "mpp_allreduce_generic.h90"
639#     undef ROUTINE_ALLREDUCE
640#  undef DIM_1d
641#  undef COMPLEX_TYPE
642#  undef OPERATION_SUM_DD
643
644   !!----------------------------------------------------------------------
645   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
646   !!   
647   !!----------------------------------------------------------------------
648   !!
649#  define OPERATION_MINLOC
650#  define DIM_2d
651#     define ROUTINE_LOC           mpp_minloc2d
652#     include "mpp_loc_generic.h90"
653#     undef ROUTINE_LOC
654#  undef DIM_2d
655#  define DIM_3d
656#     define ROUTINE_LOC           mpp_minloc3d
657#     include "mpp_loc_generic.h90"
658#     undef ROUTINE_LOC
659#  undef DIM_3d
660#  undef OPERATION_MINLOC
661
662#  define OPERATION_MAXLOC
663#  define DIM_2d
664#     define ROUTINE_LOC           mpp_maxloc2d
665#     include "mpp_loc_generic.h90"
666#     undef ROUTINE_LOC
667#  undef DIM_2d
668#  define DIM_3d
669#     define ROUTINE_LOC           mpp_maxloc3d
670#     include "mpp_loc_generic.h90"
671#     undef ROUTINE_LOC
672#  undef DIM_3d
673#  undef OPERATION_MAXLOC
674
675   SUBROUTINE mppsync()
676      !!----------------------------------------------------------------------
677      !!                  ***  routine mppsync  ***
678      !!
679      !! ** Purpose :   Massively parallel processors, synchroneous
680      !!
681      !!-----------------------------------------------------------------------
682      INTEGER :: ierror
683      !!-----------------------------------------------------------------------
684      !
685#if defined key_mpp_mpi
686      CALL mpi_barrier( mpi_comm_oce, ierror )
687#endif
688      !
689   END SUBROUTINE mppsync
690
691
692   SUBROUTINE mppstop( ld_abort ) 
693      !!----------------------------------------------------------------------
694      !!                  ***  routine mppstop  ***
695      !!
696      !! ** purpose :   Stop massively parallel processors method
697      !!
698      !!----------------------------------------------------------------------
699      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
700      LOGICAL ::   ll_abort
701      INTEGER ::   info
702      !!----------------------------------------------------------------------
703      ll_abort = .FALSE.
704      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
705      !
706#if defined key_mpp_mpi
707      IF(ll_abort) THEN
708         CALL mpi_abort( MPI_COMM_WORLD )
709      ELSE
710         CALL mppsync
711         CALL mpi_finalize( info )
712      ENDIF
713#endif
714      IF( ll_abort ) STOP 123
715      !
716   END SUBROUTINE mppstop
717
718
719   SUBROUTINE mpp_comm_free( kcom )
720      !!----------------------------------------------------------------------
721      INTEGER, INTENT(in) ::   kcom
722      !!
723      INTEGER :: ierr
724      !!----------------------------------------------------------------------
725      !
726#if defined key_mpp_mpi
727      CALL MPI_COMM_FREE(kcom, ierr)
728#endif
729      !
730   END SUBROUTINE mpp_comm_free
731
732
733   SUBROUTINE mpp_ini_znl( kumout )
734      !!----------------------------------------------------------------------
735      !!               ***  routine mpp_ini_znl  ***
736      !!
737      !! ** Purpose :   Initialize special communicator for computing zonal sum
738      !!
739      !! ** Method  : - Look for processors in the same row
740      !!              - Put their number in nrank_znl
741      !!              - Create group for the znl processors
742      !!              - Create a communicator for znl processors
743      !!              - Determine if processor should write znl files
744      !!
745      !! ** output
746      !!      ndim_rank_znl = number of processors on the same row
747      !!      ngrp_znl = group ID for the znl processors
748      !!      ncomm_znl = communicator for the ice procs.
749      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
750      !!
751      !!----------------------------------------------------------------------
752      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
753      !
754      INTEGER :: jproc      ! dummy loop integer
755      INTEGER :: ierr, ii   ! local integer
756      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
757      !!----------------------------------------------------------------------
758#if defined key_mpp_mpi
759      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
760      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
761      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
762      !
763      ALLOCATE( kwork(jpnij), STAT=ierr )
764      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
765
766      IF( jpnj == 1 ) THEN
767         ngrp_znl  = ngrp_world
768         ncomm_znl = mpi_comm_oce
769      ELSE
770         !
771         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
772         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
773         !-$$        CALL flush(numout)
774         !
775         ! Count number of processors on the same row
776         ndim_rank_znl = 0
777         DO jproc=1,jpnij
778            IF ( kwork(jproc) == njmpp ) THEN
779               ndim_rank_znl = ndim_rank_znl + 1
780            ENDIF
781         END DO
782         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
783         !-$$        CALL flush(numout)
784         ! Allocate the right size to nrank_znl
785         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
786         ALLOCATE(nrank_znl(ndim_rank_znl))
787         ii = 0
788         nrank_znl (:) = 0
789         DO jproc=1,jpnij
790            IF ( kwork(jproc) == njmpp) THEN
791               ii = ii + 1
792               nrank_znl(ii) = jproc -1
793            ENDIF
794         END DO
795         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
796         !-$$        CALL flush(numout)
797
798         ! Create the opa group
799         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
800         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
801         !-$$        CALL flush(numout)
802
803         ! Create the znl group from the opa group
804         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
805         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
806         !-$$        CALL flush(numout)
807
808         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
809         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
810         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
811         !-$$        CALL flush(numout)
812         !
813      END IF
814
815      ! Determines if processor if the first (starting from i=1) on the row
816      IF ( jpni == 1 ) THEN
817         l_znl_root = .TRUE.
818      ELSE
819         l_znl_root = .FALSE.
820         kwork (1) = nimpp
821         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
822         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
823      END IF
824
825      DEALLOCATE(kwork)
826#endif
827
828   END SUBROUTINE mpp_ini_znl
829
830
831   SUBROUTINE mpp_ini_north
832      !!----------------------------------------------------------------------
833      !!               ***  routine mpp_ini_north  ***
834      !!
835      !! ** Purpose :   Initialize special communicator for north folding
836      !!      condition together with global variables needed in the mpp folding
837      !!
838      !! ** Method  : - Look for northern processors
839      !!              - Put their number in nrank_north
840      !!              - Create groups for the world processors and the north processors
841      !!              - Create a communicator for northern processors
842      !!
843      !! ** output
844      !!      njmppmax = njmpp for northern procs
845      !!      ndim_rank_north = number of processors in the northern line
846      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
847      !!      ngrp_world = group ID for the world processors
848      !!      ngrp_north = group ID for the northern processors
849      !!      ncomm_north = communicator for the northern procs.
850      !!      north_root = number (in the world) of proc 0 in the northern comm.
851      !!
852      !!----------------------------------------------------------------------
853      INTEGER ::   ierr
854      INTEGER ::   jjproc
855      INTEGER ::   ii, ji
856      !!----------------------------------------------------------------------
857      !
858#if defined key_mpp_mpi
859      njmppmax = MAXVAL( njmppt )
860      !
861      ! Look for how many procs on the northern boundary
862      ndim_rank_north = 0
863      DO jjproc = 1, jpnij
864         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
865      END DO
866      !
867      ! Allocate the right size to nrank_north
868      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
869      ALLOCATE( nrank_north(ndim_rank_north) )
870
871      ! Fill the nrank_north array with proc. number of northern procs.
872      ! Note : the rank start at 0 in MPI
873      ii = 0
874      DO ji = 1, jpnij
875         IF ( njmppt(ji) == njmppmax   ) THEN
876            ii=ii+1
877            nrank_north(ii)=ji-1
878         END IF
879      END DO
880      !
881      ! create the world group
882      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
883      !
884      ! Create the North group from the world group
885      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
886      !
887      ! Create the North communicator , ie the pool of procs in the north group
888      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
889      !
890#endif
891   END SUBROUTINE mpp_ini_north
892
893
894   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
895      !!---------------------------------------------------------------------
896      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
897      !!
898      !!   Modification of original codes written by David H. Bailey
899      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
900      !!---------------------------------------------------------------------
901      INTEGER                     , INTENT(in)    ::   ilen, itype
902      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
903      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
904      !
905      REAL(wp) :: zerr, zt1, zt2    ! local work variables
906      INTEGER  :: ji, ztmp           ! local scalar
907      !!---------------------------------------------------------------------
908      !
909      ztmp = itype   ! avoid compilation warning
910      !
911      DO ji=1,ilen
912      ! Compute ydda + yddb using Knuth's trick.
913         zt1  = real(ydda(ji)) + real(yddb(ji))
914         zerr = zt1 - real(ydda(ji))
915         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
916                + aimag(ydda(ji)) + aimag(yddb(ji))
917
918         ! The result is zt1 + zt2, after normalization.
919         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
920      END DO
921      !
922   END SUBROUTINE DDPDD_MPI
923
924
925   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
926      !!----------------------------------------------------------------------
927      !!                  ***  routine mpp_report  ***
928      !!
929      !! ** Purpose :   report use of mpp routines per time-setp
930      !!
931      !!----------------------------------------------------------------------
932      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
933      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
934      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
935      !!
936      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
937      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
938      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
939      !!----------------------------------------------------------------------
940#if defined key_mpp_mpi
941      !
942      ll_lbc = .FALSE.
943      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
944      ll_glb = .FALSE.
945      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
946      ll_dlg = .FALSE.
947      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
948      !
949      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
950      ncom_freq = ncom_fsbc
951      !
952      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
953         IF( ll_lbc ) THEN
954            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
955            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
956            n_sequence_lbc = n_sequence_lbc + 1
957            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
958            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
959            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
960            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
961         ENDIF
962         IF( ll_glb ) THEN
963            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
964            n_sequence_glb = n_sequence_glb + 1
965            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
966            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
967         ENDIF
968         IF( ll_dlg ) THEN
969            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
970            n_sequence_dlg = n_sequence_dlg + 1
971            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
972            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
973         ENDIF
974      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
975         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
976         WRITE(numcom,*) ' '
977         WRITE(numcom,*) ' ------------------------------------------------------------'
978         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
979         WRITE(numcom,*) ' ------------------------------------------------------------'
980         WRITE(numcom,*) ' '
981         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
982         jj = 0; jk = 0; jf = 0; jh = 0
983         DO ji = 1, n_sequence_lbc
984            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
985            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
986            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
987            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
988         END DO
989         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
990         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
991         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
992         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
993         WRITE(numcom,*) ' '
994         WRITE(numcom,*) ' lbc_lnk called'
995         DO ji = 1, n_sequence_lbc - 1
996            IF ( crname_lbc(ji) /= 'already counted' ) THEN
997               ccountname = crname_lbc(ji)
998               crname_lbc(ji) = 'already counted'
999               jcount = 1
1000               DO jj = ji + 1, n_sequence_lbc
1001                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1002                     jcount = jcount + 1
1003                     crname_lbc(jj) = 'already counted'
1004                  END IF
1005               END DO
1006               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1007            END IF
1008         END DO
1009         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1010            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1011         END IF
1012         WRITE(numcom,*) ' '
1013         IF ( n_sequence_glb > 0 ) THEN
1014            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1015            jj = 1
1016            DO ji = 2, n_sequence_glb
1017               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1018                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1019                  jj = 0
1020               END IF
1021               jj = jj + 1 
1022            END DO
1023            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1024            DEALLOCATE(crname_glb)
1025         ELSE
1026            WRITE(numcom,*) ' No MPI global communication '
1027         ENDIF
1028         WRITE(numcom,*) ' '
1029         IF ( n_sequence_dlg > 0 ) THEN
1030            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1031            jj = 1
1032            DO ji = 2, n_sequence_dlg
1033               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1034                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1035                  jj = 0
1036               END IF
1037               jj = jj + 1 
1038            END DO
1039            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1040            DEALLOCATE(crname_dlg)
1041         ELSE
1042            WRITE(numcom,*) ' No MPI delayed global communication '
1043         ENDIF
1044         WRITE(numcom,*) ' '
1045         WRITE(numcom,*) ' -----------------------------------------------'
1046         WRITE(numcom,*) ' '
1047         DEALLOCATE(ncomm_sequence)
1048         DEALLOCATE(crname_lbc)
1049      ENDIF
1050#endif
1051   END SUBROUTINE mpp_report
1052
1053   
1054   SUBROUTINE tic_tac (ld_tic, ld_global)
1055
1056    LOGICAL,           INTENT(IN) :: ld_tic
1057    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1058    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1059    REAL(wp),               SAVE :: tic_ct = 0._wp
1060    INTEGER :: ii
1061#if defined key_mpp_mpi
1062
1063    IF( ncom_stp <= nit000 ) RETURN
1064    IF( ncom_stp == nitend ) RETURN
1065    ii = 1
1066    IF( PRESENT( ld_global ) ) THEN
1067       IF( ld_global ) ii = 2
1068    END IF
1069   
1070    IF ( ld_tic ) THEN
1071       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1072       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1073    ELSE
1074       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1075       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1076    ENDIF
1077#endif
1078   
1079   END SUBROUTINE tic_tac
1080
1081#if ! defined key_mpp_mpi
1082   SUBROUTINE mpi_wait(request, status, ierror)
1083      INTEGER                            , INTENT(in   ) ::   request
1084      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1085      INTEGER                            , INTENT(  out) ::   ierror
1086   END SUBROUTINE mpi_wait
1087
1088   
1089   FUNCTION MPI_Wtime()
1090      REAL(wp) ::  MPI_Wtime
1091      MPI_Wtime = -1.
1092   END FUNCTION MPI_Wtime
1093#endif
1094
1095   !!----------------------------------------------------------------------
1096   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1097   !!----------------------------------------------------------------------
1098
1099   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1100      &                 cd6, cd7, cd8, cd9, cd10 )
1101      !!----------------------------------------------------------------------
1102      !!                  ***  ROUTINE  stop_opa  ***
1103      !!
1104      !! ** Purpose :   print in ocean.outpput file a error message and
1105      !!                increment the error number (nstop) by one.
1106      !!----------------------------------------------------------------------
1107      CHARACTER(len=*), INTENT(in   )           ::   cd1
1108      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1109      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1110      !!----------------------------------------------------------------------
1111      !
1112      nstop = nstop + 1
1113      !
1114      ! force to open ocean.output file if not already opened
1115      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1116      !
1117                            WRITE(numout,*)
1118                            WRITE(numout,*) ' ===>>> : E R R O R'
1119                            WRITE(numout,*)
1120                            WRITE(numout,*) '         ==========='
1121                            WRITE(numout,*)
1122                            WRITE(numout,*) TRIM(cd1)
1123      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1124      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1125      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1126      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1127      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1128      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1129      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1130      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1131      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1132                            WRITE(numout,*)
1133      !
1134                               CALL FLUSH(numout    )
1135      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1136      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1137      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1138      !
1139      IF( cd1 == 'STOP' ) THEN
1140         WRITE(numout,*) 
1141         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1142         WRITE(numout,*) 
1143         CALL mppstop( ld_abort = .true. )
1144      ENDIF
1145      !
1146   END SUBROUTINE ctl_stop
1147
1148
1149   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1150      &                 cd6, cd7, cd8, cd9, cd10 )
1151      !!----------------------------------------------------------------------
1152      !!                  ***  ROUTINE  stop_warn  ***
1153      !!
1154      !! ** Purpose :   print in ocean.outpput file a error message and
1155      !!                increment the warning number (nwarn) by one.
1156      !!----------------------------------------------------------------------
1157      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1158      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1159      !!----------------------------------------------------------------------
1160      !
1161      nwarn = nwarn + 1
1162      !
1163      IF(lwp) THEN
1164                               WRITE(numout,*)
1165                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1166                               WRITE(numout,*)
1167                               WRITE(numout,*) '         ==============='
1168                               WRITE(numout,*)
1169         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1170         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1171         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1172         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1173         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1174         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1175         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1176         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1177         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1178         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1179                               WRITE(numout,*)
1180      ENDIF
1181      CALL FLUSH(numout)
1182      !
1183   END SUBROUTINE ctl_warn
1184
1185
1186   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1187      !!----------------------------------------------------------------------
1188      !!                  ***  ROUTINE ctl_opn  ***
1189      !!
1190      !! ** Purpose :   Open file and check if required file is available.
1191      !!
1192      !! ** Method  :   Fortan open
1193      !!----------------------------------------------------------------------
1194      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1195      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1196      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1197      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1198      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1199      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1200      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1201      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1202      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1203      !
1204      CHARACTER(len=80) ::   clfile
1205      INTEGER           ::   iost
1206      !!----------------------------------------------------------------------
1207      !
1208      ! adapt filename
1209      ! ----------------
1210      clfile = TRIM(cdfile)
1211      IF( PRESENT( karea ) ) THEN
1212         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1213      ENDIF
1214#if defined key_agrif
1215      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1216      knum=Agrif_Get_Unit()
1217#else
1218      knum=get_unit()
1219#endif
1220      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1221      !
1222      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1223         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1224      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1225         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1226      ELSE
1227         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1228      ENDIF
1229      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1230         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1231      IF( iost == 0 ) THEN
1232         IF(ldwp) THEN
1233            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1234            WRITE(kout,*) '     unit   = ', knum
1235            WRITE(kout,*) '     status = ', cdstat
1236            WRITE(kout,*) '     form   = ', cdform
1237            WRITE(kout,*) '     access = ', cdacce
1238            WRITE(kout,*)
1239         ENDIF
1240      ENDIF
1241100   CONTINUE
1242      IF( iost /= 0 ) THEN
1243         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1244         WRITE(ctmp2,*) ' =======   ===  '
1245         WRITE(ctmp3,*) '           unit   = ', knum
1246         WRITE(ctmp4,*) '           status = ', cdstat
1247         WRITE(ctmp5,*) '           form   = ', cdform
1248         WRITE(ctmp6,*) '           access = ', cdacce
1249         WRITE(ctmp7,*) '           iostat = ', iost
1250         WRITE(ctmp8,*) '           we stop. verify the file '
1251         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1252      ENDIF
1253      !
1254   END SUBROUTINE ctl_opn
1255
1256
1257   SUBROUTINE ctl_nam ( kios, cdnam )
1258      !!----------------------------------------------------------------------
1259      !!                  ***  ROUTINE ctl_nam  ***
1260      !!
1261      !! ** Purpose :   Informations when error while reading a namelist
1262      !!
1263      !! ** Method  :   Fortan open
1264      !!----------------------------------------------------------------------
1265      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1266      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1267      !
1268      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1269      !!----------------------------------------------------------------------
1270      !
1271      WRITE (clios, '(I5.0)')   kios
1272      IF( kios < 0 ) THEN         
1273         CALL ctl_warn( 'end of record or file while reading namelist '   &
1274            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1275      ENDIF
1276      !
1277      IF( kios > 0 ) THEN
1278         CALL ctl_stop( 'misspelled variable in namelist '   &
1279            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1280      ENDIF
1281      kios = 0
1282      !
1283   END SUBROUTINE ctl_nam
1284
1285
1286   INTEGER FUNCTION get_unit()
1287      !!----------------------------------------------------------------------
1288      !!                  ***  FUNCTION  get_unit  ***
1289      !!
1290      !! ** Purpose :   return the index of an unused logical unit
1291      !!----------------------------------------------------------------------
1292      LOGICAL :: llopn
1293      !!----------------------------------------------------------------------
1294      !
1295      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1296      llopn = .TRUE.
1297      DO WHILE( (get_unit < 998) .AND. llopn )
1298         get_unit = get_unit + 1
1299         INQUIRE( unit = get_unit, opened = llopn )
1300      END DO
1301      IF( (get_unit == 999) .AND. llopn ) THEN
1302         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1303      ENDIF
1304      !
1305   END FUNCTION get_unit
1306
1307   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1308      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1309      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1310      CHARACTER(LEN=256)                           :: chline
1311      INTEGER, INTENT(IN)                          :: kout
1312      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1313      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1314      !
1315      ! Check if the namelist buffer has already been allocated. Return if it has.
1316      !
1317      IF ( ALLOCATED( cdnambuff ) ) RETURN
1318      IF( ldwp ) THEN
1319         !
1320         ! Open namelist file
1321         !
1322         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1323         !
1324         ! First pass: count characters excluding comments and trimable white space
1325         !
1326         itot=0
1327     10  READ(iun,'(A256)',END=20,ERR=20) chline
1328         iltc = LEN_TRIM(chline)
1329         IF ( iltc.GT.0 ) THEN
1330          inl = INDEX(chline, '!') 
1331          IF( inl.eq.0 ) THEN
1332           itot = itot + iltc + 1                                ! +1 for the newline character
1333          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1334           itot = itot + inl                                  !  includes +1 for the newline character
1335          ENDIF
1336         ENDIF
1337         GOTO 10
1338     20  CONTINUE
1339         !
1340         ! Allocate text cdnambuff for condensed namelist
1341         !
1342!$AGRIF_DO_NOT_TREAT
1343         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1344!$AGRIF_END_DO_NOT_TREAT
1345         itotsav = itot
1346         !
1347         ! Second pass: read and transfer pruned characters into cdnambuff
1348         !
1349         REWIND(iun)
1350         itot=1
1351     30  READ(iun,'(A256)',END=40,ERR=40) chline
1352         iltc = LEN_TRIM(chline)
1353         IF ( iltc.GT.0 ) THEN
1354          inl = INDEX(chline, '!')
1355          IF( inl.eq.0 ) THEN
1356           inl = iltc
1357          ELSE
1358           inl = inl - 1
1359          ENDIF
1360          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1361             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1362             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) NEW_LINE('A')
1363             itot = itot + inl + 1
1364          ENDIF
1365         ENDIF
1366         GOTO 30
1367     40  CONTINUE
1368         itot = itot - 1
1369         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1370         !
1371         ! Close namelist file
1372         !
1373         CLOSE(iun)
1374         !write(*,'(32A)') cdnambuff
1375      ENDIF
1376#if defined key_mpp_mpi
1377      CALL mpp_bcast_nml( cdnambuff, itot )
1378#endif
1379  END SUBROUTINE load_nml
1380
1381
1382   !!----------------------------------------------------------------------
1383END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.