source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 12377

Last change on this file since 12377 was 12377, checked in by acc, 10 months ago

The big one. Merging all 2019 developments from the option 1 branch back onto the trunk.

This changeset reproduces 2019/dev_r11943_MERGE_2019 on the trunk using a 2-URL merge
onto a working copy of the trunk. I.e.:

svn merge —ignore-ancestry \

svn+ssh://acc@forge.ipsl.jussieu.fr/ipsl/forge/projets/nemo/svn/NEMO/trunk \
svn+ssh://acc@forge.ipsl.jussieu.fr/ipsl/forge/projets/nemo/svn/NEMO/branches/2019/dev_r11943_MERGE_2019 ./

The —ignore-ancestry flag avoids problems that may otherwise arise from the fact that
the merge history been trunk and branch may have been applied in a different order but
care has been taken before this step to ensure that all applicable fixes and updates
are present in the merge branch.

The trunk state just before this step has been branched to releases/release-4.0-HEAD
and that branch has been immediately tagged as releases/release-4.0.2. Any fixes
or additions in response to tickets on 4.0, 4.0.1 or 4.0.2 should be done on
releases/release-4.0-HEAD. From now on future 'point' releases (e.g. 4.0.2) will
remain unchanged with periodic releases as needs demand. Note release-4.0-HEAD is a
transitional naming convention. Future full releases, say 4.2, will have a release-4.2
branch which fulfills this role and the first point release (e.g. 4.2.0) will be made
immediately following the release branch creation.

2020 developments can be started from any trunk revision later than this one.

  • Property svn:keywords set to Id
