New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 13636

Last change on this file since 13636 was 13636, checked in by mathiot, 4 years ago

fix ticket #2551 in trunk

  • Property svn:keywords set to Id
File size: 69.3 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
71   PUBLIC   mpp_report
72   PUBLIC   mpp_bcast_nml
73   PUBLIC   tic_tac
74#if ! defined key_mpp_mpi
75   PUBLIC MPI_wait
76   PUBLIC MPI_Wtime
77#endif
78   
79   !! * Interfaces
80   !! define generic interface for these routine as they are called sometimes
81   !! with scalar arguments instead of array arguments, which causes problems
82   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
83   INTERFACE mpp_min
84      MODULE PROCEDURE mppmin_a_int, mppmin_int
85      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
86      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
87   END INTERFACE
88   INTERFACE mpp_max
89      MODULE PROCEDURE mppmax_a_int, mppmax_int
90      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
91      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
92   END INTERFACE
93   INTERFACE mpp_sum
94      MODULE PROCEDURE mppsum_a_int, mppsum_int
95      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
96      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
97      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
98   END INTERFACE
99   INTERFACE mpp_minloc
100      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
101      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
102   END INTERFACE
103   INTERFACE mpp_maxloc
104      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
105      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
106   END INTERFACE
107
108   !! ========================= !!
109   !!  MPI  variable definition !!
110   !! ========================= !!
111#if   defined key_mpp_mpi
112!$AGRIF_DO_NOT_TREAT
113   INCLUDE 'mpif.h'
114!$AGRIF_END_DO_NOT_TREAT
115   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
116#else   
117   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
118   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
119   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
120   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
121#endif
122
123   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
124
125   INTEGER, PUBLIC ::   mppsize        ! number of process
126   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
127!$AGRIF_DO_NOT_TREAT
128   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
129!$AGRIF_END_DO_NOT_TREAT
130
131   INTEGER :: MPI_SUMDD
132
133   ! variables used for zonal integration
134   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
135   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
136   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
137   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
138   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
139
140   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
141   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
142   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
143   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
144   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
145   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
146   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
147   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
148   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
149
150   ! Communications summary report
151   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
152   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
153   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
154   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
155   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
156   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
157   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
158   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
159   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
160   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
161   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
162   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
163   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
164   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
165   !: name (used as id) of allreduce-delayed operations
166   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
167   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
168   !: component name where the allreduce-delayed operation is performed
169   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
170   TYPE, PUBLIC ::   DELAYARR
171      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
172      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
173   END TYPE DELAYARR
174   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
175   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
176
177   ! timing summary report
178   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
179   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
180   
181   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
182
183   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
184   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
185   
186   !! * Substitutions
187#  include "do_loop_substitute.h90"
188   !!----------------------------------------------------------------------
189   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
190   !! $Id$
191   !! Software governed by the CeCILL license (see ./LICENSE)
192   !!----------------------------------------------------------------------
193CONTAINS
194
195   SUBROUTINE mpp_start( localComm )
196      !!----------------------------------------------------------------------
197      !!                  ***  routine mpp_start  ***
198      !!
199      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
200      !!----------------------------------------------------------------------
201      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
202      !
203      INTEGER ::   ierr
204      LOGICAL ::   llmpi_init
205      !!----------------------------------------------------------------------
206#if defined key_mpp_mpi
207      !
208      CALL mpi_initialized ( llmpi_init, ierr )
209      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
210
211      IF( .NOT. llmpi_init ) THEN
212         IF( PRESENT(localComm) ) THEN
213            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
214            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
215            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
216         ENDIF
217         CALL mpi_init( ierr )
218         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
219      ENDIF
220       
221      IF( PRESENT(localComm) ) THEN
222         IF( Agrif_Root() ) THEN
223            mpi_comm_oce = localComm
224         ENDIF
225      ELSE
226         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
227         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
228      ENDIF
229
230# if defined key_agrif
231      IF( Agrif_Root() ) THEN
232         CALL Agrif_MPI_Init(mpi_comm_oce)
233      ELSE
234         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
235      ENDIF
236# endif
237
238      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
239      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
240      !
241      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
242      !
243#else
244      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
245      mppsize = 1
246      mpprank = 0
247#endif
248   END SUBROUTINE mpp_start
249
250
251   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
252      !!----------------------------------------------------------------------
253      !!                  ***  routine mppsend  ***
254      !!
255      !! ** Purpose :   Send messag passing array
256      !!
257      !!----------------------------------------------------------------------
258      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
259      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
260      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
261      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
262      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
263      !!
264      INTEGER ::   iflag
265      INTEGER :: mpi_working_type
266      !!----------------------------------------------------------------------
267      !
268#if defined key_mpp_mpi
269      IF (wp == dp) THEN
270         mpi_working_type = mpi_double_precision
271      ELSE
272         mpi_working_type = mpi_real
273      END IF
274      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
275#endif
276      !
277   END SUBROUTINE mppsend
278
279
280   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
281      !!----------------------------------------------------------------------
282      !!                  ***  routine mppsend  ***
283      !!
284      !! ** Purpose :   Send messag passing array
285      !!
286      !!----------------------------------------------------------------------
287      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
288      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
289      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
290      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
291      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
292      !!
293      INTEGER ::   iflag
294      !!----------------------------------------------------------------------
295      !
296#if defined key_mpp_mpi
297      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
298#endif
299      !
300   END SUBROUTINE mppsend_dp
301
302
303   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
304      !!----------------------------------------------------------------------
305      !!                  ***  routine mppsend  ***
306      !!
307      !! ** Purpose :   Send messag passing array
308      !!
309      !!----------------------------------------------------------------------
310      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
311      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
312      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
313      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
314      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
315      !!
316      INTEGER ::   iflag
317      !!----------------------------------------------------------------------
318      !
319#if defined key_mpp_mpi
320      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
321#endif
322      !
323   END SUBROUTINE mppsend_sp
324
325
326   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
327      !!----------------------------------------------------------------------
328      !!                  ***  routine mpprecv  ***
329      !!
330      !! ** Purpose :   Receive messag passing array
331      !!
332      !!----------------------------------------------------------------------
333      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
334      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
335      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
336      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
337      !!
338      INTEGER :: istatus(mpi_status_size)
339      INTEGER :: iflag
340      INTEGER :: use_source
341      INTEGER :: mpi_working_type
342      !!----------------------------------------------------------------------
343      !
344#if defined key_mpp_mpi
345      ! If a specific process number has been passed to the receive call,
346      ! use that one. Default is to use mpi_any_source
347      use_source = mpi_any_source
348      IF( PRESENT(ksource) )   use_source = ksource
349      !
350      IF (wp == dp) THEN
351         mpi_working_type = mpi_double_precision
352      ELSE
353         mpi_working_type = mpi_real
354      END IF
355      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
356#endif
357      !
358   END SUBROUTINE mpprecv
359
360   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
361      !!----------------------------------------------------------------------
362      !!                  ***  routine mpprecv  ***
363      !!
364      !! ** Purpose :   Receive messag passing array
365      !!
366      !!----------------------------------------------------------------------
367      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
368      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
369      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
370      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
371      !!
372      INTEGER :: istatus(mpi_status_size)
373      INTEGER :: iflag
374      INTEGER :: use_source
375      !!----------------------------------------------------------------------
376      !
377#if defined key_mpp_mpi
378      ! If a specific process number has been passed to the receive call,
379      ! use that one. Default is to use mpi_any_source
380      use_source = mpi_any_source
381      IF( PRESENT(ksource) )   use_source = ksource
382      !
383      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
384#endif
385      !
386   END SUBROUTINE mpprecv_dp
387
388
389   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
390      !!----------------------------------------------------------------------
391      !!                  ***  routine mpprecv  ***
392      !!
393      !! ** Purpose :   Receive messag passing array
394      !!
395      !!----------------------------------------------------------------------
396      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
397      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
398      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
399      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
400      !!
401      INTEGER :: istatus(mpi_status_size)
402      INTEGER :: iflag
403      INTEGER :: use_source
404      !!----------------------------------------------------------------------
405      !
406#if defined key_mpp_mpi
407      ! If a specific process number has been passed to the receive call,
408      ! use that one. Default is to use mpi_any_source
409      use_source = mpi_any_source
410      IF( PRESENT(ksource) )   use_source = ksource
411      !
412      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
413#endif
414      !
415   END SUBROUTINE mpprecv_sp
416
417
418   SUBROUTINE mppgather( ptab, kp, pio )
419      !!----------------------------------------------------------------------
420      !!                   ***  routine mppgather  ***
421      !!
422      !! ** Purpose :   Transfert between a local subdomain array and a work
423      !!     array which is distributed following the vertical level.
424      !!
425      !!----------------------------------------------------------------------
426      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
427      INTEGER                           , INTENT(in   ) ::   kp     ! record length
428      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
429      !!
430      INTEGER :: itaille, ierror   ! temporary integer
431      !!---------------------------------------------------------------------
432      !
433      itaille = jpi * jpj
434#if defined key_mpp_mpi
435      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
436         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
437#else
438      pio(:,:,1) = ptab(:,:)
439#endif
440      !
441   END SUBROUTINE mppgather
442
443
444   SUBROUTINE mppscatter( pio, kp, ptab )
445      !!----------------------------------------------------------------------
446      !!                  ***  routine mppscatter  ***
447      !!
448      !! ** Purpose :   Transfert between awork array which is distributed
449      !!      following the vertical level and the local subdomain array.
450      !!
451      !!----------------------------------------------------------------------
452      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
453      INTEGER                             ::   kp     ! Tag (not used with MPI
454      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
455      !!
456      INTEGER :: itaille, ierror   ! temporary integer
457      !!---------------------------------------------------------------------
458      !
459      itaille = jpi * jpj
460      !
461#if defined key_mpp_mpi
462      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
463         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
464#else
465      ptab(:,:) = pio(:,:,1)
466#endif
467      !
468   END SUBROUTINE mppscatter
469
470   
471   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
472     !!----------------------------------------------------------------------
473      !!                   ***  routine mpp_delay_sum  ***
474      !!
475      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
476      !!
477      !!----------------------------------------------------------------------
478      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
479      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
480      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
481      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
482      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
483      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
484      !!
485      INTEGER ::   ji, isz
486      INTEGER ::   idvar
487      INTEGER ::   ierr, ilocalcomm
488      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
489      !!----------------------------------------------------------------------
490#if defined key_mpp_mpi
491      ilocalcomm = mpi_comm_oce
492      IF( PRESENT(kcom) )   ilocalcomm = kcom
493
494      isz = SIZE(y_in)
495     
496      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
497
498      idvar = -1
499      DO ji = 1, nbdelay
500         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
501      END DO
502      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
503
504      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
505         !                                       --------------------------
506         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
507            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
508            DEALLOCATE(todelay(idvar)%z1d)
509            ndelayid(idvar) = -1                                      ! do as if we had no restart
510         ELSE
511            ALLOCATE(todelay(idvar)%y1d(isz))
512            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
513            ndelayid(idvar) = MPI_REQUEST_NULL                             ! initialised request to a valid value
514         END IF
515      ENDIF
516     
517      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
518         !                                       --------------------------
519         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
520         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
521         ndelayid(idvar) = MPI_REQUEST_NULL
522      ENDIF
523
524      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
525
526      ! send back pout from todelay(idvar)%z1d defined at previous call
527      pout(:) = todelay(idvar)%z1d(:)
528
529      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
530# if defined key_mpi2
531      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
532      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
533      ndelayid(idvar) = MPI_REQUEST_NULL
534      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
535# else
536      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
537# endif
538#else
539      pout(:) = REAL(y_in(:), wp)
540#endif
541
542   END SUBROUTINE mpp_delay_sum
543
544   
545   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
546      !!----------------------------------------------------------------------
547      !!                   ***  routine mpp_delay_max  ***
548      !!
549      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
550      !!
551      !!----------------------------------------------------------------------
552      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
553      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
554      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
555      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
556      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
557      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
558      !!
559      INTEGER ::   ji, isz
560      INTEGER ::   idvar
561      INTEGER ::   ierr, ilocalcomm
562      INTEGER ::   MPI_TYPE
563      !!----------------------------------------------------------------------
564     
565#if defined key_mpp_mpi
566      if( wp == dp ) then
567         MPI_TYPE = MPI_DOUBLE_PRECISION
568      else if ( wp == sp ) then
569         MPI_TYPE = MPI_REAL
570      else
571        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
572   
573      end if
574
575      ilocalcomm = mpi_comm_oce
576      IF( PRESENT(kcom) )   ilocalcomm = kcom
577
578      isz = SIZE(p_in)
579
580      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
581
582      idvar = -1
583      DO ji = 1, nbdelay
584         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
585      END DO
586      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
587
588      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
589         !                                       --------------------------
590         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
591            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
592            DEALLOCATE(todelay(idvar)%z1d)
593            ndelayid(idvar) = -1                                      ! do as if we had no restart
594         ELSE
595            ndelayid(idvar) = MPI_REQUEST_NULL
596         END IF
597      ENDIF
598
599      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
600         !                                       --------------------------
601         ALLOCATE(todelay(idvar)%z1d(isz))
602         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
603         ndelayid(idvar) = MPI_REQUEST_NULL
604      ENDIF
605
606      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
607
608      ! send back pout from todelay(idvar)%z1d defined at previous call
609      pout(:) = todelay(idvar)%z1d(:)
610
611      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
612      ! (PM) Should we get rid of MPI2 option ? MPI3 was release in 2013. Who is still using MPI2 ?
613# if defined key_mpi2
614      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
615      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ierr )
616      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
617# else
618      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
619# endif
620#else
621      pout(:) = p_in(:)
622#endif
623
624   END SUBROUTINE mpp_delay_max
625
626   
627   SUBROUTINE mpp_delay_rcv( kid )
628      !!----------------------------------------------------------------------
629      !!                   ***  routine mpp_delay_rcv  ***
630      !!
631      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
632      !!
633      !!----------------------------------------------------------------------
634      INTEGER,INTENT(in   )      ::  kid 
635      INTEGER ::   ierr
636      !!----------------------------------------------------------------------
637#if defined key_mpp_mpi
638      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
639      ! test on ndelayid(kid) useless as mpi_wait return immediatly if the request handle is MPI_REQUEST_NULL
640      CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr ) ! after this ndelayid(kid) = MPI_REQUEST_NULL
641      IF( ln_timing ) CALL tic_tac( .FALSE., ld_global = .TRUE.)
642      IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
643#endif
644   END SUBROUTINE mpp_delay_rcv
645
646   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
647      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
648      INTEGER                          , INTENT(INOUT) :: kleng
649      !!----------------------------------------------------------------------
650      !!                  ***  routine mpp_bcast_nml  ***
651      !!
652      !! ** Purpose :   broadcast namelist character buffer
653      !!
654      !!----------------------------------------------------------------------
655      !!
656      INTEGER ::   iflag
657      !!----------------------------------------------------------------------
658      !
659#if defined key_mpp_mpi
660      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
661      call MPI_BARRIER(mpi_comm_oce, iflag)
662!$AGRIF_DO_NOT_TREAT
663      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
664!$AGRIF_END_DO_NOT_TREAT
665      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
666      call MPI_BARRIER(mpi_comm_oce, iflag)
667#endif
668      !
669   END SUBROUTINE mpp_bcast_nml
670
671   
672   !!----------------------------------------------------------------------
673   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
674   !!   
675   !!----------------------------------------------------------------------
676   !!
677#  define OPERATION_MAX
678#  define INTEGER_TYPE
679#  define DIM_0d
680#     define ROUTINE_ALLREDUCE           mppmax_int
681#     include "mpp_allreduce_generic.h90"
682#     undef ROUTINE_ALLREDUCE
683#  undef DIM_0d
684#  define DIM_1d
685#     define ROUTINE_ALLREDUCE           mppmax_a_int
686#     include "mpp_allreduce_generic.h90"
687#     undef ROUTINE_ALLREDUCE
688#  undef DIM_1d
689#  undef INTEGER_TYPE
690!
691   !!
692   !!   ----   SINGLE PRECISION VERSIONS
693   !!
694#  define SINGLE_PRECISION
695#  define REAL_TYPE
696#  define DIM_0d
697#     define ROUTINE_ALLREDUCE           mppmax_real_sp
698#     include "mpp_allreduce_generic.h90"
699#     undef ROUTINE_ALLREDUCE
700#  undef DIM_0d
701#  define DIM_1d
702#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
703#     include "mpp_allreduce_generic.h90"
704#     undef ROUTINE_ALLREDUCE
705#  undef DIM_1d
706#  undef SINGLE_PRECISION
707   !!
708   !!
709   !!   ----   DOUBLE PRECISION VERSIONS
710   !!
711!
712#  define DIM_0d
713#     define ROUTINE_ALLREDUCE           mppmax_real_dp
714#     include "mpp_allreduce_generic.h90"
715#     undef ROUTINE_ALLREDUCE
716#  undef DIM_0d
717#  define DIM_1d
718#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
719#     include "mpp_allreduce_generic.h90"
720#     undef ROUTINE_ALLREDUCE
721#  undef DIM_1d
722#  undef REAL_TYPE
723#  undef OPERATION_MAX
724   !!----------------------------------------------------------------------
725   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
726   !!   
727   !!----------------------------------------------------------------------
728   !!
729#  define OPERATION_MIN
730#  define INTEGER_TYPE
731#  define DIM_0d
732#     define ROUTINE_ALLREDUCE           mppmin_int
733#     include "mpp_allreduce_generic.h90"
734#     undef ROUTINE_ALLREDUCE
735#  undef DIM_0d
736#  define DIM_1d
737#     define ROUTINE_ALLREDUCE           mppmin_a_int
738#     include "mpp_allreduce_generic.h90"
739#     undef ROUTINE_ALLREDUCE
740#  undef DIM_1d
741#  undef INTEGER_TYPE
742!
743   !!
744   !!   ----   SINGLE PRECISION VERSIONS
745   !!
746#  define SINGLE_PRECISION
747#  define REAL_TYPE
748#  define DIM_0d
749#     define ROUTINE_ALLREDUCE           mppmin_real_sp
750#     include "mpp_allreduce_generic.h90"
751#     undef ROUTINE_ALLREDUCE
752#  undef DIM_0d
753#  define DIM_1d
754#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
755#     include "mpp_allreduce_generic.h90"
756#     undef ROUTINE_ALLREDUCE
757#  undef DIM_1d
758#  undef SINGLE_PRECISION
759   !!
760   !!   ----   DOUBLE PRECISION VERSIONS
761   !!
762
763#  define DIM_0d
764#     define ROUTINE_ALLREDUCE           mppmin_real_dp
765#     include "mpp_allreduce_generic.h90"
766#     undef ROUTINE_ALLREDUCE
767#  undef DIM_0d
768#  define DIM_1d
769#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
770#     include "mpp_allreduce_generic.h90"
771#     undef ROUTINE_ALLREDUCE
772#  undef DIM_1d
773#  undef REAL_TYPE
774#  undef OPERATION_MIN
775
776   !!----------------------------------------------------------------------
777   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
778   !!   
779   !!   Global sum of 1D array or a variable (integer, real or complex)
780   !!----------------------------------------------------------------------
781   !!
782#  define OPERATION_SUM
783#  define INTEGER_TYPE
784#  define DIM_0d
785#     define ROUTINE_ALLREDUCE           mppsum_int
786#     include "mpp_allreduce_generic.h90"
787#     undef ROUTINE_ALLREDUCE
788#  undef DIM_0d
789#  define DIM_1d
790#     define ROUTINE_ALLREDUCE           mppsum_a_int
791#     include "mpp_allreduce_generic.h90"
792#     undef ROUTINE_ALLREDUCE
793#  undef DIM_1d
794#  undef INTEGER_TYPE
795
796   !!
797   !!   ----   SINGLE PRECISION VERSIONS
798   !!
799#  define OPERATION_SUM
800#  define SINGLE_PRECISION
801#  define REAL_TYPE
802#  define DIM_0d
803#     define ROUTINE_ALLREDUCE           mppsum_real_sp
804#     include "mpp_allreduce_generic.h90"
805#     undef ROUTINE_ALLREDUCE
806#  undef DIM_0d
807#  define DIM_1d
808#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
809#     include "mpp_allreduce_generic.h90"
810#     undef ROUTINE_ALLREDUCE
811#  undef DIM_1d
812#  undef REAL_TYPE
813#  undef OPERATION_SUM
814
815#  undef SINGLE_PRECISION
816
817   !!
818   !!   ----   DOUBLE PRECISION VERSIONS
819   !!
820#  define OPERATION_SUM
821#  define REAL_TYPE
822#  define DIM_0d
823#     define ROUTINE_ALLREDUCE           mppsum_real_dp
824#     include "mpp_allreduce_generic.h90"
825#     undef ROUTINE_ALLREDUCE
826#  undef DIM_0d
827#  define DIM_1d
828#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
829#     include "mpp_allreduce_generic.h90"
830#     undef ROUTINE_ALLREDUCE
831#  undef DIM_1d
832#  undef REAL_TYPE
833#  undef OPERATION_SUM
834
835#  define OPERATION_SUM_DD
836#  define COMPLEX_TYPE
837#  define DIM_0d
838#     define ROUTINE_ALLREDUCE           mppsum_realdd
839#     include "mpp_allreduce_generic.h90"
840#     undef ROUTINE_ALLREDUCE
841#  undef DIM_0d
842#  define DIM_1d
843#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
844#     include "mpp_allreduce_generic.h90"
845#     undef ROUTINE_ALLREDUCE
846#  undef DIM_1d
847#  undef COMPLEX_TYPE
848#  undef OPERATION_SUM_DD
849
850   !!----------------------------------------------------------------------
851   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
852   !!   
853   !!----------------------------------------------------------------------
854   !!
855   !!
856   !!   ----   SINGLE PRECISION VERSIONS
857   !!
858#  define SINGLE_PRECISION
859#  define OPERATION_MINLOC
860#  define DIM_2d
861#     define ROUTINE_LOC           mpp_minloc2d_sp
862#     include "mpp_loc_generic.h90"
863#     undef ROUTINE_LOC
864#  undef DIM_2d
865#  define DIM_3d
866#     define ROUTINE_LOC           mpp_minloc3d_sp
867#     include "mpp_loc_generic.h90"
868#     undef ROUTINE_LOC
869#  undef DIM_3d
870#  undef OPERATION_MINLOC
871
872#  define OPERATION_MAXLOC
873#  define DIM_2d
874#     define ROUTINE_LOC           mpp_maxloc2d_sp
875#     include "mpp_loc_generic.h90"
876#     undef ROUTINE_LOC
877#  undef DIM_2d
878#  define DIM_3d
879#     define ROUTINE_LOC           mpp_maxloc3d_sp
880#     include "mpp_loc_generic.h90"
881#     undef ROUTINE_LOC
882#  undef DIM_3d
883#  undef OPERATION_MAXLOC
884#  undef SINGLE_PRECISION
885   !!
886   !!   ----   DOUBLE PRECISION VERSIONS
887   !!
888#  define OPERATION_MINLOC
889#  define DIM_2d
890#     define ROUTINE_LOC           mpp_minloc2d_dp
891#     include "mpp_loc_generic.h90"
892#     undef ROUTINE_LOC
893#  undef DIM_2d
894#  define DIM_3d
895#     define ROUTINE_LOC           mpp_minloc3d_dp
896#     include "mpp_loc_generic.h90"
897#     undef ROUTINE_LOC
898#  undef DIM_3d
899#  undef OPERATION_MINLOC
900
901#  define OPERATION_MAXLOC
902#  define DIM_2d
903#     define ROUTINE_LOC           mpp_maxloc2d_dp
904#     include "mpp_loc_generic.h90"
905#     undef ROUTINE_LOC
906#  undef DIM_2d
907#  define DIM_3d
908#     define ROUTINE_LOC           mpp_maxloc3d_dp
909#     include "mpp_loc_generic.h90"
910#     undef ROUTINE_LOC
911#  undef DIM_3d
912#  undef OPERATION_MAXLOC
913
914
915   SUBROUTINE mppsync()
916      !!----------------------------------------------------------------------
917      !!                  ***  routine mppsync  ***
918      !!
919      !! ** Purpose :   Massively parallel processors, synchroneous
920      !!
921      !!-----------------------------------------------------------------------
922      INTEGER :: ierror
923      !!-----------------------------------------------------------------------
924      !
925#if defined key_mpp_mpi
926      CALL mpi_barrier( mpi_comm_oce, ierror )
927#endif
928      !
929   END SUBROUTINE mppsync
930
931
932   SUBROUTINE mppstop( ld_abort ) 
933      !!----------------------------------------------------------------------
934      !!                  ***  routine mppstop  ***
935      !!
936      !! ** purpose :   Stop massively parallel processors method
937      !!
938      !!----------------------------------------------------------------------
939      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
940      LOGICAL ::   ll_abort
941      INTEGER ::   info
942      !!----------------------------------------------------------------------
943      ll_abort = .FALSE.
944      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
945      !
946#if defined key_mpp_mpi
947      IF(ll_abort) THEN
948         CALL mpi_abort( MPI_COMM_WORLD )
949      ELSE
950         CALL mppsync
951         CALL mpi_finalize( info )
952      ENDIF
953#endif
954      IF( ll_abort ) STOP 123
955      !
956   END SUBROUTINE mppstop
957
958
959   SUBROUTINE mpp_comm_free( kcom )
960      !!----------------------------------------------------------------------
961      INTEGER, INTENT(in) ::   kcom
962      !!
963      INTEGER :: ierr
964      !!----------------------------------------------------------------------
965      !
966#if defined key_mpp_mpi
967      CALL MPI_COMM_FREE(kcom, ierr)
968#endif
969      !
970   END SUBROUTINE mpp_comm_free
971
972
973   SUBROUTINE mpp_ini_znl( kumout )
974      !!----------------------------------------------------------------------
975      !!               ***  routine mpp_ini_znl  ***
976      !!
977      !! ** Purpose :   Initialize special communicator for computing zonal sum
978      !!
979      !! ** Method  : - Look for processors in the same row
980      !!              - Put their number in nrank_znl
981      !!              - Create group for the znl processors
982      !!              - Create a communicator for znl processors
983      !!              - Determine if processor should write znl files
984      !!
985      !! ** output
986      !!      ndim_rank_znl = number of processors on the same row
987      !!      ngrp_znl = group ID for the znl processors
988      !!      ncomm_znl = communicator for the ice procs.
989      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
990      !!
991      !!----------------------------------------------------------------------
992      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
993      !
994      INTEGER :: jproc      ! dummy loop integer
995      INTEGER :: ierr, ii   ! local integer
996      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
997      !!----------------------------------------------------------------------
998#if defined key_mpp_mpi
999      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1000      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1001      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
1002      !
1003      ALLOCATE( kwork(jpnij), STAT=ierr )
1004      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1005
1006      IF( jpnj == 1 ) THEN
1007         ngrp_znl  = ngrp_world
1008         ncomm_znl = mpi_comm_oce
1009      ELSE
1010         !
1011         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1012         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1013         !-$$        CALL flush(numout)
1014         !
1015         ! Count number of processors on the same row
1016         ndim_rank_znl = 0
1017         DO jproc=1,jpnij
1018            IF ( kwork(jproc) == njmpp ) THEN
1019               ndim_rank_znl = ndim_rank_znl + 1
1020            ENDIF
1021         END DO
1022         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1023         !-$$        CALL flush(numout)
1024         ! Allocate the right size to nrank_znl
1025         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1026         ALLOCATE(nrank_znl(ndim_rank_znl))
1027         ii = 0
1028         nrank_znl (:) = 0
1029         DO jproc=1,jpnij
1030            IF ( kwork(jproc) == njmpp) THEN
1031               ii = ii + 1
1032               nrank_znl(ii) = jproc -1
1033            ENDIF
1034         END DO
1035         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1036         !-$$        CALL flush(numout)
1037
1038         ! Create the opa group
1039         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1040         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1041         !-$$        CALL flush(numout)
1042
1043         ! Create the znl group from the opa group
1044         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1045         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1046         !-$$        CALL flush(numout)
1047
1048         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1049         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1050         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1051         !-$$        CALL flush(numout)
1052         !
1053      END IF
1054
1055      ! Determines if processor if the first (starting from i=1) on the row
1056      IF ( jpni == 1 ) THEN
1057         l_znl_root = .TRUE.
1058      ELSE
1059         l_znl_root = .FALSE.
1060         kwork (1) = nimpp
1061         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1062         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1063      END IF
1064
1065      DEALLOCATE(kwork)
1066#endif
1067
1068   END SUBROUTINE mpp_ini_znl
1069
1070
1071   SUBROUTINE mpp_ini_north
1072      !!----------------------------------------------------------------------
1073      !!               ***  routine mpp_ini_north  ***
1074      !!
1075      !! ** Purpose :   Initialize special communicator for north folding
1076      !!      condition together with global variables needed in the mpp folding
1077      !!
1078      !! ** Method  : - Look for northern processors
1079      !!              - Put their number in nrank_north
1080      !!              - Create groups for the world processors and the north processors
1081      !!              - Create a communicator for northern processors
1082      !!
1083      !! ** output
1084      !!      njmppmax = njmpp for northern procs
1085      !!      ndim_rank_north = number of processors in the northern line
1086      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1087      !!      ngrp_world = group ID for the world processors
1088      !!      ngrp_north = group ID for the northern processors
1089      !!      ncomm_north = communicator for the northern procs.
1090      !!      north_root = number (in the world) of proc 0 in the northern comm.
1091      !!
1092      !!----------------------------------------------------------------------
1093      INTEGER ::   ierr
1094      INTEGER ::   jjproc
1095      INTEGER ::   ii, ji
1096      !!----------------------------------------------------------------------
1097      !
1098#if defined key_mpp_mpi
1099      njmppmax = MAXVAL( njmppt )
1100      !
1101      ! Look for how many procs on the northern boundary
1102      ndim_rank_north = 0
1103      DO jjproc = 1, jpni
1104         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1105      END DO
1106      !
1107      ! Allocate the right size to nrank_north
1108      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1109      ALLOCATE( nrank_north(ndim_rank_north) )
1110
1111      ! Fill the nrank_north array with proc. number of northern procs.
1112      ! Note : the rank start at 0 in MPI
1113      ii = 0
1114      DO ji = 1, jpni
1115         IF ( nfproc(ji) /= -1   ) THEN
1116            ii=ii+1
1117            nrank_north(ii)=nfproc(ji)
1118         END IF
1119      END DO
1120      !
1121      ! create the world group
1122      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1123      !
1124      ! Create the North group from the world group
1125      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1126      !
1127      ! Create the North communicator , ie the pool of procs in the north group
1128      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1129      !
1130#endif
1131   END SUBROUTINE mpp_ini_north
1132
1133
1134   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1135      !!---------------------------------------------------------------------
1136      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1137      !!
1138      !!   Modification of original codes written by David H. Bailey
1139      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1140      !!---------------------------------------------------------------------
1141      INTEGER                     , INTENT(in)    ::   ilen, itype
1142      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1143      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1144      !
1145      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1146      INTEGER  :: ji, ztmp           ! local scalar
1147      !!---------------------------------------------------------------------
1148      !
1149      ztmp = itype   ! avoid compilation warning
1150      !
1151      DO ji=1,ilen
1152      ! Compute ydda + yddb using Knuth's trick.
1153         zt1  = real(ydda(ji)) + real(yddb(ji))
1154         zerr = zt1 - real(ydda(ji))
1155         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1156                + aimag(ydda(ji)) + aimag(yddb(ji))
1157
1158         ! The result is zt1 + zt2, after normalization.
1159         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1160      END DO
1161      !
1162   END SUBROUTINE DDPDD_MPI
1163
1164
1165   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1166      !!----------------------------------------------------------------------
1167      !!                  ***  routine mpp_report  ***
1168      !!
1169      !! ** Purpose :   report use of mpp routines per time-setp
1170      !!
1171      !!----------------------------------------------------------------------
1172      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1173      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1174      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1175      !!
1176      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1177      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1178      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1179      !!----------------------------------------------------------------------
1180#if defined key_mpp_mpi
1181      !
1182      ll_lbc = .FALSE.
1183      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1184      ll_glb = .FALSE.
1185      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1186      ll_dlg = .FALSE.
1187      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1188      !
1189      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1190      ncom_freq = ncom_fsbc
1191      !
1192      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1193         IF( ll_lbc ) THEN
1194            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1195            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1196            n_sequence_lbc = n_sequence_lbc + 1
1197            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1198            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1199            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1200            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1201         ENDIF
1202         IF( ll_glb ) THEN
1203            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1204            n_sequence_glb = n_sequence_glb + 1
1205            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1206            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1207         ENDIF
1208         IF( ll_dlg ) THEN
1209            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1210            n_sequence_dlg = n_sequence_dlg + 1
1211            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1212            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1213         ENDIF
1214      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1215         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1216         WRITE(numcom,*) ' '
1217         WRITE(numcom,*) ' ------------------------------------------------------------'
1218         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1219         WRITE(numcom,*) ' ------------------------------------------------------------'
1220         WRITE(numcom,*) ' '
1221         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1222         jj = 0; jk = 0; jf = 0; jh = 0
1223         DO ji = 1, n_sequence_lbc
1224            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1225            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1226            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1227            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1228         END DO
1229         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1230         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1231         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1232         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1233         WRITE(numcom,*) ' '
1234         WRITE(numcom,*) ' lbc_lnk called'
1235         DO ji = 1, n_sequence_lbc - 1
1236            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1237               ccountname = crname_lbc(ji)
1238               crname_lbc(ji) = 'already counted'
1239               jcount = 1
1240               DO jj = ji + 1, n_sequence_lbc
1241                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1242                     jcount = jcount + 1
1243                     crname_lbc(jj) = 'already counted'
1244                  END IF
1245               END DO
1246               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1247            END IF
1248         END DO
1249         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1250            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1251         END IF
1252         WRITE(numcom,*) ' '
1253         IF ( n_sequence_glb > 0 ) THEN
1254            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1255            jj = 1
1256            DO ji = 2, n_sequence_glb
1257               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1258                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1259                  jj = 0
1260               END IF
1261               jj = jj + 1 
1262            END DO
1263            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1264            DEALLOCATE(crname_glb)
1265         ELSE
1266            WRITE(numcom,*) ' No MPI global communication '
1267         ENDIF
1268         WRITE(numcom,*) ' '
1269         IF ( n_sequence_dlg > 0 ) THEN
1270            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1271            jj = 1
1272            DO ji = 2, n_sequence_dlg
1273               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1274                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1275                  jj = 0
1276               END IF
1277               jj = jj + 1 
1278            END DO
1279            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1280            DEALLOCATE(crname_dlg)
1281         ELSE
1282            WRITE(numcom,*) ' No MPI delayed global communication '
1283         ENDIF
1284         WRITE(numcom,*) ' '
1285         WRITE(numcom,*) ' -----------------------------------------------'
1286         WRITE(numcom,*) ' '
1287         DEALLOCATE(ncomm_sequence)
1288         DEALLOCATE(crname_lbc)
1289      ENDIF
1290#endif
1291   END SUBROUTINE mpp_report
1292
1293   
1294   SUBROUTINE tic_tac (ld_tic, ld_global)
1295
1296    LOGICAL,           INTENT(IN) :: ld_tic
1297    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1298    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1299    REAL(dp),               SAVE :: tic_ct = 0._dp
1300    INTEGER :: ii
1301#if defined key_mpp_mpi
1302
1303    IF( ncom_stp <= nit000 ) RETURN
1304    IF( ncom_stp == nitend ) RETURN
1305    ii = 1
1306    IF( PRESENT( ld_global ) ) THEN
1307       IF( ld_global ) ii = 2
1308    END IF
1309   
1310    IF ( ld_tic ) THEN
1311       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1312       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1313    ELSE
1314       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1315       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1316    ENDIF
1317#endif
1318   
1319   END SUBROUTINE tic_tac
1320
1321#if ! defined key_mpp_mpi
1322   SUBROUTINE mpi_wait(request, status, ierror)
1323      INTEGER                            , INTENT(in   ) ::   request
1324      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1325      INTEGER                            , INTENT(  out) ::   ierror
1326   END SUBROUTINE mpi_wait
1327
1328   
1329   FUNCTION MPI_Wtime()
1330      REAL(wp) ::  MPI_Wtime
1331      MPI_Wtime = -1.
1332   END FUNCTION MPI_Wtime
1333#endif
1334
1335   !!----------------------------------------------------------------------
1336   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1337   !!----------------------------------------------------------------------
1338
1339   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1340      &                 cd6, cd7, cd8, cd9, cd10 )
1341      !!----------------------------------------------------------------------
1342      !!                  ***  ROUTINE  stop_opa  ***
1343      !!
1344      !! ** Purpose :   print in ocean.outpput file a error message and
1345      !!                increment the error number (nstop) by one.
1346      !!----------------------------------------------------------------------
1347      CHARACTER(len=*), INTENT(in   )           ::   cd1
1348      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1349      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1350      !
1351      CHARACTER(LEN=8) ::   clfmt            ! writing format
1352      INTEGER          ::   inum
1353      !!----------------------------------------------------------------------
1354      !
1355      nstop = nstop + 1
1356      !
1357      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1358         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1359         WRITE(inum,*)
1360         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1361         CLOSE(inum)
1362      ENDIF
1363      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1364         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1365      ENDIF
1366      !
1367                            WRITE(numout,*)
1368                            WRITE(numout,*) ' ===>>> : E R R O R'
1369                            WRITE(numout,*)
1370                            WRITE(numout,*) '         ==========='
1371                            WRITE(numout,*)
1372                            WRITE(numout,*) TRIM(cd1)
1373      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1374      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1375      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1376      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1377      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1378      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1379      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1380      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1381      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1382                            WRITE(numout,*)
1383      !
1384                               CALL FLUSH(numout    )
1385      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1386      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1387      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1388      !
1389      IF( cd1 == 'STOP' ) THEN
1390         WRITE(numout,*) 
1391         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1392         WRITE(numout,*) 
1393         CALL FLUSH(numout)
1394         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1395         CALL mppstop( ld_abort = .true. )
1396      ENDIF
1397      !
1398   END SUBROUTINE ctl_stop
1399
1400
1401   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1402      &                 cd6, cd7, cd8, cd9, cd10 )
1403      !!----------------------------------------------------------------------
1404      !!                  ***  ROUTINE  stop_warn  ***
1405      !!
1406      !! ** Purpose :   print in ocean.outpput file a error message and
1407      !!                increment the warning number (nwarn) by one.
1408      !!----------------------------------------------------------------------
1409      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1410      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1411      !!----------------------------------------------------------------------
1412      !
1413      nwarn = nwarn + 1
1414      !
1415      IF(lwp) THEN
1416                               WRITE(numout,*)
1417                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1418                               WRITE(numout,*)
1419                               WRITE(numout,*) '         ==============='
1420                               WRITE(numout,*)
1421         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1422         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1423         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1424         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1425         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1426         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1427         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1428         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1429         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1430         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1431                               WRITE(numout,*)
1432      ENDIF
1433      CALL FLUSH(numout)
1434      !
1435   END SUBROUTINE ctl_warn
1436
1437
1438   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1439      !!----------------------------------------------------------------------
1440      !!                  ***  ROUTINE ctl_opn  ***
1441      !!
1442      !! ** Purpose :   Open file and check if required file is available.
1443      !!
1444      !! ** Method  :   Fortan open
1445      !!----------------------------------------------------------------------
1446      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1447      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1448      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1449      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1450      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1451      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1452      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1453      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1454      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1455      !
1456      CHARACTER(len=80) ::   clfile
1457      CHARACTER(LEN=10) ::   clfmt            ! writing format
1458      INTEGER           ::   iost
1459      INTEGER           ::   idg              ! number of digits
1460      !!----------------------------------------------------------------------
1461      !
1462      ! adapt filename
1463      ! ----------------
1464      clfile = TRIM(cdfile)
1465      IF( PRESENT( karea ) ) THEN
1466         IF( karea > 1 ) THEN
1467            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1468            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1469            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1470            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1471         ENDIF
1472      ENDIF
1473#if defined key_agrif
1474      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1475      knum=Agrif_Get_Unit()
1476#else
1477      knum=get_unit()
1478#endif
1479      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1480      !
1481      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1482         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1483      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1484         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1485      ELSE
1486         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1487      ENDIF
1488      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1489         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1490      IF( iost == 0 ) THEN
1491         IF(ldwp .AND. kout > 0) THEN
1492            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1493            WRITE(kout,*) '     unit   = ', knum
1494            WRITE(kout,*) '     status = ', cdstat
1495            WRITE(kout,*) '     form   = ', cdform
1496            WRITE(kout,*) '     access = ', cdacce
1497            WRITE(kout,*)
1498         ENDIF
1499      ENDIF
1500100   CONTINUE
1501      IF( iost /= 0 ) THEN
1502         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1503         WRITE(ctmp2,*) ' =======   ===  '
1504         WRITE(ctmp3,*) '           unit   = ', knum
1505         WRITE(ctmp4,*) '           status = ', cdstat
1506         WRITE(ctmp5,*) '           form   = ', cdform
1507         WRITE(ctmp6,*) '           access = ', cdacce
1508         WRITE(ctmp7,*) '           iostat = ', iost
1509         WRITE(ctmp8,*) '           we stop. verify the file '
1510         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1511      ENDIF
1512      !
1513   END SUBROUTINE ctl_opn
1514
1515
1516   SUBROUTINE ctl_nam ( kios, cdnam )
1517      !!----------------------------------------------------------------------
1518      !!                  ***  ROUTINE ctl_nam  ***
1519      !!
1520      !! ** Purpose :   Informations when error while reading a namelist
1521      !!
1522      !! ** Method  :   Fortan open
1523      !!----------------------------------------------------------------------
1524      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1525      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1526      !
1527      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1528      !!----------------------------------------------------------------------
1529      !
1530      WRITE (clios, '(I5.0)')   kios
1531      IF( kios < 0 ) THEN         
1532         CALL ctl_warn( 'end of record or file while reading namelist '   &
1533            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1534      ENDIF
1535      !
1536      IF( kios > 0 ) THEN
1537         CALL ctl_stop( 'misspelled variable in namelist '   &
1538            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1539      ENDIF
1540      kios = 0
1541      !
1542   END SUBROUTINE ctl_nam
1543
1544
1545   INTEGER FUNCTION get_unit()
1546      !!----------------------------------------------------------------------
1547      !!                  ***  FUNCTION  get_unit  ***
1548      !!
1549      !! ** Purpose :   return the index of an unused logical unit
1550      !!----------------------------------------------------------------------
1551      LOGICAL :: llopn
1552      !!----------------------------------------------------------------------
1553      !
1554      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1555      llopn = .TRUE.
1556      DO WHILE( (get_unit < 998) .AND. llopn )
1557         get_unit = get_unit + 1
1558         INQUIRE( unit = get_unit, opened = llopn )
1559      END DO
1560      IF( (get_unit == 999) .AND. llopn ) THEN
1561         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1562      ENDIF
1563      !
1564   END FUNCTION get_unit
1565
1566   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1567      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1568      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1569      CHARACTER(LEN=256)                           :: chline
1570      CHARACTER(LEN=1)                             :: csp
1571      INTEGER, INTENT(IN)                          :: kout
1572      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1573      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1574      !
1575      !csp = NEW_LINE('A')
1576      ! a new line character is the best seperator but some systems (e.g.Cray)
1577      ! seem to terminate namelist reads from internal files early if they
1578      ! encounter new-lines. Use a single space for safety.
1579      csp = ' '
1580      !
1581      ! Check if the namelist buffer has already been allocated. Return if it has.
1582      !
1583      IF ( ALLOCATED( cdnambuff ) ) RETURN
1584      IF( ldwp ) THEN
1585         !
1586         ! Open namelist file
1587         !
1588         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1589         !
1590         ! First pass: count characters excluding comments and trimable white space
1591         !
1592         itot=0
1593     10  READ(iun,'(A256)',END=20,ERR=20) chline
1594         iltc = LEN_TRIM(chline)
1595         IF ( iltc.GT.0 ) THEN
1596          inl = INDEX(chline, '!') 
1597          IF( inl.eq.0 ) THEN
1598           itot = itot + iltc + 1                                ! +1 for the newline character
1599          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1600           itot = itot + inl                                  !  includes +1 for the newline character
1601          ENDIF
1602         ENDIF
1603         GOTO 10
1604     20  CONTINUE
1605         !
1606         ! Allocate text cdnambuff for condensed namelist
1607         !
1608!$AGRIF_DO_NOT_TREAT
1609         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1610!$AGRIF_END_DO_NOT_TREAT
1611         itotsav = itot
1612         !
1613         ! Second pass: read and transfer pruned characters into cdnambuff
1614         !
1615         REWIND(iun)
1616         itot=1
1617     30  READ(iun,'(A256)',END=40,ERR=40) chline
1618         iltc = LEN_TRIM(chline)
1619         IF ( iltc.GT.0 ) THEN
1620          inl = INDEX(chline, '!')
1621          IF( inl.eq.0 ) THEN
1622           inl = iltc
1623          ELSE
1624           inl = inl - 1
1625          ENDIF
1626          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1627             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1628             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1629             itot = itot + inl + 1
1630          ENDIF
1631         ENDIF
1632         GOTO 30
1633     40  CONTINUE
1634         itot = itot - 1
1635         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1636         !
1637         ! Close namelist file
1638         !
1639         CLOSE(iun)
1640         !write(*,'(32A)') cdnambuff
1641      ENDIF
1642#if defined key_mpp_mpi
1643      CALL mpp_bcast_nml( cdnambuff, itot )
1644#endif
1645  END SUBROUTINE load_nml
1646
1647
1648   !!----------------------------------------------------------------------
1649END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.