New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/dev_r10984_HPC-13_IRRMANN_BDY_optimization/src/OCE/LBC – NEMO

source: NEMO/branches/2019/dev_r10984_HPC-13_IRRMANN_BDY_optimization/src/OCE/LBC/lib_mpp.F90 @ 11192

Last change on this file since 11192 was 11192, checked in by smasson, 5 years ago

dev_r10984_HPC-13 : reorganization of lbclnk, part 1: simpler mpp_lnk_generic.h90 supress lbc_lnk_generic.h90, see #2285

  • Property svn:keywords set to Id
File size: 65.2 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mynode        : indentify the processor unit
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!----------------------------------------------------------------------
54   USE dom_oce        ! ocean space and time domain
55   USE in_out_manager ! I/O manager
56
57   IMPLICIT NONE
58   PRIVATE
59   !
60   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
61   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
62   PUBLIC   mpp_ini_north
63   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
64   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
65   PUBLIC   mppscatter, mppgather
66   PUBLIC   mpp_ini_znl
67   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
68   PUBLIC   mpp_report
69   PUBLIC   tic_tac
70   
71   !! * Interfaces
72   !! define generic interface for these routine as they are called sometimes
73   !! with scalar arguments instead of array arguments, which causes problems
74   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
75   INTERFACE mpp_min
76      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
77   END INTERFACE
78   INTERFACE mpp_max
79      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
80   END INTERFACE
81   INTERFACE mpp_sum
82      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
83         &             mppsum_realdd, mppsum_a_realdd
84   END INTERFACE
85   INTERFACE mpp_minloc
86      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
87   END INTERFACE
88   INTERFACE mpp_maxloc
89      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
90   END INTERFACE
91
92   !! ========================= !!
93   !!  MPI  variable definition !!
94   !! ========================= !!
95#if   defined key_mpp_mpi
96!$AGRIF_DO_NOT_TREAT
97   INCLUDE 'mpif.h'
98!$AGRIF_END_DO_NOT_TREAT
99   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
100#else   
101   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
102   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
103   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
104#endif
105
106   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
107
108   INTEGER, PUBLIC ::   mppsize        ! number of process
109   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
110!$AGRIF_DO_NOT_TREAT
111   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
112!$AGRIF_END_DO_NOT_TREAT
113
114   INTEGER :: MPI_SUMDD
115
116   ! variables used for zonal integration
117   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
118   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
119   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
120   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
121   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
122
123   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
124   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
125   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
126   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
127   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
128   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
129   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
130   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
131   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
132
133   ! Type of send : standard, buffered, immediate
134   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
135   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
136   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
137
138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
145   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
146   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
147   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
148   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
149   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
150   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
151   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
152   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
153   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
154   !: name (used as id) of allreduce-delayed operations
155   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
156   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
157   !: component name where the allreduce-delayed operation is performed
158   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
159   TYPE, PUBLIC ::   DELAYARR
160      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
161      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
162   END TYPE DELAYARR
163   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
164   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
165
166   ! timing summary report
167   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
168   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
169   
170   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
171
172   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
173   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
174
175   INTEGER, PUBLIC, PARAMETER ::   jpfillnothing = 1
176   INTEGER, PUBLIC, PARAMETER ::   jpfillcst     = 2
177   INTEGER, PUBLIC, PARAMETER ::   jpfillcopy    = 3
178   INTEGER, PUBLIC, PARAMETER ::   jpfillperio   = 4
179   INTEGER, PUBLIC, PARAMETER ::   jpfillmpi     = 5
180   
181   !!----------------------------------------------------------------------
182   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
183   !! $Id$
184   !! Software governed by the CeCILL license (see ./LICENSE)
185   !!----------------------------------------------------------------------
186CONTAINS
187
188   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
189      !!----------------------------------------------------------------------
190      !!                  ***  routine mynode  ***
191      !!
192      !! ** Purpose :   Find processor unit
193      !!----------------------------------------------------------------------
194      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
195      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
196      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
197      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
198      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
199      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
200      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
201      !
202      INTEGER ::   mynode, ierr, code, ji, ii, ios
203      LOGICAL ::   mpi_was_called
204      !
205      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
206      !!----------------------------------------------------------------------
207#if defined key_mpp_mpi
208      !
209      ii = 1
210      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
211      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
212      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
213      !
214      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
215      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
216901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
217      !
218      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
219      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
220902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
221      !
222      !                              ! control print
223      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
224      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
225      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
226      !
227      IF( jpni < 1 .OR. jpnj < 1  ) THEN
228         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
229      ELSE
230         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
231         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
232      ENDIF
233
234      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
235
236      CALL mpi_initialized ( mpi_was_called, code )
237      IF( code /= MPI_SUCCESS ) THEN
238         DO ji = 1, SIZE(ldtxt)
239            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
240         END DO
241         WRITE(*, cform_err)
242         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
243         CALL mpi_abort( mpi_comm_world, code, ierr )
244      ENDIF
245
246      IF( mpi_was_called ) THEN
247         !
248         SELECT CASE ( cn_mpi_send )
249         CASE ( 'S' )                ! Standard mpi send (blocking)
250            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
251         CASE ( 'B' )                ! Buffer mpi send (blocking)
252            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
253            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
254         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
255            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
256            l_isend = .TRUE.
257         CASE DEFAULT
258            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
259            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
260            kstop = kstop + 1
261         END SELECT
262         !
263      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
264         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
265         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
266         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
267         kstop = kstop + 1
268      ELSE
269         SELECT CASE ( cn_mpi_send )
270         CASE ( 'S' )                ! Standard mpi send (blocking)
271            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
272            CALL mpi_init( ierr )
273         CASE ( 'B' )                ! Buffer mpi send (blocking)
274            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
275            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
276         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
277            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
278            l_isend = .TRUE.
279            CALL mpi_init( ierr )
280         CASE DEFAULT
281            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
282            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
283            kstop = kstop + 1
284         END SELECT
285         !
286      ENDIF
287
288      IF( PRESENT(localComm) ) THEN
289         IF( Agrif_Root() ) THEN
290            mpi_comm_oce = localComm
291         ENDIF
292      ELSE
293         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
294         IF( code /= MPI_SUCCESS ) THEN
295            DO ji = 1, SIZE(ldtxt)
296               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
297            END DO
298            WRITE(*, cform_err)
299            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
300            CALL mpi_abort( mpi_comm_world, code, ierr )
301         ENDIF
302      ENDIF
303
304# if defined key_agrif
305      IF( Agrif_Root() ) THEN
306         CALL Agrif_MPI_Init(mpi_comm_oce)
307      ELSE
308         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
309      ENDIF
310# endif
311
312      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
313      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
314      mynode = mpprank
315
316      IF( mynode == 0 ) THEN
317         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
318         WRITE(kumond, nammpp)     
319      ENDIF
320      !
321      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
322      !
323#else
324      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
325      mynode = 0
326      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
327#endif
328   END FUNCTION mynode
329
330
331   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
332      !!----------------------------------------------------------------------
333      !!                  ***  routine mppsend  ***
334      !!
335      !! ** Purpose :   Send messag passing array
336      !!
337      !!----------------------------------------------------------------------
338      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
339      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
340      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
341      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
342      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
343      !!
344      INTEGER ::   iflag
345      !!----------------------------------------------------------------------
346      !
347#if defined key_mpp_mpi
348      SELECT CASE ( cn_mpi_send )
349      CASE ( 'S' )                ! Standard mpi send (blocking)
350         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
351      CASE ( 'B' )                ! Buffer mpi send (blocking)
352         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
353      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
354         ! be carefull, one more argument here : the mpi request identifier..
355         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
356      END SELECT
357#endif
358      !
359   END SUBROUTINE mppsend
360
361
362   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
363      !!----------------------------------------------------------------------
364      !!                  ***  routine mpprecv  ***
365      !!
366      !! ** Purpose :   Receive messag passing array
367      !!
368      !!----------------------------------------------------------------------
369      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
370      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
371      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
372      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
373      !!
374      INTEGER :: istatus(mpi_status_size)
375      INTEGER :: iflag
376      INTEGER :: use_source
377      !!----------------------------------------------------------------------
378      !
379#if defined key_mpp_mpi
380      ! If a specific process number has been passed to the receive call,
381      ! use that one. Default is to use mpi_any_source
382      use_source = mpi_any_source
383      IF( PRESENT(ksource) )   use_source = ksource
384      !
385      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
386#endif
387      !
388   END SUBROUTINE mpprecv
389
390
391   SUBROUTINE mppgather( ptab, kp, pio )
392      !!----------------------------------------------------------------------
393      !!                   ***  routine mppgather  ***
394      !!
395      !! ** Purpose :   Transfert between a local subdomain array and a work
396      !!     array which is distributed following the vertical level.
397      !!
398      !!----------------------------------------------------------------------
399      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
400      INTEGER                           , INTENT(in   ) ::   kp     ! record length
401      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
402      !!
403      INTEGER :: itaille, ierror   ! temporary integer
404      !!---------------------------------------------------------------------
405      !
406      itaille = jpi * jpj
407#if defined key_mpp_mpi
408      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
409         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
410#else
411      pio(:,:,1) = ptab(:,:)
412#endif
413      !
414   END SUBROUTINE mppgather
415
416
417   SUBROUTINE mppscatter( pio, kp, ptab )
418      !!----------------------------------------------------------------------
419      !!                  ***  routine mppscatter  ***
420      !!
421      !! ** Purpose :   Transfert between awork array which is distributed
422      !!      following the vertical level and the local subdomain array.
423      !!
424      !!----------------------------------------------------------------------
425      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
426      INTEGER                             ::   kp     ! Tag (not used with MPI
427      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
428      !!
429      INTEGER :: itaille, ierror   ! temporary integer
430      !!---------------------------------------------------------------------
431      !
432      itaille = jpi * jpj
433      !
434#if defined key_mpp_mpi
435      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
436         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
437#else
438      ptab(:,:) = pio(:,:,1)
439#endif
440      !
441   END SUBROUTINE mppscatter
442
443   
444   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
445     !!----------------------------------------------------------------------
446      !!                   ***  routine mpp_delay_sum  ***
447      !!
448      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
449      !!
450      !!----------------------------------------------------------------------
451      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
452      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
453      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
454      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
455      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
456      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
457      !!
458      INTEGER ::   ji, isz
459      INTEGER ::   idvar
460      INTEGER ::   ierr, ilocalcomm
461      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
462      !!----------------------------------------------------------------------
463#if defined key_mpp_mpi
464      ilocalcomm = mpi_comm_oce
465      IF( PRESENT(kcom) )   ilocalcomm = kcom
466
467      isz = SIZE(y_in)
468     
469      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
470
471      idvar = -1
472      DO ji = 1, nbdelay
473         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
474      END DO
475      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
476
477      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
478         !                                       --------------------------
479         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
480            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
481            DEALLOCATE(todelay(idvar)%z1d)
482            ndelayid(idvar) = -1                                      ! do as if we had no restart
483         ELSE
484            ALLOCATE(todelay(idvar)%y1d(isz))
485            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
486         END IF
487      ENDIF
488     
489      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
490         !                                       --------------------------
491         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
492         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
493         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
494      ENDIF
495
496      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
497
498      ! send back pout from todelay(idvar)%z1d defined at previous call
499      pout(:) = todelay(idvar)%z1d(:)
500
501      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
502# if defined key_mpi2
503      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
504      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
505      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
506# else
507      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
508# endif
509#else
510      pout(:) = REAL(y_in(:), wp)
511#endif
512
513   END SUBROUTINE mpp_delay_sum
514
515   
516   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
517      !!----------------------------------------------------------------------
518      !!                   ***  routine mpp_delay_max  ***
519      !!
520      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
521      !!
522      !!----------------------------------------------------------------------
523      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
524      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
525      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
526      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
527      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
528      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
529      !!
530      INTEGER ::   ji, isz
531      INTEGER ::   idvar
532      INTEGER ::   ierr, ilocalcomm
533      !!----------------------------------------------------------------------
534#if defined key_mpp_mpi
535      ilocalcomm = mpi_comm_oce
536      IF( PRESENT(kcom) )   ilocalcomm = kcom
537
538      isz = SIZE(p_in)
539
540      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
541
542      idvar = -1
543      DO ji = 1, nbdelay
544         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
545      END DO
546      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
547
548      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
549         !                                       --------------------------
550         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
551            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
552            DEALLOCATE(todelay(idvar)%z1d)
553            ndelayid(idvar) = -1                                      ! do as if we had no restart
554         END IF
555      ENDIF
556
557      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
558         !                                       --------------------------
559         ALLOCATE(todelay(idvar)%z1d(isz))
560         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
561      ENDIF
562
563      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
564
565      ! send back pout from todelay(idvar)%z1d defined at previous call
566      pout(:) = todelay(idvar)%z1d(:)
567
568      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
569# if defined key_mpi2
570      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
571      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
572      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
573# else
574      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
575# endif
576#else
577      pout(:) = p_in(:)
578#endif
579
580   END SUBROUTINE mpp_delay_max
581
582   
583   SUBROUTINE mpp_delay_rcv( kid )
584      !!----------------------------------------------------------------------
585      !!                   ***  routine mpp_delay_rcv  ***
586      !!
587      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
588      !!
589      !!----------------------------------------------------------------------
590      INTEGER,INTENT(in   )      ::  kid 
591      INTEGER ::   ierr
592      !!----------------------------------------------------------------------
593#if defined key_mpp_mpi
594      IF( ndelayid(kid) /= -2 ) THEN 
595#if ! defined key_mpi2
596         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
597         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
598         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
599#endif
600         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
601         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
602      ENDIF
603#endif
604   END SUBROUTINE mpp_delay_rcv
605
606   
607   !!----------------------------------------------------------------------
608   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
609   !!   
610   !!----------------------------------------------------------------------
611   !!
612#  define OPERATION_MAX
613#  define INTEGER_TYPE
614#  define DIM_0d
615#     define ROUTINE_ALLREDUCE           mppmax_int
616#     include "mpp_allreduce_generic.h90"
617#     undef ROUTINE_ALLREDUCE
618#  undef DIM_0d
619#  define DIM_1d
620#     define ROUTINE_ALLREDUCE           mppmax_a_int
621#     include "mpp_allreduce_generic.h90"
622#     undef ROUTINE_ALLREDUCE
623#  undef DIM_1d
624#  undef INTEGER_TYPE
625!
626#  define REAL_TYPE
627#  define DIM_0d
628#     define ROUTINE_ALLREDUCE           mppmax_real
629#     include "mpp_allreduce_generic.h90"
630#     undef ROUTINE_ALLREDUCE
631#  undef DIM_0d
632#  define DIM_1d
633#     define ROUTINE_ALLREDUCE           mppmax_a_real
634#     include "mpp_allreduce_generic.h90"
635#     undef ROUTINE_ALLREDUCE
636#  undef DIM_1d
637#  undef REAL_TYPE
638#  undef OPERATION_MAX
639   !!----------------------------------------------------------------------
640   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
641   !!   
642   !!----------------------------------------------------------------------
643   !!
644#  define OPERATION_MIN
645#  define INTEGER_TYPE
646#  define DIM_0d
647#     define ROUTINE_ALLREDUCE           mppmin_int
648#     include "mpp_allreduce_generic.h90"
649#     undef ROUTINE_ALLREDUCE
650#  undef DIM_0d
651#  define DIM_1d
652#     define ROUTINE_ALLREDUCE           mppmin_a_int
653#     include "mpp_allreduce_generic.h90"
654#     undef ROUTINE_ALLREDUCE
655#  undef DIM_1d
656#  undef INTEGER_TYPE
657!
658#  define REAL_TYPE
659#  define DIM_0d
660#     define ROUTINE_ALLREDUCE           mppmin_real
661#     include "mpp_allreduce_generic.h90"
662#     undef ROUTINE_ALLREDUCE
663#  undef DIM_0d
664#  define DIM_1d
665#     define ROUTINE_ALLREDUCE           mppmin_a_real
666#     include "mpp_allreduce_generic.h90"
667#     undef ROUTINE_ALLREDUCE
668#  undef DIM_1d
669#  undef REAL_TYPE
670#  undef OPERATION_MIN
671
672   !!----------------------------------------------------------------------
673   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
674   !!   
675   !!   Global sum of 1D array or a variable (integer, real or complex)
676   !!----------------------------------------------------------------------
677   !!
678#  define OPERATION_SUM
679#  define INTEGER_TYPE
680#  define DIM_0d
681#     define ROUTINE_ALLREDUCE           mppsum_int
682#     include "mpp_allreduce_generic.h90"
683#     undef ROUTINE_ALLREDUCE
684#  undef DIM_0d
685#  define DIM_1d
686#     define ROUTINE_ALLREDUCE           mppsum_a_int
687#     include "mpp_allreduce_generic.h90"
688#     undef ROUTINE_ALLREDUCE
689#  undef DIM_1d
690#  undef INTEGER_TYPE
691!
692#  define REAL_TYPE
693#  define DIM_0d
694#     define ROUTINE_ALLREDUCE           mppsum_real
695#     include "mpp_allreduce_generic.h90"
696#     undef ROUTINE_ALLREDUCE
697#  undef DIM_0d
698#  define DIM_1d
699#     define ROUTINE_ALLREDUCE           mppsum_a_real
700#     include "mpp_allreduce_generic.h90"
701#     undef ROUTINE_ALLREDUCE
702#  undef DIM_1d
703#  undef REAL_TYPE
704#  undef OPERATION_SUM
705
706#  define OPERATION_SUM_DD
707#  define COMPLEX_TYPE
708#  define DIM_0d
709#     define ROUTINE_ALLREDUCE           mppsum_realdd
710#     include "mpp_allreduce_generic.h90"
711#     undef ROUTINE_ALLREDUCE
712#  undef DIM_0d
713#  define DIM_1d
714#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
715#     include "mpp_allreduce_generic.h90"
716#     undef ROUTINE_ALLREDUCE
717#  undef DIM_1d
718#  undef COMPLEX_TYPE
719#  undef OPERATION_SUM_DD
720
721   !!----------------------------------------------------------------------
722   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
723   !!   
724   !!----------------------------------------------------------------------
725   !!
726#  define OPERATION_MINLOC
727#  define DIM_2d
728#     define ROUTINE_LOC           mpp_minloc2d
729#     include "mpp_loc_generic.h90"
730#     undef ROUTINE_LOC
731#  undef DIM_2d
732#  define DIM_3d
733#     define ROUTINE_LOC           mpp_minloc3d
734#     include "mpp_loc_generic.h90"
735#     undef ROUTINE_LOC
736#  undef DIM_3d
737#  undef OPERATION_MINLOC
738
739#  define OPERATION_MAXLOC
740#  define DIM_2d
741#     define ROUTINE_LOC           mpp_maxloc2d
742#     include "mpp_loc_generic.h90"
743#     undef ROUTINE_LOC
744#  undef DIM_2d
745#  define DIM_3d
746#     define ROUTINE_LOC           mpp_maxloc3d
747#     include "mpp_loc_generic.h90"
748#     undef ROUTINE_LOC
749#  undef DIM_3d
750#  undef OPERATION_MAXLOC
751
752   SUBROUTINE mppsync()
753      !!----------------------------------------------------------------------
754      !!                  ***  routine mppsync  ***
755      !!
756      !! ** Purpose :   Massively parallel processors, synchroneous
757      !!
758      !!-----------------------------------------------------------------------
759      INTEGER :: ierror
760      !!-----------------------------------------------------------------------
761      !
762#if defined key_mpp_mpi
763      CALL mpi_barrier( mpi_comm_oce, ierror )
764#endif
765      !
766   END SUBROUTINE mppsync
767
768
769   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
770      !!----------------------------------------------------------------------
771      !!                  ***  routine mppstop  ***
772      !!
773      !! ** purpose :   Stop massively parallel processors method
774      !!
775      !!----------------------------------------------------------------------
776      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
777      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
778      LOGICAL ::   llfinal, ll_force_abort
779      INTEGER ::   info
780      !!----------------------------------------------------------------------
781      llfinal = .FALSE.
782      IF( PRESENT(ldfinal) ) llfinal = ldfinal
783      ll_force_abort = .FALSE.
784      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
785      !
786#if defined key_mpp_mpi
787      IF(ll_force_abort) THEN
788         CALL mpi_abort( MPI_COMM_WORLD )
789      ELSE
790         CALL mppsync
791         CALL mpi_finalize( info )
792      ENDIF
793#endif
794      IF( .NOT. llfinal ) STOP 123
795      !
796   END SUBROUTINE mppstop
797
798
799   SUBROUTINE mpp_comm_free( kcom )
800      !!----------------------------------------------------------------------
801      INTEGER, INTENT(in) ::   kcom
802      !!
803      INTEGER :: ierr
804      !!----------------------------------------------------------------------
805      !
806#if defined key_mpp_mpi
807      CALL MPI_COMM_FREE(kcom, ierr)
808#endif
809      !
810   END SUBROUTINE mpp_comm_free
811
812
813   SUBROUTINE mpp_ini_znl( kumout )
814      !!----------------------------------------------------------------------
815      !!               ***  routine mpp_ini_znl  ***
816      !!
817      !! ** Purpose :   Initialize special communicator for computing zonal sum
818      !!
819      !! ** Method  : - Look for processors in the same row
820      !!              - Put their number in nrank_znl
821      !!              - Create group for the znl processors
822      !!              - Create a communicator for znl processors
823      !!              - Determine if processor should write znl files
824      !!
825      !! ** output
826      !!      ndim_rank_znl = number of processors on the same row
827      !!      ngrp_znl = group ID for the znl processors
828      !!      ncomm_znl = communicator for the ice procs.
829      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
830      !!
831      !!----------------------------------------------------------------------
832      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
833      !
834      INTEGER :: jproc      ! dummy loop integer
835      INTEGER :: ierr, ii   ! local integer
836      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
837      !!----------------------------------------------------------------------
838#if defined key_mpp_mpi
839      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
840      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
841      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
842      !
843      ALLOCATE( kwork(jpnij), STAT=ierr )
844      IF( ierr /= 0 ) THEN
845         WRITE(kumout, cform_err)
846         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
847         CALL mppstop
848      ENDIF
849
850      IF( jpnj == 1 ) THEN
851         ngrp_znl  = ngrp_world
852         ncomm_znl = mpi_comm_oce
853      ELSE
854         !
855         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
856         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
857         !-$$        CALL flush(numout)
858         !
859         ! Count number of processors on the same row
860         ndim_rank_znl = 0
861         DO jproc=1,jpnij
862            IF ( kwork(jproc) == njmpp ) THEN
863               ndim_rank_znl = ndim_rank_znl + 1
864            ENDIF
865         END DO
866         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
867         !-$$        CALL flush(numout)
868         ! Allocate the right size to nrank_znl
869         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
870         ALLOCATE(nrank_znl(ndim_rank_znl))
871         ii = 0
872         nrank_znl (:) = 0
873         DO jproc=1,jpnij
874            IF ( kwork(jproc) == njmpp) THEN
875               ii = ii + 1
876               nrank_znl(ii) = jproc -1
877            ENDIF
878         END DO
879         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
880         !-$$        CALL flush(numout)
881
882         ! Create the opa group
883         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
884         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
885         !-$$        CALL flush(numout)
886
887         ! Create the znl group from the opa group
888         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
889         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
890         !-$$        CALL flush(numout)
891
892         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
893         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
894         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
895         !-$$        CALL flush(numout)
896         !
897      END IF
898
899      ! Determines if processor if the first (starting from i=1) on the row
900      IF ( jpni == 1 ) THEN
901         l_znl_root = .TRUE.
902      ELSE
903         l_znl_root = .FALSE.
904         kwork (1) = nimpp
905         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
906         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
907      END IF
908
909      DEALLOCATE(kwork)
910#endif
911
912   END SUBROUTINE mpp_ini_znl
913
914
915   SUBROUTINE mpp_ini_north
916      !!----------------------------------------------------------------------
917      !!               ***  routine mpp_ini_north  ***
918      !!
919      !! ** Purpose :   Initialize special communicator for north folding
920      !!      condition together with global variables needed in the mpp folding
921      !!
922      !! ** Method  : - Look for northern processors
923      !!              - Put their number in nrank_north
924      !!              - Create groups for the world processors and the north processors
925      !!              - Create a communicator for northern processors
926      !!
927      !! ** output
928      !!      njmppmax = njmpp for northern procs
929      !!      ndim_rank_north = number of processors in the northern line
930      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
931      !!      ngrp_world = group ID for the world processors
932      !!      ngrp_north = group ID for the northern processors
933      !!      ncomm_north = communicator for the northern procs.
934      !!      north_root = number (in the world) of proc 0 in the northern comm.
935      !!
936      !!----------------------------------------------------------------------
937      INTEGER ::   ierr
938      INTEGER ::   jjproc
939      INTEGER ::   ii, ji
940      !!----------------------------------------------------------------------
941      !
942#if defined key_mpp_mpi
943      njmppmax = MAXVAL( njmppt )
944      !
945      ! Look for how many procs on the northern boundary
946      ndim_rank_north = 0
947      DO jjproc = 1, jpnij
948         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
949      END DO
950      !
951      ! Allocate the right size to nrank_north
952      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
953      ALLOCATE( nrank_north(ndim_rank_north) )
954
955      ! Fill the nrank_north array with proc. number of northern procs.
956      ! Note : the rank start at 0 in MPI
957      ii = 0
958      DO ji = 1, jpnij
959         IF ( njmppt(ji) == njmppmax   ) THEN
960            ii=ii+1
961            nrank_north(ii)=ji-1
962         END IF
963      END DO
964      !
965      ! create the world group
966      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
967      !
968      ! Create the North group from the world group
969      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
970      !
971      ! Create the North communicator , ie the pool of procs in the north group
972      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
973      !
974#endif
975   END SUBROUTINE mpp_ini_north
976
977
978   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
979      !!---------------------------------------------------------------------
980      !!                   ***  routine mpp_init.opa  ***
981      !!
982      !! ** Purpose :: export and attach a MPI buffer for bsend
983      !!
984      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
985      !!            but classical mpi_init
986      !!
987      !! History :: 01/11 :: IDRIS initial version for IBM only
988      !!            08/04 :: R. Benshila, generalisation
989      !!---------------------------------------------------------------------
990      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
991      INTEGER                      , INTENT(inout) ::   ksft
992      INTEGER                      , INTENT(  out) ::   code
993      INTEGER                                      ::   ierr, ji
994      LOGICAL                                      ::   mpi_was_called
995      !!---------------------------------------------------------------------
996#if defined key_mpp_mpi
997      !
998      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
999      IF ( code /= MPI_SUCCESS ) THEN
1000         DO ji = 1, SIZE(ldtxt)
1001            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1002         END DO
1003         WRITE(*, cform_err)
1004         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1005         CALL mpi_abort( mpi_comm_world, code, ierr )
1006      ENDIF
1007      !
1008      IF( .NOT. mpi_was_called ) THEN
1009         CALL mpi_init( code )
1010         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1011         IF ( code /= MPI_SUCCESS ) THEN
1012            DO ji = 1, SIZE(ldtxt)
1013               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1014            END DO
1015            WRITE(*, cform_err)
1016            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1017            CALL mpi_abort( mpi_comm_world, code, ierr )
1018         ENDIF
1019      ENDIF
1020      !
1021      IF( nn_buffer > 0 ) THEN
1022         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1023         ! Buffer allocation and attachment
1024         ALLOCATE( tampon(nn_buffer), stat = ierr )
1025         IF( ierr /= 0 ) THEN
1026            DO ji = 1, SIZE(ldtxt)
1027               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1028            END DO
1029            WRITE(*, cform_err)
1030            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1031            CALL mpi_abort( mpi_comm_world, code, ierr )
1032         END IF
1033         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1034      ENDIF
1035      !
1036#endif
1037   END SUBROUTINE mpi_init_oce
1038
1039
1040   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1041      !!---------------------------------------------------------------------
1042      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1043      !!
1044      !!   Modification of original codes written by David H. Bailey
1045      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1046      !!---------------------------------------------------------------------
1047      INTEGER                     , INTENT(in)    ::   ilen, itype
1048      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1049      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1050      !
1051      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1052      INTEGER  :: ji, ztmp           ! local scalar
1053      !!---------------------------------------------------------------------
1054      !
1055      ztmp = itype   ! avoid compilation warning
1056      !
1057      DO ji=1,ilen
1058      ! Compute ydda + yddb using Knuth's trick.
1059         zt1  = real(ydda(ji)) + real(yddb(ji))
1060         zerr = zt1 - real(ydda(ji))
1061         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1062                + aimag(ydda(ji)) + aimag(yddb(ji))
1063
1064         ! The result is zt1 + zt2, after normalization.
1065         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1066      END DO
1067      !
1068   END SUBROUTINE DDPDD_MPI
1069
1070
1071   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1072      !!----------------------------------------------------------------------
1073      !!                  ***  routine mpp_report  ***
1074      !!
1075      !! ** Purpose :   report use of mpp routines per time-setp
1076      !!
1077      !!----------------------------------------------------------------------
1078      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1079      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1080      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1081      !!
1082      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1083      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1084      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1085      !!----------------------------------------------------------------------
1086#if defined key_mpp_mpi
1087      !
1088      ll_lbc = .FALSE.
1089      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1090      ll_glb = .FALSE.
1091      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1092      ll_dlg = .FALSE.
1093      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1094      !
1095      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1096      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1097      ncom_freq = ncom_fsbc
1098      !
1099      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1100         IF( ll_lbc ) THEN
1101            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1102            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1103            n_sequence_lbc = n_sequence_lbc + 1
1104            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1105            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1106            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1107            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1108         ENDIF
1109         IF( ll_glb ) THEN
1110            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1111            n_sequence_glb = n_sequence_glb + 1
1112            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1113            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1114         ENDIF
1115         IF( ll_dlg ) THEN
1116            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1117            n_sequence_dlg = n_sequence_dlg + 1
1118            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1119            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1120         ENDIF
1121      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1122         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1123         WRITE(numcom,*) ' '
1124         WRITE(numcom,*) ' ------------------------------------------------------------'
1125         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1126         WRITE(numcom,*) ' ------------------------------------------------------------'
1127         WRITE(numcom,*) ' '
1128         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1129         jj = 0; jk = 0; jf = 0; jh = 0
1130         DO ji = 1, n_sequence_lbc
1131            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1132            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1133            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1134            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1135         END DO
1136         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1137         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1138         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1139         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1140         WRITE(numcom,*) ' '
1141         WRITE(numcom,*) ' lbc_lnk called'
1142         DO ji = 1, n_sequence_lbc - 1
1143            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1144               ccountname = crname_lbc(ji)
1145               crname_lbc(ji) = 'already counted'
1146               jcount = 1
1147               DO jj = ji + 1, n_sequence_lbc
1148                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1149                     jcount = jcount + 1
1150                     crname_lbc(jj) = 'already counted'
1151                  END IF
1152               END DO
1153               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1154            END IF
1155         END DO
1156         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1157            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1158         END IF
1159         WRITE(numcom,*) ' '
1160         IF ( n_sequence_glb > 0 ) THEN
1161            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1162            jj = 1
1163            DO ji = 2, n_sequence_glb
1164               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1165                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1166                  jj = 0
1167               END IF
1168               jj = jj + 1 
1169            END DO
1170            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1171            DEALLOCATE(crname_glb)
1172         ELSE
1173            WRITE(numcom,*) ' No MPI global communication '
1174         ENDIF
1175         WRITE(numcom,*) ' '
1176         IF ( n_sequence_dlg > 0 ) THEN
1177            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1178            jj = 1
1179            DO ji = 2, n_sequence_dlg
1180               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1181                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1182                  jj = 0
1183               END IF
1184               jj = jj + 1 
1185            END DO
1186            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1187            DEALLOCATE(crname_dlg)
1188         ELSE
1189            WRITE(numcom,*) ' No MPI delayed global communication '
1190         ENDIF
1191         WRITE(numcom,*) ' '
1192         WRITE(numcom,*) ' -----------------------------------------------'
1193         WRITE(numcom,*) ' '
1194         DEALLOCATE(ncomm_sequence)
1195         DEALLOCATE(crname_lbc)
1196      ENDIF
1197#endif
1198   END SUBROUTINE mpp_report
1199
1200   
1201   SUBROUTINE tic_tac (ld_tic, ld_global)
1202
1203    LOGICAL,           INTENT(IN) :: ld_tic
1204    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1205    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1206    REAL(wp),               SAVE :: tic_ct = 0._wp
1207    INTEGER :: ii
1208#if defined key_mpp_mpi
1209
1210    IF( ncom_stp <= nit000 ) RETURN
1211    IF( ncom_stp == nitend ) RETURN
1212    ii = 1
1213    IF( PRESENT( ld_global ) ) THEN
1214       IF( ld_global ) ii = 2
1215    END IF
1216   
1217    IF ( ld_tic ) THEN
1218       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1219       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1220    ELSE
1221       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1222       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1223    ENDIF
1224#endif
1225   
1226   END SUBROUTINE tic_tac
1227
1228#if ! defined key_mpp_mpi
1229   SUBROUTINE mpi_wait(request, status, ierror)
1230      INTEGER                            , INTENT(in   ) ::   request
1231      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1232      INTEGER                            , INTENT(  out) ::   ierror
1233   END SUBROUTINE mpi_wait
1234#endif
1235
1236   !!----------------------------------------------------------------------
1237   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1238   !!----------------------------------------------------------------------
1239
1240   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1241      &                 cd6, cd7, cd8, cd9, cd10 )
1242      !!----------------------------------------------------------------------
1243      !!                  ***  ROUTINE  stop_opa  ***
1244      !!
1245      !! ** Purpose :   print in ocean.outpput file a error message and
1246      !!                increment the error number (nstop) by one.
1247      !!----------------------------------------------------------------------
1248      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1249      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1250      !!----------------------------------------------------------------------
1251      !
1252      nstop = nstop + 1
1253
1254      ! force to open ocean.output file
1255      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1256       
1257      WRITE(numout,cform_err)
1258      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1259      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1260      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1261      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1262      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1263      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1264      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1265      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1266      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1267      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1268
1269                               CALL FLUSH(numout    )
1270      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1271      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1272      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1273      !
1274      IF( cd1 == 'STOP' ) THEN
1275         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1276         CALL mppstop(ld_force_abort = .true.)
1277      ENDIF
1278      !
1279   END SUBROUTINE ctl_stop
1280
1281
1282   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1283      &                 cd6, cd7, cd8, cd9, cd10 )
1284      !!----------------------------------------------------------------------
1285      !!                  ***  ROUTINE  stop_warn  ***
1286      !!
1287      !! ** Purpose :   print in ocean.outpput file a error message and
1288      !!                increment the warning number (nwarn) by one.
1289      !!----------------------------------------------------------------------
1290      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1291      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1292      !!----------------------------------------------------------------------
1293      !
1294      nwarn = nwarn + 1
1295      IF(lwp) THEN
1296         WRITE(numout,cform_war)
1297         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1298         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1299         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1300         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1301         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1302         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1303         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1304         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1305         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1306         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1307      ENDIF
1308      CALL FLUSH(numout)
1309      !
1310   END SUBROUTINE ctl_warn
1311
1312
1313   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1314      !!----------------------------------------------------------------------
1315      !!                  ***  ROUTINE ctl_opn  ***
1316      !!
1317      !! ** Purpose :   Open file and check if required file is available.
1318      !!
1319      !! ** Method  :   Fortan open
1320      !!----------------------------------------------------------------------
1321      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1322      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1323      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1324      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1325      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1326      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1327      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1328      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1329      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1330      !
1331      CHARACTER(len=80) ::   clfile
1332      INTEGER           ::   iost
1333      !!----------------------------------------------------------------------
1334      !
1335      ! adapt filename
1336      ! ----------------
1337      clfile = TRIM(cdfile)
1338      IF( PRESENT( karea ) ) THEN
1339         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1340      ENDIF
1341#if defined key_agrif
1342      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1343      knum=Agrif_Get_Unit()
1344#else
1345      knum=get_unit()
1346#endif
1347      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1348      !
1349      iost=0
1350      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1351         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1352      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1353         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1354      ELSE
1355         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1356      ENDIF
1357      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1358         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1359      IF( iost == 0 ) THEN
1360         IF(ldwp) THEN
1361            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1362            WRITE(kout,*) '     unit   = ', knum
1363            WRITE(kout,*) '     status = ', cdstat
1364            WRITE(kout,*) '     form   = ', cdform
1365            WRITE(kout,*) '     access = ', cdacce
1366            WRITE(kout,*)
1367         ENDIF
1368      ENDIF
1369100   CONTINUE
1370      IF( iost /= 0 ) THEN
1371         IF(ldwp) THEN
1372            WRITE(kout,*)
1373            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1374            WRITE(kout,*) ' =======   ===  '
1375            WRITE(kout,*) '           unit   = ', knum
1376            WRITE(kout,*) '           status = ', cdstat
1377            WRITE(kout,*) '           form   = ', cdform
1378            WRITE(kout,*) '           access = ', cdacce
1379            WRITE(kout,*) '           iostat = ', iost
1380            WRITE(kout,*) '           we stop. verify the file '
1381            WRITE(kout,*)
1382         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1383            WRITE(*,*)
1384            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1385            WRITE(*,*) ' =======   ===  '
1386            WRITE(*,*) '           unit   = ', knum
1387            WRITE(*,*) '           status = ', cdstat
1388            WRITE(*,*) '           form   = ', cdform
1389            WRITE(*,*) '           access = ', cdacce
1390            WRITE(*,*) '           iostat = ', iost
1391            WRITE(*,*) '           we stop. verify the file '
1392            WRITE(*,*)
1393         ENDIF
1394         CALL FLUSH( kout ) 
1395         STOP 'ctl_opn bad opening'
1396      ENDIF
1397      !
1398   END SUBROUTINE ctl_opn
1399
1400
1401   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1402      !!----------------------------------------------------------------------
1403      !!                  ***  ROUTINE ctl_nam  ***
1404      !!
1405      !! ** Purpose :   Informations when error while reading a namelist
1406      !!
1407      !! ** Method  :   Fortan open
1408      !!----------------------------------------------------------------------
1409      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1410      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1411      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1412      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1413      !!----------------------------------------------------------------------
1414      !
1415      WRITE (clios, '(I5.0)')   kios
1416      IF( kios < 0 ) THEN         
1417         CALL ctl_warn( 'end of record or file while reading namelist '   &
1418            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1419      ENDIF
1420      !
1421      IF( kios > 0 ) THEN
1422         CALL ctl_stop( 'misspelled variable in namelist '   &
1423            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1424      ENDIF
1425      kios = 0
1426      RETURN
1427      !
1428   END SUBROUTINE ctl_nam
1429
1430
1431   INTEGER FUNCTION get_unit()
1432      !!----------------------------------------------------------------------
1433      !!                  ***  FUNCTION  get_unit  ***
1434      !!
1435      !! ** Purpose :   return the index of an unused logical unit
1436      !!----------------------------------------------------------------------
1437      LOGICAL :: llopn
1438      !!----------------------------------------------------------------------
1439      !
1440      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1441      llopn = .TRUE.
1442      DO WHILE( (get_unit < 998) .AND. llopn )
1443         get_unit = get_unit + 1
1444         INQUIRE( unit = get_unit, opened = llopn )
1445      END DO
1446      IF( (get_unit == 999) .AND. llopn ) THEN
1447         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
1448         get_unit = -1
1449      ENDIF
1450      !
1451   END FUNCTION get_unit
1452
1453   !!----------------------------------------------------------------------
1454END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.