source: NEMO/branches/2019/dev_r10984_HPC-13_IRRMANN_BDY_optimization/src/OCE/LBC/lib_mpp.F90 @ 11194

Last change on this file since 11194 was 11194, checked in by smasson, 2 years ago

dev_r10984_HPC-13 : reorganization of lbclnk, part 1: bugfix following [11192], see #2285

  • Property svn:keywords set to Id
File size: 65.0 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mynode        : indentify the processor unit
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!----------------------------------------------------------------------
54   USE dom_oce        ! ocean space and time domain
55   USE in_out_manager ! I/O manager
56
57   IMPLICIT NONE
58   PRIVATE
59   !
60   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
61   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
62   PUBLIC   mpp_ini_north
63   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
64   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
65   PUBLIC   mppscatter, mppgather
66   PUBLIC   mpp_ini_znl
67   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
68   PUBLIC   mpp_report
69   PUBLIC   tic_tac
70   
71   !! * Interfaces
72   !! define generic interface for these routine as they are called sometimes
73   !! with scalar arguments instead of array arguments, which causes problems
74   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
75   INTERFACE mpp_min
76      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
77   END INTERFACE
78   INTERFACE mpp_max
79      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
80   END INTERFACE
81   INTERFACE mpp_sum
82      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
83         &             mppsum_realdd, mppsum_a_realdd
84   END INTERFACE
85   INTERFACE mpp_minloc
86      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
87   END INTERFACE
88   INTERFACE mpp_maxloc
89      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
90   END INTERFACE
91
92   !! ========================= !!
93   !!  MPI  variable definition !!
94   !! ========================= !!
95#if   defined key_mpp_mpi
96!$AGRIF_DO_NOT_TREAT
97   INCLUDE 'mpif.h'
98!$AGRIF_END_DO_NOT_TREAT
99   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
100#else   
101   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
102   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
103   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
104#endif
105
106   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
107
108   INTEGER, PUBLIC ::   mppsize        ! number of process
109   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
110!$AGRIF_DO_NOT_TREAT
111   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
112!$AGRIF_END_DO_NOT_TREAT
113
114   INTEGER :: MPI_SUMDD
115
116   ! variables used for zonal integration
117   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
118   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
119   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
120   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
121   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
122
123   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
124   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
125   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
126   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
127   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
128   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
129   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
130   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
131   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
132
133   ! Type of send : standard, buffered, immediate
134   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
135   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
136   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
137
138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
145   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
146   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
147   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
148   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
149   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
150   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
151   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
152   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
153   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
154   !: name (used as id) of allreduce-delayed operations
155   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
156   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
157   !: component name where the allreduce-delayed operation is performed
158   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
159   TYPE, PUBLIC ::   DELAYARR
160      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
161      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
162   END TYPE DELAYARR
163   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
164   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
165
166   ! timing summary report
167   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
168   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
169   
170   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
171
172   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
173   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
174   
175   !!----------------------------------------------------------------------
176   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
177   !! $Id$
178   !! Software governed by the CeCILL license (see ./LICENSE)
179   !!----------------------------------------------------------------------
180CONTAINS
181
182   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
183      !!----------------------------------------------------------------------
184      !!                  ***  routine mynode  ***
185      !!
186      !! ** Purpose :   Find processor unit
187      !!----------------------------------------------------------------------
188      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
189      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
190      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
191      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
192      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
193      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
194      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
195      !
196      INTEGER ::   mynode, ierr, code, ji, ii, ios
197      LOGICAL ::   mpi_was_called
198      !
199      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
200      !!----------------------------------------------------------------------
201#if defined key_mpp_mpi
202      !
203      ii = 1
204      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
205      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
206      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
207      !
208      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
209      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
210901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
211      !
212      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
213      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
214902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
215      !
216      !                              ! control print
217      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
218      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
220      !
221      IF( jpni < 1 .OR. jpnj < 1  ) THEN
222         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
223      ELSE
224         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
225         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
226      ENDIF
227
228      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
229
230      CALL mpi_initialized ( mpi_was_called, code )
231      IF( code /= MPI_SUCCESS ) THEN
232         DO ji = 1, SIZE(ldtxt)
233            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
234         END DO
235         WRITE(*, cform_err)
236         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
237         CALL mpi_abort( mpi_comm_world, code, ierr )
238      ENDIF
239
240      IF( mpi_was_called ) THEN
241         !
242         SELECT CASE ( cn_mpi_send )
243         CASE ( 'S' )                ! Standard mpi send (blocking)
244            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
245         CASE ( 'B' )                ! Buffer mpi send (blocking)
246            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
247            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
248         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
249            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
250            l_isend = .TRUE.
251         CASE DEFAULT
252            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
253            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
254            kstop = kstop + 1
255         END SELECT
256         !
257      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
258         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
259         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
260         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
261         kstop = kstop + 1
262      ELSE
263         SELECT CASE ( cn_mpi_send )
264         CASE ( 'S' )                ! Standard mpi send (blocking)
265            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
266            CALL mpi_init( ierr )
267         CASE ( 'B' )                ! Buffer mpi send (blocking)
268            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
269            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
270         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
271            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
272            l_isend = .TRUE.
273            CALL mpi_init( ierr )
274         CASE DEFAULT
275            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
276            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
277            kstop = kstop + 1
278         END SELECT
279         !
280      ENDIF
281
282      IF( PRESENT(localComm) ) THEN
283         IF( Agrif_Root() ) THEN
284            mpi_comm_oce = localComm
285         ENDIF
286      ELSE
287         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
288         IF( code /= MPI_SUCCESS ) THEN
289            DO ji = 1, SIZE(ldtxt)
290               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
291            END DO
292            WRITE(*, cform_err)
293            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
294            CALL mpi_abort( mpi_comm_world, code, ierr )
295         ENDIF
296      ENDIF
297
298# if defined key_agrif
299      IF( Agrif_Root() ) THEN
300         CALL Agrif_MPI_Init(mpi_comm_oce)
301      ELSE
302         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
303      ENDIF
304# endif
305
306      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
307      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
308      mynode = mpprank
309
310      IF( mynode == 0 ) THEN
311         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
312         WRITE(kumond, nammpp)     
313      ENDIF
314      !
315      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
316      !
317#else
318      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
319      mynode = 0
320      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
321#endif
322   END FUNCTION mynode
323
324
325   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
326      !!----------------------------------------------------------------------
327      !!                  ***  routine mppsend  ***
328      !!
329      !! ** Purpose :   Send messag passing array
330      !!
331      !!----------------------------------------------------------------------
332      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
333      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
334      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
335      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
336      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
337      !!
338      INTEGER ::   iflag
339      !!----------------------------------------------------------------------
340      !
341#if defined key_mpp_mpi
342      SELECT CASE ( cn_mpi_send )
343      CASE ( 'S' )                ! Standard mpi send (blocking)
344         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
345      CASE ( 'B' )                ! Buffer mpi send (blocking)
346         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
347      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
348         ! be carefull, one more argument here : the mpi request identifier..
349         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
350      END SELECT
351#endif
352      !
353   END SUBROUTINE mppsend
354
355
356   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
357      !!----------------------------------------------------------------------
358      !!                  ***  routine mpprecv  ***
359      !!
360      !! ** Purpose :   Receive messag passing array
361      !!
362      !!----------------------------------------------------------------------
363      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
364      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
365      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
366      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
367      !!
368      INTEGER :: istatus(mpi_status_size)
369      INTEGER :: iflag
370      INTEGER :: use_source
371      !!----------------------------------------------------------------------
372      !
373#if defined key_mpp_mpi
374      ! If a specific process number has been passed to the receive call,
375      ! use that one. Default is to use mpi_any_source
376      use_source = mpi_any_source
377      IF( PRESENT(ksource) )   use_source = ksource
378      !
379      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
380#endif
381      !
382   END SUBROUTINE mpprecv
383
384
385   SUBROUTINE mppgather( ptab, kp, pio )
386      !!----------------------------------------------------------------------
387      !!                   ***  routine mppgather  ***
388      !!
389      !! ** Purpose :   Transfert between a local subdomain array and a work
390      !!     array which is distributed following the vertical level.
391      !!
392      !!----------------------------------------------------------------------
393      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
394      INTEGER                           , INTENT(in   ) ::   kp     ! record length
395      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
396      !!
397      INTEGER :: itaille, ierror   ! temporary integer
398      !!---------------------------------------------------------------------
399      !
400      itaille = jpi * jpj
401#if defined key_mpp_mpi
402      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
403         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
404#else
405      pio(:,:,1) = ptab(:,:)
406#endif
407      !
408   END SUBROUTINE mppgather
409
410
411   SUBROUTINE mppscatter( pio, kp, ptab )
412      !!----------------------------------------------------------------------
413      !!                  ***  routine mppscatter  ***
414      !!
415      !! ** Purpose :   Transfert between awork array which is distributed
416      !!      following the vertical level and the local subdomain array.
417      !!
418      !!----------------------------------------------------------------------
419      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
420      INTEGER                             ::   kp     ! Tag (not used with MPI
421      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
422      !!
423      INTEGER :: itaille, ierror   ! temporary integer
424      !!---------------------------------------------------------------------
425      !
426      itaille = jpi * jpj
427      !
428#if defined key_mpp_mpi
429      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
430         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
431#else
432      ptab(:,:) = pio(:,:,1)
433#endif
434      !
435   END SUBROUTINE mppscatter
436
437   
438   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
439     !!----------------------------------------------------------------------
440      !!                   ***  routine mpp_delay_sum  ***
441      !!
442      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
443      !!
444      !!----------------------------------------------------------------------
445      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
446      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
447      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
448      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
449      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
450      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
451      !!
452      INTEGER ::   ji, isz
453      INTEGER ::   idvar
454      INTEGER ::   ierr, ilocalcomm
455      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
456      !!----------------------------------------------------------------------
457#if defined key_mpp_mpi
458      ilocalcomm = mpi_comm_oce
459      IF( PRESENT(kcom) )   ilocalcomm = kcom
460
461      isz = SIZE(y_in)
462     
463      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
464
465      idvar = -1
466      DO ji = 1, nbdelay
467         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
468      END DO
469      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
470
471      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
472         !                                       --------------------------
473         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
474            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
475            DEALLOCATE(todelay(idvar)%z1d)
476            ndelayid(idvar) = -1                                      ! do as if we had no restart
477         ELSE
478            ALLOCATE(todelay(idvar)%y1d(isz))
479            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
480         END IF
481      ENDIF
482     
483      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
484         !                                       --------------------------
485         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
486         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
487         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
488      ENDIF
489
490      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
491
492      ! send back pout from todelay(idvar)%z1d defined at previous call
493      pout(:) = todelay(idvar)%z1d(:)
494
495      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
496# if defined key_mpi2
497      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
498      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
499      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
500# else
501      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
502# endif
503#else
504      pout(:) = REAL(y_in(:), wp)
505#endif
506
507   END SUBROUTINE mpp_delay_sum
508
509   
510   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
511      !!----------------------------------------------------------------------
512      !!                   ***  routine mpp_delay_max  ***
513      !!
514      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
515      !!
516      !!----------------------------------------------------------------------
517      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
518      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
519      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
520      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
521      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
522      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
523      !!
524      INTEGER ::   ji, isz
525      INTEGER ::   idvar
526      INTEGER ::   ierr, ilocalcomm
527      !!----------------------------------------------------------------------
528#if defined key_mpp_mpi
529      ilocalcomm = mpi_comm_oce
530      IF( PRESENT(kcom) )   ilocalcomm = kcom
531
532      isz = SIZE(p_in)
533
534      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
535
536      idvar = -1
537      DO ji = 1, nbdelay
538         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
539      END DO
540      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
541
542      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
543         !                                       --------------------------
544         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
545            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
546            DEALLOCATE(todelay(idvar)%z1d)
547            ndelayid(idvar) = -1                                      ! do as if we had no restart
548         END IF
549      ENDIF
550
551      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
552         !                                       --------------------------
553         ALLOCATE(todelay(idvar)%z1d(isz))
554         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
555      ENDIF
556
557      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
558
559      ! send back pout from todelay(idvar)%z1d defined at previous call
560      pout(:) = todelay(idvar)%z1d(:)
561
562      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
563# if defined key_mpi2
564      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
565      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
566      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
567# else
568      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
569# endif
570#else
571      pout(:) = p_in(:)
572#endif
573
574   END SUBROUTINE mpp_delay_max
575
576   
577   SUBROUTINE mpp_delay_rcv( kid )
578      !!----------------------------------------------------------------------
579      !!                   ***  routine mpp_delay_rcv  ***
580      !!
581      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
582      !!
583      !!----------------------------------------------------------------------
584      INTEGER,INTENT(in   )      ::  kid 
585      INTEGER ::   ierr
586      !!----------------------------------------------------------------------
587#if defined key_mpp_mpi
588      IF( ndelayid(kid) /= -2 ) THEN 
589#if ! defined key_mpi2
590         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
591         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
592         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
593#endif
594         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
595         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
596      ENDIF
597#endif
598   END SUBROUTINE mpp_delay_rcv
599
600   
601   !!----------------------------------------------------------------------
602   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
603   !!   
604   !!----------------------------------------------------------------------
605   !!
606#  define OPERATION_MAX
607#  define INTEGER_TYPE
608#  define DIM_0d
609#     define ROUTINE_ALLREDUCE           mppmax_int
610#     include "mpp_allreduce_generic.h90"
611#     undef ROUTINE_ALLREDUCE
612#  undef DIM_0d
613#  define DIM_1d
614#     define ROUTINE_ALLREDUCE           mppmax_a_int
615#     include "mpp_allreduce_generic.h90"
616#     undef ROUTINE_ALLREDUCE
617#  undef DIM_1d
618#  undef INTEGER_TYPE
619!
620#  define REAL_TYPE
621#  define DIM_0d
622#     define ROUTINE_ALLREDUCE           mppmax_real
623#     include "mpp_allreduce_generic.h90"
624#     undef ROUTINE_ALLREDUCE
625#  undef DIM_0d
626#  define DIM_1d
627#     define ROUTINE_ALLREDUCE           mppmax_a_real
628#     include "mpp_allreduce_generic.h90"
629#     undef ROUTINE_ALLREDUCE
630#  undef DIM_1d
631#  undef REAL_TYPE
632#  undef OPERATION_MAX
633   !!----------------------------------------------------------------------
634   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
635   !!   
636   !!----------------------------------------------------------------------
637   !!
638#  define OPERATION_MIN
639#  define INTEGER_TYPE
640#  define DIM_0d
641#     define ROUTINE_ALLREDUCE           mppmin_int
642#     include "mpp_allreduce_generic.h90"
643#     undef ROUTINE_ALLREDUCE
644#  undef DIM_0d
645#  define DIM_1d
646#     define ROUTINE_ALLREDUCE           mppmin_a_int
647#     include "mpp_allreduce_generic.h90"
648#     undef ROUTINE_ALLREDUCE
649#  undef DIM_1d
650#  undef INTEGER_TYPE
651!
652#  define REAL_TYPE
653#  define DIM_0d
654#     define ROUTINE_ALLREDUCE           mppmin_real
655#     include "mpp_allreduce_generic.h90"
656#     undef ROUTINE_ALLREDUCE
657#  undef DIM_0d
658#  define DIM_1d
659#     define ROUTINE_ALLREDUCE           mppmin_a_real
660#     include "mpp_allreduce_generic.h90"
661#     undef ROUTINE_ALLREDUCE
662#  undef DIM_1d
663#  undef REAL_TYPE
664#  undef OPERATION_MIN
665
666   !!----------------------------------------------------------------------
667   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
668   !!   
669   !!   Global sum of 1D array or a variable (integer, real or complex)
670   !!----------------------------------------------------------------------
671   !!
672#  define OPERATION_SUM
673#  define INTEGER_TYPE
674#  define DIM_0d
675#     define ROUTINE_ALLREDUCE           mppsum_int
676#     include "mpp_allreduce_generic.h90"
677#     undef ROUTINE_ALLREDUCE
678#  undef DIM_0d
679#  define DIM_1d
680#     define ROUTINE_ALLREDUCE           mppsum_a_int
681#     include "mpp_allreduce_generic.h90"
682#     undef ROUTINE_ALLREDUCE
683#  undef DIM_1d
684#  undef INTEGER_TYPE
685!
686#  define REAL_TYPE
687#  define DIM_0d
688#     define ROUTINE_ALLREDUCE           mppsum_real
689#     include "mpp_allreduce_generic.h90"
690#     undef ROUTINE_ALLREDUCE
691#  undef DIM_0d
692#  define DIM_1d
693#     define ROUTINE_ALLREDUCE           mppsum_a_real
694#     include "mpp_allreduce_generic.h90"
695#     undef ROUTINE_ALLREDUCE
696#  undef DIM_1d
697#  undef REAL_TYPE
698#  undef OPERATION_SUM
699
700#  define OPERATION_SUM_DD
701#  define COMPLEX_TYPE
702#  define DIM_0d
703#     define ROUTINE_ALLREDUCE           mppsum_realdd
704#     include "mpp_allreduce_generic.h90"
705#     undef ROUTINE_ALLREDUCE
706#  undef DIM_0d
707#  define DIM_1d
708#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
709#     include "mpp_allreduce_generic.h90"
710#     undef ROUTINE_ALLREDUCE
711#  undef DIM_1d
712#  undef COMPLEX_TYPE
713#  undef OPERATION_SUM_DD
714
715   !!----------------------------------------------------------------------
716   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
717   !!   
718   !!----------------------------------------------------------------------
719   !!
720#  define OPERATION_MINLOC
721#  define DIM_2d
722#     define ROUTINE_LOC           mpp_minloc2d
723#     include "mpp_loc_generic.h90"
724#     undef ROUTINE_LOC
725#  undef DIM_2d
726#  define DIM_3d
727#     define ROUTINE_LOC           mpp_minloc3d
728#     include "mpp_loc_generic.h90"
729#     undef ROUTINE_LOC
730#  undef DIM_3d
731#  undef OPERATION_MINLOC
732
733#  define OPERATION_MAXLOC
734#  define DIM_2d
735#     define ROUTINE_LOC           mpp_maxloc2d
736#     include "mpp_loc_generic.h90"
737#     undef ROUTINE_LOC
738#  undef DIM_2d
739#  define DIM_3d
740#     define ROUTINE_LOC           mpp_maxloc3d
741#     include "mpp_loc_generic.h90"
742#     undef ROUTINE_LOC
743#  undef DIM_3d
744#  undef OPERATION_MAXLOC
745
746   SUBROUTINE mppsync()
747      !!----------------------------------------------------------------------
748      !!                  ***  routine mppsync  ***
749      !!
750      !! ** Purpose :   Massively parallel processors, synchroneous
751      !!
752      !!-----------------------------------------------------------------------
753      INTEGER :: ierror
754      !!-----------------------------------------------------------------------
755      !
756#if defined key_mpp_mpi
757      CALL mpi_barrier( mpi_comm_oce, ierror )
758#endif
759      !
760   END SUBROUTINE mppsync
761
762
763   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
764      !!----------------------------------------------------------------------
765      !!                  ***  routine mppstop  ***
766      !!
767      !! ** purpose :   Stop massively parallel processors method
768      !!
769      !!----------------------------------------------------------------------
770      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
771      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
772      LOGICAL ::   llfinal, ll_force_abort
773      INTEGER ::   info
774      !!----------------------------------------------------------------------
775      llfinal = .FALSE.
776      IF( PRESENT(ldfinal) ) llfinal = ldfinal
777      ll_force_abort = .FALSE.
778      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
779      !
780#if defined key_mpp_mpi
781      IF(ll_force_abort) THEN
782         CALL mpi_abort( MPI_COMM_WORLD )
783      ELSE
784         CALL mppsync
785         CALL mpi_finalize( info )
786      ENDIF
787#endif
788      IF( .NOT. llfinal ) STOP 123
789      !
790   END SUBROUTINE mppstop
791
792
793   SUBROUTINE mpp_comm_free( kcom )
794      !!----------------------------------------------------------------------
795      INTEGER, INTENT(in) ::   kcom
796      !!
797      INTEGER :: ierr
798      !!----------------------------------------------------------------------
799      !
800#if defined key_mpp_mpi
801      CALL MPI_COMM_FREE(kcom, ierr)
802#endif
803      !
804   END SUBROUTINE mpp_comm_free
805
806
807   SUBROUTINE mpp_ini_znl( kumout )
808      !!----------------------------------------------------------------------
809      !!               ***  routine mpp_ini_znl  ***
810      !!
811      !! ** Purpose :   Initialize special communicator for computing zonal sum
812      !!
813      !! ** Method  : - Look for processors in the same row
814      !!              - Put their number in nrank_znl
815      !!              - Create group for the znl processors
816      !!              - Create a communicator for znl processors
817      !!              - Determine if processor should write znl files
818      !!
819      !! ** output
820      !!      ndim_rank_znl = number of processors on the same row
821      !!      ngrp_znl = group ID for the znl processors
822      !!      ncomm_znl = communicator for the ice procs.
823      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
824      !!
825      !!----------------------------------------------------------------------
826      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
827      !
828      INTEGER :: jproc      ! dummy loop integer
829      INTEGER :: ierr, ii   ! local integer
830      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
831      !!----------------------------------------------------------------------
832#if defined key_mpp_mpi
833      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
834      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
835      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
836      !
837      ALLOCATE( kwork(jpnij), STAT=ierr )
838      IF( ierr /= 0 ) THEN
839         WRITE(kumout, cform_err)
840         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
841         CALL mppstop
842      ENDIF
843
844      IF( jpnj == 1 ) THEN
845         ngrp_znl  = ngrp_world
846         ncomm_znl = mpi_comm_oce
847      ELSE
848         !
849         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
850         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
851         !-$$        CALL flush(numout)
852         !
853         ! Count number of processors on the same row
854         ndim_rank_znl = 0
855         DO jproc=1,jpnij
856            IF ( kwork(jproc) == njmpp ) THEN
857               ndim_rank_znl = ndim_rank_znl + 1
858            ENDIF
859         END DO
860         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
861         !-$$        CALL flush(numout)
862         ! Allocate the right size to nrank_znl
863         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
864         ALLOCATE(nrank_znl(ndim_rank_znl))
865         ii = 0
866         nrank_znl (:) = 0
867         DO jproc=1,jpnij
868            IF ( kwork(jproc) == njmpp) THEN
869               ii = ii + 1
870               nrank_znl(ii) = jproc -1
871            ENDIF
872         END DO
873         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
874         !-$$        CALL flush(numout)
875
876         ! Create the opa group
877         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
878         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
879         !-$$        CALL flush(numout)
880
881         ! Create the znl group from the opa group
882         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
883         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
884         !-$$        CALL flush(numout)
885
886         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
887         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
888         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
889         !-$$        CALL flush(numout)
890         !
891      END IF
892
893      ! Determines if processor if the first (starting from i=1) on the row
894      IF ( jpni == 1 ) THEN
895         l_znl_root = .TRUE.
896      ELSE
897         l_znl_root = .FALSE.
898         kwork (1) = nimpp
899         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
900         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
901      END IF
902
903      DEALLOCATE(kwork)
904#endif
905
906   END SUBROUTINE mpp_ini_znl
907
908
909   SUBROUTINE mpp_ini_north
910      !!----------------------------------------------------------------------
911      !!               ***  routine mpp_ini_north  ***
912      !!
913      !! ** Purpose :   Initialize special communicator for north folding
914      !!      condition together with global variables needed in the mpp folding
915      !!
916      !! ** Method  : - Look for northern processors
917      !!              - Put their number in nrank_north
918      !!              - Create groups for the world processors and the north processors
919      !!              - Create a communicator for northern processors
920      !!
921      !! ** output
922      !!      njmppmax = njmpp for northern procs
923      !!      ndim_rank_north = number of processors in the northern line
924      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
925      !!      ngrp_world = group ID for the world processors
926      !!      ngrp_north = group ID for the northern processors
927      !!      ncomm_north = communicator for the northern procs.
928      !!      north_root = number (in the world) of proc 0 in the northern comm.
929      !!
930      !!----------------------------------------------------------------------
931      INTEGER ::   ierr
932      INTEGER ::   jjproc
933      INTEGER ::   ii, ji
934      !!----------------------------------------------------------------------
935      !
936#if defined key_mpp_mpi
937      njmppmax = MAXVAL( njmppt )
938      !
939      ! Look for how many procs on the northern boundary
940      ndim_rank_north = 0
941      DO jjproc = 1, jpnij
942         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
943      END DO
944      !
945      ! Allocate the right size to nrank_north
946      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
947      ALLOCATE( nrank_north(ndim_rank_north) )
948
949      ! Fill the nrank_north array with proc. number of northern procs.
950      ! Note : the rank start at 0 in MPI
951      ii = 0
952      DO ji = 1, jpnij
953         IF ( njmppt(ji) == njmppmax   ) THEN
954            ii=ii+1
955            nrank_north(ii)=ji-1
956         END IF
957      END DO
958      !
959      ! create the world group
960      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
961      !
962      ! Create the North group from the world group
963      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
964      !
965      ! Create the North communicator , ie the pool of procs in the north group
966      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
967      !
968#endif
969   END SUBROUTINE mpp_ini_north
970
971
972   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
973      !!---------------------------------------------------------------------
974      !!                   ***  routine mpp_init.opa  ***
975      !!
976      !! ** Purpose :: export and attach a MPI buffer for bsend
977      !!
978      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
979      !!            but classical mpi_init
980      !!
981      !! History :: 01/11 :: IDRIS initial version for IBM only
982      !!            08/04 :: R. Benshila, generalisation
983      !!---------------------------------------------------------------------
984      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
985      INTEGER                      , INTENT(inout) ::   ksft
986      INTEGER                      , INTENT(  out) ::   code
987      INTEGER                                      ::   ierr, ji
988      LOGICAL                                      ::   mpi_was_called
989      !!---------------------------------------------------------------------
990#if defined key_mpp_mpi
991      !
992      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
993      IF ( code /= MPI_SUCCESS ) THEN
994         DO ji = 1, SIZE(ldtxt)
995            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
996         END DO
997         WRITE(*, cform_err)
998         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
999         CALL mpi_abort( mpi_comm_world, code, ierr )
1000      ENDIF
1001      !
1002      IF( .NOT. mpi_was_called ) THEN
1003         CALL mpi_init( code )
1004         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1005         IF ( code /= MPI_SUCCESS ) THEN
1006            DO ji = 1, SIZE(ldtxt)
1007               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1008            END DO
1009            WRITE(*, cform_err)
1010            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1011            CALL mpi_abort( mpi_comm_world, code, ierr )
1012         ENDIF
1013      ENDIF
1014      !
1015      IF( nn_buffer > 0 ) THEN
1016         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1017         ! Buffer allocation and attachment
1018         ALLOCATE( tampon(nn_buffer), stat = ierr )
1019         IF( ierr /= 0 ) THEN
1020            DO ji = 1, SIZE(ldtxt)
1021               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1022            END DO
1023            WRITE(*, cform_err)
1024            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1025            CALL mpi_abort( mpi_comm_world, code, ierr )
1026         END IF
1027         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1028      ENDIF
1029      !
1030#endif
1031   END SUBROUTINE mpi_init_oce
1032
1033
1034   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1035      !!---------------------------------------------------------------------
1036      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1037      !!
1038      !!   Modification of original codes written by David H. Bailey
1039      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1040      !!---------------------------------------------------------------------
1041      INTEGER                     , INTENT(in)    ::   ilen, itype
1042      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1043      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1044      !
1045      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1046      INTEGER  :: ji, ztmp           ! local scalar
1047      !!---------------------------------------------------------------------
1048      !
1049      ztmp = itype   ! avoid compilation warning
1050      !
1051      DO ji=1,ilen
1052      ! Compute ydda + yddb using Knuth's trick.
1053         zt1  = real(ydda(ji)) + real(yddb(ji))
1054         zerr = zt1 - real(ydda(ji))
1055         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1056                + aimag(ydda(ji)) + aimag(yddb(ji))
1057
1058         ! The result is zt1 + zt2, after normalization.
1059         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1060      END DO
1061      !
1062   END SUBROUTINE DDPDD_MPI
1063
1064
1065   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1066      !!----------------------------------------------------------------------
1067      !!                  ***  routine mpp_report  ***
1068      !!
1069      !! ** Purpose :   report use of mpp routines per time-setp
1070      !!
1071      !!----------------------------------------------------------------------
1072      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1073      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1074      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1075      !!
1076      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1077      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1078      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1079      !!----------------------------------------------------------------------
1080#if defined key_mpp_mpi
1081      !
1082      ll_lbc = .FALSE.
1083      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1084      ll_glb = .FALSE.
1085      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1086      ll_dlg = .FALSE.
1087      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1088      !
1089      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1090      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1091      ncom_freq = ncom_fsbc
1092      !
1093      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1094         IF( ll_lbc ) THEN
1095            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1096            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1097            n_sequence_lbc = n_sequence_lbc + 1
1098            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1099            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1100            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1101            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1102         ENDIF
1103         IF( ll_glb ) THEN
1104            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1105            n_sequence_glb = n_sequence_glb + 1
1106            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1107            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1108         ENDIF
1109         IF( ll_dlg ) THEN
1110            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1111            n_sequence_dlg = n_sequence_dlg + 1
1112            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1113            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1114         ENDIF
1115      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1116         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1117         WRITE(numcom,*) ' '
1118         WRITE(numcom,*) ' ------------------------------------------------------------'
1119         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1120         WRITE(numcom,*) ' ------------------------------------------------------------'
1121         WRITE(numcom,*) ' '
1122         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1123         jj = 0; jk = 0; jf = 0; jh = 0
1124         DO ji = 1, n_sequence_lbc
1125            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1126            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1127            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1128            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1129         END DO
1130         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1131         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1132         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1133         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1134         WRITE(numcom,*) ' '
1135         WRITE(numcom,*) ' lbc_lnk called'
1136         DO ji = 1, n_sequence_lbc - 1
1137            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1138               ccountname = crname_lbc(ji)
1139               crname_lbc(ji) = 'already counted'
1140               jcount = 1
1141               DO jj = ji + 1, n_sequence_lbc
1142                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1143                     jcount = jcount + 1
1144                     crname_lbc(jj) = 'already counted'
1145                  END IF
1146               END DO
1147               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1148            END IF
1149         END DO
1150         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1151            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1152         END IF
1153         WRITE(numcom,*) ' '
1154         IF ( n_sequence_glb > 0 ) THEN
1155            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1156            jj = 1
1157            DO ji = 2, n_sequence_glb
1158               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1159                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1160                  jj = 0
1161               END IF
1162               jj = jj + 1 
1163            END DO
1164            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1165            DEALLOCATE(crname_glb)
1166         ELSE
1167            WRITE(numcom,*) ' No MPI global communication '
1168         ENDIF
1169         WRITE(numcom,*) ' '
1170         IF ( n_sequence_dlg > 0 ) THEN
1171            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1172            jj = 1
1173            DO ji = 2, n_sequence_dlg
1174               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1175                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1176                  jj = 0
1177               END IF
1178               jj = jj + 1 
1179            END DO
1180            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1181            DEALLOCATE(crname_dlg)
1182         ELSE
1183            WRITE(numcom,*) ' No MPI delayed global communication '
1184         ENDIF
1185         WRITE(numcom,*) ' '
1186         WRITE(numcom,*) ' -----------------------------------------------'
1187         WRITE(numcom,*) ' '
1188         DEALLOCATE(ncomm_sequence)
1189         DEALLOCATE(crname_lbc)
1190      ENDIF
1191#endif
1192   END SUBROUTINE mpp_report
1193
1194   
1195   SUBROUTINE tic_tac (ld_tic, ld_global)
1196
1197    LOGICAL,           INTENT(IN) :: ld_tic
1198    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1199    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1200    REAL(wp),               SAVE :: tic_ct = 0._wp
1201    INTEGER :: ii
1202#if defined key_mpp_mpi
1203
1204    IF( ncom_stp <= nit000 ) RETURN
1205    IF( ncom_stp == nitend ) RETURN
1206    ii = 1
1207    IF( PRESENT( ld_global ) ) THEN
1208       IF( ld_global ) ii = 2
1209    END IF
1210   
1211    IF ( ld_tic ) THEN
1212       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1213       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1214    ELSE
1215       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1216       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1217    ENDIF
1218#endif
1219   
1220   END SUBROUTINE tic_tac
1221
1222#if ! defined key_mpp_mpi
1223   SUBROUTINE mpi_wait(request, status, ierror)
1224      INTEGER                            , INTENT(in   ) ::   request
1225      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1226      INTEGER                            , INTENT(  out) ::   ierror
1227   END SUBROUTINE mpi_wait
1228#endif
1229
1230   !!----------------------------------------------------------------------
1231   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1232   !!----------------------------------------------------------------------
1233
1234   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1235      &                 cd6, cd7, cd8, cd9, cd10 )
1236      !!----------------------------------------------------------------------
1237      !!                  ***  ROUTINE  stop_opa  ***
1238      !!
1239      !! ** Purpose :   print in ocean.outpput file a error message and
1240      !!                increment the error number (nstop) by one.
1241      !!----------------------------------------------------------------------
1242      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1243      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1244      !!----------------------------------------------------------------------
1245      !
1246      nstop = nstop + 1
1247
1248      ! force to open ocean.output file
1249      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1250       
1251      WRITE(numout,cform_err)
1252      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1253      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1254      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1255      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1256      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1257      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1258      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1259      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1260      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1261      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1262
1263                               CALL FLUSH(numout    )
1264      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1265      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1266      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1267      !
1268      IF( cd1 == 'STOP' ) THEN
1269         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1270         CALL mppstop(ld_force_abort = .true.)
1271      ENDIF
1272      !
1273   END SUBROUTINE ctl_stop
1274
1275
1276   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1277      &                 cd6, cd7, cd8, cd9, cd10 )
1278      !!----------------------------------------------------------------------
1279      !!                  ***  ROUTINE  stop_warn  ***
1280      !!
1281      !! ** Purpose :   print in ocean.outpput file a error message and
1282      !!                increment the warning number (nwarn) by one.
1283      !!----------------------------------------------------------------------
1284      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1285      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1286      !!----------------------------------------------------------------------
1287      !
1288      nwarn = nwarn + 1
1289      IF(lwp) THEN
1290         WRITE(numout,cform_war)
1291         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1292         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1293         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1294         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1295         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1296         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1297         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1298         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1299         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1300         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1301      ENDIF
1302      CALL FLUSH(numout)
1303      !
1304   END SUBROUTINE ctl_warn
1305
1306
1307   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1308      !!----------------------------------------------------------------------
1309      !!                  ***  ROUTINE ctl_opn  ***
1310      !!
1311      !! ** Purpose :   Open file and check if required file is available.
1312      !!
1313      !! ** Method  :   Fortan open
1314      !!----------------------------------------------------------------------
1315      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1316      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1317      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1318      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1319      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1320      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1321      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1322      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1323      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1324      !
1325      CHARACTER(len=80) ::   clfile
1326      INTEGER           ::   iost
1327      !!----------------------------------------------------------------------
1328      !
1329      ! adapt filename
1330      ! ----------------
1331      clfile = TRIM(cdfile)
1332      IF( PRESENT( karea ) ) THEN
1333         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1334      ENDIF
1335#if defined key_agrif
1336      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1337      knum=Agrif_Get_Unit()
1338#else
1339      knum=get_unit()
1340#endif
1341      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1342      !
1343      iost=0
1344      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1345         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1346      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1347         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1348      ELSE
1349         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1350      ENDIF
1351      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1352         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1353      IF( iost == 0 ) THEN
1354         IF(ldwp) THEN
1355            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1356            WRITE(kout,*) '     unit   = ', knum
1357            WRITE(kout,*) '     status = ', cdstat
1358            WRITE(kout,*) '     form   = ', cdform
1359            WRITE(kout,*) '     access = ', cdacce
1360            WRITE(kout,*)
1361         ENDIF
1362      ENDIF
1363100   CONTINUE
1364      IF( iost /= 0 ) THEN
1365         IF(ldwp) THEN
1366            WRITE(kout,*)
1367            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1368            WRITE(kout,*) ' =======   ===  '
1369            WRITE(kout,*) '           unit   = ', knum
1370            WRITE(kout,*) '           status = ', cdstat
1371            WRITE(kout,*) '           form   = ', cdform
1372            WRITE(kout,*) '           access = ', cdacce
1373            WRITE(kout,*) '           iostat = ', iost
1374            WRITE(kout,*) '           we stop. verify the file '
1375            WRITE(kout,*)
1376         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1377            WRITE(*,*)
1378            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1379            WRITE(*,*) ' =======   ===  '
1380            WRITE(*,*) '           unit   = ', knum
1381            WRITE(*,*) '           status = ', cdstat
1382            WRITE(*,*) '           form   = ', cdform
1383            WRITE(*,*) '           access = ', cdacce
1384            WRITE(*,*) '           iostat = ', iost
1385            WRITE(*,*) '           we stop. verify the file '
1386            WRITE(*,*)
1387         ENDIF
1388         CALL FLUSH( kout ) 
1389         STOP 'ctl_opn bad opening'
1390      ENDIF
1391      !
1392   END SUBROUTINE ctl_opn
1393
1394
1395   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1396      !!----------------------------------------------------------------------
1397      !!                  ***  ROUTINE ctl_nam  ***
1398      !!
1399      !! ** Purpose :   Informations when error while reading a namelist
1400      !!
1401      !! ** Method  :   Fortan open
1402      !!----------------------------------------------------------------------
1403      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1404      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1405      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1406      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1407      !!----------------------------------------------------------------------
1408      !
1409      WRITE (clios, '(I5.0)')   kios
1410      IF( kios < 0 ) THEN         
1411         CALL ctl_warn( 'end of record or file while reading namelist '   &
1412            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1413      ENDIF
1414      !
1415      IF( kios > 0 ) THEN
1416         CALL ctl_stop( 'misspelled variable in namelist '   &
1417            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1418      ENDIF
1419      kios = 0
1420      RETURN
1421      !
1422   END SUBROUTINE ctl_nam
1423
1424
1425   INTEGER FUNCTION get_unit()
1426      !!----------------------------------------------------------------------
1427      !!                  ***  FUNCTION  get_unit  ***
1428      !!
1429      !! ** Purpose :   return the index of an unused logical unit
1430      !!----------------------------------------------------------------------
1431      LOGICAL :: llopn
1432      !!----------------------------------------------------------------------
1433      !
1434      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1435      llopn = .TRUE.
1436      DO WHILE( (get_unit < 998) .AND. llopn )
1437         get_unit = get_unit + 1
1438         INQUIRE( unit = get_unit, opened = llopn )
1439      END DO
1440      IF( (get_unit == 999) .AND. llopn ) THEN
1441         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
1442         get_unit = -1
1443      ENDIF
1444      !
1445   END FUNCTION get_unit
1446
1447   !!----------------------------------------------------------------------
1448END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.