New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/dev_r12072_MERGE_OPTION2_2019/src/OCE/LBC – NEMO

source: NEMO/branches/2019/dev_r12072_MERGE_OPTION2_2019/src/OCE/LBC/lib_mpp.F90 @ 12202

Last change on this file since 12202 was 12202, checked in by cetlod, 4 years ago

dev_merge_option2 : merge in dev_r11613_ENHANCE-04_namelists_as_internalfiles

  • Property svn:keywords set to Id
File size: 60.4 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mpp_report
70   PUBLIC   mpp_bcast_nml
71   PUBLIC   tic_tac
72#if ! defined key_mpp_mpi
73   PUBLIC MPI_Wtime
74#endif
75   
76   !! * Interfaces
77   !! define generic interface for these routine as they are called sometimes
78   !! with scalar arguments instead of array arguments, which causes problems
79   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
80   INTERFACE mpp_min
81      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
82   END INTERFACE
83   INTERFACE mpp_max
84      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
85   END INTERFACE
86   INTERFACE mpp_sum
87      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
88         &             mppsum_realdd, mppsum_a_realdd
89   END INTERFACE
90   INTERFACE mpp_minloc
91      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
92   END INTERFACE
93   INTERFACE mpp_maxloc
94      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
95   END INTERFACE
96
97   !! ========================= !!
98   !!  MPI  variable definition !!
99   !! ========================= !!
100#if   defined key_mpp_mpi
101!$AGRIF_DO_NOT_TREAT
102   INCLUDE 'mpif.h'
103!$AGRIF_END_DO_NOT_TREAT
104   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
105#else   
106   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
107   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
109#endif
110
111   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
112
113   INTEGER, PUBLIC ::   mppsize        ! number of process
114   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
115!$AGRIF_DO_NOT_TREAT
116   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
117!$AGRIF_END_DO_NOT_TREAT
118
119   INTEGER :: MPI_SUMDD
120
121   ! variables used for zonal integration
122   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
123   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
124   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
125   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
127
128   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
129   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
130   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
131   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
132   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
133   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
134   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
135   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
136   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
137
138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
145   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
146   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
147   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
148   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
149   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
150   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
151   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
152   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
153   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
154   !: name (used as id) of allreduce-delayed operations
155   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
156   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
157   !: component name where the allreduce-delayed operation is performed
158   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
159   TYPE, PUBLIC ::   DELAYARR
160      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
161      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
162   END TYPE DELAYARR
163   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
164   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
165
166   ! timing summary report
167   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
168   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
169   
170   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
171
172   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
173   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
174   
175   !!----------------------------------------------------------------------
176   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
177   !! $Id$
178   !! Software governed by the CeCILL license (see ./LICENSE)
179   !!----------------------------------------------------------------------
180CONTAINS
181
182   SUBROUTINE mpp_start( localComm )
183      !!----------------------------------------------------------------------
184      !!                  ***  routine mpp_start  ***
185      !!
186      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
187      !!----------------------------------------------------------------------
188      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
189      !
190      INTEGER ::   ierr
191      LOGICAL ::   llmpi_init
192      !!----------------------------------------------------------------------
193#if defined key_mpp_mpi
194      !
195      CALL mpi_initialized ( llmpi_init, ierr )
196      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
197
198      IF( .NOT. llmpi_init ) THEN
199         IF( PRESENT(localComm) ) THEN
200            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
201            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
202            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
203         ENDIF
204         CALL mpi_init( ierr )
205         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
206      ENDIF
207       
208      IF( PRESENT(localComm) ) THEN
209         IF( Agrif_Root() ) THEN
210            mpi_comm_oce = localComm
211         ENDIF
212      ELSE
213         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
214         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
215      ENDIF
216
217# if defined key_agrif
218      IF( Agrif_Root() ) THEN
219         CALL Agrif_MPI_Init(mpi_comm_oce)
220      ELSE
221         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
222      ENDIF
223# endif
224
225      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
226      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
227      !
228      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
229      !
230#else
231      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
232      mppsize = 1
233      mpprank = 0
234#endif
235   END SUBROUTINE mpp_start
236
237
238   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
239      !!----------------------------------------------------------------------
240      !!                  ***  routine mppsend  ***
241      !!
242      !! ** Purpose :   Send messag passing array
243      !!
244      !!----------------------------------------------------------------------
245      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
246      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
247      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
248      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
249      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
250      !!
251      INTEGER ::   iflag
252      !!----------------------------------------------------------------------
253      !
254#if defined key_mpp_mpi
255      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
256#endif
257      !
258   END SUBROUTINE mppsend
259
260
261   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
262      !!----------------------------------------------------------------------
263      !!                  ***  routine mpprecv  ***
264      !!
265      !! ** Purpose :   Receive messag passing array
266      !!
267      !!----------------------------------------------------------------------
268      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
269      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
270      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
271      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
272      !!
273      INTEGER :: istatus(mpi_status_size)
274      INTEGER :: iflag
275      INTEGER :: use_source
276      !!----------------------------------------------------------------------
277      !
278#if defined key_mpp_mpi
279      ! If a specific process number has been passed to the receive call,
280      ! use that one. Default is to use mpi_any_source
281      use_source = mpi_any_source
282      IF( PRESENT(ksource) )   use_source = ksource
283      !
284      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
285#endif
286      !
287   END SUBROUTINE mpprecv
288
289
290   SUBROUTINE mppgather( ptab, kp, pio )
291      !!----------------------------------------------------------------------
292      !!                   ***  routine mppgather  ***
293      !!
294      !! ** Purpose :   Transfert between a local subdomain array and a work
295      !!     array which is distributed following the vertical level.
296      !!
297      !!----------------------------------------------------------------------
298      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
299      INTEGER                           , INTENT(in   ) ::   kp     ! record length
300      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
301      !!
302      INTEGER :: itaille, ierror   ! temporary integer
303      !!---------------------------------------------------------------------
304      !
305      itaille = jpi * jpj
306#if defined key_mpp_mpi
307      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
308         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
309#else
310      pio(:,:,1) = ptab(:,:)
311#endif
312      !
313   END SUBROUTINE mppgather
314
315
316   SUBROUTINE mppscatter( pio, kp, ptab )
317      !!----------------------------------------------------------------------
318      !!                  ***  routine mppscatter  ***
319      !!
320      !! ** Purpose :   Transfert between awork array which is distributed
321      !!      following the vertical level and the local subdomain array.
322      !!
323      !!----------------------------------------------------------------------
324      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
325      INTEGER                             ::   kp     ! Tag (not used with MPI
326      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
327      !!
328      INTEGER :: itaille, ierror   ! temporary integer
329      !!---------------------------------------------------------------------
330      !
331      itaille = jpi * jpj
332      !
333#if defined key_mpp_mpi
334      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
335         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
336#else
337      ptab(:,:) = pio(:,:,1)
338#endif
339      !
340   END SUBROUTINE mppscatter
341
342   
343   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
344     !!----------------------------------------------------------------------
345      !!                   ***  routine mpp_delay_sum  ***
346      !!
347      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
348      !!
349      !!----------------------------------------------------------------------
350      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
351      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
352      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
353      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
354      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
355      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
356      !!
357      INTEGER ::   ji, isz
358      INTEGER ::   idvar
359      INTEGER ::   ierr, ilocalcomm
360      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
361      !!----------------------------------------------------------------------
362#if defined key_mpp_mpi
363      ilocalcomm = mpi_comm_oce
364      IF( PRESENT(kcom) )   ilocalcomm = kcom
365
366      isz = SIZE(y_in)
367     
368      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
369
370      idvar = -1
371      DO ji = 1, nbdelay
372         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
373      END DO
374      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
375
376      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
377         !                                       --------------------------
378         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
379            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
380            DEALLOCATE(todelay(idvar)%z1d)
381            ndelayid(idvar) = -1                                      ! do as if we had no restart
382         ELSE
383            ALLOCATE(todelay(idvar)%y1d(isz))
384            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
385         END IF
386      ENDIF
387     
388      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
389         !                                       --------------------------
390         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
391         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
392         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
393      ENDIF
394
395      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
396
397      ! send back pout from todelay(idvar)%z1d defined at previous call
398      pout(:) = todelay(idvar)%z1d(:)
399
400      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
401# if defined key_mpi2
402      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
403      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
404      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
405# else
406      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
407# endif
408#else
409      pout(:) = REAL(y_in(:), wp)
410#endif
411
412   END SUBROUTINE mpp_delay_sum
413
414   
415   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
416      !!----------------------------------------------------------------------
417      !!                   ***  routine mpp_delay_max  ***
418      !!
419      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
420      !!
421      !!----------------------------------------------------------------------
422      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
423      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
424      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
425      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
426      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
427      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
428      !!
429      INTEGER ::   ji, isz
430      INTEGER ::   idvar
431      INTEGER ::   ierr, ilocalcomm
432      !!----------------------------------------------------------------------
433#if defined key_mpp_mpi
434      ilocalcomm = mpi_comm_oce
435      IF( PRESENT(kcom) )   ilocalcomm = kcom
436
437      isz = SIZE(p_in)
438
439      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
440
441      idvar = -1
442      DO ji = 1, nbdelay
443         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
444      END DO
445      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
446
447      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
448         !                                       --------------------------
449         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
450            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
451            DEALLOCATE(todelay(idvar)%z1d)
452            ndelayid(idvar) = -1                                      ! do as if we had no restart
453         END IF
454      ENDIF
455
456      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
457         !                                       --------------------------
458         ALLOCATE(todelay(idvar)%z1d(isz))
459         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
460      ENDIF
461
462      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
463
464      ! send back pout from todelay(idvar)%z1d defined at previous call
465      pout(:) = todelay(idvar)%z1d(:)
466
467      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
468# if defined key_mpi2
469      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
470      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
471      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
472# else
473      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
474# endif
475#else
476      pout(:) = p_in(:)
477#endif
478
479   END SUBROUTINE mpp_delay_max
480
481   
482   SUBROUTINE mpp_delay_rcv( kid )
483      !!----------------------------------------------------------------------
484      !!                   ***  routine mpp_delay_rcv  ***
485      !!
486      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
487      !!
488      !!----------------------------------------------------------------------
489      INTEGER,INTENT(in   )      ::  kid 
490      INTEGER ::   ierr
491      !!----------------------------------------------------------------------
492#if defined key_mpp_mpi
493      IF( ndelayid(kid) /= -2 ) THEN 
494#if ! defined key_mpi2
495         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
496         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
497         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
498#endif
499         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
500         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
501      ENDIF
502#endif
503   END SUBROUTINE mpp_delay_rcv
504
505   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
506      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
507      INTEGER                          , INTENT(INOUT) :: kleng
508      !!----------------------------------------------------------------------
509      !!                  ***  routine mpp_bcast_nml  ***
510      !!
511      !! ** Purpose :   broadcast namelist character buffer
512      !!
513      !!----------------------------------------------------------------------
514      !!
515      INTEGER ::   iflag
516      !!----------------------------------------------------------------------
517      !
518#if defined key_mpp_mpi
519      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
520      call MPI_BARRIER(mpi_comm_oce, iflag)
521!$AGRIF_DO_NOT_TREAT
522      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
523!$AGRIF_END_DO_NOT_TREAT
524      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
525      call MPI_BARRIER(mpi_comm_oce, iflag)
526#endif
527      !
528   END SUBROUTINE mpp_bcast_nml
529
530   
531   !!----------------------------------------------------------------------
532   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
533   !!   
534   !!----------------------------------------------------------------------
535   !!
536#  define OPERATION_MAX
537#  define INTEGER_TYPE
538#  define DIM_0d
539#     define ROUTINE_ALLREDUCE           mppmax_int
540#     include "mpp_allreduce_generic.h90"
541#     undef ROUTINE_ALLREDUCE
542#  undef DIM_0d
543#  define DIM_1d
544#     define ROUTINE_ALLREDUCE           mppmax_a_int
545#     include "mpp_allreduce_generic.h90"
546#     undef ROUTINE_ALLREDUCE
547#  undef DIM_1d
548#  undef INTEGER_TYPE
549!
550#  define REAL_TYPE
551#  define DIM_0d
552#     define ROUTINE_ALLREDUCE           mppmax_real
553#     include "mpp_allreduce_generic.h90"
554#     undef ROUTINE_ALLREDUCE
555#  undef DIM_0d
556#  define DIM_1d
557#     define ROUTINE_ALLREDUCE           mppmax_a_real
558#     include "mpp_allreduce_generic.h90"
559#     undef ROUTINE_ALLREDUCE
560#  undef DIM_1d
561#  undef REAL_TYPE
562#  undef OPERATION_MAX
563   !!----------------------------------------------------------------------
564   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
565   !!   
566   !!----------------------------------------------------------------------
567   !!
568#  define OPERATION_MIN
569#  define INTEGER_TYPE
570#  define DIM_0d
571#     define ROUTINE_ALLREDUCE           mppmin_int
572#     include "mpp_allreduce_generic.h90"
573#     undef ROUTINE_ALLREDUCE
574#  undef DIM_0d
575#  define DIM_1d
576#     define ROUTINE_ALLREDUCE           mppmin_a_int
577#     include "mpp_allreduce_generic.h90"
578#     undef ROUTINE_ALLREDUCE
579#  undef DIM_1d
580#  undef INTEGER_TYPE
581!
582#  define REAL_TYPE
583#  define DIM_0d
584#     define ROUTINE_ALLREDUCE           mppmin_real
585#     include "mpp_allreduce_generic.h90"
586#     undef ROUTINE_ALLREDUCE
587#  undef DIM_0d
588#  define DIM_1d
589#     define ROUTINE_ALLREDUCE           mppmin_a_real
590#     include "mpp_allreduce_generic.h90"
591#     undef ROUTINE_ALLREDUCE
592#  undef DIM_1d
593#  undef REAL_TYPE
594#  undef OPERATION_MIN
595
596   !!----------------------------------------------------------------------
597   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
598   !!   
599   !!   Global sum of 1D array or a variable (integer, real or complex)
600   !!----------------------------------------------------------------------
601   !!
602#  define OPERATION_SUM
603#  define INTEGER_TYPE
604#  define DIM_0d
605#     define ROUTINE_ALLREDUCE           mppsum_int
606#     include "mpp_allreduce_generic.h90"
607#     undef ROUTINE_ALLREDUCE
608#  undef DIM_0d
609#  define DIM_1d
610#     define ROUTINE_ALLREDUCE           mppsum_a_int
611#     include "mpp_allreduce_generic.h90"
612#     undef ROUTINE_ALLREDUCE
613#  undef DIM_1d
614#  undef INTEGER_TYPE
615!
616#  define REAL_TYPE
617#  define DIM_0d
618#     define ROUTINE_ALLREDUCE           mppsum_real
619#     include "mpp_allreduce_generic.h90"
620#     undef ROUTINE_ALLREDUCE
621#  undef DIM_0d
622#  define DIM_1d
623#     define ROUTINE_ALLREDUCE           mppsum_a_real
624#     include "mpp_allreduce_generic.h90"
625#     undef ROUTINE_ALLREDUCE
626#  undef DIM_1d
627#  undef REAL_TYPE
628#  undef OPERATION_SUM
629
630#  define OPERATION_SUM_DD
631#  define COMPLEX_TYPE
632#  define DIM_0d
633#     define ROUTINE_ALLREDUCE           mppsum_realdd
634#     include "mpp_allreduce_generic.h90"
635#     undef ROUTINE_ALLREDUCE
636#  undef DIM_0d
637#  define DIM_1d
638#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
639#     include "mpp_allreduce_generic.h90"
640#     undef ROUTINE_ALLREDUCE
641#  undef DIM_1d
642#  undef COMPLEX_TYPE
643#  undef OPERATION_SUM_DD
644
645   !!----------------------------------------------------------------------
646   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
647   !!   
648   !!----------------------------------------------------------------------
649   !!
650#  define OPERATION_MINLOC
651#  define DIM_2d
652#     define ROUTINE_LOC           mpp_minloc2d
653#     include "mpp_loc_generic.h90"
654#     undef ROUTINE_LOC
655#  undef DIM_2d
656#  define DIM_3d
657#     define ROUTINE_LOC           mpp_minloc3d
658#     include "mpp_loc_generic.h90"
659#     undef ROUTINE_LOC
660#  undef DIM_3d
661#  undef OPERATION_MINLOC
662
663#  define OPERATION_MAXLOC
664#  define DIM_2d
665#     define ROUTINE_LOC           mpp_maxloc2d
666#     include "mpp_loc_generic.h90"
667#     undef ROUTINE_LOC
668#  undef DIM_2d
669#  define DIM_3d
670#     define ROUTINE_LOC           mpp_maxloc3d
671#     include "mpp_loc_generic.h90"
672#     undef ROUTINE_LOC
673#  undef DIM_3d
674#  undef OPERATION_MAXLOC
675
676   SUBROUTINE mppsync()
677      !!----------------------------------------------------------------------
678      !!                  ***  routine mppsync  ***
679      !!
680      !! ** Purpose :   Massively parallel processors, synchroneous
681      !!
682      !!-----------------------------------------------------------------------
683      INTEGER :: ierror
684      !!-----------------------------------------------------------------------
685      !
686#if defined key_mpp_mpi
687      CALL mpi_barrier( mpi_comm_oce, ierror )
688#endif
689      !
690   END SUBROUTINE mppsync
691
692
693   SUBROUTINE mppstop( ld_abort ) 
694      !!----------------------------------------------------------------------
695      !!                  ***  routine mppstop  ***
696      !!
697      !! ** purpose :   Stop massively parallel processors method
698      !!
699      !!----------------------------------------------------------------------
700      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
701      LOGICAL ::   ll_abort
702      INTEGER ::   info
703      !!----------------------------------------------------------------------
704      ll_abort = .FALSE.
705      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
706      !
707#if defined key_mpp_mpi
708      IF(ll_abort) THEN
709         CALL mpi_abort( MPI_COMM_WORLD )
710      ELSE
711         CALL mppsync
712         CALL mpi_finalize( info )
713      ENDIF
714#endif
715      IF( ll_abort ) STOP 123
716      !
717   END SUBROUTINE mppstop
718
719
720   SUBROUTINE mpp_comm_free( kcom )
721      !!----------------------------------------------------------------------
722      INTEGER, INTENT(in) ::   kcom
723      !!
724      INTEGER :: ierr
725      !!----------------------------------------------------------------------
726      !
727#if defined key_mpp_mpi
728      CALL MPI_COMM_FREE(kcom, ierr)
729#endif
730      !
731   END SUBROUTINE mpp_comm_free
732
733
734   SUBROUTINE mpp_ini_znl( kumout )
735      !!----------------------------------------------------------------------
736      !!               ***  routine mpp_ini_znl  ***
737      !!
738      !! ** Purpose :   Initialize special communicator for computing zonal sum
739      !!
740      !! ** Method  : - Look for processors in the same row
741      !!              - Put their number in nrank_znl
742      !!              - Create group for the znl processors
743      !!              - Create a communicator for znl processors
744      !!              - Determine if processor should write znl files
745      !!
746      !! ** output
747      !!      ndim_rank_znl = number of processors on the same row
748      !!      ngrp_znl = group ID for the znl processors
749      !!      ncomm_znl = communicator for the ice procs.
750      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
751      !!
752      !!----------------------------------------------------------------------
753      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
754      !
755      INTEGER :: jproc      ! dummy loop integer
756      INTEGER :: ierr, ii   ! local integer
757      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
758      !!----------------------------------------------------------------------
759#if defined key_mpp_mpi
760      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
761      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
762      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
763      !
764      ALLOCATE( kwork(jpnij), STAT=ierr )
765      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
766
767      IF( jpnj == 1 ) THEN
768         ngrp_znl  = ngrp_world
769         ncomm_znl = mpi_comm_oce
770      ELSE
771         !
772         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
773         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
774         !-$$        CALL flush(numout)
775         !
776         ! Count number of processors on the same row
777         ndim_rank_znl = 0
778         DO jproc=1,jpnij
779            IF ( kwork(jproc) == njmpp ) THEN
780               ndim_rank_znl = ndim_rank_znl + 1
781            ENDIF
782         END DO
783         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
784         !-$$        CALL flush(numout)
785         ! Allocate the right size to nrank_znl
786         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
787         ALLOCATE(nrank_znl(ndim_rank_znl))
788         ii = 0
789         nrank_znl (:) = 0
790         DO jproc=1,jpnij
791            IF ( kwork(jproc) == njmpp) THEN
792               ii = ii + 1
793               nrank_znl(ii) = jproc -1
794            ENDIF
795         END DO
796         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
797         !-$$        CALL flush(numout)
798
799         ! Create the opa group
800         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
801         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
802         !-$$        CALL flush(numout)
803
804         ! Create the znl group from the opa group
805         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
806         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
807         !-$$        CALL flush(numout)
808
809         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
810         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
811         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
812         !-$$        CALL flush(numout)
813         !
814      END IF
815
816      ! Determines if processor if the first (starting from i=1) on the row
817      IF ( jpni == 1 ) THEN
818         l_znl_root = .TRUE.
819      ELSE
820         l_znl_root = .FALSE.
821         kwork (1) = nimpp
822         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
823         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
824      END IF
825
826      DEALLOCATE(kwork)
827#endif
828
829   END SUBROUTINE mpp_ini_znl
830
831
832   SUBROUTINE mpp_ini_north
833      !!----------------------------------------------------------------------
834      !!               ***  routine mpp_ini_north  ***
835      !!
836      !! ** Purpose :   Initialize special communicator for north folding
837      !!      condition together with global variables needed in the mpp folding
838      !!
839      !! ** Method  : - Look for northern processors
840      !!              - Put their number in nrank_north
841      !!              - Create groups for the world processors and the north processors
842      !!              - Create a communicator for northern processors
843      !!
844      !! ** output
845      !!      njmppmax = njmpp for northern procs
846      !!      ndim_rank_north = number of processors in the northern line
847      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
848      !!      ngrp_world = group ID for the world processors
849      !!      ngrp_north = group ID for the northern processors
850      !!      ncomm_north = communicator for the northern procs.
851      !!      north_root = number (in the world) of proc 0 in the northern comm.
852      !!
853      !!----------------------------------------------------------------------
854      INTEGER ::   ierr
855      INTEGER ::   jjproc
856      INTEGER ::   ii, ji
857      !!----------------------------------------------------------------------
858      !
859#if defined key_mpp_mpi
860      njmppmax = MAXVAL( njmppt )
861      !
862      ! Look for how many procs on the northern boundary
863      ndim_rank_north = 0
864      DO jjproc = 1, jpnij
865         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
866      END DO
867      !
868      ! Allocate the right size to nrank_north
869      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
870      ALLOCATE( nrank_north(ndim_rank_north) )
871
872      ! Fill the nrank_north array with proc. number of northern procs.
873      ! Note : the rank start at 0 in MPI
874      ii = 0
875      DO ji = 1, jpnij
876         IF ( njmppt(ji) == njmppmax   ) THEN
877            ii=ii+1
878            nrank_north(ii)=ji-1
879         END IF
880      END DO
881      !
882      ! create the world group
883      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
884      !
885      ! Create the North group from the world group
886      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
887      !
888      ! Create the North communicator , ie the pool of procs in the north group
889      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
890      !
891#endif
892   END SUBROUTINE mpp_ini_north
893
894
895   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
896      !!---------------------------------------------------------------------
897      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
898      !!
899      !!   Modification of original codes written by David H. Bailey
900      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
901      !!---------------------------------------------------------------------
902      INTEGER                     , INTENT(in)    ::   ilen, itype
903      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
904      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
905      !
906      REAL(wp) :: zerr, zt1, zt2    ! local work variables
907      INTEGER  :: ji, ztmp           ! local scalar
908      !!---------------------------------------------------------------------
909      !
910      ztmp = itype   ! avoid compilation warning
911      !
912      DO ji=1,ilen
913      ! Compute ydda + yddb using Knuth's trick.
914         zt1  = real(ydda(ji)) + real(yddb(ji))
915         zerr = zt1 - real(ydda(ji))
916         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
917                + aimag(ydda(ji)) + aimag(yddb(ji))
918
919         ! The result is zt1 + zt2, after normalization.
920         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
921      END DO
922      !
923   END SUBROUTINE DDPDD_MPI
924
925
926   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
927      !!----------------------------------------------------------------------
928      !!                  ***  routine mpp_report  ***
929      !!
930      !! ** Purpose :   report use of mpp routines per time-setp
931      !!
932      !!----------------------------------------------------------------------
933      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
934      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
935      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
936      !!
937      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
938      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
939      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
940      !!----------------------------------------------------------------------
941#if defined key_mpp_mpi
942      !
943      ll_lbc = .FALSE.
944      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
945      ll_glb = .FALSE.
946      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
947      ll_dlg = .FALSE.
948      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
949      !
950      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
951      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
952      ncom_freq = ncom_fsbc
953      !
954      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
955         IF( ll_lbc ) THEN
956            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
957            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
958            n_sequence_lbc = n_sequence_lbc + 1
959            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
960            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
961            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
962            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
963         ENDIF
964         IF( ll_glb ) THEN
965            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
966            n_sequence_glb = n_sequence_glb + 1
967            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
968            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
969         ENDIF
970         IF( ll_dlg ) THEN
971            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
972            n_sequence_dlg = n_sequence_dlg + 1
973            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
974            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
975         ENDIF
976      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
977         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
978         WRITE(numcom,*) ' '
979         WRITE(numcom,*) ' ------------------------------------------------------------'
980         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
981         WRITE(numcom,*) ' ------------------------------------------------------------'
982         WRITE(numcom,*) ' '
983         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
984         jj = 0; jk = 0; jf = 0; jh = 0
985         DO ji = 1, n_sequence_lbc
986            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
987            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
988            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
989            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
990         END DO
991         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
992         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
993         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
994         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
995         WRITE(numcom,*) ' '
996         WRITE(numcom,*) ' lbc_lnk called'
997         DO ji = 1, n_sequence_lbc - 1
998            IF ( crname_lbc(ji) /= 'already counted' ) THEN
999               ccountname = crname_lbc(ji)
1000               crname_lbc(ji) = 'already counted'
1001               jcount = 1
1002               DO jj = ji + 1, n_sequence_lbc
1003                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1004                     jcount = jcount + 1
1005                     crname_lbc(jj) = 'already counted'
1006                  END IF
1007               END DO
1008               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1009            END IF
1010         END DO
1011         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1012            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1013         END IF
1014         WRITE(numcom,*) ' '
1015         IF ( n_sequence_glb > 0 ) THEN
1016            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1017            jj = 1
1018            DO ji = 2, n_sequence_glb
1019               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1020                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1021                  jj = 0
1022               END IF
1023               jj = jj + 1 
1024            END DO
1025            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1026            DEALLOCATE(crname_glb)
1027         ELSE
1028            WRITE(numcom,*) ' No MPI global communication '
1029         ENDIF
1030         WRITE(numcom,*) ' '
1031         IF ( n_sequence_dlg > 0 ) THEN
1032            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1033            jj = 1
1034            DO ji = 2, n_sequence_dlg
1035               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1036                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1037                  jj = 0
1038               END IF
1039               jj = jj + 1 
1040            END DO
1041            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1042            DEALLOCATE(crname_dlg)
1043         ELSE
1044            WRITE(numcom,*) ' No MPI delayed global communication '
1045         ENDIF
1046         WRITE(numcom,*) ' '
1047         WRITE(numcom,*) ' -----------------------------------------------'
1048         WRITE(numcom,*) ' '
1049         DEALLOCATE(ncomm_sequence)
1050         DEALLOCATE(crname_lbc)
1051      ENDIF
1052#endif
1053   END SUBROUTINE mpp_report
1054
1055   
1056   SUBROUTINE tic_tac (ld_tic, ld_global)
1057
1058    LOGICAL,           INTENT(IN) :: ld_tic
1059    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1060    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1061    REAL(wp),               SAVE :: tic_ct = 0._wp
1062    INTEGER :: ii
1063#if defined key_mpp_mpi
1064
1065    IF( ncom_stp <= nit000 ) RETURN
1066    IF( ncom_stp == nitend ) RETURN
1067    ii = 1
1068    IF( PRESENT( ld_global ) ) THEN
1069       IF( ld_global ) ii = 2
1070    END IF
1071   
1072    IF ( ld_tic ) THEN
1073       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1074       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1075    ELSE
1076       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1077       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1078    ENDIF
1079#endif
1080   
1081   END SUBROUTINE tic_tac
1082
1083#if ! defined key_mpp_mpi
1084   SUBROUTINE mpi_wait(request, status, ierror)
1085      INTEGER                            , INTENT(in   ) ::   request
1086      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1087      INTEGER                            , INTENT(  out) ::   ierror
1088   END SUBROUTINE mpi_wait
1089
1090   
1091   FUNCTION MPI_Wtime()
1092      REAL(wp) ::  MPI_Wtime
1093      MPI_Wtime = -1.
1094   END FUNCTION MPI_Wtime
1095#endif
1096
1097   !!----------------------------------------------------------------------
1098   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1099   !!----------------------------------------------------------------------
1100
1101   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1102      &                 cd6, cd7, cd8, cd9, cd10 )
1103      !!----------------------------------------------------------------------
1104      !!                  ***  ROUTINE  stop_opa  ***
1105      !!
1106      !! ** Purpose :   print in ocean.outpput file a error message and
1107      !!                increment the error number (nstop) by one.
1108      !!----------------------------------------------------------------------
1109      CHARACTER(len=*), INTENT(in   )           ::   cd1
1110      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1111      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1112      !!----------------------------------------------------------------------
1113      !
1114      nstop = nstop + 1
1115      !
1116      ! force to open ocean.output file if not already opened
1117      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1118      !
1119                            WRITE(numout,*)
1120                            WRITE(numout,*) ' ===>>> : E R R O R'
1121                            WRITE(numout,*)
1122                            WRITE(numout,*) '         ==========='
1123                            WRITE(numout,*)
1124                            WRITE(numout,*) TRIM(cd1)
1125      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1126      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1127      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1128      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1129      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1130      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1131      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1132      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1133      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1134                            WRITE(numout,*)
1135      !
1136                               CALL FLUSH(numout    )
1137      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1138      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1139      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1140      !
1141      IF( cd1 == 'STOP' ) THEN
1142         WRITE(numout,*) 
1143         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1144         WRITE(numout,*) 
1145         CALL mppstop( ld_abort = .true. )
1146      ENDIF
1147      !
1148   END SUBROUTINE ctl_stop
1149
1150
1151   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1152      &                 cd6, cd7, cd8, cd9, cd10 )
1153      !!----------------------------------------------------------------------
1154      !!                  ***  ROUTINE  stop_warn  ***
1155      !!
1156      !! ** Purpose :   print in ocean.outpput file a error message and
1157      !!                increment the warning number (nwarn) by one.
1158      !!----------------------------------------------------------------------
1159      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1160      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1161      !!----------------------------------------------------------------------
1162      !
1163      nwarn = nwarn + 1
1164      !
1165      IF(lwp) THEN
1166                               WRITE(numout,*)
1167                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1168                               WRITE(numout,*)
1169                               WRITE(numout,*) '         ==============='
1170                               WRITE(numout,*)
1171         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1172         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1173         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1174         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1175         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1176         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1177         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1178         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1179         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1180         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1181                               WRITE(numout,*)
1182      ENDIF
1183      CALL FLUSH(numout)
1184      !
1185   END SUBROUTINE ctl_warn
1186
1187
1188   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1189      !!----------------------------------------------------------------------
1190      !!                  ***  ROUTINE ctl_opn  ***
1191      !!
1192      !! ** Purpose :   Open file and check if required file is available.
1193      !!
1194      !! ** Method  :   Fortan open
1195      !!----------------------------------------------------------------------
1196      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1197      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1198      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1199      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1200      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1201      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1202      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1203      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1204      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1205      !
1206      CHARACTER(len=80) ::   clfile
1207      INTEGER           ::   iost
1208      !!----------------------------------------------------------------------
1209      !
1210      ! adapt filename
1211      ! ----------------
1212      clfile = TRIM(cdfile)
1213      IF( PRESENT( karea ) ) THEN
1214         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1215      ENDIF
1216#if defined key_agrif
1217      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1218      knum=Agrif_Get_Unit()
1219#else
1220      knum=get_unit()
1221#endif
1222      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1223      !
1224      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1225         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1226      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1227         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1228      ELSE
1229         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1230      ENDIF
1231      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1232         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1233      IF( iost == 0 ) THEN
1234         IF(ldwp .AND. kout > 0) THEN
1235            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1236            WRITE(kout,*) '     unit   = ', knum
1237            WRITE(kout,*) '     status = ', cdstat
1238            WRITE(kout,*) '     form   = ', cdform
1239            WRITE(kout,*) '     access = ', cdacce
1240            WRITE(kout,*)
1241         ENDIF
1242      ENDIF
1243100   CONTINUE
1244      IF( iost /= 0 ) THEN
1245         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1246         WRITE(ctmp2,*) ' =======   ===  '
1247         WRITE(ctmp3,*) '           unit   = ', knum
1248         WRITE(ctmp4,*) '           status = ', cdstat
1249         WRITE(ctmp5,*) '           form   = ', cdform
1250         WRITE(ctmp6,*) '           access = ', cdacce
1251         WRITE(ctmp7,*) '           iostat = ', iost
1252         WRITE(ctmp8,*) '           we stop. verify the file '
1253         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1254      ENDIF
1255      !
1256   END SUBROUTINE ctl_opn
1257
1258
1259   SUBROUTINE ctl_nam ( kios, cdnam )
1260      !!----------------------------------------------------------------------
1261      !!                  ***  ROUTINE ctl_nam  ***
1262      !!
1263      !! ** Purpose :   Informations when error while reading a namelist
1264      !!
1265      !! ** Method  :   Fortan open
1266      !!----------------------------------------------------------------------
1267      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1268      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1269      !
1270      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1271      !!----------------------------------------------------------------------
1272      !
1273      WRITE (clios, '(I5.0)')   kios
1274      IF( kios < 0 ) THEN         
1275         CALL ctl_warn( 'end of record or file while reading namelist '   &
1276            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1277      ENDIF
1278      !
1279      IF( kios > 0 ) THEN
1280         CALL ctl_stop( 'misspelled variable in namelist '   &
1281            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1282      ENDIF
1283      kios = 0
1284      !
1285   END SUBROUTINE ctl_nam
1286
1287
1288   INTEGER FUNCTION get_unit()
1289      !!----------------------------------------------------------------------
1290      !!                  ***  FUNCTION  get_unit  ***
1291      !!
1292      !! ** Purpose :   return the index of an unused logical unit
1293      !!----------------------------------------------------------------------
1294      LOGICAL :: llopn
1295      !!----------------------------------------------------------------------
1296      !
1297      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1298      llopn = .TRUE.
1299      DO WHILE( (get_unit < 998) .AND. llopn )
1300         get_unit = get_unit + 1
1301         INQUIRE( unit = get_unit, opened = llopn )
1302      END DO
1303      IF( (get_unit == 999) .AND. llopn ) THEN
1304         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1305      ENDIF
1306      !
1307   END FUNCTION get_unit
1308
1309   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1310      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1311      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1312      CHARACTER(LEN=256)                           :: chline
1313      CHARACTER(LEN=1)                             :: csp
1314      INTEGER, INTENT(IN)                          :: kout
1315      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1316      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1317      !
1318      !csp = NEW_LINE('A')
1319      ! a new line character is the best seperator but some systems (e.g.Cray)
1320      ! seem to terminate namelist reads from internal files early if they
1321      ! encounter new-lines. Use a single space for safety.
1322      csp = ' '
1323      !
1324      ! Check if the namelist buffer has already been allocated. Return if it has.
1325      !
1326      IF ( ALLOCATED( cdnambuff ) ) RETURN
1327      IF( ldwp ) THEN
1328         !
1329         ! Open namelist file
1330         !
1331         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1332         !
1333         ! First pass: count characters excluding comments and trimable white space
1334         !
1335         itot=0
1336     10  READ(iun,'(A256)',END=20,ERR=20) chline
1337         iltc = LEN_TRIM(chline)
1338         IF ( iltc.GT.0 ) THEN
1339          inl = INDEX(chline, '!') 
1340          IF( inl.eq.0 ) THEN
1341           itot = itot + iltc + 1                                ! +1 for the newline character
1342          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1343           itot = itot + inl                                  !  includes +1 for the newline character
1344          ENDIF
1345         ENDIF
1346         GOTO 10
1347     20  CONTINUE
1348         !
1349         ! Allocate text cdnambuff for condensed namelist
1350         !
1351!$AGRIF_DO_NOT_TREAT
1352         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1353!$AGRIF_END_DO_NOT_TREAT
1354         itotsav = itot
1355         !
1356         ! Second pass: read and transfer pruned characters into cdnambuff
1357         !
1358         REWIND(iun)
1359         itot=1
1360     30  READ(iun,'(A256)',END=40,ERR=40) chline
1361         iltc = LEN_TRIM(chline)
1362         IF ( iltc.GT.0 ) THEN
1363          inl = INDEX(chline, '!')
1364          IF( inl.eq.0 ) THEN
1365           inl = iltc
1366          ELSE
1367           inl = inl - 1
1368          ENDIF
1369          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1370             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1371             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1372             itot = itot + inl + 1
1373          ENDIF
1374         ENDIF
1375         GOTO 30
1376     40  CONTINUE
1377         itot = itot - 1
1378         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1379         !
1380         ! Close namelist file
1381         !
1382         CLOSE(iun)
1383         !write(*,'(32A)') cdnambuff
1384      ENDIF
1385#if defined key_mpp_mpi
1386      CALL mpp_bcast_nml( cdnambuff, itot )
1387#endif
1388  END SUBROUTINE load_nml
1389
1390
1391   !!----------------------------------------------------------------------
1392END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.