File size: 60.3 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mpp_report
70   PUBLIC   mpp_bcast_nml
71   PUBLIC   tic_tac
72#if ! defined key_mpp_mpi
73   PUBLIC MPI_Wtime
74#endif
75   
76   !! * Interfaces
77   !! define generic interface for these routine as they are called sometimes
78   !! with scalar arguments instead of array arguments, which causes problems
79   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
80   INTERFACE mpp_min
81      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
82   END INTERFACE
83   INTERFACE mpp_max
84      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
85   END INTERFACE
86   INTERFACE mpp_sum
87      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
88         &             mppsum_realdd, mppsum_a_realdd
89   END INTERFACE
90   INTERFACE mpp_minloc
91      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
92   END INTERFACE
93   INTERFACE mpp_maxloc
94      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
95   END INTERFACE
96
97   !! ========================= !!
98   !!  MPI  variable definition !!
99   !! ========================= !!
100#if   defined key_mpp_mpi
101!$AGRIF_DO_NOT_TREAT
102   INCLUDE 'mpif.h'
103!$AGRIF_END_DO_NOT_TREAT
104   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
105#else   
106   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
107   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
109#endif
110
111   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
112
113   INTEGER, PUBLIC ::   mppsize        ! number of process
114   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
115!$AGRIF_DO_NOT_TREAT
116   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
117!$AGRIF_END_DO_NOT_TREAT
118
119   INTEGER :: MPI_SUMDD
120
121   ! variables used for zonal integration
122   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
123   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
124   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
125   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
127
128   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
129   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
130   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
131   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
132   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
133   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
134   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
135   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
136   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
137
138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
145   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
146   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
147   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
148   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
149   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
150   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
151   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
152   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
153   !: name (used as id) of allreduce-delayed operations
154   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
155   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
156   !: component name where the allreduce-delayed operation is performed
157   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
158   TYPE, PUBLIC ::   DELAYARR
159      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
160      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
161   END TYPE DELAYARR
162   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
163   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
164
165   ! timing summary report
166   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
167   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
168   
169   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
170
171   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
172   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
173   
174   !! * Substitutions
175#  include "do_loop_substitute.h90"
176   !!----------------------------------------------------------------------
177   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
178   !! $Id$
179   !! Software governed by the CeCILL license (see ./LICENSE)
180   !!----------------------------------------------------------------------
181CONTAINS
182
183   SUBROUTINE mpp_start( localComm )
184      !!----------------------------------------------------------------------
185      !!                  ***  routine mpp_start  ***
186      !!
187      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
188      !!----------------------------------------------------------------------
189      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
190      !
191      INTEGER ::   ierr
192      LOGICAL ::   llmpi_init
193      !!----------------------------------------------------------------------
194#if defined key_mpp_mpi
195      !
196      CALL mpi_initialized ( llmpi_init, ierr )
197      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
198
199      IF( .NOT. llmpi_init ) THEN
200         IF( PRESENT(localComm) ) THEN
201            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
202            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
203            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
204         ENDIF
205         CALL mpi_init( ierr )
206         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
207      ENDIF
208       
209      IF( PRESENT(localComm) ) THEN
210         IF( Agrif_Root() ) THEN
211            mpi_comm_oce = localComm
212         ENDIF
213      ELSE
214         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
215         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
216      ENDIF
217
218# if defined key_agrif
219      IF( Agrif_Root() ) THEN
220         CALL Agrif_MPI_Init(mpi_comm_oce)
221      ELSE
222         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
223      ENDIF
224# endif
225
226      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
227      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
228      !
229      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
230      !
231#else
232      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
233      mppsize = 1
234      mpprank = 0
235#endif
236   END SUBROUTINE mpp_start
237
238
239   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
240      !!----------------------------------------------------------------------
241      !!                  ***  routine mppsend  ***
242      !!
243      !! ** Purpose :   Send messag passing array
244      !!
245      !!----------------------------------------------------------------------
246      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
247      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
248      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
249      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
250      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
251      !!
252      INTEGER ::   iflag
253      !!----------------------------------------------------------------------
254      !
255#if defined key_mpp_mpi
256      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
257#endif
258      !
259   END SUBROUTINE mppsend
260
261
262   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
263      !!----------------------------------------------------------------------
264      !!                  ***  routine mpprecv  ***
265      !!
266      !! ** Purpose :   Receive messag passing array
267      !!
268      !!----------------------------------------------------------------------
269      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
270      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
271      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
272      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
273      !!
274      INTEGER :: istatus(mpi_status_size)
275      INTEGER :: iflag
276      INTEGER :: use_source
277      !!----------------------------------------------------------------------
278      !
279#if defined key_mpp_mpi
280      ! If a specific process number has been passed to the receive call,
281      ! use that one. Default is to use mpi_any_source
282      use_source = mpi_any_source
283      IF( PRESENT(ksource) )   use_source = ksource
284      !
285      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
286#endif
287      !
288   END SUBROUTINE mpprecv
289
290
291   SUBROUTINE mppgather( ptab, kp, pio )
292      !!----------------------------------------------------------------------
293      !!                   ***  routine mppgather  ***
294      !!
295      !! ** Purpose :   Transfert between a local subdomain array and a work
296      !!     array which is distributed following the vertical level.
297      !!
298      !!----------------------------------------------------------------------
299      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
300      INTEGER                           , INTENT(in   ) ::   kp     ! record length
301      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
302      !!
303      INTEGER :: itaille, ierror   ! temporary integer
304      !!---------------------------------------------------------------------
305      !
306      itaille = jpi * jpj
307#if defined key_mpp_mpi
308      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
309         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
310#else
311      pio(:,:,1) = ptab(:,:)
312#endif
313      !
314   END SUBROUTINE mppgather
315
316
317   SUBROUTINE mppscatter( pio, kp, ptab )
318      !!----------------------------------------------------------------------
319      !!                  ***  routine mppscatter  ***
320      !!
321      !! ** Purpose :   Transfert between awork array which is distributed
322      !!      following the vertical level and the local subdomain array.
323      !!
324      !!----------------------------------------------------------------------
325      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
326      INTEGER                             ::   kp     ! Tag (not used with MPI
327      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
328      !!
329      INTEGER :: itaille, ierror   ! temporary integer
330      !!---------------------------------------------------------------------
331      !
332      itaille = jpi * jpj
333      !
334#if defined key_mpp_mpi
335      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
336         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
337#else
338      ptab(:,:) = pio(:,:,1)
339#endif
340      !
341   END SUBROUTINE mppscatter
342
343   
344   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
345     !!----------------------------------------------------------------------
346      !!                   ***  routine mpp_delay_sum  ***
347      !!
348      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
349      !!
350      !!----------------------------------------------------------------------
351      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
352      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
353      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
354      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
355      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
356      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
357      !!
358      INTEGER ::   ji, isz
359      INTEGER ::   idvar
360      INTEGER ::   ierr, ilocalcomm
361      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
362      !!----------------------------------------------------------------------
363#if defined key_mpp_mpi
364      ilocalcomm = mpi_comm_oce
365      IF( PRESENT(kcom) )   ilocalcomm = kcom
366
367      isz = SIZE(y_in)
368     
369      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
370
371      idvar = -1
372      DO ji = 1, nbdelay
373         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
374      END DO
375      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
376
377      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
378         !                                       --------------------------
379         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
380            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
381            DEALLOCATE(todelay(idvar)%z1d)
382            ndelayid(idvar) = -1                                      ! do as if we had no restart
383         ELSE
384            ALLOCATE(todelay(idvar)%y1d(isz))
385            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
386         END IF
387      ENDIF
388     
389      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
390         !                                       --------------------------
391         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
392         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
393         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
394      ENDIF
395
396      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
397
398      ! send back pout from todelay(idvar)%z1d defined at previous call
399      pout(:) = todelay(idvar)%z1d(:)
400
401      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
402# if defined key_mpi2
403      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
404      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
405      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
406# else
407      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
408# endif
409#else
410      pout(:) = REAL(y_in(:), wp)
411#endif
412
413   END SUBROUTINE mpp_delay_sum
414
415   
416   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
417      !!----------------------------------------------------------------------
418      !!                   ***  routine mpp_delay_max  ***
419      !!
420      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
421      !!
422      !!----------------------------------------------------------------------
423      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
424      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
425      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
426      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
427      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
428      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
429      !!
430      INTEGER ::   ji, isz
431      INTEGER ::   idvar
432      INTEGER ::   ierr, ilocalcomm
433      !!----------------------------------------------------------------------
434#if defined key_mpp_mpi
435      ilocalcomm = mpi_comm_oce
436      IF( PRESENT(kcom) )   ilocalcomm = kcom
437
438      isz = SIZE(p_in)
439
440      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
441
442      idvar = -1
443      DO ji = 1, nbdelay
444         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
445      END DO
446      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
447
448      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
449         !                                       --------------------------
450         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
451            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
452            DEALLOCATE(todelay(idvar)%z1d)
453            ndelayid(idvar) = -1                                      ! do as if we had no restart
454         END IF
455      ENDIF
456
457      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
458         !                                       --------------------------
459         ALLOCATE(todelay(idvar)%z1d(isz))
460         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
461      ENDIF
462
463      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
464
465      ! send back pout from todelay(idvar)%z1d defined at previous call
466      pout(:) = todelay(idvar)%z1d(:)
467
468      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
469# if defined key_mpi2
470      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
471      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
472      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
473# else
474      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
475# endif
476#else
477      pout(:) = p_in(:)
478#endif
479
480   END SUBROUTINE mpp_delay_max
481
482   
483   SUBROUTINE mpp_delay_rcv( kid )
484      !!----------------------------------------------------------------------
485      !!                   ***  routine mpp_delay_rcv  ***
486      !!
487      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
488      !!
489      !!----------------------------------------------------------------------
490      INTEGER,INTENT(in   )      ::  kid 
491      INTEGER ::   ierr
492      !!----------------------------------------------------------------------
493#if defined key_mpp_mpi
494      IF( ndelayid(kid) /= -2 ) THEN 
495#if ! defined key_mpi2
496         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
497         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
498         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
499#endif
500         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
501         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
502      ENDIF
503#endif
504   END SUBROUTINE mpp_delay_rcv
505
506   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
507      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
508      INTEGER                          , INTENT(INOUT) :: kleng
509      !!----------------------------------------------------------------------
510      !!                  ***  routine mpp_bcast_nml  ***
511      !!
512      !! ** Purpose :   broadcast namelist character buffer
513      !!
514      !!----------------------------------------------------------------------
515      !!
516      INTEGER ::   iflag
517      !!----------------------------------------------------------------------
518      !
519#if defined key_mpp_mpi
520      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
521      call MPI_BARRIER(mpi_comm_oce, iflag)
522!$AGRIF_DO_NOT_TREAT
523      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
524!$AGRIF_END_DO_NOT_TREAT
525      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
526      call MPI_BARRIER(mpi_comm_oce, iflag)
527#endif
528      !
529   END SUBROUTINE mpp_bcast_nml
530
531   
532   !!----------------------------------------------------------------------
533   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
534   !!   
535   !!----------------------------------------------------------------------
536   !!
537#  define OPERATION_MAX
538#  define INTEGER_TYPE
539#  define DIM_0d
540#     define ROUTINE_ALLREDUCE           mppmax_int
541#     include "mpp_allreduce_generic.h90"
542#     undef ROUTINE_ALLREDUCE
543#  undef DIM_0d
544#  define DIM_1d
545#     define ROUTINE_ALLREDUCE           mppmax_a_int
546#     include "mpp_allreduce_generic.h90"
547#     undef ROUTINE_ALLREDUCE
548#  undef DIM_1d
549#  undef INTEGER_TYPE
550!
551#  define REAL_TYPE
552#  define DIM_0d
553#     define ROUTINE_ALLREDUCE           mppmax_real
554#     include "mpp_allreduce_generic.h90"
555#     undef ROUTINE_ALLREDUCE
556#  undef DIM_0d
557#  define DIM_1d
558#     define ROUTINE_ALLREDUCE           mppmax_a_real
559#     include "mpp_allreduce_generic.h90"
560#     undef ROUTINE_ALLREDUCE
561#  undef DIM_1d
562#  undef REAL_TYPE
563#  undef OPERATION_MAX
564   !!----------------------------------------------------------------------
565   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
566   !!   
567   !!----------------------------------------------------------------------
568   !!
569#  define OPERATION_MIN
570#  define INTEGER_TYPE
571#  define DIM_0d
572#     define ROUTINE_ALLREDUCE           mppmin_int
573#     include "mpp_allreduce_generic.h90"
574#     undef ROUTINE_ALLREDUCE
575#  undef DIM_0d
576#  define DIM_1d
577#     define ROUTINE_ALLREDUCE           mppmin_a_int
578#     include "mpp_allreduce_generic.h90"
579#     undef ROUTINE_ALLREDUCE
580#  undef DIM_1d
581#  undef INTEGER_TYPE
582!
583#  define REAL_TYPE
584#  define DIM_0d
585#     define ROUTINE_ALLREDUCE           mppmin_real
586#     include "mpp_allreduce_generic.h90"
587#     undef ROUTINE_ALLREDUCE
588#  undef DIM_0d
589#  define DIM_1d
590#     define ROUTINE_ALLREDUCE           mppmin_a_real
591#     include "mpp_allreduce_generic.h90"
592#     undef ROUTINE_ALLREDUCE
593#  undef DIM_1d
594#  undef REAL_TYPE
595#  undef OPERATION_MIN
596
597   !!----------------------------------------------------------------------
598   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
599   !!   
600   !!   Global sum of 1D array or a variable (integer, real or complex)
601   !!----------------------------------------------------------------------
602   !!
603#  define OPERATION_SUM
604#  define INTEGER_TYPE
605#  define DIM_0d
606#     define ROUTINE_ALLREDUCE           mppsum_int
607#     include "mpp_allreduce_generic.h90"
608#     undef ROUTINE_ALLREDUCE
609#  undef DIM_0d
610#  define DIM_1d
611#     define ROUTINE_ALLREDUCE           mppsum_a_int
612#     include "mpp_allreduce_generic.h90"
613#     undef ROUTINE_ALLREDUCE
614#  undef DIM_1d
615#  undef INTEGER_TYPE
616!
617#  define REAL_TYPE
618#  define DIM_0d
619#     define ROUTINE_ALLREDUCE           mppsum_real
620#     include "mpp_allreduce_generic.h90"
621#     undef ROUTINE_ALLREDUCE
622#  undef DIM_0d
623#  define DIM_1d
624#     define ROUTINE_ALLREDUCE           mppsum_a_real
625#     include "mpp_allreduce_generic.h90"
626#     undef ROUTINE_ALLREDUCE
627#  undef DIM_1d
628#  undef REAL_TYPE
629#  undef OPERATION_SUM
630
631#  define OPERATION_SUM_DD
632#  define COMPLEX_TYPE
633#  define DIM_0d
634#     define ROUTINE_ALLREDUCE           mppsum_realdd
635#     include "mpp_allreduce_generic.h90"
636#     undef ROUTINE_ALLREDUCE
637#  undef DIM_0d
638#  define DIM_1d
639#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
640#     include "mpp_allreduce_generic.h90"
641#     undef ROUTINE_ALLREDUCE
642#  undef DIM_1d
643#  undef COMPLEX_TYPE
644#  undef OPERATION_SUM_DD
645
646   !!----------------------------------------------------------------------
647   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
648   !!   
649   !!----------------------------------------------------------------------
650   !!
651#  define OPERATION_MINLOC
652#  define DIM_2d
653#     define ROUTINE_LOC           mpp_minloc2d
654#     include "mpp_loc_generic.h90"
655#     undef ROUTINE_LOC
656#  undef DIM_2d
657#  define DIM_3d
658#     define ROUTINE_LOC           mpp_minloc3d
659#     include "mpp_loc_generic.h90"
660#     undef ROUTINE_LOC
661#  undef DIM_3d
662#  undef OPERATION_MINLOC
663
664#  define OPERATION_MAXLOC
665#  define DIM_2d
666#     define ROUTINE_LOC           mpp_maxloc2d
667#     include "mpp_loc_generic.h90"
668#     undef ROUTINE_LOC
669#  undef DIM_2d
670#  define DIM_3d
671#     define ROUTINE_LOC           mpp_maxloc3d
672#     include "mpp_loc_generic.h90"
673#     undef ROUTINE_LOC
674#  undef DIM_3d
675#  undef OPERATION_MAXLOC
676
677   SUBROUTINE mppsync()
678      !!----------------------------------------------------------------------
679      !!                  ***  routine mppsync  ***
680      !!
681      !! ** Purpose :   Massively parallel processors, synchroneous
682      !!
683      !!-----------------------------------------------------------------------
684      INTEGER :: ierror
685      !!-----------------------------------------------------------------------
686      !
687#if defined key_mpp_mpi
688      CALL mpi_barrier( mpi_comm_oce, ierror )
689#endif
690      !
691   END SUBROUTINE mppsync
692
693
694   SUBROUTINE mppstop( ld_abort ) 
695      !!----------------------------------------------------------------------
696      !!                  ***  routine mppstop  ***
697      !!
698      !! ** purpose :   Stop massively parallel processors method
699      !!
700      !!----------------------------------------------------------------------
701      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
702      LOGICAL ::   ll_abort
703      INTEGER ::   info
704      !!----------------------------------------------------------------------
705      ll_abort = .FALSE.
706      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
707      !
708#if defined key_mpp_mpi
709      IF(ll_abort) THEN
710         CALL mpi_abort( MPI_COMM_WORLD )
711      ELSE
712         CALL mppsync
713         CALL mpi_finalize( info )
714      ENDIF
715#endif
716      IF( ll_abort ) STOP 123
717      !
718   END SUBROUTINE mppstop
719
720
721   SUBROUTINE mpp_comm_free( kcom )
722      !!----------------------------------------------------------------------
723      INTEGER, INTENT(in) ::   kcom
724      !!
725      INTEGER :: ierr
726      !!----------------------------------------------------------------------
727      !
728#if defined key_mpp_mpi
729      CALL MPI_COMM_FREE(kcom, ierr)
730#endif
731      !
732   END SUBROUTINE mpp_comm_free
733
734
735   SUBROUTINE mpp_ini_znl( kumout )
736      !!----------------------------------------------------------------------
737      !!               ***  routine mpp_ini_znl  ***
738      !!
739      !! ** Purpose :   Initialize special communicator for computing zonal sum
740      !!
741      !! ** Method  : - Look for processors in the same row
742      !!              - Put their number in nrank_znl
743      !!              - Create group for the znl processors
744      !!              - Create a communicator for znl processors
745      !!              - Determine if processor should write znl files
746      !!
747      !! ** output
748      !!      ndim_rank_znl = number of processors on the same row
749      !!      ngrp_znl = group ID for the znl processors
750      !!      ncomm_znl = communicator for the ice procs.
751      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
752      !!
753      !!----------------------------------------------------------------------
754      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
755      !
756      INTEGER :: jproc      ! dummy loop integer
757      INTEGER :: ierr, ii   ! local integer
758      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
759      !!----------------------------------------------------------------------
760#if defined key_mpp_mpi
761      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
762      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
763      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
764      !
765      ALLOCATE( kwork(jpnij), STAT=ierr )
766      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
767
768      IF( jpnj == 1 ) THEN
769         ngrp_znl  = ngrp_world
770         ncomm_znl = mpi_comm_oce
771      ELSE
772         !
773         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
774         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
775         !-$$        CALL flush(numout)
776         !
777         ! Count number of processors on the same row
778         ndim_rank_znl = 0
779         DO jproc=1,jpnij
780            IF ( kwork(jproc) == njmpp ) THEN
781               ndim_rank_znl = ndim_rank_znl + 1
782            ENDIF
783         END DO
784         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
785         !-$$        CALL flush(numout)
786         ! Allocate the right size to nrank_znl
787         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
788         ALLOCATE(nrank_znl(ndim_rank_znl))
789         ii = 0
790         nrank_znl (:) = 0
791         DO jproc=1,jpnij
792            IF ( kwork(jproc) == njmpp) THEN
793               ii = ii + 1
794               nrank_znl(ii) = jproc -1
795            ENDIF
796         END DO
797         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
798         !-$$        CALL flush(numout)
799
800         ! Create the opa group
801         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
802         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
803         !-$$        CALL flush(numout)
804
805         ! Create the znl group from the opa group
806         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
807         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
808         !-$$        CALL flush(numout)
809
810         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
811         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
812         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
813         !-$$        CALL flush(numout)
814         !
815      END IF
816
817      ! Determines if processor if the first (starting from i=1) on the row
818      IF ( jpni == 1 ) THEN
819         l_znl_root = .TRUE.
820      ELSE
821         l_znl_root = .FALSE.
822         kwork (1) = nimpp
823         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
824         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
825      END IF
826
827      DEALLOCATE(kwork)
828#endif
829
830   END SUBROUTINE mpp_ini_znl
831
832
833   SUBROUTINE mpp_ini_north
834      !!----------------------------------------------------------------------
835      !!               ***  routine mpp_ini_north  ***
836      !!
837      !! ** Purpose :   Initialize special communicator for north folding
838      !!      condition together with global variables needed in the mpp folding
839      !!
840      !! ** Method  : - Look for northern processors
841      !!              - Put their number in nrank_north
842      !!              - Create groups for the world processors and the north processors
843      !!              - Create a communicator for northern processors
844      !!
845      !! ** output
846      !!      njmppmax = njmpp for northern procs
847      !!      ndim_rank_north = number of processors in the northern line
848      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
849      !!      ngrp_world = group ID for the world processors
850      !!      ngrp_north = group ID for the northern processors
851      !!      ncomm_north = communicator for the northern procs.
852      !!      north_root = number (in the world) of proc 0 in the northern comm.
853      !!
854      !!----------------------------------------------------------------------
855      INTEGER ::   ierr
856      INTEGER ::   jjproc
857      INTEGER ::   ii, ji
858      !!----------------------------------------------------------------------
859      !
860#if defined key_mpp_mpi
861      njmppmax = MAXVAL( njmppt )
862      !
863      ! Look for how many procs on the northern boundary
864      ndim_rank_north = 0
865      DO jjproc = 1, jpnij
866         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
867      END DO
868      !
869      ! Allocate the right size to nrank_north
870      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
871      ALLOCATE( nrank_north(ndim_rank_north) )
872
873      ! Fill the nrank_north array with proc. number of northern procs.
874      ! Note : the rank start at 0 in MPI
875      ii = 0
876      DO ji = 1, jpnij
877         IF ( njmppt(ji) == njmppmax   ) THEN
878            ii=ii+1
879            nrank_north(ii)=ji-1
880         END IF
881      END DO
882      !
883      ! create the world group
884      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
885      !
886      ! Create the North group from the world group
887      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
888      !
889      ! Create the North communicator , ie the pool of procs in the north group
890      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
891      !
892#endif
893   END SUBROUTINE mpp_ini_north
894
895
896   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
897      !!---------------------------------------------------------------------
898      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
899      !!
900      !!   Modification of original codes written by David H. Bailey
901      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
902      !!---------------------------------------------------------------------
903      INTEGER                     , INTENT(in)    ::   ilen, itype
904      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
905      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
906      !
907      REAL(wp) :: zerr, zt1, zt2    ! local work variables
908      INTEGER  :: ji, ztmp           ! local scalar
909      !!---------------------------------------------------------------------
910      !
911      ztmp = itype   ! avoid compilation warning
912      !
913      DO ji=1,ilen
914      ! Compute ydda + yddb using Knuth's trick.
915         zt1  = real(ydda(ji)) + real(yddb(ji))
916         zerr = zt1 - real(ydda(ji))
917         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
918                + aimag(ydda(ji)) + aimag(yddb(ji))
919
920         ! The result is zt1 + zt2, after normalization.
921         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
922      END DO
923      !
924   END SUBROUTINE DDPDD_MPI
925
926
927   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
928      !!----------------------------------------------------------------------
929      !!                  ***  routine mpp_report  ***
930      !!
931      !! ** Purpose :   report use of mpp routines per time-setp
932      !!
933      !!----------------------------------------------------------------------
934      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
935      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
936      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
937      !!
938      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
939      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
940      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
941      !!----------------------------------------------------------------------
942#if defined key_mpp_mpi
943      !
944      ll_lbc = .FALSE.
945      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
946      ll_glb = .FALSE.
947      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
948      ll_dlg = .FALSE.
949      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
950      !
951      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
952      ncom_freq = ncom_fsbc
953      !
954      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
955         IF( ll_lbc ) THEN
956            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
957            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
958            n_sequence_lbc = n_sequence_lbc + 1
959            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
960            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
961            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
962            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
963         ENDIF
964         IF( ll_glb ) THEN
965            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
966            n_sequence_glb = n_sequence_glb + 1
967            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
968            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
969         ENDIF
970         IF( ll_dlg ) THEN
971            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
972            n_sequence_dlg = n_sequence_dlg + 1
973            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
974            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
975         ENDIF
976      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
977         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
978         WRITE(numcom,*) ' '
979         WRITE(numcom,*) ' ------------------------------------------------------------'
980         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
981         WRITE(numcom,*) ' ------------------------------------------------------------'
982         WRITE(numcom,*) ' '
983         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
984         jj = 0; jk = 0; jf = 0; jh = 0
985         DO ji = 1, n_sequence_lbc
986            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
987            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
988            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
989            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
990         END DO
991         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
992         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
993         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
994         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
995         WRITE(numcom,*) ' '
996         WRITE(numcom,*) ' lbc_lnk called'
997         DO ji = 1, n_sequence_lbc - 1
998            IF ( crname_lbc(ji) /= 'already counted' ) THEN
999               ccountname = crname_lbc(ji)
1000               crname_lbc(ji) = 'already counted'
1001               jcount = 1
1002               DO jj = ji + 1, n_sequence_lbc
1003                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1004                     jcount = jcount + 1
1005                     crname_lbc(jj) = 'already counted'
1006                  END IF
1007               END DO
1008               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1009            END IF
1010         END DO
1011         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1012            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1013         END IF
1014         WRITE(numcom,*) ' '
1015         IF ( n_sequence_glb > 0 ) THEN
1016            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1017            jj = 1
1018            DO ji = 2, n_sequence_glb
1019               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1020                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1021                  jj = 0
1022               END IF
1023               jj = jj + 1 
1024            END DO
1025            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1026            DEALLOCATE(crname_glb)
1027         ELSE
1028            WRITE(numcom,*) ' No MPI global communication '
1029         ENDIF
1030         WRITE(numcom,*) ' '
1031         IF ( n_sequence_dlg > 0 ) THEN
1032            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1033            jj = 1
1034            DO ji = 2, n_sequence_dlg
1035               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1036                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1037                  jj = 0
1038               END IF
1039               jj = jj + 1 
1040            END DO
1041            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1042            DEALLOCATE(crname_dlg)
1043         ELSE
1044            WRITE(numcom,*) ' No MPI delayed global communication '
1045         ENDIF
1046         WRITE(numcom,*) ' '
1047         WRITE(numcom,*) ' -----------------------------------------------'
1048         WRITE(numcom,*) ' '
1049         DEALLOCATE(ncomm_sequence)
1050         DEALLOCATE(crname_lbc)
1051      ENDIF
1052#endif
1053   END SUBROUTINE mpp_report
1054
1055   
1056   SUBROUTINE tic_tac (ld_tic, ld_global)
1057
1058    LOGICAL,           INTENT(IN) :: ld_tic
1059    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1060    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1061    REAL(wp),               SAVE :: tic_ct = 0._wp
1062    INTEGER :: ii
1063#if defined key_mpp_mpi
1064
1065    IF( ncom_stp <= nit000 ) RETURN
1066    IF( ncom_stp == nitend ) RETURN
1067    ii = 1
1068    IF( PRESENT( ld_global ) ) THEN
1069       IF( ld_global ) ii = 2
1070    END IF
1071   
1072    IF ( ld_tic ) THEN
1073       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1074       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1075    ELSE
1076       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1077       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1078    ENDIF
1079#endif
1080   
1081   END SUBROUTINE tic_tac
1082
1083#if ! defined key_mpp_mpi
1084   SUBROUTINE mpi_wait(request, status, ierror)
1085      INTEGER                            , INTENT(in   ) ::   request
1086      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1087      INTEGER                            , INTENT(  out) ::   ierror
1088   END SUBROUTINE mpi_wait
1089
1090   
1091   FUNCTION MPI_Wtime()
1092      REAL(wp) ::  MPI_Wtime
1093      MPI_Wtime = -1.
1094   END FUNCTION MPI_Wtime
1095#endif
1096
1097   !!----------------------------------------------------------------------
1098   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1099   !!----------------------------------------------------------------------
1100
1101   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1102      &                 cd6, cd7, cd8, cd9, cd10 )
1103      !!----------------------------------------------------------------------
1104      !!                  ***  ROUTINE  stop_opa  ***
1105      !!
1106      !! ** Purpose :   print in ocean.outpput file a error message and
1107      !!                increment the error number (nstop) by one.
1108      !!----------------------------------------------------------------------
1109      CHARACTER(len=*), INTENT(in   )           ::   cd1
1110      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1111      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1112      !!----------------------------------------------------------------------
1113      !
1114      nstop = nstop + 1
1115      !
1116      ! force to open ocean.output file if not already opened
1117      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1118      !
1119                            WRITE(numout,*)
1120                            WRITE(numout,*) ' ===>>> : E R R O R'
1121                            WRITE(numout,*)
1122                            WRITE(numout,*) '         ==========='
1123                            WRITE(numout,*)
1124                            WRITE(numout,*) TRIM(cd1)
1125      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1126      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1127      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1128      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1129      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1130      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1131      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1132      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1133      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1134                            WRITE(numout,*)
1135      !
1136                               CALL FLUSH(numout    )
1137      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1138      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1139      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1140      !
1141      IF( cd1 == 'STOP' ) THEN
1142         WRITE(numout,*) 
1143         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1144         WRITE(numout,*) 
1145         CALL mppstop( ld_abort = .true. )
1146      ENDIF
1147      !
1148   END SUBROUTINE ctl_stop
1149
1150
1151   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1152      &                 cd6, cd7, cd8, cd9, cd10 )
1153      !!----------------------------------------------------------------------
1154      !!                  ***  ROUTINE  stop_warn  ***
1155      !!
1156      !! ** Purpose :   print in ocean.outpput file a error message and
1157      !!                increment the warning number (nwarn) by one.
1158      !!----------------------------------------------------------------------
1159      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1160      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1161      !!----------------------------------------------------------------------
1162      !
1163      nwarn = nwarn + 1
1164      !
1165      IF(lwp) THEN
1166                               WRITE(numout,*)
1167                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1168                               WRITE(numout,*)
1169                               WRITE(numout,*) '         ==============='
1170                               WRITE(numout,*)
1171         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1172         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1173         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1174         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1175         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1176         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1177         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1178         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1179         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1180         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1181                               WRITE(numout,*)
1182      ENDIF
1183      CALL FLUSH(numout)
1184      !
1185   END SUBROUTINE ctl_warn
1186
1187
1188   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1189      !!----------------------------------------------------------------------
1190      !!                  ***  ROUTINE ctl_opn  ***
1191      !!
1192      !! ** Purpose :   Open file and check if required file is available.
1193      !!
1194      !! ** Method  :   Fortan open
1195      !!----------------------------------------------------------------------
1196      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1197      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1198      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1199      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1200      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1201      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1202      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1203      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1204      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1205      !
1206      CHARACTER(len=80) ::   clfile
1207      INTEGER           ::   iost
1208      !!----------------------------------------------------------------------
1209      !
1210      ! adapt filename
1211      ! ----------------
1212      clfile = TRIM(cdfile)
1213      IF( PRESENT( karea ) ) THEN
1214         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1215      ENDIF
1216#if defined key_agrif
1217      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1218      knum=Agrif_Get_Unit()
1219#else
1220      knum=get_unit()
1221#endif
1222      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1223      !
1224      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1225         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1226      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1227         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1228      ELSE
1229         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1230      ENDIF
1231      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1232         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1233      IF( iost == 0 ) THEN
1234         IF(ldwp .AND. kout > 0) THEN
1235            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1236            WRITE(kout,*) '     unit   = ', knum
1237            WRITE(kout,*) '     status = ', cdstat
1238            WRITE(kout,*) '     form   = ', cdform
1239            WRITE(kout,*) '     access = ', cdacce
1240            WRITE(kout,*)
1241         ENDIF
1242      ENDIF
1243100   CONTINUE
1244      IF( iost /= 0 ) THEN
1245         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1246         WRITE(ctmp2,*) ' =======   ===  '
1247         WRITE(ctmp3,*) '           unit   = ', knum
1248         WRITE(ctmp4,*) '           status = ', cdstat
1249         WRITE(ctmp5,*) '           form   = ', cdform
1250         WRITE(ctmp6,*) '           access = ', cdacce
1251         WRITE(ctmp7,*) '           iostat = ', iost
1252         WRITE(ctmp8,*) '           we stop. verify the file '
1253         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1254      ENDIF
1255      !
1256   END SUBROUTINE ctl_opn
1257
1258
1259   SUBROUTINE ctl_nam ( kios, cdnam )
1260      !!----------------------------------------------------------------------
1261      !!                  ***  ROUTINE ctl_nam  ***
1262      !!
1263      !! ** Purpose :   Informations when error while reading a namelist
1264      !!
1265      !! ** Method  :   Fortan open
1266      !!----------------------------------------------------------------------
1267      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1268      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1269      !
1270      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1271      !!----------------------------------------------------------------------
1272      !
1273      WRITE (clios, '(I5.0)')   kios
1274      IF( kios < 0 ) THEN         
1275         CALL ctl_warn( 'end of record or file while reading namelist '   &
1276            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1277      ENDIF
1278      !
1279      IF( kios > 0 ) THEN
1280         CALL ctl_stop( 'misspelled variable in namelist '   &
1281            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1282      ENDIF
1283      kios = 0
1284      !
1285   END SUBROUTINE ctl_nam
1286
1287
1288   INTEGER FUNCTION get_unit()
1289      !!----------------------------------------------------------------------
1290      !!                  ***  FUNCTION  get_unit  ***
1291      !!
1292      !! ** Purpose :   return the index of an unused logical unit
1293      !!----------------------------------------------------------------------
1294      LOGICAL :: llopn
1295      !!----------------------------------------------------------------------
1296      !
1297      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1298      llopn = .TRUE.
1299      DO WHILE( (get_unit < 998) .AND. llopn )
1300         get_unit = get_unit + 1
1301         INQUIRE( unit = get_unit, opened = llopn )
1302      END DO
1303      IF( (get_unit == 999) .AND. llopn ) THEN
1304         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1305      ENDIF
1306      !
1307   END FUNCTION get_unit
1308
1309   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1310      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1311      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1312      CHARACTER(LEN=256)                           :: chline
1313      CHARACTER(LEN=1)                             :: csp
1314      INTEGER, INTENT(IN)                          :: kout
1315      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1316      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1317      !
1318      !csp = NEW_LINE('A')
1319      ! a new line character is the best seperator but some systems (e.g.Cray)
1320      ! seem to terminate namelist reads from internal files early if they
1321      ! encounter new-lines. Use a single space for safety.
1322      csp = ' '
1323      !
1324      ! Check if the namelist buffer has already been allocated. Return if it has.
1325      !
1326      IF ( ALLOCATED( cdnambuff ) ) RETURN
1327      IF( ldwp ) THEN
1328         !
1329         ! Open namelist file
1330         !
1331         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1332         !
1333         ! First pass: count characters excluding comments and trimable white space
1334         !
1335         itot=0
1336     10  READ(iun,'(A256)',END=20,ERR=20) chline
1337         iltc = LEN_TRIM(chline)
1338         IF ( iltc.GT.0 ) THEN
1339          inl = INDEX(chline, '!') 
1340          IF( inl.eq.0 ) THEN
1341           itot = itot + iltc + 1                                ! +1 for the newline character
1342          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1343           itot = itot + inl                                  !  includes +1 for the newline character
1344          ENDIF
1345         ENDIF
1346         GOTO 10
1347     20  CONTINUE
1348         !
1349         ! Allocate text cdnambuff for condensed namelist
1350         !
1351!$AGRIF_DO_NOT_TREAT
1352         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1353!$AGRIF_END_DO_NOT_TREAT
1354         itotsav = itot
1355         !
1356         ! Second pass: read and transfer pruned characters into cdnambuff
1357         !
1358         REWIND(iun)
1359         itot=1
1360     30  READ(iun,'(A256)',END=40,ERR=40) chline
1361         iltc = LEN_TRIM(chline)
1362         IF ( iltc.GT.0 ) THEN
1363          inl = INDEX(chline, '!')
1364          IF( inl.eq.0 ) THEN
1365           inl = iltc
1366          ELSE
1367           inl = inl - 1
1368          ENDIF
1369          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1370             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1371             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1372             itot = itot + inl + 1
1373          ENDIF
1374         ENDIF
1375         GOTO 30
1376     40  CONTINUE
1377         itot = itot - 1
1378         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1379         !
1380         ! Close namelist file
1381         !
1382         CLOSE(iun)
1383         !write(*,'(32A)') cdnambuff
1384      ENDIF
1385#if defined key_mpp_mpi
1386      CALL mpp_bcast_nml( cdnambuff, itot )
1387#endif
1388  END SUBROUTINE load_nml
1389
1390
1391   !!----------------------------------------------------------------------
1392END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.