source: NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC/lib_mpp.F90 @ 10417

Last change on this file since 10417 was 10417, checked in by smasson, 21 months ago

dev_r10164_HPC09_ESIWACE_PREP_MERGE: suppress the use of nstop out of ctl_stop, see #2133

  • Property svn:keywords set to Id
File size: 88.2 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm  calling routines
156   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
157   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
158   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
165   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
166   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
167   !: name (used as id) of allreduce-delayed operations
168   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb' /)
169   !: component name where the allreduce-delayed operation is performed
170   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
171   TYPE, PUBLIC ::   DELAYARR
172      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
173      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
174   END TYPE DELAYARR
175   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
176   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
177
178   ! timing summary report
179   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
180   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
181   
182   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
183
184   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
185   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
186
187   !!----------------------------------------------------------------------
188   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
189   !! $Id$
190   !! Software governed by the CeCILL license (see ./LICENSE)
191   !!----------------------------------------------------------------------
192CONTAINS
193
194   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
195      !!----------------------------------------------------------------------
196      !!                  ***  routine mynode  ***
197      !!
198      !! ** Purpose :   Find processor unit
199      !!----------------------------------------------------------------------
200      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
201      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
202      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
203      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
204      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
205      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
206      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
207      !
208      INTEGER ::   mynode, ierr, code, ji, ii, ios
209      LOGICAL ::   mpi_was_called
210      !
211      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
212      !!----------------------------------------------------------------------
213      !
214      ii = 1
215      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
216      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
217      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
218      !
219      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
220      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
221901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
222      !
223      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
224      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
225902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
226      !
227      !                              ! control print
228      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
229      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
230      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
231      !
232      IF( jpni < 1 .OR. jpnj < 1  ) THEN
233         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
234      ELSE
235         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
236         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
237      ENDIF
238
239      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
240
241      CALL mpi_initialized ( mpi_was_called, code )
242      IF( code /= MPI_SUCCESS ) THEN
243         DO ji = 1, SIZE(ldtxt)
244            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
245         END DO
246         WRITE(*, cform_err)
247         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
248         CALL mpi_abort( mpi_comm_world, code, ierr )
249      ENDIF
250
251      IF( mpi_was_called ) THEN
252         !
253         SELECT CASE ( cn_mpi_send )
254         CASE ( 'S' )                ! Standard mpi send (blocking)
255            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
256         CASE ( 'B' )                ! Buffer mpi send (blocking)
257            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
258            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
259         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
260            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
261            l_isend = .TRUE.
262         CASE DEFAULT
263            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
264            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
265            kstop = kstop + 1
266         END SELECT
267         !
268      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
269         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
270         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
271         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
272         kstop = kstop + 1
273      ELSE
274         SELECT CASE ( cn_mpi_send )
275         CASE ( 'S' )                ! Standard mpi send (blocking)
276            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
277            CALL mpi_init( ierr )
278         CASE ( 'B' )                ! Buffer mpi send (blocking)
279            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
280            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
281         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
282            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
283            l_isend = .TRUE.
284            CALL mpi_init( ierr )
285         CASE DEFAULT
286            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
287            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
288            kstop = kstop + 1
289         END SELECT
290         !
291      ENDIF
292
293      IF( PRESENT(localComm) ) THEN
294         IF( Agrif_Root() ) THEN
295            mpi_comm_oce = localComm
296         ENDIF
297      ELSE
298         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
299         IF( code /= MPI_SUCCESS ) THEN
300            DO ji = 1, SIZE(ldtxt)
301               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
302            END DO
303            WRITE(*, cform_err)
304            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
305            CALL mpi_abort( mpi_comm_world, code, ierr )
306         ENDIF
307      ENDIF
308
309#if defined key_agrif
310      IF( Agrif_Root() ) THEN
311         CALL Agrif_MPI_Init(mpi_comm_oce)
312      ELSE
313         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
314      ENDIF
315#endif
316
317      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
318      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
319      mynode = mpprank
320
321      IF( mynode == 0 ) THEN
322         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
323         WRITE(kumond, nammpp)     
324      ENDIF
325      !
326      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
327      !
328   END FUNCTION mynode
329
330   !!----------------------------------------------------------------------
331   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
332   !!
333   !!   * Argument : dummy argument use in mpp_lnk_... routines
334   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
335   !!                cd_nat :   nature of array grid-points
336   !!                psgn   :   sign used across the north fold boundary
337   !!                kfld   :   optional, number of pt3d arrays
338   !!                cd_mpp :   optional, fill the overlap area only
339   !!                pval   :   optional, background value (used at closed boundaries)
340   !!----------------------------------------------------------------------
341   !
342   !                       !==  2D array and array of 2D pointer  ==!
343   !
344#  define DIM_2d
345#     define ROUTINE_LNK           mpp_lnk_2d
346#     include "mpp_lnk_generic.h90"
347#     undef ROUTINE_LNK
348#     define MULTI
349#     define ROUTINE_LNK           mpp_lnk_2d_ptr
350#     include "mpp_lnk_generic.h90"
351#     undef ROUTINE_LNK
352#     undef MULTI
353#  undef DIM_2d
354   !
355   !                       !==  3D array and array of 3D pointer  ==!
356   !
357#  define DIM_3d
358#     define ROUTINE_LNK           mpp_lnk_3d
359#     include "mpp_lnk_generic.h90"
360#     undef ROUTINE_LNK
361#     define MULTI
362#     define ROUTINE_LNK           mpp_lnk_3d_ptr
363#     include "mpp_lnk_generic.h90"
364#     undef ROUTINE_LNK
365#     undef MULTI
366#  undef DIM_3d
367   !
368   !                       !==  4D array and array of 4D pointer  ==!
369   !
370#  define DIM_4d
371#     define ROUTINE_LNK           mpp_lnk_4d
372#     include "mpp_lnk_generic.h90"
373#     undef ROUTINE_LNK
374#     define MULTI
375#     define ROUTINE_LNK           mpp_lnk_4d_ptr
376#     include "mpp_lnk_generic.h90"
377#     undef ROUTINE_LNK
378#     undef MULTI
379#  undef DIM_4d
380
381   !!----------------------------------------------------------------------
382   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
383   !!
384   !!   * Argument : dummy argument use in mpp_nfd_... routines
385   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
386   !!                cd_nat :   nature of array grid-points
387   !!                psgn   :   sign used across the north fold boundary
388   !!                kfld   :   optional, number of pt3d arrays
389   !!                cd_mpp :   optional, fill the overlap area only
390   !!                pval   :   optional, background value (used at closed boundaries)
391   !!----------------------------------------------------------------------
392   !
393   !                       !==  2D array and array of 2D pointer  ==!
394   !
395#  define DIM_2d
396#     define ROUTINE_NFD           mpp_nfd_2d
397#     include "mpp_nfd_generic.h90"
398#     undef ROUTINE_NFD
399#     define MULTI
400#     define ROUTINE_NFD           mpp_nfd_2d_ptr
401#     include "mpp_nfd_generic.h90"
402#     undef ROUTINE_NFD
403#     undef MULTI
404#  undef DIM_2d
405   !
406   !                       !==  3D array and array of 3D pointer  ==!
407   !
408#  define DIM_3d
409#     define ROUTINE_NFD           mpp_nfd_3d
410#     include "mpp_nfd_generic.h90"
411#     undef ROUTINE_NFD
412#     define MULTI
413#     define ROUTINE_NFD           mpp_nfd_3d_ptr
414#     include "mpp_nfd_generic.h90"
415#     undef ROUTINE_NFD
416#     undef MULTI
417#  undef DIM_3d
418   !
419   !                       !==  4D array and array of 4D pointer  ==!
420   !
421#  define DIM_4d
422#     define ROUTINE_NFD           mpp_nfd_4d
423#     include "mpp_nfd_generic.h90"
424#     undef ROUTINE_NFD
425#     define MULTI
426#     define ROUTINE_NFD           mpp_nfd_4d_ptr
427#     include "mpp_nfd_generic.h90"
428#     undef ROUTINE_NFD
429#     undef MULTI
430#  undef DIM_4d
431
432
433   !!----------------------------------------------------------------------
434   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
435   !!
436   !!   * Argument : dummy argument use in mpp_lnk_... routines
437   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
438   !!                cd_nat :   nature of array grid-points
439   !!                psgn   :   sign used across the north fold boundary
440   !!                kb_bdy :   BDY boundary set
441   !!                kfld   :   optional, number of pt3d arrays
442   !!----------------------------------------------------------------------
443   !
444   !                       !==  2D array and array of 2D pointer  ==!
445   !
446#  define DIM_2d
447#     define ROUTINE_BDY           mpp_lnk_bdy_2d
448#     include "mpp_bdy_generic.h90"
449#     undef ROUTINE_BDY
450#  undef DIM_2d
451   !
452   !                       !==  3D array and array of 3D pointer  ==!
453   !
454#  define DIM_3d
455#     define ROUTINE_BDY           mpp_lnk_bdy_3d
456#     include "mpp_bdy_generic.h90"
457#     undef ROUTINE_BDY
458#  undef DIM_3d
459   !
460   !                       !==  4D array and array of 4D pointer  ==!
461   !
462#  define DIM_4d
463#     define ROUTINE_BDY           mpp_lnk_bdy_4d
464#     include "mpp_bdy_generic.h90"
465#     undef ROUTINE_BDY
466#  undef DIM_4d
467
468   !!----------------------------------------------------------------------
469   !!
470   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
471   
472   
473   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
474   
475   
476   !!----------------------------------------------------------------------
477
478
479
480   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
481      !!----------------------------------------------------------------------
482      !!                  ***  routine mppsend  ***
483      !!
484      !! ** Purpose :   Send messag passing array
485      !!
486      !!----------------------------------------------------------------------
487      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
488      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
489      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
490      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
491      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
492      !!
493      INTEGER ::   iflag
494      !!----------------------------------------------------------------------
495      !
496      SELECT CASE ( cn_mpi_send )
497      CASE ( 'S' )                ! Standard mpi send (blocking)
498         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
499      CASE ( 'B' )                ! Buffer mpi send (blocking)
500         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
501      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
502         ! be carefull, one more argument here : the mpi request identifier..
503         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
504      END SELECT
505      !
506   END SUBROUTINE mppsend
507
508
509   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
510      !!----------------------------------------------------------------------
511      !!                  ***  routine mpprecv  ***
512      !!
513      !! ** Purpose :   Receive messag passing array
514      !!
515      !!----------------------------------------------------------------------
516      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
517      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
518      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
519      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
520      !!
521      INTEGER :: istatus(mpi_status_size)
522      INTEGER :: iflag
523      INTEGER :: use_source
524      !!----------------------------------------------------------------------
525      !
526      ! If a specific process number has been passed to the receive call,
527      ! use that one. Default is to use mpi_any_source
528      use_source = mpi_any_source
529      IF( PRESENT(ksource) )   use_source = ksource
530      !
531      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
532      !
533   END SUBROUTINE mpprecv
534
535
536   SUBROUTINE mppgather( ptab, kp, pio )
537      !!----------------------------------------------------------------------
538      !!                   ***  routine mppgather  ***
539      !!
540      !! ** Purpose :   Transfert between a local subdomain array and a work
541      !!     array which is distributed following the vertical level.
542      !!
543      !!----------------------------------------------------------------------
544      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
545      INTEGER                           , INTENT(in   ) ::   kp     ! record length
546      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
547      !!
548      INTEGER :: itaille, ierror   ! temporary integer
549      !!---------------------------------------------------------------------
550      !
551      itaille = jpi * jpj
552      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
553         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
554      !
555   END SUBROUTINE mppgather
556
557
558   SUBROUTINE mppscatter( pio, kp, ptab )
559      !!----------------------------------------------------------------------
560      !!                  ***  routine mppscatter  ***
561      !!
562      !! ** Purpose :   Transfert between awork array which is distributed
563      !!      following the vertical level and the local subdomain array.
564      !!
565      !!----------------------------------------------------------------------
566      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
567      INTEGER                             ::   kp     ! Tag (not used with MPI
568      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
569      !!
570      INTEGER :: itaille, ierror   ! temporary integer
571      !!---------------------------------------------------------------------
572      !
573      itaille = jpi * jpj
574      !
575      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
576         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
577      !
578   END SUBROUTINE mppscatter
579
580   
581   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
582     !!----------------------------------------------------------------------
583      !!                   ***  routine mpp_delay_sum  ***
584      !!
585      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
586      !!
587      !!----------------------------------------------------------------------
588      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
589      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
590      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
591      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
592      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
593      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
594      !!
595      INTEGER ::   ji, isz
596      INTEGER ::   idvar
597      INTEGER ::   ierr, ilocalcomm
598      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
599      !!----------------------------------------------------------------------
600      ilocalcomm = mpi_comm_oce
601      IF( PRESENT(kcom) )   ilocalcomm = kcom
602
603      isz = SIZE(y_in)
604     
605      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_glb = .TRUE. )
606
607      idvar = -1
608      DO ji = 1, nbdelay
609         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
610      END DO
611      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
612
613      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
614         !                                       --------------------------
615         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
616            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
617            DEALLOCATE(todelay(idvar)%z1d)
618            ndelayid(idvar) = -1                                      ! do as if we had no restart
619         ELSE
620            ALLOCATE(todelay(idvar)%y1d(isz))
621            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
622         END IF
623      ENDIF
624     
625      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
626         !                                       --------------------------
627         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
628         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
629         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
630      ENDIF
631
632      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
633
634      ! send back pout from todelay(idvar)%z1d defined at previous call
635      pout(:) = todelay(idvar)%z1d(:)
636
637      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
638      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
639
640   END SUBROUTINE mpp_delay_sum
641
642   
643   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
644      !!----------------------------------------------------------------------
645      !!                   ***  routine mpp_delay_max  ***
646      !!
647      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
648      !!
649      !!----------------------------------------------------------------------
650      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
651      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
652      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
653      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
654      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
655      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
656      !!
657      INTEGER ::   ji, isz
658      INTEGER ::   idvar
659      INTEGER ::   ierr, ilocalcomm
660      !!----------------------------------------------------------------------
661      ilocalcomm = mpi_comm_oce
662      IF( PRESENT(kcom) )   ilocalcomm = kcom
663
664      isz = SIZE(p_in)
665
666      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_glb = .TRUE. )
667
668      idvar = -1
669      DO ji = 1, nbdelay
670         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
671      END DO
672      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
673
674      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
675         !                                       --------------------------
676         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
677            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
678            DEALLOCATE(todelay(idvar)%z1d)
679            ndelayid(idvar) = -1                                      ! do as if we had no restart
680         END IF
681      ENDIF
682
683      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
684         !                                       --------------------------
685         ALLOCATE(todelay(idvar)%z1d(isz))
686         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
687      ENDIF
688
689      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
690
691      ! send back pout from todelay(idvar)%z1d defined at previous call
692      pout(:) = todelay(idvar)%z1d(:)
693
694      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
695      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
696
697   END SUBROUTINE mpp_delay_max
698
699   
700   SUBROUTINE mpp_delay_rcv( kid )
701      !!----------------------------------------------------------------------
702      !!                   ***  routine mpp_delay_rcv  ***
703      !!
704      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
705      !!
706      !!----------------------------------------------------------------------
707      INTEGER,INTENT(in   )      ::  kid 
708      INTEGER ::   ierr
709      !!----------------------------------------------------------------------
710      IF( ndelayid(kid) /= -2 ) THEN 
711         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
712         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
713         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
714         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
715         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
716      ENDIF
717   END SUBROUTINE mpp_delay_rcv
718
719   
720   !!----------------------------------------------------------------------
721   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
722   !!   
723   !!----------------------------------------------------------------------
724   !!
725#  define OPERATION_MAX
726#  define INTEGER_TYPE
727#  define DIM_0d
728#     define ROUTINE_ALLREDUCE           mppmax_int
729#     include "mpp_allreduce_generic.h90"
730#     undef ROUTINE_ALLREDUCE
731#  undef DIM_0d
732#  define DIM_1d
733#     define ROUTINE_ALLREDUCE           mppmax_a_int
734#     include "mpp_allreduce_generic.h90"
735#     undef ROUTINE_ALLREDUCE
736#  undef DIM_1d
737#  undef INTEGER_TYPE
738!
739#  define REAL_TYPE
740#  define DIM_0d
741#     define ROUTINE_ALLREDUCE           mppmax_real
742#     include "mpp_allreduce_generic.h90"
743#     undef ROUTINE_ALLREDUCE
744#  undef DIM_0d
745#  define DIM_1d
746#     define ROUTINE_ALLREDUCE           mppmax_a_real
747#     include "mpp_allreduce_generic.h90"
748#     undef ROUTINE_ALLREDUCE
749#  undef DIM_1d
750#  undef REAL_TYPE
751#  undef OPERATION_MAX
752   !!----------------------------------------------------------------------
753   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
754   !!   
755   !!----------------------------------------------------------------------
756   !!
757#  define OPERATION_MIN
758#  define INTEGER_TYPE
759#  define DIM_0d
760#     define ROUTINE_ALLREDUCE           mppmin_int
761#     include "mpp_allreduce_generic.h90"
762#     undef ROUTINE_ALLREDUCE
763#  undef DIM_0d
764#  define DIM_1d
765#     define ROUTINE_ALLREDUCE           mppmin_a_int
766#     include "mpp_allreduce_generic.h90"
767#     undef ROUTINE_ALLREDUCE
768#  undef DIM_1d
769#  undef INTEGER_TYPE
770!
771#  define REAL_TYPE
772#  define DIM_0d
773#     define ROUTINE_ALLREDUCE           mppmin_real
774#     include "mpp_allreduce_generic.h90"
775#     undef ROUTINE_ALLREDUCE
776#  undef DIM_0d
777#  define DIM_1d
778#     define ROUTINE_ALLREDUCE           mppmin_a_real
779#     include "mpp_allreduce_generic.h90"
780#     undef ROUTINE_ALLREDUCE
781#  undef DIM_1d
782#  undef REAL_TYPE
783#  undef OPERATION_MIN
784
785   !!----------------------------------------------------------------------
786   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
787   !!   
788   !!   Global sum of 1D array or a variable (integer, real or complex)
789   !!----------------------------------------------------------------------
790   !!
791#  define OPERATION_SUM
792#  define INTEGER_TYPE
793#  define DIM_0d
794#     define ROUTINE_ALLREDUCE           mppsum_int
795#     include "mpp_allreduce_generic.h90"
796#     undef ROUTINE_ALLREDUCE
797#  undef DIM_0d
798#  define DIM_1d
799#     define ROUTINE_ALLREDUCE           mppsum_a_int
800#     include "mpp_allreduce_generic.h90"
801#     undef ROUTINE_ALLREDUCE
802#  undef DIM_1d
803#  undef INTEGER_TYPE
804!
805#  define REAL_TYPE
806#  define DIM_0d
807#     define ROUTINE_ALLREDUCE           mppsum_real
808#     include "mpp_allreduce_generic.h90"
809#     undef ROUTINE_ALLREDUCE
810#  undef DIM_0d
811#  define DIM_1d
812#     define ROUTINE_ALLREDUCE           mppsum_a_real
813#     include "mpp_allreduce_generic.h90"
814#     undef ROUTINE_ALLREDUCE
815#  undef DIM_1d
816#  undef REAL_TYPE
817#  undef OPERATION_SUM
818
819#  define OPERATION_SUM_DD
820#  define COMPLEX_TYPE
821#  define DIM_0d
822#     define ROUTINE_ALLREDUCE           mppsum_realdd
823#     include "mpp_allreduce_generic.h90"
824#     undef ROUTINE_ALLREDUCE
825#  undef DIM_0d
826#  define DIM_1d
827#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
828#     include "mpp_allreduce_generic.h90"
829#     undef ROUTINE_ALLREDUCE
830#  undef DIM_1d
831#  undef COMPLEX_TYPE
832#  undef OPERATION_SUM_DD
833
834   !!----------------------------------------------------------------------
835   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
836   !!   
837   !!----------------------------------------------------------------------
838   !!
839#  define OPERATION_MINLOC
840#  define DIM_2d
841#     define ROUTINE_LOC           mpp_minloc2d
842#     include "mpp_loc_generic.h90"
843#     undef ROUTINE_LOC
844#  undef DIM_2d
845#  define DIM_3d
846#     define ROUTINE_LOC           mpp_minloc3d
847#     include "mpp_loc_generic.h90"
848#     undef ROUTINE_LOC
849#  undef DIM_3d
850#  undef OPERATION_MINLOC
851
852#  define OPERATION_MAXLOC
853#  define DIM_2d
854#     define ROUTINE_LOC           mpp_maxloc2d
855#     include "mpp_loc_generic.h90"
856#     undef ROUTINE_LOC
857#  undef DIM_2d
858#  define DIM_3d
859#     define ROUTINE_LOC           mpp_maxloc3d
860#     include "mpp_loc_generic.h90"
861#     undef ROUTINE_LOC
862#  undef DIM_3d
863#  undef OPERATION_MAXLOC
864
865   SUBROUTINE mppsync()
866      !!----------------------------------------------------------------------
867      !!                  ***  routine mppsync  ***
868      !!
869      !! ** Purpose :   Massively parallel processors, synchroneous
870      !!
871      !!-----------------------------------------------------------------------
872      INTEGER :: ierror
873      !!-----------------------------------------------------------------------
874      !
875      CALL mpi_barrier( mpi_comm_oce, ierror )
876      !
877   END SUBROUTINE mppsync
878
879
880   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
881      !!----------------------------------------------------------------------
882      !!                  ***  routine mppstop  ***
883      !!
884      !! ** purpose :   Stop massively parallel processors method
885      !!
886      !!----------------------------------------------------------------------
887      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
888      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
889      LOGICAL ::   llfinal, ll_force_abort
890      INTEGER ::   info
891      !!----------------------------------------------------------------------
892      llfinal = .FALSE.
893      IF( PRESENT(ldfinal) ) llfinal = ldfinal
894      ll_force_abort = .FALSE.
895      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
896      !
897      IF(ll_force_abort) THEN
898         CALL mpi_abort( MPI_COMM_WORLD )
899      ELSE
900         CALL mppsync
901         CALL mpi_finalize( info )
902      ENDIF
903      IF( .NOT. llfinal ) STOP 123456
904      !
905   END SUBROUTINE mppstop
906
907
908   SUBROUTINE mpp_comm_free( kcom )
909      !!----------------------------------------------------------------------
910      INTEGER, INTENT(in) ::   kcom
911      !!
912      INTEGER :: ierr
913      !!----------------------------------------------------------------------
914      !
915      CALL MPI_COMM_FREE(kcom, ierr)
916      !
917   END SUBROUTINE mpp_comm_free
918
919
920   SUBROUTINE mpp_ini_znl( kumout )
921      !!----------------------------------------------------------------------
922      !!               ***  routine mpp_ini_znl  ***
923      !!
924      !! ** Purpose :   Initialize special communicator for computing zonal sum
925      !!
926      !! ** Method  : - Look for processors in the same row
927      !!              - Put their number in nrank_znl
928      !!              - Create group for the znl processors
929      !!              - Create a communicator for znl processors
930      !!              - Determine if processor should write znl files
931      !!
932      !! ** output
933      !!      ndim_rank_znl = number of processors on the same row
934      !!      ngrp_znl = group ID for the znl processors
935      !!      ncomm_znl = communicator for the ice procs.
936      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
937      !!
938      !!----------------------------------------------------------------------
939      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
940      !
941      INTEGER :: jproc      ! dummy loop integer
942      INTEGER :: ierr, ii   ! local integer
943      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
944      !!----------------------------------------------------------------------
945      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
946      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
947      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
948      !
949      ALLOCATE( kwork(jpnij), STAT=ierr )
950      IF( ierr /= 0 ) THEN
951         WRITE(kumout, cform_err)
952         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
953         CALL mppstop
954      ENDIF
955
956      IF( jpnj == 1 ) THEN
957         ngrp_znl  = ngrp_world
958         ncomm_znl = mpi_comm_oce
959      ELSE
960         !
961         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
962         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
963         !-$$        CALL flush(numout)
964         !
965         ! Count number of processors on the same row
966         ndim_rank_znl = 0
967         DO jproc=1,jpnij
968            IF ( kwork(jproc) == njmpp ) THEN
969               ndim_rank_znl = ndim_rank_znl + 1
970            ENDIF
971         END DO
972         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
973         !-$$        CALL flush(numout)
974         ! Allocate the right size to nrank_znl
975         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
976         ALLOCATE(nrank_znl(ndim_rank_znl))
977         ii = 0
978         nrank_znl (:) = 0
979         DO jproc=1,jpnij
980            IF ( kwork(jproc) == njmpp) THEN
981               ii = ii + 1
982               nrank_znl(ii) = jproc -1
983            ENDIF
984         END DO
985         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
986         !-$$        CALL flush(numout)
987
988         ! Create the opa group
989         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
990         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
991         !-$$        CALL flush(numout)
992
993         ! Create the znl group from the opa group
994         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
995         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
996         !-$$        CALL flush(numout)
997
998         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
999         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1000         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1001         !-$$        CALL flush(numout)
1002         !
1003      END IF
1004
1005      ! Determines if processor if the first (starting from i=1) on the row
1006      IF ( jpni == 1 ) THEN
1007         l_znl_root = .TRUE.
1008      ELSE
1009         l_znl_root = .FALSE.
1010         kwork (1) = nimpp
1011         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1012         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1013      END IF
1014
1015      DEALLOCATE(kwork)
1016
1017   END SUBROUTINE mpp_ini_znl
1018
1019
1020   SUBROUTINE mpp_ini_north
1021      !!----------------------------------------------------------------------
1022      !!               ***  routine mpp_ini_north  ***
1023      !!
1024      !! ** Purpose :   Initialize special communicator for north folding
1025      !!      condition together with global variables needed in the mpp folding
1026      !!
1027      !! ** Method  : - Look for northern processors
1028      !!              - Put their number in nrank_north
1029      !!              - Create groups for the world processors and the north processors
1030      !!              - Create a communicator for northern processors
1031      !!
1032      !! ** output
1033      !!      njmppmax = njmpp for northern procs
1034      !!      ndim_rank_north = number of processors in the northern line
1035      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1036      !!      ngrp_world = group ID for the world processors
1037      !!      ngrp_north = group ID for the northern processors
1038      !!      ncomm_north = communicator for the northern procs.
1039      !!      north_root = number (in the world) of proc 0 in the northern comm.
1040      !!
1041      !!----------------------------------------------------------------------
1042      INTEGER ::   ierr
1043      INTEGER ::   jjproc
1044      INTEGER ::   ii, ji
1045      !!----------------------------------------------------------------------
1046      !
1047      njmppmax = MAXVAL( njmppt )
1048      !
1049      ! Look for how many procs on the northern boundary
1050      ndim_rank_north = 0
1051      DO jjproc = 1, jpnij
1052         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1053      END DO
1054      !
1055      ! Allocate the right size to nrank_north
1056      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1057      ALLOCATE( nrank_north(ndim_rank_north) )
1058
1059      ! Fill the nrank_north array with proc. number of northern procs.
1060      ! Note : the rank start at 0 in MPI
1061      ii = 0
1062      DO ji = 1, jpnij
1063         IF ( njmppt(ji) == njmppmax   ) THEN
1064            ii=ii+1
1065            nrank_north(ii)=ji-1
1066         END IF
1067      END DO
1068      !
1069      ! create the world group
1070      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1071      !
1072      ! Create the North group from the world group
1073      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1074      !
1075      ! Create the North communicator , ie the pool of procs in the north group
1076      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1077      !
1078   END SUBROUTINE mpp_ini_north
1079
1080
1081   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1082      !!---------------------------------------------------------------------
1083      !!                   ***  routine mpp_init.opa  ***
1084      !!
1085      !! ** Purpose :: export and attach a MPI buffer for bsend
1086      !!
1087      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1088      !!            but classical mpi_init
1089      !!
1090      !! History :: 01/11 :: IDRIS initial version for IBM only
1091      !!            08/04 :: R. Benshila, generalisation
1092      !!---------------------------------------------------------------------
1093      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1094      INTEGER                      , INTENT(inout) ::   ksft
1095      INTEGER                      , INTENT(  out) ::   code
1096      INTEGER                                      ::   ierr, ji
1097      LOGICAL                                      ::   mpi_was_called
1098      !!---------------------------------------------------------------------
1099      !
1100      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1101      IF ( code /= MPI_SUCCESS ) THEN
1102         DO ji = 1, SIZE(ldtxt)
1103            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1104         END DO
1105         WRITE(*, cform_err)
1106         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1107         CALL mpi_abort( mpi_comm_world, code, ierr )
1108      ENDIF
1109      !
1110      IF( .NOT. mpi_was_called ) THEN
1111         CALL mpi_init( code )
1112         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1113         IF ( code /= MPI_SUCCESS ) THEN
1114            DO ji = 1, SIZE(ldtxt)
1115               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1116            END DO
1117            WRITE(*, cform_err)
1118            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1119            CALL mpi_abort( mpi_comm_world, code, ierr )
1120         ENDIF
1121      ENDIF
1122      !
1123      IF( nn_buffer > 0 ) THEN
1124         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1125         ! Buffer allocation and attachment
1126         ALLOCATE( tampon(nn_buffer), stat = ierr )
1127         IF( ierr /= 0 ) THEN
1128            DO ji = 1, SIZE(ldtxt)
1129               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1130            END DO
1131            WRITE(*, cform_err)
1132            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1133            CALL mpi_abort( mpi_comm_world, code, ierr )
1134         END IF
1135         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1136      ENDIF
1137      !
1138   END SUBROUTINE mpi_init_oce
1139
1140
1141   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1142      !!---------------------------------------------------------------------
1143      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1144      !!
1145      !!   Modification of original codes written by David H. Bailey
1146      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1147      !!---------------------------------------------------------------------
1148      INTEGER                     , INTENT(in)    ::   ilen, itype
1149      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1150      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1151      !
1152      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1153      INTEGER  :: ji, ztmp           ! local scalar
1154      !!---------------------------------------------------------------------
1155      !
1156      ztmp = itype   ! avoid compilation warning
1157      !
1158      DO ji=1,ilen
1159      ! Compute ydda + yddb using Knuth's trick.
1160         zt1  = real(ydda(ji)) + real(yddb(ji))
1161         zerr = zt1 - real(ydda(ji))
1162         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1163                + aimag(ydda(ji)) + aimag(yddb(ji))
1164
1165         ! The result is zt1 + zt2, after normalization.
1166         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1167      END DO
1168      !
1169   END SUBROUTINE DDPDD_MPI
1170
1171
1172   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1173      !!---------------------------------------------------------------------
1174      !!                   ***  routine mpp_lbc_north_icb  ***
1175      !!
1176      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1177      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1178      !!              array with outer extra halo
1179      !!
1180      !! ** Method  :   North fold condition and mpp with more than one proc
1181      !!              in i-direction require a specific treatment. We gather
1182      !!              the 4+kextj northern lines of the global domain on 1
1183      !!              processor and apply lbc north-fold on this sub array.
1184      !!              Then we scatter the north fold array back to the processors.
1185      !!              This routine accounts for an extra halo with icebergs
1186      !!              and assumes ghost rows and columns have been suppressed.
1187      !!
1188      !!----------------------------------------------------------------------
1189      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1190      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1191      !                                                     !   = T ,  U , V , F or W -points
1192      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1193      !!                                                    ! north fold, =  1. otherwise
1194      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1195      !
1196      INTEGER ::   ji, jj, jr
1197      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1198      INTEGER ::   ipj, ij, iproc
1199      !
1200      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1201      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1202      !!----------------------------------------------------------------------
1203      !
1204      ipj=4
1205      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1206     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1207     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1208      !
1209      ztab_e(:,:)      = 0._wp
1210      znorthloc_e(:,:) = 0._wp
1211      !
1212      ij = 1 - kextj
1213      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1214      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1215         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1216         ij = ij + 1
1217      END DO
1218      !
1219      itaille = jpimax * ( ipj + 2*kextj )
1220      !
1221      IF( ln_timing ) CALL tic_tac(.TRUE.)
1222      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1223         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1224         &                ncomm_north, ierr )
1225      !
1226      IF( ln_timing ) CALL tic_tac(.FALSE.)
1227      !
1228      DO jr = 1, ndim_rank_north            ! recover the global north array
1229         iproc = nrank_north(jr) + 1
1230         ildi = nldit (iproc)
1231         ilei = nleit (iproc)
1232         iilb = nimppt(iproc)
1233         DO jj = 1-kextj, ipj+kextj
1234            DO ji = ildi, ilei
1235               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1236            END DO
1237         END DO
1238      END DO
1239
1240      ! 2. North-Fold boundary conditions
1241      ! ----------------------------------
1242      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1243
1244      ij = 1 - kextj
1245      !! Scatter back to pt2d
1246      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1247         DO ji= 1, jpi
1248            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1249         END DO
1250         ij  = ij +1
1251      END DO
1252      !
1253      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1254      !
1255   END SUBROUTINE mpp_lbc_north_icb
1256
1257
1258   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1259      !!----------------------------------------------------------------------
1260      !!                  ***  routine mpp_lnk_2d_icb  ***
1261      !!
1262      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1263      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1264      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1265      !!
1266      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1267      !!      between processors following neighboring subdomains.
1268      !!            domain parameters
1269      !!                    jpi    : first dimension of the local subdomain
1270      !!                    jpj    : second dimension of the local subdomain
1271      !!                    kexti  : number of columns for extra outer halo
1272      !!                    kextj  : number of rows for extra outer halo
1273      !!                    nbondi : mark for "east-west local boundary"
1274      !!                    nbondj : mark for "north-south local boundary"
1275      !!                    noea   : number for local neighboring processors
1276      !!                    nowe   : number for local neighboring processors
1277      !!                    noso   : number for local neighboring processors
1278      !!                    nono   : number for local neighboring processors
1279      !!----------------------------------------------------------------------
1280      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1281      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1282      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1283      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1284      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1285      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1286      !
1287      INTEGER  ::   jl   ! dummy loop indices
1288      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1289      INTEGER  ::   ipreci, iprecj             !   -       -
1290      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1291      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1292      !!
1293      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1294      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1295      !!----------------------------------------------------------------------
1296
1297      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1298      iprecj = nn_hls + kextj
1299
1300      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1301
1302      ! 1. standard boundary treatment
1303      ! ------------------------------
1304      ! Order matters Here !!!!
1305      !
1306      !                                      ! East-West boundaries
1307      !                                           !* Cyclic east-west
1308      IF( l_Iperio ) THEN
1309         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1310         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1311         !
1312      ELSE                                        !* closed
1313         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1314                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1315      ENDIF
1316      !                                      ! North-South boundaries
1317      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1318         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1319         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1320      ELSE                                        !* closed
1321         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1322                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1323      ENDIF
1324      !
1325
1326      ! north fold treatment
1327      ! -----------------------
1328      IF( npolj /= 0 ) THEN
1329         !
1330         SELECT CASE ( jpni )
1331                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1332                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1333         END SELECT
1334         !
1335      ENDIF
1336
1337      ! 2. East and west directions exchange
1338      ! ------------------------------------
1339      ! we play with the neigbours AND the row number because of the periodicity
1340      !
1341      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1342      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1343         iihom = jpi-nreci-kexti
1344         DO jl = 1, ipreci
1345            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1346            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1347         END DO
1348      END SELECT
1349      !
1350      !                           ! Migrations
1351      imigr = ipreci * ( jpj + 2*kextj )
1352      !
1353      IF( ln_timing ) CALL tic_tac(.TRUE.)
1354      !
1355      SELECT CASE ( nbondi )
1356      CASE ( -1 )
1357         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1358         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1359         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1360      CASE ( 0 )
1361         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1362         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1363         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1364         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1365         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1366         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1367      CASE ( 1 )
1368         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1369         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1370         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1371      END SELECT
1372      !
1373      IF( ln_timing ) CALL tic_tac(.FALSE.)
1374      !
1375      !                           ! Write Dirichlet lateral conditions
1376      iihom = jpi - nn_hls
1377      !
1378      SELECT CASE ( nbondi )
1379      CASE ( -1 )
1380         DO jl = 1, ipreci
1381            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1382         END DO
1383      CASE ( 0 )
1384         DO jl = 1, ipreci
1385            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1386            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1387         END DO
1388      CASE ( 1 )
1389         DO jl = 1, ipreci
1390            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1391         END DO
1392      END SELECT
1393
1394
1395      ! 3. North and south directions
1396      ! -----------------------------
1397      ! always closed : we play only with the neigbours
1398      !
1399      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1400         ijhom = jpj-nrecj-kextj
1401         DO jl = 1, iprecj
1402            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1403            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1404         END DO
1405      ENDIF
1406      !
1407      !                           ! Migrations
1408      imigr = iprecj * ( jpi + 2*kexti )
1409      !
1410      IF( ln_timing ) CALL tic_tac(.TRUE.)
1411      !
1412      SELECT CASE ( nbondj )
1413      CASE ( -1 )
1414         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1415         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1416         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1417      CASE ( 0 )
1418         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1419         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1420         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1421         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1422         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1423         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1424      CASE ( 1 )
1425         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1426         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1427         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1428      END SELECT
1429      !
1430      IF( ln_timing ) CALL tic_tac(.FALSE.)
1431      !
1432      !                           ! Write Dirichlet lateral conditions
1433      ijhom = jpj - nn_hls
1434      !
1435      SELECT CASE ( nbondj )
1436      CASE ( -1 )
1437         DO jl = 1, iprecj
1438            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1439         END DO
1440      CASE ( 0 )
1441         DO jl = 1, iprecj
1442            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1443            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1444         END DO
1445      CASE ( 1 )
1446         DO jl = 1, iprecj
1447            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1448         END DO
1449      END SELECT
1450      !
1451   END SUBROUTINE mpp_lnk_2d_icb
1452
1453
1454   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb )
1455      !!----------------------------------------------------------------------
1456      !!                  ***  routine mpp_report  ***
1457      !!
1458      !! ** Purpose :   report use of mpp routines per time-setp
1459      !!
1460      !!----------------------------------------------------------------------
1461      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1462      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1463      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb
1464      !!
1465      LOGICAL ::   ll_lbc, ll_glb
1466      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1467      !!----------------------------------------------------------------------
1468      !
1469      ll_lbc = .FALSE.
1470      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1471      ll_glb = .FALSE.
1472      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1473      !
1474      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1475      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1476      ncom_freq = ncom_fsbc
1477      !
1478      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1479         IF( ll_lbc ) THEN
1480            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1481            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1482            n_sequence_lbc = n_sequence_lbc + 1
1483            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1484            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1485            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1486            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1487         ENDIF
1488         IF( ll_glb ) THEN
1489            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1490            n_sequence_glb = n_sequence_glb + 1
1491            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1492            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1493         ENDIF
1494      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1495         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1496         WRITE(numcom,*) ' '
1497         WRITE(numcom,*) ' ------------------------------------------------------------'
1498         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1499         WRITE(numcom,*) ' ------------------------------------------------------------'
1500         WRITE(numcom,*) ' '
1501         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1502         jj = 0; jk = 0; jf = 0; jh = 0
1503         DO ji = 1, n_sequence_lbc
1504            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1505            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1506            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1507            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1508         END DO
1509         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1510         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1511         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1512         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1513         WRITE(numcom,*) ' '
1514         WRITE(numcom,*) ' lbc_lnk called'
1515         jj = 1
1516         DO ji = 2, n_sequence_lbc
1517            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1518               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1519               jj = 0
1520            END IF
1521            jj = jj + 1 
1522         END DO
1523         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1524         WRITE(numcom,*) ' '
1525         IF ( n_sequence_glb > 0 ) THEN
1526            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1527            jj = 1
1528            DO ji = 2, n_sequence_glb
1529               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1530                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1531                  jj = 0
1532               END IF
1533               jj = jj + 1 
1534            END DO
1535            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1536            DEALLOCATE(crname_glb)
1537         ELSE
1538            WRITE(numcom,*) ' No MPI global communication '
1539         ENDIF
1540         WRITE(numcom,*) ' '
1541         WRITE(numcom,*) ' -----------------------------------------------'
1542         WRITE(numcom,*) ' '
1543         DEALLOCATE(ncomm_sequence)
1544         DEALLOCATE(crname_lbc)
1545      ENDIF
1546   END SUBROUTINE mpp_report
1547
1548   
1549   SUBROUTINE tic_tac (ld_tic, ld_global)
1550
1551    LOGICAL,           INTENT(IN) :: ld_tic
1552    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1553    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1554    REAL(wp),               SAVE :: tic_ct = 0._wp
1555    INTEGER :: ii
1556
1557    IF( ncom_stp <= nit000 ) RETURN
1558    IF( ncom_stp == nitend ) RETURN
1559    ii = 1
1560    IF( PRESENT( ld_global ) ) THEN
1561       IF( ld_global ) ii = 2
1562    END IF
1563   
1564    IF ( ld_tic ) THEN
1565       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1566       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1567    ELSE
1568       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1569       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1570    ENDIF
1571   
1572   END SUBROUTINE tic_tac
1573
1574   
1575#else
1576   !!----------------------------------------------------------------------
1577   !!   Default case:            Dummy module        share memory computing
1578   !!----------------------------------------------------------------------
1579   USE in_out_manager
1580
1581   INTERFACE mpp_sum
1582      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1583   END INTERFACE
1584   INTERFACE mpp_max
1585      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1586   END INTERFACE
1587   INTERFACE mpp_min
1588      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1589   END INTERFACE
1590   INTERFACE mpp_minloc
1591      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1592   END INTERFACE
1593   INTERFACE mpp_maxloc
1594      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1595   END INTERFACE
1596
1597   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1598   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1599   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1600
1601   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1602   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1603   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1604   TYPE ::   DELAYARR
1605      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1606      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1607   END TYPE DELAYARR
1608   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1609   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1610   !!----------------------------------------------------------------------
1611CONTAINS
1612
1613   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1614      INTEGER, INTENT(in) ::   kumout
1615      lib_mpp_alloc = 0
1616   END FUNCTION lib_mpp_alloc
1617
1618   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1619      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1620      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1621      CHARACTER(len=*) ::   ldname
1622      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1623      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1624      function_value = 0
1625      IF( .FALSE. )   ldtxt(:) = 'never done'
1626      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1627   END FUNCTION mynode
1628
1629   SUBROUTINE mppsync                       ! Dummy routine
1630   END SUBROUTINE mppsync
1631
1632   !!----------------------------------------------------------------------
1633   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1634   !!   
1635   !!----------------------------------------------------------------------
1636   !!
1637#  define OPERATION_MAX
1638#  define INTEGER_TYPE
1639#  define DIM_0d
1640#     define ROUTINE_ALLREDUCE           mppmax_int
1641#     include "mpp_allreduce_generic.h90"
1642#     undef ROUTINE_ALLREDUCE
1643#  undef DIM_0d
1644#  define DIM_1d
1645#     define ROUTINE_ALLREDUCE           mppmax_a_int
1646#     include "mpp_allreduce_generic.h90"
1647#     undef ROUTINE_ALLREDUCE
1648#  undef DIM_1d
1649#  undef INTEGER_TYPE
1650!
1651#  define REAL_TYPE
1652#  define DIM_0d
1653#     define ROUTINE_ALLREDUCE           mppmax_real
1654#     include "mpp_allreduce_generic.h90"
1655#     undef ROUTINE_ALLREDUCE
1656#  undef DIM_0d
1657#  define DIM_1d
1658#     define ROUTINE_ALLREDUCE           mppmax_a_real
1659#     include "mpp_allreduce_generic.h90"
1660#     undef ROUTINE_ALLREDUCE
1661#  undef DIM_1d
1662#  undef REAL_TYPE
1663#  undef OPERATION_MAX
1664   !!----------------------------------------------------------------------
1665   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1666   !!   
1667   !!----------------------------------------------------------------------
1668   !!
1669#  define OPERATION_MIN
1670#  define INTEGER_TYPE
1671#  define DIM_0d
1672#     define ROUTINE_ALLREDUCE           mppmin_int
1673#     include "mpp_allreduce_generic.h90"
1674#     undef ROUTINE_ALLREDUCE
1675#  undef DIM_0d
1676#  define DIM_1d
1677#     define ROUTINE_ALLREDUCE           mppmin_a_int
1678#     include "mpp_allreduce_generic.h90"
1679#     undef ROUTINE_ALLREDUCE
1680#  undef DIM_1d
1681#  undef INTEGER_TYPE
1682!
1683#  define REAL_TYPE
1684#  define DIM_0d
1685#     define ROUTINE_ALLREDUCE           mppmin_real
1686#     include "mpp_allreduce_generic.h90"
1687#     undef ROUTINE_ALLREDUCE
1688#  undef DIM_0d
1689#  define DIM_1d
1690#     define ROUTINE_ALLREDUCE           mppmin_a_real
1691#     include "mpp_allreduce_generic.h90"
1692#     undef ROUTINE_ALLREDUCE
1693#  undef DIM_1d
1694#  undef REAL_TYPE
1695#  undef OPERATION_MIN
1696
1697   !!----------------------------------------------------------------------
1698   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1699   !!   
1700   !!   Global sum of 1D array or a variable (integer, real or complex)
1701   !!----------------------------------------------------------------------
1702   !!
1703#  define OPERATION_SUM
1704#  define INTEGER_TYPE
1705#  define DIM_0d
1706#     define ROUTINE_ALLREDUCE           mppsum_int
1707#     include "mpp_allreduce_generic.h90"
1708#     undef ROUTINE_ALLREDUCE
1709#  undef DIM_0d
1710#  define DIM_1d
1711#     define ROUTINE_ALLREDUCE           mppsum_a_int
1712#     include "mpp_allreduce_generic.h90"
1713#     undef ROUTINE_ALLREDUCE
1714#  undef DIM_1d
1715#  undef INTEGER_TYPE
1716!
1717#  define REAL_TYPE
1718#  define DIM_0d
1719#     define ROUTINE_ALLREDUCE           mppsum_real
1720#     include "mpp_allreduce_generic.h90"
1721#     undef ROUTINE_ALLREDUCE
1722#  undef DIM_0d
1723#  define DIM_1d
1724#     define ROUTINE_ALLREDUCE           mppsum_a_real
1725#     include "mpp_allreduce_generic.h90"
1726#     undef ROUTINE_ALLREDUCE
1727#  undef DIM_1d
1728#  undef REAL_TYPE
1729#  undef OPERATION_SUM
1730
1731#  define OPERATION_SUM_DD
1732#  define COMPLEX_TYPE
1733#  define DIM_0d
1734#     define ROUTINE_ALLREDUCE           mppsum_realdd
1735#     include "mpp_allreduce_generic.h90"
1736#     undef ROUTINE_ALLREDUCE
1737#  undef DIM_0d
1738#  define DIM_1d
1739#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1740#     include "mpp_allreduce_generic.h90"
1741#     undef ROUTINE_ALLREDUCE
1742#  undef DIM_1d
1743#  undef COMPLEX_TYPE
1744#  undef OPERATION_SUM_DD
1745
1746   !!----------------------------------------------------------------------
1747   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1748   !!   
1749   !!----------------------------------------------------------------------
1750   !!
1751#  define OPERATION_MINLOC
1752#  define DIM_2d
1753#     define ROUTINE_LOC           mpp_minloc2d
1754#     include "mpp_loc_generic.h90"
1755#     undef ROUTINE_LOC
1756#  undef DIM_2d
1757#  define DIM_3d
1758#     define ROUTINE_LOC           mpp_minloc3d
1759#     include "mpp_loc_generic.h90"
1760#     undef ROUTINE_LOC
1761#  undef DIM_3d
1762#  undef OPERATION_MINLOC
1763
1764#  define OPERATION_MAXLOC
1765#  define DIM_2d
1766#     define ROUTINE_LOC           mpp_maxloc2d
1767#     include "mpp_loc_generic.h90"
1768#     undef ROUTINE_LOC
1769#  undef DIM_2d
1770#  define DIM_3d
1771#     define ROUTINE_LOC           mpp_maxloc3d
1772#     include "mpp_loc_generic.h90"
1773#     undef ROUTINE_LOC
1774#  undef DIM_3d
1775#  undef OPERATION_MAXLOC
1776
1777   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1778      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1779      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1780      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1781      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1782      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1783      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1784      !
1785      pout(:) = REAL(y_in(:), wp)
1786   END SUBROUTINE mpp_delay_sum
1787
1788   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1789      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1790      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1791      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1792      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1793      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1794      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1795      !
1796      pout(:) = p_in(:)
1797   END SUBROUTINE mpp_delay_max
1798
1799   SUBROUTINE mpp_delay_rcv( kid )
1800      INTEGER,INTENT(in   )      ::  kid 
1801      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1802   END SUBROUTINE mpp_delay_rcv
1803   
1804   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1805      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1806      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1807      STOP      ! non MPP case, just stop the run
1808   END SUBROUTINE mppstop
1809
1810   SUBROUTINE mpp_ini_znl( knum )
1811      INTEGER :: knum
1812      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1813   END SUBROUTINE mpp_ini_znl
1814
1815   SUBROUTINE mpp_comm_free( kcom )
1816      INTEGER :: kcom
1817      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1818   END SUBROUTINE mpp_comm_free
1819   
1820#endif
1821
1822   !!----------------------------------------------------------------------
1823   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1824   !!----------------------------------------------------------------------
1825
1826   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1827      &                 cd6, cd7, cd8, cd9, cd10 )
1828      !!----------------------------------------------------------------------
1829      !!                  ***  ROUTINE  stop_opa  ***
1830      !!
1831      !! ** Purpose :   print in ocean.outpput file a error message and
1832      !!                increment the error number (nstop) by one.
1833      !!----------------------------------------------------------------------
1834      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1835      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1836      !!----------------------------------------------------------------------
1837      !
1838      nstop = nstop + 1
1839
1840      ! force to open ocean.output file
1841      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1842       
1843      WRITE(numout,cform_err)
1844      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1845      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1846      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1847      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1848      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1849      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1850      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1851      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1852      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1853      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1854
1855                               CALL FLUSH(numout    )
1856      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1857      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1858      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1859      !
1860      IF( cd1 == 'STOP' ) THEN
1861         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1862         CALL mppstop(ld_force_abort = .true.)
1863      ENDIF
1864      !
1865   END SUBROUTINE ctl_stop
1866
1867
1868   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1869      &                 cd6, cd7, cd8, cd9, cd10 )
1870      !!----------------------------------------------------------------------
1871      !!                  ***  ROUTINE  stop_warn  ***
1872      !!
1873      !! ** Purpose :   print in ocean.outpput file a error message and
1874      !!                increment the warning number (nwarn) by one.
1875      !!----------------------------------------------------------------------
1876      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1877      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1878      !!----------------------------------------------------------------------
1879      !
1880      nwarn = nwarn + 1
1881      IF(lwp) THEN
1882         WRITE(numout,cform_war)
1883         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1884         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1885         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1886         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1887         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1888         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1889         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1890         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1891         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1892         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1893      ENDIF
1894      CALL FLUSH(numout)
1895      !
1896   END SUBROUTINE ctl_warn
1897
1898
1899   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1900      !!----------------------------------------------------------------------
1901      !!                  ***  ROUTINE ctl_opn  ***
1902      !!
1903      !! ** Purpose :   Open file and check if required file is available.
1904      !!
1905      !! ** Method  :   Fortan open
1906      !!----------------------------------------------------------------------
1907      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1908      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1909      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1910      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1911      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1912      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1913      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1914      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1915      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1916      !
1917      CHARACTER(len=80) ::   clfile
1918      INTEGER           ::   iost
1919      !!----------------------------------------------------------------------
1920      !
1921      ! adapt filename
1922      ! ----------------
1923      clfile = TRIM(cdfile)
1924      IF( PRESENT( karea ) ) THEN
1925         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1926      ENDIF
1927#if defined key_agrif
1928      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1929      knum=Agrif_Get_Unit()
1930#else
1931      knum=get_unit()
1932#endif
1933      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1934      !
1935      iost=0
1936      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1937         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1938      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1939         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1940      ELSE
1941         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1942      ENDIF
1943      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1944         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1945      IF( iost == 0 ) THEN
1946         IF(ldwp) THEN
1947            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1948            WRITE(kout,*) '     unit   = ', knum
1949            WRITE(kout,*) '     status = ', cdstat
1950            WRITE(kout,*) '     form   = ', cdform
1951            WRITE(kout,*) '     access = ', cdacce
1952            WRITE(kout,*)
1953         ENDIF
1954      ENDIF
1955100   CONTINUE
1956      IF( iost /= 0 ) THEN
1957         IF(ldwp) THEN
1958            WRITE(kout,*)
1959            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1960            WRITE(kout,*) ' =======   ===  '
1961            WRITE(kout,*) '           unit   = ', knum
1962            WRITE(kout,*) '           status = ', cdstat
1963            WRITE(kout,*) '           form   = ', cdform
1964            WRITE(kout,*) '           access = ', cdacce
1965            WRITE(kout,*) '           iostat = ', iost
1966            WRITE(kout,*) '           we stop. verify the file '
1967            WRITE(kout,*)
1968         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1969            WRITE(*,*)
1970            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1971            WRITE(*,*) ' =======   ===  '
1972            WRITE(*,*) '           unit   = ', knum
1973            WRITE(*,*) '           status = ', cdstat
1974            WRITE(*,*) '           form   = ', cdform
1975            WRITE(*,*) '           access = ', cdacce
1976            WRITE(*,*) '           iostat = ', iost
1977            WRITE(*,*) '           we stop. verify the file '
1978            WRITE(*,*)
1979         ENDIF
1980         CALL FLUSH( kout ) 
1981         STOP 'ctl_opn bad opening'
1982      ENDIF
1983      !
1984   END SUBROUTINE ctl_opn
1985
1986
1987   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1988      !!----------------------------------------------------------------------
1989      !!                  ***  ROUTINE ctl_nam  ***
1990      !!
1991      !! ** Purpose :   Informations when error while reading a namelist
1992      !!
1993      !! ** Method  :   Fortan open
1994      !!----------------------------------------------------------------------
1995      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1996      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1997      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1998      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1999      !!----------------------------------------------------------------------
2000      !
2001      WRITE (clios, '(I5.0)')   kios
2002      IF( kios < 0 ) THEN         
2003         CALL ctl_warn( 'end of record or file while reading namelist '   &
2004            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2005      ENDIF
2006      !
2007      IF( kios > 0 ) THEN
2008         CALL ctl_stop( 'misspelled variable in namelist '   &
2009            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2010      ENDIF
2011      kios = 0
2012      RETURN
2013      !
2014   END SUBROUTINE ctl_nam
2015
2016
2017   INTEGER FUNCTION get_unit()
2018      !!----------------------------------------------------------------------
2019      !!                  ***  FUNCTION  get_unit  ***
2020      !!
2021      !! ** Purpose :   return the index of an unused logical unit
2022      !!----------------------------------------------------------------------
2023      LOGICAL :: llopn
2024      !!----------------------------------------------------------------------
2025      !
2026      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2027      llopn = .TRUE.
2028      DO WHILE( (get_unit < 998) .AND. llopn )
2029         get_unit = get_unit + 1
2030         INQUIRE( unit = get_unit, opened = llopn )
2031      END DO
2032      IF( (get_unit == 999) .AND. llopn ) THEN
2033         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2034         get_unit = -1
2035      ENDIF
2036      !
2037   END FUNCTION get_unit
2038
2039   !!----------------------------------------------------------------------
2040END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.